Spaces:
Sleeping
Sleeping
| # Version: Text Bot Support v2.0 - Enhanced routing for text clients | |
| import os | |
| import logging | |
| import time | |
| from datetime import datetime | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, WebSocket, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from websocket_handler import handle_websocket_connection | |
| from enhanced_websocket_handler import handle_enhanced_websocket_connection | |
| from conversational_websocket_handler import handle_conversational_websocket | |
| from hybrid_llm_service import HybridLLMService | |
| from voice_service import VoiceService | |
| from groq_voice_service import groq_voice_service # Import the new Groq voice service | |
| from rag_service import search_documents_async | |
| from lancedb_service import LanceDBService | |
| from scenario_analysis_service import ScenarioAnalysisService | |
| from evidence_pack_export import export_evidence_pack_pdf, export_evidence_pack_csv | |
def transform_message_to_evidence_pack(raw_data):
    """Transform message data to evidence pack format.

    Args:
        raw_data: Payload describing a bot message, typically the decoded JSON
            body of an export request. For a dict, the message text comes from
            the first present key among ``text``, ``content`` and ``message``
            (falling back to ``str(raw_data)``); the role from ``user_role``,
            else ``role`` (default ``"citizen"``); the sources from ``sources``
            or ``relevant_docs``; and the original query from ``query``. Any
            non-dict value is stringified and used as the message text with the
            default ``citizen`` role.

    Returns:
        dict: ``clause_text``, ``summary``, ``role_checklist``, ``source_title``,
        ``clause_id``, ``date``, ``url``, ``original_query``, ``user_role``,
        ``sources`` and ``timestamp``. If building the pack raises, the error
        is logged with its traceback and a generic fallback dict (without the
        ``original_query``, ``user_role`` and ``sources`` keys) is returned.
    """
    try:
        message_text = ""
        sources = []
        user_role = "citizen"  # default role
        original_query = ""
        if isinstance(raw_data, dict):
            # Take the text from the first message-like key that is present.
            for key in ("text", "content", "message"):
                if key in raw_data:
                    value = raw_data[key]
                    # A JSON null would otherwise crash .lower()/len() in the helpers.
                    message_text = "" if value is None else str(value)
                    break
            else:
                message_text = str(raw_data)

            # Normalise the role: null/blank falls back to the default, and it is
            # lower-cased because the helper functions compare against
            # lower-case names ("employee", "officer", ...).
            raw_role = raw_data.get("user_role") or raw_data.get("role")
            user_role = str(raw_role or "").strip().lower() or "citizen"

            # Extract sources if available; a null value becomes an empty list.
            if "sources" in raw_data:
                sources = raw_data["sources"] or []
            elif "relevant_docs" in raw_data:
                sources = raw_data["relevant_docs"] or []

            original_query = raw_data.get("query", "")
        else:
            message_text = str(raw_data)

        # Read the clock once so clause_id, date and timestamp always agree.
        now = datetime.now()
        # Evidence pack data structure with role-based customization
        return {
            "clause_text": extract_clause_from_message(message_text, user_role),
            "summary": create_summary_from_message(message_text, user_role),
            "role_checklist": extract_checklist_from_message(message_text, user_role),
            "source_title": f"Rajasthan Pension Rules - Voice Bot Response ({user_role.title()})",
            "clause_id": f"VB_{user_role.upper()}_{now.strftime('%Y%m%d_%H%M%S')}",
            "date": now.strftime("%Y-%m-%d"),
            "url": "https://chabhishek28-pensionbot.hf.space",
            "original_query": original_query,
            "user_role": user_role,
            "sources": sources,
            "timestamp": now.isoformat(),
        }
    except Exception as e:
        logger.exception(f"Error transforming message to evidence pack: {e}")
        # Return default structure
        fallback_time = datetime.now()
        return {
            "clause_text": "Voice bot conversation response",
            "summary": "Rajasthan Pension Rules: AI Assistant Response",
            "role_checklist": ["Review AI response", "Consult official documentation", "Verify with pension department"],
            "source_title": "Voice Bot AI Assistant",
            "clause_id": f"VB_{fallback_time.strftime('%Y%m%d_%H%M%S')}",
            "date": fallback_time.strftime("%Y-%m-%d"),
            "url": "https://chabhishek28-pensionbot.hf.space",
            "timestamp": fallback_time.isoformat(),
        }
