Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import logging | |
| import os | |
| import re | |
| import shutil | |
| from pathlib import Path | |
| from typing import Optional, List | |
| import cv2 | |
| import yt_dlp | |
| from llama_index.core.agent.workflow import FunctionAgent | |
| from llama_index.core.base.llms.types import TextBlock, ImageBlock, ChatMessage | |
| from llama_index.core.tools import FunctionTool | |
| from llama_index.llms.google_genai import GoogleGenAI | |
| from tqdm import tqdm | |
| from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
| # --------------------------------------------------------------------------- | |
| # Environment setup & logging | |
| # --------------------------------------------------------------------------- | |
| logger = logging.getLogger(__name__) | |
def env_to_cookies(env_content: str, output_file: str) -> None:
    r"""Write the cookie data embedded in an env-style assignment to a file.

    ``env_content`` must look like ``NAME="<payload>"``. Everything after the
    first ``="`` is taken as the payload, surrounding double quotes are
    stripped, and literal ``\n`` escape sequences are expanded into real
    newlines before the result is written to ``output_file``.

    Args:
        env_content: The assignment text, e.g. ``YT_COOKIE="line1\nline2"``.
        output_file: Path of the cookie file to create or overwrite.

    Raises:
        ValueError: If ``env_content`` has no ``="`` separator or the file
            cannot be written. The underlying exception is chained as the
            cause.
    """
    try:
        if '="' not in env_content:
            raise ValueError("Invalid env content format")
        payload = env_content.split('="', 1)[1].strip('"')

        # The env representation stores line breaks as the two characters
        # backslash + "n"; turn them back into real newlines.
        cookie_content = payload.replace("\\n", "\n")

        # Explicit encoding so the output does not depend on the host locale.
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(cookie_content)
    except Exception as e:
        # Callers only need to handle ValueError; keep the cause for debugging.
        raise ValueError(f"Error converting to cookie file: {str(e)}") from e
def env_to_cookies_from_env(output_file: str) -> None:
    """Create a cookie file from the ``YT_COOKIE`` environment variable.

    The variable's value is read with ``os.getenv`` and handed to
    ``env_to_cookies`` wrapped as ``YT_COOKIE="<value>"``.

    Args:
        output_file: Path of the cookie file to create or overwrite.

    Raises:
        ValueError: If ``YT_COOKIE`` is unset or empty, or if
            ``env_to_cookies`` fails to write the file.
    """
    env_content = os.getenv("YT_COOKIE", "")
    if not env_content:
        raise ValueError(
            "Error converting to cookie file: YT_COOKIE not found in .env file"
        )

    # env_to_cookies already raises a ValueError carrying the
    # "Error converting to cookie file:" prefix, so it is not re-wrapped here
    # (re-wrapping produced a doubled prefix in the message).
    env_to_cookies(f'YT_COOKIE="{env_content}"', output_file)
| # --------------------------------------------------------------------------- | |
| # Prompt loader | |
| # --------------------------------------------------------------------------- | |
def load_prompt_from_file(filename: str = "../prompts/video_analyzer_prompt.txt") -> str:
    """Return the video-analysis system prompt stored in *filename*.

    *filename* is resolved relative to the directory that contains this
    module. When the file is missing or cannot be read, the problem is logged
    and a short built-in prompt is returned instead.
    """
    prompt_path = Path(__file__).parent.joinpath(filename).resolve()

    try:
        prompt = prompt_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        logger.error(
            "Prompt file %s not found. Using fallback prompt.", prompt_path
        )
    except Exception as exc:  # pylint: disable=broad-except
        logger.error(
            "Error loading prompt file %s: %s", prompt_path, exc, exc_info=True
        )
    else:
        logger.info("Successfully loaded system prompt from %s", prompt_path)
        return prompt

    # Fallback: kept extremely short to save tokens.
    return (
        "You are a video analyzer. Provide a factual, chronological "
        "description of the video, identify key events, and summarise insights."
    )
