gf_ai_box/tools/mock_server/test_client.py
2026-04-12 17:47:54 +08:00

258 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
KL630 Event Simulator — mimics what the KL630 firmware will send.
Usage:
python test_client.py # run full grass scenario
python test_client.py --hazard bunker # single hazard event
python test_client.py --person # person detection event
python test_client.py --server http://192.168.0.99 # point at real OOB Enabler (production)
"""
import argparse
import json
import os
import io
import tarfile
import time
import datetime
import urllib.request
import urllib.error
DEFAULT_SERVER = 'http://127.0.0.1:8081'
def now_iso():
return datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
def post_json(server, path, payload):
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(
server + path,
data=data,
headers={'Content-Type': 'application/json'},
method='POST'
)
try:
with urllib.request.urlopen(req, timeout=5) as r:
resp = json.loads(r.read())
print(f"{resp}")
except urllib.error.URLError as e:
print(f" [ERROR] {e}")
def post_targz(server, path, filename, tgz_bytes):
req = urllib.request.Request(
server + path,
data=tgz_bytes,
headers={
'Content-Type': 'application/gzip',
'Content-Disposition': f'attachment; filename="{filename}"',
'Content-Length': str(len(tgz_bytes)),
},
method='POST'
)
try:
with urllib.request.urlopen(req, timeout=10) as r:
resp = json.loads(r.read())
print(f"{resp}")
except urllib.error.URLError as e:
print(f" [ERROR] {e}")
def get_server_time(server):
"""Get UTC time from mock server (simulates NTP via OOB Enabler)."""
try:
with urllib.request.urlopen(server + '/api/time', timeout=3) as r:
t = json.loads(r.read())
print(f" Server time: {t['iso']} (unix={t['unix']})")
return t['unix']
except Exception as e:
print(f" [WARN] Could not get server time: {e}")
return int(time.time())
def make_fake_jpeg(label: str) -> bytes:
"""Return a minimal valid JPEG (1x1 grey pixel) with a comment."""
# Tiny 1×1 grey JPEG
JPEG_1x1 = bytes([
0xFF,0xD8,0xFF,0xE0,0x00,0x10,0x4A,0x46,0x49,0x46,0x00,0x01,
0x01,0x00,0x00,0x01,0x00,0x01,0x00,0x00,
0xFF,0xDB,0x00,0x43,0x00,0x08,0x06,0x06,0x07,0x06,0x05,0x08,
0x07,0x07,0x07,0x09,0x09,0x08,0x0A,0x0C,0x14,0x0D,0x0C,0x0B,
0x0B,0x0C,0x19,0x12,0x13,0x0F,0x14,0x1D,0x1A,0x1F,0x1E,0x1D,
0x1A,0x1C,0x1C,0x20,0x24,0x2E,0x27,0x20,0x22,0x2C,0x23,0x1C,
0x1C,0x28,0x37,0x29,0x2C,0x30,0x31,0x34,0x34,0x34,0x1F,0x27,
0x39,0x3D,0x38,0x32,0x3C,0x2E,0x33,0x34,0x32,
0xFF,0xC0,0x00,0x0B,0x08,0x00,0x01,0x00,0x01,0x01,0x01,0x11,0x00,
0xFF,0xC4,0x00,0x1F,0x00,0x00,0x01,0x05,0x01,0x01,0x01,0x01,
0x01,0x01,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x01,0x02,
0x03,0x04,0x05,0x06,0x07,0x08,0x09,0x0A,0x0B,
0xFF,0xC4,0x00,0xB5,0x10,0x00,0x02,0x01,0x03,0x03,0x02,0x04,
0x03,0x05,0x05,0x04,0x04,0x00,0x00,0x01,0x7D,0x01,0x02,0x03,
0x00,0x04,0x11,0x05,0x12,0x21,0x31,0x41,0x06,0x13,0x51,0x61,
0x07,0x22,0x71,0x14,0x32,0x81,0x91,0xA1,0x08,0x23,0x42,0xB1,
0xC1,0x15,0x52,0xD1,0xF0,0x24,0x33,0x62,0x72,0x82,0x09,0x0A,
0x16,0x17,0x18,0x19,0x1A,0x25,0x26,0x27,0x28,0x29,0x2A,0x34,
0x35,0x36,0x37,0x38,0x39,0x3A,0x43,0x44,0x45,0x46,0x47,0x48,
0x49,0x4A,0x53,0x54,0x55,0x56,0x57,0x58,0x59,0x5A,0x63,0x64,
0x65,0x66,0x67,0x68,0x69,0x6A,0x73,0x74,0x75,0x76,0x77,0x78,
0x79,0x7A,0x83,0x84,0x85,0x86,0x87,0x88,0x89,0x8A,0x92,0x93,
0x94,0x95,0x96,0x97,0x98,0x99,0x9A,0xA2,0xA3,0xA4,0xA5,0xA6,
0xA7,0xA8,0xA9,0xAA,0xB2,0xB3,0xB4,0xB5,0xB6,0xB7,0xB8,0xB9,
0xBA,0xC2,0xC3,0xC4,0xC5,0xC6,0xC7,0xC8,0xC9,0xCA,0xD2,0xD3,
0xD4,0xD5,0xD6,0xD7,0xD8,0xD9,0xDA,0xE1,0xE2,0xE3,0xE4,0xE5,
0xE6,0xE7,0xE8,0xE9,0xEA,0xF1,0xF2,0xF3,0xF4,0xF5,0xF6,0xF7,
0xF8,0xF9,0xFA,
0xFF,0xDA,0x00,0x08,0x01,0x01,0x00,0x00,0x3F,0x00,0xFB,0xD0,
0xFF,0xD9
])
# prepend a JPEG comment with the label
comment = f'KL630 {label}'.encode('utf-8')
com_seg = b'\xFF\xFE' + len(comment).to_bytes(2,'big') + b'\x00' + comment
return JPEG_1x1[:2] + com_seg + JPEG_1x1[2:]
def build_targz(event_id: str, event_type: str, max_level: int,
duration_sec: float, images: dict) -> bytes:
"""
Build tar.gz in memory.
images = { 'level1.jpg': bytes, ... }
"""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tar:
# event.json
summary = {
'id': event_id,
'date': now_iso(),
'type': event_type,
'max_level': max_level,
'duration_sec': round(duration_sec, 1),
'images': list(images.keys()),
}
jdata = json.dumps(summary, indent=2).encode('utf-8')
ti = tarfile.TarInfo(name='event.json')
ti.size = len(jdata)
tar.addfile(ti, io.BytesIO(jdata))
# images
for name, data in images.items():
ti = tarfile.TarInfo(name=name)
ti.size = len(data)
tar.addfile(ti, io.BytesIO(data))
return buf.getvalue()
# ── Scenarios ─────────────────────────────────────────────────────────
def scenario_grass(server, event_id):
"""Full grass violation: L1 → L2 → L3 → L0, then upload tar.gz."""
print("\n[Grass Scenario]")
print("Step 0: get server time (simulates NTP from OOB Enabler)")
get_server_time(server)
images = {}
start_time = time.time()
levels = [
(0, 1, "草地 Level 1"),
(6, 2, "草地 Level 2"),
(10, 3, "草地 Level 3"),
(13, 0, "離開草地"),
]
for delay, level, label in levels:
elapsed = time.time() - start_time
wait = delay - elapsed
if wait > 0:
print(f" waiting {wait:.1f}s ...")
time.sleep(wait)
ts = now_iso()
payload = {
'response_type': 'violation',
'content': {'id': event_id, 'date': ts, 'type': 'grass', 'level': level}
}
print(f"\n→ [CHANNEL A] {label}")
post_json(server, '/api/event', payload)
if level in (1, 2, 3):
images[f'level{level}.jpg'] = make_fake_jpeg(f'grass L{level} {ts}')
duration = time.time() - start_time
# Wait 1 minute before upload (use 3s in test to not block too long)
wait_before_upload = 3 # set to 60 for real behaviour
print(f"\n (waiting {wait_before_upload}s before tar.gz upload — real firmware waits 60s)")
time.sleep(wait_before_upload)
tgz = build_targz(event_id, 'grass', 3, duration, images)
ts_fn = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S')
filename = f'event_{event_id}_{ts_fn}.tar.gz'
print(f"\n→ [CHANNEL B] uploading {filename} ({len(tgz):,} bytes)")
post_targz(server, '/api/upload', filename, tgz)
def scenario_hazard(server, hazard_type, event_id):
"""Single-shot hazard event — no level, no tar.gz wait."""
labels = {'bunker': '沙坑', 'pond': '水池', 'tree': '樹木'}
label = labels.get(hazard_type, hazard_type)
print(f"\n[Hazard Scenario: {label}]")
get_server_time(server)
ts = now_iso()
payload = {
'response_type': 'violation',
'content': {'id': event_id, 'date': ts, 'type': hazard_type, 'level': 1}
}
print(f"→ [CHANNEL A] {label} 單次紀錄")
post_json(server, '/api/event', payload)
# Single image, upload immediately
images = {'snapshot.jpg': make_fake_jpeg(f'{hazard_type} {ts}')}
tgz = build_targz(event_id, hazard_type, 1, 0, images)
ts_fn = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S')
filename = f'event_{event_id}_{ts_fn}.tar.gz'
print(f"→ [CHANNEL B] uploading {filename} ({len(tgz):,} bytes)")
post_targz(server, '/api/upload', filename, tgz)
def scenario_person(server, event_id):
"""Person detection — single-shot."""
print("\n[Person Scenario]")
get_server_time(server)
ts = now_iso()
payload = {
'response_type': 'violation',
'content': {'id': event_id, 'date': ts, 'type': 'person', 'level': 1}
}
print("→ [CHANNEL A] 行人偵測 單次紀錄")
post_json(server, '/api/event', payload)
images = {'snapshot.jpg': make_fake_jpeg(f'person {ts}')}
tgz = build_targz(event_id, 'person', 1, 0, images)
ts_fn = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S')
filename = f'event_{event_id}_{ts_fn}.tar.gz'
print(f"→ [CHANNEL B] uploading {filename} ({len(tgz):,} bytes)")
post_targz(server, '/api/upload', filename, tgz)
# ── Main ───────────────────────────────────────────────────────────────
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--server', default=DEFAULT_SERVER)
parser.add_argument('--hazard', choices=['bunker', 'pond', 'tree'])
parser.add_argument('--person', action='store_true')
args = parser.parse_args()
event_id = str(int(time.time()))
if args.hazard:
scenario_hazard(args.server, args.hazard, event_id)
elif args.person:
scenario_person(args.server, event_id)
else:
scenario_grass(args.server, event_id)