def extract_clause_from_message(text, user_role="citizen"):
    """Extract or generate clause information from message text based on user role.

    Pension-related text (mentioning pension, retirement or superannuation) is
    mapped to a canned, role-specific summary of the matching Rajasthan Pension
    Rules topic. The topics are checked in this order: commutation,
    eligibility, family pension, then general provisions. Any other text is
    echoed back behind a ``Government Policy Response (<Role>):`` prefix. The
    echoed text is truncated to 200 characters plus ``...`` when longer.

    Args:
        text: Message text to classify; ``None`` is treated as empty.
        user_role: ``citizen``, ``employee``, ``officer`` or ``pensioner``.
            Roles without a dedicated variant get the citizen/general wording.
            An empty or ``None`` role is treated as ``citizen``.

    Returns:
        str: The clause text for the evidence pack.
    """
    text = "" if text is None else str(text)
    user_role = str(user_role) if user_role else "citizen"
    text_lower = text.lower()

    if not any(word in text_lower for word in ("pension", "retirement", "superannuation")):
        # The role prefix applies to every non-pension reply; only the echoed
        # text itself is truncated.
        snippet = f"{text[:200]}..." if len(text) > 200 else text
        return f"Government Policy Response ({user_role.title()}): {snippet}"

    if "commutation" in text_lower:
        if user_role == "employee":
            return "Rajasthan Pension Rules - Commutation of Pension (Employee): As a government employee, you may commute up to one-third of your pension. Calculate commutation value using prescribed formula: (Pension × Commutation Factor × 12)."
        if user_role == "officer":
            return "Rajasthan Pension Rules - Commutation of Pension (Officer): Administrative guidance: Officers may authorize commutation up to 1/3rd pension. Ensure proper documentation and verification before approval."
        if user_role == "pensioner":
            return "Rajasthan Pension Rules - Commutation of Pension (Pensioner): Your commuted pension amount is calculated based on age at retirement and commutation factors. Contact pension disbursing office for arrears or revision queries."
        return "Rajasthan Pension Rules - Commutation of Pension (Citizen): Eligible government employees may commute up to one-third of their pension as lump sum. Consult with pension department for eligibility."

    if "eligibility" in text_lower:
        if user_role == "employee":
            return "Rajasthan Pension Rules - Eligibility Criteria (Employee): You need minimum 10 years qualifying service for pension eligibility. Check your service record and ensure all periods are counted."
        if user_role == "officer":
            return "Rajasthan Pension Rules - Eligibility Criteria (Officer): Verify employee's qualifying service, break periods, and ensure proper service verification before pension processing."
        if user_role == "pensioner":
            return "Rajasthan Pension Rules - Eligibility Criteria (Pensioner): Your pension eligibility was established at retirement. For revision queries, check service verification and pay fixation records."
        return "Rajasthan Pension Rules - Eligibility Criteria (Citizen): Government employees are eligible for pension after completing minimum qualifying service as per rules."

    if "family pension" in text_lower:
        if user_role == "pensioner":
            return "Rajasthan Pension Rules - Family Pension (Pensioner): Your spouse/family is entitled to family pension after your demise. Ensure nomination and family details are updated."
        return "Rajasthan Pension Rules - Family Pension: Family members are entitled to family pension as per prescribed conditions and rates."

    return f"Rajasthan Pension Rules - General Provisions ({user_role.title()}): Pension benefits are governed by applicable government rules and regulations."
def create_summary_from_message(text, user_role="citizen"):
    """Return the evidence-pack summary line for *text*.

    The line is a role-specific heading, then ``" - "``, then the message text
    cut to its first 100 characters. ``...`` is appended only when text was
    actually cut. Roles other than citizen/employee/officer/pensioner get a
    generic heading.
    """
    headings = {
        "citizen": "Rajasthan Pension Rules: Citizen Guidance",
        "employee": "Rajasthan Pension Rules: Employee Information",
        "officer": "Rajasthan Pension Rules: Administrative Guidance",
        "pensioner": "Rajasthan Pension Rules: Pensioner Support",
    }
    heading = headings.get(user_role, "Rajasthan Pension Rules: AI Assistant Response")
    body = text if len(text) <= 100 else f"{text[:100]}..."
    return f"{heading} - {body}"
