Spaces:
Sleeping
Sleeping
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_core.tools import tool | |
| from config import EMBEDDING_MODEL_NAME | |
| from langchain_core.runnables import RunnableConfig | |
| from typing import List, Dict, Any | |
| from lancedb_service import lancedb_service | |
| from scenario_analysis_service import scenario_service | |
| import logging | |
| import json | |
| import asyncio | |
# Module-wide logger; the helpers below report through it.
logger = logging.getLogger("voicebot")

# Built-in answers, keyed by topic, served by get_fallback_content() when the
# document database yields nothing.  The text is user-facing, so the rupee
# sign is stored as the real character (it had been mis-decoded as "βΉ").
FALLBACK_CONTENT = {
    "pension": """Pension is a regular payment made during a person's retirement from an investment fund. For government employees in India, pension includes:
1. Basic Pension: Calculated based on last drawn salary and years of service
2. Dearness Relief (DR): Additional amount to counter inflation
3. Medical Benefits: Healthcare coverage post-retirement
4. Family Pension: Benefits for family members
Key features:
- Minimum 10 years service for qualification
- Monthly payment to retired employees
- Pension amount revised periodically for inflation""",
    "da_increment": """Dearness Allowance (DA) is paid to government employees to offset inflation impact.
DA 6% Increment Impact:
- DA revised twice yearly (January and July)
- Based on Consumer Price Index (AICPI)
- 6% increase adds significant monthly income
- Example: ₹50,000 basic salary gets ₹3,000 additional per month
- Pensioners get corresponding Dearness Relief increase
- Applicable across all government pay scales""",
    "rajasthan": """Rajasthan government employees have comprehensive retirement benefits:
1. Old Pension Scheme (OPS): Restored for all employees in 2022
- 50% of last drawn salary after 33 years
- Family pension available
2. Pension Processing:
- Apply 6 months before retirement
- 3-6 months processing time
- Monthly credit via NEFT
3. Benefits include pension, gratuity, and provident fund
4. Enhanced benefits for teachers and staff""",
}
def get_fallback_content(query: str) -> List[Dict[str, Any]]:
    """Return built-in fallback documents for *query*.

    Used when the database search fails or finds nothing.  The query is
    matched case-insensitively against three topics (pension, DA increment,
    Rajasthan rules); each matching topic contributes one entry with score
    0.9.  If no topic matches, the general pension text is returned with
    score 0.8.

    Args:
        query: The user's search text.

    Returns:
        At most two dicts with the keys ``content``, ``source`` and ``score``.
    """
    import re  # local import: only this helper needs it

    query_lower = query.lower()
    # Whole words of the query.  The abbreviation "da" must match as a word:
    # as a substring it also fires on "data", "update", "today", ...
    query_words = set(re.findall(r"[a-z]+", query_lower))

    # (FALLBACK_CONTENT key, source label, substring keywords, whole-word keywords)
    topics = (
        ("pension", "Fallback: Government Pension Guide",
         ("pension", "retirement"), ()),
        ("da_increment", "Fallback: DA Increment Guide",
         ("dearness", "allowance", "increment", "6%"), ("da",)),
        ("rajasthan", "Fallback: Rajasthan Rules",
         ("rajasthan", "state"), ()),
    )

    results = [
        {"content": FALLBACK_CONTENT[key], "source": source, "score": 0.9}
        for key, source, substrings, whole_words in topics
        if any(keyword in query_lower for keyword in substrings)
        or not query_words.isdisjoint(whole_words)
    ]

    # If no specific match, return general pension info
    if not results:
        results.append({
            "content": FALLBACK_CONTENT["pension"],
            "source": "Fallback: General Information",
            "score": 0.8,
        })

    return results[:2]  # Return max 2 fallback documents
# Embedding model shared by this module, constructed once at import time.
# It is configured to run on the CPU and to return normalized embeddings.
embedding_model = HuggingFaceEmbeddings(
    model_name=EMBEDDING_MODEL_NAME,
    model_kwargs=dict(device="cpu", trust_remote_code=True),
    encode_kwargs=dict(normalize_embeddings=True),
)
async def get_user_knowledge_bases(userid: str) -> List[str]:
    """List the knowledge bases that belong to ``userid``.

    The lookup is delegated to ``lancedb_service``.  Any exception is logged
    and reported to the caller as an empty list.
    """
    try:
        knowledge_bases = await lancedb_service.get_user_knowledge_bases(userid)
    except Exception as exc:
        logger.error(f"β Error fetching knowledge bases: {exc}")
        return []
    return knowledge_bases
