CafeAgentX / agents /aggregator_agent.py
LKTs's picture
Upload 65 files
ba71fca verified
raw
history blame
3.7 kB
from agents.base import BaseAgent
from datetime import datetime
class AggregatorAgent(BaseAgent):
def __init__(self, gemini_agent):
self.gemini = gemini_agent
def summarize_database_output(self, table_text, user_message):
prompt = (
"You are a helpful and friendly assistant. Organize and present the following database table for the user in a clear, concise, and visually appealing summary. "
"Use bullet points, short tables, or lists if helpful, and add emojis to enhance readability where appropriate. "
"Focus only on answering the user's request; do not add extra information. "
"If the table is empty, reply with a polite message that no data was found. "
"Always answer in the exact same language as the user's question.\n\n"
f"User question: {user_message}\n"
f"Database output:\n{table_text}\n"
)
result = self.gemini.invoke([{"role": "user", "content": prompt}])
return result.content if hasattr(result, "content") else str(result)
def summarize_multiple_agents(self, agent_outputs, user_message):
prompt = (
"You are a helpful and friendly assistant. The following are responses from multiple support agents to the same user query. "
"Please combine, reorganize, and summarize all relevant information into a single, concise, and easy-to-read reply for the user. "
"Remove any duplicate or redundant details. Use a friendly tone, and you may add emojis to make the message more engaging. "
"If there are conflicting answers, use the most accurate or comprehensive information.\n\n"
"Always answer in the exact same language as the user's question.\n"
f"User question: {user_message}\n"
"Agent outputs:\n"
)
for label, text in agent_outputs:
prompt += f"[{label}]: {text}\n"
result = self.gemini.invoke([{"role": "user", "content": prompt}])
return result.content if hasattr(result, "content") else str(result)
def process(self, state: dict) -> dict:
assigned_agents = state.get("assigned_agents", [])
user_message = state.get("user_message", "")
log_msg = []
now = datetime.now().isoformat(timespec='seconds')
log_msg.append(f"[{now}] [AggregatorAgent] Processing assigned_agents: {[a['agent'] for a in assigned_agents]}")
waiting = [a["agent"] for a in assigned_agents if not a.get("result")]
if waiting:
log_msg.append(f"[{now}] [AggregatorAgent] Waiting for agents: {waiting}")
return {"logs": log_msg}
if len(assigned_agents) == 1:
agent = assigned_agents[0]["agent"]
result = assigned_agents[0].get("result", "")
if agent != "coffee_db_agent":
log_msg.append(f"[{now}] [AggregatorAgent] Single agent, passing through result.")
return {"final_response": result, "logs": log_msg}
summary = self.summarize_database_output(result, user_message)
log_msg.append(f"[{now}] [AggregatorAgent] Summarized database output.")
return {"final_response": summary, "logs": log_msg}
agent_outputs = []
for a in assigned_agents:
agent = a["agent"]
result = a.get("result", "")
if result:
agent_outputs.append((agent, result))
summary = self.summarize_multiple_agents(agent_outputs, user_message)
log_msg.append(f"[{now}] [AggregatorAgent] Summarized multi-agent output.")
return {"final_response": summary.strip(), "logs": log_msg}