303 lines
11 KiB
Python
303 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
build_and_serve.py — KL630 一鍵編譯 + 檢查 + HTTP server
|
||
|
||
使用方式:
|
||
python build_and_serve.py # 編譯 + 檢查 + 啟動 server
|
||
python build_and_serve.py --no-build # 跳過編譯,只啟動 server
|
||
python build_and_serve.py --port 9090
|
||
|
||
裝置端:
|
||
wget http://192.168.3.1:<port>/deploy.sh -O /tmp/deploy.sh && sh /tmp/deploy.sh
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import struct
|
||
import shutil
|
||
import argparse
|
||
import subprocess
|
||
from pathlib import Path
|
||
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
||
from threading import Thread
|
||
|
||
# ── ANSI colors ─────────────────────────────────────────────────────────────
|
||
def _c(code, text):
    """Wrap *text* in the ANSI SGR sequence *code* when stdout is a terminal.

    When stdout is not a TTY (pipe, file, captured output) the text is
    returned unchanged so that logs stay free of escape sequences.
    """
    if not sys.stdout.isatty():
        return text
    return f"\033[{code}m{text}\033[0m"


def ok(t):
    """Style *t* as a success message (SGR 32;1, bold green)."""
    return _c("32;1", t)


def fail(t):
    """Style *t* as an error message (SGR 31;1, bold red)."""
    return _c("31;1", t)


def info(t):
    """Style *t* as an informational message (SGR 36, cyan)."""
    return _c("36", t)


def warn(t):
    """Style *t* as a warning message (SGR 33, yellow)."""
    return _c("33", t)


def head(t):
    """Style *t* as a section heading (SGR 35;1, bold magenta)."""
    return _c("35;1", t)
|
||
|
||
# ── Paths ────────────────────────────────────────────────────────────────────
|
||
# Every path below is anchored at the directory that contains this script.
SCRIPT_DIR = Path(__file__).resolve().parent

# Build output directory; step_serve() also serves files from here.
BUILD_DIR = SCRIPT_DIR.joinpath("build")
BINARY_NAME = "kp_firmware_host_stream"
BINARY_PATH = BUILD_DIR.joinpath(BINARY_NAME)

# Files copied into BUILD_DIR before the HTTP server starts.
INI_SRC = SCRIPT_DIR.joinpath("ini", "host_stream.ini")
DEPLOY_SRC = SCRIPT_DIR.joinpath("tools", "device", "deploy.sh")
DEMO_RTSP_SRC = SCRIPT_DIR.joinpath("tools", "device", "demo_rtsp.sh")

# Docker image name — change if yours is different (main() overrides it
# with the value of --image).
DOCKER_IMAGE = "kl630-dev"
|
||
|
||
# ── Step 0: Ensure Docker image exists ───────────────────────────────────────
|
||
def step_ensure_image() -> bool:
    """Make sure the Docker image named by DOCKER_IMAGE is available locally.

    If ``docker image inspect`` succeeds the image is reused; otherwise the
    image is built from the Dockerfile that sits next to this script, using
    the script directory as the build context.

    Returns:
        True when the image already exists or was built successfully.
        False when ``docker`` is not on PATH, the Dockerfile is missing, or
        ``docker build`` exits with a non-zero status.
    """
    print(head("\n=== [0/3] Docker Image Check ==="))

    if shutil.which("docker") is None:
        print(fail(" ERROR: 'docker' not found in PATH"))
        return False

    # A zero exit status from `docker image inspect` means the image exists.
    inspect_cmd = ["docker", "image", "inspect", DOCKER_IMAGE]
    if subprocess.run(inspect_cmd, capture_output=True).returncode == 0:
        print(ok(f" Image '{DOCKER_IMAGE}' already exists, skipping build."))
        return True

    # The image is not available: fall back to building it.
    dockerfile = SCRIPT_DIR / "Dockerfile"
    if not dockerfile.exists():
        print(fail(f" ERROR: Dockerfile not found at {dockerfile}"))
        return False

    print(warn(f" Image '{DOCKER_IMAGE}' not found. Building from Dockerfile..."))
    print(info(f" docker build -t {DOCKER_IMAGE} {SCRIPT_DIR}"))
    print()

    # Output is not captured so the build log streams straight to the console.
    build_cmd = ["docker", "build", "-t", DOCKER_IMAGE, str(SCRIPT_DIR)]
    exit_code = subprocess.run(build_cmd).returncode
    if exit_code == 0:
        print(ok(f"\n Image '{DOCKER_IMAGE}' built successfully."))
        return True

    print(fail(f"\n IMAGE BUILD FAILED (exit {exit_code})"))
    return False
