import torch
import cv2
import numpy as np
import os
import tempfile
import shutil
from ultralytics import YOLO

# --- UPDATED IMPORTS for SegFormer PyTorch model ---
from transformers import SegformerImageProcessor, SegformerForSemanticSegmentation
from transformers import VideoMAEForVideoClassification, VideoMAEFeatureExtractor
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask

# --- Configuration ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"⚙️ Using device: {DEVICE}")

# Set a writable directory for model caches on Hugging Face Spaces
os.environ['TRANSFORMERS_CACHE'] = '/tmp/hf_cache'
os.makedirs('/tmp/hf_cache', exist_ok=True)

# -------------------------------
# 1️⃣ Load Models
# -------------------------------

# Semantic Segmentation (SegFormer PyTorch) <-- CHANGE HERE
SEG_MODEL_NAME = "nvidia/segformer-b1-finetuned-ade-512-512"
seg_processor = SegformerImageProcessor.from_pretrained(SEG_MODEL_NAME)
seg_model = SegformerForSemanticSegmentation.from_pretrained(
    SEG_MODEL_NAME
).to(DEVICE).eval()

# Object Detection + Tracking (YOLOv8 + ByteTrack)
detector = YOLO("yolov8n.pt")

# Behavior Recognition (VideoMAE)
act_processor = VideoMAEFeatureExtractor.from_pretrained("MCG-NJU/videomae-base-finetuned-kinetics")
act_model = VideoMAEForVideoClassification.from_pretrained(
    "MCG-NJU/videomae-base-finetuned-kinetics"
).to(DEVICE).eval()
ACTION_CLIP_LEN = 16

# This dictionary stores the last predicted label for each track ID
id_last_labels = {}

# -------------------------------
# 2️⃣ FastAPI Setup
# -------------------------------
app = FastAPI(title="Smart CCTV Video Processor")

# -------------------------------
# 3️⃣ Utility Functions
# -------------------------------
def cleanup_task(dir_path):
    """Utility to create a BackgroundTask for deleting the temp directory."""
    # Ignore errors (like PermissionError on Windows) during cleanup
    return BackgroundTask(shutil.rmtree, dir_path, ignore_errors=True)

# --- REMOVED: preprocess_segformer() and run_segformer_onnx() ---
# They are replaced by the functions below using the Hugging Face SegFormer pipeline.
def run_segformer_pytorch(frame_rgb: np.ndarray, original_shape) -> np.ndarray:
    """
    Runs PyTorch SegFormer inference and returns the human mask.
    This replaces the ONNX code.
    """
    # 1. Preprocess
    # The processor handles resizing, normalization, and tensor conversion automatically
    inputs = seg_processor(images=frame_rgb, return_tensors="pt").to(DEVICE)

    # 2. Inference
    with torch.no_grad():
        seg_logits = seg_model(**inputs).logits

    # 3. Post-process
    # Resize logits to the original image size
    seg_logits = torch.nn.functional.interpolate(
        seg_logits,
        size=(original_shape[0], original_shape[1]),  # H, W
        mode='bilinear',
        align_corners=False
    )
    pred_mask = torch.argmax(seg_logits, dim=1).squeeze().cpu().numpy()

    # ADE20K class 12 = person
    human_mask = (pred_mask == 12).astype(np.uint8)
    return human_mask

def run_action_recognition(clip_buffer: list) -> str:
    """Runs VideoMAE on a list of RGB frames (clip_buffer)."""
    # Resize all frames to 224x224 (VideoMAE input requirement)
    clip_resized = [cv2.resize(f, (224, 224), interpolation=cv2.INTER_LINEAR) for f in clip_buffer]

    # Convert to tensor input for VideoMAE
    inputs = act_processor(clip_resized, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        outputs = act_model(**inputs)

    pred = outputs.logits.argmax(-1).item()
    label = act_model.config.id2label.get(pred, "Unknown Action")
    return label

# -------------------------------
# 4️⃣ Main API Endpoint
# -------------------------------
@app.get("/")
async def root():
    return {"message": "Smart CCTV Backend: Upload a video to /process-video/"}
@app.post("/process-video/")
async def process_video_endpoint(file: UploadFile = File(...)):
    global id_last_labels

    if not file.content_type or not file.content_type.startswith('video/'):
        await file.close()
        raise HTTPException(status_code=400, detail="Invalid file type. Only video files are supported.")

    # MANUAL TEMP DIR SETUP: fixes PermissionError by controlling cleanup ourselves
    tmpdir = tempfile.mkdtemp()
    # Strip any client-supplied directory components to avoid path traversal
    safe_name = os.path.basename(file.filename or "upload.mp4")
    input_path = os.path.join(tmpdir, safe_name)
    output_path = os.path.join(tmpdir, "smart_cctv_output.mp4")
    cap, out = None, None

    try:
        # Save the uploaded file to the temporary directory
        with open(input_path, "wb") as buffer:
            # IMPORTANT: Stream the file content to the buffer in 1 MB chunks
            while content := await file.read(1024 * 1024):
                buffer.write(content)
        await file.close()

        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise HTTPException(status_code=500, detail="Could not open video file.")

        # Video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 25.0  # Fall back to a sane default when FPS metadata is missing
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if width <= 0 or height <= 0:
            raise HTTPException(status_code=500, detail="Invalid video dimensions.")

        # Output video setup
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # Buffers for per-ID action clips
        id_clip_buffers = {}
        id_last_labels.clear()
        action_clip_len = ACTION_CLIP_LEN

        # Process each frame
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            original_shape = frame.shape

            # --- Segmentation (PyTorch) --- <-- CHANGE HERE
            human_mask = run_segformer_pytorch(frame_rgb, original_shape)

            # --- Detection + Tracking (YOLO + ByteTrack) ---
            # classes=[0] restricts detections to the COCO "person" class
            results = detector.track(frame, persist=True, tracker="bytetrack.yaml",
                                     classes=[0], verbose=False)
            det_boxes = results[0].boxes
            boxes = det_boxes.xyxy.cpu().numpy() if det_boxes is not None else []
            ids = det_boxes.id.cpu().numpy() if det_boxes is not None and det_boxes.id is not None else []

            # --- For each tracked person ---
            for box, track_id in zip(boxes, ids):
                x1, y1, x2, y2 = map(int, box)
                track_id = int(track_id)

                # Crop with a slight buffer around the box
                buffer_px = 5
                y1 = max(0, y1 - buffer_px)
                y2 = min(height, y2 + buffer_px)
                x1 = max(0, x1 - buffer_px)
                x2 = min(width, x2 + buffer_px)

                person_crop = frame_rgb[y1:y2, x1:x2]
                if person_crop.size == 0:
                    continue

                # Maintain a clip buffer per person ID
                id_clip_buffers.setdefault(track_id, [])
                id_clip_buffers[track_id].append(person_crop)

                # Keep only the latest N frames
                if len(id_clip_buffers[track_id]) > action_clip_len:
                    id_clip_buffers[track_id] = id_clip_buffers[track_id][-action_clip_len:]

                # --- Action recognition when the clip is ready ---
                label = id_last_labels.get(track_id, "Analyzing...")
                if len(id_clip_buffers[track_id]) == action_clip_len:
                    # Run inference only every N/2 frames to save time/resources
                    if frame_count % (action_clip_len // 2) == 0:
                        label = run_action_recognition(id_clip_buffers[track_id])
                        id_last_labels[track_id] = label

                # --- Draw box + label ---
                color = (0, 255, 0)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(frame, f"ID {track_id}: {label}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            # --- Segmentation overlay ---
            # NOTE: No need to resize the mask here; run_segformer_pytorch already
            # interpolates it to the original frame size
            mask_colored = cv2.applyColorMap((human_mask * 255).astype(np.uint8), cv2.COLORMAP_JET)

            # Blend safely
            overlay = cv2.addWeighted(frame, 0.7, mask_colored, 0.3, 0)
            out.write(overlay)
            frame_count += 1

        # --- Cleanup (before return) ---
        cap.release()
        out.release()

        # --- Return the processed video file ---
        return FileResponse(
            path=output_path,
            media_type='video/mp4',
            filename=f"processed_{os.path.splitext(safe_name)[0]}.mp4",
            # Background task deletes the temporary folder AFTER the response is sent
            background=cleanup_task(tmpdir)
        )
    except Exception:
        # Ensure cleanup on failure
        if cap:
            cap.release()
        if out:
            out.release()
        # Clean up the manually created temp directory
        shutil.rmtree(tmpdir, ignore_errors=True)
        # Re-raise with the original traceback for FastAPI/Uvicorn to handle
        raise
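
# -------------------------------
# 5️⃣ Local Entry Point
# -------------------------------
# A minimal sketch for running the server directly. Assumptions not taken from
# the code above: the module is saved as `app.py`, and `uvicorn` plus
# `python-multipart` (required by FastAPI for UploadFile handling) are
# installed. Port 7860 is the Hugging Face Spaces default; any free port
# works locally.
#
# Example request once the server is running (hypothetical input file name):
#   curl -X POST "http://localhost:7860/process-video/" \
#        -F "file=@input.mp4;type=video/mp4" \
#        -o processed_output.mp4
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)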