package ingest import ( "bytes" "context" "fmt" "time" "github.com/go-gst/go-gst/gst" "github.com/go-gst/go-gst/gst/app" ) const ( liveFrameTypeInit = 0x01 liveFrameTypeMedia = 0x02 liveFrameTypeError = 0x03 ) const ( liveSampleTimeout = 200 * time.Millisecond liveSampleBufferSize = 5 ) type subscribeReq struct { BufferSize int Result chan<- subscribeRes } type subscribeRes struct { Id int Stream <-chan *bytes.Reader Err error } type unsubscribeReq struct { Id int } // addLiveBranch populates the pipeline with: // // [vTee] ---> queue ---> valve ---> mp4mux ---> appsink // [aTee] ---> queue ---> valve -----^ func (p *CameraPipeline) addLiveBranch(branchTimeout time.Duration) error { if p.vTee == nil { return fmt.Errorf("video tee not initialized") } if p.aTee == nil { return fmt.Errorf("audio tee not initialized") } vPad := p.vTee.GetRequestPad("src_%u") if vPad == nil { return fmt.Errorf("failed to get request pad from video tee") } aPad := p.aTee.GetRequestPad("src_%u") if aPad == nil { return fmt.Errorf("failed to get request pad from audio tee") } vQueue, err := gst.NewElement("queue") if err != nil { return fmt.Errorf("failed to create video queue element: %w", err) } if err := setGlibValueProperty(vQueue, "leaky", 2); err != nil { // downstream return fmt.Errorf("failed to set video queue leaky property: %w", err) } if err := vQueue.SetProperty("max-size-time", uint64(2*time.Second.Nanoseconds())); err != nil { return fmt.Errorf("failed to set video queue max-size-time property: %w", err) } vValve, err := gst.NewElement("valve") if err != nil { return fmt.Errorf("failed to create video valve element: %w", err) } if err := vValve.SetProperty("drop", true); err != nil { return fmt.Errorf("failed to set video valve drop property: %w", err) } if err := setGlibValueProperty(vValve, "drop-mode", 1); err != nil { // forward-sticky return fmt.Errorf("failed to set video valve drop-mode property: %w", err) } aQueue, err := gst.NewElement("queue") if err != nil { return fmt.Errorf("failed to create audio queue element: %w", err) } if err := setGlibValueProperty(aQueue, "leaky", 2); err != nil { // downstream return fmt.Errorf("failed to set audio queue leaky property: %w", err) } if err := aQueue.SetProperty("max-size-time", uint64(2*time.Second.Nanoseconds())); err != nil { return fmt.Errorf("failed to set audio queue max-size-time property: %w", err) } aValve, err := gst.NewElement("valve") if err != nil { return fmt.Errorf("failed to create audio valve element: %w", err) } if err := aValve.SetProperty("drop", true); err != nil { return fmt.Errorf("failed to set audio valve drop property: %w", err) } if err := setGlibValueProperty(aValve, "drop-mode", 1); err != nil { // forward-sticky return fmt.Errorf("failed to set audio valve drop-mode property: %w", err) } mp4Mux, err := gst.NewElement("mp4mux") if err != nil { return fmt.Errorf("failed to create mp4mux element: %w", err) } if err := mp4Mux.SetProperty("fragment-duration", uint(100)); err != nil { return fmt.Errorf("failed to set mp4mux fragment-duration property: %w", err) } if err := setGlibValueProperty(mp4Mux, "fragment-mode", 0); err != nil { // dash-or-mss return fmt.Errorf("failed to set mp4mux fragment-mode property: %w", err) } if err := mp4Mux.SetProperty("interleave-time", uint64(0)); err != nil { return fmt.Errorf("failed to set mp4mux interleave-time property: %w", err) } if err := mp4Mux.SetProperty("latency", uint64(0)); err != nil { return fmt.Errorf("failed to set mp4mux latency property: %w", err) } appSink, err := gst.NewElement("appsink") if err != nil { return fmt.Errorf("failed to create appsink element: %w", err) } if err := appSink.SetProperty("max-buffers", uint(5)); err != nil { return fmt.Errorf("failed to set appsink max-buffers property: %w", err) } if err := appSink.SetProperty("sync", false); err != nil { return fmt.Errorf("failed to set appsink sync property: %w", err) } if err := appSink.SetProperty("async", false); err != nil { return fmt.Errorf("failed to set appsink async property: %w", err) } if err := p.pipeline.AddMany(vQueue, vValve, aQueue, aValve, mp4Mux, appSink); err != nil { return fmt.Errorf("failed to add live branch elements to pipeline: %w", err) } if res := vPad.Link(vQueue.GetStaticPad("sink")); res != gst.PadLinkOK { return fmt.Errorf("failed to link video tee to video queue: %s", res) } if res := aPad.Link(aQueue.GetStaticPad("sink")); res != gst.PadLinkOK { return fmt.Errorf("failed to link audio tee to audio queue: %s", res) } if res := vQueue.GetStaticPad("src").Link(vValve.GetStaticPad("sink")); res != gst.PadLinkOK { return fmt.Errorf("failed to link video queue to video valve: %s", res) } if res := aQueue.GetStaticPad("src").Link(aValve.GetStaticPad("sink")); res != gst.PadLinkOK { return fmt.Errorf("failed to link audio queue to audio valve: %s", res) } if res := vValve.GetStaticPad("src").Link(mp4Mux.GetRequestPad("video_%u")); res != gst.PadLinkOK { return fmt.Errorf("failed to link video valve to mp4mux: %s", res) } if res := aValve.GetStaticPad("src").Link(mp4Mux.GetRequestPad("audio_%u")); res != gst.PadLinkOK { return fmt.Errorf("failed to link audio valve to mp4mux: %s", res) } if res := mp4Mux.GetStaticPad("src").Link(appSink.GetStaticPad("sink")); res != gst.PadLinkOK { return fmt.Errorf("failed to link mp4mux to appsink: %s", res) } p.liveVValve = vValve p.liveAValve = aValve p.liveSink = app.SinkFromElement(appSink) p.liveTimeout = branchTimeout p.liveSubscribe = make(chan subscribeReq) p.liveUnsubscribe = make(chan unsubscribeReq) return nil } func (p *CameraPipeline) liveSampler(ctx context.Context, result chan<- []byte) { SamplerLoop: for { sample := p.liveSink.TryPullSample(gst.ClockTime(liveSampleTimeout.Nanoseconds())) if sample == nil { select { case <-ctx.Done(): return default: p.log.Debug("Live sampler timed out waiting for sample") continue SamplerLoop } } buf := sample.GetBuffer() if buf == nil { p.log.Warn("Received nil buffer from live sink") continue SamplerLoop } data, err := gstBufferToBytes(buf) if err != nil { p.log.Warn("Failed to convert buffer to bytes") continue SamplerLoop } select { case <-ctx.Done(): return case result <- data: } } } func (p *CameraPipeline) liveManager(ctx context.Context) { active := false var initSeg []byte assembler := newFMP4Assembler() var lastMediaEmit time.Time subscribers := &subscribersManager{m: make(map[int]subscriber), nextID: 0} samples := make(chan []byte, liveSampleBufferSize) samplerCtx, samplerCancel := context.WithCancel(ctx) defer func() { subscribers.Close() samplerCancel() }() defer p.log.Debug("Live branch manager exiting") var start time.Time var numSamples int for { var timeout <-chan time.Time if active && subscribers.len() == 0 { timeout = time.After(p.liveTimeout) } select { case <-ctx.Done(): samplerCancel() p.log.Info("Context cancelled", "numSamples", numSamples, "duration", time.Since(start)) return case req := <-p.liveSubscribe: if !active { p.log.Info("Activating live branch") initSeg = nil assembler = newFMP4Assembler() lastMediaEmit = time.Time{} if err := p.liveVValve.SetProperty("drop", false); err != nil { req.Result <- subscribeRes{Err: fmt.Errorf("failed to activate video valve: %w", err)} continue } if err := p.liveAValve.SetProperty("drop", false); err != nil { if rollbackErr := p.liveVValve.SetProperty("drop", true); rollbackErr != nil { // try to rollback p.log.Warn("Failed to rollback video valve state after audio valve activation failure", "error", rollbackErr) } req.Result <- subscribeRes{Err: fmt.Errorf("failed to activate audio valve: %w", err)} continue } go p.liveSampler(samplerCtx, samples) active = true start = time.Now() } stream := make(chan *bytes.Reader, req.BufferSize) id := subscribers.AddSubscriber(stream) if initSeg != nil { if !subscribers.SendOrUnsubscribe(id, initSeg) { req.Result <- subscribeRes{Err: fmt.Errorf("subscriber missed init segment")} continue } } req.Result <- subscribeRes{Id: id, Stream: stream} case req := <-p.liveUnsubscribe: subscribers.RemoveSubscriber(req.Id) case data := <-samples: chunks, err := assembler.Push(data) if err != nil { p.log.Warn("Failed to assemble live fMP4 chunks", "error", err) continue } numSamples += 1 for _, chunk := range chunks { framed := marshalLiveFrame(chunk.FrameType, chunk.Payload) switch chunk.FrameType { case liveFrameTypeInit: initSeg = framed p.log.Debug("Emitted init segment", "size", len(chunk.Payload)) case liveFrameTypeMedia: now := time.Now() if !lastMediaEmit.IsZero() { p.log.Debug("Emitted media segment", "size", len(chunk.Payload), "gap", now.Sub(lastMediaEmit)) } else { p.log.Debug("Emitted media segment", "size", len(chunk.Payload)) } lastMediaEmit = now } subscribers.Broadcast(framed) } case <-timeout: p.log.Info("Deactivating live branch due to inactivity") if err := p.liveVValve.SetProperty("drop", true); err != nil { p.log.Warn("Failed to deactivate video valve: %w", err) } if err := p.liveAValve.SetProperty("drop", true); err != nil { p.log.Warn("Failed to deactivate audio valve: %w", err) } samplerCancel() samplerCtx, samplerCancel = context.WithCancel(ctx) active = false } } } func marshalLiveFrame(frameType byte, payload []byte) []byte { framed := make([]byte, len(payload)+1) framed[0] = frameType copy(framed[1:], payload) return framed } func (p *CameraPipeline) LiveSubscribe(timeout time.Duration, bufferSize int) (id int, stream <-chan *bytes.Reader, err error) { result := make(chan subscribeRes) req := subscribeReq{ BufferSize: bufferSize, Result: result, } select { case <-time.After(timeout): return 0, nil, fmt.Errorf("live subscribe request timed out") case p.liveSubscribe <- req: } res := <-result return res.Id, res.Stream, res.Err } func (p *CameraPipeline) LiveUnsubscribe(id int) { p.liveUnsubscribe <- unsubscribeReq{Id: id} } type subscriber struct { Stream chan<- *bytes.Reader } type subscribersManager struct { m map[int]subscriber nextID int } func (sm subscribersManager) len() int { return len(sm.m) } func (sm *subscribersManager) AddSubscriber(stream chan<- *bytes.Reader) int { id := sm.nextID sm.m[id] = subscriber{Stream: stream} sm.nextID++ return id } func (sm *subscribersManager) RemoveSubscriber(id int) { if sub, ok := sm.m[id]; ok { close(sub.Stream) delete(sm.m, id) } } func (sm *subscribersManager) SendOrUnsubscribe(id int, data []byte) bool { if sub, ok := sm.m[id]; ok { select { case sub.Stream <- bytes.NewReader(data): return true default: sm.RemoveSubscriber(id) return false } } return false } func (sm *subscribersManager) Close() { for id := range sm.m { sm.RemoveSubscriber(id) } } func (sm *subscribersManager) Broadcast(data []byte) { for id := range sm.m { sm.SendOrUnsubscribe(id, data) } }