#!/usr/bin/env python3
"""
KL630 Event Simulator — mimics what the KL630 firmware will send.

Usage:
    python test_client.py                               # run full grass scenario
    python test_client.py --hazard bunker               # single hazard event
    python test_client.py --person                      # person detection event
    python test_client.py --server http://192.168.0.99  # point at real OOB Enabler (production)
    python test_client.py --upload-delay 60             # use the real firmware's pre-upload wait
"""

import argparse
import json
import os
import io
import tarfile
import time
import datetime
import urllib.request
import urllib.error

DEFAULT_SERVER = 'http://127.0.0.1:8081'

# Tiny 1×1 grey baseline JPEG used as the payload of every simulated snapshot.
_JPEG_1X1 = bytes([
    0xFF,0xD8,0xFF,0xE0,0x00,0x10,0x4A,0x46,0x49,0x46,0x00,0x01,
    0x01,0x00,0x00,0x01,0x00,0x01,0x00,0x00,
    0xFF,0xDB,0x00,0x43,0x00,0x08,0x06,0x06,0x07,0x06,0x05,0x08,
    0x07,0x07,0x07,0x09,0x09,0x08,0x0A,0x0C,0x14,0x0D,0x0C,0x0B,
    0x0B,0x0C,0x19,0x12,0x13,0x0F,0x14,0x1D,0x1A,0x1F,0x1E,0x1D,
    0x1A,0x1C,0x1C,0x20,0x24,0x2E,0x27,0x20,0x22,0x2C,0x23,0x1C,
    0x1C,0x28,0x37,0x29,0x2C,0x30,0x31,0x34,0x34,0x34,0x1F,0x27,
    0x39,0x3D,0x38,0x32,0x3C,0x2E,0x33,0x34,0x32,
    0xFF,0xC0,0x00,0x0B,0x08,0x00,0x01,0x00,0x01,0x01,0x01,0x11,0x00,
    0xFF,0xC4,0x00,0x1F,0x00,0x00,0x01,0x05,0x01,0x01,0x01,0x01,
    0x01,0x01,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x01,0x02,
    0x03,0x04,0x05,0x06,0x07,0x08,0x09,0x0A,0x0B,
    0xFF,0xC4,0x00,0xB5,0x10,0x00,0x02,0x01,0x03,0x03,0x02,0x04,
    0x03,0x05,0x05,0x04,0x04,0x00,0x00,0x01,0x7D,0x01,0x02,0x03,
    0x00,0x04,0x11,0x05,0x12,0x21,0x31,0x41,0x06,0x13,0x51,0x61,
    0x07,0x22,0x71,0x14,0x32,0x81,0x91,0xA1,0x08,0x23,0x42,0xB1,
    0xC1,0x15,0x52,0xD1,0xF0,0x24,0x33,0x62,0x72,0x82,0x09,0x0A,
    0x16,0x17,0x18,0x19,0x1A,0x25,0x26,0x27,0x28,0x29,0x2A,0x34,
    0x35,0x36,0x37,0x38,0x39,0x3A,0x43,0x44,0x45,0x46,0x47,0x48,
    0x49,0x4A,0x53,0x54,0x55,0x56,0x57,0x58,0x59,0x5A,0x63,0x64,
    0x65,0x66,0x67,0x68,0x69,0x6A,0x73,0x74,0x75,0x76,0x77,0x78,
    0x79,0x7A,0x83,0x84,0x85,0x86,0x87,0x88,0x89,0x8A,0x92,0x93,
    0x94,0x95,0x96,0x97,0x98,0x99,0x9A,0xA2,0xA3,0xA4,0xA5,0xA6,
    0xA7,0xA8,0xA9,0xAA,0xB2,0xB3,0xB4,0xB5,0xB6,0xB7,0xB8,0xB9,
    0xBA,0xC2,0xC3,0xC4,0xC5,0xC6,0xC7,0xC8,0xC9,0xCA,0xD2,0xD3,
    0xD4,0xD5,0xD6,0xD7,0xD8,0xD9,0xDA,0xE1,0xE2,0xE3,0xE4,0xE5,
    0xE6,0xE7,0xE8,0xE9,0xEA,0xF1,0xF2,0xF3,0xF4,0xF5,0xF6,0xF7,
    0xF8,0xF9,0xFA,
    0xFF,0xDA,0x00,0x08,0x01,0x01,0x00,0x00,0x3F,0x00,0xFB,0xD0,
    0xFF,0xD9
])

# A marker-segment length is a 16-bit field that counts its own two bytes,
# so the comment payload itself can be at most 65535 - 2 bytes long.
_MAX_COM_PAYLOAD = 0xFFFF - 2


def _utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    # datetime.utcnow() is deprecated since Python 3.12; the aware variant
    # formats to exactly the same strings with the strftime patterns used here.
    return datetime.datetime.now(datetime.timezone.utc)


def now_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return _utc_now().strftime('%Y-%m-%dT%H:%M:%SZ')


def _url(server, path):
    """Join the server base URL and an absolute path without doubling the slash."""
    return server.rstrip('/') + path