|
||
|
||
# ── Step 1: Docker build ──────────────────────────────────────────────────────
|
||
def step_build() -> bool:
    """Run compile.sh inside a throwaway container of DOCKER_IMAGE.

    The whole project directory is bind-mounted at /workspace/kl630_build so
    that SDK, lib, include and src are all visible to the build script.

    Returns:
        True when the container exits with status 0.  False when ``docker``
        is not on PATH or the container exits with a non-zero status.
    """
    print(head("\n=== [1/3] Docker Build ==="))

    if shutil.which("docker") is None:
        print(fail(" ERROR: 'docker' not found in PATH"))
        return False

    # Backslashes are turned into forward slashes before the path is handed
    # to `docker run -v`.
    mount = str(SCRIPT_DIR).replace("\\", "/")

    print(info(f" Image : {DOCKER_IMAGE}"))
    print(info(f" Mount : {mount} -> /workspace/kl630_build"))
    print()

    # --rm removes the container once the build script finishes.
    exit_code = subprocess.run([
        "docker", "run", "--rm",
        "-v", f"{mount}:/workspace/kl630_build",
        DOCKER_IMAGE,
        "bash", "/workspace/kl630_build/compile.sh",
    ]).returncode

    if exit_code == 0:
        print(ok("\n Build succeeded."))
        return True

    print(fail(f"\n BUILD FAILED (exit {exit_code})"))
    return False
|
||
|
||
# ── Step 2: Check binary ──────────────────────────────────────────────────────
|
||
def _check_elf_arm(path: Path):
    """Inspect the ELF header of *path* and report its target machine.

    Only the first 20 bytes are read: the 16-byte ``e_ident`` array followed
    by the 16-bit ``e_type`` and ``e_machine`` fields.  ``e_machine`` is
    decoded using the byte order announced in ``e_ident[EI_DATA]``, so a
    big-endian ELF is identified correctly instead of being reported as an
    unknown machine.

    Args:
        path: File to inspect.

    Returns:
        A ``(is_arm_elf, description)`` tuple.  ``is_arm_elf`` is True only
        when the file is an ELF whose ``e_machine`` is EM_ARM (0x28).
        ``description`` is the architecture name (with a ", big-endian"
        suffix for big-endian files) or the reason the file was rejected;
        if the file cannot be read it is the error text.
    """
    ei_data_offset = 5       # index of EI_DATA inside e_ident
    elfdata2msb = 2          # EI_DATA value for big-endian files
    e_machine_offset = 18    # e_ident (16 bytes) + e_type (2 bytes)
    em_arm = 0x28
    arch_map = {em_arm: "ARM (armv7)", 0xb7: "AArch64", 0x3e: "x86_64"}

    # Only the file access can fail; keep the try body limited to it.
    try:
        with open(path, "rb") as f:
            hdr = f.read(20)
    except (OSError, ValueError) as e:
        return False, str(e)

    if len(hdr) < 20:
        return False, "file too small"
    if hdr[:4] != b"\x7fELF":
        return False, "not an ELF file"

    # Any EI_DATA value other than ELFDATA2MSB is decoded little-endian,
    # which matches how every file was treated before.
    big_endian = hdr[ei_data_offset] == elfdata2msb
    fmt = ">H" if big_endian else "<H"
    e_machine = struct.unpack_from(fmt, hdr, e_machine_offset)[0]

    arch = arch_map.get(e_machine, f"unknown machine 0x{e_machine:04x}")
    if big_endian:
        arch += ", big-endian"
    return e_machine == em_arm, arch
|
||
|
||
def _needed_libraries(readelf_output):
    """Extract the NEEDED library names from ``readelf -d`` output.

    A dynamic-section line looks like::

        0x00000001 (NEEDED)    Shared library: [libc.so.0]

    The name is taken from inside the square brackets.  If a NEEDED line has
    no bracketed name, the stripped text after the marker is kept as-is.
    """
    names = []
    for line in readelf_output.splitlines():
        _, marker, rest = line.partition("(NEEDED)")
        if not marker:
            continue
        rest = rest.strip()
        start = rest.find("[")
        end = rest.rfind("]")
        if 0 <= start < end:
            names.append(rest[start + 1:end])
        else:
            names.append(rest)
    return names


