package handlers import ( "fmt" "io" "net/url" "os" "path/filepath" "strconv" "strings" "time" "visiona-local/server/internal/api/ws" "visiona-local/server/internal/camera" "visiona-local/server/internal/device" "visiona-local/server/internal/driver" "visiona-local/server/internal/inference" "github.com/gin-gonic/gin" ) type CameraHandler struct { cameraMgr *camera.Manager deviceMgr *device.Manager inferenceSvc *inference.Service wsHub *ws.Hub streamer *camera.MJPEGStreamer pipeline *camera.InferencePipeline activeSource camera.FrameSource sourceType camera.SourceType // Video seek state — preserved across seek operations videoPath string // original file path or resolved URL videoIsURL bool // true if source is a URL videoFPS float64 // target FPS videoInfo camera.VideoInfo // duration, total frames activeDeviceID string // device ID for current video session } func NewCameraHandler( cameraMgr *camera.Manager, deviceMgr *device.Manager, inferenceSvc *inference.Service, wsHub *ws.Hub, ) *CameraHandler { streamer := camera.NewMJPEGStreamer() go streamer.Run() return &CameraHandler{ cameraMgr: cameraMgr, deviceMgr: deviceMgr, inferenceSvc: inferenceSvc, wsHub: wsHub, streamer: streamer, } } func (h *CameraHandler) ListCameras(c *gin.Context) { cameras := h.cameraMgr.ListCameras() c.JSON(200, gin.H{"success": true, "data": gin.H{"cameras": cameras}}) } func (h *CameraHandler) StartPipeline(c *gin.Context) { var req struct { CameraID string `json:"cameraId"` DeviceID string `json:"deviceId"` Width int `json:"width"` Height int `json:"height"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": err.Error()}}) return } if req.Width == 0 { req.Width = 640 } if req.Height == 0 { req.Height = 480 } // Clean up any existing pipeline h.stopActivePipeline() // Open camera if err := h.cameraMgr.Open(0, req.Width, req.Height); err != nil { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": 
"CAMERA_OPEN_FAILED", "message": err.Error()}}) return } // Get device driver session, err := h.deviceMgr.GetDevice(req.DeviceID) if err != nil { c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "DEVICE_NOT_FOUND", "message": err.Error()}}) return } // Create inference result channel resultCh := make(chan *driver.InferenceResult, 10) // Forward results to WebSocket, enriching with device ID go func() { room := "inference:" + req.DeviceID for result := range resultCh { result.DeviceID = req.DeviceID h.wsHub.BroadcastToRoom(room, result) } }() // Start pipeline with camera as source h.activeSource = h.cameraMgr h.sourceType = camera.SourceCamera h.pipeline = camera.NewInferencePipeline( h.cameraMgr, camera.SourceCamera, session.Driver, h.streamer.FrameChannel(), resultCh, ) h.pipeline.Start() streamURL := "/api/camera/stream" c.JSON(200, gin.H{ "success": true, "data": gin.H{ "streamUrl": streamURL, "sourceType": "camera", }, }) } func (h *CameraHandler) StopPipeline(c *gin.Context) { h.stopActivePipeline() c.JSON(200, gin.H{"success": true}) } func (h *CameraHandler) StreamMJPEG(c *gin.Context) { h.streamer.ServeHTTP(c.Writer, c.Request) } // UploadImage handles image file upload for single-shot inference. 
func (h *CameraHandler) UploadImage(c *gin.Context) { h.stopActivePipeline() deviceID := c.PostForm("deviceId") if deviceID == "" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "deviceId is required"}}) return } file, header, err := c.Request.FormFile("file") if err != nil { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "file is required"}}) return } defer file.Close() ext := strings.ToLower(filepath.Ext(header.Filename)) if ext != ".jpg" && ext != ".jpeg" && ext != ".png" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "only JPG/PNG files are supported"}}) return } // Save to temp file tmpFile, err := os.CreateTemp("", "edge-ai-image-*"+ext) if err != nil { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "STORAGE_ERROR", "message": err.Error()}}) return } if _, err := io.Copy(tmpFile, file); err != nil { tmpFile.Close() os.Remove(tmpFile.Name()) c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "STORAGE_ERROR", "message": err.Error()}}) return } tmpFile.Close() // Create ImageSource imgSource, err := camera.NewImageSource(tmpFile.Name()) if err != nil { os.Remove(tmpFile.Name()) c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "IMAGE_DECODE_FAILED", "message": err.Error()}}) return } // Get device driver session, err := h.deviceMgr.GetDevice(deviceID) if err != nil { imgSource.Close() c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "DEVICE_NOT_FOUND", "message": err.Error()}}) return } resultCh := make(chan *driver.InferenceResult, 10) go func() { room := "inference:" + deviceID for result := range resultCh { result.DeviceID = deviceID h.wsHub.BroadcastToRoom(room, result) } }() h.activeSource = imgSource h.sourceType = camera.SourceImage h.pipeline = camera.NewInferencePipeline( imgSource, camera.SourceImage, session.Driver, h.streamer.FrameChannel(), resultCh, ) h.pipeline.Start() // Clean up result 
channel after pipeline completes go func() { <-h.pipeline.Done() close(resultCh) }() w, ht := imgSource.Dimensions() streamURL := "/api/camera/stream" c.JSON(200, gin.H{ "success": true, "data": gin.H{ "streamUrl": streamURL, "sourceType": "image", "width": w, "height": ht, "filename": header.Filename, }, }) } // UploadVideo handles video file upload for frame-by-frame inference. func (h *CameraHandler) UploadVideo(c *gin.Context) { h.stopActivePipeline() deviceID := c.PostForm("deviceId") if deviceID == "" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "deviceId is required"}}) return } file, header, err := c.Request.FormFile("file") if err != nil { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "file is required"}}) return } defer file.Close() ext := strings.ToLower(filepath.Ext(header.Filename)) if ext != ".mp4" && ext != ".avi" && ext != ".mov" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "only MP4/AVI/MOV files are supported"}}) return } // Save to temp file tmpFile, err := os.CreateTemp("", "edge-ai-video-*"+ext) if err != nil { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "STORAGE_ERROR", "message": err.Error()}}) return } if _, err := io.Copy(tmpFile, file); err != nil { tmpFile.Close() os.Remove(tmpFile.Name()) c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "STORAGE_ERROR", "message": err.Error()}}) return } tmpFile.Close() // Probe video info (duration, frame count) before starting pipeline videoInfo := camera.ProbeVideoInfo(tmpFile.Name(), 15) // Create VideoSource videoSource, err := camera.NewVideoSource(tmpFile.Name(), 15) if err != nil { os.Remove(tmpFile.Name()) c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "VIDEO_DECODE_FAILED", "message": err.Error()}}) return } if videoInfo.TotalFrames > 0 { videoSource.SetTotalFrames(videoInfo.TotalFrames) } // Get device driver session, err := 
h.deviceMgr.GetDevice(deviceID) if err != nil { videoSource.Close() c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "DEVICE_NOT_FOUND", "message": err.Error()}}) return } resultCh := make(chan *driver.InferenceResult, 10) go func() { room := "inference:" + deviceID for result := range resultCh { result.DeviceID = deviceID h.wsHub.BroadcastToRoom(room, result) } }() h.activeSource = videoSource h.sourceType = camera.SourceVideo h.videoPath = tmpFile.Name() h.videoIsURL = false h.videoFPS = 15 h.videoInfo = videoInfo h.activeDeviceID = deviceID h.pipeline = camera.NewInferencePipeline( videoSource, camera.SourceVideo, session.Driver, h.streamer.FrameChannel(), resultCh, ) h.pipeline.Start() // Notify frontend when video playback completes go func() { <-h.pipeline.Done() close(resultCh) h.wsHub.BroadcastToRoom("inference:"+deviceID, map[string]interface{}{ "type": "pipeline_complete", "sourceType": "video", }) }() streamURL := "/api/camera/stream" c.JSON(200, gin.H{ "success": true, "data": gin.H{ "streamUrl": streamURL, "sourceType": "video", "filename": header.Filename, "totalFrames": videoInfo.TotalFrames, "durationSeconds": videoInfo.DurationSec, }, }) } // ytdlpHosts lists hostnames where yt-dlp should be used to resolve the actual // video stream URL before passing to ffmpeg. 
var ytdlpHosts = map[string]bool{
	"youtube.com":         true,
	"www.youtube.com":     true,
	"youtu.be":            true,
	"m.youtube.com":       true,
	"vimeo.com":           true,
	"www.vimeo.com":       true,
	"dailymotion.com":     true,
	"www.dailymotion.com": true,
	"twitch.tv":           true,
	"www.twitch.tv":       true,
	"bilibili.com":        true,
	"www.bilibili.com":    true,
	"tiktok.com":          true,
	"www.tiktok.com":      true,
	"facebook.com":        true,
	"www.facebook.com":    true,
	"fb.watch":            true,
	"instagram.com":       true,
	"www.instagram.com":   true,
	"twitter.com":         true,
	"x.com":               true,
}