def extract_checklist_from_message(text, user_role="citizen"):
    """Return the action checklist to attach to an evidence pack.

    Three topics are detected in *text* case-insensitively, in this order:
    ``pension``; ``application``/``apply``; ``commutation``. For each topic
    present, the items for *user_role* are appended. If the role has no
    dedicated list for that topic, the topic's generic items are appended
    instead. The ``pension`` topic has no generic list, so other roles get
    nothing from it. If no items were collected, a role-specific default
    checklist is returned; unknown roles get the citizen default.
    """
    lowered = text.lower()

    # Each rule: (topic present?, items per role, generic items or None).
    rules = (
        (
            "pension" in lowered,
            {
                "citizen": [
                    "Check your eligibility for pension scheme",
                    "Gather required documents (service certificate, age proof)",
                    "Visit nearest pension office for application",
                    "Keep copies of all submitted documents",
                ],
                "employee": [
                    "Verify your service record for pension eligibility",
                    "Check pension calculation with current pay scale",
                    "Update nomination and family details",
                    "Submit pension papers 6 months before retirement",
                ],
                "officer": [
                    "Verify employee's qualifying service period",
                    "Check all service breaks and regularization",
                    "Ensure proper documentation and approvals",
                    "Process pension papers as per rules and timeline",
                ],
                "pensioner": [
                    "Verify pension calculation and arrears",
                    "Check family pension eligibility and nomination",
                    "Update bank details and address changes",
                    "Submit life certificate annually for pension continuity",
                ],
            },
            None,
        ),
        (
            "application" in lowered or "apply" in lowered,
            {
                "citizen": [
                    "Download application form from official website",
                    "Prepare required documents and attestation",
                    "Submit application within prescribed timeline",
                ],
                "employee": [
                    "Fill pension application form completely",
                    "Attach service verification certificates",
                    "Get departmental clearances before submission",
                ],
            },
            [
                "Prepare required documents",
                "Submit application to concerned department",
            ],
        ),
        (
            "commutation" in lowered,
            {
                "employee": [
                    "Calculate commutation value using current pay scale",
                    "Consider tax implications of lump sum amount",
                    "Submit commutation application with pension papers",
                ],
                "pensioner": [
                    "Check commutation arrears calculation",
                    "Verify commutation factor used in calculation",
                    "Contact pension office for payment status",
                ],
            },
            [
                "Calculate commutation amount",
                "Consider financial implications",
            ],
        ),
    )

    collected = []
    for present, by_role, generic in rules:
        if present:
            collected.extend(by_role.get(user_role, generic) or [])
    if collected:
        return collected

    # Nothing topic-specific matched: fall back to a per-role default list.
    fallback = {
        "citizen": [
            "Review information for your specific situation",
            "Consult with local government office",
            "Verify eligibility criteria",
            "Keep records for future reference",
        ],
        "employee": [
            "Check departmental circulars and updates",
            "Verify with your administrative office",
            "Update your service records if needed",
            "Follow proper departmental procedures",
        ],
        "officer": [
            "Refer to latest government rules and circulars",
            "Ensure compliance with administrative procedures",
            "Verify delegated authority for decision making",
            "Document all actions taken",
        ],
        "pensioner": [
            "Verify pension calculation and payments",
            "Keep pension payment order copy safe",
            "Update address and bank details regularly",
            "Contact pension disbursing office for queries",
        ],
    }
    return fallback.get(user_role, fallback["citizen"])
# Set CORS
# Allowed browser origins: a range of localhost dev ports plus the deployed
# HF Space front end.
# NOTE(review): the CORSMiddleware registration in this file is configured from
# config.ALLOWED_ORIGINS, not from this list — confirm whether `origins` is
# still referenced anywhere.
origins = [f"http://localhost:{port}" for port in (3000, 5173, 5174, 5175, 5176, 5177)] + [
    "https://chabhishek28-pension-assistant.hf.space",
]
| from groq_websocket_handler import groq_websocket_handler | |
| import config | |
| from dotenv import load_dotenv | |
| import json | |
| import base64 | |
| # MCP and Authentication imports | |
| from fastapi import Depends | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from auth import get_current_user | |
# Pull variables from a local .env file into the process environment.
# NOTE(review): `import config` appears earlier in this file than this call; if
# config reads environment variables at import time it will not see .env
# values — confirm.
load_dotenv()

