File size: 11,037 Bytes
a2ca191
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""
Enhanced Search Service for Large Document Collections (1500+ docs)
Specifically designed to find the RIGHT documents for pension queries
"""

import logging
from typing import List, Dict, Any, Optional
from lancedb_service import lancedb_service

# Module-level logger. It is looked up by the fixed name "voicebot" rather than
# __name__ (presumably so it shares handlers with the rest of the app — confirm).
logger = logging.getLogger("voicebot")

class EnhancedSearchService:
    """Multi-strategy document search with pension-focused re-ranking.

    The service wraps ``lancedb_service.search_documents`` (awaited with
    ``query=`` and ``limit=`` keyword arguments). For pension queries it fans
    out into several targeted sub-queries, merges and de-duplicates the hits,
    and re-ranks them with keyword heuristics. Other queries are expanded
    with domain terms and ranked by query-word frequency.

    Search hits are treated as dictionaries; only the ``'content'`` and
    ``'filename'`` keys are read, and either may be missing or ``None``.
    """

    # Number of leading content characters compared when spotting duplicates.
    _SIGNATURE_LENGTH = 200

    # (phrase, weight) pairs: phrases that directly indicate pension rules.
    _PENSION_PHRASE_WEIGHTS = (
        ("pension rules", 3.0),
        ("pension calculation", 2.5),
        ("pension formula", 2.5),
        ("retirement benefits", 2.0),
    )

    # Related pension concepts; each one present in the content adds 1.0.
    _PENSION_RELATED_TERMS = (
        "commutation", "gratuity", "provident fund", "family pension",
        "pension eligibility", "qualifying service", "last drawn pay",
    )

    # (substring, weight) pairs matched against the document filename.
    _PENSION_FILENAME_WEIGHTS = (
        ("pension", 1.5),
        ("retirement", 1.0),
    )

    # Words that add 1.5 when they occur in both the query and the content.
    _PENSION_QUERY_TERMS = ("rules", "calculation", "eligibility")

    def __init__(self):
        """Initialise the per-domain keyword lists exposed as attributes."""
        self.pension_keywords = [
            "pension rules", "pension calculation", "pension formula", "pension eligibility",
            "retirement benefits", "pension amount", "pension process", "pension application",
            "commutation", "family pension", "gratuity", "provident fund", "GPF", "CPF",
            "pension disbursement", "pension payment", "pension revision", "DA on pension",
            "minimum pension", "pension certificate", "life certificate", "pension arrears"
        ]

        self.procurement_keywords = [
            "tender process", "procurement rules", "bid submission", "GeM portal",
            "MSME benefits", "vendor registration", "procurement threshold", "bidding",
            "contract award", "tender committee", "technical bid", "financial bid"
        ]

        self.finance_keywords = [
            "budget allocation", "sanctioning authority", "financial approval", "treasury rules",
            "expenditure sanction", "fund release", "audit compliance", "financial procedures"
        ]

    @staticmethod
    def _text_field(result: Dict[str, Any], key: str) -> str:
        """Return ``result[key]`` as a string, mapping missing/``None`` to ``''``."""
        value = result.get(key)
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    @staticmethod
    def _pension_sub_queries(query_lower: str) -> List[str]:
        """Pick the targeted sub-queries for an already lower-cased query.

        Returns an empty list when the query does not mention "pension".
        """
        if "pension" not in query_lower:
            return []
        if "rules" in query_lower:
            return [
                "pension rules regulations",
                "pension calculation formula",
                "pension eligibility criteria",
                "retirement pension process",
                "pension disbursement rules",
            ]
        if "calculation" in query_lower or "formula" in query_lower:
            return [
                "pension calculation formula",
                "pension amount computation",
                "last pay pension calculation",
                "service years pension formula",
            ]
        if "eligibility" in query_lower:
            return [
                "pension eligibility criteria",
                "qualifying service pension",
                "minimum service pension",
                "pension eligibility rules",
            ]
        # General pension query - cast a wide net.
        return [
            "pension rules regulations guidelines",
            "retirement benefits pension",
            "pension calculation eligibility",
            "pension process application",
            "commutation pension benefits",
        ]

    async def enhanced_pension_search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search with several pension-specific strategies and merge the hits.

        Args:
            query: The user's free-text query.
            limit: Maximum number of results to return. Non-positive values
                yield an empty list without touching the backend.

        Returns:
            Up to ``limit`` de-duplicated results, best pension match first.
            If the enhanced pipeline fails, the result of a plain search for
            ``query`` is returned instead, or ``[]`` if that fails as well.
        """
        if limit <= 0:
            return []

        try:
            sub_queries = self._pension_sub_queries(query.lower())
            all_results = []

            if sub_queries:
                # Strategy 1: targeted sub-queries. Over-fetch slightly per
                # sub-query so enough hits survive de-duplication.
                per_query_limit = limit // len(sub_queries) + 2
                for sub_query in sub_queries:
                    try:
                        hits = await lancedb_service.search_documents(
                            query=sub_query,
                            limit=per_query_limit
                        )
                        all_results.extend(hits)
                    except Exception as e:
                        # Best effort: one failing sub-query must not sink the rest.
                        logger.warning("Search failed for '%s': %s", sub_query, e)
            else:
                # Strategy 2: no targeted sub-queries, use one expanded query.
                hits = await lancedb_service.search_documents(
                    query=self._enhance_query(query),
                    limit=limit
                )
                all_results.extend(hits)

            unique_results = self._deduplicate_results(all_results)
            ranked_results = self._rank_pension_results(unique_results, query)
            return ranked_results[:limit]

        except Exception as e:
            logger.error("❌ Enhanced pension search error: %s", e)
            # Fallback to basic search. Catch Exception only, so task
            # cancellation and interpreter exit are never swallowed here.
            try:
                return await lancedb_service.search_documents(query=query, limit=limit)
            except Exception as fallback_error:
                logger.error("❌ Basic fallback search failed: %s", fallback_error)
                return []

    def _enhance_query(self, query: str) -> str:
        """Append domain terms to ``query`` based on the intent it reveals.

        Pension intent is checked first, then procurement, then finance;
        anything else gets a generic government-rules suffix.
        """
        query_lower = query.lower()

        # Pension-related enhancements
        if "pension" in query_lower:
            if "rules" in query_lower:
                return f"{query} pension rules regulations calculation eligibility process"
            if "calculation" in query_lower:
                return f"{query} pension calculation formula last pay service years"
            if "benefits" in query_lower:
                return f"{query} pension benefits retirement gratuity provident fund"
            return f"{query} pension retirement benefits rules calculation"

        # Procurement-related
        if any(word in query_lower for word in ("tender", "procurement", "bid")):
            return f"{query} procurement tender bidding process rules guidelines"

        # Finance-related
        if any(word in query_lower for word in ("budget", "finance", "sanction")):
            return f"{query} finance budget allocation sanctioning authority rules"

        # Default enhancement
        return f"{query} government rules regulations process guidelines"

    def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Drop results whose content starts with the same text as an earlier one.

        Two results count as duplicates when the first 200 characters of
        their ``'content'`` (stripped, case-insensitive) are equal. The first
        occurrence is kept and input order is preserved. Missing or ``None``
        content is treated as an empty string.
        """
        if not results:
            return results

        unique_results = []
        seen_signatures = set()

        for result in results:
            content = self._text_field(result, 'content')
            signature = content[:self._SIGNATURE_LENGTH].strip().lower()
            if signature not in seen_signatures:
                seen_signatures.add(signature)
                unique_results.append(result)

        return unique_results

    def _pension_score(self, result: Dict[str, Any], query_lower: str) -> float:
        """Compute the heuristic pension-relevance score for one result."""
        content = self._text_field(result, 'content').lower()
        filename = self._text_field(result, 'filename').lower()

        score = 0.0

        # High priority: direct pension rule matches.
        for phrase, weight in self._PENSION_PHRASE_WEIGHTS:
            if phrase in content:
                score += weight

        # Medium priority: related pension concepts.
        for term in self._PENSION_RELATED_TERMS:
            if term in content:
                score += 1.0

        # Filename bonus.
        for fragment, weight in self._PENSION_FILENAME_WEIGHTS:
            if fragment in filename:
                score += weight

        # Query-specific bonus: the word must be in both query and content.
        for term in self._PENSION_QUERY_TERMS:
            if term in query_lower and term in content:
                score += 1.5

        return score

    def _rank_pension_results(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
        """Order results by pension relevance, highest score first.

        The sort is stable, so results with equal scores keep their incoming
        order. Missing or ``None`` content/filename values score as empty text.
        """
        if not results:
            return results

        query_lower = query.lower()
        return sorted(
            results,
            key=lambda result: self._pension_score(result, query_lower),
            reverse=True,
        )

    async def search_with_fallback(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Main search entry point with fallback strategies.

        Pension queries go through ``enhanced_pension_search`` first. When
        that yields nothing, or for any other query, an expanded general
        search is run and ranked by query-word frequency.

        Args:
            query: The user's free-text query.
            limit: Maximum number of results to return. Non-positive values
                yield an empty list without touching the backend.

        Returns:
            Up to ``limit`` results, or ``[]`` if the search raises.
        """
        if limit <= 0:
            return []

        try:
            # Try enhanced pension search first
            if "pension" in query.lower():
                results = await self.enhanced_pension_search(query, limit)
                if results:
                    logger.info("✅ Found %s pension documents", len(results))
                    return results

            # Fallback to regular enhanced search
            results = await lancedb_service.search_documents(
                query=self._enhance_query(query),
                limit=limit * 2  # Get more to rank better
            )

            # Rank and return top results
            if results:
                return self._rank_general_results(results, query)[:limit]

            return results

        except Exception as e:
            logger.error("❌ Search with fallback error: %s", e)
            return []

    def _rank_general_results(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
        """Rank results for non-pension queries by query-word frequency.

        Each query word longer than two characters adds 0.5 per occurrence
        in the content and 2.0 if it appears in the filename. Missing or
        ``None`` content/filename values score as empty text.
        """
        # Skip short words once, up front, instead of per result.
        query_words = [word for word in query.lower().split() if len(word) > 2]

        def calculate_general_score(result: Dict[str, Any]) -> float:
            content = self._text_field(result, 'content').lower()
            filename = self._text_field(result, 'filename').lower()

            score = 0.0
            for word in query_words:
                score += content.count(word) * 0.5
                if word in filename:
                    score += 2.0
            return score

        return sorted(results, key=calculate_general_score, reverse=True)

# Global instance: constructed once when this module is imported, so importers
# can use this object directly instead of creating their own service.
enhanced_search_service = EnhancedSearchService()