package ingest

import (
	"fmt"

	"github.com/go-gst/go-gst/gst"
)

// addStatsBranch populates the pipeline with:
//
//	[vTee] ---> fakesink
//
// StreamStats are extracted from the fakesink's handoffs (using the sink pad caps) and sent to the returned channel.
// If the channel has no receiver or is full, the stats are dropped to avoid blocking the pipeline. The channel is never closed.
//
// It returns an error if the video tee is not initialized or if any step of creating, configuring, adding or
// linking the fakesink fails. Once the fakesink has been added to the pipeline, a later failure removes it again
// and releases the tee request pad, so a failed call does not leave a half-wired branch behind.
func (p *CameraPipeline) addStatsBranch() (<-chan StreamStats, error) {
	if p.vTee == nil {
		return nil, fmt.Errorf("video tee not initialized")
	}

	// Create and configure the sink before touching the pipeline or the tee, so that
	// the early failure paths have nothing to undo.
	fakeSink, err := gst.NewElement("fakesink")
	if err != nil {
		return nil, fmt.Errorf("failed to create fakesink element: %w", err)
	}
	if err := fakeSink.SetProperty("signal-handoffs", true); err != nil {
		return nil, fmt.Errorf("failed to set fakesink signal-handoffs property: %w", err)
	}

	// Unbuffered on purpose: together with the non-blocking send below, an update is
	// only delivered when a receiver is ready at that moment.
	statsChan := make(chan StreamStats)

	if _, err := fakeSink.Connect("handoff", func(_ *gst.Element, _ *gst.Buffer, pad *gst.Pad) {
		stats, ok := p.streamStatsFromPad(pad)
		if !ok {
			return
		}
		select {
		case statsChan <- stats:
		default:
			p.log.Warn("Stats channel is full, dropping stats update")
		}
	}); err != nil {
		return nil, fmt.Errorf("failed to connect fakesink handoff signal: %w", err)
	}

	if err := p.pipeline.Add(fakeSink); err != nil {
		return nil, fmt.Errorf("failed to add fakesink to pipeline: %w", err)
	}
	// removeSink undoes the Add above. It is best effort: the setup error is what the
	// caller needs to see, so a failed removal is only logged.
	removeSink := func() {
		if rmErr := p.pipeline.Remove(fakeSink); rmErr != nil {
			p.log.Warn("Failed to remove fakesink from pipeline after setup error", "error", rmErr)
		}
	}

	vPad := p.vTee.GetRequestPad("src_%u")
	if vPad == nil {
		removeSink()
		return nil, fmt.Errorf("failed to get request pad from video tee")
	}

	sinkPad := fakeSink.GetStaticPad("sink")
	if sinkPad == nil {
		p.vTee.ReleaseRequestPad(vPad)
		removeSink()
		return nil, fmt.Errorf("failed to get sink pad from fakesink")
	}

	if res := vPad.Link(sinkPad); res != gst.PadLinkOK {
		p.vTee.ReleaseRequestPad(vPad)
		removeSink()
		return nil, fmt.Errorf("failed to link video tee to fakesink: %s", res)
	}

	return statsChan, nil
}

// streamStatsFromPad builds a StreamStats from the width, height and framerate fields of the
// first structure in the pad's current caps. It reports false, after logging the reason, when
// the caps are missing or empty, a field cannot be read, a field has an unexpected type, or
// the framerate fraction has a zero denominator.
func (p *CameraPipeline) streamStatsFromPad(pad *gst.Pad) (StreamStats, bool) {
	caps := pad.GetCurrentCaps()
	if caps == nil || caps.GetSize() == 0 {
		p.log.Error("Stats pad has no or empty caps")
		return StreamStats{}, false
	}
	if caps.GetSize() > 1 {
		// Note: should be impossible
		p.log.Warn("Stats pad has multiple structures, using the first one", "caps", caps.String())
	}

	structure := caps.GetStructureAt(0)
	if structure == nil {
		p.log.Error("Stats pad received nil structure")
		return StreamStats{}, false
	}

	rawWidth, err := structure.GetValue("width")
	if err != nil {
		p.log.Error("Failed to get width from stats pad caps", "error", err)
		return StreamStats{}, false
	}
	width, ok := rawWidth.(int)
	if !ok {
		p.log.Error("Width field in stats pad caps is not an int", "value", rawWidth)
		return StreamStats{}, false
	}

	rawHeight, err := structure.GetValue("height")
	if err != nil {
		p.log.Error("Failed to get height from stats pad caps", "error", err)
		return StreamStats{}, false
	}
	height, ok := rawHeight.(int)
	if !ok {
		p.log.Error("Height field in stats pad caps is not an int", "value", rawHeight)
		return StreamStats{}, false
	}

	rawFPS, err := structure.GetValue("framerate")
	if err != nil {
		p.log.Error("Failed to get framerate from stats pad caps", "error", err)
		return StreamStats{}, false
	}

	var (
		fps        float64
		num, denom float64
		isFraction bool
	)
	switch v := rawFPS.(type) {
	case *gst.FractionValue:
		num, denom, isFraction = float64(v.Num()), float64(v.Denom()), true
	case gst.FractionValue:
		num, denom, isFraction = float64(v.Num()), float64(v.Denom()), true
	case float64:
		fps = v
	default:
		p.log.Error("Framerate field in stats pad caps is not a fraction or float", "type", fmt.Sprintf("%T", rawFPS), "value", rawFPS)
		return StreamStats{}, false
	}
	if isFraction {
		// Floating-point division by zero does not panic; it would silently report
		// +Inf or NaN as the frame rate, so reject it instead.
		if denom == 0 {
			p.log.Error("Framerate fraction in stats pad caps has a zero denominator", "value", rawFPS)
			return StreamStats{}, false
		}
		fps = num / denom
	}

	return StreamStats{
		FPS:    fps,
		Width:  width,
		Height: height,
	}, true
}