package ingest import ( "context" "fmt" "io" "log/slog" "net/url" "reflect" "sync/atomic" "time" "github.com/go-gst/go-glib/glib" "github.com/go-gst/go-gst/gst" "github.com/go-gst/go-gst/gst/app" ) type CameraPipeline struct { log *slog.Logger pipeline *gst.Pipeline vTee *gst.Element aTee *gst.Element thumbnailBlockPad *gst.Pad thumbnailSink *app.Sink thumbnailBlockProbe uint64 thumbnailBlocked atomic.Bool thumbnailTimeout time.Duration thumbnailReq chan thumbnailReq liveVValve *gst.Element liveAValve *gst.Element liveSink *app.Sink liveTimeout time.Duration liveSubscribe chan subscribeReq liveUnsubscribe chan unsubscribeReq } type jobResult struct { Err error } func CreatePipeline(log *slog.Logger) (*CameraPipeline, error) { var err error p := &CameraPipeline{ log: log, } p.pipeline, err = gst.NewPipeline("") if err != nil { return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) } return p, nil } func (p *CameraPipeline) Close() { p.pipeline.SetState(gst.StateNull) p.thumbnailBlockPad = nil p.thumbnailSink = nil p.liveVValve = nil p.liveAValve = nil p.liveSink = nil p.vTee = nil p.aTee = nil p.pipeline = nil } func (p *CameraPipeline) Run(ctx context.Context) error { if err := p.pipeline.SetState(gst.StatePlaying); err != nil { return fmt.Errorf("failed to set pipeline to playing: %w", err) } if p.thumbnailSink != nil { go p.thumbnailManager(ctx) } if p.liveSink != nil { go p.liveManager(ctx) } bus := p.pipeline.GetBus() for { select { case <-ctx.Done(): p.Close() return nil default: } done, err := handleMessage(p.log, bus.TimedPop(gst.ClockTime((500 * time.Millisecond).Nanoseconds()))) if err != nil { p.Close() return fmt.Errorf("pipeline error: %w", err) } if done { p.Close() return nil } } } func handleMessage(log *slog.Logger, msg *gst.Message) (bool, error) { if msg == nil { return false, nil } // nolint:exhaustive switch msg.Type() { case gst.MessageEOS: log.Info("Pipeline reached EOS") return true, nil case gst.MessageError: err := msg.ParseError() return true, fmt.Errorf("pipeline error: %w", err) default: log.Debug(fmt.Sprintf("Pipeline message: %s", msg)) return false, nil } } // addRtspSource populates the pipeline with: // // rtspsrc // |---> (dynamic video depayloader + parser) --> vTee // |---> (dynamic audio depayloader + parser) --> aTee func (p *CameraPipeline) addRtspSource(rtspURL *url.URL) error { if p.vTee != nil || p.aTee != nil { return fmt.Errorf("source already added") } src, err := gst.NewElement("rtspsrc") if err != nil { return fmt.Errorf("failed to create rtspsrc element: %w", err) } if err := src.SetProperty("location", rtspURL.String()); err != nil { return fmt.Errorf("failed to set rtspsrc location property: %w", err) } if err := src.SetProperty("latency", uint(200)); err != nil { return fmt.Errorf("failed to set rtspsrc latency property: %w", err) } p.vTee, err = gst.NewElement("tee") if err != nil { return fmt.Errorf("failed to create video tee element: %w", err) } if err := p.vTee.SetProperty("name", "video_tee"); err != nil { return fmt.Errorf("failed to set video tee name property: %w", err) } p.aTee, err = gst.NewElement("tee") if err != nil { return fmt.Errorf("failed to create audio tee element: %w", err) } if err := p.aTee.SetProperty("name", "audio_tee"); err != nil { return fmt.Errorf("failed to set audio tee name property: %w", err) } if err := p.pipeline.AddMany(src, p.vTee, p.aTee); err != nil { return fmt.Errorf("failed to add elements to pipeline: %w", err) } // Dynamically link the appropriate depayloader src.Connect("pad-added", func(element *gst.Element, pad *gst.Pad) { log := p.log.With("src", element.GetName(), "pad", pad.GetName()) caps := pad.GetCurrentCaps() if caps == nil || caps.GetSize() == 0 { log.Error("Ignoring pad (no or empty caps)") return } if caps.GetSize() > 1 { // Note: should be impossible log.Warn("Pad has multiple structures, using the first one", "caps", caps.String()) } structure := caps.GetStructureAt(0) if structure == nil || structure.Name() != "application/x-rtp" { log.Error("Ignoring pad (not RTP)", "caps", caps.String()) return } mediaRaw, err := structure.GetValue("media") media, ok := mediaRaw.(string) if err != nil || !ok { log.Error("Ignoring pad (no media field)", "caps", caps.String()) return } encodingRaw, err := structure.GetValue("encoding-name") encoding, ok := encodingRaw.(string) if err != nil || !ok { log.Error("Ignoring pad (no encoding-name field)", "caps", caps.String()) return } var chain []*gst.Element switch media { case "video": switch encoding { case "H264": chain, err = p.addStaticChain(pad, p.vTee.GetStaticPad("sink"), "rtph264depay", "h264parse") case "H265": chain, err = p.addStaticChain(pad, p.vTee.GetStaticPad("sink"), "rtph265depay", "h265parse") default: log.Error("Ignoring video pad (unsupported encoding)", "caps", caps.String()) return } case "audio": switch encoding { case "MPEG4-GENERIC": chain, err = p.addStaticChain(pad, p.aTee.GetStaticPad("sink"), "rtpmp4gdepay", "aacparse") default: log.Error("Ignoring audio pad (unsupported encoding)", "caps", caps.String()) return } default: log.Error("Ignoring pad (unsupported media)", "caps", caps.String()) return } if err != nil { log.Error("Failed to add depayloader", "caps", caps.String(), "error", err) return } for _, element := range chain { element.SyncStateWithParent() } }) return nil } func (p *CameraPipeline) addStaticChain(src *gst.Pad, sink *gst.Pad, chain ...string) ([]*gst.Element, error) { factories := make([]elementFactory, len(chain)) for i, name := range chain { factories[i] = elementFactory{name: name, props: nil} } return p.addStaticChainProps(src, sink, factories...) } type elementFactory struct { name string props map[string]any } func (p *CameraPipeline) addStaticChainProps(src *gst.Pad, sink *gst.Pad, chain ...elementFactory) ([]*gst.Element, error) { chainElements := make([]*gst.Element, len(chain)) last := src for i, factory := range chain { elem, err := gst.NewElement(factory.name) if err != nil { return nil, fmt.Errorf("failed to create %s: %w", factory.name, err) } for propName, propValue := range factory.props { if err := elem.SetProperty(propName, propValue); err != nil { // try to set enum property valueErr := setGlibValueProperty(elem, propName, propValue) if valueErr != nil { p.log.Warn("Failed to set property glib value", "name", propName, "value", propValue, "error", valueErr) return nil, fmt.Errorf("failed to set property %s on %s: %w", propName, factory.name, err) } } } if err := p.pipeline.Add(elem); err != nil { return nil, fmt.Errorf("failed to add %s to pipeline: %w", factory.name, err) } if last != nil { if res := last.Link(elem.GetStaticPad("sink")); res != gst.PadLinkOK { return nil, fmt.Errorf("failed to link %s to src: %s", factory.name, res) } } last = elem.GetStaticPad("src") chainElements[i] = elem } if sink != nil { if res := last.Link(sink); res != gst.PadLinkOK { return nil, fmt.Errorf("failed to link depayloader chain to sink: %s", res) } } return chainElements, nil } func gstBufferToBytes(buf *gst.Buffer) ([]byte, error) { mapInfo := buf.Map(gst.MapRead) if mapInfo == nil { return nil, fmt.Errorf("failed to map buffer") } defer buf.Unmap() data := make([]byte, mapInfo.Size()) if _, err := io.ReadFull(mapInfo.Reader(), data); err != nil { return nil, fmt.Errorf("failed to read buffer data: %w", err) } return data, nil } func setGlibValueProperty(elem *gst.Element, name string, value any) error { propType, err := elem.GetPropertyType(name) if err != nil { return fmt.Errorf("get property type %s: %w", name, err) } v, err := glib.ValueInit(propType) if err != nil { return fmt.Errorf("init gvalue for %s: %w", name, err) } val := reflect.ValueOf(value) switch { case propType.IsA(glib.TYPE_ENUM) && val.CanInt(): v.SetEnum(int(val.Int())) // safe because these config values are all done locally and guaranteed to be within int32 case propType.IsA(glib.TYPE_ENUM) && val.CanUint(): v.SetEnum(int(val.Uint())) // safe because these config values are all done locally and guaranteed to be within int32 case propType.IsA(glib.TYPE_FLAGS) && val.CanInt(): v.SetFlags(uint(val.Int())) // safe because these config values are all done locally and guaranteed to be within uint32 case propType.IsA(glib.TYPE_FLAGS) && val.CanUint(): v.SetFlags(uint(val.Uint())) // safe because these config values are all done locally and guaranteed to be within uint32 default: return fmt.Errorf("unsupported property type for %s: %T (kind = %s), expected %s", name, value, val.Kind(), propType.Name()) } if err := elem.SetPropertyValue(name, v); err != nil { return fmt.Errorf("failed to set glib value property %s: %w", name, err) } return nil }