async def get_kb_documents(user_id: str, kb_name: str):
    """Fetch the documents stored in knowledge base ``kb_name`` of ``user_id``.

    The lookup is delegated to ``lancedb_service``.  Any exception is logged
    and reported to the caller as an empty list.
    """
    try:
        documents = await lancedb_service.get_kb_documents(user_id, kb_name)
    except Exception as exc:
        logger.error(f"β Error fetching documents: {exc}")
        return []
    return documents
async def delete_document_from_kb(user_id: str, kb_name: str, filename: str):
    """Remove ``filename`` from knowledge base ``kb_name`` of ``user_id``.

    Returns whatever ``lancedb_service.delete_document_from_kb`` returns; if
    that call raises, the error is logged and ``False`` is returned instead.
    """
    try:
        outcome = await lancedb_service.delete_document_from_kb(user_id, kb_name, filename)
    except Exception as exc:
        logger.error(f"β Error deleting document: {exc}")
        return False
    return outcome
def search_documents(query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Synchronous wrapper for searching documents in government knowledge base.
    Returns a list of documents with content for compatibility with existing code.

    Queries containing one of the keywords "rajasthan", "pension", "circular",
    "pay" or "rules" (case-insensitive substring match) are delegated to
    ``search_rajasthan_documents``.  All other queries are searched in the
    "government_docs" knowledge base of the "system" user.

    Args:
        query: The user's search text.
        limit: Maximum number of documents to return.

    Returns:
        A list of dicts with the keys ``content``, ``source`` and ``score``.
        When nothing is found, or any error occurs, the built-in fallback
        content from ``get_fallback_content`` is returned instead.

    Note:
        The async search is driven with ``asyncio.run``, which refuses to run
        inside a thread that already has a running event loop; that error is
        caught below and turned into fallback content.
    """
    try:
        # Route first: the Rajasthan path is fully synchronous, so no event
        # loop has to be created (or cleaned up) for it.
        query_lower = query.lower()
        if any(keyword in query_lower for keyword in ["rajasthan", "pension", "circular", "pay", "rules"]):
            # Use separate table for Rajasthan documents
            return search_rajasthan_documents(query, limit)

        knowledge_bases = ["government_docs"]  # Default

        async def _search_all_knowledge_bases() -> list:
            """Query every knowledge base, skipping the ones that fail."""
            collected = []
            for kb in knowledge_bases:
                try:
                    docs = await lancedb_service.similarity_search(query, "system", kb, k=limit)
                    collected.extend(docs)
                except Exception as e:
                    logger.warning(f"Search failed for knowledge base {kb}: {e}")
            return collected

        # asyncio.run creates a private loop and tears it down again, so no
        # closed loop is left installed as the thread's current event loop.
        all_docs = asyncio.run(_search_all_knowledge_bases())

        if not all_docs:
            # Fallback to built-in content if no documents found
            logger.info("π No documents in database, using fallback content")
            return get_fallback_content(query)

        # Sort by relevance score if available and limit results
        all_docs = sorted(all_docs, key=lambda doc: getattr(doc, 'score', 1.0), reverse=True)[:limit]

        # Convert to expected format
        results = [
            {
                "content": doc.page_content,
                "source": doc.metadata.get('source', 'Unknown'),
                "score": getattr(doc, 'score', 1.0),
            }
            for doc in all_docs
        ]
        logger.info(f"π Found {len(results)} documents for query: {query}")
        return results
    except Exception as e:
        logger.error(f"β Error in search_documents: {e}")
        return get_fallback_content(query)  # Return fallback content instead of empty
def search_rajasthan_documents(query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Search specifically in the Rajasthan documents table using direct LanceDB query.

    The query is embedded with the module-level ``embedding_model`` and used
    for a vector search on the ``rajasthan_documents`` table of the LanceDB
    database at ``./lancedb_data``.

    Args:
        query: The user's search text.
        limit: Maximum number of rows to fetch.

    Returns:
        A list of dicts with the keys ``content``, ``source`` (the row's
        ``filename``) and ``score``.  When the table is missing, the search
        returns no rows, or any error occurs, the built-in fallback content
        from ``get_fallback_content`` is returned, so every failure path
        behaves the same way.
    """
    try:
        import lancedb

        # Connect to LanceDB
        db = lancedb.connect('./lancedb_data')

        # Check if rajasthan_documents table exists
        if 'rajasthan_documents' not in db.table_names():
            logger.warning("β οΈ Rajasthan documents table not found")
            return get_fallback_content(query)  # Same fallback as the empty-result path

        tbl = db.open_table('rajasthan_documents')

        # Create embedding for the query and search using vector similarity
        query_embedding = embedding_model.embed_query(query)
        search_results = tbl.search(query_embedding).limit(limit).to_pandas()

        if search_results.empty:
            logger.info(f"π No results found in Rajasthan documents for: {query}")
            return get_fallback_content(query)  # Return fallback instead of empty

        # Convert to expected format.
        # NOTE(review): "_distance" is a distance (presumably smaller means
        # closer), while the fallback entries use higher-is-better scores;
        # confirm how callers interpret "score".
        results = [
            {
                "content": row['content'],
                "source": row['filename'],
                "score": float(row.get('_distance', 1.0)),  # LanceDB returns _distance
            }
            for _, row in search_results.iterrows()
        ]
        logger.info(f"π Found {len(results)} Rajasthan documents for query: {query}")
        return results
    except Exception as e:
        logger.error(f"β Error searching Rajasthan documents: {e}")
        return get_fallback_content(query)  # Consistent with search_documents
# NOTE(review): the docstring below is kept verbatim because it may double as
# an LLM tool description (``tool`` is imported from langchain_core at the top
# of this file) — confirm before rewording it.
#
# Reads "thread_id" (used as the user id) and "knowledge_base" (default
# "government_docs") from config["configurable"], runs a similarity search via
# lancedb_service and returns the matching page contents joined into one
# string.  Failures are logged and reported as a plain error string.
async def search_docs(query: str, config: RunnableConfig) -> str:
    """Search the knowledge base for relevant context within a specific knowledge base."""
    # Tolerate a missing or empty config instead of raising KeyError before
    # the error handling below is even reached.
    configurable = (config or {}).get("configurable") or {}
    userid = configurable.get("thread_id")
    knowledge_base = configurable.get("knowledge_base", "government_docs")
    try:
        # Search in the specified knowledge base
        docs = await lancedb_service.similarity_search(query, userid, knowledge_base)
        if not docs:
            return "No relevant documents found in the knowledge base."
        context = "\n\n".join(doc.page_content for doc in docs)
        return f"π Found {len(docs)} relevant documents:\n\n{context}"
    except Exception as e:
        logger.error(f"β Error searching documents: {e}")
        return "Error occurred while searching documents."
# NOTE(review): the docstring below is kept verbatim because it may double as
# an LLM tool description (``tool`` is imported from langchain_core at the top
# of this file) — confirm before rewording it.
#
# Always searches the "government_docs" knowledge base of the "system" user;
# ``config`` is accepted but not read.  Returns the matching page contents
# followed by a de-duplicated "Sources" line, or a plain error string.
async def search_government_docs(query: str, config: RunnableConfig) -> str:
    """Search government documents for relevant information and policies."""
    try:
        # Search specifically in government_docs knowledge base
        docs = await lancedb_service.similarity_search(query, "system", "government_docs")
        if not docs:
            return "No relevant government documents found for your query."

        context = "\n\n".join(doc.page_content for doc in docs)
        # De-duplicate while keeping first-seen order: a plain set() would
        # make the order of the "Sources" line vary from run to run.
        sources = list(dict.fromkeys(doc.metadata.get('source', 'Unknown') for doc in docs))

        result = f"π Found {len(docs)} relevant government documents:\n\n{context}"
        if sources:
            result += f"\n\nπ Sources: {', '.join(sources)}"
        return result
    except Exception as e:
        logger.error(f"β Error searching government documents: {e}")
        return "Error occurred while searching government documents."
# NOTE(review): the docstring text is kept verbatim because it may double as
# an LLM tool description — confirm before rewording it.  ``config`` is
# accepted but not read.
async def analyze_scenario(scenario_query: str, config: RunnableConfig) -> str:
    """
    Analyze government scenarios and create visualizations including charts, graphs, and diagrams.
    Use this tool when users ask for scenario analysis, data visualization, charts, graphs, or diagrams
    related to government processes, budgets, policies, organizational structures, or performance metrics.
    Args:
        scenario_query: Description of the scenario to analyze (e.g., "budget analysis for health department",
            "policy implementation timeline", "organizational structure", "performance metrics")
    """
    try:
        logger.info(f"π Analyzing scenario: {scenario_query}")

        # Classify the request, then hand it to the analysis service.
        parsed_scenario = await _parse_scenario_query(scenario_query)
        outcome = await scenario_service.analyze_government_scenario(parsed_scenario)

        # Guard clause: report a failed analysis straight away.
        if not outcome.get("success", False):
            return f"β Error in scenario analysis: {outcome.get('error', 'Unknown error')}"

        # Human-readable part of the reply.
        message = "π **Scenario Analysis Complete!**\n\n"
        message += outcome.get("analysis", "")
        message += f"\n\nπΌοΈ **Generated {len(outcome.get('images', []))} visualization(s)**"

        # Machine-readable part: the image list is embedded as JSON between
        # two marker lines so the frontend can render it.
        if outcome.get("images"):
            message += "\n\n**SCENARIO_IMAGES_START**\n"
            message += json.dumps(outcome["images"])
            message += "\n**SCENARIO_IMAGES_END**"
        return message
    except Exception as exc:
        logger.error(f"β Error in scenario analysis tool: {exc}")
        return f"Error occurred while analyzing scenario: {str(exc)}"
async def _parse_scenario_query(query: str) -> Dict[str, Any]:
    """Classify a scenario query and collect the data for its analysis.

    The lower-cased query is checked against keyword groups in a fixed order
    (budget, policy, organization, performance, workflow); the first group
    with a keyword contained in the query decides the scenario type and which
    ``_extract_*`` helper supplies the data.  A query matching no group is
    typed "general" with empty data.

    Returns:
        A dict with the keys ``type``, ``title`` and ``data``.
    """
    lowered = query.lower()

    # Ordered rules: (scenario type, trigger keywords, data extractor).
    # The lambdas defer the helper lookup until a rule actually fires.
    rules = (
        ("budget",
         ("budget", "financial", "expenditure", "allocation", "funding"),
         lambda text: _extract_budget_data(text)),
        ("policy",
         ("policy", "implementation", "timeline", "plan", "strategy"),
         lambda text: _extract_policy_data(text)),
        ("organization",
         ("organization", "hierarchy", "structure", "reporting", "org"),
         lambda text: _extract_org_data(text)),
        ("performance",
         ("performance", "metrics", "kpi", "efficiency", "evaluation"),
         lambda text: _extract_performance_data(text)),
        ("workflow",
         ("workflow", "process", "flow", "procedure", "steps"),
         lambda text: _extract_workflow_data(text)),
    )

    scenario_type, data = "general", {}
    for candidate_type, keywords, extract in rules:
        if any(keyword in lowered for keyword in keywords):
            scenario_type, data = candidate_type, extract(query)
            break

    return {
        "type": scenario_type,
        "title": f"Government {scenario_type.title()} Analysis",
        "data": data,
    }
def _extract_budget_data(query: str) -> Dict[str, Any]:
    """Return budget data for *query*; currently a stub yielding an empty dict."""
    # Placeholder: the query is not inspected yet.  Extracting real figures
    # and departments (e.g. with NLP) is a possible future enhancement.
    budget_data: Dict[str, Any] = {}
    return budget_data
def _extract_policy_data(query: str) -> Dict[str, Any]:
    """Return policy data for *query*; currently a stub yielding an empty dict."""
    # Placeholder: the query is not inspected yet.
    policy_data: Dict[str, Any] = {}
    return policy_data
def _extract_org_data(query: str) -> Dict[str, Any]:
    """Return organizational data for *query*; currently a stub yielding an empty dict."""
    # Placeholder: the query is not inspected yet.
    org_data: Dict[str, Any] = {}
    return org_data
def _extract_performance_data(query: str) -> Dict[str, Any]:
    """Return performance data for *query*; currently a stub yielding an empty dict."""
    # Placeholder: the query is not inspected yet.
    performance_data: Dict[str, Any] = {}
    return performance_data
def _extract_workflow_data(query: str) -> Dict[str, Any]:
    """Return workflow data for *query*; currently a stub yielding an empty dict."""
    # Placeholder: the query is not inspected yet.
    workflow_data: Dict[str, Any] = {}
    return workflow_data
# Interactive smoke test: repeatedly asks for a query (and optionally a
# knowledge-base name) and prints what search_docs returns.
if __name__ == "__main__":
    import asyncio

    async def test_search():
        """Prompt for queries on stdin until the user types 'exit'."""
        print("π Testing search_docs RAG tool with LanceDB vector store...\n")
        test_user_id = "test_user_123"
        test_knowledge_base = "test_kb"
        while True:
            user_input = input("Enter a query (or 'exit'): ").strip()
            if user_input.lower() == "exit":
                break
            kb_input = input(f"Knowledge base (current: {test_knowledge_base}, press Enter to keep): ").strip()
            if kb_input:
                test_knowledge_base = kb_input
            try:
                run_config = RunnableConfig(
                    configurable={
                        "thread_id": test_user_id,
                        "knowledge_base": test_knowledge_base
                    }
                )
                # search_docs is a plain coroutine function in this file, so
                # it has no .ainvoke(); support both that and a tool-wrapped
                # variant that does expose .ainvoke().
                if hasattr(search_docs, "ainvoke"):
                    result = await search_docs.ainvoke({"query": user_input}, config=run_config)
                else:
                    result = await search_docs(user_input, run_config)
                print(f"\nπ Results from '{test_knowledge_base}' knowledge base:\n")
                print(result)
                print("\n" + "="*50 + "\n")
            except Exception as e:
                print(f"β Error: {e}")

    asyncio.run(test_search())