def step_check() -> bool:
    """Sanity-check the built binary at BINARY_PATH.

    The binary passes when it exists, is larger than 100 000 bytes and
    ``_check_elf_arm`` reports it as an ARM ELF.  When ``readelf`` is on
    PATH, the NEEDED shared libraries are listed and the presence of
    ``libc.so.0`` / ``libc.so.6`` is reported.

    Returns:
        True when the size and architecture checks both pass, else False.
    """
    print(head("\n=== [2/3] Check Binary ==="))

    if not BINARY_PATH.exists():
        print(fail(f" FAIL: {BINARY_PATH} not found"))
        return False

    size = BINARY_PATH.stat().st_size
    is_arm, arch = _check_elf_arm(BINARY_PATH)

    size_ok = size > 100_000
    arch_ok = is_arm

    print(f" File : {BINARY_PATH}")
    print(f" Size : {size/1024:.1f} KB {'✓' if size_ok else '✗'}")
    print(f" Arch : {arch} {'✓' if arch_ok else '✗'}")

    # Try readelf if available (in Docker or WSL)
    if shutil.which("readelf"):
        r = subprocess.run(
            ["readelf", "-d", str(BINARY_PATH)],
            capture_output=True, text=True
        )
        # Bare library names are required here: the membership tests below
        # compare against "libc.so.0" / "libc.so.6" exactly.
        needed = _needed_libraries(r.stdout)
        if needed:
            print(f" NEEDED: {', '.join(needed)}")
            # NOTE(review): this report is advisory only — it does not
            # affect the return value.  Confirm whether a lingering
            # libc.so.6 dependency should fail the check.
            if "libc.so.0" in needed:
                print(ok(" uClibc patch: OK (libc.so.0 present)"))
            elif "libc.so.6" in needed:
                print(fail(" uClibc patch: MISSING (libc.so.6 still present)"))

    if not (size_ok and arch_ok):
        print(fail("\n CHECK FAILED"))
        return False

    print(ok("\n Binary looks good."))
    return True
