| """ | |
| Hybrid LLM Service that intelligently routes between Groq and Gemini APIs | |
| based on task complexity and user requirements. | |
| """ | |
import os
import asyncio
import logging
from enum import Enum
from typing import Dict, Any, Optional

from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage

logger = logging.getLogger(__name__)


class TaskComplexity(Enum):
    SIMPLE = "simple"
    COMPLEX = "complex"


class LLMProvider(Enum):
    GROQ = "groq"
    GEMINI = "gemini"


class HybridLLMService:
    def __init__(self):
        # Initialize Groq (primary)
        self.groq_api_key = os.getenv("GROQ_API_KEY")
        self.groq_model = os.getenv("GROQ_MODEL", "llama-3.1-70b-versatile")
        if self.groq_api_key:
            self.groq_llm = ChatGroq(
                groq_api_key=self.groq_api_key,
                model_name=self.groq_model,
                temperature=0.7
            )
            logger.info(f"✅ Groq LLM initialized: {self.groq_model}")
        else:
            self.groq_llm = None
            logger.warning("⚠️ Groq API key not found")

        # Initialize Gemini (secondary/fallback)
        self.google_api_key = os.getenv("GOOGLE_API_KEY")
        self.gemini_model = os.getenv("GEMINI_MODEL", "gemini-1.5-flash")  # Use flash model for the free tier
        if self.google_api_key:
            try:
                self.gemini_llm = ChatGoogleGenerativeAI(
                    model=self.gemini_model,
                    google_api_key=self.google_api_key,
                    temperature=0.7
                )
                logger.info(f"✅ Gemini LLM initialized: {self.gemini_model}")
            except Exception as e:
                self.gemini_llm = None
                logger.warning(f"⚠️ Gemini initialization failed: {e}")
        else:
            self.gemini_llm = None
            logger.warning("⚠️ Google API key not found")

        # Hybrid configuration
        self.use_hybrid = os.getenv("USE_HYBRID_LLM", "true").lower() == "true"
        self.primary_provider = LLMProvider.GROQ  # Always use Groq as primary
        logger.info(f"🤖 Hybrid LLM Service initialized (Primary: {self.primary_provider.value})")

    def analyze_task_complexity(self, message: str) -> TaskComplexity:
        """Analyze whether a task requires complex reasoning or a simple response."""
        complex_keywords = [
            'analyze', 'compare', 'evaluate', 'scenario', 'chart', 'graph',
            'visualization', 'complex', 'detailed analysis', 'multi-step',
            'comprehensive', 'in-depth', 'elaborate', 'breakdown'
        ]
        simple_keywords = [
            'what is', 'who is', 'when', 'where', 'how to', 'define',
            'explain', 'tell me', 'show me', 'list', 'summary'
        ]
        message_lower = message.lower()

        # Count complex vs. simple indicators
        complex_score = sum(1 for keyword in complex_keywords if keyword in message_lower)
        simple_score = sum(1 for keyword in simple_keywords if keyword in message_lower)

        # If the message is very long (>200 chars) or complex indicators dominate, treat it as complex
        if len(message) > 200 or complex_score > simple_score:
            return TaskComplexity.COMPLEX
        return TaskComplexity.SIMPLE

    def choose_llm_provider(self, message: str) -> LLMProvider:
        """Choose the best LLM provider based on task complexity and availability."""
        # If hybrid is disabled, always use the primary (Groq)
        if not self.use_hybrid:
            return LLMProvider.GROQ if self.groq_llm else LLMProvider.GEMINI

        # Always prefer Groq for better speed and reliability
        if self.groq_llm:
            return LLMProvider.GROQ

        # Fall back to Gemini only if Groq is not available
        if self.gemini_llm:
            return LLMProvider.GEMINI

        # If neither is available, return Groq (the caller handles the error gracefully)
        return LLMProvider.GROQ

    async def get_response(self, message: str, context: str = "") -> str:
        """Get a response from the chosen LLM provider."""
        provider = self.choose_llm_provider(message)
        complexity = self.analyze_task_complexity(message)
        logger.info(f"🎯 Using {provider.value} for {complexity.value} task")

        try:
            if provider == LLMProvider.GROQ and self.groq_llm:
                return await self._get_groq_response(message, context)
            elif provider == LLMProvider.GEMINI and self.gemini_llm:
                return await self._get_gemini_response(message, context)
            else:
                # Fallback logic
                if self.groq_llm:
                    logger.info("🔄 Falling back to Groq")
                    return await self._get_groq_response(message, context)
                elif self.gemini_llm:
                    logger.info("🔄 Falling back to Gemini")
                    return await self._get_gemini_response(message, context)
                else:
                    return "I apologize, but no AI providers are currently available. Please check your API keys."
        except Exception as e:
            logger.error(f"❌ Error with {provider.value}: {e}")
            # Try the fallback provider
            if provider == LLMProvider.GROQ and self.gemini_llm:
                logger.info("🔄 Groq failed, trying Gemini")
                try:
                    return await self._get_gemini_response(message, context)
                except Exception as gemini_error:
                    logger.error(f"❌ Gemini fallback also failed: {gemini_error}")
                    return "I apologize, but I'm experiencing technical difficulties. Both AI providers are currently unavailable."
            elif provider == LLMProvider.GEMINI and self.groq_llm:
                logger.info("🔄 Gemini failed, trying Groq")
                try:
                    return await self._get_groq_response(message, context)
                except Exception as groq_error:
                    logger.error(f"❌ Groq fallback also failed: {groq_error}")
                    return "I apologize, but I'm experiencing technical difficulties. Both AI providers are currently unavailable."
            return f"I apologize, but I encountered an error: {str(e)}"

    async def _get_groq_response(self, message: str, context: str = "") -> str:
        """Get a response from the Groq LLM."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures.
You have access to government documents and can provide accurate information based on them.
Provide clear, concise, and helpful responses."""
        if context:
            system_prompt += f"\n\nRelevant context from documents:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        response = await self.groq_llm.ainvoke(messages)
        return response.content

    async def _get_gemini_response(self, message: str, context: str = "") -> str:
        """Get a response from the Gemini LLM."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures.
You have access to government documents and can provide accurate information based on them.
Provide detailed, analytical responses when needed."""
        if context:
            system_prompt += f"\n\nRelevant context from documents:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        response = await self.gemini_llm.ainvoke(messages)
        return response.content

    async def get_streaming_response(self, message: str, context: str = ""):
        """Get a streaming response from the chosen LLM provider."""
        provider = self.choose_llm_provider(message)
        try:
            if provider == LLMProvider.GROQ and self.groq_llm:
                async for chunk in self._get_groq_streaming_response(message, context):
                    yield chunk
            elif provider == LLMProvider.GEMINI and self.gemini_llm:
                async for chunk in self._get_gemini_streaming_response(message, context):
                    yield chunk
            else:
                # Fall back to an available provider
                if self.groq_llm:
                    async for chunk in self._get_groq_streaming_response(message, context):
                        yield chunk
                else:
                    yield "No AI providers are currently available."
        except Exception as e:
            logger.error(f"❌ Streaming error with {provider.value}: {e}")
            # Try the fallback provider
            if provider == LLMProvider.GROQ and self.gemini_llm:
                try:
                    async for chunk in self._get_gemini_streaming_response(message, context):
                        yield chunk
                except Exception:
                    yield "I apologize, but I'm experiencing technical difficulties."
            elif provider == LLMProvider.GEMINI and self.groq_llm:
                try:
                    async for chunk in self._get_groq_streaming_response(message, context):
                        yield chunk
                except Exception:
                    yield "I apologize, but I'm experiencing technical difficulties."
            else:
                yield f"Error: {str(e)}"

    async def _get_groq_streaming_response(self, message: str, context: str = ""):
        """Stream a response from Groq."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures."""
        if context:
            system_prompt += f"\n\nRelevant context:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        # Groq streaming
        async for chunk in self.groq_llm.astream(messages):
            if chunk.content:
                yield chunk.content
                await asyncio.sleep(0.01)

    async def _get_gemini_streaming_response(self, message: str, context: str = ""):
        """Stream a response from Gemini."""
        system_prompt = """You are a helpful AI assistant specializing in government policies and procedures."""
        if context:
            system_prompt += f"\n\nRelevant context:\n{context}"

        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=message)
        ]
        # Gemini streaming
        async for chunk in self.gemini_llm.astream(messages):
            if chunk.content:
                yield chunk.content
                await asyncio.sleep(0.01)
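

# --- Usage sketch (illustrative, not part of the service) ---
# A minimal way to exercise the service from the command line, assuming
# GROQ_API_KEY and/or GOOGLE_API_KEY are set in the environment. The demo
# function name and sample questions below are hypothetical.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _demo():
        service = HybridLLMService()

        # One-shot response: routed to Groq if available, else Gemini.
        answer = await service.get_response("What is a government subsidy?")
        print(answer)

        # Streaming variant: consume the async generator chunk by chunk.
        async for chunk in service.get_streaming_response("Summarize the housing policy."):
            print(chunk, end="", flush=True)
        print()

    asyncio.run(_demo())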