668 lines
26 KiB
Python
668 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
KL630 Golf Cart Event Mock Server
|
|
|
|
Two independent channels:
|
|
Channel A (iPad / BLE path): POST /api/event → real-time violation JSON
|
|
Channel B (OOB / Cloud path): POST /api/upload → tar.gz event archive
|
|
|
|
Channel C (CAN bus):
|
|
POST /api/can/send → send a CAN frame via SocketCAN (PCAN-USB FD)
|
|
GET /api/can/status → {available, channel, bitrate, last_error}
|
|
|
|
Plus:
|
|
GET /api/time → provides UTC time to KL630 (no NTP needed)
|
|
GET / → web dashboard
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import io
|
|
import tarfile
|
|
import time
|
|
import datetime
|
|
import threading
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
# ── CAN bus support (python-can, optional) ───────────────────────────────────
|
|
try:
|
|
import can
|
|
_CAN_AVAILABLE = True
|
|
except ImportError:
|
|
_CAN_AVAILABLE = False
|
|
|
|
# Event type → CAN Data[0] mapping (must match can_bus.h on KL630)
|
|
_CAN_EVT = {
|
|
'boot': 0x00,
|
|
'road': 0x01,
|
|
'grass': 0x02,
|
|
'car': 0x03,
|
|
'person': 0x04,
|
|
'pond': 0x05,
|
|
'bunker': 0x06,
|
|
'tree': 0x07,
|
|
'hazard': 0x08,
|
|
}
|
|
|
|
class CanBus:
|
|
"""Thread-safe SocketCAN wrapper around python-can Bus.
|
|
|
|
Sends CAN frames (test buttons) AND receives frames from the KL630
|
|
device, reassembling multi-frame JSON strings and appending them to
|
|
the shared events list (same as Channel A).
|
|
"""
|
|
DEFAULT_CHANNEL = 'can0'
|
|
DEFAULT_BITRATE = 250000 # 250 kbps — must match device INI
|
|
|
|
def __init__(self):
|
|
self._lock = threading.Lock()
|
|
self._bus = None
|
|
self._channel = self.DEFAULT_CHANNEL
|
|
self._bitrate = self.DEFAULT_BITRATE
|
|
self._last_error = ''
|
|
self._available = False
|
|
self._rx_thread = None
|
|
self._rx_buf = {} # per-arbitration-id reassembly buffer
|
|
self._rx_count = 0
|
|
self._tx_count = 0
|
|
self._tx_fail = 0
|
|
self._last_rx_ts = 0.0
|
|
self._last_rx_id = None
|
|
self._last_rx_text = ''
|
|
self._last_tx_error = ''
|
|
self._open()
|
|
|
|
def _open(self):
|
|
if not _CAN_AVAILABLE:
|
|
self._last_error = 'python-can not installed (pip install python-can)'
|
|
return
|
|
# Try CAN FD first (PCAN-USB FD), fall back to classic CAN
|
|
for fd_mode in (True, False):
|
|
try:
|
|
kwargs = dict(channel=self._channel, interface='socketcan',
|
|
bitrate=self._bitrate)
|
|
if fd_mode:
|
|
kwargs['fd'] = True
|
|
self._bus = can.interface.Bus(**kwargs)
|
|
self._available = True
|
|
self._last_error = ''
|
|
mode_str = 'FD' if fd_mode else 'classic'
|
|
print(f'[CAN] opened {self._channel} @ {self._bitrate//1000} kbps ({mode_str})')
|
|
# Start receive thread
|
|
self._rx_buf = {}
|
|
t = threading.Thread(target=self._rx_loop, daemon=True,
|
|
name='can_rx')
|
|
t.start()
|
|
self._rx_thread = t
|
|
return
|
|
except Exception as e:
|
|
if fd_mode:
|
|
continue # try classic CAN
|
|
self._last_error = str(e)
|
|
self._available = False
|
|
print(f'[CAN] open failed: {e}')
|
|
print(f'[CAN] hint: sudo ip link set {self._channel} up type can bitrate {self._bitrate}')
|
|
|
|
def _rx_loop(self):
|
|
"""Background thread: receive CAN frames, reassemble JSON, log event."""
|
|
buf = {} # arbitration_id → accumulated bytes
|
|
while self._available and self._bus:
|
|
try:
|
|
msg = self._bus.recv(timeout=1.0)
|
|
if msg is None:
|
|
continue
|
|
aid = msg.arbitration_id
|
|
chunk = bytes(msg.data)
|
|
|
|
# Special handling for control frames (ID=0x75): process immediately as raw
|
|
if aid == 0x75:
|
|
self._on_raw_received(aid, chunk)
|
|
continue
|
|
|
|
# Accumulate bytes for this CAN ID (for JSON reassembly)
|
|
buf.setdefault(aid, b'')
|
|
buf[aid] += chunk
|
|
|
|
# Check for null terminator (end of JSON string)
|
|
if b'\x00' in buf[aid]:
|
|
raw = buf[aid].split(b'\x00', 1)[0]
|
|
buf[aid] = b''
|
|
try:
|
|
text = raw.decode('utf-8').strip()
|
|
if text:
|
|
self._on_json_received(aid, text)
|
|
except Exception:
|
|
self._on_raw_received(aid, raw)
|
|
# Safety: discard if buffer grows too large (no null after 256 bytes)
|
|
elif len(buf[aid]) > 256:
|
|
self._on_raw_received(aid, buf[aid])
|
|
buf[aid] = b''
|
|
except Exception:
|
|
break
|
|
|
|
def _on_json_received(self, can_id, text):
|
|
"""Called when a complete JSON string arrives from CAN bus."""
|
|
try:
|
|
data = json.loads(text)
|
|
except Exception:
|
|
# Not valid JSON — might be a single-frame message without null pad
|
|
# Try treating raw text as-is if it looks like our format
|
|
data = {'raw': text}
|
|
|
|
evt_class = data.get('class', data.get('type', '?'))
|
|
evt_level = data.get('level', '?')
|
|
ts = tw_str()
|
|
print(f' [CHANNEL-C RX] CAN id=0x{can_id:03X} {text}')
|
|
|
|
entry = {
|
|
'server_time': ts,
|
|
'channel': 'CAN',
|
|
'source': 'can',
|
|
'can_id': f'0x{can_id:03X}',
|
|
'response_type': 'violation',
|
|
'content': {
|
|
'id': str(int(time.time() * 1000)),
|
|
'date': ts,
|
|
'type': evt_class,
|
|
'level': evt_level,
|
|
}
|
|
}
|
|
self._rx_count += 1
|
|
self._last_rx_ts = time.time()
|
|
self._last_rx_id = can_id
|
|
self._last_rx_text = text[:160]
|
|
with events_lock:
|
|
events.append(entry)
|
|
if len(events) > 200:
|
|
del events[:-200]
|
|
|
|
def _on_raw_received(self, can_id, raw: bytes):
|
|
hx = ' '.join(f'{b:02X}' for b in raw[:32])
|
|
ts = tw_str()
|
|
|
|
# Control frame detection (ID=0x75): throttle command — add to event list for display
|
|
if can_id == 0x75 and len(raw) >= 1:
|
|
level = raw[0]
|
|
status = '关闭油门' if level == 0 else f'油门={level}'
|
|
display = f'CAN id=0x{can_id:03X} {status}'
|
|
print(f' [CHANNEL-C RX] {display}')
|
|
self._rx_count += 1
|
|
self._last_rx_ts = time.time()
|
|
self._last_rx_id = can_id
|
|
self._last_rx_text = display
|
|
|
|
# Add to event list for display
|
|
entry = {
|
|
'server_time': ts,
|
|
'channel': 'CAN',
|
|
'source': 'can',
|
|
'can_id': f'0x{can_id:03X}',
|
|
'response_type': 'throttle',
|
|
'content': {
|
|
'id': str(int(time.time() * 1000)),
|
|
'date': ts,
|
|
'type': 'throttle',
|
|
'level': level,
|
|
'status': status,
|
|
}
|
|
}
|
|
with events_lock:
|
|
events.append(entry)
|
|
if len(events) > 200:
|
|
del events[:-200]
|
|
return
|
|
|
|
# Other CAN IDs: log as raw data, also add to event list
|
|
display = f'RAW [{hx}]'
|
|
print(f' [CHANNEL-C RX-RAW] CAN id=0x{can_id:03X} {display}')
|
|
|
|
self._rx_count += 1
|
|
self._last_rx_ts = time.time()
|
|
self._last_rx_id = can_id
|
|
self._last_rx_text = display
|
|
|
|
entry = {
|
|
'server_time': ts,
|
|
'channel': 'CAN',
|
|
'source': 'can',
|
|
'can_id': f'0x{can_id:03X}',
|
|
'response_type': 'can_raw',
|
|
'content': {
|
|
'id': str(int(time.time() * 1000)),
|
|
'date': ts,
|
|
'type': 'can_raw',
|
|
'level': 0,
|
|
'hex': hx,
|
|
}
|
|
}
|
|
with events_lock:
|
|
events.append(entry)
|
|
if len(events) > 200:
|
|
del events[:-200]
|
|
|
|
def reopen(self, channel=None, bitrate=None):
|
|
with self._lock:
|
|
if self._bus:
|
|
try: self._bus.shutdown()
|
|
except: pass
|
|
self._bus = None
|
|
if channel: self._channel = channel
|
|
if bitrate: self._bitrate = int(bitrate)
|
|
self._open()
|
|
return self.status()
|
|
|
|
def send(self, can_id: int, data: bytes) -> dict:
|
|
with self._lock:
|
|
if not self._available or not self._bus:
|
|
return {'ok': False, 'error': self._last_error or 'CAN not available'}
|
|
try:
|
|
# Keep TX compatible with classic CAN: split long payloads into
|
|
# 8-byte frames (same framing idea as device-side JSON TX).
|
|
frames = 0
|
|
off = 0
|
|
while off < len(data):
|
|
chunk = data[off:off+8]
|
|
msg = can.Message(
|
|
arbitration_id=can_id,
|
|
data=chunk,
|
|
is_extended_id=False,
|
|
is_fd=False,
|
|
)
|
|
self._bus.send(msg)
|
|
frames += 1
|
|
off += len(chunk)
|
|
if len(data) > 8:
|
|
time.sleep(0.001)
|
|
|
|
self._tx_count += frames
|
|
self._last_tx_error = ''
|
|
return {'ok': True, 'frames': frames}
|
|
except Exception as e:
|
|
self._last_error = str(e)
|
|
self._last_tx_error = str(e)
|
|
self._tx_fail += 1
|
|
return {'ok': False, 'error': str(e)}
|
|
|
|
def check(self, can_id: int = 0x120) -> dict:
|
|
# Short probe payload so status checks are valid on classic CAN too.
|
|
payload = b'\xA5'
|
|
tx = self.send(can_id, payload)
|
|
st = self.status()
|
|
st.update({
|
|
'tx_probe_ok': bool(tx.get('ok')),
|
|
'tx_probe_error': tx.get('error', ''),
|
|
})
|
|
return st
|
|
|
|
def status(self) -> dict:
|
|
now = time.time()
|
|
last_rx_age = None
|
|
if self._last_rx_ts > 0:
|
|
last_rx_age = round(now - self._last_rx_ts, 3)
|
|
return {
|
|
'available': self._available,
|
|
'channel': self._channel,
|
|
'bitrate': self._bitrate,
|
|
'last_error': self._last_error,
|
|
'lib': _CAN_AVAILABLE,
|
|
'rx_count': self._rx_count,
|
|
'tx_count': self._tx_count,
|
|
'tx_fail': self._tx_fail,
|
|
'last_rx_age_sec': last_rx_age,
|
|
'last_rx_id': (f'0x{self._last_rx_id:03X}' if self._last_rx_id is not None else ''),
|
|
'last_rx_text': self._last_rx_text,
|
|
'last_tx_error': self._last_tx_error,
|
|
}
|
|
|
|
|
|
_can_bus = CanBus()
|
|
|
|
TZ_TW = datetime.timezone(datetime.timedelta(hours=8))
|
|
def now_tw(): return datetime.datetime.now(TZ_TW)
|
|
def tw_str(dt=None):
|
|
if dt is None: dt = now_tw()
|
|
return dt.strftime('%Y-%m-%dT%H:%M:%S+08:00')
|
|
|
|
UPLOAD_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
|
|
STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
|
|
events = [] # Channel A log
|
|
events_lock = threading.Lock()
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
|
|
def log_message(self, fmt, *args):
|
|
ts = datetime.datetime.now().strftime('%H:%M:%S')
|
|
print(f"[{ts}] {fmt % args}")
|
|
|
|
# ------------------------------------------------------------------
|
|
def do_OPTIONS(self):
|
|
self.send_response(204)
|
|
self._cors()
|
|
self.end_headers()
|
|
|
|
# ------------------------------------------------------------------
|
|
def do_GET(self):
|
|
path = urlparse(self.path).path
|
|
|
|
if path == '/' or path == '/index.html':
|
|
self._serve_file(os.path.join(STATIC_DIR, 'index.html'), 'text/html')
|
|
|
|
elif path == '/api/time':
|
|
self._json({
|
|
'unix': int(time.time()),
|
|
'iso': tw_str(),
|
|
})
|
|
|
|
elif path == '/api/events':
|
|
with events_lock:
|
|
self._json(list(events))
|
|
|
|
elif path == '/api/can/status':
|
|
self._json(_can_bus.status())
|
|
|
|
elif path == '/api/can/check':
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
can_id = int((qs.get('can_id') or ['0x120'])[0], 0)
|
|
self._json(_can_bus.check(can_id=can_id))
|
|
|
|
elif path == '/api/files':
|
|
files = []
|
|
for name in sorted(os.listdir(UPLOAD_DIR), reverse=True):
|
|
fp = os.path.join(UPLOAD_DIR, name)
|
|
if not os.path.isfile(fp):
|
|
continue
|
|
files.append({
|
|
'name': name,
|
|
'size': os.path.getsize(fp),
|
|
'mtime': datetime.datetime.fromtimestamp(
|
|
os.path.getmtime(fp)
|
|
).astimezone(TZ_TW).strftime('%Y-%m-%d %H:%M:%S'),
|
|
'url': f'/uploads/{name}',
|
|
})
|
|
self._json(files)
|
|
|
|
elif path.startswith('/uploads/'):
|
|
parts = path[len('/uploads/'):].split('/', 1)
|
|
name = parts[0]
|
|
fp = os.path.join(UPLOAD_DIR, name)
|
|
|
|
if len(parts) == 1:
|
|
# Download the tar.gz
|
|
if os.path.isfile(fp):
|
|
with open(fp, 'rb') as f:
|
|
data = f.read()
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'application/gzip')
|
|
self.send_header('Content-Disposition', f'attachment; filename="{name}"')
|
|
self.send_header('Content-Length', str(len(data)))
|
|
self._cors()
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
else:
|
|
self.send_error(404)
|
|
|
|
else:
|
|
# Serve a file from inside the tar.gz
|
|
# BusyBox tar creates entries as "./filename"; normalise to bare name
|
|
inner = parts[1].lstrip('./')
|
|
if os.path.isfile(fp):
|
|
try:
|
|
with tarfile.open(fp, 'r:gz') as tar:
|
|
# Try bare name first, then with "./" prefix
|
|
try:
|
|
member = tar.getmember(inner)
|
|
except KeyError:
|
|
member = tar.getmember('./' + inner)
|
|
f = tar.extractfile(member)
|
|
data = f.read()
|
|
ext = os.path.splitext(inner)[1].lower()
|
|
ct = {'jpg':'image/jpeg','jpeg':'image/jpeg',
|
|
'png':'image/png','json':'application/json'}.get(ext, 'application/octet-stream')
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', ct)
|
|
self.send_header('Content-Length', str(len(data)))
|
|
self._cors()
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
except Exception as e:
|
|
self.send_error(404, str(e))
|
|
else:
|
|
self.send_error(404)
|
|
|
|
elif path.startswith('/api/contents/'):
|
|
# List files inside a tar.gz: GET /api/contents/<filename>
|
|
name = os.path.basename(path[len('/api/contents/'):])
|
|
fp = os.path.join(UPLOAD_DIR, name)
|
|
if os.path.isfile(fp):
|
|
try:
|
|
with tarfile.open(fp, 'r:gz') as tar:
|
|
members = []
|
|
for m in tar.getmembers():
|
|
if not m.isfile():
|
|
continue
|
|
# Strip leading "./" added by BusyBox "tar cf - ."
|
|
clean = m.name.lstrip('./')
|
|
if not clean:
|
|
continue
|
|
members.append({
|
|
'name': clean,
|
|
'size': m.size,
|
|
'is_image': clean.lower().endswith(('.jpg','.jpeg','.png')),
|
|
})
|
|
self._json({'ok': True, 'archive': name, 'files': members})
|
|
except Exception as e:
|
|
self._json({'ok': False, 'error': str(e)})
|
|
else:
|
|
self.send_error(404)
|
|
|
|
else:
|
|
self.send_error(404)
|
|
|
|
# ------------------------------------------------------------------
|
|
def do_POST(self):
|
|
path = urlparse(self.path).path
|
|
length = int(self.headers.get('Content-Length', 0))
|
|
body = self.rfile.read(length)
|
|
|
|
# ── Channel A: real-time event JSON ──────────────────────────
|
|
if path == '/api/event':
|
|
try:
|
|
data = json.loads(body.decode('utf-8'))
|
|
entry = {
|
|
'server_time': tw_str(),
|
|
'source': 'http',
|
|
'channel': 'HTTP',
|
|
}
|
|
entry.update(data)
|
|
# Normalize source/channel even if caller passes custom fields.
|
|
if not entry.get('source'):
|
|
entry['source'] = 'http'
|
|
if not entry.get('channel'):
|
|
entry['channel'] = 'HTTP'
|
|
with events_lock:
|
|
events.append(entry)
|
|
if len(events) > 200:
|
|
del events[:-200]
|
|
ctype = data.get('content', {}).get('type', '?')
|
|
level = data.get('content', {}).get('level', '?')
|
|
print(f" [CHANNEL-A] event type={ctype} level={level}")
|
|
self._json({'ok': True})
|
|
except Exception as e:
|
|
print(f" [CHANNEL-A] parse error: {e}")
|
|
self.send_error(400, str(e))
|
|
|
|
# ── Channel C: CAN bus frame send ────────────────────────────
|
|
elif path == '/api/can/send':
|
|
try:
|
|
req = json.loads(body.decode('utf-8'))
|
|
evt_type = req.get('type', 'hazard')
|
|
evt_level = int(req.get('level', 0))
|
|
can_id = int(req.get('can_id', 0x100))
|
|
# Same JSON payload as BLE/iPad channel
|
|
payload = f'{{"class":"{evt_type}","level":{evt_level}}}'
|
|
result = _can_bus.send(can_id, payload.encode('utf-8'))
|
|
if result['ok']:
|
|
print(f" [CHANNEL-C] CAN id=0x{can_id:03X} {payload}")
|
|
else:
|
|
print(f" [CHANNEL-C] CAN FAIL: {result['error']}")
|
|
self._json(result)
|
|
except Exception as e:
|
|
self._json({'ok': False, 'error': str(e)})
|
|
|
|
elif path == '/api/can/send_cmd':
|
|
try:
|
|
req = json.loads(body.decode('utf-8'))
|
|
cmd = int(req.get('cmd', 0)) & 0xFF
|
|
can_id = int(req.get('can_id', 0x75))
|
|
payload = bytes([cmd, 0, 0, 0, 0, 0, 0, 0])
|
|
result = _can_bus.send(can_id, payload)
|
|
if result.get('ok'):
|
|
print(f' [CHANNEL-C TX-CMD] CAN id=0x{can_id:03X} cmd=0x{cmd:02X}')
|
|
ts = tw_str()
|
|
entry = {
|
|
'server_time': ts,
|
|
'channel': 'CAN',
|
|
'source': 'can',
|
|
'can_id': f'0x{can_id:03X}',
|
|
'response_type': 'can_tx_cmd',
|
|
'content': {
|
|
'id': str(int(time.time() * 1000)),
|
|
'date': ts,
|
|
'type': 'can_tx_cmd',
|
|
'level': cmd,
|
|
'hex': f'{cmd:02X} 00 00 00 00 00 00 00',
|
|
}
|
|
}
|
|
with events_lock:
|
|
events.append(entry)
|
|
if len(events) > 200:
|
|
del events[:-200]
|
|
self._json(result)
|
|
except Exception as e:
|
|
self._json({'ok': False, 'error': str(e)})
|
|
|
|
elif path == '/api/can/config':
|
|
try:
|
|
req = json.loads(body.decode('utf-8'))
|
|
result = _can_bus.reopen(
|
|
channel=req.get('channel'),
|
|
bitrate=req.get('bitrate'),
|
|
)
|
|
self._json(result)
|
|
except Exception as e:
|
|
self._json({'ok': False, 'error': str(e)})
|
|
|
|
elif path == '/api/can/bringup':
|
|
import subprocess
|
|
try:
|
|
req = json.loads(body.decode('utf-8'))
|
|
ch = req.get('channel', 'can0')
|
|
br = int(req.get('bitrate', 250000))
|
|
# Bring the interface down first, then up with the requested bitrate
|
|
subprocess.run(['sudo', 'ip', 'link', 'set', ch, 'down'], check=False)
|
|
r = subprocess.run(
|
|
['sudo', 'ip', 'link', 'set', ch, 'up', 'type', 'can', 'bitrate', str(br)],
|
|
capture_output=True, text=True
|
|
)
|
|
if r.returncode == 0:
|
|
print(f'[CAN] brought up {ch} @ {br//1000} kbps')
|
|
self._json({'ok': True})
|
|
else:
|
|
err = (r.stderr or r.stdout).strip()
|
|
print(f'[CAN] bring-up failed: {err}')
|
|
self._json({'ok': False, 'error': err})
|
|
except Exception as e:
|
|
self._json({'ok': False, 'error': str(e)})
|
|
|
|
# ── Channel B: tar.gz archive upload ─────────────────────────
|
|
elif path in ('/api/upload', '/api/golf.cgi'):
|
|
# 1) kCurl sends filename as query param: /api/upload?filename=xxx
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
filename = (qs.get('filename') or [None])[0]
|
|
# 2) fallback: raw-socket upload uses Content-Disposition header
|
|
if not filename:
|
|
cd = self.headers.get('Content-Disposition', '')
|
|
for part in cd.split(';'):
|
|
part = part.strip()
|
|
if part.startswith('filename='):
|
|
filename = part[9:].strip().strip('"')
|
|
# 3) last resort: timestamp-based name
|
|
if not filename:
|
|
ts = now_tw().strftime('%Y%m%d_%H%M%S')
|
|
filename = f'event_{ts}.tar.gz'
|
|
|
|
# sanitise
|
|
filename = os.path.basename(filename)
|
|
fp = os.path.join(UPLOAD_DIR, filename)
|
|
with open(fp, 'wb') as f:
|
|
f.write(body)
|
|
print(f" [CHANNEL-B] {path} saved {filename} ({len(body):,} bytes)")
|
|
self._json({'ok': True, 'path': path, 'filename': filename, 'bytes': len(body)})
|
|
|
|
else:
|
|
self.send_error(404)
|
|
|
|
# ------------------------------------------------------------------
|
|
def _json(self, obj):
|
|
data = json.dumps(obj, ensure_ascii=False, indent=None).encode('utf-8')
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
|
self.send_header('Content-Length', str(len(data)))
|
|
self._cors()
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
def _cors(self):
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
|
|
self.send_header('Access-Control-Allow-Headers', 'Content-Type, Content-Disposition')
|
|
|
|
def _serve_file(self, filepath, content_type):
|
|
if not os.path.isfile(filepath):
|
|
self.send_error(404)
|
|
return
|
|
with open(filepath, 'rb') as f:
|
|
data = f.read()
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', content_type + '; charset=utf-8')
|
|
self.send_header('Content-Length', str(len(data)))
|
|
# Prevent browser from caching HTML so edits are always picked up
|
|
self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate')
|
|
self.send_header('Pragma', 'no-cache')
|
|
self._cors()
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
|
|
if __name__ == '__main__':
|
|
PORT = 8081
|
|
server = HTTPServer(('0.0.0.0', PORT), Handler)
|
|
print("=" * 55)
|
|
print(" KL630 Golf Event Mock Server")
|
|
print("=" * 55)
|
|
print(f" Dashboard : http://localhost:{PORT}/")
|
|
print(f" Time API : GET http://localhost:{PORT}/api/time")
|
|
print(f" Event API : POST http://localhost:{PORT}/api/event")
|
|
print(f" Upload API : POST http://localhost:{PORT}/api/upload")
|
|
print(f" Golf API : POST http://localhost:{PORT}/api/golf.cgi")
|
|
print(f" CAN status : GET http://localhost:{PORT}/api/can/status")
|
|
print(f" CAN send : POST http://localhost:{PORT}/api/can/send")
|
|
print(f" Uploads : {UPLOAD_DIR}")
|
|
st = _can_bus.status()
|
|
if st['available']:
|
|
print(f" CAN bus : {st['channel']} @ {st['bitrate']//1000} kbps [OK]")
|
|
else:
|
|
print(f" CAN bus : {st.get('last_error','unavailable')} [OFFLINE]")
|
|
if st['lib']:
|
|
print(f" fix: sudo ip link set can0 up type can bitrate 250000")
|
|
print("=" * 55)
|
|
try:
|
|
server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|