def extract_frames(video_path, output_dir, fps=2):
    """Sample frames from a video file and save them as JPEG images.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory that receives the JPEG files (created if
            it does not exist).
        fps: Target number of saved frames per second of video. Must be
            positive.

    Returns:
        A ``(frames, duration)`` tuple. ``frames`` is a list of
        ``(frame_path, timestamp_seconds)`` tuples in playback order and
        ``duration`` is the reported frame count divided by the reported
        frame rate. ``([], None)`` is returned when the video cannot be
        opened or reports no usable frame rate.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be a positive number")

    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            logger.error("Error: Could not open video %s", video_path)
            return [], None

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Some containers report 0 FPS; dividing by it would crash below.
        if not video_fps or video_fps <= 0:
            logger.error("Error: Could not determine frame rate of %s", video_path)
            return [], None

        duration = frame_count / video_fps

        # Keep every ``interval``-th frame; never less than every frame.
        interval = max(1, int(video_fps / fps))

        frames = []
        frame_idx = 0
        # The frame count may be unknown (<= 0); tqdm then runs without a total.
        progress_total = frame_count if frame_count > 0 else None
        with tqdm(total=progress_total, desc="Extracting frames") as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_idx % interval == 0:
                    timestamp = frame_idx / video_fps
                    frame_path = os.path.join(output_dir, f"frame_{frame_idx:06d}.jpg")
                    # imwrite reports failure through its return value; only
                    # record frames that were written.
                    if cv2.imwrite(frame_path, frame):
                        frames.append((frame_path, timestamp))
                    else:
                        logger.warning("Could not write frame to %s", frame_path)

                frame_idx += 1
                pbar.update(1)

        return frames, duration
    finally:
        # Release the capture handle on every exit path, including errors.
        cap.release()
def download_video_and_analyze(video_url: str) -> str:
    """Download a video, sample its frames and return the LLM's analysis.

    The video is fetched with yt-dlp into ``downloaded_videos/``, frames are
    sampled by ``extract_frames`` into ``frame_downloaded_videos/``, and the
    frames are sent together with the prompt from ``load_prompt_from_file``
    to the Gemini model in a single chat message. The temporary frame
    directory and the downloaded video are removed on every exit path.

    Environment:
        VIDEO_ANALYZER_LLM_MODEL: Model name (default
            ``gemini-2.5-pro-preview-03-25``).
        GEMINI_API_KEY: API key passed to ``GoogleGenAI``.

    Args:
        video_url: URL of the video to download and analyse.

    Returns:
        The text content of the model's response, or the message
        ``"No frames extracted from <url>"`` when nothing could be sampled.
    """
    llm_model_name = os.getenv("VIDEO_ANALYZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")

    download_dir = "downloaded_videos"
    frames_dir = "frame_downloaded_videos"

    ydl_opts = {
        "format": "best",
        "outtmpl": os.path.join(download_dir, "temp_video.%(ext)s"),
        "quiet": True,
        "extract_flat": True,
        "ignoreerrors": True,
        "sleep_interval": 5,
        "max_sleep_interval": 10,
        # Extractor argument values are lists of strings in the Python API;
        # a bare string would be consumed character by character.
        "extractor_args": {"youtube": {"formats": ["sabr"]}},
        "retries": 10,
        "cookiefile": "cookies.txt",
    }

    video_path = None
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl_download:
            ydl_download.download([video_url])

        print(f"Processing video: {video_url}")

        # The output template uses %(ext)s, so the file is not necessarily
        # an .mp4. Prefer the .mp4 name, otherwise take any finished download.
        default_path = os.path.join(download_dir, "temp_video.mp4")
        if os.path.exists(default_path):
            video_path = default_path
        else:
            candidates = sorted(
                p
                for p in Path(download_dir).glob("temp_video.*")
                if p.suffix not in {".part", ".ytdl"}
            )
            video_path = str(candidates[0]) if candidates else None

        if video_path is None:
            logger.info("No video file was downloaded for %s", video_url)
            return f"No frames extracted from {video_url}"

        frames, _duration = extract_frames(video_path, frames_dir)
        if not frames:
            logger.info("No frames extracted from %s", video_url)
            return f"No frames extracted from {video_url}"

        # One user message: the analysis prompt followed by every frame.
        blocks = [TextBlock(text=load_prompt_from_file())]
        for frame_path, _timestamp in tqdm(frames, desc="Collecting frames"):
            blocks.append(ImageBlock(path=frame_path))

        # Use the configured model (previously read and logged but ignored).
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model_name, temperature=0.05)
        logger.info("Using LLM model: %s", llm_model_name)
        response = llm.chat([ChatMessage(role="user", blocks=blocks)])

        return response.message.content
    finally:
        # Clean up temporary files even when the download or LLM call fails.
        shutil.rmtree(frames_dir, ignore_errors=True)
        if video_path is not None and os.path.exists(video_path):
            os.remove(video_path)
# --- Helper function to extract YouTube Video ID ---
_YOUTUBE_ID_PATTERNS = (
    # Standard watch URL: https://www.youtube.com/watch?v=VIDEO_ID
    re.compile(
        r'^(?:https?://)?'          # optional scheme
        r'(?:(?:www|m|music)\.)?'   # optional sub-domain
        r'youtube\.com/watch\?'     # fixed domain and path
        r'(?:.*&)?'                 # possibly other parameters before v=
        r'v=([^&#]+)'               # the ID: up to the next '&', '#' or end
    ),
    # Short link: https://youtu.be/VIDEO_ID
    re.compile(r'^(?:https?://)?youtu\.be/([^?&#/]+)'),
    # Path-style URLs: /embed/VIDEO_ID, /shorts/VIDEO_ID, /live/VIDEO_ID
    re.compile(
        r'^(?:https?://)?'
        r'(?:(?:www|m)\.)?'
        r'youtube(?:-nocookie)?\.com/'
        r'(?:embed|shorts|live)/'
        r'([^?&#/]+)'
    ),
)


def extract_video_id(url: str) -> Optional[str]:
    """Extract the YouTube video ID from common URL formats.

    Recognises ``youtube.com/watch?v=…`` (including the ``m.`` and ``music.``
    hosts), ``youtu.be/…`` short links and ``/embed/``, ``/shorts/`` and
    ``/live/`` paths. Query parameters and ``#fragment`` suffixes are not
    included in the returned ID.

    Args:
        url: A YouTube URL, or a bare video ID.

    Returns:
        The extracted ID. When no pattern matches, *url* is returned
        unchanged so that callers may pass a bare video ID directly.
    """
    for pattern in _YOUTUBE_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            video_id = match.group(1)
            print(f"ID trouvé : {video_id}")
            return video_id

    print("Aucun ID trouvé")
    return url
# --- YouTube Transcript Tool ---
def _join_snippet_text(snippets) -> str:
    """Join transcript snippets into one space-separated string.

    Accepts both snippet objects exposing ``.text`` and plain dicts with a
    ``"text"`` key, so the primary and fallback paths share one code path.
    """
    return " ".join(
        item["text"] if isinstance(item, dict) else item.text for item in snippets
    )


def get_youtube_transcript(video_url_or_id: str, languages: List[str] | None = None) -> str:
    """Fetch the transcript of a YouTube video as a single string.

    Args:
        video_url_or_id: A YouTube URL or a bare video ID.
        languages: Preferred language codes in priority order
            (e.g. ``["en", "es"]``). Defaults to ``["en"]``. A single code
            passed as a plain string is accepted and treated as a
            one-element list.

    Returns:
        The transcript text. On failure an error message string is returned
        instead of raising. If no transcript exists in the requested
        languages, an English transcript is tried first and then the first
        transcript listed for the video.
    """
    if languages is None:
        languages = ["en"]
    elif isinstance(languages, str):
        # A bare "en" would otherwise be iterated character by character.
        languages = [languages]

    logger.info(f"Attempting to fetch YouTube transcript for: {video_url_or_id}")
    video_id = extract_video_id(video_url_or_id)
    if not video_id:
        logger.error(f"Could not extract video ID from: {video_url_or_id}")
        return f"Error: Invalid YouTube URL or Video ID format: {video_url_or_id}"

    # Bound before the try so the fallback handler can tell whether the
    # listing itself succeeded.
    transcript_list = None
    try:
        api = YouTubeTranscriptApi(cookie_path="cookies.txt")
        transcript_list = api.list(video_id)

        transcript = transcript_list.find_transcript(languages)
        full_transcript = _join_snippet_text(transcript.fetch())
        logger.info(f"Successfully fetched transcript for video ID {video_id} in language {transcript.language}.")
        return full_transcript
    except TranscriptsDisabled:
        logger.warning(f"Transcripts are disabled for video ID: {video_id}")
        return f"Error: Transcripts are disabled for this video (ID: {video_id})."
    except NoTranscriptFound as e:
        logger.warning(
            f"No transcript found for video ID {video_id} in languages {languages}. Available: {e}")
        # Requested languages failed: fall back to whatever is available.
        try:
            logger.info(f"Attempting to fetch any available transcript for {video_id}")
            if transcript_list is None:
                raise
            try:
                any_transcript = transcript_list.find_transcript(["en"])
            except NoTranscriptFound:
                # Iterating the listing yields every transcript of the video.
                any_transcript = next(iter(transcript_list), None)
            if any_transcript is None:
                raise LookupError("no transcripts are listed for this video")
            full_transcript = _join_snippet_text(any_transcript.fetch())
            logger.info(
                f"Successfully fetched fallback transcript for video ID {video_id} in language {any_transcript.language}.")
            return full_transcript
        except Exception as fallback_e:
            logger.error(
                f"Could not find any transcript for video ID {video_id}. Original error: {e}. Fallback error: {fallback_e}")
            return f"Error: No transcript found for video ID {video_id} in languages {languages} or any fallback language."
    except Exception as e:
        logger.error(f"Unexpected error fetching transcript for video ID {video_id}: {e}", exc_info=True)
        return f"Error fetching transcript: {e}"
# Tool wrapper exposing download_video_and_analyze to the agent. The
# description is what the LLM reads when choosing a tool, so it must match the
# implementation: frames are sampled at 2 per second (extract_frames default).
download_video_and_analyze_tool = FunctionTool.from_defaults(
    fn=download_video_and_analyze,
    name="download_video_and_analyze",
    description=(
        "(Video Analysis) Downloads a video from a YouTube or direct URL, extracts visual frames at a sampling rate "
        "(default 2 frames per second), and performs multimodal analysis such as identification, detailed frame-by-frame analysis, etc. using Gemini. "
        "Returns a textual summary based exclusively on visual content.\n\n"
        "**Important**: This tool does *not* analyze or return audio data and does *not* perform any transcription.\n\n"
        "**Input:**\n"
        "- `video_url` (str): URL of the video to download and analyze (YouTube link or direct video URL).\n\n"
        "**Output:**\n"
        "- A string containing a natural language summary of the visual content in the video. "
        "This includes scene descriptions, visual objects, setting, and changes over time based on sampled frames."
    ),
)
# Tool wrapper exposing get_youtube_transcript to the agent. The `languages`
# input is documented as a list of codes, matching the function's signature.
youtube_transcript_tool = FunctionTool.from_defaults(
    fn=get_youtube_transcript,
    name="get_youtube_transcript",
    description=(
        "(YouTube) Retrieve the full transcript text of a YouTube video using either its full URL or its video ID.\n\n"
        "**Functionality**:\n"
        "- Attempts to extract the video ID from the URL.\n"
        "- Searches for available transcripts (manual or auto-generated).\n"
        "- Returns the complete transcript text in a single string.\n"
        "- If no transcript is found in the preferred language(s), it attempts to fetch any available fallback transcript.\n\n"
        "**Inputs:**\n"
        "- `video_url_or_id` (str): The full YouTube video URL (e.g., 'https://www.youtube.com/watch?v=abc123') or the video ID directly (e.g., 'abc123').\n"
        "- `languages` (list of str or None): Optional. Preferred language codes in priority order (e.g., ['en', 'fr']). If None, defaults to ['en'].\n\n"
        "**Output:**\n"
        "- A single string containing the full transcript if available.\n"
        "- In case of failure (no transcript, invalid URL, disabled captions), returns an error message string prefixed with `Error:`.\n\n"
        "**Limitations:**\n"
        "- This tool **does not** download or process video or audio.\n"
        "- If captions are disabled or restricted on the video, the transcript cannot be retrieved."
    ),
)
| # --------------------------------------------------------------------------- | |
| # Agent factory | |
| # --------------------------------------------------------------------------- | |
def initialize_video_analyzer_agent() -> FunctionAgent:
    """Initialise and return the *video_analyzer_agent* ``FunctionAgent``.

    The agent is backed by a Gemini model and is given two tools:
    ``download_video_and_analyze_tool`` and ``youtube_transcript_tool``.

    Environment:
        VIDEO_ANALYZER_LLM_MODEL: Model name (default
            ``gemini-2.5-pro-preview-03-25``).
        GEMINI_API_KEY: Required API key.

    Returns:
        The configured ``FunctionAgent``.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is not set.
        Exception: Any error raised while building the LLM or the agent is
            logged and re-raised unchanged.
    """
    logger.info("Initialising VideoAnalyzerAgent …")

    llm_model_name = os.getenv("VIDEO_ANALYZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found in environment variables.")
        raise ValueError("GEMINI_API_KEY must be set")

    try:
        # Use the configured model (previously read and logged but ignored).
        llm = GoogleGenAI(api_key=gemini_api_key, model=llm_model_name, temperature=0.05)
        logger.info("Using LLM model: %s", llm_model_name)

        system_prompt = """
