#!/usr/bin/env python3 """ web_serve.py — KL630 Web Control Panel UniFi-style web UI: compile, deploy via Telnet, RTSP stream preview. Usage: pip install -r requirements.txt python web_serve.py python web_serve.py --port 8080 # by mars """ import sys import os import re import json import shutil import subprocess import threading import time from pathlib import Path try: from flask import Flask, jsonify, request, Response, send_from_directory, abort except ImportError: print("ERROR: Flask not installed. Run: pip install -r requirements.txt") sys.exit(1) import telnetlib # stdlib, no install needed try: import cv2 HAS_CV2 = True except Exception as _e: HAS_CV2 = False print(f"WARNING: opencv-python import failed: {_e}") # ── Paths ───────────────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).resolve().parent BUILD_DIR = SCRIPT_DIR / "build" BINARY_NAME = "kp_firmware_host_stream" INI_SRC = SCRIPT_DIR / "ini" / "host_stream.ini" DEPLOY_SRC = SCRIPT_DIR / "tools" / "device" / "deploy.sh" DEMO_RTSP_SRC = SCRIPT_DIR / "tools" / "device" / "demo_rtsp.sh" DEMO_HDMI_SRC = SCRIPT_DIR / "tools" / "device" / "demo_hdmi.sh" DEMO_RTSP_HDMI_SRC = SCRIPT_DIR / "tools" / "device" / "demo_rtsp_hdmi.sh" NEF_DIR = SCRIPT_DIR / "nef" KCURL_SRC = Path("/home/user/Documents/GOFACE/奧創雲/kCurl-1.1.1-release/kCurl-linux-armv7") CONFIG_FILE = SCRIPT_DIR / ".web_config.json" PROFILES_FILE = SCRIPT_DIR / "model_profiles.json" BIN_DIR_DEVICE = "/mnt/flash/plus/kp_firmware/kp_firmware_0/kp_firmware/bin" FW_PATH_DEVICE = "/mnt/flash/vienna/kp_firmware_host_stream" DEFAULT_CONFIG = { "host_ip": "192.168.3.1", "kl630_ip": "192.168.3.10", "port": 8080, "docker_image": "kl630-dev", } # ── Config ──────────────────────────────────────────────────────────────────── def load_config(): if CONFIG_FILE.exists(): try: return {**DEFAULT_CONFIG, **json.loads(CONFIG_FILE.read_text(encoding="utf-8"))} except Exception: pass return dict(DEFAULT_CONFIG) def save_config(cfg): CONFIG_FILE.write_text(json.dumps(cfg, indent=2), encoding="utf-8") # Patch HOST_URL in deploy.sh to match the saved host_ip + port _patch_deploy_sh(f"http://{cfg['host_ip']}:{cfg['port']}") def _patch_deploy_sh(host_url): """Rewrite HOST_URL line in deploy.sh (source if exists, always build copy).""" pattern = r'HOST_URL="[^"]*"' replacement = f'HOST_URL="{host_url}"' for target in [DEPLOY_SRC, BUILD_DIR / "deploy.sh"]: if target.exists(): text = target.read_text(encoding="utf-8") target.write_text(re.sub(pattern, replacement, text), encoding="utf-8") # ── Build dir helpers ───────────────────────────────────────────────────────── def prepare_build_dir(): BUILD_DIR.mkdir(exist_ok=True) for src, name in [ (INI_SRC, "host_stream.ini"), (DEPLOY_SRC, "deploy.sh"), (DEMO_RTSP_SRC, "demo_rtsp.sh"), (DEMO_HDMI_SRC, "demo_hdmi.sh"), (DEMO_RTSP_HDMI_SRC, "demo_rtsp_hdmi.sh"), ]: if Path(src).exists(): shutil.copy2(src, BUILD_DIR / name) # Copy kCurl ARM binary so device can wget it if KCURL_SRC.exists(): shutil.copy2(KCURL_SRC, BUILD_DIR / "kCurl") # Copy all NEF models into build/nef/ so any model can be wget'd if NEF_DIR.exists(): nef_dst_dir = BUILD_DIR / "nef" nef_dst_dir.mkdir(parents=True, exist_ok=True) for nef in NEF_DIR.iterdir(): if nef.is_file() and nef.suffix.lower() == ".nef": shutil.copy2(nef, nef_dst_dir / nef.name) def list_build_files(): BUILD_DIR.mkdir(exist_ok=True) files = [] for f in sorted(BUILD_DIR.rglob("*")): if f.is_file(): files.append({"name": str(f.relative_to(BUILD_DIR)).replace("\\", "/"), "size": f.stat().st_size}) return files # ── SSE helpers ─────────────────────────────────────────────────────────────── def sse(text, kind="log"): payload = json.dumps({"kind": kind, "text": text.replace("\r", "")}) return f"data: {payload}\n\n" def sse_done(): return sse("__DONE__", "done") _compile_lock = threading.Lock() # ── Flask app ───────────────────────────────────────────────────────────────── app = Flask(__name__) # ── Static file serving (for device wget at root path) ─────────────────────── TOOLS_DEVICE_DIR = SCRIPT_DIR / "tools" / "device" @app.route("/") def serve_build_file(filename): # Demo/deploy scripts are served directly from tools/device/ (source of truth). # Everything else (binary, NEF, INI, ...) comes from build/. tools_path = TOOLS_DEVICE_DIR / filename if tools_path.exists() and tools_path.is_file(): return send_from_directory(TOOLS_DEVICE_DIR, filename) path = BUILD_DIR / filename if path.exists() and path.is_file(): return send_from_directory(BUILD_DIR, filename) abort(404) # ── API: config ─────────────────────────────────────────────────────────────── @app.route("/api/config", methods=["GET"]) def api_config_get(): return jsonify(load_config()) @app.route("/api/config", methods=["POST"]) def api_config_post(): cfg = load_config() data = request.get_json() or {} for k in ("host_ip", "kl630_ip", "port", "docker_image"): if k in data: cfg[k] = data[k] save_config(cfg) return jsonify({"ok": True}) # ── API: files ──────────────────────────────────────────────────────────────── @app.route("/api/files") def api_files(): return jsonify(list_build_files()) # ── API: compile (SSE) ──────────────────────────────────────────────────────── @app.route("/api/compile/run") def api_compile_run(): if not _compile_lock.acquire(blocking=False): def busy(): yield sse("Another compile is already running.", "error") yield sse_done() return Response(busy(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) cfg = load_config() def generate(): try: docker_image = cfg["docker_image"] if shutil.which("docker") is None: yield sse("ERROR: docker not found in PATH", "error") return # Ensure image exists r = subprocess.run(["docker", "image", "inspect", docker_image], capture_output=True) if r.returncode != 0: yield sse(f"Image '{docker_image}' not found — building from Dockerfile...", "warn") dockerfile = SCRIPT_DIR / "Dockerfile" if not dockerfile.exists(): yield sse(f"ERROR: Dockerfile not found", "error") return proc = subprocess.Popen( ["docker", "build", "-t", docker_image, str(SCRIPT_DIR)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8", errors="replace", bufsize=1 ) for line in proc.stdout: yield sse(line.rstrip()) proc.wait() if proc.returncode != 0: yield sse(f"Image build FAILED (exit {proc.returncode})", "error") return yield sse(f"Image '{docker_image}' built.", "ok") else: yield sse(f"Image '{docker_image}' found.", "ok") # Cross-compile mount = str(SCRIPT_DIR).replace("\\", "/") cmd = [ "docker", "run", "--rm", "-v", f"{mount}:/workspace/kl630_build", docker_image, "bash", "/workspace/kl630_build/compile.sh", ] yield sse(f"Starting cross-compile (ARM armv7-a)...") proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8", errors="replace", bufsize=1 ) for line in proc.stdout: yield sse(line.rstrip()) proc.wait() if proc.returncode != 0: yield sse(f"Compile FAILED (exit {proc.returncode})", "error") return # Copy scripts to build/ prepare_build_dir() yield sse("Copied INI + scripts to build/", "ok") yield sse("Compile SUCCESS", "ok") finally: _compile_lock.release() yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── Telnet helper ───────────────────────────────────────────────────────────── _SENTINEL = "__KL630_DONE__" def _telnet_connect(ip, timeout=10): """ Connect to KL630 Telnet (root, no password). KL630 BusyBox may auto-login or show login: — handles both. Returns telnetlib.Telnet ready to accept commands. """ tn = telnetlib.Telnet(ip, 23, timeout=timeout) # Wait for either a login prompt or the shell '#' prompt (up to 5 s). # read_until is reliable; read_very_eager() was timing-dependent. try: data = tn.read_until(b"#", timeout=5).decode("utf-8", errors="replace") except Exception: data = "" if "login:" in data.lower(): tn.write(b"root\n") try: after = tn.read_until(b"#", timeout=5).decode("utf-8", errors="replace") except Exception: after = "" if "password:" in after.lower(): tn.write(b"\n") try: tn.read_until(b"#", timeout=5) except Exception: pass # Flush any trailing chars after '#' (e.g. the space in "# ") time.sleep(0.1) tn.read_very_eager() return tn _ANSI_ESC = re.compile(r'\x1b\[[0-9;]*[A-Za-z]|\x1b\][^\x07]*\x07|\r') def _strip_ansi(text): return _ANSI_ESC.sub("", text) def _telnet_run(tn, cmd, timeout=60): """ Run cmd, block until sentinel received, return output lines. Uses read_until() so it reliably waits for slow commands (wget etc.). ANSI escape codes are stripped. If read_until times out (sentinel not found), the stale sentinel is drained before returning so subsequent commands are not contaminated. """ tn.write(f"{cmd}; echo {_SENTINEL}\n".encode()) try: data = tn.read_until(_SENTINEL.encode(), timeout=timeout) except EOFError: return [] # Timed out — sentinel was NOT found in the returned data. # Send a fresh unique sync marker so read_until consumes ALL stale output # (including the original sentinel that may still be in transit). # This guarantees the buffer is clean before the next command runs. if _SENTINEL.encode() not in data: sync = f"__KL630_SYNC_{id(tn) & 0xFFFF:04X}__".encode() try: tn.write(b"echo " + sync + b"\n") tn.read_until(sync, timeout=25) except Exception: pass time.sleep(0.1) try: tn.read_very_eager() except Exception: pass return [] text = _strip_ansi(data.decode("utf-8", errors="replace")) try: tn.read_until(b"\n", timeout=2) # consume newline after sentinel except Exception: pass lines = [] cmd_prefix = cmd[:40] for ln in text.splitlines(): ln = ln.strip() if (ln and _SENTINEL not in ln and ln not in ("#", "$") and cmd_prefix not in ln): # skip shell echo of the command lines.append(ln) return lines def _telnet_run_bg(tn, cmd): """Fire a background command (nohup … &) without waiting.""" tn.write(f"{cmd}\n".encode()) time.sleep(1.5) try: out = tn.read_very_eager().decode("utf-8", errors="replace") return [ln.strip() for ln in out.splitlines() if ln.strip()] except Exception: return [] def _drain_shell(tn): """Drain all residual shell output and disable terminal echo. Root cause of cascade: BusyBox shell echoes every command back into the socket. read_until(SENTINEL) finds the sentinel inside the command echo *before* the command executes, returns early, and leaves the real output + real sentinel in the buffer for the next command to misread. Fix: drain with a sentinel echo (works even when echo is on because read_very_eager() consumes the actual output), then run `stty -echo` so subsequent commands are NOT reflected back. """ # Step 1 – drain any residual data (echo ON here, but read_very_eager # catches the actual sentinel output after read_until grabs the echo copy) sync = b"__KL630_RDY__" tn.write(b"echo " + sync + b"\n") try: tn.read_until(sync, timeout=10) # finds sync in command echo — fine except Exception: pass time.sleep(0.1) try: tn.read_very_eager() # drain actual sync output + prompt except Exception: pass # Step 2 – disable terminal echo so commands aren't reflected back tn.write(b"stty -echo\n") time.sleep(0.2) try: tn.read_very_eager() # drain the echo of "stty -echo" itself + prompt except Exception: pass def _sync_demo_scripts_on_device(tn, base, bd): """Sync demo restart scripts used by Apply-to-Device restart path.""" cmds = [ ("Syncing demo_rtsp.sh...", f"wget -q {base}/demo_rtsp.sh -O {bd}/ini/demo_rtsp.sh && chmod +x {bd}/ini/demo_rtsp.sh && cp {bd}/ini/demo_rtsp.sh {bd}/demo_rtsp.sh && cp {bd}/ini/demo_rtsp.sh /mnt/flash/vienna/demo_rtsp.sh && echo 'demo_rtsp.sh OK'", 30), ("Syncing demo_hdmi.sh...", f"wget -q {base}/demo_hdmi.sh -O {bd}/ini/demo_hdmi.sh && chmod +x {bd}/ini/demo_hdmi.sh && cp {bd}/ini/demo_hdmi.sh {bd}/demo_hdmi.sh && cp {bd}/ini/demo_hdmi.sh /mnt/flash/vienna/demo_hdmi.sh && echo 'demo_hdmi.sh OK'", 30), ("Syncing demo_rtsp_hdmi.sh...", f"wget -q {base}/demo_rtsp_hdmi.sh -O {bd}/ini/demo_rtsp_hdmi.sh && chmod +x {bd}/ini/demo_rtsp_hdmi.sh && cp {bd}/ini/demo_rtsp_hdmi.sh {bd}/demo_rtsp_hdmi.sh && cp {bd}/ini/demo_rtsp_hdmi.sh /mnt/flash/vienna/demo_rtsp_hdmi.sh && echo 'demo_rtsp_hdmi.sh OK'", 30), ] out = [] for label, cmd, timeout in cmds: out.append(("log", label)) out.append(("prompt", f"$ {cmd}")) for ln in _telnet_run(tn, cmd, timeout=timeout): out.append(("ok", ln)) return out def _verify_firmware_running(tn): """Check whether firmware process is up after restart and return parsed result.""" check_cmd = ( "for i in 1 2 3 4 5; do " "pid=$(pidof kp_firmware_host_stream 2>/dev/null); " "if [ -n \"$pid\" ]; then echo \"FW PID: $pid\"; exit 0; fi; " "sleep 1; " "done; " "echo 'FW NOT RUNNING'; " "tail -n 30 /tmp/fw.log 2>/dev/null" ) lines = _telnet_run(tn, check_cmd, timeout=20) ok = any("FW PID:" in ln for ln in lines) return ok, lines # ── API: deploy via Telnet (SSE) ────────────────────────────────────────────── @app.route("/api/deploy/run") def api_deploy_run(): cfg = load_config() kl_ip = request.args.get("ip", cfg["kl630_ip"]) h_ip = request.args.get("host_ip", cfg["host_ip"]) port = int(request.args.get("port", cfg["port"])) out_rtsp = request.args.get("out_rtsp", "1") == "1" out_hdmi = request.args.get("out_hdmi", "0") == "1" base = f"http://{h_ip}:{port}" bd = BIN_DIR_DEVICE fw = FW_PATH_DEVICE ini = f"{bd}/ini/host_stream.ini" model_path = _ini_get_str("ModelPath") or "nef/STDC_0520.nef" nef_filename = model_path.split("/")[-1] if out_hdmi and not out_rtsp: start_cmd = f"cd {bd} && nohup sh ./ini/demo_hdmi.sh > /tmp/fw.log 2>&1 &" start_label = "Starting HDMI demo (background)..." elif out_rtsp and out_hdmi: start_cmd = f"cd {bd} && nohup sh ./ini/demo_rtsp_hdmi.sh > /tmp/fw.log 2>&1 &" start_label = "Starting RTSP+HDMI demo (background)..." elif out_rtsp: start_cmd = f"cd {bd} && nohup sh ./ini/demo_rtsp.sh > /tmp/fw.log 2>&1 &" start_label = "Starting RTSP demo (background)..." else: start_cmd = f"cd {bd} && nohup LD_LIBRARY_PATH=/mnt/flash/vienna/lib {fw} > /tmp/fw.log 2>&1 &" start_label = "Starting firmware (inference only)..." voc_val = 1 if out_hdmi else 0 set_voc = f"sed -i 's/^voc_enable.*/voc_enable = {voc_val}/' {ini} && echo 'voc OK'" # (label, cmd, background, timeout_s) steps = [ ("Stopping old firmware...", "killall -9 kp_firmware_host_stream 2>/dev/null; killall -9 rtsps 2>/dev/null; sleep 1; rm -f /dev/shm/*", False, 30), ("Downloading firmware binary...", f"wget -q {base}/kp_firmware_host_stream -O {fw} && chmod +x {fw} && ls -lh {fw}", False, 60), ("Downloading INI...", f"wget -q {base}/host_stream.ini -O {ini} && echo 'INI OK'", False, 30), (f"Downloading NEF model ({nef_filename})...", f"mkdir -p {bd}/nef && wget -q {base}/nef/{nef_filename} -O {bd}/nef/{nef_filename} && echo 'NEF OK'", False, 180), ("Downloading demo_rtsp.sh...", f"wget -q {base}/demo_rtsp.sh -O {bd}/ini/demo_rtsp.sh && chmod +x {bd}/ini/demo_rtsp.sh && cp {bd}/ini/demo_rtsp.sh {bd}/demo_rtsp.sh && cp {bd}/ini/demo_rtsp.sh /mnt/flash/vienna/demo_rtsp.sh && echo 'demo_rtsp.sh OK'", False, 30), ("Downloading demo_hdmi.sh...", f"wget -q {base}/demo_hdmi.sh -O {bd}/ini/demo_hdmi.sh && chmod +x {bd}/ini/demo_hdmi.sh && cp {bd}/ini/demo_hdmi.sh {bd}/demo_hdmi.sh && cp {bd}/ini/demo_hdmi.sh /mnt/flash/vienna/demo_hdmi.sh && echo 'demo_hdmi.sh OK'", False, 30), ("Downloading demo_rtsp_hdmi.sh...", f"wget -q {base}/demo_rtsp_hdmi.sh -O {bd}/ini/demo_rtsp_hdmi.sh && chmod +x {bd}/ini/demo_rtsp_hdmi.sh && cp {bd}/ini/demo_rtsp_hdmi.sh {bd}/demo_rtsp_hdmi.sh && cp {bd}/ini/demo_rtsp_hdmi.sh /mnt/flash/vienna/demo_rtsp_hdmi.sh && echo 'demo_rtsp_hdmi.sh OK'", False, 30), ("Pushing deploy.sh to device...", f"wget -q {base}/deploy.sh -O /tmp/deploy.sh && chmod +x /tmp/deploy.sh && echo 'deploy.sh OK'", False, 30), ("Downloading kCurl (event upload tool)...", f"wget -q {base}/kCurl -O {bd}/kCurl && chmod +x {bd}/kCurl && echo 'kCurl OK'", False, 30), ("Setting VOC/HDMI output...", set_voc, False, 10), (start_label, start_cmd, True, 10), ] def generate(): yield sse(f"Connecting to {kl_ip}:23 via Telnet...") try: tn = _telnet_connect(kl_ip) _drain_shell(tn) yield sse(f"Connected to {kl_ip}", "ok") for label, cmd, bg, timeout in steps: yield sse(label) yield sse(f"$ {cmd}", "prompt") if bg: for ln in _telnet_run_bg(tn, cmd): if ln.strip(): yield sse(ln.strip()) else: for ln in _telnet_run(tn, cmd, timeout=timeout): yield sse(ln) tn.close() yield sse("Deploy complete. Firmware running in background.", "ok") yield sse(f"Firmware log: /tmp/fw.log", "log") except EOFError: yield sse("Connection closed by device.", "error") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── API: first-time ISP setup via Telnet (SSE) ──────────────────────────────── @app.route("/api/setup/run") def api_setup_run(): cfg = load_config() kl_ip = request.args.get("ip", cfg["kl630_ip"]) bd = BIN_DIR_DEVICE awb = f"{bd}/Resource/AWB/AutoWhiteBalance.ini" isp0 = f"{bd}/Resource/ISP/0/pqtable_ispe_Config.cfg" isp1 = f"{bd}/Resource/ISP/1/pqtable_ispe_Config.cfg" commands = [ f"sed -i 's/dwStatisticsSrcType = 0/dwStatisticsSrcType = 2/' {awb}", f"sed -i 's/bGTREnable = 0/bGTREnable = 1/' {isp0}", f"sed -i 's/bGTREnable = 0/bGTREnable = 1/' {isp1}", f"grep dwStatisticsSrcType {awb}", f"grep bGTREnable {isp0}", ] def generate(): yield sse(f"Connecting to {kl_ip}:23 via Telnet...") try: tn = _telnet_connect(kl_ip) _drain_shell(tn) yield sse("Connected.", "ok") for cmd in commands: yield sse(f"$ {cmd}", "prompt") for ln in _telnet_run(tn, cmd): yield sse(ln, "ok") tn.close() yield sse("One-time ISP setup complete. Settings written to flash.", "ok") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── API: RTSP → MJPEG proxy (via opencv-python, no system ffmpeg needed) ────── _stream_active = False @app.route("/api/stream/video") def api_stream_video(): global _stream_active if not HAS_CV2: return "opencv-python not installed. Run: pip install -r requirements.txt", 503 cfg = load_config() kl_ip = request.args.get("ip", cfg["kl630_ip"]) rtsp_url = f"rtsp://{kl_ip}/live1.sdp" # Suppress OpenCV WARN spam (e.g. "Stream timeout triggered") os.environ["OPENCV_LOG_LEVEL"] = "SILENT" # Force TCP, set 5-second connect timeout (microseconds) os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp|timeout;5000000" cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Try first frame with 6-second wall-clock timeout first = [False, None] def _read_first(): first[0], first[1] = cap.read() t = threading.Thread(target=_read_first, daemon=True) t.start() t.join(timeout=6) if not first[0]: cap.release() return "RTSP stream not available — is firmware running?", 503 def mjpeg_frames(): global _stream_active _stream_active = True try: # Serve the first frame we already read _, jpeg = cv2.imencode(".jpg", cv2.resize(first[1], (960, 540)), [cv2.IMWRITE_JPEG_QUALITY, 70]) yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + jpeg.tobytes() + b"\r\n" fail_count = 0 while _stream_active: ret, frame = cap.read() if not ret: fail_count += 1 if fail_count >= 10: break time.sleep(0.1) continue fail_count = 0 frame = cv2.resize(frame, (960, 540)) _, jpeg = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + jpeg.tobytes() + b"\r\n" finally: cap.release() _stream_active = False return Response( mjpeg_frames(), mimetype="multipart/x-mixed-replace; boundary=frame", ) @app.route("/api/stream/stop", methods=["POST"]) def api_stream_stop(): global _stream_active _stream_active = False return jsonify({"ok": True}) # ── STDC live stats (background Telnet polling) ─────────────────────────────── _stdc_stats = {} _stdc_lock = threading.Lock() _stdc_running = False _stdc_thread = None _RE_STDC_CLASS = re.compile( r'\[STDC\] frame=(\d+) mov=(\d+) diff=([\d.]+) ' r'bunker=([\d.]+)% car=([\d.]+)% grass=([\d.]+)% greenery=([\d.]+)% ' r'person=([\d.]+)% pond=([\d.]+)% road=([\d.]+)% tree=([\d.]+)%' ) _RE_STDC_STATUS = re.compile(r'\[STDC\] (ON ROAD|ON GRASS)') _RE_STDC_GRASSTIME = re.compile(r'\[STDC\] ON GRASS: ([\d.]+)s') _RE_STDC_WARN = re.compile(r'\[STDC WARN\] (.+)') def _parse_stdc_lines(lines): result = {} warns = [] for ln in reversed(lines): m = _RE_STDC_CLASS.search(ln) if m and 'road' not in result: result.update({ 'frame': int(m.group(1)), 'moving': int(m.group(2)), 'diff': float(m.group(3)), 'bunker': float(m.group(4)), 'car': float(m.group(5)), 'grass': float(m.group(6)), 'greenery': float(m.group(7)), 'person': float(m.group(8)), 'pond': float(m.group(9)), 'road': float(m.group(10)), 'tree': float(m.group(11)), }) ms = _RE_STDC_STATUS.search(ln) if ms and 'status' not in result: result['status'] = ms.group(1) mg = _RE_STDC_GRASSTIME.search(ln) if mg and 'grass_time' not in result: result['grass_time'] = float(mg.group(1)) result['grass_warning'] = '*** GRASS WARNING ***' in ln mw = _RE_STDC_WARN.search(ln) if mw: w = mw.group(1).strip() if w not in warns: warns.append(w) if warns: result['warns'] = list(reversed(warns)) return result def _stdc_poll_loop(ip): global _stdc_running tn = None while _stdc_running: try: if tn is None: tn = _telnet_connect(ip) lines = _telnet_run(tn, "tail -30 /tmp/fw.log 2>/dev/null", timeout=5) parsed = _parse_stdc_lines(lines) if parsed: with _stdc_lock: _stdc_stats.update(parsed) except Exception: try: if tn: tn.close() except Exception: pass tn = None time.sleep(3) continue time.sleep(0.5) if tn: try: tn.close() except Exception: pass @app.route("/api/stdc/start", methods=["POST"]) def api_stdc_start(): global _stdc_running, _stdc_thread data = request.get_json(force=True) or {} ip = data.get("ip", load_config()["kl630_ip"]) if _stdc_running and _stdc_thread and _stdc_thread.is_alive(): return jsonify({"ok": True, "msg": "already running"}) _stdc_running = True with _stdc_lock: _stdc_stats.clear() _stdc_thread = threading.Thread(target=_stdc_poll_loop, args=(ip,), daemon=True) _stdc_thread.start() return jsonify({"ok": True}) @app.route("/api/stdc/stop", methods=["POST"]) def api_stdc_stop(): global _stdc_running _stdc_running = False with _stdc_lock: _stdc_stats.clear() return jsonify({"ok": True}) @app.route("/api/stdc/stats") def api_stdc_stats(): with _stdc_lock: return jsonify(dict(_stdc_stats)) # ── Persistent terminal Telnet session ─────────────────────────────────────── _term_tn = None _term_ip = None _term_lock = threading.Lock() def _get_term_tn(ip): global _term_tn, _term_ip # Reuse existing connection if same IP; reconnect if dead or changed if _term_tn is not None and _term_ip == ip: try: _term_tn.get_socket().getpeername() # Also verify the shell is actually responsive (TCP alive ≠ shell alive) _term_tn.write(b"echo __TN_ALIVE__\n") data = _term_tn.read_until(b"__TN_ALIVE__", timeout=3) if b"__TN_ALIVE__" in data: try: _term_tn.read_until(b"\n", timeout=1) # consume trailing newline except Exception: pass return _term_tn except Exception: pass try: if _term_tn: _term_tn.close() except Exception: pass _term_tn = _telnet_connect(ip) _drain_shell(_term_tn) _term_ip = ip return _term_tn # ── API: terminal exec (SSE) ────────────────────────────────────────────────── @app.route("/api/terminal/exec") def api_terminal_exec(): cmd = request.args.get("cmd", "").strip() ip = request.args.get("ip", load_config()["kl630_ip"]) if not cmd: return jsonify({"error": "empty cmd"}), 400 def generate(): global _term_tn, _term_ip try: with _term_lock: tn = _get_term_tn(ip) lines = _telnet_run(tn, cmd, timeout=30) if lines: for ln in lines: yield sse(ln, "log") else: yield sse("(no output)", "log") except Exception as e: with _term_lock: _term_tn = None # force reconnect next time yield sse(f"Error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── API: write autostart script (SSE) ───────────────────────────────────────── @app.route("/api/autostart/write") def api_autostart_write(): ip = request.args.get("ip", load_config()["kl630_ip"]) out_rtsp = request.args.get("out_rtsp", "1") == "1" out_hdmi = request.args.get("out_hdmi", "0") == "1" bd = BIN_DIR_DEVICE fw = FW_PATH_DEVICE ini = f"{bd}/ini/host_stream.ini" # S95done in rcS checks /mnt/flash/etc/rc.local and runs it — this is the boot hook. # Must run in background (&) so init can continue to spawn the login shell. VIENNA = "/mnt/flash/vienna" target = "/mnt/flash/etc/rc.local" if out_hdmi and not out_rtsp: demo_line = f"nohup sh {VIENNA}/demo_hdmi.sh > /tmp/fw.log 2>&1 &" elif out_rtsp and out_hdmi: demo_line = f"nohup sh {VIENNA}/demo_rtsp_hdmi.sh > /tmp/fw.log 2>&1 &" elif out_rtsp: demo_line = f"nohup sh {VIENNA}/demo_rtsp.sh > /tmp/fw.log 2>&1 &" else: demo_line = f"nohup sh -c 'LD_LIBRARY_PATH={VIENNA}/lib {fw}' > /tmp/fw.log 2>&1 &" script_lines = [ "#!/bin/sh", "# KL630 firmware autostart — written by web_serve.py", "mkdir -p /tmp/venc/c0/", "mkdir -p /tmp/aenc/c0/", "mkdir -p /tmp/playback/c0/", "mkdir -p /tmp/twoway/c0/", "mkdir -p /tmp/sr/c0/", "mkdir -p /tmp/sdcard", "mount /dev/mmcblk0p1 /tmp/sdcard 2>/dev/null || true", f"cd {VIENNA}/drivers && sh driver.sh 2>/dev/null", "sleep 2", f"export LD_LIBRARY_PATH={VIENNA}/lib", f"cd {VIENNA}", "sleep 3", demo_line, ] # Use printf with \n to avoid single-quote escaping issues in echo # Each line is written via printf '%s\n' "line" >> target def _esc_printf(s): # Escape double-quotes and backslashes for embedding in printf "..." return s.replace("\\", "\\\\").replace('"', '\\"') write_cmds = [f'printf "%s\\n" "{_esc_printf(ln)}" >> {target}' for ln in script_lines] modes = [] if out_rtsp: modes.append("RTSP") if out_hdmi: modes.append("HDMI") mode_str = "+".join(modes) if modes else "inference-only" def generate(): yield sse(f"Output mode: {mode_str}") yield sse(f"Connecting to {ip}:23...") try: tn = _telnet_connect(ip) _drain_shell(tn) yield sse("Connected.", "ok") # Ensure directory exists and clear existing file _telnet_run(tn, f"mkdir -p /mnt/flash/etc && rm -f {target}", timeout=5) for cmd in write_cmds: _telnet_run(tn, cmd, timeout=5) # silent, just writing for ln in _telnet_run(tn, f"chmod +x {target} && echo 'chmod OK'"): yield sse(ln, "ok") yield sse(f"Script written to {target}", "ok") # Sync to flash before closing (UBIFS needs explicit sync or data is lost on power-off) _telnet_run(tn, "sync", timeout=10) # Verify for ln in _telnet_run(tn, f"cat {target}"): yield sse(ln, "log") tn.close() yield sse("Autostart configured. Will run on next boot.", "ok") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) @app.route("/api/mount_sd") def api_mount_sd(): ip = request.args.get("ip", load_config()["kl630_ip"]) def generate(): yield sse(f"Connecting to {ip}:23...") try: tn = _telnet_connect(ip) _drain_shell(tn) yield sse("Connected.", "ok") for ln in _telnet_run(tn, "mkdir -p /tmp/sdcard", timeout=5): yield sse(ln, "log") for ln in _telnet_run(tn, "mount /dev/mmcblk0p1 /tmp/sdcard 2>&1 || echo 'already mounted or no SD card'", timeout=10): yield sse(ln, "log") for ln in _telnet_run(tn, "df -h /tmp/sdcard", timeout=5): yield sse(ln, "ok") tn.close() yield sse("SD card mount done.", "ok") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) @app.route("/api/autostart/read") def api_autostart_read(): ip = request.args.get("ip", load_config()["kl630_ip"]) target = "/mnt/flash/etc/rc.local" def generate(): yield sse(f"Reading {target} from {ip}...") try: tn = _telnet_connect(ip) _drain_shell(tn) lines = _telnet_run(tn, f"cat {target} 2>/dev/null || echo '(file not found)'") tn.close() for ln in lines: yield sse(ln, "log") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── INI helpers ────────────────────────────────────────────────────────────── def _ini_get(key): """Read a key's integer value from host_stream.ini.""" try: text = INI_SRC.read_text(encoding="utf-8") m = re.search(rf"^\s*{re.escape(key)}\s*=\s*(\d+)", text, re.MULTILINE) return int(m.group(1)) if m else None except Exception: return None def _ini_set(key, value): """Patch an integer key in host_stream.ini (preserves comments) and refresh build/.""" text = INI_SRC.read_text(encoding="utf-8") text = re.sub( rf"^(\s*{re.escape(key)}\s*=\s*)\d+", lambda m: m.group(1) + str(value), text, flags=re.MULTILINE ) INI_SRC.write_text(text, encoding="utf-8") prepare_build_dir() def _ini_get_str(key): """Read a quoted string value from host_stream.ini (e.g. ModelPath).""" try: text = INI_SRC.read_text(encoding="utf-8") m = re.search(rf'^\s*{re.escape(key)}\s*=\s*"([^"]*)"', text, re.MULTILINE) return m.group(1) if m else None except Exception: return None def _ini_set_str(key, value): """Patch a quoted string key in host_stream.ini (e.g. ModelPath).""" text = INI_SRC.read_text(encoding="utf-8") text = re.sub( rf'^(\s*{re.escape(key)}\s*=\s*)"[^"]*"', lambda m: m.group(1) + f'"{value}"', text, flags=re.MULTILINE ) INI_SRC.write_text(text, encoding="utf-8") prepare_build_dir() # ── API: INI read ───────────────────────────────────────────────────────────── @app.route("/api/ini", methods=["GET"]) def api_ini_get(): return jsonify({ "fec_mode": _ini_get("fec_mode"), "initial_fec_app_type": _ini_get("initial_fec_app_type"), "eis_enable": _ini_get("eis_enable"), "draw_box_enable": _ini_get("DrawBoxEnable"), "voc_enable": _ini_get("voc_enable"), "verbose_log": _ini_get("verbose_log"), }) # ── API: INI write + optional device apply (SSE) ───────────────────────────── @app.route("/api/ini/apply") def api_ini_apply(): fec_mode = int(request.args.get("fec_mode", 0)) app_type = int(request.args.get("app_type", 0)) eis = int(request.args.get("eis", 0)) draw_box = int(request.args.get("draw_box", 0)) verbose_log = int(request.args.get("verbose_log", 0)) out_rtsp = request.args.get("out_rtsp", "1") == "1" out_hdmi = request.args.get("out_hdmi", "0") == "1" apply_dev = request.args.get("device", "0") == "1" cfg = load_config() kl_ip = request.args.get("ip", cfg["kl630_ip"]) base = f"http://{cfg['host_ip']}:{cfg['port']}" bd = BIN_DIR_DEVICE ini_dev = f"{bd}/ini/host_stream.ini" def generate(): # 1. Update INI on disk (including voc_enable) voc_val = 1 if out_hdmi else 0 _ini_set("fec_mode", fec_mode) _ini_set("initial_fec_app_type", app_type) _ini_set("eis_enable", eis) _ini_set("DrawBoxEnable", draw_box) _ini_set("voc_enable", voc_val) _ini_set("verbose_log", verbose_log) yield sse(f"INI updated: fec_mode={fec_mode} app_type={app_type} eis={eis} draw_box={draw_box} voc={voc_val} verbose_log={verbose_log}", "ok") if not apply_dev: yield sse_done() return # 2. Push to device via Telnet + restart firmware yield sse(f"Connecting to {kl_ip}:23...") try: tn = _telnet_connect(kl_ip) _drain_shell(tn) yield sse("Connected.", "ok") sed_cmds = [ f"sed -i 's/^fec_mode.*/fec_mode = {fec_mode}/' {ini_dev} && echo 'fec_mode OK'", f"sed -i 's/^initial_fec_app_type.*/initial_fec_app_type = {app_type}/' {ini_dev} && echo 'app_type OK'", f"sed -i 's/^eis_enable.*/eis_enable = {eis}/' {ini_dev} && echo 'eis OK'", f"sed -i 's/^DrawBoxEnable.*/DrawBoxEnable = {draw_box}/' {ini_dev} && echo 'draw_box OK'", f"sed -i 's/^verbose_log.*/verbose_log = {verbose_log}/' {ini_dev} && echo 'verbose_log OK'", ] for cmd in sed_cmds: yield sse(f"$ {cmd}", "prompt") for ln in _telnet_run(tn, cmd): yield sse(ln, "ok") # Set VOC/HDMI output mode _telnet_run(tn, f"sed -i 's/^voc_enable.*/voc_enable = {voc_val}/' {ini_dev}") # Keep restart scripts in sync with host so Apply behaves like Deploy. for kind, text in _sync_demo_scripts_on_device(tn, base, bd): yield sse(text, kind) # Restart firmware — choose script based on output mode yield sse("Restarting firmware...") _telnet_run(tn, "killall -9 kp_firmware_host_stream 2>/dev/null; killall -9 rtsps 2>/dev/null; sleep 1; rm -f /dev/shm/*") if out_hdmi and not out_rtsp: restart_cmd = f"cd {bd} && nohup sh ./ini/demo_hdmi.sh > /tmp/fw.log 2>&1 &" elif out_rtsp and out_hdmi: restart_cmd = f"cd {bd} && nohup sh ./ini/demo_rtsp_hdmi.sh > /tmp/fw.log 2>&1 &" elif out_rtsp: restart_cmd = f"cd {bd} && nohup sh ./ini/demo_rtsp.sh > /tmp/fw.log 2>&1 &" else: restart_cmd = f"cd {bd} && nohup LD_LIBRARY_PATH=/mnt/flash/vienna/lib {FW_PATH_DEVICE} > /tmp/fw.log 2>&1 &" _telnet_run_bg(tn, restart_cmd) ok, lines = _verify_firmware_running(tn) for ln in lines: yield sse(ln, "log" if ok else "warn") tn.close() if ok: yield sse("Firmware restarted.", "ok") else: yield sse("Restart command sent, but firmware is not running. Use Deploy to fully resync binary/scripts.", "error") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── API: model management ──────────────────────────────────────────────────── @app.route("/api/model/list") def api_model_list(): nef_build = BUILD_DIR / "nef" nef_build.mkdir(parents=True, exist_ok=True) files = [ {"name": f.name, "size": f.stat().st_size} for f in sorted(nef_build.iterdir()) if f.is_file() and f.suffix.lower() == ".nef" ] return jsonify(files) @app.route("/api/model") def api_model_get(): return jsonify({ "model_path": _ini_get_str("ModelPath"), "model_id": _ini_get("ModelId"), "job_id": _ini_get("JobId"), }) @app.route("/api/model/upload", methods=["POST"]) def api_model_upload(): if "file" not in request.files: return jsonify({"error": "no file"}), 400 f = request.files["file"] if not f.filename.lower().endswith(".nef"): return jsonify({"error": "only .nef files are accepted"}), 400 nef_build = BUILD_DIR / "nef" nef_build.mkdir(parents=True, exist_ok=True) NEF_DIR.mkdir(exist_ok=True) safe_name = re.sub(r"[^a-zA-Z0-9._\-]", "_", f.filename) f.save(str(nef_build / safe_name)) # serve immediately shutil.copy2(nef_build / safe_name, NEF_DIR / safe_name) # keep source in sync return jsonify({"ok": True, "filename": safe_name, "size": (nef_build / safe_name).stat().st_size}) @app.route("/api/model/profiles", methods=["GET", "POST"]) def api_model_profiles(): if request.method == "POST": profiles = request.get_json(force=True) if not isinstance(profiles, list): return jsonify({"error": "expected a JSON array"}), 400 PROFILES_FILE.write_text(json.dumps(profiles, indent=2, ensure_ascii=False)) return jsonify({"ok": True}) if PROFILES_FILE.exists(): return jsonify(json.loads(PROFILES_FILE.read_text())) return jsonify([]) @app.route("/api/model/apply") def api_model_apply(): model_path = request.args.get("model_path", "").strip() model_id = int(request.args.get("model_id", 32769)) job_id = int(request.args.get("job_id", 200)) apply_dev = request.args.get("device", "0") == "1" out_rtsp = request.args.get("out_rtsp", "1") == "1" out_hdmi = request.args.get("out_hdmi", "0") == "1" cfg = load_config() kl_ip = request.args.get("ip", cfg["kl630_ip"]) base = f"http://{cfg['host_ip']}:{cfg['port']}" bd = BIN_DIR_DEVICE ini_dev = f"{bd}/ini/host_stream.ini" def generate(): # 1. Update INI on disk if model_path: _ini_set_str("ModelPath", model_path) _ini_set("ModelId", model_id) _ini_set("JobId", job_id) yield sse(f"INI saved: ModelPath={model_path} ModelId={model_id} JobId={job_id}", "ok") if not apply_dev: yield sse_done() return # 2. Apply to device via Telnet nef_filename = model_path.split("/")[-1] if model_path else "" yield sse(f"Connecting to {kl_ip}:23...") try: tn = _telnet_connect(kl_ip) _drain_shell(tn) yield sse("Connected.", "ok") # Patch INI on device sed_cmds = [] if model_path: sed_cmds.append( f"sed -i 's|^ModelPath.*|ModelPath = \"{model_path}\"|' {ini_dev} && echo 'ModelPath OK'" ) sed_cmds += [ f"sed -i 's/^ModelId.*/ModelId = {model_id}/' {ini_dev} && echo 'ModelId OK'", f"sed -i 's/^JobId.*/JobId = {job_id}/' {ini_dev} && echo 'JobId OK'", ] for cmd in sed_cmds: yield sse(f"$ {cmd}", "prompt") for ln in _telnet_run(tn, cmd): yield sse(ln, "ok") # Download NEF to device if nef_filename: nef_cmd = (f"mkdir -p {bd}/nef && " f"wget -q {base}/nef/{nef_filename} -O {bd}/nef/{nef_filename} && " f"echo 'NEF OK'") yield sse(f"Downloading NEF: {nef_filename} (may take a moment)...") yield sse(f"$ {nef_cmd}", "prompt") for ln in _telnet_run(tn, nef_cmd, timeout=180): yield sse(ln) # Keep restart scripts in sync with host so Apply behaves like Deploy. for kind, text in _sync_demo_scripts_on_device(tn, base, bd): yield sse(text, kind) # Restart firmware — choose script based on output mode yield sse("Restarting firmware...") _telnet_run(tn, "killall -9 kp_firmware_host_stream 2>/dev/null; " "killall -9 rtsps 2>/dev/null; sleep 1; rm -f /dev/shm/*") model_env = f"MODEL_PATH='{model_path}' MODEL_ID={model_id} JOB_ID={job_id}" if out_hdmi and not out_rtsp: restart_cmd = f"cd {bd} && nohup {model_env} sh ./ini/demo_hdmi.sh > /tmp/fw.log 2>&1 &" elif out_rtsp and out_hdmi: restart_cmd = f"cd {bd} && nohup {model_env} sh ./ini/demo_rtsp_hdmi.sh > /tmp/fw.log 2>&1 &" elif out_rtsp: restart_cmd = f"cd {bd} && nohup {model_env} sh ./ini/demo_rtsp.sh > /tmp/fw.log 2>&1 &" else: restart_cmd = f"cd {bd} && nohup LD_LIBRARY_PATH=/mnt/flash/vienna/lib {FW_PATH_DEVICE} > /tmp/fw.log 2>&1 &" _telnet_run_bg(tn, restart_cmd) ok, lines = _verify_firmware_running(tn) for ln in lines: yield sse(ln, "log" if ok else "warn") tn.close() if ok: yield sse("Firmware restarted with new model.", "ok") else: yield sse("Restart command sent, but firmware is not running. Use Deploy to fully resync binary/scripts.", "error") except Exception as e: yield sse(f"Telnet error: {e}", "error") yield sse_done() return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ── Web UI (served at /) ────────────────────────────────────────────────────── @app.route("/") def index(): return HTML_TEMPLATE # ── HTML Template ───────────────────────────────────────────────────────────── HTML_TEMPLATE = """ KL630 Control
Network Config
HTTP Server Files
  • Loading...