def _send(req, timeout):
    """Send *req*, print the decoded JSON reply, and report failures without raising.

    The simulator is best-effort: a dead server, an HTTP error status, a
    timeout or a non-JSON body is printed as an ``[ERROR]`` line so that the
    remaining steps of a scenario still run.
    """
    try:
        with urllib.request.urlopen(req, timeout=timeout) as r:
            body = r.read()
    except OSError as e:
        # URLError/HTTPError derive from OSError; catching the base class also
        # covers a socket timeout raised while reading the response body.
        print(f" [ERROR] {e}")
        return
    try:
        resp = json.loads(body)
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f" [ERROR] {e}")
        return
    print(f" → {resp}")


def post_json(server, path, payload):
    """POST *payload* as a JSON document to ``server + path`` and print the reply.

    Args:
        server: Base URL of the target, e.g. ``http://127.0.0.1:8081``.
        path: Absolute request path, e.g. ``/api/event``.
        payload: JSON-serialisable object used as the request body.
    """
    data = json.dumps(payload).encode('utf-8')
    req = urllib.request.Request(
        _url(server, path),
        data=data,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
    _send(req, timeout=5)


def post_targz(server, path, filename, tgz_bytes):
    """POST a tar.gz archive as the raw request body and print the reply.

    Args:
        server: Base URL of the target.
        path: Absolute request path of the upload endpoint.
        filename: Name advertised in the ``Content-Disposition`` header.
        tgz_bytes: Complete archive contents.
    """
    req = urllib.request.Request(
        _url(server, path),
        data=tgz_bytes,
        headers={
            'Content-Type': 'application/gzip',
            'Content-Disposition': f'attachment; filename="{filename}"',
            'Content-Length': str(len(tgz_bytes)),
        },
        method='POST',
    )
    _send(req, timeout=10)


def get_server_time(server):
    """Get UTC time from mock server (simulates NTP via OOB Enabler).

    Returns:
        The ``unix`` field of the ``/api/time`` JSON reply, or the local
        ``int(time.time())`` when the request or the reply parsing fails.
    """
    try:
        with urllib.request.urlopen(_url(server, '/api/time'), timeout=3) as r:
            t = json.loads(r.read())
        print(f" Server time: {t['iso']} (unix={t['unix']})")
        return t['unix']
    except (OSError, ValueError, KeyError, TypeError) as e:
        # Network failure, non-JSON body, or a reply without the expected keys:
        # fall back to the local clock rather than aborting the scenario.
        print(f" [WARN] Could not get server time: {e}")
        return int(time.time())


def make_fake_jpeg(label: str) -> bytes:
    """Return a minimal valid JPEG (1x1 grey pixel) with a comment.

    The comment ``KL630 <label>`` is stored in a COM (0xFFFE) marker segment
    placed directly after the JFIF APP0 segment, which must stay first after
    SOI.

    Args:
        label: Free text embedded in the comment; truncated if its UTF-8
            encoding would not fit in a single marker segment.
    """
    comment = f'KL630 {label}'.encode('utf-8')[:_MAX_COM_PAYLOAD]
    # The segment length field counts its own two bytes plus the payload.
    com_seg = b'\xFF\xFE' + (len(comment) + 2).to_bytes(2, 'big') + comment
    # SOI (2 bytes) + APP0 marker (2 bytes) + APP0 length as stored at bytes 4-5.
    app0_end = 4 + int.from_bytes(_JPEG_1X1[4:6], 'big')
    return _JPEG_1X1[:app0_end] + com_seg + _JPEG_1X1[app0_end:]


def _add_member(tar, name, data):
    """Append *data* to the open archive *tar* as a regular file called *name*."""
    ti = tarfile.TarInfo(name=name)
    ti.size = len(data)
    # TarInfo defaults mtime to 0 (1970-01-01); stamp the member with "now".
    ti.mtime = int(time.time())
    tar.addfile(ti, io.BytesIO(data))


def build_targz(event_id: str, event_type: str, max_level: int,
                duration_sec: float, images: dict) -> bytes:
    """
    Build tar.gz in memory.

    The archive holds ``event.json`` (a summary of the event) followed by one
    member per image, in the iteration order of *images*.

    Args:
        event_id: Value stored under ``id`` in the summary.
        event_type: Value stored under ``type`` in the summary.
        max_level: Value stored under ``max_level`` in the summary.
        duration_sec: Event duration; stored rounded to one decimal place.
        images: ``{ 'level1.jpg': bytes, ... }``

    Returns:
        The complete gzip-compressed archive.
    """
    summary = {
        'id': event_id,
        'date': now_iso(),
        'type': event_type,
        'max_level': max_level,
        'duration_sec': round(duration_sec, 1),
        'images': list(images),
    }
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tar:
        _add_member(tar, 'event.json', json.dumps(summary, indent=2).encode('utf-8'))
        for name, data in images.items():
            _add_member(tar, name, data)
    # Read the buffer only after the archive is closed so the gzip trailer is in.
    return buf.getvalue()


# ── Scenarios ─────────────────────────────────────────────────────────


def _violation_payload(event_id, ts, event_type, level):
    """Build the channel-A ``violation`` message body."""
    return {
        'response_type': 'violation',
        'content': {'id': event_id, 'date': ts, 'type': event_type, 'level': level},
    }


def _upload_event(server, upload_path, event_id, event_type, max_level,
                  duration, images, prefix=''):
    """Pack *images* into a tar.gz and send it over channel B.

    *prefix* is printed before the progress line so that each scenario keeps
    its own spacing in the console output.
    """
    tgz = build_targz(event_id, event_type, max_level, duration, images)
    ts_fn = _utc_now().strftime('%Y%m%d_%H%M%S')
    filename = f'event_{event_id}_{ts_fn}.tar.gz'
    print(f"{prefix}→ [CHANNEL B] uploading {filename} ({len(tgz):,} bytes)")
    post_targz(server, upload_path, filename, tgz)


def _single_shot(server, upload_path, event_id, event_type, announce):
    """Send one level-1 event with a single snapshot, uploaded immediately."""
    get_server_time(server)
    ts = now_iso()
    print(announce)
    post_json(server, '/api/event', _violation_payload(event_id, ts, event_type, 1))
    images = {'snapshot.jpg': make_fake_jpeg(f'{event_type} {ts}')}
    _upload_event(server, upload_path, event_id, event_type, 1, 0, images)


def scenario_grass(server, event_id, upload_path, wait_before_upload=3):
    """Full grass violation: L1 → L2 → L3 → L0, then upload tar.gz.

    Args:
        server: Base URL of the target.
        event_id: Identifier shared by every message of this event.
        upload_path: Request path of the tar.gz upload endpoint.
        wait_before_upload: Seconds to wait between the last level message and
            the upload. Real firmware waits 60 s; the default of 3 keeps a test
            run short.
    """
    print("\n[Grass Scenario]")
    print("Step 0: get server time (simulates NTP from OOB Enabler)")
    get_server_time(server)

    images = {}
    start_time = time.time()
    # (seconds after start, level, console label)
    levels = [
        (0, 1, "草地 Level 1"),
        (6, 2, "草地 Level 2"),
        (10, 3, "草地 Level 3"),
        (13, 0, "離開草地"),
    ]
    for delay, level, label in levels:
        wait = delay - (time.time() - start_time)
        if wait > 0:
            print(f" waiting {wait:.1f}s ...")
            time.sleep(wait)
        ts = now_iso()
        print(f"\n→ [CHANNEL A] {label}")
        post_json(server, '/api/event', _violation_payload(event_id, ts, 'grass', level))
        # Level 0 means the subject left the grass: no snapshot for that step.
        if level in (1, 2, 3):
            images[f'level{level}.jpg'] = make_fake_jpeg(f'grass L{level} {ts}')

    duration = time.time() - start_time
    max_level = max(level for _, level, _ in levels)

    print(f"\n (waiting {wait_before_upload}s before tar.gz upload — real firmware waits 60s)")
    time.sleep(wait_before_upload)

    _upload_event(server, upload_path, event_id, 'grass', max_level,
                  duration, images, prefix='\n')


def scenario_hazard(server, hazard_type, event_id, upload_path):
    """Single-shot hazard event — no level, no tar.gz wait.

    Args:
        server: Base URL of the target.
        hazard_type: Event type sent to the server (``bunker``, ``pond`` or
            ``tree`` from the CLI); unknown types are labelled with their own name.
        event_id: Identifier of the event.
        upload_path: Request path of the tar.gz upload endpoint.
    """
    labels = {'bunker': '沙坑', 'pond': '水池', 'tree': '樹木'}
    label = labels.get(hazard_type, hazard_type)
    print(f"\n[Hazard Scenario: {label}]")
    _single_shot(server, upload_path, event_id, hazard_type,
                 f"→ [CHANNEL A] {label} 單次紀錄")


def scenario_person(server, event_id, upload_path):
    """Person detection — single-shot."""
    print("\n[Person Scenario]")
    _single_shot(server, upload_path, event_id, 'person',
                 "→ [CHANNEL A] 行人偵測 單次紀錄")


# ── Main ───────────────────────────────────────────────────────────────


def main(argv=None):
    """Parse the command line and run the selected scenario.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--server', default=DEFAULT_SERVER)
    parser.add_argument('--upload-path', default='/api/upload',
                        choices=['/api/upload', '/api/golf.cgi'])
    parser.add_argument('--hazard', choices=['bunker', 'pond', 'tree'])
    parser.add_argument('--person', action='store_true')
    parser.add_argument('--upload-delay', type=float, default=3,
                        help='seconds to wait before the grass tar.gz upload '
                             '(real firmware waits 60)')
    args = parser.parse_args(argv)

    event_id = str(int(time.time()))

    if args.hazard:
        scenario_hazard(args.server, args.hazard, event_id, args.upload_path)
    elif args.person:
        scenario_person(args.server, event_id, args.upload_path)
    else:
        scenario_grass(args.server, event_id, args.upload_path,
                       wait_before_upload=args.upload_delay)


if __name__ == '__main__':
    main()