You are **VideoAnalyzerAgent**, an expert multimodal analyst specialised in factual,
frame-level understanding of video.

─────────────────
CORE PRINCIPLES
─────────────────
1. **Visual-only reasoning** – base every statement on what can be seen in the
   provided frames; never guess at sounds, music, or dialogue.
2. **Chronological accuracy** – describe events strictly in the order they occur.
3. **Sceptical precision** – if something is ambiguous on screen, say so plainly
   (“unclear whether …”); do not invent motives or unseen causes.
4. **Token economy** – be concise; omit pleasantries and waffle.
5. **Professional tone** – formal, neutral, and practical.

─────────────────
TOOLS AT YOUR DISPOSAL
─────────────────
• `download_video_and_analyze(video_url)` –
  Downloads the video, samples ~2fps, and returns your own multimodal summary
  of the visuals such as detailed frame-by-frame analysis, key insights, or a TL;DR.
  Use when the user needs a purely visual description.

• `get_youtube_transcript(video_url_or_id, languages=["en"])` –
  Returns the full YouTube transcript (if any).
  Use when the user requests spoken content or captions.

Always think aloud (in hidden chain-of-thought) which tool(s) you need **before**
calling them. If neither tool is relevant, politely explain why.

─────────────────
RESPONSE FORMAT
─────────────────
Return Markdown with the following sections **only when they add value**:

