# PensionBot / rag_service.py
# Last commit 4df145c by ChAbhishek28: "Add comprehensive fallback content system for RAG" (15.7 kB)
import asyncio
import json
import logging
import re
from typing import List, Dict, Any

from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_huggingface import HuggingFaceEmbeddings

from config import EMBEDDING_MODEL_NAME
from lancedb_service import lancedb_service
from scenario_analysis_service import scenario_service
logger = logging.getLogger("voicebot")

# Built-in reference text returned when the vector store has nothing useful
# (empty table, failed search, or no hits). Keys are the topic ids used by
# get_fallback_content().
FALLBACK_CONTENT = {
    "pension": """Pension is a regular payment made during a person's retirement from an investment fund. For government employees in India, pension includes:
1. Basic Pension: Calculated based on last drawn salary and years of service
2. Dearness Relief (DR): Additional amount to counter inflation
3. Medical Benefits: Healthcare coverage post-retirement
4. Family Pension: Benefits for family members
Key features:
- Minimum 10 years service for qualification
- Monthly payment to retired employees
- Pension amount revised periodically for inflation""",
    "da_increment": """Dearness Allowance (DA) is paid to government employees to offset inflation impact.
DA 6% Increment Impact:
- DA revised twice yearly (January and July)
- Based on Consumer Price Index (AICPI)
- 6% increase adds significant monthly income
- Example: ₹50,000 basic salary gets ₹3,000 additional per month
- Pensioners get corresponding Dearness Relief increase
- Applicable across all government pay scales""",
    "rajasthan": """Rajasthan government employees have comprehensive retirement benefits:
1. Old Pension Scheme (OPS): Restored for all employees in 2022
- 50% of last drawn salary after 33 years
- Family pension available
2. Pension Processing:
- Apply 6 months before retirement
- 3-6 months processing time
- Monthly credit via NEFT
3. Benefits include pension, gratuity, and provident fund
4. Enhanced benefits for teachers and staff""",
}

# (topic key, source label, keyword pattern), checked in this order.
# Keywords must start at a word boundary, so "pension" still matches
# "pensioners", but a keyword no longer fires inside an unrelated word.
# The two-letter abbreviation "da" must be a whole word: as a plain
# substring it matched "data", "today", "update", "candidate", ...
_FALLBACK_TOPICS = (
    ("pension", "Fallback: Government Pension Guide",
     re.compile(r"\b(?:pension|retirement)")),
    ("da_increment", "Fallback: DA Increment Guide",
     re.compile(r"\bda\b|\b(?:dearness|allowance|increment|6%)")),
    ("rajasthan", "Fallback: Rajasthan Rules",
     re.compile(r"\b(?:rajasthan|state)")),
)


def get_fallback_content(query: str) -> List[Dict[str, Any]]:
    """Return fallback content when database search fails.

    Each topic whose keywords appear in the case-insensitive query contributes
    one document with score 0.9, in the order pension, DA, Rajasthan. If no
    topic matches, the general pension text is returned with score 0.8.
    At most two documents are returned.

    Args:
        query: The user's search text (``None`` is treated as empty).

    Returns:
        A list of 1-2 dicts with ``content``, ``source`` and ``score`` keys,
        the same shape the database search paths produce.
    """
    query_lower = (query or "").lower()
    results = [
        {"content": FALLBACK_CONTENT[key], "source": source, "score": 0.9}
        for key, source, pattern in _FALLBACK_TOPICS
        if pattern.search(query_lower)
    ]
    if not results:
        # No specific match: return general pension information.
        results.append({
            "content": FALLBACK_CONTENT["pension"],
            "source": "Fallback: General Information",
            "score": 0.8,
        })
    return results[:2]  # Return max 2 fallback documents
# Shared embedding model used to embed search queries (for example in
# search_rajasthan_documents). It runs on the CPU and returns normalized
# vectors. trust_remote_code lets the model repository execute its own
# Python code when the model is loaded.
embedding_model = HuggingFaceEmbeddings(
    model_name=EMBEDDING_MODEL_NAME,
    model_kwargs=dict(device="cpu", trust_remote_code=True),
    encode_kwargs=dict(normalize_embeddings=True),
)
async def get_user_knowledge_bases(userid: str) -> List[str]:
    """List the knowledge-base names belonging to *userid*.

    Delegates to ``lancedb_service``. Any failure is logged and reported as
    an empty list rather than raised.
    """
    try:
        kb_names = await lancedb_service.get_user_knowledge_bases(userid)
    except Exception as exc:
        logger.error(f"❌ Error fetching knowledge bases: {exc}")
        return []
    return kb_names
