Spaces:
Sleeping
Sleeping
"""
Enhanced search service for large document collections (1500+ docs).

Specifically designed to find the RIGHT documents for pension queries.
The module defines ``EnhancedSearchService``, which sends one or more
expanded queries to ``lancedb_service.search_documents``, removes duplicate
hits and re-orders them with keyword-based scores, and exposes a ready-made
instance as ``enhanced_search_service``.
"""
import logging
from typing import List, Dict, Any, Optional
from lancedb_service import lancedb_service

# All messages from this module go to the logger named "voicebot"
# (a fixed name rather than __name__).
logger = logging.getLogger("voicebot")
class EnhancedSearchService:
    """Query expansion, de-duplication and keyword re-ranking over LanceDB.

    Every search is delegated to ``lancedb_service.search_documents``; this
    class only decides *which* queries to send and how to order what comes
    back.  Results are treated as dicts and only their ``'content'`` and
    ``'filename'`` entries are read (assumes the backend returns a list of
    such dicts -- TODO confirm against ``lancedb_service``).
    """

    # (phrase, weight) pairs: exact phrases in the content that mark a
    # document as directly about pension rules.
    _PENSION_PHRASE_WEIGHTS = (
        ("pension rules", 3.0),
        ("pension calculation", 2.5),
        ("pension formula", 2.5),
        ("retirement benefits", 2.0),
    )
    # Related concepts; each one present in the content adds 1.0.
    _PENSION_RELATED_TERMS = (
        "commutation", "gratuity", "provident fund", "family pension",
        "pension eligibility", "qualifying service", "last drawn pay",
    )
    # (token, weight) pairs matched against the file name.
    _FILENAME_WEIGHTS = (("pension", 1.5), ("retirement", 1.0))
    # Terms that earn a bonus when present in BOTH the query and the content.
    _QUERY_ECHO_TERMS = ("rules", "calculation", "eligibility")
    _QUERY_ECHO_BONUS = 1.5
    # Number of leading content characters used as the duplicate signature.
    _SIGNATURE_LENGTH = 200

    def __init__(self):
        # Domain keyword lists.  They are not consulted by the methods of
        # this class; they are kept as public attributes for callers.
        self.pension_keywords = [
            "pension rules", "pension calculation", "pension formula", "pension eligibility",
            "retirement benefits", "pension amount", "pension process", "pension application",
            "commutation", "family pension", "gratuity", "provident fund", "GPF", "CPF",
            "pension disbursement", "pension payment", "pension revision", "DA on pension",
            "minimum pension", "pension certificate", "life certificate", "pension arrears"
        ]
        self.procurement_keywords = [
            "tender process", "procurement rules", "bid submission", "GeM portal",
            "MSME benefits", "vendor registration", "procurement threshold", "bidding",
            "contract award", "tender committee", "technical bid", "financial bid"
        ]
        self.finance_keywords = [
            "budget allocation", "sanctioning authority", "financial approval", "treasury rules",
            "expenditure sanction", "fund release", "audit compliance", "financial procedures"
        ]

    @staticmethod
    def _text_field(result: Dict[str, Any], key: str) -> str:
        """Return ``result[key]`` as a string, mapping missing/None to ''."""
        value = result.get(key)
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    @staticmethod
    def _pension_query_variants(query_lower: str) -> List[str]:
        """Return the fixed sub-queries to run for a lower-cased pension query.

        An empty list means the query does not mention "pension" and the
        caller should fall back to a single enhanced query.
        """
        if "pension" not in query_lower:
            return []
        if "rules" in query_lower:
            return [
                "pension rules regulations",
                "pension calculation formula",
                "pension eligibility criteria",
                "retirement pension process",
                "pension disbursement rules",
            ]
        if "calculation" in query_lower or "formula" in query_lower:
            return [
                "pension calculation formula",
                "pension amount computation",
                "last pay pension calculation",
                "service years pension formula",
            ]
        if "eligibility" in query_lower:
            return [
                "pension eligibility criteria",
                "qualifying service pension",
                "minimum service pension",
                "pension eligibility rules",
            ]
        # General pension query - cast a wide net.
        return [
            "pension rules regulations guidelines",
            "retirement benefits pension",
            "pension calculation eligibility",
            "pension process application",
            "commutation pension benefits",
        ]

    async def enhanced_pension_search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Enhanced search specifically for pension-related queries.

        Runs several fixed pension sub-queries (or, when the query does not
        mention "pension", one enhanced query), merges the hits, removes
        duplicates, re-ranks them and returns at most ``limit`` results.

        Args:
            query: The user's search text.
            limit: Maximum number of results; a non-positive value yields [].

        Returns:
            Ranked result dicts.  If the enhanced path fails, the result of a
            plain search for ``query`` is returned; if that fails too, [].
        """
        if limit <= 0:
            # A negative limit would otherwise slice items off the END of the
            # ranked list; zero would query the backend for nothing.
            return []
        try:
            variants = self._pension_query_variants(query.lower())
            all_results: List[Dict[str, Any]] = []
            if variants:
                # Strategy 1: several targeted searches.  The +2 over-fetches
                # so that enough hits survive de-duplication.
                per_query_limit = limit // len(variants) + 2
                for search_query in variants:
                    try:
                        results = await lancedb_service.search_documents(
                            query=search_query,
                            limit=per_query_limit,
                        )
                        all_results.extend(results or [])
                    except Exception as e:
                        # One failing sub-query must not sink the others.
                        logger.warning("Search failed for '%s': %s", search_query, e)
            else:
                # Strategy 2: no specific searches, use one enhanced query.
                enhanced_query = self._enhance_query(query)
                results = await lancedb_service.search_documents(
                    query=enhanced_query,
                    limit=limit,
                )
                all_results.extend(results or [])

            unique_results = self._deduplicate_results(all_results)
            ranked_results = self._rank_pension_results(unique_results, query)
            return ranked_results[:limit]
        except Exception as e:
            logger.error("❌ Enhanced pension search error: %s", e)
            # Fallback to basic search.  Only Exception is caught so that
            # task cancellation and interpreter shutdown still propagate.
            try:
                return await lancedb_service.search_documents(query=query, limit=limit)
            except Exception as fallback_error:
                logger.error("❌ Basic fallback search error: %s", fallback_error)
                return []

    def _enhance_query(self, query: str) -> str:
        """Append intent-specific keywords to ``query`` and return the result.

        The intent (pension, procurement, finance, or default) is detected by
        substring checks on the lower-cased query; the original query text is
        kept unchanged at the front of the returned string.
        """
        query_lower = query.lower()
        # Pension-related enhancements
        if "pension" in query_lower:
            if "rules" in query_lower:
                return f"{query} pension rules regulations calculation eligibility process"
            if "calculation" in query_lower:
                return f"{query} pension calculation formula last pay service years"
            if "benefits" in query_lower:
                return f"{query} pension benefits retirement gratuity provident fund"
            return f"{query} pension retirement benefits rules calculation"
        # Procurement-related
        if any(word in query_lower for word in ("tender", "procurement", "bid")):
            return f"{query} procurement tender bidding process rules guidelines"
        # Finance-related
        if any(word in query_lower for word in ("budget", "finance", "sanction")):
            return f"{query} finance budget allocation sanctioning authority rules"
        # Default enhancement
        return f"{query} government rules regulations process guidelines"

    def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Drop results whose content starts with the same text as an earlier one.

        Two results count as duplicates when the first 200 characters of
        their ``'content'`` match after stripping and lower-casing.  The first
        occurrence wins and input order is preserved.  Results with missing
        or None content all share the empty signature, so only the first of
        them is kept.
        """
        if not results:
            return results
        unique_results = []
        seen_signatures = set()
        for result in results:
            content = self._text_field(result, "content")
            signature = content[:self._SIGNATURE_LENGTH].strip().lower()
            if signature in seen_signatures:
                continue
            seen_signatures.add(signature)
            unique_results.append(result)
        return unique_results

    def _pension_score(self, result: Dict[str, Any], query_lower: str) -> float:
        """Return the keyword relevance score of one result for a pension query."""
        content = self._text_field(result, "content").lower()
        filename = self._text_field(result, "filename").lower()
        score = 0.0
        # High priority: direct pension rule phrases in the content.
        for phrase, weight in self._PENSION_PHRASE_WEIGHTS:
            if phrase in content:
                score += weight
        # Medium priority: related pension concepts.
        for term in self._PENSION_RELATED_TERMS:
            if term in content:
                score += 1.0
        # Filename bonus.
        for token, weight in self._FILENAME_WEIGHTS:
            if token in filename:
                score += weight
        # Query-specific bonus: the term appears in both query and content.
        for term in self._QUERY_ECHO_TERMS:
            if term in query_lower and term in content:
                score += self._QUERY_ECHO_BONUS
        return score

    def _rank_pension_results(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
        """
        Rank results specifically for pension queries.

        Returns a new list sorted by descending keyword score; results with
        equal scores keep their incoming relative order.  Missing or None
        ``'content'`` / ``'filename'`` values are scored as empty text.
        """
        if not results:
            return results
        query_lower = query.lower()
        return sorted(
            results,
            key=lambda result: self._pension_score(result, query_lower),
            reverse=True,
        )

    async def search_with_fallback(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """
        Main search function with fallback strategies.

        Pension queries go through ``enhanced_pension_search`` first; if that
        yields nothing (or the query is not about pensions) a single enhanced
        query is run, over-fetching ``limit * 2`` hits so the keyword ranking
        has more candidates to choose from.

        Args:
            query: The user's search text.
            limit: Maximum number of results; a non-positive value yields [].

        Returns:
            At most ``limit`` ranked result dicts, or [] on any error.
        """
        if limit <= 0:
            return []
        try:
            # Try enhanced pension search first
            if "pension" in query.lower():
                results = await self.enhanced_pension_search(query, limit)
                if results:
                    logger.info("✅ Found %d pension documents", len(results))
                    return results
            # Fallback to regular enhanced search
            enhanced_query = self._enhance_query(query)
            results = await lancedb_service.search_documents(
                query=enhanced_query,
                limit=limit * 2,  # Get more to rank better
            )
            if not results:
                # Always hand back a list, even if the backend returned None.
                return []
            ranked_results = self._rank_general_results(results, query)
            return ranked_results[:limit]
        except Exception as e:
            logger.error("❌ Search with fallback error: %s", e)
            return []

    def _rank_general_results(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
        """General ranking for non-pension queries.

        Each query word longer than two characters adds 0.5 per (substring)
        occurrence in the content and 2.0 if it appears in the file name.
        Returns a new list sorted by descending score, stable for ties.
        """
        # Filter short words once instead of once per result.
        scoring_words = [word for word in query.lower().split() if len(word) > 2]

        def calculate_general_score(result: Dict[str, Any]) -> float:
            content = self._text_field(result, "content").lower()
            filename = self._text_field(result, "filename").lower()
            score = 0.0
            for word in scoring_words:
                score += content.count(word) * 0.5
                if word in filename:
                    score += 2.0
            return score

        return sorted(results, key=calculate_general_score, reverse=True)
# Global instance: a single EnhancedSearchService created when this module
# is imported and shared through the module-level name below.
enhanced_search_service = EnhancedSearchService()