Compare commits

...

2 Commits

Author SHA1 Message Date
8de0096644 some fixes 2026-03-11 23:15:54 +01:00
de707294ec fix: capture when no stream is running 2026-03-11 22:36:03 +01:00
8 changed files with 402 additions and 156 deletions

22
docker-compose.yml Normal file
View File

@@ -0,0 +1,22 @@
services:
grown:
image: git.klemp.dev/tobi/grown:latest
container_name: grown
restart: unless-stopped
environment:
- NODE_ENV=production
volumes:
- /volume1/docker/grown/keimli2/:/app/timelapse/
networks:
- traefik-proxy
labels:
- "traefik.enable=true"
- traefik.docker.network=traefik-proxy
- "traefik.http.routers.grown.rule=Host(`keimli.klemp.local`)"
- "traefik.http.routers.grown.entrypoints=http"
- "traefik.http.services.grown.loadbalancer.server.port=3000"
- traefik.http.routers.grown.service=grown
networks:
traefik-proxy:
external: true

View File

@@ -9,7 +9,13 @@
"preview": "vite preview",
"prepare": "svelte-kit sync || echo ''",
"check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json",
"check:watch": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json --watch"
"check:watch": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json --watch",
"docker:build": "docker build -t grown:latest .",
"docker:login": "echo $REFURBILY_REGISTRY_ACCESS_TOKEN | docker login ${GITEA_REGISTRY_URL#https://} -u refurbily --password-stdin",
"docker:push": "docker tag grown:latest ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest && docker push ${GITEA_REGISTRY_URL#https://}/tobi/grown:latest",
"docker:build-push": "npm run docker:build && npm run docker:login && npm run docker:push",
"deploy": "docker context create nas --docker \"host=ssh://nas\" 2>/dev/null || true && docker --context nas compose -f docker-compose.yml --env-file .env pull && docker --context nas compose -f docker-compose.yml --env-file .env up -d && docker context use default",
"ship": "bun --bun run docker:build-push && bun --bun run deploy"
},
"devDependencies": {
"@lucide/svelte": "^0.577.0",

View File

@@ -2,42 +2,46 @@ import schedule from "node-schedule";
import { spawn } from "child_process";
import { existsSync, mkdirSync } from "fs";
import { join } from "path";
import { getAllStreams, getStreamProcess } from "$lib/server/streaming.js";
import {
getAllStreams,
getStreamProcess,
startStream,
stopStream,
} from "$lib/server/streaming.js";
const TIMELAPSE_OUTPUT_PATH =
process.env.TIMELAPSE_OUTPUT_PATH || "./timelapse";
const CAPTURE_INTERVAL = "0 * * * *"; // Every hour at minute 0
// Single camera configuration
const CAMERA_STREAM_ID = process.env.STREAM_ID || "camera-1";
const CAMERA_RTSP_URL =
process.env.RTSP_URL || "rtsp://Kamera_1:kamera_1@192.168.175.49/stream1";
let captureJob: any = null;
let initialized = false;
function getTimestamp() {
return new Date().toISOString();
}
function ensureOutputDirectory() {
if (!existsSync(TIMELAPSE_OUTPUT_PATH)) {
mkdirSync(TIMELAPSE_OUTPUT_PATH, { recursive: true });
}
}
function captureTimelapseFrame() {
const streams = getAllStreams();
streams.forEach((streamInfo) => {
const stream = getStreamProcess(streamInfo.streamId);
if (!stream || !stream.process || !stream.process.stdout) {
console.error(
`[Timelapse] Stream ${streamInfo.streamId} not available for capture`,
);
return;
}
function captureFrameForStream(streamId: string, rtspUrl: string) {
try {
const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
const filename = `${streamInfo.streamId}_${timestamp}.jpg`;
const filename = `${streamId}_${timestamp}.jpg`;
const outputPath = join(TIMELAPSE_OUTPUT_PATH, filename);
const ffmpegArgs = [
"-rtsp_transport",
"tcp",
"-i",
streamInfo.rtspUrl,
rtspUrl,
"-vframes",
"1",
"-q:v",
@@ -45,27 +49,106 @@ function captureTimelapseFrame() {
outputPath,
];
try {
const ffmpegProcess = spawn("ffmpeg", ffmpegArgs);
ffmpegProcess.on("error", (err: Error) => {
console.error(
`[Timelapse] FFmpeg error for ${streamInfo.streamId}:`,
err,
`[${getTimestamp()}] [Timelapse] FFmpeg spawn error: ${err.message}`,
);
});
ffmpegProcess.on("close", (code: number | null) => {
try {
if (code === 0) {
console.log(
`[Timelapse] Captured frame for ${streamInfo.streamId}: ${filename}`,
`[${getTimestamp()}] [Timelapse] Captured frame: ${filename}`,
);
} else {
console.error(
`[Timelapse] FFmpeg exited with code ${code} for ${streamInfo.streamId}`,
`[${getTimestamp()}] [Timelapse] FFmpeg exited with code ${code}`,
);
}
} catch (err) {
console.error(
`[${getTimestamp()}] [Timelapse] Error handling ffmpeg close event: ${err}`,
);
}
});
});
} catch (spawnErr) {
console.error(
`[${getTimestamp()}] [Timelapse] Error spawning ffmpeg: ${spawnErr}`,
);
}
} catch (err) {
console.error(
`[${getTimestamp()}] [Timelapse] Error in captureFrameForStream: ${err}`,
);
}
}
function captureTimelapseFrame() {
const startTime = getTimestamp();
console.log(`[${startTime}] [Timelapse] Starting capture frame...`);
try {
let stream = getStreamProcess(CAMERA_STREAM_ID);
// If no active stream, start one temporarily
if (!stream || !stream.process || !stream.process.stdout) {
console.log(
`[${getTimestamp()}] [Timelapse] No active stream, starting temporary stream`,
);
const result = startStream(CAMERA_STREAM_ID, CAMERA_RTSP_URL);
if (!result.success) {
console.error(
`[${getTimestamp()}] [Timelapse] Failed to start stream: ${result.error}`,
);
return;
}
// Give the stream a moment to start
setTimeout(() => {
const startedStream = getStreamProcess(CAMERA_STREAM_ID);
if (
startedStream &&
startedStream.process &&
startedStream.process.stdout
) {
captureFrameForStream(CAMERA_STREAM_ID, CAMERA_RTSP_URL);
// Stop stream after capture if no clients are connected
setTimeout(() => {
const streamStatus = getStreamProcess(CAMERA_STREAM_ID);
if (
streamStatus &&
streamStatus.clients &&
streamStatus.clients.size === 0
) {
console.log(
`[${getTimestamp()}] [Timelapse] Stopping temporary stream after capture`,
);
stopStream(CAMERA_STREAM_ID);
}
}, 1000);
} else {
console.error(
`[${getTimestamp()}] [Timelapse] Failed to start stream process`,
);
}
}, 500);
} else {
// Stream already active, just capture
captureFrameForStream(CAMERA_STREAM_ID, CAMERA_RTSP_URL);
}
} catch (error) {
console.error(
`[${getTimestamp()}] [Timelapse] Fatal error in captureTimelapseFrame: ${error}`,
);
}
const endTime = getTimestamp();
console.log(`[${endTime}] [Timelapse] Capture frame completed`);
}
function initializeTimelapse() {
@@ -73,17 +156,34 @@ function initializeTimelapse() {
return;
}
try {
ensureOutputDirectory();
console.log(`[Timelapse] Output directory: ${TIMELAPSE_OUTPUT_PATH}`);
console.log("[Timelapse] Starting hourly capture job...");
console.log(
`[${getTimestamp()}] [Timelapse] Output directory: ${TIMELAPSE_OUTPUT_PATH}`,
);
console.log(
`[${getTimestamp()}] [Timelapse] Camera: ${CAMERA_STREAM_ID} (${CAMERA_RTSP_URL})`,
);
console.log(`[${getTimestamp()}] [Timelapse] Starting capture job...`);
captureJob = schedule.scheduleJob(CAPTURE_INTERVAL, () => {
console.log("[Timelapse] Running hourly capture job...");
try {
console.log(`[${getTimestamp()}] [Timelapse] Running capture job...`);
captureTimelapseFrame();
} catch (err) {
console.error(
`[${getTimestamp()}] [Timelapse] Error in scheduled capture job: ${err}`,
);
}
});
initialized = true;
console.log("[Timelapse] Hourly capture job initialized");
console.log(`[${getTimestamp()}] [Timelapse] Capture job initialized`);
} catch (error) {
console.error(
`[${getTimestamp()}] [Timelapse] Error initializing timelapse: ${error}`,
);
}
}
export async function handle({ event, resolve }) {
@@ -98,7 +198,13 @@ export async function handle({ event, resolve }) {
// Cleanup on shutdown
process.on("exit", () => {
if (captureJob) {
try {
captureJob.cancel();
console.log("[Timelapse] Capture job cancelled");
console.log(`[${getTimestamp()}] [Timelapse] Capture job cancelled`);
} catch (error) {
console.error(
`[${getTimestamp()}] [Timelapse] Error cancelling capture job: ${error}`,
);
}
}
});

View File

@@ -219,21 +219,6 @@
abortController = null;
}
const response = await fetch("/api/stream", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
action: "stop",
streamId,
}),
});
if (!response.ok) {
throw new Error("Failed to stop stream");
}
isPlaying = false;
error = null;
frameCount = 0;
@@ -272,8 +257,8 @@
onMount(() => {
// Handle page unload/reload
const handleBeforeUnload = () => {
if (isPlaying) {
stopStream();
if (abortController) {
abortController.abort();
}
};
@@ -301,12 +286,12 @@
</script>
<svelte:head>
<title>Keimli der II.</title>
<title>Keimli die II.</title>
</svelte:head>
<div class="min-h-screen bg-black py-8 px-4">
<div class="w-full max-w-7xl mx-auto">
<h1 class="text-white text-3xl font-bold mb-6">Keimli der II.</h1>
<h1 class="text-white text-3xl font-bold mb-6">Keimli die II.</h1>
<div
bind:this={videoContainer}

View File

@@ -1,14 +1,106 @@
import { spawn } from "child_process";
interface StreamClient {
controller: ReadableStreamDefaultController<Uint8Array>;
id: string;
closed: boolean;
lastSeen: number;
}
interface StreamProcess {
process: ReturnType<typeof spawn> | null;
rtspUrl: string;
startedAt: Date;
clients: Set<string>;
clients: Map<string, StreamClient>;
isListeningToStdout: boolean;
stopTimeout: NodeJS.Timeout | null;
}
const activeStreams = new Map<string, StreamProcess>();
function setupStreamDataDistribution(streamId: string, stream: StreamProcess) {
if (stream.isListeningToStdout || !stream.process || !stream.process.stdout) {
return;
}
stream.isListeningToStdout = true;
const nodeStream = stream.process.stdout;
nodeStream.on("data", (chunk: Buffer) => {
const now = Date.now();
const deadClients: string[] = [];
// Distribute to all connected clients and detect dead ones
stream.clients.forEach((client, clientId) => {
if (client.closed) {
return;
}
try {
client.lastSeen = now;
client.controller.enqueue(new Uint8Array(chunk));
} catch (error) {
// Client controller is dead - mark for removal
console.log(
`[${streamId}] Client ${clientId} controller error (likely disconnected):`,
(error as Error).message,
);
client.closed = true;
deadClients.push(clientId);
}
});
// Remove dead clients
deadClients.forEach((clientId) => {
stream.clients.delete(clientId);
console.log(
`[${streamId}] ✗ Client disconnected (${stream.clients.size} remaining)`,
);
// If no clients remain, schedule stream stop
if (stream.clients.size === 0 && !stream.stopTimeout) {
console.log(
`[${streamId}] No clients connected. Stream will stop in 5 seconds unless a new client connects.`,
);
stream.stopTimeout = setTimeout(() => {
console.log(
`[${streamId}] 5 second grace period expired. Stopping stream.`,
);
stopStream(streamId);
}, 5000);
}
});
});
nodeStream.on("end", () => {
console.log(`[${streamId}] Stream process ended`);
stream.clients.forEach((client) => {
if (!client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
});
});
nodeStream.on("error", (error: Error) => {
console.error(`[${streamId}] Stream error:`, error);
stream.clients.forEach((client) => {
if (!client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
});
});
}
export function startStream(
streamId: string,
rtspUrl: string,
@@ -26,7 +118,7 @@ export function startStream(
rtspUrl,
// Scale video to 1920x1080 (Full HD)
"-vf",
"scale=1920:1080",
"scale=2560:1440",
// Video output - MJPEG format
"-c:v",
"mjpeg",
@@ -42,6 +134,7 @@ export function startStream(
"pipe:1",
];
console.log(`[${streamId}] Starting stream from ${rtspUrl}`);
const ffmpegProcess = spawn("ffmpeg", args);
ffmpegProcess.on("error", (err: Error) => {
@@ -50,15 +143,22 @@ export function startStream(
});
ffmpegProcess.on("close", (code: number | null) => {
if (code !== 0) {
console.error(`[${streamId}] FFmpeg process closed with code ${code}`);
}
activeStreams.delete(streamId);
});
activeStreams.set(streamId, {
const stream: StreamProcess = {
process: ffmpegProcess,
rtspUrl,
startedAt: new Date(),
clients: new Set(),
});
clients: new Map(),
isListeningToStdout: false,
stopTimeout: null,
};
activeStreams.set(streamId, stream);
return {
success: true,
@@ -74,34 +174,106 @@ export function startStream(
export function stopStream(streamId: string): void {
const stream = activeStreams.get(streamId);
if (stream && stream.process) {
if (!stream) {
return;
}
console.log(
`[${streamId}] Stopping stream (${stream.clients.size} clients connected)`,
);
// Cancel any pending stop timeout
if (stream.stopTimeout) {
clearTimeout(stream.stopTimeout);
stream.stopTimeout = null;
}
if (stream.process) {
stream.clients.forEach((client) => {
if (!client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
});
stream.process.kill("SIGTERM");
}
stream.clients.clear();
activeStreams.delete(streamId);
}
}
export function getStreamProcess(streamId: string) {
return activeStreams.get(streamId);
}
export function addStreamClient(streamId: string, client: string): boolean {
export function addStreamClient(
streamId: string,
clientId: string,
controller: ReadableStreamDefaultController<Uint8Array>,
): boolean {
const stream = activeStreams.get(streamId);
if (!stream) {
console.error(`[${streamId}] Cannot add client - stream not found`);
return false;
}
stream.clients.add(client);
// Cancel any pending stop timeout since we have a new client
if (stream.stopTimeout) {
console.log(
`[${streamId}] Cancelling scheduled stream stop (new client connecting)`,
);
clearTimeout(stream.stopTimeout);
stream.stopTimeout = null;
}
// Setup data distribution if not already done
setupStreamDataDistribution(streamId, stream);
stream.clients.set(clientId, {
id: clientId,
controller,
closed: false,
lastSeen: Date.now(),
});
console.log(
`[${streamId}] ✓ Client connected (${stream.clients.size} clients total)`,
);
return true;
}
export function removeStreamClient(streamId: string, client: string): void {
export function removeStreamClient(streamId: string, clientId: string): void {
const stream = activeStreams.get(streamId);
if (stream) {
stream.clients.delete(client);
const clientsBeforeRemoval = stream.clients.size;
const client = stream.clients.get(clientId);
if (client && !client.closed) {
try {
client.controller.close();
client.closed = true;
} catch {
// Controller already closed
}
}
stream.clients.delete(clientId);
console.log(
`[${streamId}] ✗ Client disconnected (${clientsBeforeRemoval} -> ${stream.clients.size} clients)`,
);
// Auto-stop stream if no clients remain
// If no clients remain, schedule stream stop with 5 second grace period
if (stream.clients.size === 0) {
console.log(
`[${streamId}] No clients connected. Stream will stop in 5 seconds unless a new client connects.`,
);
stream.stopTimeout = setTimeout(() => {
console.log(
`[${streamId}] 5 second grace period expired. Stopping stream.`,
);
stopStream(streamId);
}, 5000);
}
}
}

View File

@@ -2,10 +2,6 @@
import RTSPVideoPlayer from "$lib/components/RTSPVideoPlayer.svelte";
</script>
<svelte:head>
<title>RTSP Stream Viewer</title>
</svelte:head>
<div class="min-h-screen bg-linear-to-br from-gray-900 to-gray-800 py-8">
<RTSPVideoPlayer />
</div>

View File

@@ -5,8 +5,6 @@ import {
getStreamStatus,
getAllStreams,
getStreamProcess,
addStreamClient,
removeStreamClient,
} from "$lib/server/streaming.js";
export async function POST({ request }) {
@@ -17,12 +15,19 @@ export async function POST({ request }) {
return json({ error: "Missing streamId or rtspUrl" }, { status: 400 });
}
// Check if stream already exists and is running
const existingStream = getStreamProcess(streamId);
if (existingStream && existingStream.process) {
return json({ success: true, streamId, reused: true });
}
// Stream doesn't exist, start a new one
const result = startStream(streamId, rtspUrl);
if (result.error) {
return json({ error: result.error }, { status: 500 });
}
return json({ success: true, streamId });
return json({ success: true, streamId, reused: false });
} else if (action === "stop") {
if (!streamId) {
return json({ error: "Missing streamId" }, { status: 400 });
@@ -44,54 +49,3 @@ export async function POST({ request }) {
return json({ error: "Unknown action" }, { status: 400 });
}
}
export async function GET({ url }) {
const streamId = url.searchParams.get("id");
if (!streamId) {
return new Response("Missing stream ID", { status: 400 });
}
const stream = getStreamProcess(streamId);
if (!stream || !stream.process || !stream.process.stdout) {
return new Response("Stream not found", { status: 404 });
}
// Track client
const clientId = Math.random().toString(36).substring(7);
addStreamClient(streamId, clientId);
// Create readable stream from FFmpeg stdout
const readable = stream.process.stdout;
// Return as multipart/x-mixed-replace for MJPEG
const response = new Response(readable as any, {
headers: {
"Content-Type": "multipart/x-mixed-replace; boundary=FFMPEG",
"Cache-Control": "no-cache, no-store, must-revalidate",
Pragma: "no-cache",
Expires: "0",
Connection: "keep-alive",
},
});
// Track when connection closes
if (response.body) {
const reader = response.body.getReader();
(async () => {
try {
while (true) {
await reader.read();
}
} catch (e) {
// Connection closed
console.error("Error monitoring stream connection:", e);
} finally {
removeStreamClient(streamId, clientId);
reader.releaseLock();
}
})();
}
return response;
}

View File

@@ -17,31 +17,36 @@ export async function GET({ url }) {
}
const clientId = Math.random().toString(36).substring(7);
addStreamClient(streamId, clientId);
let cleanupDone = false;
// Create a web ReadableStream from the Node.js stream
const nodeStream = stream.process.stdout;
const webReadableStream = new ReadableStream({
start(controller) {
nodeStream.on("data", (chunk) => {
controller.enqueue(chunk);
});
nodeStream.on("end", () => {
controller.close();
});
nodeStream.on("error", () => {
controller.close();
});
},
cancel() {
const cleanup = () => {
if (!cleanupDone) {
cleanupDone = true;
console.log(`[${streamId}] Client ${clientId} cleanup`);
removeStreamClient(streamId, clientId);
}
};
const webReadableStream = new ReadableStream<Uint8Array>({
start(controller) {
console.log(`[${streamId}] Client ${clientId} connected`);
const success = addStreamClient(streamId, clientId, controller);
if (!success) {
console.log(`[${streamId}] Failed to add client ${clientId}`);
try {
controller.close();
} catch {
// Controller already closed
}
cleanup();
}
},
cancel(reason) {
console.log(`[${streamId}] Client ${clientId} cancelled:`, reason);
cleanup();
},
});
// Return the response with proper headers
return new Response(webReadableStream, {
status: 200,
headers: {