async def get_kb_documents(user_id: str, kb_name: str):
    """Fetch the documents stored in knowledge base *kb_name* for *user_id*.

    Delegates to ``lancedb_service``. On any failure the error is logged and
    an empty list is returned.
    """
    try:
        documents = await lancedb_service.get_kb_documents(user_id, kb_name)
    except Exception as exc:
        logger.error(f"❌ Error fetching documents: {exc}")
        return []
    return documents
async def delete_document_from_kb(user_id: str, kb_name: str, filename: str):
    """Remove *filename* from the knowledge base *kb_name* owned by *user_id*.

    Returns whatever ``lancedb_service.delete_document_from_kb`` returns. If
    the call raises, the error is logged and ``False`` is returned instead.
    """
    try:
        outcome = await lancedb_service.delete_document_from_kb(user_id, kb_name, filename)
    except Exception as exc:
        logger.error(f"❌ Error deleting document: {exc}")
        return False
    return outcome
def search_documents(query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Synchronous wrapper for searching documents in government knowledge base.
    Returns a list of documents with content for compatibility with existing code.

    Routing: if the lower-cased query contains any of the substrings
    "rajasthan", "pension", "circular", "pay" or "rules", the dedicated
    Rajasthan table is searched. Otherwise the ``government_docs`` knowledge
    base is searched. Whenever a search yields nothing or fails, built-in
    fallback content is returned instead of an empty list.

    This must be called from synchronous code. It cannot block on an event
    loop that is already running in this thread, so in that case it logs a
    warning and returns fallback content. Async callers should use the async
    tools in this module instead.

    Args:
        query: Free-text search query.
        limit: Maximum number of documents to return.

    Returns:
        List of dicts with ``content``, ``source`` and ``score`` keys.
    """
    try:
        query_lower = query.lower()
        if any(keyword in query_lower for keyword in ["rajasthan", "pension", "circular", "pay", "rules"]):
            # Use the separate table for Rajasthan documents. It may return []
            # (e.g. missing table), which must not reach the caller empty.
            return search_rajasthan_documents(query, limit) or get_fallback_content(query)

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop running in this thread: safe to block below.
        else:
            logger.warning("⚠️ search_documents called inside a running event loop; using fallback content")
            return get_fallback_content(query)

        knowledge_bases = ["government_docs"]  # Default
        # asyncio.run creates and closes its own loop without leaving a
        # closed loop installed as the thread's current event loop.
        all_docs = asyncio.run(_search_knowledge_bases(query, knowledge_bases, limit))

        if not all_docs:
            logger.info("📚 No documents in database, using fallback content")
            return get_fallback_content(query)

        # Sort by relevance score if available, then limit results.
        all_docs = sorted(all_docs, key=lambda doc: getattr(doc, "score", 1.0), reverse=True)[:limit]
        results = [
            {
                "content": doc.page_content,
                "source": doc.metadata.get("source", "Unknown"),
                "score": getattr(doc, "score", 1.0),
            }
            for doc in all_docs
        ]
        logger.info(f"📚 Found {len(results)} documents for query: {query}")
        return results
    except Exception as e:
        logger.error(f"❌ Error in search_documents: {e}")
        return get_fallback_content(query)  # Return fallback content instead of empty


async def _search_knowledge_bases(query: str, knowledge_bases: List[str], limit: int) -> list:
    """Run the similarity search in each knowledge base, skipping any that fail."""
    found = []
    for kb in knowledge_bases:
        try:
            found.extend(await lancedb_service.similarity_search(query, "system", kb, k=limit))
        except Exception as e:
            logger.warning(f"Search failed for knowledge base {kb}: {e}")
    return found
def search_rajasthan_documents(query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Search specifically in the Rajasthan documents table using direct LanceDB query.

    The query is embedded with ``embedding_model`` and matched against the
    ``rajasthan_documents`` table in ``./lancedb_data``. If the table is
    missing, the search returns no rows, or anything raises, built-in fallback
    content is returned, so callers never receive an empty list.

    Args:
        query: Free-text search query.
        limit: Maximum number of rows to take from the vector search.

    Returns:
        List of dicts with ``content``, ``source`` (the row's filename) and
        ``score`` keys. For database hits, ``score`` is LanceDB's ``_distance``,
        where lower means closer. That is the opposite direction from the
        fallback documents' 0.8/0.9 scores.
    """
    try:
        import lancedb

        db = lancedb.connect("./lancedb_data")
        if "rajasthan_documents" not in db.table_names():
            logger.warning("⚠️ Rajasthan documents table not found")
            return get_fallback_content(query)

        tbl = db.open_table("rajasthan_documents")
        query_embedding = embedding_model.embed_query(query)
        search_results = tbl.search(query_embedding).limit(limit).to_pandas()

        if search_results.empty:
            logger.info(f"📚 No results found in Rajasthan documents for: {query}")
            return get_fallback_content(query)  # Return fallback instead of empty

        results = [
            {
                "content": row["content"],
                "source": row["filename"],
                "score": float(row.get("_distance", 1.0)),  # LanceDB returns _distance
            }
            for _, row in search_results.iterrows()
        ]
        logger.info(f"📚 Found {len(results)} Rajasthan documents for query: {query}")
        return results
    except Exception as e:
        logger.error(f"❌ Error searching Rajasthan documents: {e}")
        return get_fallback_content(query)
@tool
async def search_docs(query: str, config: RunnableConfig) -> str:
    """Search the knowledge base for relevant context within a specific knowledge base."""
    # The @tool decorator uses the docstring above as the tool description,
    # so it is kept unchanged.
    # The user id comes from the config's thread_id. The knowledge base
    # defaults to "government_docs". A config without a "configurable"
    # section is treated as empty instead of raising KeyError outside the
    # error handling below.
    configurable = config.get("configurable") or {}
    userid = configurable.get("thread_id")
    knowledge_base = configurable.get("knowledge_base", "government_docs")
    try:
        docs = await lancedb_service.similarity_search(query, userid, knowledge_base)
        if not docs:
            return "No relevant documents found in the knowledge base."
        context = "\n\n".join(doc.page_content for doc in docs)
        return f"📄 Found {len(docs)} relevant documents:\n\n{context}"
    except Exception as e:
        logger.error(f"❌ Error searching documents: {e}")
        return "Error occurred while searching documents."
@tool
async def search_government_docs(query: str, config: RunnableConfig) -> str:
    """Search government documents for relevant information and policies."""
    # The docstring above is used by @tool as the tool description.
    # `config` is not read here, but it stays so the tool signature is unchanged.
    try:
        # Search specifically in the government_docs knowledge base.
        docs = await lancedb_service.similarity_search(query, "system", "government_docs")
        if not docs:
            return "No relevant government documents found for your query."

        context = "\n\n".join(doc.page_content for doc in docs)
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # sources line is stable across runs. set() order depends on string
        # hash randomization. The list is never empty here because docs is
        # non-empty and each doc contributes a source or "Unknown".
        sources = list(dict.fromkeys(doc.metadata.get("source", "Unknown") for doc in docs))

        result = f"📋 Found {len(docs)} relevant government documents:\n\n{context}"
        result += f"\n\n📁 Sources: {', '.join(sources)}"
        return result
    except Exception as e:
        logger.error(f"❌ Error searching government documents: {e}")
        return "Error occurred while searching government documents."
@tool
async def analyze_scenario(scenario_query: str, config: RunnableConfig) -> str:
    """
    Analyze government scenarios and create visualizations including charts, graphs, and diagrams.
    Use this tool when users ask for scenario analysis, data visualization, charts, graphs, or diagrams
    related to government processes, budgets, policies, organizational structures, or performance metrics.

    Args:
        scenario_query: Description of the scenario to analyze (e.g., "budget analysis for health department",
                       "policy implementation timeline", "organizational structure", "performance metrics")
    """
    # The docstring above is used by @tool as the tool description.
    # On success, the reply embeds the generated images as a JSON array
    # between SCENARIO_IMAGES_START / SCENARIO_IMAGES_END markers for
    # frontend rendering. Failures are returned as text rather than raised.
    try:
        logger.info(f"🔍 Analyzing scenario: {scenario_query}")
        # Parse the scenario query to determine its type and extract data.
        scenario_data = await _parse_scenario_query(scenario_query)
        result = await scenario_service.analyze_government_scenario(scenario_data)

        if not result.get("success", False):
            return f"❌ Error in scenario analysis: {result.get('error', 'Unknown error')}"

        # Treat a missing or None "images" entry as "no images".
        images = result.get("images") or []
        parts = [
            "📊 **Scenario Analysis Complete!**\n\n",
            result.get("analysis") or "",
            f"\n\n🖼️ **Generated {len(images)} visualization(s)**",
        ]
        if images:
            parts.append("\n\n**SCENARIO_IMAGES_START**\n")
            parts.append(json.dumps(images))
            parts.append("\n**SCENARIO_IMAGES_END**")
        return "".join(parts)
    except Exception as e:
        logger.error(f"❌ Error in scenario analysis tool: {e}")
        return f"Error occurred while analyzing scenario: {str(e)}"
async def _parse_scenario_query(query: str) -> Dict[str, Any]:
    """Classify a scenario request and build the payload for the scenario service.

    Keyword groups are tried in the order budget, policy, organization,
    performance, workflow. The first group with any keyword occurring as a
    substring of the lower-cased query decides the type, and that group's
    extractor supplies ``data``. If nothing matches, the type is "general"
    with empty data.

    Returns:
        ``{"type": <type>, "title": "Government <Type> Analysis", "data": <dict>}``
    """
    lowered = query.lower()
    rules = (
        ("budget", ("budget", "financial", "expenditure", "allocation", "funding"), _extract_budget_data),
        ("policy", ("policy", "implementation", "timeline", "plan", "strategy"), _extract_policy_data),
        ("organization", ("organization", "hierarchy", "structure", "reporting", "org"), _extract_org_data),
        ("performance", ("performance", "metrics", "kpi", "efficiency", "evaluation"), _extract_performance_data),
        ("workflow", ("workflow", "process", "flow", "procedure", "steps"), _extract_workflow_data),
    )
    matched = next(
        ((kind, extract) for kind, words, extract in rules if any(w in lowered for w in words)),
        None,
    )
    if matched is None:
        kind, payload = "general", {}
    else:
        kind, extract = matched
        payload = extract(query)
    return {
        "type": kind,
        "title": f"Government {kind.title()} Analysis",
        "data": payload,
    }


def _extract_budget_data(query: str) -> Dict[str, Any]:
    """Extract budget figures from *query*.

    Placeholder: nothing is parsed yet, and a new empty dict is always
    returned. NLP-based extraction of amounts and departments could go here.
    """
    return dict()


def _extract_policy_data(query: str) -> Dict[str, Any]:
    """Extract policy details from *query* (placeholder: always an empty dict)."""
    return dict()


def _extract_org_data(query: str) -> Dict[str, Any]:
    """Extract organizational details from *query* (placeholder: always an empty dict)."""
    return dict()


def _extract_performance_data(query: str) -> Dict[str, Any]:
    """Extract performance metrics from *query* (placeholder: always an empty dict)."""
    return dict()


def _extract_workflow_data(query: str) -> Dict[str, Any]:
    """Extract workflow steps from *query* (placeholder: always an empty dict)."""
    return dict()
if __name__ == "__main__":
    # Interactive manual check of the search_docs tool against LanceDB.
    # Type "exit" (or close stdin, e.g. Ctrl-D) to quit. asyncio is already
    # imported at module level.

    async def test_search():
        print("🔍 Testing search_docs RAG tool with LanceDB vector store...\n")
        test_user_id = "test_user_123"
        test_knowledge_base = "test_kb"
        while True:
            try:
                user_input = input("Enter a query (or 'exit'): ").strip()
                if user_input.lower() == "exit":
                    break
                kb_input = input(
                    f"Knowledge base (current: {test_knowledge_base}, press Enter to keep): "
                ).strip()
            except EOFError:
                break  # stdin closed: leave the loop instead of crashing
            if kb_input:
                test_knowledge_base = kb_input
            try:
                result = await search_docs.ainvoke(
                    {"query": user_input},
                    config=RunnableConfig(
                        configurable={
                            "thread_id": test_user_id,
                            "knowledge_base": test_knowledge_base,
                        }
                    ),
                )
                print(f"\n📄 Results from '{test_knowledge_base}' knowledge base:\n")
                print(result)
                print("\n" + "=" * 50 + "\n")
            except Exception as e:
                print(f"❌ Error: {e}")

    asyncio.run(test_search())