Zhen Ye committed
Commit a52a96d · Parent: f49470e

Enable MJPEG streaming for live video feedback

LaserPerception/LaserPerception.js CHANGED
@@ -944,6 +944,23 @@
     log("First frame depth URL received (will fetch when ready)", "t");
   }
 
+  // Start streaming if available
+  if (data.stream_url) {
+    log("Activating live stream...", "t");
+    const streamUrl = `${state.hf.baseUrl}${data.stream_url}`;
+    setStreamingMode(streamUrl);
+
+    // Switch to the Engage tab
+    $$(".tabbtn").forEach(b => b.classList.remove("active"));
+    $("[data-tab='engage']").classList.add("active");
+    $$(".tab").forEach(t => t.classList.remove("active"));
+    $("#tab-engage").classList.add("active");
+    // Trigger resize/render
+    resizeOverlays();
+    renderRadar();
+    renderTrackCards();
+  }
+
   // Display first frame immediately (if object detection, segmentation, or drone)
   if ((mode === "object_detection" || mode === "segmentation" || mode === "drone_detection") && state.hf.firstFrameUrl) {
     const count = Array.isArray(data.first_frame_detections) ? data.first_frame_detections.length : null;
@@ -1041,6 +1058,54 @@
   }
 }
 
+function setStreamingMode(url) {
+  // Ensure the stream <img> element exists
+  let streamView = $("#streamView");
+  if (!streamView) {
+    streamView = document.createElement("img");
+    streamView.id = "streamView";
+    streamView.style.width = "100%";
+    streamView.style.height = "100%";
+    streamView.style.objectFit = "contain";
+    streamView.style.position = "absolute";
+    streamView.style.top = "0";
+    streamView.style.left = "0";
+    streamView.style.zIndex = "10"; // Above the video element
+    streamView.style.backgroundColor = "#000";
+    // Insert as a sibling of videoEngage inside its wrapper
+    if (videoEngage && videoEngage.parentNode) {
+      videoEngage.parentNode.appendChild(streamView);
+      // Ensure the container establishes a positioning context
+      if (getComputedStyle(videoEngage.parentNode).position === "static") {
+        videoEngage.parentNode.style.position = "relative";
+      }
+    }
+  }
+
+  if (streamView) {
+    streamView.src = url;
+    streamView.style.display = "block";
+    if (videoEngage) videoEngage.style.display = "none";
+
+    // Also hide the empty state
+    if (engageEmpty) engageEmpty.style.display = "none";
+  }
+}
+
+function stopStreamingMode() {
+  const streamView = $("#streamView");
+  if (streamView) {
+    streamView.src = ""; // Drop the MJPEG connection
+    streamView.style.display = "none";
+  }
+  if (videoEngage) videoEngage.style.display = "block";
+  // The empty state is left as-is: streaming normally stops only after
+  // the processed video has loaded.
+}
+
 function cancelReasoning() {
   // Stop HF polling if running
   if (state.hf.asyncPollInterval) {
@@ -1049,6 +1114,8 @@
     log("HF polling stopped.", "w");
   }
 
+  stopStreamingMode();
+
   // Cancel backend job if it exists
   const jobId = state.hf.asyncJobId;
   if (jobId) {
@@ -1114,6 +1181,7 @@
           // Clear job ID to prevent cancel attempts after completion
           state.hf.asyncJobId = null;
           setHfStatus("ready");
+          stopStreamingMode();
           resolve();
         } catch (err) {
           if (err && err.code === "VIDEO_PENDING") {
@@ -1123,6 +1191,7 @@
           }
           clearInterval(state.hf.asyncPollInterval);
           state.hf.asyncJobId = null; // Clear on error too
+          stopStreamingMode();
           reject(err);
         }
       } else if (status.status === "failed") {
@@ -1132,6 +1201,7 @@
         // Clear job ID to prevent cancel attempts after failure
         state.hf.asyncJobId = null;
         setHfStatus(`error: ${errMsg}`);
+        stopStreamingMode();
         reject(new Error(errMsg));
       } else {
         // Still processing
app.py CHANGED
@@ -36,7 +36,7 @@ from pathlib import Path
 import cv2
 from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
+from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
 from fastapi.staticfiles import StaticFiles
 import uvicorn
 
@@ -44,6 +44,7 @@ from inference import process_first_frame, run_inference, run_segmentation
 from models.depth_estimators.model_loader import list_depth_estimators
 from jobs.background import process_video_async
 from jobs.models import JobInfo, JobStatus
+from jobs.streaming import get_stream
 from jobs.storage import (
     get_depth_output_path,
     get_first_frame_depth_path,
@@ -385,6 +386,7 @@ async def detect_async_endpoint(
         "status_url": f"/detect/status/{job_id}",
         "video_url": f"/detect/video/{job_id}",
         "depth_video_url": f"/detect/depth-video/{job_id}",
+        "stream_url": f"/detect/stream/{job_id}",
         "status": job.status.value,
         "first_frame_detections": detections,
     }
@@ -507,5 +509,39 @@ async def detect_first_frame_depth(job_id: str):
     )
 
 
+@app.get("/detect/stream/{job_id}")
+async def stream_video(job_id: str):
+    """MJPEG stream of the video while it is being processed."""
+    import asyncio
+    import queue
+
+    async def stream_generator():
+        while True:
+            # Stop once the job's stream has been removed
+            q = get_stream(job_id)
+            if not q:
+                # Job finished or not started
+                break
+
+            try:
+                # Non-blocking get
+                frame = q.get_nowait()
+
+                # Encode the frame as JPEG
+                success, buffer = cv2.imencode('.jpg', frame)
+                if success:
+                    yield (b'--frame\r\n'
+                           b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
+            except queue.Empty:
+                await asyncio.sleep(0.02)  # Yield to the event loop; ~50 fps max poll rate
+            except Exception:
+                await asyncio.sleep(0.1)
+
+    return StreamingResponse(
+        stream_generator(),
+        media_type="multipart/x-mixed-replace; boundary=frame"
+    )
+
+
 if __name__ == "__main__":
     uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)
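
For reference, the new `/detect/stream/{job_id}` endpoint can be consumed by any MJPEG-capable client; the frontend simply points an `<img>` at it. The sketch below is not part of this commit: it reads the multipart stream with `requests` and saves the most recent frame. The host/port and `JOB_ID` are assumptions (use the `job_id` returned by the detect endpoint).

```python
# Hypothetical client sketch; JOB_ID and the host are placeholders.
import requests

JOB_ID = "example-job-id"  # placeholder: use the job_id returned by /detect
url = f"http://localhost:7860/detect/stream/{JOB_ID}"

with requests.get(url, stream=True, timeout=30) as resp:
    buf = b""
    for chunk in resp.iter_content(chunk_size=4096):
        buf += chunk
        # Each multipart part carries one JPEG between the SOI (FFD8)
        # and EOI (FFD9) markers.
        start = buf.find(b"\xff\xd8")
        end = buf.find(b"\xff\xd9")
        if start != -1 and end > start:
            with open("latest_frame.jpg", "wb") as f:
                f.write(buf[start:end + 2])  # or decode with cv2.imdecode
            buf = buf[end + 2:]
```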
inference.py CHANGED
@@ -470,6 +470,7 @@ def run_inference(
     job_id: Optional[str] = None,
     depth_estimator_name: Optional[str] = None,
     depth_scale: float = 1.0,
+    stream_queue: Optional[Queue] = None,
 ) -> Tuple[str, List[List[Dict[str, Any]]]]:
 
     # 1. Setup Video Reader
@@ -632,6 +633,13 @@
                 # Write next_idx
                 p_frame, dets = buffer.pop(next_idx)
                 writer.write(p_frame)
+
+                if stream_queue:
+                    try:
+                        stream_queue.put_nowait(p_frame)
+                    except Exception:
+                        pass  # Drop the frame if the stream consumer lags
+
                 all_detections_map[next_idx] = dets
                 next_idx += 1
 
@@ -701,6 +709,7 @@ def run_segmentation(
     max_frames: Optional[int] = None,
     segmenter_name: Optional[str] = None,
     job_id: Optional[str] = None,
+    stream_queue: Optional[Queue] = None,
 ) -> str:
     # 1. Setup Reader
     try:
@@ -807,6 +816,13 @@
 
                 frm = buffer.pop(next_idx)
                 writer.write(frm)
+
+                if stream_queue:
+                    try:
+                        stream_queue.put_nowait(frm)
+                    except Exception:
+                        pass  # Drop the frame if the stream consumer lags
+
                 next_idx += 1
             except Exception:
                 if job_id and _check_cancellation(job_id): pass
@@ -850,6 +866,7 @@ def run_depth_inference(
     depth_estimator_name: str = "depth",
     first_frame_depth_path: Optional[str] = None,
     job_id: Optional[str] = None,
+    stream_queue: Optional[Queue] = None,
 ) -> str:
     # 1. Setup Reader
     try:
@@ -1069,6 +1086,13 @@
 
                 frm = buffer.pop(next_idx)
                 writer.write(frm)
+
+                if stream_queue:
+                    try:
+                        stream_queue.put_nowait(frm)
+                    except Exception:
+                        pass  # Drop the frame if the stream consumer lags
+
 
                 if first_frame_depth_path and not first_frame_saved and next_idx == 0:
                     cv2.imwrite(first_frame_depth_path, frm)
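
All three inference loops push each freshly written frame into the bounded stream queue with `put_nowait`, so a slow or absent stream consumer can never block encoding. A minimal standard-library sketch of that drop-on-full behavior:

```python
import queue

q = queue.Queue(maxsize=2)   # same bounded style as the per-job stream queue
dropped = 0
for frame in range(5):       # stand-in frames; the real code passes numpy arrays
    try:
        q.put_nowait(frame)  # never blocks the producer loop
    except queue.Full:
        dropped += 1         # the frame is simply discarded

print(q.qsize(), dropped)    # -> 2 3: two frames buffered, three dropped
```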
jobs/background.py CHANGED
@@ -6,6 +6,7 @@ import torch
 
 from jobs.models import JobStatus
 from jobs.storage import get_job_storage, get_depth_output_path, get_first_frame_depth_path
+from jobs.streaming import create_stream, remove_stream
 from inference import run_inference, run_segmentation, run_depth_inference
 
 
@@ -20,6 +21,9 @@ async def process_video_async(job_id: str) -> None:
     depth_error = None
     partial_success = False
 
+    # Create a stream queue for the live view
+    stream_queue = create_stream(job_id)
+
     try:
         # Run detection or segmentation first
         if job.mode == "segmentation":
@@ -31,6 +35,7 @@ async def process_video_async(job_id: str) -> None:
                 None,
                 job.segmenter_name,
                 job_id,
+                stream_queue,
             )
         else:
             detections_list = None
@@ -44,6 +49,7 @@ async def process_video_async(job_id: str) -> None:
                 job_id,
                 job.depth_estimator_name,
                 job.depth_scale,
+                stream_queue,
             )
             # run_inference now returns (path, detections)
             detection_path, detections_list = result_pkg
@@ -60,6 +66,7 @@ async def process_video_async(job_id: str) -> None:
                 job.depth_estimator_name,
                 str(get_first_frame_depth_path(job_id)),
                 job_id,
+                stream_queue,
             )
         logging.info("Depth estimation completed for job %s", job_id)
     except (ImportError, ModuleNotFoundError) as exc:
@@ -121,4 +128,6 @@ async def process_video_async(job_id: str) -> None:
             status=JobStatus.FAILED,
             completed_at=datetime.utcnow(),
             error=str(exc),
         )
+    finally:
+        remove_stream(job_id)
jobs/streaming.py ADDED
@@ -0,0 +1,44 @@
+import queue
+from threading import Lock
+from typing import Any, Dict, Optional
+
+# Global registry of active streams.
+# Maps job_id -> Queue of frame data.
+_STREAMS: Dict[str, queue.Queue] = {}
+_LOCK = Lock()
+
+def create_stream(job_id: str) -> queue.Queue:
+    """Create a new stream queue for a job."""
+    with _LOCK:
+        # A standard Queue is thread-safe; the maxsize bound keeps memory
+        # from growing without limit if the consumer is slow.
+        q = queue.Queue(maxsize=10)
+        _STREAMS[job_id] = q
+        return q
+
+def get_stream(job_id: str) -> Optional[queue.Queue]:
+    """Get the stream queue for a job, if one exists."""
+    with _LOCK:
+        return _STREAMS.get(job_id)
+
+def remove_stream(job_id: str) -> None:
+    """Remove a job's stream queue."""
+    with _LOCK:
+        _STREAMS.pop(job_id, None)
+
+def publish_frame(job_id: str, frame: Any) -> None:
+    """Publish a frame to a job's stream; drop it without blocking if full."""
+    q = get_stream(job_id)
+    if q:
+        try:
+            q.put_nowait(frame)
+        except queue.Full:
+            # Drop the frame if the consumer is too slow
+            pass
+
+def publish_frame_to_queue(q: queue.Queue, frame: Any) -> None:
+    """Publish to a specific queue object; drop without blocking if full."""
+    try:
+        q.put_nowait(frame)
+    except queue.Full:
+        pass
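
A minimal usage sketch of the new module, mirroring how `background.py` (producer) and `app.py` (consumer) use it. Integer payloads stand in for frames, and it assumes it is run from the repository root so `jobs.streaming` is importable:

```python
import queue

from jobs.streaming import create_stream, get_stream, publish_frame, remove_stream

job_id = "demo-job"
create_stream(job_id)

for i in range(20):
    publish_frame(job_id, i)  # silently drops once maxsize=10 is reached

frames = []
q = get_stream(job_id)
if q is not None:
    while True:
        try:
            frames.append(q.get_nowait())
        except queue.Empty:
            break

remove_stream(job_id)
print(frames)  # -> [0, 1, ..., 9]
```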