Files
cctv/server/ingest/branch_stats.go
2026-04-22 23:35:59 +01:00

112 lines
3.1 KiB
Go

package ingest
import (
"fmt"
"github.com/go-gst/go-gst/gst"
)
// addStatsBranch populates the pipeline with:
//
// [vTee] ---> fakesink
//
// StreamStats are extracted from the fakesink's handovers (using the sink pad caps) and sent to the returned channel.
// If the channel has no received or is full, the stats are dropped to avoid blocking the pipeline. The channel is never closed.
func (p *CameraPipeline) addStatsBranch() (<-chan StreamStats, error) {
if p.vTee == nil {
return nil, fmt.Errorf("video tee not initialized")
}
vPad := p.vTee.GetRequestPad("src_%u")
if vPad == nil {
return nil, fmt.Errorf("failed to get request pad from video tee")
}
statsChan := make(chan StreamStats)
fakeSink, err := gst.NewElement("fakesink")
if err != nil {
return nil, fmt.Errorf("failed to create fakesink element: %w", err)
}
if err := fakeSink.SetProperty("signal-handoffs", true); err != nil {
return nil, fmt.Errorf("failed to set fakesink signal-handoffs property: %w", err)
}
if err := p.pipeline.Add(fakeSink); err != nil {
return nil, fmt.Errorf("failed to add fakesink to pipeline: %w", err)
}
if res := vPad.Link(fakeSink.GetStaticPad("sink")); res != gst.PadLinkOK {
return nil, fmt.Errorf("failed to link video tee to fakesink: %s", res)
}
fakeSink.Connect("handoff", func(element *gst.Element, _ *gst.Buffer, pad *gst.Pad) {
caps := pad.GetCurrentCaps()
if caps == nil || caps.GetSize() == 0 {
p.log.Error("Stats pad has no or empty caps")
return
}
if caps.GetSize() > 1 {
// Note: should be impossible
p.log.Warn("Stats pad has multiple structures, using the first one", "caps", caps.String())
}
structure := caps.GetStructureAt(0)
if structure == nil {
p.log.Error("Stats pad received nil structure")
return
}
rawWidth, err := structure.GetValue("width")
if err != nil {
p.log.Error("Failed to get width from stats pad caps", "error", err)
return
}
width, ok := rawWidth.(int)
if !ok {
p.log.Error("Width field in stats pad caps is not an int", "value", rawWidth)
return
}
rawHeight, err := structure.GetValue("height")
if err != nil {
p.log.Error("Failed to get height from stats pad caps", "error", err)
return
}
height, ok := rawHeight.(int)
if !ok {
p.log.Error("Height field in stats pad caps is not an int", "value", rawHeight)
return
}
fps, err := structure.GetValue("framerate")
if err != nil {
p.log.Error("Failed to get framerate from stats pad caps", "error", err)
return
}
var fpsFloat float64
switch v := fps.(type) {
case *gst.FractionValue:
fpsFloat = float64(v.Num()) / float64(v.Denom())
case gst.FractionValue:
fpsFloat = float64(v.Num()) / float64(v.Denom())
case float64:
fpsFloat = v
default:
p.log.Error("Framerate field in stats pad caps is not a fraction or float", "type", fmt.Sprintf("%T", fps), "value", fps)
return
}
select {
case statsChan <- StreamStats{
FPS: fpsFloat,
Width: width,
Height: height,
}:
default:
p.log.Warn("Stats channel is full, dropping stats update")
}
return
})
return statsChan, nil
}