#!/usr/bin/env python3 """ KL630 Golf Cart Event Mock Server Two independent channels: Channel A (iPad / BLE path): POST /api/event → real-time violation JSON Channel B (OOB / Cloud path): POST /api/upload → tar.gz event archive Plus: GET /api/time → provides UTC time to KL630 (no NTP needed) GET / → web dashboard """ import json import os import io import tarfile import time import datetime import threading from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs TZ_TW = datetime.timezone(datetime.timedelta(hours=8)) def now_tw(): return datetime.datetime.now(TZ_TW) def tw_str(dt=None): if dt is None: dt = now_tw() return dt.strftime('%Y-%m-%dT%H:%M:%S+08:00') UPLOAD_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') os.makedirs(UPLOAD_DIR, exist_ok=True) events = [] # Channel A log events_lock = threading.Lock() class Handler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): ts = datetime.datetime.now().strftime('%H:%M:%S') print(f"[{ts}] {fmt % args}") # ------------------------------------------------------------------ def do_OPTIONS(self): self.send_response(204) self._cors() self.end_headers() # ------------------------------------------------------------------ def do_GET(self): path = urlparse(self.path).path if path == '/' or path == '/index.html': self._serve_file(os.path.join(STATIC_DIR, 'index.html'), 'text/html') elif path == '/api/time': self._json({ 'unix': int(time.time()), 'iso': tw_str(), }) elif path == '/api/events': with events_lock: self._json(list(events)) elif path == '/api/files': files = [] for name in sorted(os.listdir(UPLOAD_DIR), reverse=True): fp = os.path.join(UPLOAD_DIR, name) if not os.path.isfile(fp): continue files.append({ 'name': name, 'size': os.path.getsize(fp), 'mtime': datetime.datetime.fromtimestamp( os.path.getmtime(fp) ).astimezone(TZ_TW).strftime('%Y-%m-%d %H:%M:%S'), 'url': f'/uploads/{name}', }) self._json(files) elif path.startswith('/uploads/'): parts = path[len('/uploads/'):].split('/', 1) name = parts[0] fp = os.path.join(UPLOAD_DIR, name) if len(parts) == 1: # Download the tar.gz if os.path.isfile(fp): with open(fp, 'rb') as f: data = f.read() self.send_response(200) self.send_header('Content-Type', 'application/gzip') self.send_header('Content-Disposition', f'attachment; filename="{name}"') self.send_header('Content-Length', str(len(data))) self._cors() self.end_headers() self.wfile.write(data) else: self.send_error(404) else: # Serve a file from inside the tar.gz # BusyBox tar creates entries as "./filename"; normalise to bare name inner = parts[1].lstrip('./') if os.path.isfile(fp): try: with tarfile.open(fp, 'r:gz') as tar: # Try bare name first, then with "./" prefix try: member = tar.getmember(inner) except KeyError: member = tar.getmember('./' + inner) f = tar.extractfile(member) data = f.read() ext = os.path.splitext(inner)[1].lower() ct = {'jpg':'image/jpeg','jpeg':'image/jpeg', 'png':'image/png','json':'application/json'}.get(ext, 'application/octet-stream') self.send_response(200) self.send_header('Content-Type', ct) self.send_header('Content-Length', str(len(data))) self._cors() self.end_headers() self.wfile.write(data) except Exception as e: self.send_error(404, str(e)) else: self.send_error(404) elif path.startswith('/api/contents/'): # List files inside a tar.gz: GET /api/contents/ name = os.path.basename(path[len('/api/contents/'):]) fp = os.path.join(UPLOAD_DIR, name) if os.path.isfile(fp): try: with tarfile.open(fp, 'r:gz') as tar: members = [] for m in tar.getmembers(): if not m.isfile(): continue # Strip leading "./" added by BusyBox "tar cf - ." clean = m.name.lstrip('./') if not clean: continue members.append({ 'name': clean, 'size': m.size, 'is_image': clean.lower().endswith(('.jpg','.jpeg','.png')), }) self._json({'ok': True, 'archive': name, 'files': members}) except Exception as e: self._json({'ok': False, 'error': str(e)}) else: self.send_error(404) else: self.send_error(404) # ------------------------------------------------------------------ def do_POST(self): path = urlparse(self.path).path length = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(length) # ── Channel A: real-time event JSON ────────────────────────── if path == '/api/event': try: data = json.loads(body.decode('utf-8')) entry = { 'server_time': tw_str(), } entry.update(data) with events_lock: events.append(entry) if len(events) > 200: del events[:-200] ctype = data.get('content', {}).get('type', '?') level = data.get('content', {}).get('level', '?') print(f" [CHANNEL-A] event type={ctype} level={level}") self._json({'ok': True}) except Exception as e: print(f" [CHANNEL-A] parse error: {e}") self.send_error(400, str(e)) # ── Channel B: tar.gz archive upload ───────────────────────── elif path in ('/api/upload', '/api/golf.cgi'): # 1) kCurl sends filename as query param: /api/upload?filename=xxx qs = parse_qs(urlparse(self.path).query) filename = (qs.get('filename') or [None])[0] # 2) fallback: raw-socket upload uses Content-Disposition header if not filename: cd = self.headers.get('Content-Disposition', '') for part in cd.split(';'): part = part.strip() if part.startswith('filename='): filename = part[9:].strip().strip('"') # 3) last resort: timestamp-based name if not filename: ts = now_tw().strftime('%Y%m%d_%H%M%S') filename = f'event_{ts}.tar.gz' # sanitise filename = os.path.basename(filename) fp = os.path.join(UPLOAD_DIR, filename) with open(fp, 'wb') as f: f.write(body) print(f" [CHANNEL-B] {path} saved {filename} ({len(body):,} bytes)") self._json({'ok': True, 'path': path, 'filename': filename, 'bytes': len(body)}) else: self.send_error(404) # ------------------------------------------------------------------ def _json(self, obj): data = json.dumps(obj, ensure_ascii=False, indent=None).encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'application/json; charset=utf-8') self.send_header('Content-Length', str(len(data))) self._cors() self.end_headers() self.wfile.write(data) def _cors(self): self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type, Content-Disposition') def _serve_file(self, filepath, content_type): if not os.path.isfile(filepath): self.send_error(404) return with open(filepath, 'rb') as f: data = f.read() self.send_response(200) self.send_header('Content-Type', content_type + '; charset=utf-8') self.send_header('Content-Length', str(len(data))) self.end_headers() self.wfile.write(data) # ────────────────────────────────────────────────────────────────────── if __name__ == '__main__': PORT = 8081 server = HTTPServer(('0.0.0.0', PORT), Handler) print("=" * 55) print(" KL630 Golf Event Mock Server") print("=" * 55) print(f" Dashboard : http://localhost:{PORT}/") print(f" Time API : GET http://localhost:{PORT}/api/time") print(f" Event API : POST http://localhost:{PORT}/api/event") print(f" Upload API : POST http://localhost:{PORT}/api/upload") print(f" Golf API : POST http://localhost:{PORT}/api/golf.cgi") print(f" Uploads : {UPLOAD_DIR}") print("=" * 55) try: server.serve_forever() except KeyboardInterrupt: print("\nStopped.")