package ingest import ( "bytes" "context" "fmt" "log/slog" "net/url" "sync" "time" "git.koval.net/cyclane/cctv/server/models" "github.com/pocketbase/pocketbase/core" "github.com/pocketbase/pocketbase/tools/store" "golang.org/x/sync/errgroup" ) type Ingest struct { sync.WaitGroup app core.App log *slog.Logger pipelines *store.Store[string, *activePipeline] } type activePipeline struct { stream *models.Stream pipeline *CameraPipeline cancel context.CancelFunc terminated <-chan struct{} } func BeginIngest(ctx context.Context, app core.App) *Ingest { log := app.Logger().With("svc", "ingest") ingest := &Ingest{ WaitGroup: sync.WaitGroup{}, app: app, log: log, pipelines: store.New[string, *activePipeline](nil), } // initialise thumbnail pipelines thumbnailStreams := map[string]*models.Stream{} streams := []*models.Stream{} err := app.RunInTransaction(func(txApp core.App) error { if err := txApp.RecordQuery("streams").All(&streams); err != nil { return fmt.Errorf("failed to fetch stream records: %w", err) } for _, stream := range streams { other, ok := thumbnailStreams[stream.CameraID()] if !ok || thumbnailStreamScore(stream) < thumbnailStreamScore(other) { thumbnailStreams[stream.CameraID()] = stream } } streamRecords := make([]*core.Record, len(streams)) for i, stream := range streams { streamRecords[i] = stream.Record } if errs := txApp.ExpandRecords(streamRecords, []string{"camera"}, nil); len(errs) > 0 { return fmt.Errorf("failed to expand camera relation for stream records: %v", errs) } return nil }) if err != nil { log.Error("Failed to initialize stream pipelines", "error", err) return ingest } group := errgroup.Group{} for _, stream := range streams { group.Go(func() error { log := log.With("stream", stream.Id) rtspUrl, err := url.Parse(stream.URL()) if err != nil { return fmt.Errorf("failed to parse stream URL for stream %s: %w", stream.Id, err) } password, err := stream.Camera().Password(app.EncryptionEnv()) if err != nil { return fmt.Errorf("failed to decrypt camera password for stream %s: %w", stream.Id, err) } rtspUrl.User = url.UserPassword(stream.Camera().Username(), password) pipeline, err := CreatePipeline(log) if err != nil { return fmt.Errorf("failed to create pipeline for stream %s: %w", stream.Id, err) } if err := pipeline.addRtspSource(rtspUrl); err != nil { return fmt.Errorf("failed to add RTSP source to pipeline for stream %s: %w", stream.Id, err) } consumers := false if thumbnailStream, ok := thumbnailStreams[stream.CameraID()]; ok && thumbnailStream.Id == stream.Id { consumers = true if err := pipeline.addThumbnailBranch(480, 270, 1000*time.Millisecond, 20*time.Second); err != nil { return fmt.Errorf("failed to add thumbnail branch to pipeline for stream %s: %w", stream.Id, err) } } if err := pipeline.addLiveBranch(20 * time.Second); err != nil { return fmt.Errorf("failed to add live branch to pipeline for stream %s: %w", stream.Id, err) } if consumers { log.Info("Starting pipeline for stream with consumers.") } else { log.Info("Stream has no consumers, ignoring pipeline.") pipeline.Close() return nil } pipelineCtx, cancel := context.WithCancel(ctx) terminated := make(chan struct{}) ingest.pipelines.Set(stream.Id, &activePipeline{ stream: stream, pipeline: pipeline, cancel: cancel, terminated: terminated, }) ingest.WaitGroup.Go(func() { if err := pipeline.Run(pipelineCtx); err != nil { log.Error("Stream pipeline exited with error", "error", err) } close(terminated) }) return nil }) } if err := group.Wait(); err != nil { log.Error("Failed to start ingest pipelines", "error", err) } return ingest } func (ingest *Ingest) GetThumbnail(ctx context.Context, streamId string) (*bytes.Reader, error) { active, ok := ingest.pipelines.GetOk(streamId) if !ok { return nil, fmt.Errorf("no active pipeline for stream %s", streamId) } thumb, err := active.pipeline.GetThumbnail(ctx) if err != nil { return nil, fmt.Errorf("failed to get thumbnail from pipeline for stream %s: %w", streamId, err) } return thumb, nil } func thumbnailStreamScore(stream *models.Stream) float64 { return stream.FPS() * float64(stream.Height()*stream.Width()) } func (ingest *Ingest) SubscribeLive(ctx context.Context, streamId string) (<-chan *bytes.Reader, error) { active, ok := ingest.pipelines.GetOk(streamId) if !ok { return nil, fmt.Errorf("no active pipeline for stream %s", streamId) } id, stream, err := active.pipeline.LiveSubscribe(1*time.Second, 16) if err != nil { return nil, fmt.Errorf("failed to subscribe to live stream %s: %w", streamId, err) } out := make(chan *bytes.Reader, 16) go func() { defer close(out) defer active.pipeline.LiveUnsubscribe(id) for { select { case <-ctx.Done(): return case buf, ok := <-stream: if !ok { return } select { case <-ctx.Done(): return case out <- buf: } } } }() return out, nil }