1. **TL;DR (≤3 sentences)** – executive summary.
2. **Timeline** – table listing `timestamp – scene description – notable objects/actions`.
3. **Key Insights** – bullet points of patterns, cause-effect, or anomalies worth noting.
4. **Actionable Take-aways** – optional, only if user asked “so what?” questions.

Timestamps should be in **mm:ss** (or h:mm:ss if >1h).
Avoid more than one level of heading depth (i.e., use `##`, not `###`/`####`).

─────────────────
STYLE & CONSTRAINTS
─────────────────
• Use present tense for on-screen events (“The camera pans over …”).
• Quantify when possible (“The audience consists of ~200 people”, “text occupies ~25% of the frame”).
• Never reveal chain-of-thought or raw frame data.
• If no visual frames were extracted, state: “No usable frames – cannot analyse.”
• If captions are disabled, reply: “No transcript available.”

─────────────────
EXAMPLES OF ACCEPTABLE BREVITY
─────────────────
- Good: “At 02:15 the speaker shows a slide titled ‘Transformer Architecture’.”
- Bad: “There is some sort of diagram that maybe explains something about the
  architecture; it might be a transformer but it is hard to tell.”

If your response exceeds the maximum token limit and cannot be completed in a single reply,
please conclude your output with the marker [CONTINUE]. In subsequent interactions,
I will prompt you with “continue” to receive the next portion of the response.

