demo live stream

This commit is contained in:
2026-04-23 23:49:10 +01:00
parent bee7869af4
commit 285533c13e
9 changed files with 837 additions and 16 deletions

View File

@@ -10,6 +10,12 @@ import (
"github.com/go-gst/go-gst/gst/app"
)
const (
liveFrameTypeInit = 0x01
liveFrameTypeMedia = 0x02
liveFrameTypeError = 0x03
)
const (
liveSampleTimeout = 200 * time.Millisecond
liveSampleBufferSize = 5
@@ -97,7 +103,7 @@ func (p *CameraPipeline) addLiveBranch(branchTimeout time.Duration) error {
if err != nil {
return fmt.Errorf("failed to create mp4mux element: %w", err)
}
if err := mp4Mux.SetProperty("fragment-duration", uint(100*time.Millisecond.Nanoseconds())); err != nil {
if err := mp4Mux.SetProperty("fragment-duration", uint(100)); err != nil {
return fmt.Errorf("failed to set mp4mux fragment-duration property: %w", err)
}
if err := setGlibValueProperty(mp4Mux, "fragment-mode", 0); err != nil { // dash-or-mss
@@ -193,6 +199,8 @@ SamplerLoop:
func (p *CameraPipeline) liveManager(ctx context.Context) {
active := false
var initSeg []byte
assembler := newFMP4Assembler()
var lastMediaEmit time.Time
subscribers := &subscribersManager{m: make(map[int]subscriber), nextID: 0}
samples := make(chan []byte, liveSampleBufferSize)
samplerCtx, samplerCancel := context.WithCancel(ctx)
@@ -217,6 +225,9 @@ func (p *CameraPipeline) liveManager(ctx context.Context) {
case req := <-p.liveSubscribe:
if !active {
p.log.Info("Activating live branch")
initSeg = nil
assembler = newFMP4Assembler()
lastMediaEmit = time.Time{}
if err := p.liveVValve.SetProperty("drop", false); err != nil {
req.Result <- subscribeRes{Err: fmt.Errorf("failed to activate video valve: %w", err)}
continue
@@ -244,10 +255,29 @@ func (p *CameraPipeline) liveManager(ctx context.Context) {
case req := <-p.liveUnsubscribe:
subscribers.RemoveSubscriber(req.Id)
case data := <-samples:
// TODO : init segment
p.log.Debug("got video sample")
chunks, err := assembler.Push(data)
if err != nil {
p.log.Warn("Failed to assemble live fMP4 chunks", "error", err)
continue
}
numSamples += 1
subscribers.Broadcast(data)
for _, chunk := range chunks {
framed := marshalLiveFrame(chunk.FrameType, chunk.Payload)
switch chunk.FrameType {
case liveFrameTypeInit:
initSeg = framed
p.log.Debug("Emitted init segment", "size", len(chunk.Payload))
case liveFrameTypeMedia:
now := time.Now()
if !lastMediaEmit.IsZero() {
p.log.Debug("Emitted media segment", "size", len(chunk.Payload), "gap", now.Sub(lastMediaEmit))
} else {
p.log.Debug("Emitted media segment", "size", len(chunk.Payload))
}
lastMediaEmit = now
}
subscribers.Broadcast(framed)
}
case <-timeout:
p.log.Info("Deactivating live branch due to inactivity")
if err := p.liveVValve.SetProperty("drop", true); err != nil {
@@ -263,6 +293,13 @@ func (p *CameraPipeline) liveManager(ctx context.Context) {
}
}
func marshalLiveFrame(frameType byte, payload []byte) []byte {
framed := make([]byte, len(payload)+1)
framed[0] = frameType
copy(framed[1:], payload)
return framed
}
func (p *CameraPipeline) LiveSubscribe(timeout time.Duration, bufferSize int) (id int, stream <-chan *bytes.Reader, err error) {
result := make(chan subscribeRes)
req := subscribeReq{

View File

@@ -0,0 +1,139 @@
package ingest
import (
"encoding/binary"
"fmt"
)
type liveChunk struct {
FrameType byte
Payload []byte
}
type fmp4Assembler struct {
pending []byte
initReady bool
initBuf []byte
mediaPrefix []byte
inFragment bool
fragmentBuf []byte
}
func newFMP4Assembler() *fmp4Assembler {
return &fmp4Assembler{}
}
func (a *fmp4Assembler) Push(data []byte) ([]liveChunk, error) {
if len(data) == 0 {
return nil, nil
}
a.pending = append(a.pending, data...)
result := make([]liveChunk, 0, 2)
for {
box, boxType, ok, err := a.nextBox()
if err != nil {
return nil, err
}
if !ok {
break
}
chunks, err := a.consumeBox(boxType, box)
if err != nil {
return nil, err
}
result = append(result, chunks...)
}
return result, nil
}
func (a *fmp4Assembler) nextBox() ([]byte, string, bool, error) {
if len(a.pending) < 8 {
return nil, "", false, nil
}
headerLen := 8
size := uint64(binary.BigEndian.Uint32(a.pending[:4]))
if size == 1 {
if len(a.pending) < 16 {
return nil, "", false, nil
}
headerLen = 16
size = binary.BigEndian.Uint64(a.pending[8:16])
}
if size == 0 {
// 0 means box runs to stream end; wait for more data.
return nil, "", false, nil
}
if size < uint64(headerLen) {
return nil, "", false, fmt.Errorf("invalid MP4 box size %d", size)
}
if size > uint64(len(a.pending)) {
return nil, "", false, nil
}
boxLen := int(size)
boxType := string(a.pending[4:8])
box := make([]byte, boxLen)
copy(box, a.pending[:boxLen])
a.pending = a.pending[boxLen:]
return box, boxType, true, nil
}
func (a *fmp4Assembler) consumeBox(boxType string, box []byte) ([]liveChunk, error) {
if !a.initReady {
a.initBuf = append(a.initBuf, box...)
if boxType != "moov" {
return nil, nil
}
a.initReady = true
initPayload := cloneBytes(a.initBuf)
a.initBuf = nil
return []liveChunk{{FrameType: liveFrameTypeInit, Payload: initPayload}}, nil
}
switch boxType {
case "styp", "sidx", "prft":
if a.inFragment {
a.fragmentBuf = append(a.fragmentBuf, box...)
} else {
a.mediaPrefix = append(a.mediaPrefix, box...)
}
return nil, nil
case "moof":
a.fragmentBuf = a.fragmentBuf[:0]
if len(a.mediaPrefix) > 0 {
a.fragmentBuf = append(a.fragmentBuf, a.mediaPrefix...)
a.mediaPrefix = a.mediaPrefix[:0]
}
a.fragmentBuf = append(a.fragmentBuf, box...)
a.inFragment = true
return nil, nil
case "mdat":
if !a.inFragment {
return nil, nil
}
a.fragmentBuf = append(a.fragmentBuf, box...)
mediaPayload := cloneBytes(a.fragmentBuf)
a.fragmentBuf = a.fragmentBuf[:0]
a.inFragment = false
return []liveChunk{{FrameType: liveFrameTypeMedia, Payload: mediaPayload}}, nil
default:
if a.inFragment {
a.fragmentBuf = append(a.fragmentBuf, box...)
}
return nil, nil
}
}
func cloneBytes(data []byte) []byte {
if len(data) == 0 {
return nil
}
out := make([]byte, len(data))
copy(out, data)
return out
}

View File

@@ -154,13 +154,26 @@ func (ingest *Ingest) SubscribeLive(ctx context.Context, streamId string) (<-cha
if err != nil {
return nil, fmt.Errorf("failed to subscribe to live stream %s: %w", streamId, err)
}
defer active.pipeline.LiveUnsubscribe(id)
for {
select {
case <-ctx.Done():
return nil, nil
case buf := <-stream:
ingest.log.Debug("Received live stream chunk", "streamId", streamId, "chunkSize", buf.Len())
out := make(chan *bytes.Reader, 16)
go func() {
defer close(out)
defer active.pipeline.LiveUnsubscribe(id)
for {
select {
case <-ctx.Done():
return
case buf, ok := <-stream:
if !ok {
return
}
select {
case <-ctx.Done():
return
case out <- buf:
}
}
}
}
}()
return out, nil
}

View File

@@ -197,9 +197,19 @@ func (p *CameraPipeline) addRtspSource(rtspURL *url.URL) error {
case "video":
switch encoding {
case "H264":
chain, err = p.addStaticChain(pad, p.vTee.GetStaticPad("sink"), "rtph264depay", "h264parse")
chain, err = p.addStaticChainProps(
pad,
p.vTee.GetStaticPad("sink"),
elementFactory{name: "rtph264depay"},
elementFactory{name: "h264parse", props: map[string]any{"config-interval": int(-1)}},
)
case "H265":
chain, err = p.addStaticChain(pad, p.vTee.GetStaticPad("sink"), "rtph265depay", "h265parse")
chain, err = p.addStaticChainProps(
pad,
p.vTee.GetStaticPad("sink"),
elementFactory{name: "rtph265depay"},
elementFactory{name: "h265parse", props: map[string]any{"config-interval": int(-1)}},
)
default:
log.Error("Ignoring video pad (unsupported encoding)", "caps", caps.String())
return