# Root logger: INFO level, "timestamp [LEVEL] message" lines.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Snapshot of the two config values this module exposes.
config_dict = dict(
    ALLOWED_ORIGINS=config.ALLOWED_ORIGINS,
    ENABLE_VOICE_FEATURES=config.ENABLE_VOICE_FEATURES,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler.

    Startup:
    * Logs the document-database status.
    * When fewer than 10 documents are reported, runs
      ``setup_documents.setup_sample_documents()`` to seed the knowledge base.
    * If anything in that path raises, seeding is attempted (again) as a
      fallback.
    * Startup failures are only logged; they never stop the application.

    Shutdown: logs a message once the application stops serving.

    Decorated with ``asynccontextmanager`` (imported at the top of this file)
    so that ``FastAPI(lifespan=...)`` receives an async context manager factory,
    the form FastAPI documents for lifespan handlers.
    """
    # Startup
    logger.info("π Starting Voice Bot Application...")
    # Check document database status
    try:
        from document_status_logger import log_document_status
        document_count = await log_document_status()
        # Setup comprehensive documents if database needs more content
        if document_count < 10:
            logger.info("π Database needs more content - setting up comprehensive government documents...")
            from setup_documents import setup_sample_documents
            await setup_sample_documents()
            # Re-check document count after setup; the count is informational
            # only, so a failure here still reports readiness.
            try:
                new_count = await log_document_status()
                logger.info(f"β Voice Bot ready with {new_count:,} documents in knowledge base")
            except Exception:
                logger.info("β Voice Bot ready with comprehensive document coverage")
        else:
            logger.info(f"β Voice Bot ready with {document_count:,} documents in knowledge base")
    except Exception as e:
        logger.warning(f"β οΈ Could not check document status: {e}")
        # Fallback to basic sample setup
        try:
            from setup_documents import setup_sample_documents
            await setup_sample_documents()
        except Exception as e2:
            logger.error(f"β Could not setup sample documents: {e2}")
    logger.info("β Application started successfully")
    yield
    # Shutdown
    logger.info("π Shutting down Voice Bot Application...")
# The FastAPI application object; `lifespan` runs the startup/shutdown hooks.
app = FastAPI(
    lifespan=lifespan,
    title="Voice Bot Government Assistant",
    description="AI-powered voice assistant for government policies and services",
    version="1.0.0",
)

# CORS: origins come from config.ALLOWED_ORIGINS; every method and header is
# allowed, with allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=config.ALLOWED_ORIGINS,
)
# Lazily created service singletons (lazy loading for HF Spaces): each is built
# on the first call to its get_*_service() accessor and reused by later calls.
llm_service = None
voice_service = None
lancedb_service = None
scenario_service = None


def get_llm_service():
    """Return the shared HybridLLMService, constructing it on first call."""
    global llm_service
    if llm_service is not None:
        return llm_service
    llm_service = HybridLLMService()
    return llm_service


def get_voice_service():
    """Return the shared VoiceService, constructing it on first call."""
    global voice_service
    if voice_service is not None:
        return voice_service
    voice_service = VoiceService()
    return voice_service


def get_lancedb_service():
    """Return the shared LanceDBService, constructing it on first call."""
    global lancedb_service
    if lancedb_service is not None:
        return lancedb_service
    lancedb_service = LanceDBService()
    return lancedb_service


def get_scenario_service():
    """Return the shared ScenarioAnalysisService, constructing it on first call."""
    global scenario_service
    if scenario_service is not None:
        return scenario_service
    scenario_service = ScenarioAnalysisService()
    return scenario_service
# Evidence Pack Export endpoint
# NOTE(review): no route decorator is visible for this handler in this file.
# The root endpoint advertises "/export_evidence_pack" and the body handles
# OPTIONS, HEAD, GET and POST — confirm it is registered for those methods.
async def export_evidence_pack(request: Request, format: str = "pdf"):
    """Export evidence pack in PDF or CSV format.

    * OPTIONS (CORS preflight): answered with ``{"status": "ok"}``.
    * HEAD: ``{"status": "ok"}`` with the Content-Type the export would have.
    * POST with a JSON object body: the body is turned into an evidence pack
      via ``transform_message_to_evidence_pack``. The export format is taken
      from the ``format`` query parameter, else the body's ``format`` key, else
      ``"pdf"``.
    * GET, or a POST whose body is not a JSON object: a generic pack is
      exported.

    Args:
        request: Incoming request.
        format: ``"pdf"`` or ``"csv"`` (case-insensitive). This name shadows the
            builtin but is kept, presumably because it is the public
            query-parameter name.

    Returns:
        A ``FileResponse`` attachment, a 400 ``JSONResponse`` for an unknown
        format, or a 500 ``JSONResponse`` (error logged with traceback) when
        the export fails.
    """
    try:
        # Handle CORS preflight and HEAD requests before touching the body.
        if request.method == "OPTIONS":
            return JSONResponse({"status": "ok"}, status_code=200)
        if request.method == "HEAD":
            # Headers only: advertise the content type the export would use.
            return JSONResponse(
                {"status": "ok"},
                status_code=200,
                headers={"Content-Type": "application/pdf" if str(format).lower() == "pdf" else "text/csv"},
            )

        data = None
        if request.method == "POST":
            try:
                raw_data = await request.json()
            except ValueError:
                # Empty or non-JSON body (JSONDecodeError / UnicodeDecodeError
                # are both ValueError subclasses).
                raw_data = None
            if isinstance(raw_data, dict):
                # Format can come from query params or request body
                format = request.query_params.get("format", raw_data.get("format", "pdf"))
                # Transform the message data to evidence pack format
                data = transform_message_to_evidence_pack(raw_data)
        if data is None:
            # GET, or a POST without a usable JSON object body.
            data = _default_evidence_pack_data(request)

        # The body's "format" may be any JSON value (e.g. null or a number);
        # str() keeps .lower() from raising and routes it to the 400 below.
        export_format = str(format).lower()
        if export_format == "pdf":
            file_path = export_evidence_pack_pdf(data)
            return FileResponse(
                file_path,
                media_type="application/pdf",
                filename="evidence_pack.pdf",
                headers={"Content-Disposition": "attachment; filename=evidence_pack.pdf"},
            )
        if export_format == "csv":
            file_path = export_evidence_pack_csv(data)
            return FileResponse(
                file_path,
                media_type="text/csv",
                filename="evidence_pack.csv",
                headers={"Content-Disposition": "attachment; filename=evidence_pack.csv"},
            )
        return JSONResponse({"error": "Invalid format. Use 'pdf' or 'csv'"}, status_code=400)
    except Exception as e:
        logger.exception(f"Export evidence pack error: {str(e)}")
        return JSONResponse({"error": f"Export failed: {str(e)}"}, status_code=500)


def _default_evidence_pack_data(request: Request) -> dict:
    """Return the generic evidence pack used when no message payload is available.

    Only the ``query`` query-string parameter is read from *request*.
    """
    now = datetime.now()  # single clock read so date and timestamp agree
    return {
        "clause_text": "No specific clause available",
        "summary": "Rajasthan Pension Rules: General Information",
        "role_checklist": ["Consult pension department", "Verify eligibility criteria"],
        "source_title": "Rajasthan Pension Rules",
        "clause_id": "GENERAL_001",
        "date": now.strftime("%Y-%m-%d"),
        "url": "https://finance.rajasthan.gov.in/pension",
        "query": request.query_params.get("query", ""),
        "timestamp": now.isoformat(),
    }
# Health check endpoint
# NOTE(review): no route decorator is visible here; the root endpoint
# advertises "/health" — confirm it is registered.
async def health_check():
    """Report that the API process is up, stamped with the current local time."""
    stamped_at = datetime.now().isoformat()
    return {"status": "healthy", "service": "voice-bot-api", "timestamp": stamped_at, "version": "1.0.0"}
# Root endpoint
async def root():
    """Describe the service and list the endpoint paths it advertises."""
    advertised = {
        "health": "/health",
        "chat": "/chat",
        "websocket": "/ws",
        "websocket_stream": "/ws/stream",
        "export_evidence_pack": "/export_evidence_pack",
        "docs": "/docs",
    }
    info = {"message": "Voice Bot Government Assistant API", "status": "running", "version": "1.0.0"}
    info["endpoints"] = advertised
    return info
# Chat endpoint
# NOTE(review): no route decorator is visible here; the root endpoint
# advertises "/chat" — confirm it is registered.
async def chat_endpoint(request: dict):
    """Text-based chat endpoint.

    Args:
        request: Decoded JSON body; ``request["message"]`` is the user's text.

    Returns:
        dict: ``{"response": <LLM reply>, "timestamp": <ISO-8601 local time>}``.

    Raises:
        HTTPException: 400 when ``message`` is missing or empty; 500 (with the
            underlying error text) when the LLM call fails.
    """
    message = request.get("message", "")
    if not message:
        # Raised outside the try below so the client receives a 400; inside it,
        # the generic handler used to rewrap this as a 500.
        raise HTTPException(status_code=400, detail="Message is required")

    try:
        llm = get_llm_service()
        response = await llm.get_response(message)
    except Exception as e:
        logger.exception(f"Chat error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "response": response,
        "timestamp": datetime.now().isoformat(),
    }
# WebSocket endpoints
# NOTE(review): no route decorator is visible here; the root endpoint
# advertises "/ws" — confirm it is registered.
async def websocket_endpoint(websocket: WebSocket):
    """Serve the general real-time WebSocket channel.

    Delegates the connection entirely to
    ``handle_enhanced_websocket_connection``; this wrapper has no logic of its
    own.
    """
    await handle_enhanced_websocket_connection(websocket)
async def websocket_stream_endpoint(websocket: WebSocket):
    """Streaming WebSocket endpoint that accepts two client message formats.

    After ``groq_websocket_handler.connect`` assigns a session, the client is
    sent ``{"type": "connection_successful"}``. The endpoint then reads text
    frames in a loop:

    * A JSON object containing ``lang`` but no ``type`` ("friend's" frontend
      format). The next frame must be binary audio. It is base64-encoded and
      forwarded as a standard ``{"type": "audio_data", ...}`` message. A
      ``lang`` starting with "eng" maps to ``"en"``, and an empty/null ``lang``
      defaults to ``"en"``.
    * Any other JSON object is forwarded to the handler unchanged.
    * Invalid JSON, or JSON that is not an object, gets an ``error`` reply and
      the loop continues.

    A client disconnect ends the loop quietly. Other unexpected errors are
    logged and end the loop. The session is always released via
    ``groq_websocket_handler.disconnect``.
    """
    # Local import: only this endpoint needs it and the module-level fastapi
    # import line does not include it.
    from fastapi import WebSocketDisconnect

    # Accept connection and get session ID
    session_id = await groq_websocket_handler.connect(websocket)
    await websocket.send_json({"type": "connection_successful"})
    logger.info("β WebSocket connection established")
    try:
        while True:
            # Only parsing is guarded here, so JSON errors raised later by the
            # handler are not misreported as an invalid client message.
            try:
                message_text = await websocket.receive_text()
                message = json.loads(message_text)
            except json.JSONDecodeError:
                await websocket.send_json({
                    "type": "error",
                    "message": "Invalid JSON message"
                })
                continue

            if not isinstance(message, dict):
                # Arrays/scalars used to reach the handler (or the `in` checks)
                # and could tear down the whole session.
                await websocket.send_json({
                    "type": "error",
                    "message": "Invalid message format: expected a JSON object"
                })
                continue

            if "lang" in message and "type" not in message:
                logger.info(f"π± Received friend's format: {message}")
                try:
                    # Friend's format: the binary audio payload follows.
                    audio_bytes = await websocket.receive_bytes()
                    logger.info(f"π€ Received {len(audio_bytes)} bytes of audio data")
                    audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')
                    raw_lang = message.get("lang") or ""
                    language = "en" if str(raw_lang).lower().startswith("eng") else (raw_lang or "en")
                    standard_message = {
                        "type": "audio_data",
                        "language": language,
                        "audio_data": audio_base64,
                        "user_id": message.get("user_id")
                    }
                    await groq_websocket_handler.handle_stream_message(websocket, session_id, standard_message)
                except WebSocketDisconnect:
                    # Client left mid-exchange: don't try to send on a closed socket.
                    raise
                except Exception as audio_error:
                    logger.error(f"β Error receiving binary audio: {audio_error}")
                    await websocket.send_json({
                        "type": "error",
                        "message": "Failed to receive audio data"
                    })
            else:
                # Standard format - process normally
                await groq_websocket_handler.handle_stream_message(websocket, session_id, message)
    except WebSocketDisconnect:
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.error(f"β WebSocket stream error: {e}")
    finally:
        await groq_websocket_handler.disconnect(session_id)
async def websocket_conversational_endpoint(websocket: WebSocket):
    """Serve the conversational WebSocket channel.

    Delegates the whole connection to ``handle_conversational_websocket``,
    which the original author describes as adding session memory and
    personalisation. This wrapper contains no logic of its own.
    """
    # NOTE(review): no route decorator is visible here and the root endpoint
    # does not advertise a path for this handler — confirm how it is exposed.
    await handle_conversational_websocket(websocket)
if __name__ == "__main__":
    # Direct execution: serve `app` with uvicorn on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(app, port=7860, host="0.0.0.0")