Spaces:
Sleeping
Sleeping
File size: 21,709 Bytes
cf02b2b b971859 cf02b2b 159ca63 cf02b2b 9a54a21 cf02b2b b971859 cf02b2b b971859 cf02b2b 159ca63 cf02b2b 159ca63 cf02b2b d43698c ab738e0 d43698c 9a54a21 cf02b2b f209836 cf02b2b f209836 cf02b2b f209836 cf02b2b f209836 cf02b2b f209836 cf02b2b f209836 cf02b2b 3f1265d 9a54a21 f209836 b7bbf7a f209836 9a54a21 f209836 b7bbf7a 9a54a21 f209836 9a54a21 20ba800 f209836 3f1265d cf02b2b 9a54a21 159ca63 cf02b2b 3f1265d 9a54a21 cf02b2b 5e02cec cf02b2b 3f1265d cf02b2b f209836 20ba800 cf02b2b 20ba800 cf02b2b 20ba800 cf02b2b f209836 20ba800 cf02b2b 20ba800 cf02b2b 20ba800 cf02b2b 5e02cec cf02b2b 159ca63 cf02b2b 5e02cec cf02b2b 3f1265d cf02b2b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 |
"""
Hybrid LLM Service that intelligently routes between Groq and Gemini APIs
based on task complexity and user requirements.
"""
import os
import asyncio
from enum import Enum
from typing import Dict, Any, Optional
import logging
from langchain_groq import ChatGroq
# Temporarily disabled due to protobuf issues
# from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage
logger = logging.getLogger(__name__)
class TaskComplexity(Enum):
    """Coarse classification of how much reasoning a query requires."""
    SIMPLE = "simple"    # short factual / definition-style queries
    COMPLEX = "complex"  # analysis, overviews, multi-step procedures
class LLMProvider(Enum):
    """Identifiers for the supported LLM backends."""
    GROQ = "groq"      # primary provider
    GEMINI = "gemini"  # secondary/fallback (currently disabled in this build)
