Files
cctv/server/api.go
2026-04-23 23:49:10 +01:00

115 lines
2.7 KiB
Go

package main
import (
"context"
"errors"
"io"
"net/http"
"github.com/gorilla/websocket"
"github.com/pocketbase/pocketbase/core"
)
// liveWSUpgrader upgrades live-stream HTTP requests to WebSocket connections
// using the gorilla/websocket defaults, apart from the origin check below.
//
// NOTE(review): CheckOrigin returns true for every request, so the Origin
// header is never validated. Confirm that access to the live endpoint is
// restricted elsewhere (auth middleware, network boundary) or tighten this.
var liveWSUpgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// registerAPI mounts the CCTV endpoints on the router under /api/cctv:
//
//   - GET /thumb/{streamId}: writes the thumbnail returned by
//     ingestService.GetThumbnail as image/jpeg with caching disabled.
//   - GET /live/{streamId}: always answers with a bad-request error that
//     points the caller at the WebSocket route.
//   - GET /live/ws/{streamId}: upgrades to a WebSocket and forwards every
//     chunk received from ingestService.SubscribeLive as one binary message.
func registerAPI(se *core.ServeEvent) {
	api := se.Router.Group("/api/cctv")

	// serveThumbnail copies the thumbnail for the requested stream into the
	// HTTP response.
	serveThumbnail := func(e *core.RequestEvent) error {
		streamID := e.Request.PathValue("streamId")
		if streamID == "" {
			return e.BadRequestError("Missing stream ID", nil)
		}

		thumb, err := ingestService.GetThumbnail(e.Request.Context(), streamID)
		switch {
		case err != nil:
			return e.InternalServerError("Failed to get thumbnail", err)
		case thumb == nil:
			return e.NotFoundError("Thumbnail not found", nil)
		}

		hdr := e.Response.Header()
		hdr.Set("Content-Type", "image/jpeg")
		hdr.Set("Cache-Control", "no-cache, no-store, must-revalidate")

		if _, copyErr := io.Copy(e.Response, thumb); copyErr != nil {
			return e.InternalServerError("Failed to write thumbnail", copyErr)
		}
		return nil
	}

	// rejectPlainLive tells callers of the non-WebSocket live route where the
	// live stream is actually served.
	rejectPlainLive := func(e *core.RequestEvent) error {
		return e.BadRequestError("Use /api/cctv/live/ws/{streamId} for live streaming", nil)
	}

	// serveLiveWS upgrades the request and pumps live chunks to the client
	// until the context ends, the client stops responding to reads, the
	// stream channel closes, or a chunk cannot be read or written.
	serveLiveWS := func(e *core.RequestEvent) error {
		streamID := e.Request.PathValue("streamId")
		if streamID == "" {
			return e.BadRequestError("Missing stream ID", nil)
		}

		ws, upgradeErr := liveWSUpgrader.Upgrade(e.Response, e.Request, nil)
		if upgradeErr != nil {
			return e.InternalServerError("Failed to upgrade websocket connection", upgradeErr)
		}
		defer ws.Close()

		// The subscription is bound to a context that is cancelled as soon
		// as this handler returns.
		ctx, stop := context.WithCancel(e.Request.Context())
		defer stop()

		chunks, subErr := ingestService.SubscribeLive(ctx, streamID)
		if subErr != nil {
			// After the upgrade, failures are reported in-band over the
			// socket and the handler returns nil.
			writeLiveWSError(ws, subErr.Error())
			return nil
		}

		// Inbound messages are read and discarded; the goroutine exists only
		// to signal when reading fails. Closing ws on handler return is
		// expected to make ReadMessage fail and so end this goroutine.
		peerGone := make(chan struct{})
		go func() {
			defer close(peerGone)
			for {
				_, _, readErr := ws.ReadMessage()
				if readErr != nil {
					return
				}
			}
		}()

		// NOTE(review): no write deadline is set on ws, so a write to a
		// stalled client may block this loop — confirm whether that matters.
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-peerGone:
				return nil
			case part, open := <-chunks:
				if !open {
					writeLiveWSError(ws, "live stream ended")
					return nil
				}
				if part == nil {
					continue
				}

				data, readErr := io.ReadAll(part)
				if readErr != nil {
					writeLiveWSError(ws, "failed to read stream chunk")
					return nil
				}

				sendErr := ws.WriteMessage(websocket.BinaryMessage, data)
				if sendErr == nil {
					continue
				}
				// NOTE(review): both outcomes end the handler with nil; the
				// ErrCloseSent check only singles out the case where a close
				// frame was already sent.
				if errors.Is(sendErr, websocket.ErrCloseSent) {
					return nil
				}
				return nil
			}
		}
	}

	api.GET("/thumb/{streamId}", serveThumbnail)
	api.GET("/live/{streamId}", rejectPlainLive)
	api.GET("/live/ws/{streamId}", serveLiveWS)
}
// writeLiveWSError sends msg to the client as a single binary WebSocket
// message laid out as one leading 0x03 byte followed by the bytes of msg.
//
// NOTE(review): 0x03 is presumably the "error" tag of the live-stream wire
// format — confirm against the client implementation.
//
// The result of the write is discarded, so delivery is best effort.
func writeLiveWSError(conn *websocket.Conn, msg string) {
	frame := make([]byte, 0, 1+len(msg))
	frame = append(frame, 0x03)
	frame = append(frame, msg...)
	_ = conn.WriteMessage(websocket.BinaryMessage, frame)
}