Spaces:
Sleeping
Sleeping
FIX: Add missing search_documents method to LanceDBService - resolves enhanced search errors
df5eb6f
| import lancedb | |
| import pandas as pd | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from config import EMBEDDING_MODEL_NAME, LANCEDB_PATH | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| import json | |
logger = logging.getLogger("voicebot")

# The embedding model is built on first use instead of at import time,
# to reduce startup time and memory usage.
embedding_model = None


def get_embedding_model():
    """Return the shared embedding model, creating it on the first call.

    The instance is cached in the module-level ``embedding_model`` global,
    so later calls return the same object without reloading anything.
    """
    global embedding_model
    if embedding_model is not None:
        return embedding_model

    logger.info(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
    model_options = {
        "device": "cpu",
        "trust_remote_code": True,
    }
    encode_options = {
        "normalize_embeddings": True,
    }
    embedding_model = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL_NAME,
        model_kwargs=model_options,
        encode_kwargs=encode_options,
    )
    return embedding_model
class LanceDBService:
    """Persistence layer backed by a local LanceDB database.

    Manages four tables -- ``documents`` (text chunks with embedding
    vectors), ``personas``, ``mcp_servers`` and ``sessions`` -- and exposes
    helpers for inserting, listing, searching and deleting rows.

    The public methods are declared ``async`` but nothing inside them is
    awaited: the embedding model and the LanceDB table API are called
    synchronously.
    """

    # Id given to the placeholder row that seeds each table's schema, so the
    # row can be found and removed again right after the table is created.
    _SEED_ID = "sample"

    def __init__(self):
        """Load the embedding model, connect to ``LANCEDB_PATH`` and ensure tables exist."""
        self.db_path = LANCEDB_PATH
        self.db = None
        self.embedding_model = get_embedding_model()
        self._initialize_db()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quote(value: Any) -> str:
        """Return *value* as a single-quoted SQL string literal.

        Embedded single quotes are doubled, so a value such as ``O'Brien``
        cannot terminate the literal early and change the filter.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @classmethod
    def _eq_filter(cls, **conditions: Any) -> str:
        """Build ``col = 'value' AND ...`` from keyword arguments, in order."""
        return " AND ".join(
            f"{column} = {cls._quote(value)}"
            for column, value in conditions.items()
        )

    @staticmethod
    def _parse_metadata(raw: Optional[str]) -> Dict[str, Any]:
        """Decode the JSON ``metadata`` column; empty or missing yields ``{}``."""
        return json.loads(raw) if raw else {}

    @classmethod
    def _to_document(cls, row: Dict[str, Any]):
        """Wrap a result row in an object exposing ``page_content`` and ``metadata``."""
        return type('Document', (), {
            'page_content': row['content'],
            'metadata': cls._parse_metadata(row['metadata']),
        })()

    @staticmethod
    def _named_knowledge_bases(df) -> List[str]:
        """Return distinct ``knowledge_base`` values, skipping empty ones and ``"none"``."""
        if df.empty:
            return []
        return [
            kb for kb in df['knowledge_base'].unique().tolist()
            if kb and kb != "none"
        ]

    def _ensure_table(self, name: str, build_seed_row) -> None:
        """Create table *name* if it is missing, using one seed row for the schema.

        ``build_seed_row`` is a zero-argument callable returning the seed row
        as a dict; it is only invoked when the table has to be created. The
        seed row must use ``_SEED_ID`` as its ``id`` so the delete below
        actually removes it. Errors are logged and swallowed.

        Tables created before this cleanup was fixed may still contain their
        placeholder row; this method does not touch existing tables.
        """
        try:
            if name in self.db.table_names():
                return
            self.db.create_table(name, pd.DataFrame([build_seed_row()]))
            self.db.open_table(name).delete(self._eq_filter(id=self._SEED_ID))
        except Exception as e:
            logger.error(f"β Error initializing {name} table: {e}")

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------
    def _initialize_db(self):
        """Initialize the LanceDB connection and create tables if they don't exist.

        Raises:
            Exception: any error from creating the directory or connecting;
                it is logged and re-raised. The per-table initialisers log
                and swallow their own errors.
        """
        try:
            os.makedirs(self.db_path, exist_ok=True)
            self.db = lancedb.connect(self.db_path)
            self._init_documents_table()
            self._init_personas_table()
            self._init_mcp_servers_table()
            self._init_sessions_table()
            logger.info("β LanceDB initialized successfully")
        except Exception as e:
            logger.error(f"β Error initializing LanceDB: {e}")
            raise

    def _init_documents_table(self):
        """Initialize the ``documents`` table used for vector storage."""
        def seed_row():
            return {
                "id": self._SEED_ID,
                "content": "sample",
                "metadata": json.dumps({}),
                "user_id": "sample",
                "knowledge_base": "sample",
                "filename": "sample",
                "upload_date": datetime.now().isoformat(),
                # Seed the vector column with an actual embedding from the
                # configured model.
                "vector": self.embedding_model.embed_query("sample"),
            }

        self._ensure_table("documents", seed_row)

    def _init_personas_table(self):
        """Initialize the ``personas`` table."""
        def seed_row():
            now = datetime.now().isoformat()
            return {
                "id": self._SEED_ID,
                "user_id": "sample",
                "name": "sample",
                "description": "sample",
                "icon": "sample",
                "custom_prompt": "sample",
                "knowledge_base": "none",
                "language": "en",
                "created_at": now,
                "updated_at": now,
            }

        self._ensure_table("personas", seed_row)

    def _init_mcp_servers_table(self):
        """Initialize the ``mcp_servers`` table."""
        def seed_row():
            return {
                "id": self._SEED_ID,
                "user_id": "sample",
                "name": "sample",
                "url": "sample",
                "bearer_token": "sample",
                "created_at": datetime.now().isoformat(),
            }

        self._ensure_table("mcp_servers", seed_row)

    def _init_sessions_table(self):
        """Initialize the ``sessions`` table."""
        def seed_row():
            now = datetime.now().isoformat()
            return {
                "id": self._SEED_ID,
                "user_id": "sample",
                "persona_id": "sample",
                "persona_source": "sample",
                "session_summary": "sample",
                "created_at": now,
                "updated_at": now,
            }

        self._ensure_table("sessions", seed_row)

    # ------------------------------------------------------------------
    # Documents
    # ------------------------------------------------------------------
    async def add_documents(self, docs, user_id: str, knowledge_base: str, filename: str):
        """Embed *docs* and append them to the ``documents`` table.

        Args:
            docs: sized collection of objects exposing ``page_content`` and
                ``metadata`` (the latter must be JSON-serialisable).
            user_id: owner stored on every row.
            knowledge_base: knowledge base name stored on every row.
            filename: source file name stored on every row.

        Returns:
            The number of documents stored; 0 when *docs* is empty.

        Raises:
            Exception: any embedding or LanceDB error, after logging it.
        """
        try:
            rows = [
                {
                    "id": str(uuid.uuid4()),
                    "content": doc.page_content,
                    "metadata": json.dumps(doc.metadata),
                    "user_id": user_id,
                    "knowledge_base": knowledge_base,
                    "filename": filename,
                    "upload_date": datetime.now().isoformat(),
                    "vector": self.embedding_model.embed_query(doc.page_content),
                }
                for doc in docs
            ]
            if not rows:
                # Nothing to insert; avoid handing LanceDB a column-less frame.
                return 0
            self.db.open_table("documents").add(pd.DataFrame(rows))
            logger.info(f"β Added {len(docs)} documents to LanceDB")
            return len(docs)
        except Exception as e:
            logger.error(f"β Error adding documents to LanceDB: {e}")
            raise

    async def similarity_search(self, query: str, user_id: str, knowledge_base: str, k: int = 4):
        """Return up to *k* documents similar to *query* within one user's knowledge base.

        Results are objects with ``page_content`` and ``metadata``
        attributes. Any error is logged and an empty list is returned.
        """
        try:
            query_vector = self.embedding_model.embed_query(query)
            hits = (
                self.db.open_table("documents")
                .search(query_vector)
                .where(self._eq_filter(user_id=user_id, knowledge_base=knowledge_base))
                .limit(k)
                .to_list()
            )
            return [self._to_document(hit) for hit in hits]
        except Exception as e:
            logger.error(f"β Error searching LanceDB: {e}")
            return []

    async def get_user_knowledge_bases(self, user_id: str) -> List[str]:
        """Get all knowledge base names that have documents for *user_id*.

        Empty names and ``"none"`` are excluded. Errors are logged and
        produce an empty list.
        """
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().where(self._eq_filter(user_id=user_id)).to_pandas()
            return self._named_knowledge_bases(df)
        except Exception as e:
            logger.error(f"β Error fetching knowledge bases: {e}")
            return []

    async def get_kb_documents(self, user_id: str, kb_name: str):
        """List stored rows of one user's knowledge base.

        Returns:
            One dict per stored row with the keys ``id``, ``filename``,
            ``knowledge_base`` and ``upload_date``; an empty list on error.
        """
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().where(
                self._eq_filter(user_id=user_id, knowledge_base=kb_name)
            ).to_pandas()
            columns = ["id", "filename", "knowledge_base", "upload_date"]
            return df[columns].to_dict('records')
        except Exception as e:
            logger.error(f"β Error fetching documents: {e}")
            return []

    async def delete_document_from_kb(self, user_id: str, kb_name: str, filename: str):
        """Delete all rows of *filename* from one user's knowledge base.

        Returns:
            True when the delete call completed, False if it raised.
        """
        try:
            tbl = self.db.open_table("documents")
            tbl.delete(
                self._eq_filter(user_id=user_id, knowledge_base=kb_name, filename=filename)
            )
            return True
        except Exception as e:
            logger.error(f"β Error deleting document: {e}")
            return False

    # ------------------------------------------------------------------
    # Persona management methods
    # ------------------------------------------------------------------
    async def insert_persona(self, name: str, description: str, icon: str, custom_prompt: str, user_id: str):
        """Insert a new persona and return the stored row as a dict.

        ``knowledge_base`` is set to ``"none"`` and ``language`` to ``"en"``.

        Raises:
            Exception: any LanceDB error, after logging it.
        """
        try:
            now = datetime.now().isoformat()
            persona_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "name": name,
                "description": description,
                "icon": icon,
                "custom_prompt": custom_prompt,
                "knowledge_base": "none",
                "language": "en",
                "created_at": now,
                "updated_at": now,
            }
            self.db.open_table("personas").add(pd.DataFrame([persona_data]))
            return persona_data
        except Exception as e:
            logger.error(f"β Error inserting persona: {e}")
            raise

    async def get_user_personas(self, user_id: str):
        """Get all personas of *user_id* as a list of dicts (empty on error)."""
        try:
            tbl = self.db.open_table("personas")
            df = tbl.search().where(self._eq_filter(user_id=user_id)).to_pandas()
            return df.to_dict('records')
        except Exception as e:
            logger.error(f"β Error fetching personas: {e}")
            return []

    # ------------------------------------------------------------------
    # MCP Server methods
    # ------------------------------------------------------------------
    async def create_mcp_server(self, user_id: str, name: str, url: str, bearer_token: Optional[str] = None):
        """Create an MCP server entry and return the stored row as a dict.

        Raises:
            Exception: any LanceDB error, after logging it.
        """
        try:
            server_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "name": name,
                "url": url,
                "bearer_token": bearer_token,
                "created_at": datetime.now().isoformat(),
            }
            self.db.open_table("mcp_servers").add(pd.DataFrame([server_data]))
            return server_data
        except Exception as e:
            logger.error(f"β Error creating MCP server: {e}")
            raise

    async def get_mcp_servers_for_user(self, user_id: str):
        """Get the MCP servers of *user_id* as a list of dicts (empty on error)."""
        try:
            tbl = self.db.open_table("mcp_servers")
            df = tbl.search().where(self._eq_filter(user_id=user_id)).to_pandas()
            return df.to_dict('records')
        except Exception as e:
            logger.error(f"β Error fetching MCP servers: {e}")
            return []

    async def delete_mcp_server(self, user_id: str, server_id: str):
        """Delete one MCP server of *user_id*; True on success, False on error."""
        try:
            tbl = self.db.open_table("mcp_servers")
            tbl.delete(self._eq_filter(user_id=user_id, id=server_id))
            return True
        except Exception as e:
            logger.error(f"β Error deleting MCP server: {e}")
            return False

    # ------------------------------------------------------------------
    # Session management
    # ------------------------------------------------------------------
    async def upsert_session_summary(self, user_id: str, persona_id: str, persona_source: str, summary: str):
        """Store a session summary and return the stored row as a dict.

        NOTE(review): despite the name, this always appends a new row with a
        fresh id; it never looks up or updates an existing session.

        Returns:
            The stored row, or None if storing it failed (the error is logged).
        """
        try:
            now = datetime.now().isoformat()
            session_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "persona_id": persona_id,
                "persona_source": persona_source,
                "session_summary": summary,
                "created_at": now,
                "updated_at": now,
            }
            self.db.open_table("sessions").add(pd.DataFrame([session_data]))
            return session_data
        except Exception as e:
            logger.error(f"β Error upserting session: {e}")
            return None

    # ------------------------------------------------------------------
    # Cross-user document queries
    # ------------------------------------------------------------------
    async def get_knowledge_bases(self) -> List[str]:
        """Get all unique knowledge base names across every user.

        Empty names and ``"none"`` are excluded. Errors are logged and
        produce an empty list.
        """
        try:
            df = self.db.open_table("documents").search().to_pandas()
            return self._named_knowledge_bases(df)
        except Exception as e:
            logger.error(f"β Error getting knowledge bases: {e}")
            return []

    async def get_documents_by_knowledge_base(self, knowledge_base: str) -> List[dict]:
        """Get one summary entry per file stored in *knowledge_base*.

        Returns:
            Dicts with ``filename``, ``knowledge_base``, ``chunks`` (number
            of rows for that file) and ``upload_date`` (taken from the
            file's first row); an empty list on error or when nothing
            matches.
        """
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().where(
                self._eq_filter(knowledge_base=knowledge_base)
            ).to_pandas()
            if df.empty:
                return []
            documents = []
            for filename in df['filename'].unique():
                file_docs = df[df['filename'] == filename]
                documents.append({
                    "filename": filename,
                    "knowledge_base": knowledge_base,
                    "chunks": len(file_docs),
                    "upload_date": file_docs['upload_date'].iloc[0] if not file_docs.empty else None,
                })
            return documents
        except Exception as e:
            logger.error(f"β Error getting documents by knowledge base: {e}")
            return []

    async def delete_document(self, filename: str, knowledge_base: str, user_id: Optional[str] = None):
        """Delete all chunks of *filename* from *knowledge_base*.

        The delete is restricted to *user_id* only when one is given.

        Returns:
            True when the delete call completed, False if it raised.
        """
        try:
            conditions = {"filename": filename, "knowledge_base": knowledge_base}
            if user_id:
                conditions["user_id"] = user_id
            self.db.open_table("documents").delete(self._eq_filter(**conditions))
            logger.info(f"β Deleted document {filename} from knowledge base {knowledge_base}")
            return True
        except Exception as e:
            logger.error(f"β Error deleting document: {e}")
            return False

    async def search_documents(self, query: str, limit: int = 5, knowledge_base: Optional[str] = None):
        """Vector-search the documents table, optionally within one knowledge base.

        Returns:
            Up to *limit* dicts with ``content``, ``metadata``, ``score``
            (the row's ``_distance`` value, 0.0 if absent) and
            ``knowledge_base``; an empty list on error.
        """
        try:
            query_vector = self.embedding_model.embed_query(query)
            search = self.db.open_table("documents").search(query_vector).limit(limit)
            if knowledge_base:
                search = search.where(self._eq_filter(knowledge_base=knowledge_base))
            return [
                {
                    'content': hit['content'],
                    'metadata': self._parse_metadata(hit['metadata']),
                    'score': hit.get('_distance', 0.0),
                    'knowledge_base': hit.get('knowledge_base', 'unknown'),
                }
                for hit in search.to_list()
            ]
        except Exception as e:
            logger.error(f"β Error searching documents: {e}")
            return []

    async def search_all_knowledge_bases(self, query: str, k: int = 4):
        """Vector-search every stored document, with no user or knowledge-base filter.

        Results are objects with ``page_content`` and ``metadata``
        attributes. Any error is logged and an empty list is returned.
        """
        try:
            query_vector = self.embedding_model.embed_query(query)
            hits = self.db.open_table("documents").search(query_vector).limit(k).to_list()
            return [self._to_document(hit) for hit in hits]
        except Exception as e:
            logger.error(f"β Error searching all knowledge bases: {e}")
            return []
# Shared module-level instance. Constructing it here loads the embedding
# model and connects to LanceDB as a side effect of importing this module.
lancedb_service = LanceDBService()