class HybridLLMService:
def __init__(self):
# Initialize Groq (Primary)
self.groq_api_key = os.getenv("GROQ_API_KEY")
self.groq_model = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant") # Updated to supported model
# Rajasthan Rule Assistant Language Detection
self.hindi_keywords = [
'क्या', 'कैसे', 'कब', 'कहाँ', 'किसको', 'पेंशन', 'नियम', 'राजस्थान',
'सरकार', 'नीति', 'योजना', 'लाभ', 'पात्रता', 'आवेदन', 'फॉर्म',
'दस्तावेज', 'प्रक्रिया', 'अधिकारी', 'विभाग', 'कार्यालय', 'अनुमोदन',
'सेवा', 'कर्मचारी', 'अधिकार', 'कानून', 'परिपत्र', 'आदेश', 'गजट'
]
if self.groq_api_key:
self.groq_llm = ChatGroq(
groq_api_key=self.groq_api_key,
model_name=self.groq_model,
temperature=0.7
)
logger.info(f"✅ Groq LLM initialized: {self.groq_model}")
else:
self.groq_llm = None
logger.warning("⚠️ Groq API key not found")
# Initialize Gemini (Secondary/Fallback) - TEMPORARILY DISABLED DUE TO PROTOBUF ISSUES
self.google_api_key = os.getenv("GOOGLE_API_KEY")
self.gemini_model = os.getenv("GEMINI_MODEL", "gemini-1.5-flash") # Use flash model for free tier
# Temporarily disabled due to protobuf compatibility issues
self.gemini_llm = None
logger.warning("⚠️ Gemini temporarily disabled due to protobuf issues")
# if self.google_api_key:
# try:
# self.gemini_llm = ChatGoogleGenerativeAI(
# model=self.gemini_model,
# google_api_key=self.google_api_key,
# temperature=0.7
# )
# logger.info(f"✅ Gemini LLM initialized: {self.gemini_model}")
# except Exception as e:
# self.gemini_llm = None
# logger.warning(f"⚠️ Gemini initialization failed: {e}")
# else:
# self.gemini_llm = None
# logger.warning("⚠️ Google API key not found")
# Hybrid configuration
self.use_hybrid = os.getenv("USE_HYBRID_LLM", "true").lower() == "true"
self.primary_provider = LLMProvider.GROQ # Always use Groq as primary
# Check if we have any working providers
if not self.groq_llm and not self.gemini_llm:
logger.error("❌ No LLM providers available! Please check your API keys.")
logger.error("❌ Set GROQ_API_KEY and/or GOOGLE_API_KEY environment variables")
logger.info(f"🤖 Hybrid LLM Service initialized (Primary: {self.primary_provider.value})")
logger.info(f"📊 Available providers - Groq: {self.groq_llm is not None}, Gemini: {self.gemini_llm is not None}")
def get_provider_info(self):
"""Get information about available LLM providers"""
return {
"primary_provider": self.primary_provider.value,
"groq_available": self.groq_llm is not None,
"gemini_available": self.gemini_llm is not None,
"groq_model": self.groq_model if self.groq_llm else None,
"gemini_model": self.gemini_model if self.gemini_llm else None,
"hybrid_enabled": self.use_hybrid, # Changed from 'use_hybrid' to 'hybrid_enabled'
"fast_provider": "groq", # Add fast_provider for compatibility
"complex_provider": "gemini" # Add complex_provider for compatibility
}
def detect_language(self, message: str) -> str:
"""Detect if the query is in Hindi and return response language preference"""
message_lower = message.lower()
# Check for Hindi keywords
hindi_matches = sum(1 for keyword in self.hindi_keywords if keyword in message)
# Check for Devanagari script
devanagari_chars = sum(1 for char in message if '\u0900' <= char <= '\u097F')
if hindi_matches >= 2 or devanagari_chars >= 5:
return "hindi"
elif hindi_matches >= 1 or devanagari_chars >= 2:
return "bilingual" # Mix of Hindi and English
else:
return "english"
def analyze_task_complexity(self, message: str) -> TaskComplexity:
"""Analyze if a task requires complex reasoning or simple response"""
message_lower = message.lower()
# Impact analysis and scenario keywords (always complex)
impact_keywords = [
'impact', 'effect', 'scenario', 'chart', 'graph', 'visualization',
'analyze', 'compare', 'evaluate', 'breakdown', 'simulation',
'projection', 'forecast', 'calculation', 'calculate'
]
# Policy overview keywords (complex for comprehensive responses)
overview_keywords = [
'policies', 'schemes', 'types of', 'categories', 'overview',
'comprehensive', 'detailed', 'all about', 'everything about',
'complete guide', 'full information'
]
# Complex analysis keywords
complex_keywords = [
'detailed analysis', 'multi-step', 'in-depth', 'elaborate',
'comprehensive analysis', 'step by step', 'procedure',
'process', 'workflow', 'implementation'
]
# Simple definition keywords
simple_keywords = [
'what is', 'who is', 'when is', 'where is', 'define',
'meaning', 'definition', 'brief', 'quick', 'simple'
]
# Check for impact/scenario analysis (highest priority)
if any(keyword in message_lower for keyword in impact_keywords):
return TaskComplexity.COMPLEX
# Check for policy overview questions (need comprehensive responses)
if any(keyword in message_lower for keyword in overview_keywords):
return TaskComplexity.COMPLEX
# Check for other complex requests
complex_score = sum(1 for keyword in complex_keywords if keyword in message_lower)
simple_score = sum(1 for keyword in simple_keywords if keyword in message_lower)
# If message is very long or has complex keywords, use complex
if len(message) > 150 or complex_score > simple_score:
return TaskComplexity.COMPLEX
return TaskComplexity.SIMPLE
def _create_adaptive_system_prompt(self, message: str, user_role: str = "citizen",
language_preference: str = "hindi") -> str:
"""Create system prompt adapted to the type of query, user role, and language preference"""
# Detect actual language from message, override preference if needed
detected_language = self.detect_language(message)
# If user types in English, always respond in English regardless of preference
if detected_language == "english":
language_pref = "english"
else:
language_pref = language_preference
# Language-specific instructions
if language_pref == "hindi":
language_instruction = """
IMPORTANT LANGUAGE INSTRUCTION: The user has asked in Hindi. Please respond in Hindi (देवनागरी script).
Provide all explanations, procedures, and details in Hindi language. Use English only for:
- Technical terms that don't have direct Hindi equivalents
- Rule numbers and official references
- Amounts and calculations (but explain in Hindi)"""
elif language_pref == "bilingual":
language_instruction = """
IMPORTANT LANGUAGE INSTRUCTION: The user is using both Hindi and English. Please respond in a bilingual format:
- Main explanations in Hindi (देवनागरी script)
- Include English translations for key terms in brackets
- Use both languages naturally as appropriate"""
else:
language_instruction = """
IMPORTANT LANGUAGE INSTRUCTION: The user has asked in English. Please respond ONLY in English language.
Do not mix Hindi text unnecessarily. Keep responses professional and in English only.
Only use Hindi if specifically requested by the user."""
# Role-specific guidance
role_instructions = {
"citizen": """
USER ROLE: General Citizen (सामान्य नागरिक):
- Explain complex government procedures in simple terms
- Focus on eligibility criteria and application processes
- Provide step-by-step guidance for common procedures
- Include contact information and office locations
- Emphasize required documents and timelines""",
"employee": """
USER ROLE: Government Employee (सरकारी कर्मचारी):
- Provide detailed information about service rules and benefits
- Include specific rule references and circular numbers
- Focus on career progression, transfers, and service matters
- Explain pension calculations and retirement procedures
- Address leave policies and salary-related queries""",
"officer": """
USER ROLE: Government Officer (सरकारी अधिकारी):
- Provide comprehensive policy analysis and implementation guidance
- Include administrative procedures and delegation of powers
- Focus on policy interpretation and decision-making frameworks
- Explain departmental procedures and inter-departmental coordination
- Address complex administrative and legal matters""",
"pensioner": """
USER ROLE: Pensioner (पेंशनर):
- Focus on pension disbursement and related issues
- Explain revision processes and arrears calculations
- Address medical allowances and family pension matters
- Provide information about pension grievance procedures
- Include details about documentation and verification processes"""
}
role_instruction = role_instructions.get(user_role, role_instructions["citizen"])
base_prompt = f"""You are the Rajasthan Rule Assistant (राजस्थान नियम सहायक), an expert AI assistant specializing in Rajasthan government policies, procedures, and pension-related matters.
{language_instruction}
{role_instruction}
Key Guidelines:
- Always provide accurate information based on government documents
- For pension calculations, show step-by-step breakdown with formulas
- Include relevant rule numbers and circular references
- Provide practical, actionable guidance
- When uncertain, acknowledge limitations and suggest consulting officials
- For Rajasthan-specific queries, prioritize state government rules and circulars"""
return base_prompt
def determine_task_complexity(self, message: str, context: str = "") -> TaskComplexity:
"""Determine task complexity - alias for analyze_task_complexity for compatibility"""
return self.analyze_task_complexity(message)
def choose_llm_provider(self, message: str) -> LLMProvider:
"""Choose the best LLM provider based on task complexity and availability"""
# If hybrid is disabled, always use primary (Groq)
if not self.use_hybrid:
return LLMProvider.GROQ if self.groq_llm else LLMProvider.GEMINI
# Always prefer Groq for better speed and reliability
if self.groq_llm:
return LLMProvider.GROQ
# Fallback to Gemini only if Groq is not available
if self.gemini_llm:
return LLMProvider.GEMINI
# If neither is available, return Groq (will handle error gracefully)
return LLMProvider.GROQ
async def get_response(self, message: str, context: str = "", system_prompt: str = None,
user_role: str = "citizen", language_preference: str = "hindi") -> str:
"""Get response from the chosen LLM provider with role and language context"""
# Check if any providers are available
if not self.groq_llm and not self.gemini_llm:
return """I apologize, but I'm currently unable to process your request due to configuration issues.
Please ensure that the following environment variables are properly set:
- GROQ_API_KEY: For Groq LLM service
- GOOGLE_API_KEY: For Gemini LLM service
At least one of these API keys is required for the Voice Bot to function properly."""
provider = self.choose_llm_provider(message)
complexity = self.analyze_task_complexity(message)
provider_name = provider.value if provider else "unknown"
complexity_name = complexity.value if complexity else "unknown"
logger.info(f"🎯 Using {provider_name} for {complexity_name} task | Role: {user_role} | Language: {language_preference}")
# Create adaptive system prompt with role and language context
if not system_prompt:
system_prompt = self._create_adaptive_system_prompt(message, user_role, language_preference)
# If system_prompt is provided, prepend it to the context
if system_prompt:
context = f"{system_prompt}\n\n{context}" if context else system_prompt
try:
if provider == LLMProvider.GROQ and self.groq_llm:
return await self._get_groq_response(message, context)
elif provider == LLMProvider.GEMINI and self.gemini_llm:
return await self._get_gemini_response(message, context)
else:
# Fallback logic
if self.groq_llm:
logger.info("🔄 Falling back to Groq")
return await self._get_groq_response(message, context)
elif self.gemini_llm:
logger.info("🔄 Falling back to Gemini")
return await self._get_gemini_response(message, context)
else:
return "I apologize, but no AI providers are currently available. Please check your API keys."
except Exception as e:
provider_name = provider.value if provider else "unknown"
logger.error(f"❌ Error with {provider_name}: {e}")
# Try fallback provider
if provider == LLMProvider.GROQ and self.gemini_llm:
logger.info("🔄 Groq failed, trying Gemini")
try:
return await self._get_gemini_response(message, context)
except Exception as gemini_error:
logger.error(f"❌ Gemini fallback also failed: {gemini_error}")
return f"I apologize, but I'm experiencing technical difficulties. Both AI providers are currently unavailable."
elif provider == LLMProvider.GEMINI and self.groq_llm:
logger.info("🔄 Gemini failed, trying Groq")
try:
return await self._get_groq_response(message, context)
except Exception as groq_error:
logger.error(f"❌ Groq fallback also failed: {groq_error}")
return f"I apologize, but I'm experiencing technical difficulties. Both AI providers are currently unavailable."
return f"I apologize, but I encountered an error: {str(e)}"
async def _get_groq_response(self, message: str, context: str = "") -> str:
"""Get response from Groq LLM"""
# Create context-aware system prompt based on query type
system_prompt = self._create_adaptive_system_prompt(message) or ""
if context and system_prompt:
system_prompt += f"\n\nRelevant context from documents:\n{context}"
elif context:
system_prompt = f"Relevant context from documents:\n{context}"
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=message)
]
response = await self.groq_llm.ainvoke(messages)
return response.content
async def _get_gemini_response(self, message: str, context: str = "") -> str:
"""Get response from Gemini LLM"""
# Create context-aware system prompt based on query type
system_prompt = self._create_adaptive_system_prompt(message) or ""
if context and system_prompt:
system_prompt += f"\n\nRelevant context from documents:\n{context}"
elif context:
system_prompt = f"Relevant context from documents:\n{context}"
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=message)
]
response = await self.gemini_llm.ainvoke(messages)
return response.content
async def get_streaming_response(self, message: str, context: str = "", system_prompt: str = None):
"""Get streaming response from the chosen LLM provider"""
# Check if any providers are available
if not self.groq_llm and not self.gemini_llm:
yield """I apologize, but I'm currently unable to process your request due to configuration issues.
Please ensure that the following environment variables are properly set:
- GROQ_API_KEY: For Groq LLM service
- GOOGLE_API_KEY: For Gemini LLM service
At least one of these API keys is required for the Voice Bot to function properly."""
return
provider = self.choose_llm_provider(message)
# If system_prompt is provided, prepend it to the context
if system_prompt:
context = f"{system_prompt}\n\n{context}" if context else system_prompt
try:
if provider == LLMProvider.GROQ and self.groq_llm:
async for chunk in self._get_groq_streaming_response(message, context):
yield chunk
elif provider == LLMProvider.GEMINI and self.gemini_llm:
async for chunk in self._get_gemini_streaming_response(message, context):
yield chunk
else:
# Fallback to available provider
if self.groq_llm:
async for chunk in self._get_groq_streaming_response(message, context):
yield chunk
else:
yield "No AI providers are currently available."
except Exception as e:
provider_name = provider.value if provider else "unknown"
logger.error(f"❌ Streaming error with {provider_name}: {e}")
# Try fallback
if provider == LLMProvider.GROQ and self.gemini_llm:
try:
async for chunk in self._get_gemini_streaming_response(message, context):
yield chunk
except:
yield f"I apologize, but I'm experiencing technical difficulties."
elif provider == LLMProvider.GEMINI and self.groq_llm:
try:
async for chunk in self._get_groq_streaming_response(message, context):
yield chunk
except:
yield f"I apologize, but I'm experiencing technical difficulties."
else:
yield f"Error: {str(e)}"
async def _get_groq_streaming_response(self, message: str, context: str = ""):
"""Get streaming response from Groq"""
system_prompt = """You are a helpful AI assistant specializing in government policies and procedures."""
if context:
system_prompt += f"\n\nRelevant context:\n{context}"
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=message)
]
# Groq streaming
async for chunk in self.groq_llm.astream(messages):
if chunk.content:
yield chunk.content
await asyncio.sleep(0.01)
async def _get_gemini_streaming_response(self, message: str, context: str = ""):
"""Get streaming response from Gemini"""
system_prompt = """You are a helpful AI assistant specializing in government policies and procedures."""
if context:
system_prompt += f"\n\nRelevant context:\n{context}"
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=message)
]
# Gemini streaming
async for chunk in self.gemini_llm.astream(messages):
if chunk.content:
yield chunk.content
await asyncio.sleep(0.01)
|