# PensionBot / hybrid_llm_service.py
"""
Hybrid LLM Service that intelligently routes between Groq and Gemini APIs
based on task complexity and user requirements.
"""
import os
import asyncio
import logging
from enum import Enum

from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage

logger = logging.getLogger(__name__)
class TaskComplexity(Enum):
    SIMPLE = "simple"
    COMPLEX = "complex"


class LLMProvider(Enum):
    GROQ = "groq"
    GEMINI = "gemini"

class HybridLLMService:
    """Routes requests to Groq (primary) or Gemini (fallback)."""

    def __init__(self):
        # Initialize Groq (primary). Override the default model via GROQ_MODEL
        # if this default is ever retired upstream.
        self.groq_api_key = os.getenv("GROQ_API_KEY")
        self.groq_model = os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile")
        if self.groq_api_key:
            self.groq_llm = ChatGroq(
                groq_api_key=self.groq_api_key,
                model_name=self.groq_model,
                temperature=0.7
            )
            logger.info(f"✅ Groq LLM initialized: {self.groq_model}")
        else:
            self.groq_llm = None
            logger.warning("⚠️ Groq API key not found")

        # Initialize Gemini (secondary/fallback)
        self.google_api_key = os.getenv("GOOGLE_API_KEY")
        self.gemini_model = os.getenv("GEMINI_MODEL", "gemini-1.5-flash")  # Flash model fits the free tier
        if self.google_api_key:
            try:
                self.gemini_llm = ChatGoogleGenerativeAI(
                    model=self.gemini_model,
                    google_api_key=self.google_api_key,
                    temperature=0.7
                )
                logger.info(f"✅ Gemini LLM initialized: {self.gemini_model}")
            except Exception as e:
                self.gemini_llm = None
                logger.warning(f"⚠️ Gemini initialization failed: {e}")
        else:
            self.gemini_llm = None
            logger.warning("⚠️ Google API key not found")

        # Hybrid configuration
        self.use_hybrid = os.getenv("USE_HYBRID_LLM", "true").lower() == "true"
        self.primary_provider = LLMProvider.GROQ  # Always use Groq as primary
        logger.info(f"🤖 Hybrid LLM Service initialized (Primary: {self.primary_provider.value})")

    def analyze_task_complexity(self, message: str) -> TaskComplexity:
        """Classify a message as SIMPLE or COMPLEX using keyword and length heuristics."""
        complex_keywords = [
            'analyze', 'compare', 'evaluate', 'scenario', 'chart', 'graph',
            'visualization', 'complex', 'detailed analysis', 'multi-step',
            'comprehensive', 'in-depth', 'elaborate', 'breakdown'
        ]
        simple_keywords = [
            'what is', 'who is', 'when', 'where', 'how to', 'define',
            'explain', 'tell me', 'show me', 'list', 'summary'
        ]
        message_lower = message.lower()

        # Count complex vs. simple indicators
        complex_score = sum(1 for keyword in complex_keywords if keyword in message_lower)
        simple_score = sum(1 for keyword in simple_keywords if keyword in message_lower)

        # Long messages (>200 chars) or a majority of complex keywords mean COMPLEX
        if len(message) > 200 or complex_score > simple_score:
            return TaskComplexity.COMPLEX
        return TaskComplexity.SIMPLE
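
    # Illustrative classifications (hypothetical inputs, not from the original file):
    #   "What is the pension eligibility age?"                -> SIMPLE ('what is', short)
    #   "Compare scenario A and B with a detailed breakdown"  -> COMPLEX ('compare', 'scenario', 'breakdown')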

    def choose_llm_provider(self, message: str) -> LLMProvider:
        """Choose a provider based on availability; Groq is always preferred for speed."""
        # If hybrid mode is disabled, use the primary (Groq) when available
        if not self.use_hybrid:
            return LLMProvider.GROQ if self.groq_llm else LLMProvider.GEMINI

        # Prefer Groq for speed and reliability
        if self.groq_llm:
            return LLMProvider.GROQ

        # Fall back to Gemini only if Groq is not available
        if self.gemini_llm:
            return LLMProvider.GEMINI

        # If neither is available, return Groq; get_response() reports the error gracefully
        return LLMProvider.GROQ

    async def get_response(self, message: str, context: str = "") -> str:
        """Get a response from the chosen LLM provider, falling back on failure."""
        provider = self.choose_llm_provider(message)
        complexity = self.analyze_task_complexity(message)
        logger.info(f"🎯 Using {provider.value} for {complexity.value} task")

        try:
            if provider == LLMProvider.GROQ and self.groq_llm:
                return await self._get_groq_response(message, context)
            elif provider == LLMProvider.GEMINI and self.gemini_llm:
                return await self._get_gemini_response(message, context)
            else:
                # Fallback logic
                if self.groq_llm:
                    logger.info("🔄 Falling back to Groq")
                    return await self._get_groq_response(message, context)
                elif self.gemini_llm:
                    logger.info("🔄 Falling back to Gemini")
                    return await self._get_gemini_response(message, context)
                else:
                    return "I apologize, but no AI providers are currently available. Please check your API keys."
        except Exception as e:
            logger.error(f"❌ Error with {provider.value}: {e}")
            # Try the other provider as a fallback
            if provider == LLMProvider.GROQ and self.gemini_llm:
                logger.info("🔄 Groq failed, trying Gemini")
                try:
                    return await self._get_gemini_response(message, context)
                except Exception as gemini_error:
                    logger.error(f"❌ Gemini fallback also failed: {gemini_error}")
                    return "I apologize, but I'm experiencing technical difficulties. Both AI providers are currently unavailable."
            elif provider == LLMProvider.GEMINI and self.groq_llm:
                logger.info("🔄 Gemini failed, trying Groq")
                try:
                    return await self._get_groq_response(message, context)
                except Exception as groq_error:
                    logger.error(f"❌ Groq fallback also failed: {groq_error}")
                    return "I apologize, but I'm experiencing technical difficulties. Both AI providers are currently unavailable."
            return f"I apologize, but I encountered an error: {str(e)}"

    async def _get_groq_response(self, message: str, context: str = "") -> str:
        """Get a response from the Groq LLM."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures.
        You have access to government documents and can provide accurate information based on them.
        Provide clear, concise, and helpful responses."""
        if context:
            system_prompt += f"\n\nRelevant context from documents:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        response = await self.groq_llm.ainvoke(messages)
        return response.content

    async def _get_gemini_response(self, message: str, context: str = "") -> str:
        """Get a response from the Gemini LLM."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures.
        You have access to government documents and can provide accurate information based on them.
        Provide detailed, analytical responses when needed."""
        if context:
            system_prompt += f"\n\nRelevant context from documents:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        response = await self.gemini_llm.ainvoke(messages)
        return response.content

    async def get_streaming_response(self, message: str, context: str = ""):
        """Stream a response from the chosen LLM provider, falling back on failure."""
        provider = self.choose_llm_provider(message)

        try:
            if provider == LLMProvider.GROQ and self.groq_llm:
                async for chunk in self._get_groq_streaming_response(message, context):
                    yield chunk
            elif provider == LLMProvider.GEMINI and self.gemini_llm:
                async for chunk in self._get_gemini_streaming_response(message, context):
                    yield chunk
            else:
                # Fallback to an available provider
                if self.groq_llm:
                    async for chunk in self._get_groq_streaming_response(message, context):
                        yield chunk
                else:
                    yield "No AI providers are currently available."
        except Exception as e:
            logger.error(f"❌ Streaming error with {provider.value}: {e}")
            # Try the other provider as a fallback
            if provider == LLMProvider.GROQ and self.gemini_llm:
                try:
                    async for chunk in self._get_gemini_streaming_response(message, context):
                        yield chunk
                except Exception as fallback_error:
                    logger.error(f"❌ Gemini fallback also failed: {fallback_error}")
                    yield "I apologize, but I'm experiencing technical difficulties."
            elif provider == LLMProvider.GEMINI and self.groq_llm:
                try:
                    async for chunk in self._get_groq_streaming_response(message, context):
                        yield chunk
                except Exception as fallback_error:
                    logger.error(f"❌ Groq fallback also failed: {fallback_error}")
                    yield "I apologize, but I'm experiencing technical difficulties."
            else:
                yield f"Error: {str(e)}"

    async def _get_groq_streaming_response(self, message: str, context: str = ""):
        """Stream a response from Groq."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures."""
        if context:
            system_prompt += f"\n\nRelevant context:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        # Groq streaming; the short sleep yields control so other tasks on the
        # event loop are not starved during fast streams
        async for chunk in self.groq_llm.astream(messages):
            if chunk.content:
                yield chunk.content
                await asyncio.sleep(0.01)

    async def _get_gemini_streaming_response(self, message: str, context: str = ""):
        """Stream a response from Gemini."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures."""
        if context:
            system_prompt += f"\n\nRelevant context:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        # Gemini streaming
        async for chunk in self.gemini_llm.astream(messages):
            if chunk.content:
                yield chunk.content
                await asyncio.sleep(0.01)
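

# Minimal manual smoke test: a sketch, not part of the deployed service.
# Assumes GROQ_API_KEY and/or GOOGLE_API_KEY are set in the environment;
# the sample question is hypothetical.
if __name__ == "__main__":
    async def _demo() -> None:
        service = HybridLLMService()
        reply = await service.get_response("What is the pension eligibility age?")
        print(reply)

    asyncio.run(_demo())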