Spaces:
Runtime error
Runtime error
| import os | |
| import logging | |
| from typing import List, Dict | |
| from llama_index.core.agent.workflow import ReActAgent | |
| from llama_index.core.tools import FunctionTool | |
| from llama_index.llms.google_genai import GoogleGenAI | |
# Setup logging
logger = logging.getLogger(__name__)


# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Load a prompt from a text file located relative to this script.

    Args:
        filename: Path of the prompt file, joined onto the directory that
            contains this module.
        default_prompt: Text returned when the file is missing or cannot
            be read.

    Returns:
        The file contents on success, otherwise ``default_prompt``.
    """
    try:
        # The prompt file is resolved relative to this script's directory.
        script_dir = os.path.dirname(__file__)
        prompt_path = os.path.join(script_dir, filename)
        # Read explicitly as UTF-8 so the result does not depend on the
        # platform's default encoding (prompts may contain non-ASCII text).
        with open(prompt_path, "r", encoding="utf-8") as f:
            prompt = f.read()
        logger.info(f"Successfully loaded prompt from {prompt_path}")
        return prompt
    except FileNotFoundError:
        # Only open() can raise this, so prompt_path is already bound here.
        logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
        return default_prompt
    except Exception as e:
        # Best-effort loader: any other failure also falls back to the default.
        logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
        return default_prompt
| # --- Tool Functions --- | |
def plan(objective: str) -> List[str]:
    """Generate a list of sub-steps (4-8) from the given objective using an LLM.

    The model name is read from the ``PLANNER_TOOL_LLM_MODEL`` environment
    variable (falling back to a built-in default) and the API key from
    ``GEMINI_API_KEY``.

    Args:
        objective (str): The research or task objective.

    Returns:
        List[str]: The sub-steps with leading numbering / "- " bullets
        removed. On failure (missing API key, empty LLM output, LLM error)
        a single-element list holding the error message is returned, so the
        result is always a list as the signature promises.
    """
    logger.info(f"Generating plan for objective: {objective[:100]}...")

    # Configuration for planning LLM
    planner_llm_model = os.getenv("PLANNER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for planning tool LLM.")
        return ["Error: GEMINI_API_KEY not set for planning."]

    # Prompt for the LLM to generate sub-steps
    input_prompt = (
        "You are a research assistant. "
        "Given an objective, break it down into a list of 4-8 concise, actionable sub-steps. "
        "Ensure the steps are logically ordered.\n"
        f"Objective: {objective}\n"
        "Sub-steps (one per line, numbered):"
    )

    try:
        # Use the configured model so the log line below reflects the model
        # that is actually called.
        llm = GoogleGenAI(api_key=gemini_api_key, model=planner_llm_model, temperature=0.05)
        logger.info(f"Using planning LLM: {planner_llm_model}")
        response = llm.complete(input_prompt)

        # Post-process: one sub-step per non-empty line, numbering removed.
        sub_steps = []
        for raw_line in response.text.strip().split("\n"):
            line = raw_line.strip()
            if not line:
                continue
            # Remove potential leading numbering (e.g., "1. ", "- ")
            if line[0].isdigit() and "." in line[:3]:
                text = line.split(".", 1)[1].strip()
            elif line.startswith("- "):
                text = line[2:].strip()
            else:
                text = line
            if text:
                sub_steps.append(text)

        if not sub_steps:
            logger.warning("LLM generated no sub-steps for the objective.")
            return ["Error: Failed to generate sub-steps."]

        logger.info(f"Generated {len(sub_steps)} sub-steps.")
        return sub_steps
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"LLM call failed during planning: {e}", exc_info=True)
        return [f"Error during planning: {e}"]
def synthesize_and_report(results: List[Dict[str, str]]) -> str:
    """Aggregate results from sub-steps into a coherent final report using an LLM.

    The model name is read from the ``SYNTHESIZER_LLM_MODEL`` environment
    variable (falling back to a built-in default) and the API key from
    ``GEMINI_API_KEY``.

    Args:
        results (List[Dict[str, str]]): List of dictionaries, each with
            "sub_step" and "answer" keys. Missing keys are replaced with
            placeholder text.

    Returns:
        str: A unified report, or a message describing why no report could
        be produced (no results, missing API key, LLM failure).
    """
    # Guard the len() call so a None argument is treated like an empty list
    # instead of raising TypeError before the emptiness check.
    result_count = len(results) if results else 0
    logger.info(f"Synthesizing results from {result_count} sub-steps...")
    if not results:
        logger.warning("Synthesize called with empty results list.")
        return "No results provided to synthesize."

    # Format the results for the synthesis prompt
    blocks = []
    for i, result in enumerate(results, start=1):
        sub_step = result.get("sub_step", f"Step {i}")
        answer = result.get("answer", "No answer provided.")
        blocks.append(f"Sub-step {i}: {sub_step}\nAnswer {i}: {answer}")
    summary_blocks = "\n\n".join(blocks).strip()

    # Configuration for synthesis LLM
    synthesizer_llm_model = os.getenv("SYNTHESIZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for synthesis tool LLM.")
        return "Error: GEMINI_API_KEY not set for synthesis."

    # Prompt for the LLM
    input_prompt = (
        "You are an expert synthesizer. Given the following sub-steps and their answers derived\n"
        "from an initial objective, produce a single, coherent, comprehensive final report that\n"
        "addresses the original objective:\n"
        "--- SUB-STEP RESULTS ---\n"
        f"{summary_blocks}\n"
        "--- END SUB-STEP RESULTS ---\n"
        "Generate the Final Report:\n"
    )

    try:
        # Use the configured model so the log line below reflects the model
        # that is actually called.
        llm = GoogleGenAI(api_key=gemini_api_key, model=synthesizer_llm_model, temperature=0.05)
        logger.info(f"Using synthesis LLM: {synthesizer_llm_model}")
        response = llm.complete(input_prompt)
        logger.info("Synthesis successful.")
        return response.text
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"LLM call failed during synthesis: {e}", exc_info=True)
        return f"Error during synthesis: {e}"
def answer_question(question: str) -> str:
    """Answer a question with an LLM, enforcing a strict final-answer format.

    The reply is expected to:
    1. Include the chain of thought (the reasoning steps).
    2. End with the exact template:
       FINAL ANSWER: [YOUR FINAL ANSWER]

    YOUR FINAL ANSWER must be:
    - A number, or
    - As few words as possible, or
    - A comma-separated list of numbers and/or strings.

    Formatting rules:
    * If asked for a number, do not use commas or units (e.g., $, %), unless explicitly requested.
    * If asked for a string, do not include articles or abbreviations (e.g., city names), and write digits in plain text.
    * If asked for a comma-separated list, apply the above rules to each element.

    This tool should be invoked immediately after completing the final planning sub-step.

    The model name is read from the ``ANSWER_TOOL_LLM_MODEL`` environment
    variable (falling back to a built-in default) and the API key from
    ``GEMINI_API_KEY``.

    Args:
        question (str): The question to answer.

    Returns:
        str: The LLM's full reply text, or an error message when the API key
        is missing or the LLM call fails.
    """
    logger.info(f"Answering question: {question[:100]}")

    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not set for answer_question tool.")
        return "Error: GEMINI_API_KEY not set."
    model_name = os.getenv("ANSWER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")

    # Build the assistant prompt enforcing the required format
    assistant_prompt = (
        "You are a general AI assistant. I will ask you a question. "
        "Report your thoughts, and finish your answer with the following template: "
        "FINAL ANSWER: [YOUR FINAL ANSWER]. "
        "YOUR FINAL ANSWER should be a number OR as few words as possible "
        "OR a comma separated list of numbers and/or strings. "
        "If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. "
        "If you are asked for a string, omit articles and abbreviations, and write digits in plain text. "
        "If you are asked for a comma separated list, apply these rules to each element.\n\n"
        f"Question: {question}\n"
        "Answer:"
    )

    try:
        # Use the configured model so the log line below reflects the model
        # that is actually called.
        llm = GoogleGenAI(api_key=gemini_api_key, model=model_name, temperature=0.05)
        logger.info(f"Using answer LLM: {model_name}")
        response = llm.complete(assistant_prompt)
        logger.info("Answer generated successfully.")
        return response.text
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error(f"LLM call failed during answer generation: {e}", exc_info=True)
        return f"Error during answer generation: {e}"
# --- Tool Definitions ---
# Each module-level function above is wrapped in a FunctionTool so it can be
# offered to an agent under an explicit name and description.

# Planning tool: wraps plan() and exposes it as "generate_substeps".
generate_substeps_tool = FunctionTool.from_defaults(
    fn=plan,
    name="generate_substeps",
    description=(
        "Decomposes a high-level objective into a concise roadmap of "
        "4–8 actionable sub-steps using an LLM. "
        "Input: objective (str). "
        "Output: List of sub-step strings (List[str]) or error list."
    ),
)

# Synthesis tool: wraps synthesize_and_report() under the same name.
synthesize_tool = FunctionTool.from_defaults(
    fn=synthesize_and_report,
    name="synthesize_and_report",
    description=(
        "Aggregates results from multiple sub-steps into a final coherent report. "
        'Input: results (List[Dict[str, str]]) where each dict has "sub_step" and "answer". '
        "Output: A unified report (str) or error message."
    ),
)

# Answer tool. NOTE: this assignment rebinds the module-level name
# `answer_question` from the plain function to the FunctionTool wrapping it;
# after this line the bare function is only reachable through the tool.
answer_question = FunctionTool.from_defaults(
    fn=answer_question,
    name="answer_question",
    description=(
        "Answers any question and returns the full text, "
        "always ending with ‘FINAL ANSWER: ...’ "
        "in accordance with the formatting rules."
    ),
)
| # --- Agent Initialization --- | |
def initialize_planner_agent() -> ReActAgent:
    """Initialize the Planner Agent.

    Reads the agent model name from ``PLANNER_AGENT_LLM_MODEL`` (falling back
    to a built-in default) and the API key from ``GEMINI_API_KEY``, loads the
    system prompt from ``../prompts/planner_agent_prompt.txt`` relative to
    this script (falling back to a placeholder prompt), and builds a
    ``ReActAgent`` with the planning and synthesis tools.

    Returns:
        ReActAgent: The configured planner agent.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is not set.
        Exception: Any error raised while constructing the LLM or the agent
            is logged and re-raised unchanged.
    """
    logger.info("Initializing PlannerAgent...")

    # Configuration for the agent's main LLM
    agent_llm_model = os.getenv("PLANNER_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for PlannerAgent.")
        raise ValueError("GEMINI_API_KEY must be set for PlannerAgent")

    try:
        # Use the configured model so the log line below reflects the model
        # that is actually called.
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
        logger.info(f"Using agent LLM: {agent_llm_model}")

        # Load system prompt
        default_system_prompt = (
            "You are PlannerAgent... [Default prompt content - replace with actual]"  # Placeholder
        )
        system_prompt = load_prompt_from_file("../prompts/planner_agent_prompt.txt", default_system_prompt)
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for PlannerAgent.")

        # Define available tools
        # NOTE(review): the answer_question tool defined in this module is not
        # registered here — confirm whether that is intentional.
        tools = [generate_substeps_tool, synthesize_tool]

        # Define valid handoff targets
        valid_handoffs = [
            "code_agent",
            "research_agent",
            "math_agent",
            "role_agent",
            "image_analyzer_agent",
            "text_analyzer_agent",
            "reasoning_agent",
            "long_context_management_agent",
            "advanced_validation_agent",
            "video_analyzer_agent",
        ]

        agent = ReActAgent(
            name="planner_agent",
            description=(
                "Strategically plans tasks by breaking down objectives into sub-steps using `generate_substeps`. "
                "Orchestrates execution by handing off sub-steps to specialized agents. "
                "Synthesizes final results using `synthesize_and_report`."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("PlannerAgent initialized successfully.")
        return agent
    except Exception as e:
        # Log with traceback, then let the caller decide how to handle it.
        logger.error(f"Error during PlannerAgent initialization: {e}", exc_info=True)
        raise
| # Example usage (for testing if run directly) | |
if __name__ == "__main__":
    # Manual smoke test: exercises the planning and synthesis tools directly.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )
    logger.info("Running planner_agent.py directly for testing...")

    # The tools need a Gemini API key; without one there is nothing to test.
    if os.getenv("GEMINI_API_KEY"):
        try:
            # Test plan generation
            print("\nTesting plan generation...")
            test_objective = "Analyze the market trends for electric vehicles in Europe for 2024."
            substeps = plan(test_objective)
            print(f"Generated Sub-steps:\n{substeps}")

            # Test synthesis
            print("\nTesting synthesis...")
            test_results = [
                {
                    "sub_step": "Identify key EV manufacturers in Europe.",
                    "answer": "Tesla, VW, Stellantis, Renault.",
                },
                {
                    "sub_step": "Find recent sales data.",
                    "answer": "EV sales grew 25% year-over-year in Q1 2024.",
                },
                {
                    "sub_step": "Analyze government incentives.",
                    "answer": "Germany reduced subsidies, France maintained them.",
                },
            ]
            report = synthesize_and_report(test_results)
            print(f"Synthesized Report:\n{report}")

            # Agent construction is intentionally not exercised here; call
            # initialize_planner_agent() manually to test it.
        except Exception as e:
            print(f"Error during testing: {e}")
    else:
        print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")