jim800121chen c54f16fca0 Initial commit: visionA monorepo with local-tool subproject
local-tool/: visionA-local desktop app
- M1: Wails shell + Go server + Next.js UI + Mock mode (macOS dmg ready)
- M2: i18n (zh-TW/en) + Settings 4-tab refactor
- M3: Embedded Python 3.12 runtime (python-build-standalone) + KneronPLUS wheels
- M4: Windows Inno Setup script (build on Windows runner)
- M5: Linux AppImage script + udev rule (build on Linux runner)
- M6: ffmpeg (GPL, pending legal review) + yt-dlp bundled
- Lifecycle: watchServer health check, fatal native dialog,
            Wails IPC raise endpoint, stale process cleanup

.autoflow/: full PRD / Design Spec / Architecture / Testing docs
            (4 rounds of tri-party discussion + cross-review)
.github/workflows/: macOS / Windows / Linux build CI

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 22:10:38 +08:00

package camera

import (
	"context"
	"time"

	"visiona-local/server/internal/driver"
)

// SourceType identifies the kind of frame source used in the pipeline.
type SourceType string

const (
	SourceCamera     SourceType = "camera"
	SourceImage      SourceType = "image"
	SourceVideo      SourceType = "video"
	SourceBatchImage SourceType = "batch_image"
)

type InferencePipeline struct {
	source      FrameSource
	sourceType  SourceType
	device      driver.DeviceDriver
	frameCh     chan<- []byte
	resultCh    chan<- *driver.InferenceResult
	cancel      context.CancelFunc
	doneCh      chan struct{}
	frameOffset int // starting frame index (non-zero after seek)
}

func NewInferencePipeline(
	source FrameSource,
	sourceType SourceType,
	device driver.DeviceDriver,
	frameCh chan<- []byte,
	resultCh chan<- *driver.InferenceResult,
) *InferencePipeline {
	return &InferencePipeline{
		source:     source,
		sourceType: sourceType,
		device:     device,
		frameCh:    frameCh,
		resultCh:   resultCh,
		doneCh:     make(chan struct{}),
	}
}

// NewInferencePipelineWithOffset creates a pipeline with a frame offset (used after seek).
func NewInferencePipelineWithOffset(
	source FrameSource,
	sourceType SourceType,
	device driver.DeviceDriver,
	frameCh chan<- []byte,
	resultCh chan<- *driver.InferenceResult,
	frameOffset int,
) *InferencePipeline {
	return &InferencePipeline{
		source:      source,
		sourceType:  sourceType,
		device:      device,
		frameCh:     frameCh,
		resultCh:    resultCh,
		doneCh:      make(chan struct{}),
		frameOffset: frameOffset,
	}
}

func (p *InferencePipeline) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	p.cancel = cancel
	go p.run(ctx)
}

func (p *InferencePipeline) Stop() {
	if p.cancel != nil {
		p.cancel()
	}
}

// Done returns a channel that closes when the pipeline finishes.
// For camera mode this only closes on Stop(); for image/video it
// closes when the source is exhausted.
func (p *InferencePipeline) Done() <-chan struct{} {
	return p.doneCh
}

func (p *InferencePipeline) run(ctx context.Context) {
	defer close(p.doneCh)

	targetInterval := time.Second / 15 // 15 FPS
	inferenceRan := false              // for image mode: only run inference once
	frameIndex := 0                    // video frame counter

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		start := time.Now()
		var jpegFrame []byte
		var readErr error

		// Video mode: ReadFrame blocks on channel, need to respect ctx cancel
		if p.sourceType == SourceVideo {
			vs := p.source.(*VideoSource)
			select {
			case <-ctx.Done():
				return
			case frame, ok := <-vs.frameCh:
				if !ok {
					return // all frames consumed
				}
				jpegFrame = frame
			}
		} else {
			jpegFrame, readErr = p.source.ReadFrame()
			if readErr != nil {
				time.Sleep(100 * time.Millisecond)
				continue
			}
		}

		// Send to MJPEG stream
		select {
		case p.frameCh <- jpegFrame:
		default:
		}

		// Batch image mode: process each image sequentially, then advance.
		if p.sourceType == SourceBatchImage {
			mis := p.source.(*MultiImageSource)
			for {
				select {
				case <-ctx.Done():
					return
				default:
				}
				frame, err := mis.ReadFrame()
				if err != nil {
					return
				}
				// Send current frame to MJPEG
				select {
				case p.frameCh <- frame:
				default:
				}
				// Run inference on this image
				result, inferErr := p.device.RunInference(frame)
				if inferErr == nil {
					entry := mis.CurrentEntry()
					result.ImageIndex = mis.CurrentIndex()
					result.TotalImages = mis.TotalImages()
					result.Filename = entry.Filename
					select {
					case p.resultCh <- result:
					default:
					}
				}
				// Move to next image
				if !mis.Advance() {
					// Keep sending last frame for late-connecting MJPEG clients (~2s)
					for i := 0; i < 30; i++ {
						select {
						case <-ctx.Done():
							return
						default:
						}
						select {
						case p.frameCh <- frame:
						default:
						}
						time.Sleep(time.Second / 15)
					}
					return
				}
			}
		}

		// Image mode: only run inference once, then keep sending
		// the same frame to MJPEG so late-connecting clients can see it.
		if p.sourceType == SourceImage {
			if !inferenceRan {
				inferenceRan = true
				result, err := p.device.RunInference(jpegFrame)
				if err == nil {
					select {
					case p.resultCh <- result:
					default:
					}
				}
			}
			elapsed := time.Since(start)
			if elapsed < targetInterval {
				time.Sleep(targetInterval - elapsed)
			}
			continue
		}

		// Camera / Video mode: run inference every frame
		result, err := p.device.RunInference(jpegFrame)
		if err != nil {
			continue
		}

		// Video mode: attach frame progress
		if p.sourceType == SourceVideo {
			result.FrameIndex = p.frameOffset + frameIndex
			frameIndex++
			vs := p.source.(*VideoSource)
			result.TotalFrames = vs.TotalFrames()
		}

		select {
		case p.resultCh <- result:
		default:
		}

		elapsed := time.Since(start)
		if elapsed < targetInterval {
			time.Sleep(targetInterval - elapsed)
		}
	}
}
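
For orientation, a minimal caller-side sketch of the Start / Done / Stop contract documented on Done(): start the pipeline, then either wait for the source to be exhausted or stop it explicitly and wait for the run goroutine to exit. The function name, src, dev and userStop are placeholder names assumed for this example (not identifiers from the repository), and imports and surrounding plumbing are omitted.

// Illustrative usage sketch; src, dev and userStop are placeholders,
// not code from this repository.
func runVideoUntilStopped(src *camera.VideoSource, dev driver.DeviceDriver, userStop <-chan struct{}) {
	// frameCh feeds the MJPEG handler, resultCh feeds the result writer; both
	// must be drained elsewhere, since the pipeline drops on full channels.
	frameCh := make(chan []byte, 1)
	resultCh := make(chan *driver.InferenceResult, 1)

	p := camera.NewInferencePipeline(src, camera.SourceVideo, dev, frameCh, resultCh)
	p.Start()

	select {
	case <-p.Done():
		// video source exhausted: run() exited on its own
	case <-userStop:
		p.Stop()   // cancel the context driving run()
		<-p.Done() // wait for the run goroutine to finish
	}
}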