some fixes

This commit is contained in:
2026-03-11 23:15:54 +01:00
parent de707294ec
commit 8de0096644
7 changed files with 222 additions and 143 deletions

View File

@@ -14,7 +14,8 @@
"docker:login": "echo $REFURBILY_REGISTRY_ACCESS_TOKEN | docker login ${GITEA_REGISTRY_URL#https://} -u refurbily --password-stdin", "docker:login": "echo $REFURBILY_REGISTRY_ACCESS_TOKEN | docker login ${GITEA_REGISTRY_URL#https://} -u refurbily --password-stdin",
"docker:push": "docker tag grown:latest ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest && docker push ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest", "docker:push": "docker tag grown:latest ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest && docker push ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest",
"docker:build-push": "npm run docker:build && npm run docker:login && npm run docker:push", "docker:build-push": "npm run docker:build && npm run docker:login && npm run docker:push",
"deploy": "docker context create nas --docker \"host=ssh://nas\" 2>/dev/null || true && docker --context nas compose -f docker-compose.yml --env-file .env pull && docker --context nas compose -f docker-compose.yml --env-file .env up -d && docker context use default" "deploy": "docker context create nas --docker \"host=ssh://nas\" 2>/dev/null || true && docker --context nas compose -f docker-compose.yml --env-file .env pull && docker --context nas compose -f docker-compose.yml --env-file .env up -d && docker context use default",
"ship": "bun --bun run docker:build-push && bun --bun run deploy"
}, },
"devDependencies": { "devDependencies": {
"@lucide/svelte": "^0.577.0", "@lucide/svelte": "^0.577.0",

View File

@@ -11,7 +11,7 @@ import {
const TIMELAPSE_OUTPUT_PATH = const TIMELAPSE_OUTPUT_PATH =
process.env.TIMELAPSE_OUTPUT_PATH || "./timelapse"; process.env.TIMELAPSE_OUTPUT_PATH || "./timelapse";
const CAPTURE_INTERVAL = "*/10 * * * * *"; // Every 10 seconds const CAPTURE_INTERVAL = "0 * * * *"; // Every hour at minute 0
// Single camera configuration // Single camera configuration
const CAMERA_STREAM_ID = process.env.STREAM_ID || "camera-1"; const CAMERA_STREAM_ID = process.env.STREAM_ID || "camera-1";

View File

@@ -219,21 +219,6 @@
abortController = null; abortController = null;
} }
const response = await fetch("/api/stream", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
action: "stop",
streamId,
}),
});
if (!response.ok) {
throw new Error("Failed to stop stream");
}
isPlaying = false; isPlaying = false;
error = null; error = null;
frameCount = 0; frameCount = 0;
@@ -272,8 +257,8 @@
onMount(() => { onMount(() => {
// Handle page unload/reload // Handle page unload/reload
const handleBeforeUnload = () => { const handleBeforeUnload = () => {
if (isPlaying) { if (abortController) {
stopStream(); abortController.abort();
} }
}; };
@@ -301,12 +286,12 @@
</script> </script>
<svelte:head> <svelte:head>
<title>Keimli der II.</title> <title>Keimli die II.</title>
</svelte:head> </svelte:head>
<div class="min-h-screen bg-black py-8 px-4"> <div class="min-h-screen bg-black py-8 px-4">
<div class="w-full max-w-7xl mx-auto"> <div class="w-full max-w-7xl mx-auto">
<h1 class="text-white text-3xl font-bold mb-6">Keimli der II.</h1> <h1 class="text-white text-3xl font-bold mb-6">Keimli die II.</h1>
<div <div
bind:this={videoContainer} bind:this={videoContainer}

View File

@@ -1,14 +1,106 @@
import { spawn } from "child_process"; import { spawn } from "child_process";
interface StreamClient {
controller: ReadableStreamDefaultController<Uint8Array>;
id: string;
closed: boolean;
lastSeen: number;
}
interface StreamProcess { interface StreamProcess {
process: ReturnType<typeof spawn> | null; process: ReturnType<typeof spawn> | null;
rtspUrl: string; rtspUrl: string;
startedAt: Date; startedAt: Date;
clients: Set<string>; clients: Map<string, StreamClient>;
isListeningToStdout: boolean;
stopTimeout: NodeJS.Timeout | null;
} }
const activeStreams = new Map<string, StreamProcess>(); const activeStreams = new Map<string, StreamProcess>();
function setupStreamDataDistribution(streamId: string, stream: StreamProcess) {
if (stream.isListeningToStdout || !stream.process || !stream.process.stdout) {
return;
}
stream.isListeningToStdout = true;
const nodeStream = stream.process.stdout;
nodeStream.on("data", (chunk: Buffer) => {
const now = Date.now();
const deadClients: string[] = [];
// Distribute to all connected clients and detect dead ones
stream.clients.forEach((client, clientId) => {
if (client.closed) {
return;
}
try {
client.lastSeen = now;
client.controller.enqueue(new Uint8Array(chunk));
} catch (error) {
// Client controller is dead - mark for removal
console.log(
`[${streamId}] Client ${clientId} controller error (likely disconnected):`,
(error as Error).message,
);
client.closed = true;
deadClients.push(clientId);
}
});
// Remove dead clients
deadClients.forEach((clientId) => {
stream.clients.delete(clientId);
console.log(
`[${streamId}] ✗ Client disconnected (${stream.clients.size} remaining)`,
);
// If no clients remain, schedule stream stop
if (stream.clients.size === 0 && !stream.stopTimeout) {
console.log(
`[${streamId}] No clients connected. Stream will stop in 5 seconds unless a new client connects.`,
);
stream.stopTimeout = setTimeout(() => {
console.log(
`[${streamId}] 5 second grace period expired. Stopping stream.`,
);
stopStream(streamId);
}, 5000);
}
});
});
nodeStream.on("end", () => {
console.log(`[${streamId}] Stream process ended`);
stream.clients.forEach((client) => {
if (!client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
});
});
nodeStream.on("error", (error: Error) => {
console.error(`[${streamId}] Stream error:`, error);
stream.clients.forEach((client) => {
if (!client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
});
});
}
export function startStream( export function startStream(
streamId: string, streamId: string,
rtspUrl: string, rtspUrl: string,
@@ -42,6 +134,7 @@ export function startStream(
"pipe:1", "pipe:1",
]; ];
console.log(`[${streamId}] Starting stream from ${rtspUrl}`);
const ffmpegProcess = spawn("ffmpeg", args); const ffmpegProcess = spawn("ffmpeg", args);
ffmpegProcess.on("error", (err: Error) => { ffmpegProcess.on("error", (err: Error) => {
@@ -50,15 +143,22 @@ export function startStream(
}); });
ffmpegProcess.on("close", (code: number | null) => { ffmpegProcess.on("close", (code: number | null) => {
if (code !== 0) {
console.error(`[${streamId}] FFmpeg process closed with code ${code}`);
}
activeStreams.delete(streamId); activeStreams.delete(streamId);
}); });
activeStreams.set(streamId, { const stream: StreamProcess = {
process: ffmpegProcess, process: ffmpegProcess,
rtspUrl, rtspUrl,
startedAt: new Date(), startedAt: new Date(),
clients: new Set(), clients: new Map(),
}); isListeningToStdout: false,
stopTimeout: null,
};
activeStreams.set(streamId, stream);
return { return {
success: true, success: true,
@@ -74,34 +174,106 @@ export function startStream(
export function stopStream(streamId: string): void { export function stopStream(streamId: string): void {
const stream = activeStreams.get(streamId); const stream = activeStreams.get(streamId);
if (stream && stream.process) { if (!stream) {
return;
}
console.log(
`[${streamId}] Stopping stream (${stream.clients.size} clients connected)`,
);
// Cancel any pending stop timeout
if (stream.stopTimeout) {
clearTimeout(stream.stopTimeout);
stream.stopTimeout = null;
}
if (stream.process) {
stream.clients.forEach((client) => {
if (!client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
});
stream.process.kill("SIGTERM"); stream.process.kill("SIGTERM");
}
stream.clients.clear(); stream.clients.clear();
activeStreams.delete(streamId); activeStreams.delete(streamId);
} }
}
export function getStreamProcess(streamId: string) { export function getStreamProcess(streamId: string) {
return activeStreams.get(streamId); return activeStreams.get(streamId);
} }
export function addStreamClient(streamId: string, client: string): boolean { export function addStreamClient(
streamId: string,
clientId: string,
controller: ReadableStreamDefaultController<Uint8Array>,
): boolean {
const stream = activeStreams.get(streamId); const stream = activeStreams.get(streamId);
if (!stream) { if (!stream) {
console.error(`[${streamId}] Cannot add client - stream not found`);
return false; return false;
} }
stream.clients.add(client);
// Cancel any pending stop timeout since we have a new client
if (stream.stopTimeout) {
console.log(
`[${streamId}] Cancelling scheduled stream stop (new client connecting)`,
);
clearTimeout(stream.stopTimeout);
stream.stopTimeout = null;
}
// Setup data distribution if not already done
setupStreamDataDistribution(streamId, stream);
stream.clients.set(clientId, {
id: clientId,
controller,
closed: false,
lastSeen: Date.now(),
});
console.log(
`[${streamId}] ✓ Client connected (${stream.clients.size} clients total)`,
);
return true; return true;
} }
export function removeStreamClient(streamId: string, client: string): void { export function removeStreamClient(streamId: string, clientId: string): void {
const stream = activeStreams.get(streamId); const stream = activeStreams.get(streamId);
if (stream) { if (stream) {
stream.clients.delete(client); const clientsBeforeRemoval = stream.clients.size;
const client = stream.clients.get(clientId);
if (client && !client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
stream.clients.delete(clientId);
console.log(
`[${streamId}] ✗ Client disconnected (${clientsBeforeRemoval} -> ${stream.clients.size} clients)`,
);
// Auto-stop stream if no clients remain // If no clients remain, schedule stream stop with 5 second grace period
if (stream.clients.size === 0) { if (stream.clients.size === 0) {
console.log(
`[${streamId}] No clients connected. Stream will stop in 5 seconds unless a new client connects.`,
);
stream.stopTimeout = setTimeout(() => {
console.log(
`[${streamId}] 5 second grace period expired. Stopping stream.`,
);
stopStream(streamId); stopStream(streamId);
}, 5000);
} }
} }
} }

View File

@@ -2,10 +2,6 @@
import RTSPVideoPlayer from "$lib/components/RTSPVideoPlayer.svelte"; import RTSPVideoPlayer from "$lib/components/RTSPVideoPlayer.svelte";
</script> </script>
<svelte:head>
<title>RTSP Stream Viewer</title>
</svelte:head>
<div class="min-h-screen bg-linear-to-br from-gray-900 to-gray-800 py-8"> <div class="min-h-screen bg-linear-to-br from-gray-900 to-gray-800 py-8">
<RTSPVideoPlayer /> <RTSPVideoPlayer />
</div> </div>

View File

@@ -5,8 +5,6 @@ import {
getStreamStatus, getStreamStatus,
getAllStreams, getAllStreams,
getStreamProcess, getStreamProcess,
addStreamClient,
removeStreamClient,
} from "$lib/server/streaming.js"; } from "$lib/server/streaming.js";
export async function POST({ request }) { export async function POST({ request }) {
@@ -17,12 +15,19 @@ export async function POST({ request }) {
return json({ error: "Missing streamId or rtspUrl" }, { status: 400 }); return json({ error: "Missing streamId or rtspUrl" }, { status: 400 });
} }
// Check if stream already exists and is running
const existingStream = getStreamProcess(streamId);
if (existingStream && existingStream.process) {
return json({ success: true, streamId, reused: true });
}
// Stream doesn't exist, start a new one
const result = startStream(streamId, rtspUrl); const result = startStream(streamId, rtspUrl);
if (result.error) { if (result.error) {
return json({ error: result.error }, { status: 500 }); return json({ error: result.error }, { status: 500 });
} }
return json({ success: true, streamId }); return json({ success: true, streamId, reused: false });
} else if (action === "stop") { } else if (action === "stop") {
if (!streamId) { if (!streamId) {
return json({ error: "Missing streamId" }, { status: 400 }); return json({ error: "Missing streamId" }, { status: 400 });
@@ -44,54 +49,3 @@ export async function POST({ request }) {
return json({ error: "Unknown action" }, { status: 400 }); return json({ error: "Unknown action" }, { status: 400 });
} }
} }
export async function GET({ url }) {
const streamId = url.searchParams.get("id");
if (!streamId) {
return new Response("Missing stream ID", { status: 400 });
}
const stream = getStreamProcess(streamId);
if (!stream || !stream.process || !stream.process.stdout) {
return new Response("Stream not found", { status: 404 });
}
// Track client
const clientId = Math.random().toString(36).substring(7);
addStreamClient(streamId, clientId);
// Create readable stream from FFmpeg stdout
const readable = stream.process.stdout;
// Return as multipart/x-mixed-replace for MJPEG
const response = new Response(readable as any, {
headers: {
"Content-Type": "multipart/x-mixed-replace; boundary=FFMPEG",
"Cache-Control": "no-cache, no-store, must-revalidate",
Pragma: "no-cache",
Expires: "0",
Connection: "keep-alive",
},
});
// Track when connection closes
if (response.body) {
const reader = response.body.getReader();
(async () => {
try {
while (true) {
await reader.read();
}
} catch (e) {
// Connection closed
console.error("Error monitoring stream connection:", e);
} finally {
removeStreamClient(streamId, clientId);
reader.releaseLock();
}
})();
}
return response;
}

View File

@@ -17,65 +17,36 @@ export async function GET({ url }) {
} }
const clientId = Math.random().toString(36).substring(7); const clientId = Math.random().toString(36).substring(7);
addStreamClient(streamId, clientId); let cleanupDone = false;
// Create a web ReadableStream from the Node.js stream const cleanup = () => {
const nodeStream = stream.process.stdout; if (!cleanupDone) {
let controllerClosed = false; cleanupDone = true;
console.log(`[${streamId}] Client ${clientId} cleanup`);
const webReadableStream = new ReadableStream({
start(controller) {
nodeStream.on("data", (chunk) => {
try {
if (!controllerClosed) {
controller.enqueue(chunk);
}
} catch (error) {
console.error(`[Stream ${streamId}] Error enqueueing chunk:`, error);
controllerClosed = true;
}
});
nodeStream.on("end", () => {
try {
if (!controllerClosed) {
controller.close();
controllerClosed = true;
}
} catch (error) {
console.error(
`[Stream ${streamId}] Error closing controller:`,
error,
);
}
});
nodeStream.on("error", (error) => {
console.error(`[Stream ${streamId}] Node stream error:`, error);
try {
if (!controllerClosed) {
controller.close();
controllerClosed = true;
}
} catch (closeError) {
console.error(
`[Stream ${streamId}] Error closing controller on stream error:`,
closeError,
);
}
});
},
cancel() {
try {
controllerClosed = true;
removeStreamClient(streamId, clientId); removeStreamClient(streamId, clientId);
} catch (error) {
console.error(`[Stream ${streamId}] Error during cancel:`, error);
} }
};
const webReadableStream = new ReadableStream<Uint8Array>({
start(controller) {
console.log(`[${streamId}] Client ${clientId} connected`);
const success = addStreamClient(streamId, clientId, controller);
if (!success) {
console.log(`[${streamId}] Failed to add client ${clientId}`);
try {
controller.close();
} catch {
// Controller already closed
}
cleanup();
}
},
cancel(reason) {
console.log(`[${streamId}] Client ${clientId} cancelled:`, reason);
cleanup();
}, },
}); });
// Return the response with proper headers
return new Response(webReadableStream, { return new Response(webReadableStream, {
status: 200, status: 200,
headers: { headers: {