End of prompt.
"""

        tools = [download_video_and_analyze_tool, youtube_transcript_tool]

        agent = FunctionAgent(
            name="video_analyzer_agent",
            description=(
                "VideoAnalyzerAgent is a domain-specialist in multimodal video understanding, "
                "leveraging Gemini’s vision capabilities to deliver precise, frame-level analyses. "
                "It performs chronological segmentation of visual events, identifies key objects "
                "and actions, and generates concise executive summaries—all based solely on visual data. "
                "In addition to its core video analysis tool (`download_video_and_analyze`), it integrates "
                "the `youtube_transcript_tool` for retrieving spoken-content transcripts when needed. "
                "Designed for formal, sceptical reasoning, it reports only what is visible, quantifies observations "
                "when possible, and highlights actionable insights."
            ),
            llm=llm,
            system_prompt=system_prompt,
            tools=tools,
            # Names of the agents this one may hand control to.
            can_handoff_to=[
                "planner_agent",
                "research_agent",
                "reasoning_agent",
                "code_agent",
            ],
        )

        logger.info("VideoAnalyzerAgent initialised successfully.")
        return agent
    except Exception as exc:  # pylint: disable=broad-except
        logger.error("Error during VideoAnalyzerAgent initialisation: %s", exc, exc_info=True)
        raise
# Manual smoke test: run this module directly to exercise the agent factory,
# the video analysis tool and the transcript tool against live services.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger.info("Running video_analyzer_agent.py directly for testing …")

    # The agent and video-analysis checks need Gemini credentials.
    if not os.getenv("GEMINI_API_KEY"):
        print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")
    else:
        try:
            initialize_video_analyzer_agent()
            # Report initialisation right away rather than after the analysis.
            print("Video Analyzer Agent initialised successfully for testing.")

            summary = download_video_and_analyze("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
            print("\n--- Gemini summary ---\n")
            print(summary)
        except Exception as exc:
            print(f"Error during testing: {exc}")

    # The transcript check does not use the Gemini key.
    try:
        print("\nTesting YouTube transcript tool...")
        # Example video: "Attention is All You Need" paper explanation
        yt_url = "https://www.youtube.com/watch?v=TQQlZhbC5ps"
        transcript = get_youtube_transcript(yt_url)
        if not transcript.startswith("Error:"):
            print(f"Transcript fetched (first 500 chars):\n{transcript[:500]}...")
        else:
            print(f"YouTube Transcript Fetch Failed: {transcript}")
    except Exception as e:
        print(f"Error during testing: {e}")