diff --git a/package.json b/package.json index e011960..9c3336a 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,8 @@ "docker:login": "echo $REFURBILY_REGISTRY_ACCESS_TOKEN | docker login ${GITEA_REGISTRY_URL#https://} -u refurbily --password-stdin", "docker:push": "docker tag grown:latest ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest && docker push ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest", "docker:build-push": "npm run docker:build && npm run docker:login && npm run docker:push", - "deploy": "docker context create nas --docker \"host=ssh://nas\" 2>/dev/null || true && docker --context nas compose -f docker-compose.yml --env-file .env pull && docker --context nas compose -f docker-compose.yml --env-file .env up -d && docker context use default" + "deploy": "docker context create nas --docker \"host=ssh://nas\" 2>/dev/null || true && docker --context nas compose -f docker-compose.yml --env-file .env pull && docker --context nas compose -f docker-compose.yml --env-file .env up -d && docker context use default", + "ship": "bun --bun run docker:build-push && bun --bun run deploy" }, "devDependencies": { "@lucide/svelte": "^0.577.0", diff --git a/src/hooks.server.ts b/src/hooks.server.ts index 4300b16..e33bd8e 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -11,7 +11,7 @@ import { const TIMELAPSE_OUTPUT_PATH = process.env.TIMELAPSE_OUTPUT_PATH || "./timelapse"; -const CAPTURE_INTERVAL = "*/10 * * * * *"; // Every 10 seconds +const CAPTURE_INTERVAL = "0 * * * *"; // Every hour at minute 0 // Single camera configuration const CAMERA_STREAM_ID = process.env.STREAM_ID || "camera-1"; diff --git a/src/lib/components/RTSPVideoPlayer.svelte b/src/lib/components/RTSPVideoPlayer.svelte index 0c669fe..ac20f6d 100644 --- a/src/lib/components/RTSPVideoPlayer.svelte +++ b/src/lib/components/RTSPVideoPlayer.svelte @@ -219,21 +219,6 @@ abortController = null; } - const response = await fetch("/api/stream", { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - action: "stop", - streamId, - }), - }); - - if (!response.ok) { - throw new Error("Failed to stop stream"); - } - isPlaying = false; error = null; frameCount = 0; @@ -272,8 +257,8 @@ onMount(() => { // Handle page unload/reload const handleBeforeUnload = () => { - if (isPlaying) { - stopStream(); + if (abortController) { + abortController.abort(); } }; @@ -301,12 +286,12 @@ - Keimli der II. + Keimli die II.
-

Keimli der II.

+

Keimli die II.

; + id: string; + closed: boolean; + lastSeen: number; +} + interface StreamProcess { process: ReturnType | null; rtspUrl: string; startedAt: Date; - clients: Set; + clients: Map; + isListeningToStdout: boolean; + stopTimeout: NodeJS.Timeout | null; } const activeStreams = new Map(); +function setupStreamDataDistribution(streamId: string, stream: StreamProcess) { + if (stream.isListeningToStdout || !stream.process || !stream.process.stdout) { + return; + } + + stream.isListeningToStdout = true; + const nodeStream = stream.process.stdout; + + nodeStream.on("data", (chunk: Buffer) => { + const now = Date.now(); + const deadClients: string[] = []; + + // Distribute to all connected clients and detect dead ones + stream.clients.forEach((client, clientId) => { + if (client.closed) { + return; + } + + try { + client.lastSeen = now; + client.controller.enqueue(new Uint8Array(chunk)); + } catch (error) { + // Client controller is dead - mark for removal + console.log( + `[${streamId}] Client ${clientId} controller error (likely disconnected):`, + (error as Error).message, + ); + client.closed = true; + deadClients.push(clientId); + } + }); + + // Remove dead clients + deadClients.forEach((clientId) => { + stream.clients.delete(clientId); + console.log( + `[${streamId}] ✗ Client disconnected (${stream.clients.size} remaining)`, + ); + + // If no clients remain, schedule stream stop + if (stream.clients.size === 0 && !stream.stopTimeout) { + console.log( + `[${streamId}] No clients connected. Stream will stop in 5 seconds unless a new client connects.`, + ); + stream.stopTimeout = setTimeout(() => { + console.log( + `[${streamId}] 5 second grace period expired. Stopping stream.`, + ); + stopStream(streamId); + }, 5000); + } + }); + }); + + nodeStream.on("end", () => { + console.log(`[${streamId}] Stream process ended`); + stream.clients.forEach((client) => { + if (!client.closed) { + try { + client.controller.close(); + client.closed = true; + } catch { + // Controller already closed + } + } + }); + }); + + nodeStream.on("error", (error: Error) => { + console.error(`[${streamId}] Stream error:`, error); + stream.clients.forEach((client) => { + if (!client.closed) { + try { + client.controller.close(); + client.closed = true; + } catch { + // Controller already closed + } + } + }); + }); +} + export function startStream( streamId: string, rtspUrl: string, @@ -42,6 +134,7 @@ export function startStream( "pipe:1", ]; + console.log(`[${streamId}] Starting stream from ${rtspUrl}`); const ffmpegProcess = spawn("ffmpeg", args); ffmpegProcess.on("error", (err: Error) => { @@ -50,15 +143,22 @@ export function startStream( }); ffmpegProcess.on("close", (code: number | null) => { + if (code !== 0) { + console.error(`[${streamId}] FFmpeg process closed with code ${code}`); + } activeStreams.delete(streamId); }); - activeStreams.set(streamId, { + const stream: StreamProcess = { process: ffmpegProcess, rtspUrl, startedAt: new Date(), - clients: new Set(), - }); + clients: new Map(), + isListeningToStdout: false, + stopTimeout: null, + }; + + activeStreams.set(streamId, stream); return { success: true, @@ -74,34 +174,106 @@ export function startStream( export function stopStream(streamId: string): void { const stream = activeStreams.get(streamId); - if (stream && stream.process) { - stream.process.kill("SIGTERM"); - stream.clients.clear(); - activeStreams.delete(streamId); + if (!stream) { + return; } + + console.log( + `[${streamId}] Stopping stream (${stream.clients.size} clients connected)`, + ); + + // Cancel any pending stop timeout + if (stream.stopTimeout) { + clearTimeout(stream.stopTimeout); + stream.stopTimeout = null; + } + + if (stream.process) { + stream.clients.forEach((client) => { + if (!client.closed) { + try { + client.controller.close(); + client.closed = true; + } catch { + // Controller already closed + } + } + }); + stream.process.kill("SIGTERM"); + } + + stream.clients.clear(); + activeStreams.delete(streamId); } export function getStreamProcess(streamId: string) { return activeStreams.get(streamId); } -export function addStreamClient(streamId: string, client: string): boolean { +export function addStreamClient( + streamId: string, + clientId: string, + controller: ReadableStreamDefaultController, +): boolean { const stream = activeStreams.get(streamId); if (!stream) { + console.error(`[${streamId}] Cannot add client - stream not found`); return false; } - stream.clients.add(client); + + // Cancel any pending stop timeout since we have a new client + if (stream.stopTimeout) { + console.log( + `[${streamId}] Cancelling scheduled stream stop (new client connecting)`, + ); + clearTimeout(stream.stopTimeout); + stream.stopTimeout = null; + } + + // Setup data distribution if not already done + setupStreamDataDistribution(streamId, stream); + + stream.clients.set(clientId, { + id: clientId, + controller, + closed: false, + lastSeen: Date.now(), + }); + console.log( + `[${streamId}] ✓ Client connected (${stream.clients.size} clients total)`, + ); return true; } -export function removeStreamClient(streamId: string, client: string): void { +export function removeStreamClient(streamId: string, clientId: string): void { const stream = activeStreams.get(streamId); if (stream) { - stream.clients.delete(client); + const clientsBeforeRemoval = stream.clients.size; + const client = stream.clients.get(clientId); + if (client && !client.closed) { + try { + client.controller.close(); + client.closed = true; + } catch { + // Controller already closed + } + } + stream.clients.delete(clientId); + console.log( + `[${streamId}] ✗ Client disconnected (${clientsBeforeRemoval} -> ${stream.clients.size} clients)`, + ); - // Auto-stop stream if no clients remain + // If no clients remain, schedule stream stop with 5 second grace period if (stream.clients.size === 0) { - stopStream(streamId); + console.log( + `[${streamId}] No clients connected. Stream will stop in 5 seconds unless a new client connects.`, + ); + stream.stopTimeout = setTimeout(() => { + console.log( + `[${streamId}] 5 second grace period expired. Stopping stream.`, + ); + stopStream(streamId); + }, 5000); } } } diff --git a/src/routes/+page.svelte b/src/routes/+page.svelte index 18c2000..5c40f06 100644 --- a/src/routes/+page.svelte +++ b/src/routes/+page.svelte @@ -2,10 +2,6 @@ import RTSPVideoPlayer from "$lib/components/RTSPVideoPlayer.svelte"; - - RTSP Stream Viewer - -
diff --git a/src/routes/api/stream/+server.ts b/src/routes/api/stream/+server.ts index 934637c..3ffb5be 100644 --- a/src/routes/api/stream/+server.ts +++ b/src/routes/api/stream/+server.ts @@ -5,8 +5,6 @@ import { getStreamStatus, getAllStreams, getStreamProcess, - addStreamClient, - removeStreamClient, } from "$lib/server/streaming.js"; export async function POST({ request }) { @@ -17,12 +15,19 @@ export async function POST({ request }) { return json({ error: "Missing streamId or rtspUrl" }, { status: 400 }); } + // Check if stream already exists and is running + const existingStream = getStreamProcess(streamId); + if (existingStream && existingStream.process) { + return json({ success: true, streamId, reused: true }); + } + + // Stream doesn't exist, start a new one const result = startStream(streamId, rtspUrl); if (result.error) { return json({ error: result.error }, { status: 500 }); } - return json({ success: true, streamId }); + return json({ success: true, streamId, reused: false }); } else if (action === "stop") { if (!streamId) { return json({ error: "Missing streamId" }, { status: 400 }); @@ -44,54 +49,3 @@ export async function POST({ request }) { return json({ error: "Unknown action" }, { status: 400 }); } } - -export async function GET({ url }) { - const streamId = url.searchParams.get("id"); - - if (!streamId) { - return new Response("Missing stream ID", { status: 400 }); - } - - const stream = getStreamProcess(streamId); - if (!stream || !stream.process || !stream.process.stdout) { - return new Response("Stream not found", { status: 404 }); - } - - // Track client - const clientId = Math.random().toString(36).substring(7); - addStreamClient(streamId, clientId); - - // Create readable stream from FFmpeg stdout - const readable = stream.process.stdout; - - // Return as multipart/x-mixed-replace for MJPEG - const response = new Response(readable as any, { - headers: { - "Content-Type": "multipart/x-mixed-replace; boundary=FFMPEG", - "Cache-Control": "no-cache, no-store, must-revalidate", - Pragma: "no-cache", - Expires: "0", - Connection: "keep-alive", - }, - }); - - // Track when connection closes - if (response.body) { - const reader = response.body.getReader(); - (async () => { - try { - while (true) { - await reader.read(); - } - } catch (e) { - // Connection closed - console.error("Error monitoring stream connection:", e); - } finally { - removeStreamClient(streamId, clientId); - reader.releaseLock(); - } - })(); - } - - return response; -} diff --git a/src/routes/api/stream/mjpeg/+server.ts b/src/routes/api/stream/mjpeg/+server.ts index 8c6803d..7636459 100644 --- a/src/routes/api/stream/mjpeg/+server.ts +++ b/src/routes/api/stream/mjpeg/+server.ts @@ -17,65 +17,36 @@ export async function GET({ url }) { } const clientId = Math.random().toString(36).substring(7); - addStreamClient(streamId, clientId); + let cleanupDone = false; - // Create a web ReadableStream from the Node.js stream - const nodeStream = stream.process.stdout; - let controllerClosed = false; + const cleanup = () => { + if (!cleanupDone) { + cleanupDone = true; + console.log(`[${streamId}] Client ${clientId} cleanup`); + removeStreamClient(streamId, clientId); + } + }; - const webReadableStream = new ReadableStream({ + const webReadableStream = new ReadableStream({ start(controller) { - nodeStream.on("data", (chunk) => { + console.log(`[${streamId}] Client ${clientId} connected`); + const success = addStreamClient(streamId, clientId, controller); + if (!success) { + console.log(`[${streamId}] Failed to add client ${clientId}`); try { - if (!controllerClosed) { - controller.enqueue(chunk); - } - } catch (error) { - console.error(`[Stream ${streamId}] Error enqueueing chunk:`, error); - controllerClosed = true; + controller.close(); + } catch { + // Controller already closed } - }); - - nodeStream.on("end", () => { - try { - if (!controllerClosed) { - controller.close(); - controllerClosed = true; - } - } catch (error) { - console.error( - `[Stream ${streamId}] Error closing controller:`, - error, - ); - } - }); - - nodeStream.on("error", (error) => { - console.error(`[Stream ${streamId}] Node stream error:`, error); - try { - if (!controllerClosed) { - controller.close(); - controllerClosed = true; - } - } catch (closeError) { - console.error( - `[Stream ${streamId}] Error closing controller on stream error:`, - closeError, - ); - } - }); - }, - cancel() { - try { - controllerClosed = true; - removeStreamClient(streamId, clientId); - } catch (error) { - console.error(`[Stream ${streamId}] Error during cancel:`, error); + cleanup(); } }, + cancel(reason) { + console.log(`[${streamId}] Client ${clientId} cancelled:`, reason); + cleanup(); + }, }); - // Return the response with proper headers return new Response(webReadableStream, { status: 200, headers: {