"""
Session management service for storing and retrieving conversation context
to make the voice bot more conversational and personalized.

Each user's state is persisted as one JSON file inside the service's storage
directory. The record written by ``SessionService.store_session_summary`` has
the keys ``user_id``, ``last_updated``, ``session_summary``, ``preferences``,
``conversation_count``, ``recent_topics`` and (optionally)
``last_conversation``.
"""
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Pattern, Sequence, Tuple

logger = logging.getLogger(__name__)

# Preferences reported for users who have no stored preferences yet.
DEFAULT_PREFERENCES: Dict[str, str] = {
    'language': 'english',
    'voice_id': 'en-IN-isha',
    'conversation_style': 'friendly',
}

# How many trailing messages of a conversation are persisted.
MAX_STORED_MESSAGES = 10
# How many distinct topics are remembered per user.
MAX_RECENT_TOPICS = 5

# Characters that must never reach the filesystem path built from a user id.
_UNSAFE_ID_CHARS = re.compile(r'[/\\\x00]')

_TopicRules = Sequence[Tuple[str, Pattern[str]]]

# "dr" is matched as a whole word only: a plain substring test also fires on
# words such as "address", "children" or "withdraw".
_DEARNESS_PATTERN = re.compile(r'dearness|\bdr\b')

# Topic labels recorded in the persisted ``recent_topics`` list.
_STORED_TOPIC_RULES: _TopicRules = (
    ('pension policies', re.compile(r'pension')),
    ('medical allowances', re.compile(r'medical|allowance')),
    ('retirement planning', re.compile(r'retirement')),
    ('dearness relief', _DEARNESS_PATTERN),
)

# Topic labels used in the generated free-text session summary.
_SUMMARY_TOPIC_RULES: _TopicRules = (
    ('pension queries', re.compile(r'pension')),
    ('medical allowances', re.compile(r'medical|allowance')),
    ('retirement planning', re.compile(r'retirement')),
    ('dearness relief', _DEARNESS_PATTERN),
    ('pension calculations', re.compile(r'calculation|calculate')),
)


def _match_topics(content: str, rules: _TopicRules) -> List[str]:
    """Return the labels of every rule whose pattern occurs in *content*."""
    return [label for label, pattern in rules if pattern.search(content)]


def _unique_in_order(items: Sequence[str]) -> List[str]:
    """Drop duplicates from *items* while keeping first-seen order."""
    return list(dict.fromkeys(items))


