// Package inference runs per-device inference read loops and forwards their
// results to caller-supplied channels.
package inference

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"visiona-local/server/internal/device"
	"visiona-local/server/internal/driver"
)

// ErrAlreadyRunning is returned (wrapped) by Start when the device already has
// a stream that has not been fully stopped yet.
var ErrAlreadyRunning = errors.New("inference already running")

// readErrorBackoff is the pause inserted after a failed ReadInference so that
// a persistently failing driver does not turn the read loop into a busy spin.
const readErrorBackoff = 10 * time.Millisecond

// stream tracks one running read loop.
type stream struct {
	cancel context.CancelFunc // asks the read loop to exit
	done   chan struct{}      // closed once the loop has exited and StopInference has run
}

// Service owns the inference streams, at most one per device ID.
type Service struct {
	deviceMgr *device.Manager

	mu      sync.Mutex // guards streams
	streams map[string]*stream
}

// NewService returns a Service that looks devices up through deviceMgr.
func NewService(deviceMgr *device.Manager) *Service {
	return &Service{
		deviceMgr: deviceMgr,
		streams:   make(map[string]*stream),
	}
}

// Start begins inference on the device identified by deviceID and launches a
// goroutine that forwards every result to resultCh until Stop or StopAll is
// called for that device.
//
// Sends on resultCh never block: when the channel is not ready to receive, the
// result is dropped. resultCh is never closed by the Service.
//
// Start returns the error from the device lookup or from StartInference
// unchanged. It returns an error wrapping ErrAlreadyRunning when the device
// already has a stream; previously such a call silently replaced the map entry
// and leaked the earlier goroutine.
func (s *Service) Start(deviceID string, resultCh chan<- *driver.InferenceResult) error {
	session, err := s.deviceMgr.GetDevice(deviceID)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	st := &stream{cancel: cancel, done: make(chan struct{})}

	// Reserve the slot before touching the driver so that the check and the
	// registration are atomic, and so that a concurrent Stop/StopAll can see
	// (and cancel) a stream that is still starting up.
	s.mu.Lock()
	if _, running := s.streams[deviceID]; running {
		s.mu.Unlock()
		cancel()
		return fmt.Errorf("device %q: %w", deviceID, ErrAlreadyRunning)
	}
	s.streams[deviceID] = st
	s.mu.Unlock()

	if err := session.Driver.StartInference(); err != nil {
		// Roll the reservation back and release any Stop already waiting on it.
		s.mu.Lock()
		if s.streams[deviceID] == st {
			delete(s.streams, deviceID)
		}
		s.mu.Unlock()
		cancel()
		close(st.done)
		return err
	}

	go func() {
		// Deferred calls run in reverse order: StopInference first, then done
		// is closed, so waiters on done observe a fully stopped driver.
		defer close(st.done)
		defer session.Driver.StopInference()

		for {
			select {
			case <-ctx.Done():
				return
			default:
			}

			result, err := session.Driver.ReadInference()
			if err != nil {
				// Back off briefly, but stay responsive to cancellation.
				select {
				case <-ctx.Done():
					return
				case <-time.After(readErrorBackoff):
				}
				continue
			}

			// Non-blocking send: a slow consumer loses results rather than
			// stalling the read loop.
			select {
			case resultCh <- result:
			default:
			}
		}
	}()

	return nil
}

// StopAll stops all running inference streams. Used during graceful shutdown.
func (s *Service) StopAll() {
	// Snapshot the IDs under the lock; Stop takes the lock itself.
	s.mu.Lock()
	ids := make([]string, 0, len(s.streams))
	for id := range s.streams {
		ids = append(ids, id)
	}
	s.mu.Unlock()

	for _, id := range ids {
		_ = s.Stop(id) // Stop always returns nil
	}
}

// Stop cancels the stream for deviceID, if any, and blocks until its goroutine
// has exited and StopInference has completed. Stopping a device that has no
// stream is a no-op. Stop always returns nil.
func (s *Service) Stop(deviceID string) error {
	s.mu.Lock()
	st, ok := s.streams[deviceID]
	s.mu.Unlock()
	if !ok {
		return nil
	}

	st.cancel()
	<-st.done // wait for goroutine to finish and StopInference to complete

	// Release the slot only now, so a concurrent Start cannot call
	// StartInference while the old goroutine is still shutting the driver
	// down. The identity check keeps a concurrent Stop from removing a newer
	// stream registered after the first Stop released the slot.
	s.mu.Lock()
	if s.streams[deviceID] == st {
		delete(s.streams, deviceID)
	}
	s.mu.Unlock()

	return nil
}