// ytdlpBaseDomains are the registrable domains of the platforms above.
// FIX: any subdomain (e.g. music.youtube.com, clips.twitch.tv) is also routed
// through yt-dlp; previously only the exact hostnames in ytdlpHosts matched,
// so platform subdomains were handed straight to ffmpeg.
var ytdlpBaseDomains = []string{
	"youtube.com", "youtu.be", "vimeo.com", "dailymotion.com", "twitch.tv",
	"bilibili.com", "tiktok.com", "facebook.com", "fb.watch",
	"instagram.com", "twitter.com", "x.com",
}

type urlKind int

const (
	urlDirect urlKind = iota // direct video file or RTSP, pass to ffmpeg directly
	urlYTDLP                 // needs yt-dlp to resolve first
	urlBad                   // invalid or unsupported
)

// needsYTDLP reports whether host (lowercase, no port) belongs to a known
// video platform, either exactly or as a subdomain of a platform domain.
func needsYTDLP(host string) bool {
	if ytdlpHosts[host] {
		return true
	}
	for _, d := range ytdlpBaseDomains {
		if host == d || strings.HasSuffix(host, "."+d) {
			return true
		}
	}
	return false
}

// classifyVideoURL determines how to handle the given URL.
// It returns the handling kind and, for urlBad, a human-readable reason.
func classifyVideoURL(rawURL string) (urlKind, string) {
	parsed, err := url.Parse(rawURL)
	if err != nil {
		return urlBad, "Invalid URL format"
	}
	scheme := strings.ToLower(parsed.Scheme)
	host := strings.ToLower(parsed.Hostname())

	// RTSP streams — direct to ffmpeg.
	if scheme == "rtsp" || scheme == "rtsps" {
		return urlDirect, ""
	}
	// Must be http or https.
	if scheme != "http" && scheme != "https" {
		return urlBad, "Unsupported protocol: " + scheme + ". Use http, https, or rtsp."
	}
	// Known video platforms (including subdomains) — use yt-dlp.
	if needsYTDLP(host) {
		return urlYTDLP, ""
	}
	// Everything else — pass directly to ffmpeg.
	return urlDirect, ""
}
// StartFromURL handles video/stream inference from a URL (HTTP, HTTPS, RTSP).
func (h *CameraHandler) StartFromURL(c *gin.Context) { var req struct { URL string `json:"url"` DeviceID string `json:"deviceId"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": err.Error()}}) return } if req.URL == "" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "url is required"}}) return } if req.DeviceID == "" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "deviceId is required"}}) return } // Classify the URL kind, reason := classifyVideoURL(req.URL) if kind == urlBad { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "UNSUPPORTED_URL", "message": reason}}) return } // For video platforms (YouTube, etc.), resolve actual stream URL via yt-dlp videoURL := req.URL if kind == urlYTDLP { resolved, err := camera.ResolveWithYTDLP(req.URL) if err != nil { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "URL_RESOLVE_FAILED", "message": "無法解析影片連結: " + err.Error()}}) return } videoURL = resolved } h.stopActivePipeline() // Probe video info (duration, frame count) - may be slow for remote URLs videoInfo := camera.ProbeVideoInfo(videoURL, 15) // Create VideoSource from URL (ffmpeg supports HTTP/HTTPS/RTSP natively) videoSource, err := camera.NewVideoSourceFromURL(videoURL, 15) if err != nil { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "URL_OPEN_FAILED", "message": err.Error()}}) return } if videoInfo.TotalFrames > 0 { videoSource.SetTotalFrames(videoInfo.TotalFrames) } // Get device driver session, err := h.deviceMgr.GetDevice(req.DeviceID) if err != nil { videoSource.Close() c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "DEVICE_NOT_FOUND", "message": err.Error()}}) return } resultCh := make(chan *driver.InferenceResult, 10) go func() { room := "inference:" + req.DeviceID for result := range resultCh { h.wsHub.BroadcastToRoom(room, result) } }() 
h.activeSource = videoSource h.sourceType = camera.SourceVideo h.videoPath = videoURL h.videoIsURL = true h.videoFPS = 15 h.videoInfo = videoInfo h.activeDeviceID = req.DeviceID h.pipeline = camera.NewInferencePipeline( videoSource, camera.SourceVideo, session.Driver, h.streamer.FrameChannel(), resultCh, ) h.pipeline.Start() go func() { <-h.pipeline.Done() close(resultCh) h.wsHub.BroadcastToRoom("inference:"+req.DeviceID, map[string]interface{}{ "type": "pipeline_complete", "sourceType": "video", }) }() streamURL := "/api/camera/stream" c.JSON(200, gin.H{ "success": true, "data": gin.H{ "streamUrl": streamURL, "sourceType": "video", "filename": req.URL, "totalFrames": videoInfo.TotalFrames, "durationSeconds": videoInfo.DurationSec, }, }) } // UploadBatchImages handles multiple image files for sequential batch inference. func (h *CameraHandler) UploadBatchImages(c *gin.Context) { h.stopActivePipeline() deviceID := c.PostForm("deviceId") if deviceID == "" { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "deviceId is required"}}) return } form, err := c.MultipartForm() if err != nil { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "multipart form required"}}) return } files := form.File["files"] if len(files) == 0 { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "at least one file is required"}}) return } if len(files) > 50 { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "maximum 50 images per batch"}}) return } // Save all files to temp filePaths := make([]string, 0, len(files)) filenames := make([]string, 0, len(files)) for _, fh := range files { ext := strings.ToLower(filepath.Ext(fh.Filename)) if ext != ".jpg" && ext != ".jpeg" && ext != ".png" { for _, fp := range filePaths { os.Remove(fp) } c.JSON(400, gin.H{"success": false, "error": gin.H{ "code": "BAD_REQUEST", "message": fmt.Sprintf("unsupported file: %s 
(only JPG/PNG)", fh.Filename), }}) return } f, openErr := fh.Open() if openErr != nil { for _, fp := range filePaths { os.Remove(fp) } c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "STORAGE_ERROR", "message": openErr.Error()}}) return } tmpFile, tmpErr := os.CreateTemp("", "edge-ai-batch-*"+ext) if tmpErr != nil { f.Close() for _, fp := range filePaths { os.Remove(fp) } c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "STORAGE_ERROR", "message": tmpErr.Error()}}) return } io.Copy(tmpFile, f) tmpFile.Close() f.Close() filePaths = append(filePaths, tmpFile.Name()) filenames = append(filenames, fh.Filename) } // Create MultiImageSource batchSource, err := camera.NewMultiImageSource(filePaths, filenames) if err != nil { for _, fp := range filePaths { os.Remove(fp) } c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "IMAGE_DECODE_FAILED", "message": err.Error()}}) return } // Get device driver session, err := h.deviceMgr.GetDevice(deviceID) if err != nil { batchSource.Close() c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "DEVICE_NOT_FOUND", "message": err.Error()}}) return } batchID := fmt.Sprintf("batch-%d", time.Now().UnixNano()) resultCh := make(chan *driver.InferenceResult, 10) go func() { room := "inference:" + deviceID for result := range resultCh { result.DeviceID = deviceID h.wsHub.BroadcastToRoom(room, result) } }() h.activeSource = batchSource h.sourceType = camera.SourceBatchImage h.pipeline = camera.NewInferencePipeline( batchSource, camera.SourceBatchImage, session.Driver, h.streamer.FrameChannel(), resultCh, ) h.pipeline.Start() // Notify frontend when batch completes go func() { <-h.pipeline.Done() close(resultCh) h.wsHub.BroadcastToRoom("inference:"+deviceID, map[string]interface{}{ "type": "pipeline_complete", "sourceType": "batch_image", "batchId": batchID, }) }() // Build image list for response imageList := make([]gin.H, len(batchSource.Images())) for i, entry := range batchSource.Images() { imageList[i] = 
gin.H{ "index": i, "filename": entry.Filename, "width": entry.Width, "height": entry.Height, } } streamURL := "/api/camera/stream" c.JSON(200, gin.H{ "success": true, "data": gin.H{ "streamUrl": streamURL, "sourceType": "batch_image", "batchId": batchID, "totalImages": len(files), "images": imageList, }, }) } // GetBatchImageFrame serves a specific image from the active batch by index. func (h *CameraHandler) GetBatchImageFrame(c *gin.Context) { if h.sourceType != camera.SourceBatchImage || h.activeSource == nil { c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "NO_BATCH", "message": "no batch image source active"}}) return } indexStr := c.Param("index") index, err := strconv.Atoi(indexStr) if err != nil || index < 0 { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": "invalid index"}}) return } mis, ok := h.activeSource.(*camera.MultiImageSource) if !ok { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "INTERNAL_ERROR", "message": "source type mismatch"}}) return } jpegData, err := mis.GetImageByIndex(index) if err != nil { c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "NOT_FOUND", "message": err.Error()}}) return } c.Data(200, "image/jpeg", jpegData) } // stopPipelineForSeek stops the pipeline and ffmpeg process but keeps the video file. func (h *CameraHandler) stopPipelineForSeek() { if h.pipeline != nil { h.pipeline.Stop() h.pipeline = nil } if h.activeSource != nil { if vs, ok := h.activeSource.(*camera.VideoSource); ok { vs.CloseWithoutRemove() } } h.activeSource = nil } // stopActivePipeline stops the current pipeline and cleans up resources. 
func (h *CameraHandler) stopActivePipeline() { if h.pipeline != nil { h.pipeline.Stop() h.pipeline = nil } // Only close non-camera sources (camera is managed by cameraMgr) if h.activeSource != nil && h.sourceType != camera.SourceCamera { h.activeSource.Close() } if h.sourceType == camera.SourceCamera { h.cameraMgr.Close() } h.activeSource = nil h.sourceType = "" h.videoPath = "" h.videoIsURL = false h.activeDeviceID = "" } // SeekVideo seeks to a specific position in the current video and restarts inference. func (h *CameraHandler) SeekVideo(c *gin.Context) { var req struct { TimeSeconds float64 `json:"timeSeconds"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "BAD_REQUEST", "message": err.Error()}}) return } if h.videoPath == "" || h.sourceType != camera.SourceVideo { c.JSON(400, gin.H{"success": false, "error": gin.H{"code": "NO_VIDEO", "message": "no video is currently playing"}}) return } // Clamp seek time if req.TimeSeconds < 0 { req.TimeSeconds = 0 } if h.videoInfo.DurationSec > 0 && req.TimeSeconds > h.videoInfo.DurationSec { req.TimeSeconds = h.videoInfo.DurationSec } // Stop current pipeline without deleting the video file h.stopPipelineForSeek() // Create new VideoSource with seek position var videoSource *camera.VideoSource var err error if h.videoIsURL { videoSource, err = camera.NewVideoSourceFromURLWithSeek(h.videoPath, h.videoFPS, req.TimeSeconds) } else { videoSource, err = camera.NewVideoSourceWithSeek(h.videoPath, h.videoFPS, req.TimeSeconds) } if err != nil { c.JSON(500, gin.H{"success": false, "error": gin.H{"code": "SEEK_FAILED", "message": err.Error()}}) return } if h.videoInfo.TotalFrames > 0 { videoSource.SetTotalFrames(h.videoInfo.TotalFrames) } // Get device driver session, err := h.deviceMgr.GetDevice(h.activeDeviceID) if err != nil { videoSource.Close() c.JSON(404, gin.H{"success": false, "error": gin.H{"code": "DEVICE_NOT_FOUND", "message": err.Error()}}) return } // 
Calculate frame offset from seek position frameOffset := int(req.TimeSeconds * h.videoFPS) resultCh := make(chan *driver.InferenceResult, 10) go func() { room := "inference:" + h.activeDeviceID for result := range resultCh { result.DeviceID = h.activeDeviceID h.wsHub.BroadcastToRoom(room, result) } }() h.activeSource = videoSource h.pipeline = camera.NewInferencePipelineWithOffset( videoSource, camera.SourceVideo, session.Driver, h.streamer.FrameChannel(), resultCh, frameOffset, ) h.pipeline.Start() go func() { <-h.pipeline.Done() close(resultCh) h.wsHub.BroadcastToRoom("inference:"+h.activeDeviceID, map[string]interface{}{ "type": "pipeline_complete", "sourceType": "video", }) }() c.JSON(200, gin.H{ "success": true, "data": gin.H{ "seekTo": req.TimeSeconds, "frameOffset": frameOffset, }, }) }