class SessionService:
    """File-backed store for per-user conversation summaries and preferences.

    The methods are declared ``async`` but perform ordinary synchronous file
    I/O; they never await anything other than each other.
    """

    def __init__(self, storage_dir: str = "./session_data"):
        """Remember *storage_dir* and create it if it does not exist."""
        self.storage_dir = storage_dir
        os.makedirs(storage_dir, exist_ok=True)

    def get_session_file(self, user_id: str) -> str:
        """Get the file path for a user's session data.

        Path separators and NUL characters in *user_id* are replaced with
        ``_`` so that an id cannot address a file outside ``storage_dir``.
        Ids without such characters map to the same path as before.
        """
        safe_id = _UNSAFE_ID_CHARS.sub('_', str(user_id))
        return os.path.join(self.storage_dir, f"session_{safe_id}.json")

    def _load_session_data(self, user_id: str) -> Dict[str, Any]:
        """Read the complete persisted record for *user_id*.

        Returns an empty dict when the user has no session file.

        Raises:
            ValueError: the file is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not hold a JSON object.
            OSError: the file exists but cannot be read.
        """
        session_file = self.get_session_file(user_id)
        try:
            with open(session_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        if not isinstance(data, dict):
            raise ValueError(
                f"Session file {session_file} does not contain a JSON object"
            )
        return data

    async def get_session_summary(self, user_id: str) -> Any:
        """Retrieve the stored session summary for a user.

        Returns whatever was stored under ``session_summary`` (a string when
        written by ``store_session_summary``), or ``{}`` when nothing is
        stored or the file cannot be read.
        """
        try:
            return self._load_session_data(user_id).get('session_summary', {})
        except Exception as e:
            logger.error(f"Error retrieving session summary for {user_id}: {e}")
            return {}

    async def get_user_preferences(self, user_id: str) -> Dict:
        """Get user preferences and settings.

        Falls back to a fresh copy of ``DEFAULT_PREFERENCES`` when the user
        has no stored preferences or the file cannot be read.
        """
        try:
            data = self._load_session_data(user_id)
            return data.get('preferences', dict(DEFAULT_PREFERENCES))
        except Exception as e:
            logger.error(f"Error retrieving user preferences for {user_id}: {e}")
            return dict(DEFAULT_PREFERENCES)

    async def store_session_summary(
        self,
        user_id: str,
        summary: str,
        conversation_history: Optional[List[Dict]] = None,
    ) -> bool:
        """Store session summary and conversation context.

        Increments the user's ``conversation_count``, keeps existing
        preferences, and, when *conversation_history* is given, stores its
        last ``MAX_STORED_MESSAGES`` entries and merges the topics found in
        its ``type == 'user'`` messages into ``recent_topics`` (newest first,
        at most ``MAX_RECENT_TOPICS``).

        Returns:
            ``True`` when the record was written, ``False`` on any error.
        """
        try:
            session_file = self.get_session_file(user_id)

            # A corrupt file must not block saving forever: start afresh.
            try:
                existing_data = self._load_session_data(user_id)
            except ValueError as e:
                logger.warning(
                    f"Discarding unreadable session data for {user_id}: {e}"
                )
                existing_data = {}

            previous_topics = existing_data.get('recent_topics', [])
            if not isinstance(previous_topics, list):
                previous_topics = []

            session_data = {
                'user_id': user_id,
                'last_updated': datetime.now().isoformat(),
                'session_summary': summary,
                'preferences': existing_data.get(
                    'preferences', dict(DEFAULT_PREFERENCES)
                ),
                'conversation_count': existing_data.get('conversation_count', 0) + 1,
                'recent_topics': previous_topics,
            }

            if conversation_history:
                session_data['last_conversation'] = (
                    conversation_history[-MAX_STORED_MESSAGES:]
                )

                topics: List[str] = []
                for msg in conversation_history:
                    if not isinstance(msg, dict) or msg.get('type') != 'user':
                        continue
                    content = msg.get('content') or ''
                    if isinstance(content, str):
                        topics.extend(
                            _match_topics(content.lower(), _STORED_TOPIC_RULES)
                        )

                # New topics first so truncation drops the oldest ones;
                # order-preserving so the result is deterministic.
                session_data['recent_topics'] = _unique_in_order(
                    topics + previous_topics
                )[:MAX_RECENT_TOPICS]

            with open(session_file, 'w', encoding='utf-8') as f:
                json.dump(session_data, f, indent=2, ensure_ascii=False)

            logger.info(f"✅ Session summary stored for user {user_id}")
            return True

        except Exception as e:
            logger.error(f"❌ Error storing session summary for {user_id}: {e}")
            return False

    async def get_conversation_context(self, user_id: str) -> str:
        """Get conversation context string for system prompt.

        Built from the full persisted record: the conversation count (when
        greater than 1), the remembered topics, and the last summary.
        Returns ``""`` when nothing is stored or on any error.
        """
        try:
            record = self._load_session_data(user_id)
            if not record:
                return ""

            context_parts = []

            conv_count = record.get('conversation_count', 0)
            if isinstance(conv_count, int) and conv_count > 1:
                context_parts.append(
                    f"This is conversation #{conv_count} with this user."
                )

            recent_topics = record.get('recent_topics', [])
            if recent_topics:
                topics_str = ", ".join(recent_topics)
                context_parts.append(f"User has previously discussed: {topics_str}.")

            # The summary is normally a string; a dict with a 'summary' key
            # is also accepted, as the original implementation allowed.
            summary = record.get('session_summary')
            if isinstance(summary, str) and summary:
                context_parts.append(f"Previous conversation context: {summary}")
            elif isinstance(summary, dict) and 'summary' in summary:
                context_parts.append(
                    f"Previous conversation context: {summary['summary']}"
                )

            return " ".join(context_parts)

        except Exception as e:
            logger.error(f"Error getting conversation context for {user_id}: {e}")
            return ""

    async def generate_personalized_greeting(
        self, user_id: str, messages_context: Optional[List] = None
    ) -> str:
        """Generate a personalized greeting based on user history.

        The greeting depends on the stored ``conversation_count`` (0, 1, or
        more) and, for returning users, on the first remembered topic.
        *messages_context* is accepted for interface compatibility and is
        not used.
        """
        try:
            record = self._load_session_data(user_id)
            conv_count = record.get('conversation_count', 0)
            recent_topics = record.get('recent_topics', [])

            if conv_count == 0:
                # First time user
                return "Hello! I'm your Rajasthan Pension Assistant. I'm here to help you navigate pension policies, calculations, and retirement planning from our comprehensive knowledge base. What pension question can I help you with today?"

            if conv_count == 1:
                # Second conversation
                return "Welcome back! I'm glad to see you again. How can I assist you with your pension and policy questions today?"

            # Returning user with history
            if recent_topics:
                topics_mention = f"I remember we discussed {recent_topics[0]} before. "
            else:
                topics_mention = ""
            return f"Hello again! {topics_mention}I'm here to help with any pension or policy questions you have. What's on your mind today?"

        except Exception as e:
            logger.error(f"Error generating personalized greeting for {user_id}: {e}")
            return "Hello! I'm your Rajasthan Pension Assistant. How can I help you with your pension questions today?"

    async def generate_session_summary(self, messages: List, user_id: str) -> str:
        """Generate a session summary from conversation messages.

        A message counts as a user message when it has a ``content``
        attribute and its type's string form contains "user"
        (case-insensitive).
        NOTE(review): this name-based detection depends on the message
        classes used by the caller, which are not visible here - confirm.

        Returns a fixed short text for fewer than two user messages, a
        topic list (in first-mentioned order) when topics are recognised,
        and a generic sentence otherwise or on error.
        """
        try:
            user_messages = [
                msg for msg in messages
                if hasattr(msg, 'content') and 'user' in str(type(msg)).lower()
            ]

            if len(user_messages) < 2:
                return "Brief conversation about pension policies."

            topics: List[str] = []
            for msg in user_messages:
                topics.extend(
                    _match_topics(msg.content.lower(), _SUMMARY_TOPIC_RULES)
                )

            unique_topics = _unique_in_order(topics)

            if unique_topics:
                return f"User discussed: {', '.join(unique_topics)}. Showed interest in pension policies and government benefits."
            return "General conversation about pension and policy matters."

        except Exception as e:
            logger.error(f"Error generating session summary for {user_id}: {e}")
            return "Conversation session completed."


# Global session service instance
session_service = SessionService()