|
||
# ── Step 3: Copy Binary To Network Share ──────────────────────
|
||
def step_copy_to_network(dst_dir: str) -> bool:
    """Copy the built binary into *dst_dir*, creating the directory if needed.

    Args:
        dst_dir: Destination directory (for example a network share path).

    Returns:
        True when the binary was copied.  False when BINARY_PATH does not
        exist or any step of the copy raises; the error is printed rather
        than propagated.
    """
    print(head("\n=== [copy] Copy Binary To Network Share ==="))

    if not BINARY_PATH.exists():
        print(fail(f" FAIL: source binary not found: {BINARY_PATH}"))
        return False

    try:
        target_dir = Path(dst_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        target_file = target_dir / BINARY_NAME
        # copy2 also carries over file metadata.
        shutil.copy2(BINARY_PATH, target_file)

        print(ok(f" Copied: {BINARY_PATH}"))
        print(ok(f" -> {target_file}"))
        return True
    except Exception as e:
        # Best effort: report the failure and let the caller decide.
        print(fail(f" COPY FAILED: {e}"))
        return False
|
||
# ── Step 4: Prepare serve directory & start HTTP server ──────────────────────
|
||
def step_serve(port: int):
    """Stage the deployable files in BUILD_DIR and serve that directory over HTTP.

    The INI file and the two device scripts are copied next to the binary
    (a missing source only produces a warning), the directory contents are
    listed, and an HTTP server bound to all interfaces on *port* runs until
    Ctrl+C.  The listening socket is closed when the function returns.

    Side effects:
        Creates BUILD_DIR if needed and changes the process working
        directory to it, because SimpleHTTPRequestHandler serves the
        current directory.

    Args:
        port: TCP port to listen on.
    """
    print(head("\n=== [3/3] Prepare & Serve ==="))

    BUILD_DIR.mkdir(exist_ok=True)

    # (source file, name it is served under)
    staged_files = (
        (INI_SRC, "host_stream.ini"),
        (DEPLOY_SRC, "deploy.sh"),
        (DEMO_RTSP_SRC, "demo_rtsp.sh"),
    )
    for src, served_name in staged_files:
        if not src.exists():
            print(warn(f" WARN: {src} not found, skipping"))
            continue
        dst = BUILD_DIR / served_name
        shutil.copy2(src, dst)
        print(ok(f" Copied : {served_name} ({dst.stat().st_size} bytes)"))

    # List what will be served
    print(f"\n Serving from: {BUILD_DIR}")
    for entry in sorted(BUILD_DIR.iterdir()):
        size_str = f"{entry.stat().st_size/1024:.1f} KB"
        print(f" {entry.name:<40} {size_str:>10}")

    # SimpleHTTPRequestHandler serves the current working directory.
    os.chdir(BUILD_DIR)

    class _Handler(SimpleHTTPRequestHandler):
        def log_message(self, fmt, *args):
            # Show only non-304 responses.  For request log lines the
            # second argument is the status code as a string.
            code = args[1] if len(args) > 1 else ""
            if code != "304":
                print(info(f" [{self.address_string()}] {fmt % args}"))

    # The context manager calls server_close() on exit, releasing the
    # listening socket even if serve_forever() is interrupted.
    with HTTPServer(("", port), _Handler) as httpd:
        print(head(f"\n=== HTTP Server on port {port} ==="))
        print(f" URL : http://0.0.0.0:{port}/")
        print()
        print(warn(" 裝置端執行:"))
        print(f" wget http://192.168.3.1:{port}/deploy.sh -O /tmp/deploy.sh && sh /tmp/deploy.sh")
        print()
        print(warn(" 或分步驟:"))
        print(f" wget http://192.168.3.1:{port}/{BINARY_NAME} -O /mnt/flash/vienna/{BINARY_NAME}")
        print(f" wget http://192.168.3.1:{port}/host_stream.ini -O $BIN_DIR/ini/host_stream.ini")
        print()
        print(" Ctrl+C 停止\n")

        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print(info("\n Server stopped."))
|
||
|
||
# ── Main ─────────────────────────────────────────────────────────────────────
|
||
def main():
    """Parse the command line and run the build → check → copy → serve pipeline.

    Each stage can be skipped with its ``--no-*`` flag.  The process exits
    with status 1 when the image or build stage fails, when the check fails
    after a fresh build, or when the copy fails.  A failed check combined
    with ``--no-build`` only prints a warning and continues.
    """
    global DOCKER_IMAGE

    # Declared as data and registered in order, so --help keeps this order.
    option_specs = [
        ("--no-build", dict(action="store_true",
                            help="跳過 Docker build,只 check + serve")),
        ("--no-check", dict(action="store_true",
                            help="跳過 binary 檢查")),
        ("--copy-dst", dict(default=r"\\192.168.0.122\public\nfs_share",
                            help="編譯成功後複製 binary 到指定網路資料夾")),
        ("--no-copy", dict(action="store_true",
                           help="不要複製 binary 到網路資料夾")),
        ("--no-serve", dict(action="store_true",
                            help="只 build + check,不啟動 HTTP server")),
        ("--port", dict(type=int, default=8080,
                        help="HTTP server port (default: 8080)")),
        ("--image", dict(default=DOCKER_IMAGE,
                         help=f"Docker image name (default: {DOCKER_IMAGE})")),
    ]
    parser = argparse.ArgumentParser(description="KL630 build + check + serve")
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    # The step functions read the image name from this module-level global.
    DOCKER_IMAGE = args.image

    print(head("KL630 Build & Serve"))
    print(f" Root : {SCRIPT_DIR}")
    print(f" Binary : {BINARY_PATH}")
    print(f" Port : {args.port}")
    print(f" Copy : {args.copy_dst}")

    # Build: the image must be available before the build container runs.
    if not args.no_build:
        if not (step_ensure_image() and step_build()):
            sys.exit(1)

    # Check: fatal after a fresh build, only a warning with --no-build.
    if not args.no_check and not step_check():
        if not args.no_build:
            sys.exit(1)
        print(warn(" (使用 --no-build 時 binary 可能是舊的)"))

    # Copy the binary to the network share.
    if not args.no_copy and not step_copy_to_network(args.copy_dst):
        sys.exit(1)

    # Serve: blocks until interrupted.
    if not args.no_serve:
        step_serve(args.port)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|