From 285533c13e45a92d08ea0b4c362fddd1e815bdd5 Mon Sep 17 00:00:00 2001 From: Gleb Koval Date: Thu, 23 Apr 2026 23:49:10 +0100 Subject: [PATCH] demo live stream --- server/api.go | 77 +++++- server/go.mod | 1 + server/go.sum | 2 + server/ingest/branch_live.go | 45 +++- server/ingest/fmp4_assembler.go | 139 ++++++++++ server/ingest/ingest.go | 29 +- server/ingest/pipeline.go | 14 +- web/src/lib/live-mse.ts | 154 +++++++++++ web/src/routes/cam-poc/+page.svelte | 392 ++++++++++++++++++++++++++++ 9 files changed, 837 insertions(+), 16 deletions(-) create mode 100644 server/ingest/fmp4_assembler.go create mode 100644 web/src/lib/live-mse.ts create mode 100644 web/src/routes/cam-poc/+page.svelte diff --git a/server/api.go b/server/api.go index 53cca10..cea66f2 100644 --- a/server/api.go +++ b/server/api.go @@ -1,11 +1,22 @@ package main import ( + "context" + "errors" "io" + "net/http" + + "github.com/gorilla/websocket" "github.com/pocketbase/pocketbase/core" ) +var liveWSUpgrader = websocket.Upgrader{ + CheckOrigin: func(_ *http.Request) bool { + return true + }, +} + func registerAPI(se *core.ServeEvent) { group := se.Router.Group("/api/cctv") @@ -31,11 +42,73 @@ func registerAPI(se *core.ServeEvent) { }) group.GET("/live/{streamId}", func(e *core.RequestEvent) error { + return e.BadRequestError("Use /api/cctv/live/ws/{streamId} for live streaming", nil) + }) + + group.GET("/live/ws/{streamId}", func(e *core.RequestEvent) error { streamId := e.Request.PathValue("streamId") if streamId == "" { return e.BadRequestError("Missing stream ID", nil) } - ingestService.SubscribeLive(e.Request.Context(), streamId) - return nil + + conn, err := liveWSUpgrader.Upgrade(e.Response, e.Request, nil) + if err != nil { + return e.InternalServerError("Failed to upgrade websocket connection", err) + } + defer conn.Close() + + ctx, cancel := context.WithCancel(e.Request.Context()) + defer cancel() + + stream, err := ingestService.SubscribeLive(ctx, streamId) + if err != nil { + writeLiveWSError(conn, err.Error()) + return nil + } + + clientDone := make(chan struct{}) + go func() { + defer close(clientDone) + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + }() + + for { + select { + case <-ctx.Done(): + return nil + case <-clientDone: + return nil + case chunk, ok := <-stream: + if !ok { + writeLiveWSError(conn, "live stream ended") + return nil + } + if chunk == nil { + continue + } + payload, err := io.ReadAll(chunk) + if err != nil { + writeLiveWSError(conn, "failed to read stream chunk") + return nil + } + if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil { + if errors.Is(err, websocket.ErrCloseSent) { + return nil + } + return nil + } + } + } }) } + +func writeLiveWSError(conn *websocket.Conn, msg string) { + payload := make([]byte, len(msg)+1) + payload[0] = 0x03 + copy(payload[1:], []byte(msg)) + _ = conn.WriteMessage(websocket.BinaryMessage, payload) +} diff --git a/server/go.mod b/server/go.mod index 60f157a..f57dcff 100644 --- a/server/go.mod +++ b/server/go.mod @@ -6,6 +6,7 @@ require ( github.com/caarlos0/env/v11 v11.4.0 github.com/go-gst/go-glib v1.4.1-0.20250303082535-35ebad1471fd github.com/go-gst/go-gst v1.4.0 + github.com/gorilla/websocket v1.5.3 github.com/gowvp/onvif v0.0.14 github.com/joho/godotenv v1.5.1 github.com/pocketbase/pocketbase v0.36.7 diff --git a/server/go.sum b/server/go.sum index b33bc2e..12435d2 100644 --- a/server/go.sum +++ b/server/go.sum @@ -45,6 +45,8 @@ github.com/google/pprof v0.0.0-20260115054156-294ebfa9ad83 h1:z2ogiKUYzX5Is6zr/v github.com/google/pprof v0.0.0-20260115054156-294ebfa9ad83/go.mod h1:MxpfABSjhmINe3F1It9d+8exIHFvUqtLIRCdOGNXqiI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gowvp/onvif v0.0.14 h1:NNrFqzqBHf9Z9MEQOiDpunpagXUHQraRjFmyiXhUwr4= github.com/gowvp/onvif v0.0.14/go.mod h1:Dshr55Q/Xgwa9XMQBPBQBMOWj/2Sq+DxLhdNY35uoFc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= diff --git a/server/ingest/branch_live.go b/server/ingest/branch_live.go index c3436dc..610b562 100644 --- a/server/ingest/branch_live.go +++ b/server/ingest/branch_live.go @@ -10,6 +10,12 @@ import ( "github.com/go-gst/go-gst/gst/app" ) +const ( + liveFrameTypeInit = 0x01 + liveFrameTypeMedia = 0x02 + liveFrameTypeError = 0x03 +) + const ( liveSampleTimeout = 200 * time.Millisecond liveSampleBufferSize = 5 @@ -97,7 +103,7 @@ func (p *CameraPipeline) addLiveBranch(branchTimeout time.Duration) error { if err != nil { return fmt.Errorf("failed to create mp4mux element: %w", err) } - if err := mp4Mux.SetProperty("fragment-duration", uint(100*time.Millisecond.Nanoseconds())); err != nil { + if err := mp4Mux.SetProperty("fragment-duration", uint(100)); err != nil { return fmt.Errorf("failed to set mp4mux fragment-duration property: %w", err) } if err := setGlibValueProperty(mp4Mux, "fragment-mode", 0); err != nil { // dash-or-mss @@ -193,6 +199,8 @@ SamplerLoop: func (p *CameraPipeline) liveManager(ctx context.Context) { active := false var initSeg []byte + assembler := newFMP4Assembler() + var lastMediaEmit time.Time subscribers := &subscribersManager{m: make(map[int]subscriber), nextID: 0} samples := make(chan []byte, liveSampleBufferSize) samplerCtx, samplerCancel := context.WithCancel(ctx) @@ -217,6 +225,9 @@ func (p *CameraPipeline) liveManager(ctx context.Context) { case req := <-p.liveSubscribe: if !active { p.log.Info("Activating live branch") + initSeg = nil + assembler = newFMP4Assembler() + lastMediaEmit = time.Time{} if err := p.liveVValve.SetProperty("drop", false); err != nil { req.Result <- subscribeRes{Err: fmt.Errorf("failed to activate video valve: %w", err)} continue @@ -244,10 +255,29 @@ func (p *CameraPipeline) liveManager(ctx context.Context) { case req := <-p.liveUnsubscribe: subscribers.RemoveSubscriber(req.Id) case data := <-samples: - // TODO : init segment - p.log.Debug("got video sample") + chunks, err := assembler.Push(data) + if err != nil { + p.log.Warn("Failed to assemble live fMP4 chunks", "error", err) + continue + } numSamples += 1 - subscribers.Broadcast(data) + for _, chunk := range chunks { + framed := marshalLiveFrame(chunk.FrameType, chunk.Payload) + switch chunk.FrameType { + case liveFrameTypeInit: + initSeg = framed + p.log.Debug("Emitted init segment", "size", len(chunk.Payload)) + case liveFrameTypeMedia: + now := time.Now() + if !lastMediaEmit.IsZero() { + p.log.Debug("Emitted media segment", "size", len(chunk.Payload), "gap", now.Sub(lastMediaEmit)) + } else { + p.log.Debug("Emitted media segment", "size", len(chunk.Payload)) + } + lastMediaEmit = now + } + subscribers.Broadcast(framed) + } case <-timeout: p.log.Info("Deactivating live branch due to inactivity") if err := p.liveVValve.SetProperty("drop", true); err != nil { @@ -263,6 +293,13 @@ func (p *CameraPipeline) liveManager(ctx context.Context) { } } +func marshalLiveFrame(frameType byte, payload []byte) []byte { + framed := make([]byte, len(payload)+1) + framed[0] = frameType + copy(framed[1:], payload) + return framed +} + func (p *CameraPipeline) LiveSubscribe(timeout time.Duration, bufferSize int) (id int, stream <-chan *bytes.Reader, err error) { result := make(chan subscribeRes) req := subscribeReq{ diff --git a/server/ingest/fmp4_assembler.go b/server/ingest/fmp4_assembler.go new file mode 100644 index 0000000..9c34ee0 --- /dev/null +++ b/server/ingest/fmp4_assembler.go @@ -0,0 +1,139 @@ +package ingest + +import ( + "encoding/binary" + "fmt" +) + +type liveChunk struct { + FrameType byte + Payload []byte +} + +type fmp4Assembler struct { + pending []byte + + initReady bool + initBuf []byte + + mediaPrefix []byte + inFragment bool + fragmentBuf []byte +} + +func newFMP4Assembler() *fmp4Assembler { + return &fmp4Assembler{} +} + +func (a *fmp4Assembler) Push(data []byte) ([]liveChunk, error) { + if len(data) == 0 { + return nil, nil + } + a.pending = append(a.pending, data...) + + result := make([]liveChunk, 0, 2) + for { + box, boxType, ok, err := a.nextBox() + if err != nil { + return nil, err + } + if !ok { + break + } + chunks, err := a.consumeBox(boxType, box) + if err != nil { + return nil, err + } + result = append(result, chunks...) + } + return result, nil +} + +func (a *fmp4Assembler) nextBox() ([]byte, string, bool, error) { + if len(a.pending) < 8 { + return nil, "", false, nil + } + + headerLen := 8 + size := uint64(binary.BigEndian.Uint32(a.pending[:4])) + if size == 1 { + if len(a.pending) < 16 { + return nil, "", false, nil + } + headerLen = 16 + size = binary.BigEndian.Uint64(a.pending[8:16]) + } + if size == 0 { + // 0 means box runs to stream end; wait for more data. + return nil, "", false, nil + } + if size < uint64(headerLen) { + return nil, "", false, fmt.Errorf("invalid MP4 box size %d", size) + } + if size > uint64(len(a.pending)) { + return nil, "", false, nil + } + + boxLen := int(size) + boxType := string(a.pending[4:8]) + box := make([]byte, boxLen) + copy(box, a.pending[:boxLen]) + a.pending = a.pending[boxLen:] + return box, boxType, true, nil +} + +func (a *fmp4Assembler) consumeBox(boxType string, box []byte) ([]liveChunk, error) { + if !a.initReady { + a.initBuf = append(a.initBuf, box...) + if boxType != "moov" { + return nil, nil + } + + a.initReady = true + initPayload := cloneBytes(a.initBuf) + a.initBuf = nil + return []liveChunk{{FrameType: liveFrameTypeInit, Payload: initPayload}}, nil + } + + switch boxType { + case "styp", "sidx", "prft": + if a.inFragment { + a.fragmentBuf = append(a.fragmentBuf, box...) + } else { + a.mediaPrefix = append(a.mediaPrefix, box...) + } + return nil, nil + case "moof": + a.fragmentBuf = a.fragmentBuf[:0] + if len(a.mediaPrefix) > 0 { + a.fragmentBuf = append(a.fragmentBuf, a.mediaPrefix...) + a.mediaPrefix = a.mediaPrefix[:0] + } + a.fragmentBuf = append(a.fragmentBuf, box...) + a.inFragment = true + return nil, nil + case "mdat": + if !a.inFragment { + return nil, nil + } + a.fragmentBuf = append(a.fragmentBuf, box...) + mediaPayload := cloneBytes(a.fragmentBuf) + a.fragmentBuf = a.fragmentBuf[:0] + a.inFragment = false + return []liveChunk{{FrameType: liveFrameTypeMedia, Payload: mediaPayload}}, nil + default: + if a.inFragment { + a.fragmentBuf = append(a.fragmentBuf, box...) + } + return nil, nil + } +} + +func cloneBytes(data []byte) []byte { + if len(data) == 0 { + return nil + } + out := make([]byte, len(data)) + copy(out, data) + return out +} diff --git a/server/ingest/ingest.go b/server/ingest/ingest.go index 458d309..0c6ca99 100644 --- a/server/ingest/ingest.go +++ b/server/ingest/ingest.go @@ -154,13 +154,26 @@ func (ingest *Ingest) SubscribeLive(ctx context.Context, streamId string) (<-cha if err != nil { return nil, fmt.Errorf("failed to subscribe to live stream %s: %w", streamId, err) } - defer active.pipeline.LiveUnsubscribe(id) - for { - select { - case <-ctx.Done(): - return nil, nil - case buf := <-stream: - ingest.log.Debug("Received live stream chunk", "streamId", streamId, "chunkSize", buf.Len()) + out := make(chan *bytes.Reader, 16) + go func() { + defer close(out) + defer active.pipeline.LiveUnsubscribe(id) + for { + select { + case <-ctx.Done(): + return + case buf, ok := <-stream: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case out <- buf: + } + } } - } + }() + + return out, nil } diff --git a/server/ingest/pipeline.go b/server/ingest/pipeline.go index 9427488..06c8a97 100644 --- a/server/ingest/pipeline.go +++ b/server/ingest/pipeline.go @@ -197,9 +197,19 @@ func (p *CameraPipeline) addRtspSource(rtspURL *url.URL) error { case "video": switch encoding { case "H264": - chain, err = p.addStaticChain(pad, p.vTee.GetStaticPad("sink"), "rtph264depay", "h264parse") + chain, err = p.addStaticChainProps( + pad, + p.vTee.GetStaticPad("sink"), + elementFactory{name: "rtph264depay"}, + elementFactory{name: "h264parse", props: map[string]any{"config-interval": int(-1)}}, + ) case "H265": - chain, err = p.addStaticChain(pad, p.vTee.GetStaticPad("sink"), "rtph265depay", "h265parse") + chain, err = p.addStaticChainProps( + pad, + p.vTee.GetStaticPad("sink"), + elementFactory{name: "rtph265depay"}, + elementFactory{name: "h265parse", props: map[string]any{"config-interval": int(-1)}}, + ) default: log.Error("Ignoring video pad (unsupported encoding)", "caps", caps.String()) return diff --git a/web/src/lib/live-mse.ts b/web/src/lib/live-mse.ts new file mode 100644 index 0000000..bfb6bda --- /dev/null +++ b/web/src/lib/live-mse.ts @@ -0,0 +1,154 @@ +export const LIVE_FRAME_INIT = 0x01; +export const LIVE_FRAME_MEDIA = 0x02; +export const LIVE_FRAME_ERROR = 0x03; + +export type LiveFrame = { + type: number; + payload: Uint8Array; +}; + +export function decodeLiveFrame(data: ArrayBuffer): LiveFrame | null { + const bytes = new Uint8Array(data); + if (bytes.length === 0) { + return null; + } + return { + type: bytes[0], + payload: bytes.slice(1) + }; +} + +export class SourceBufferAppender { + private readonly sourceBuffer: SourceBuffer; + private readonly onError: (err: unknown) => void; + private readonly queue: Uint8Array[] = []; + private disposed = false; + + constructor(sourceBuffer: SourceBuffer, onError: (err: unknown) => void) { + this.sourceBuffer = sourceBuffer; + this.onError = onError; + this.onUpdateEnd = this.onUpdateEnd.bind(this); + this.sourceBuffer.addEventListener("updateend", this.onUpdateEnd); + } + + append(segment: Uint8Array) { + if (this.disposed || segment.length === 0) { + return; + } + this.queue.push(segment); + this.drain(); + } + + pendingSegments(): number { + return this.queue.length; + } + + dispose() { + if (this.disposed) { + return; + } + this.disposed = true; + this.queue.length = 0; + this.sourceBuffer.removeEventListener("updateend", this.onUpdateEnd); + } + + private onUpdateEnd() { + this.drain(); + } + + private drain() { + if (this.disposed || this.sourceBuffer.updating || this.queue.length === 0) { + return; + } + const next = this.queue.shift(); + if (!next) { + return; + } + try { + const raw = next.buffer.slice(next.byteOffset, next.byteOffset + next.byteLength) as ArrayBuffer; + this.sourceBuffer.appendBuffer(raw); + } catch (err) { + this.onError(err); + } + } +} + +export function pickLiveMimeType(initSegment: Uint8Array): string | null { + const hasAvc1 = includesASCII(initSegment, "avc1"); + const hasHvc1 = includesASCII(initSegment, "hvc1"); + const hasHev1 = includesASCII(initSegment, "hev1"); + const avcCodec = parseAvcCodec(initSegment); + + const candidates: string[] = []; + if (hasAvc1 && avcCodec) { + candidates.push(`video/mp4; codecs="${avcCodec},mp4a.40.2"`); + candidates.push(`video/mp4; codecs="${avcCodec}"`); + } + if (hasAvc1) { + candidates.push('video/mp4; codecs="avc1.640028,mp4a.40.2"'); + candidates.push('video/mp4; codecs="avc1.42E01E,mp4a.40.2"'); + } + if (hasHvc1) { + candidates.push('video/mp4; codecs="hvc1,mp4a.40.2"'); + candidates.push('video/mp4; codecs="hvc1"'); + } + if (hasHev1) { + candidates.push('video/mp4; codecs="hev1,mp4a.40.2"'); + candidates.push('video/mp4; codecs="hev1"'); + } + candidates.push("video/mp4"); + + for (const candidate of candidates) { + if (MediaSource.isTypeSupported(candidate)) { + return candidate; + } + } + return null; +} + +export function buildLiveWsUrl(streamId: string): string { + const current = new URL(window.location.href); + const apiBaseParam = current.searchParams.get("apiBase"); + const apiBase = apiBaseParam ? new URL(apiBaseParam, current) : current; + const protocol = apiBase.protocol === "https:" ? "wss:" : "ws:"; + return `${protocol}//${apiBase.host}/api/cctv/live/ws/${encodeURIComponent(streamId)}`; +} + +function parseAvcCodec(data: Uint8Array): string | null { + const idx = indexOfASCII(data, "avcC"); + if (idx < 0 || idx+8 >= data.length) { + return null; + } + const profile = data[idx + 5]; + const compatibility = data[idx + 6]; + const level = data[idx + 7]; + return `avc1.${hex2(profile)}${hex2(compatibility)}${hex2(level)}`; +} + +function includesASCII(data: Uint8Array, text: string): boolean { + return indexOfASCII(data, text) >= 0; +} + +function indexOfASCII(data: Uint8Array, text: string): number { + if (text.length === 0 || text.length > data.length) { + return -1; + } + const codes = Array.from(text, (char) => char.charCodeAt(0)); + for (let i = 0; i <= data.length - codes.length; i++) { + let matched = true; + for (let j = 0; j < codes.length; j++) { + if (data[i + j] !== codes[j]) { + matched = false; + break; + } + } + if (matched) { + return i; + } + } + return -1; +} + +function hex2(value: number): string { + return value.toString(16).toUpperCase().padStart(2, "0"); +} diff --git a/web/src/routes/cam-poc/+page.svelte b/web/src/routes/cam-poc/+page.svelte new file mode 100644 index 0000000..7941a18 --- /dev/null +++ b/web/src/routes/cam-poc/+page.svelte @@ -0,0 +1,392 @@ + + +
+
+
+

Live Camera PoC

+

+ WebSocket binary stream to MSE using muxed fMP4 init/media segments. +

+
+ + + +
+

{wsUrl}

+
+ +
+
+ +
+
+
+ Status + {status} +
+
+ WebSocket + {wsState} +
+
+ SourceBuffer MIME + {sourceMime} +
+
+ Init frames + {initFrames} +
+
+ Media frames + {mediaFrames} +
+
+ Pending append queue + {pendingSegments} +
+
+
Buffered ranges
+
{bufferedRanges}
+
+ {#if errorText.length > 0} +
+ {errorText} +
+ {/if} + {#if playbackHint.length > 0} +
+

{playbackHint}

+ +
+ {/if} +
+
+
+