Device wget base URL:
Quick deploy cmd (on device):
RTSP Stream Preview
Stream not started
rtsp://—/live1.sdp
AI Overlay
INI Settings
Model Settings
Actions
Output Log / Terminal
#
""" # ── Entry point ─────────────────────────────────────────────────────────────── def _ensure_docker_image_bg(): """Background thread: check Docker + build image on startup so first compile is fast.""" cfg = load_config() docker_image = cfg.get("docker_image", "kl630-dev") if shutil.which("docker") is None: print("WARNING: docker not found in PATH — compile will fail.") return r = subprocess.run(["docker", "image", "inspect", docker_image], capture_output=True) if r.returncode == 0: print(f"[Docker] Image '{docker_image}' ready.") return dockerfile = SCRIPT_DIR / "Dockerfile" if not dockerfile.exists(): print(f"WARNING: Dockerfile not found, cannot pre-build image.") return print(f"[Docker] Image '{docker_image}' not found — building now (background)...") r2 = subprocess.run(["docker", "build", "-t", docker_image, str(SCRIPT_DIR)]) if r2.returncode == 0: print(f"[Docker] Image '{docker_image}' built successfully.") else: print(f"[Docker] Image build FAILED (exit {r2.returncode}).") if __name__ == "__main__": # by mars import argparse parser = argparse.ArgumentParser(description="KL630 Web Control Panel") parser.add_argument("--port", type=int, default=8080, help="Web UI + file server port") parser.add_argument("--host", default="0.0.0.0") args = parser.parse_args() if not HAS_CV2: print("WARNING: opencv-python not installed — RTSP stream preview disabled.") print(" Run: pip install -r requirements.txt") prepare_build_dir() # Pre-check Docker image in background so first compile doesn't wait threading.Thread(target=_ensure_docker_image_bg, daemon=True).start() print(f"\nKL630 Control Panel -> http://localhost:{args.port}/\n") app.run(host=args.host, port=args.port, debug=False, threaded=True)