Spaces:
Sleeping
Sleeping
| from agents.base import BaseAgent | |
| from datetime import datetime | |
| class AggregatorAgent(BaseAgent): | |
| def __init__(self, gemini_agent): | |
| self.gemini = gemini_agent | |
| def summarize_database_output(self, table_text, user_message): | |
| prompt = ( | |
| "You are a helpful and friendly assistant. Organize and present the following database table for the user in a clear, concise, and visually appealing summary. " | |
| "Use bullet points, short tables, or lists if helpful, and add emojis to enhance readability where appropriate. " | |
| "Focus only on answering the user's request; do not add extra information. " | |
| "If the table is empty, reply with a polite message that no data was found. " | |
| "Always answer in the exact same language as the user's question.\n\n" | |
| f"User question: {user_message}\n" | |
| f"Database output:\n{table_text}\n" | |
| ) | |
| result = self.gemini.invoke([{"role": "user", "content": prompt}]) | |
| return result.content if hasattr(result, "content") else str(result) | |
| def summarize_multiple_agents(self, agent_outputs, user_message): | |
| prompt = ( | |
| "You are a helpful and friendly assistant. The following are responses from multiple support agents to the same user query. " | |
| "Please combine, reorganize, and summarize all relevant information into a single, concise, and easy-to-read reply for the user. " | |
| "Remove any duplicate or redundant details. Use a friendly tone, and you may add emojis to make the message more engaging. " | |
| "If there are conflicting answers, use the most accurate or comprehensive information.\n\n" | |
| "Always answer in the exact same language as the user's question.\n" | |
| f"User question: {user_message}\n" | |
| "Agent outputs:\n" | |
| ) | |
| for label, text in agent_outputs: | |
| prompt += f"[{label}]: {text}\n" | |
| result = self.gemini.invoke([{"role": "user", "content": prompt}]) | |
| return result.content if hasattr(result, "content") else str(result) | |
| def process(self, state: dict) -> dict: | |
| assigned_agents = state.get("assigned_agents", []) | |
| user_message = state.get("user_message", "") | |
| log_msg = [] | |
| now = datetime.now().isoformat(timespec='seconds') | |
| log_msg.append(f"[{now}] [AggregatorAgent] Processing assigned_agents: {[a['agent'] for a in assigned_agents]}") | |
| waiting = [a["agent"] for a in assigned_agents if not a.get("result")] | |
| if waiting: | |
| log_msg.append(f"[{now}] [AggregatorAgent] Waiting for agents: {waiting}") | |
| return {"logs": log_msg} | |
| if len(assigned_agents) == 1: | |
| agent = assigned_agents[0]["agent"] | |
| result = assigned_agents[0].get("result", "") | |
| if agent != "coffee_db_agent": | |
| log_msg.append(f"[{now}] [AggregatorAgent] Single agent, passing through result.") | |
| return {"final_response": result, "logs": log_msg} | |
| summary = self.summarize_database_output(result, user_message) | |
| log_msg.append(f"[{now}] [AggregatorAgent] Summarized database output.") | |
| return {"final_response": summary, "logs": log_msg} | |
| agent_outputs = [] | |
| for a in assigned_agents: | |
| agent = a["agent"] | |
| result = a.get("result", "") | |
| if result: | |
| agent_outputs.append((agent, result)) | |
| summary = self.summarize_multiple_agents(agent_outputs, user_message) | |
| log_msg.append(f"[{now}] [AggregatorAgent] Summarized multi-agent output.") | |
| return {"final_response": summary.strip(), "logs": log_msg} | |