package camera import ( "bufio" "fmt" "io" "os" "os/exec" "strconv" "strings" "sync" "sync/atomic" ) // VideoInfo holds metadata extracted by ffprobe before pipeline starts. type VideoInfo struct { DurationSec float64 // total duration in seconds TotalFrames int // estimated total frames at target FPS } // ProbeVideoInfo runs ffprobe to extract duration from a video file or URL. // Returns zero values (no error) when duration is indeterminate (e.g. live streams). func ProbeVideoInfo(input string, fps float64) VideoInfo { cmd := exec.Command("ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "format=duration", "-of", "csv=p=0", input, ) out, err := cmd.Output() if err != nil { return VideoInfo{} } durStr := strings.TrimSpace(string(out)) if durStr == "" || durStr == "N/A" { return VideoInfo{} } dur, err := strconv.ParseFloat(durStr, 64) if err != nil { return VideoInfo{} } if fps <= 0 { fps = 15 } return VideoInfo{ DurationSec: dur, TotalFrames: int(dur * fps), } } // VideoSource reads a video file or URL frame-by-frame using ffmpeg, outputting // JPEG frames via stdout. Reuses the same JPEG SOI/EOI marker parsing // pattern as FFmpegCamera. type VideoSource struct { cmd *exec.Cmd stdout io.ReadCloser frameCh chan []byte // decoded frames queue mu sync.Mutex done chan struct{} finished bool err error filePath string // local file path (empty for URL sources) isURL bool // true when source is a URL, skip file cleanup totalFrames int64 // 0 means unknown frameCount int64 // atomic counter incremented in readLoop } // NewVideoSource starts an ffmpeg process that decodes a video file // and outputs MJPEG frames to stdout at the specified FPS. func NewVideoSource(filePath string, fps float64) (*VideoSource, error) { return newVideoSource(filePath, fps, false, 0) } // NewVideoSourceWithSeek starts ffmpeg from a specific position (in seconds). 
func NewVideoSourceWithSeek(filePath string, fps float64, seekSeconds float64) (*VideoSource, error) { return newVideoSource(filePath, fps, false, seekSeconds) } // NewVideoSourceFromURL starts an ffmpeg process that reads from a URL // (HTTP, HTTPS, RTSP, etc.) and outputs MJPEG frames to stdout. func NewVideoSourceFromURL(rawURL string, fps float64) (*VideoSource, error) { return newVideoSource(rawURL, fps, true, 0) } // NewVideoSourceFromURLWithSeek starts ffmpeg from a URL at a specific position. func NewVideoSourceFromURLWithSeek(rawURL string, fps float64, seekSeconds float64) (*VideoSource, error) { return newVideoSource(rawURL, fps, true, seekSeconds) } // ResolveWithYTDLP uses yt-dlp to extract the direct video stream URL // from platforms like YouTube, Vimeo, etc. // Returns the resolved direct URL or an error. func ResolveWithYTDLP(rawURL string) (string, error) { // yt-dlp -f "best[ext=mp4]/best" --get-url cmd := exec.Command("yt-dlp", "-f", "best[ext=mp4]/best", "--get-url", rawURL) out, err := cmd.Output() if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { return "", fmt.Errorf("yt-dlp failed: %s", string(exitErr.Stderr)) } return "", fmt.Errorf("yt-dlp not available: %w", err) } resolved := strings.TrimSpace(string(out)) if resolved == "" { return "", fmt.Errorf("yt-dlp returned empty URL") } // yt-dlp may return multiple lines (video + audio); take only the first if idx := strings.Index(resolved, "\n"); idx > 0 { resolved = resolved[:idx] } return resolved, nil } func newVideoSource(input string, fps float64, isURL bool, seekSeconds float64) (*VideoSource, error) { if fps <= 0 { fps = 15 } args := []string{} if seekSeconds > 0 { args = append(args, "-ss", fmt.Sprintf("%.3f", seekSeconds)) } args = append(args, "-i", input, "-vf", fmt.Sprintf("fps=%g", fps), "-f", "image2pipe", "-vcodec", "mjpeg", "-q:v", "5", "-an", "-", ) cmd := exec.Command("ffmpeg", args...) 
stdout, err := cmd.StdoutPipe() if err != nil { return nil, fmt.Errorf("failed to get stdout pipe: %w", err) } cmd.Stderr = nil if err := cmd.Start(); err != nil { return nil, fmt.Errorf("failed to start ffmpeg: %w", err) } filePath := "" if !isURL { filePath = input } vs := &VideoSource{ cmd: cmd, stdout: stdout, frameCh: make(chan []byte, 30), // buffer up to 30 frames done: make(chan struct{}), filePath: filePath, isURL: isURL, } go vs.readLoop() return vs, nil } // readLoop scans ffmpeg stdout for JPEG SOI/EOI markers to extract frames. func (v *VideoSource) readLoop() { defer close(v.done) defer close(v.frameCh) reader := bufio.NewReaderSize(v.stdout, 1024*1024) buf := make([]byte, 0, 512*1024) inFrame := false for { b, err := reader.ReadByte() if err != nil { v.mu.Lock() v.finished = true if err != io.EOF { v.err = fmt.Errorf("ffmpeg stream ended: %w", err) } v.mu.Unlock() return } if !inFrame { if b == 0xFF { next, err := reader.ReadByte() if err != nil { v.mu.Lock() v.finished = true v.mu.Unlock() return } if next == 0xD8 { buf = buf[:0] buf = append(buf, 0xFF, 0xD8) inFrame = true } } continue } buf = append(buf, b) if b == 0xD9 && len(buf) >= 2 && buf[len(buf)-2] == 0xFF { frame := make([]byte, len(buf)) copy(frame, buf) v.frameCh <- frame // blocks if buffer full, applies backpressure atomic.AddInt64(&v.frameCount, 1) inFrame = false } } } // ReadFrame returns the next decoded frame, blocking until one is available. // Returns an error when all frames have been consumed and ffmpeg has finished. func (v *VideoSource) ReadFrame() ([]byte, error) { frame, ok := <-v.frameCh if !ok { return nil, fmt.Errorf("video playback complete") } return frame, nil } // SetTotalFrames sets the expected total frame count (from ffprobe). func (v *VideoSource) SetTotalFrames(n int) { atomic.StoreInt64(&v.totalFrames, int64(n)) } // TotalFrames returns the expected total frame count, or 0 if unknown. 
func (v *VideoSource) TotalFrames() int {
	total := atomic.LoadInt64(&v.totalFrames)
	return int(total)
}

// FrameCount reports how many frames readLoop has delivered so far.
func (v *VideoSource) FrameCount() int {
	decoded := atomic.LoadInt64(&v.frameCount)
	return int(decoded)
}

// IsFinished reports whether decoding has ended AND the frame queue is empty,
// i.e. there is nothing left for ReadFrame to return.
func (v *VideoSource) IsFinished() bool {
	v.mu.Lock()
	decodingDone := v.finished
	v.mu.Unlock()

	if !decodingDone {
		return false
	}
	return len(v.frameCh) == 0
}

// CloseWithoutRemove stops the ffmpeg process but leaves the source file in
// place. Used when seeking: ffmpeg has to be restarted from a different
// position while the file is kept.
//
// It kills and reaps the process, discards any queued frames so readLoop can
// exit, and waits for readLoop to finish. It always returns nil.
func (v *VideoSource) CloseWithoutRemove() error {
	if v.cmd != nil {
		if proc := v.cmd.Process; proc != nil {
			// Best effort: errors from Kill/Wait are deliberately ignored.
			_ = proc.Kill()
			_ = v.cmd.Wait()
		}
	}

	// Drain until readLoop closes the channel; a full queue would otherwise
	// keep it blocked on send.
	for {
		if _, open := <-v.frameCh; !open {
			break
		}
	}
	<-v.done
	return nil
}

// Close stops the ffmpeg process exactly as CloseWithoutRemove does and then
// deletes the local source file. URL sources have no file to delete.
// It always returns nil; a failed removal is ignored.
func (v *VideoSource) Close() error {
	_ = v.CloseWithoutRemove()

	// Only remove temp files, not URL sources.
	if v.isURL || v.filePath == "" {
		return nil
	}
	_ = os.Remove(v.filePath)
	return nil
}