# PensionBot / lancedb_service.py
# ChAbhishek28's picture
# Deploy clean Voice Bot backend to HF Spaces
# cf02b2b
# raw
# history blame
# 16.3 kB
import lancedb
import pandas as pd
from langchain_huggingface import HuggingFaceEmbeddings
from config import EMBEDDING_MODEL_NAME, LANCEDB_PATH
from typing import List, Dict, Any, Optional
import logging
import os
import uuid
from datetime import datetime
import json
# Module logger; bound to the fixed "voicebot" logger name rather than __name__.
logger = logging.getLogger("voicebot")
# Lazy load embedding model to reduce startup time and memory usage
# Cache slot for the embedding model: stays None until get_embedding_model()
# populates it on first use.
embedding_model = None
def get_embedding_model():
    """Return the shared embedding model, building it on the first call.

    The instance is cached in the module-level ``embedding_model`` variable,
    so later calls return the same object without reloading it.

    NOTE(review): the model is created with ``trust_remote_code=True``; confirm
    that the repository named by ``EMBEDDING_MODEL_NAME`` is trusted.
    """
    global embedding_model

    # Fast path: the model was already built by an earlier call.
    if embedding_model is not None:
        return embedding_model

    logger.info(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
    model_options = {"device": "cpu", "trust_remote_code": True}
    encode_options = {"normalize_embeddings": True}
    embedding_model = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL_NAME,
        model_kwargs=model_options,
        encode_kwargs=encode_options,
    )
    return embedding_model
class LanceDBService:
    """Storage service backed by the LanceDB database at ``LANCEDB_PATH``.

    Manages four tables -- ``documents`` (text chunks together with their
    embedding vectors), ``personas``, ``mcp_servers`` and ``sessions`` -- and
    exposes helpers to insert, query and delete rows in them.

    NOTE(review): the public methods are declared ``async`` but never await
    anything; the LanceDB and embedding calls inside them run synchronously.
    """

    class _Document:
        """Lightweight search hit exposing ``page_content`` and ``metadata``."""

        def __init__(self, page_content, metadata):
            self.page_content = page_content
            self.metadata = metadata

        def __repr__(self):
            return (
                f"Document(page_content={self.page_content!r}, "
                f"metadata={self.metadata!r})"
            )

    def __init__(self):
        """Load the embedding model, connect to LanceDB and ensure tables exist."""
        self.db_path = LANCEDB_PATH
        self.db = None
        self.embedding_model = get_embedding_model()
        self._initialize_db()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sql_str(value) -> str:
        """Return *value* as a single-quoted SQL string literal.

        Embedded single quotes are doubled so caller-supplied text cannot
        terminate the literal and alter the surrounding filter expression.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _where(**equals) -> str:
        """Build ``col = 'value' AND ...`` from keyword arguments, in order."""
        return " AND ".join(
            f"{column} = {LanceDBService._sql_str(value)}"
            for column, value in equals.items()
        )

    def _create_table_from_sample(self, name: str, sample_row: Dict[str, Any]) -> None:
        """Create table *name* using *sample_row* to define its schema.

        The seed row is deleted again right away, keyed on its own ``id``, so
        the new table starts out empty.
        """
        self.db.create_table(name, pd.DataFrame([sample_row]))
        tbl = self.db.open_table(name)
        tbl.delete(self._where(id=sample_row["id"]))

    @classmethod
    def _rows_to_documents(cls, rows) -> list:
        """Convert search result rows into objects with page_content/metadata."""
        return [
            cls._Document(
                row["content"],
                # Metadata is stored as a JSON string; empty/missing -> {}.
                json.loads(row["metadata"]) if row["metadata"] else {},
            )
            for row in rows
        ]

    @staticmethod
    def _unique_knowledge_bases(df) -> List[str]:
        """Return distinct ``knowledge_base`` values, skipping empty and "none"."""
        if df.empty:
            return []
        names = df["knowledge_base"].unique().tolist()
        return [kb for kb in names if kb and kb != "none"]

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------
    def _initialize_db(self):
        """Initialize LanceDB connection and create tables if they don't exist.

        Raises:
            Exception: re-raised after logging when the directory cannot be
                created or the connection fails.
        """
        try:
            os.makedirs(self.db_path, exist_ok=True)
            self.db = lancedb.connect(self.db_path)
            # Initialize tables
            self._init_documents_table()
            self._init_personas_table()
            self._init_mcp_servers_table()
            self._init_sessions_table()
            logger.info("βœ… LanceDB initialized successfully")
        except Exception as e:
            logger.error(f"❌ Error initializing LanceDB: {e}")
            raise

    def _init_documents_table(self):
        """Create the ``documents`` vector table if missing (errors are logged)."""
        try:
            if "documents" in self.db.table_names():
                return
            # The seed row only exists to define the schema (including the
            # vector width, taken from a real embedding).
            self._create_table_from_sample("documents", {
                "id": str(uuid.uuid4()),
                "content": "sample",
                "metadata": json.dumps({}),
                "user_id": "sample",
                "knowledge_base": "sample",
                "filename": "sample",
                "upload_date": datetime.now().isoformat(),
                "vector": self.embedding_model.embed_query("sample"),
            })
        except Exception as e:
            logger.error(f"❌ Error initializing documents table: {e}")

    def _init_personas_table(self):
        """Create the ``personas`` table if missing (errors are logged)."""
        try:
            if "personas" in self.db.table_names():
                return
            now = datetime.now().isoformat()
            self._create_table_from_sample("personas", {
                "id": str(uuid.uuid4()),
                "user_id": "sample",
                "name": "sample",
                "description": "sample",
                "icon": "sample",
                "custom_prompt": "sample",
                "knowledge_base": "none",
                "language": "en",
                "created_at": now,
                "updated_at": now,
            })
        except Exception as e:
            logger.error(f"❌ Error initializing personas table: {e}")

    def _init_mcp_servers_table(self):
        """Create the ``mcp_servers`` table if missing (errors are logged)."""
        try:
            if "mcp_servers" in self.db.table_names():
                return
            self._create_table_from_sample("mcp_servers", {
                "id": str(uuid.uuid4()),
                "user_id": "sample",
                "name": "sample",
                "url": "sample",
                "bearer_token": "sample",
                "created_at": datetime.now().isoformat(),
            })
        except Exception as e:
            logger.error(f"❌ Error initializing mcp_servers table: {e}")

    def _init_sessions_table(self):
        """Create the ``sessions`` table if missing (errors are logged)."""
        try:
            if "sessions" in self.db.table_names():
                return
            now = datetime.now().isoformat()
            self._create_table_from_sample("sessions", {
                "id": str(uuid.uuid4()),
                "user_id": "sample",
                "persona_id": "sample",
                "persona_source": "sample",
                "session_summary": "sample",
                "created_at": now,
                "updated_at": now,
            })
        except Exception as e:
            logger.error(f"❌ Error initializing sessions table: {e}")

    # ------------------------------------------------------------------
    # Document storage and search
    # ------------------------------------------------------------------
    async def add_documents(self, docs, user_id: str, knowledge_base: str, filename: str):
        """Add documents to LanceDB vector store.

        Args:
            docs: iterable of objects exposing ``page_content`` and ``metadata``.
            user_id: owner recorded on every inserted row.
            knowledge_base: knowledge base name recorded on every row.
            filename: source file name recorded on every row.

        Returns:
            Number of rows inserted (0 when *docs* is empty).

        Raises:
            Exception: re-raised after logging if embedding or insertion fails.
        """
        try:
            # One timestamp per upload so all chunks of a file agree.
            upload_date = datetime.now().isoformat()
            rows = [
                {
                    "id": str(uuid.uuid4()),
                    "content": doc.page_content,
                    "metadata": json.dumps(doc.metadata),
                    "user_id": user_id,
                    "knowledge_base": knowledge_base,
                    "filename": filename,
                    "upload_date": upload_date,
                    "vector": self.embedding_model.embed_query(doc.page_content),
                }
                for doc in docs
            ]
            if not rows:
                # Nothing to insert; avoid handing a column-less frame to LanceDB.
                return 0
            # Insert documents
            tbl = self.db.open_table("documents")
            tbl.add(pd.DataFrame(rows))
            logger.info(f"βœ… Added {len(rows)} documents to LanceDB")
            return len(rows)
        except Exception as e:
            logger.error(f"❌ Error adding documents to LanceDB: {e}")
            raise

    async def similarity_search(self, query: str, user_id: str, knowledge_base: str, k: int = 4):
        """Search one user's knowledge base for chunks similar to *query*.

        Returns:
            Up to *k* objects with ``page_content`` and ``metadata``; an empty
            list if the search fails (the error is logged).
        """
        try:
            query_embedding = self.embedding_model.embed_query(query)
            tbl = self.db.open_table("documents")
            # Search with filters
            results = (
                tbl.search(query_embedding)
                .where(self._where(user_id=user_id, knowledge_base=knowledge_base))
                .limit(k)
                .to_list()
            )
            return self._rows_to_documents(results)
        except Exception as e:
            logger.error(f"❌ Error searching LanceDB: {e}")
            return []

    async def get_user_knowledge_bases(self, user_id: str) -> List[str]:
        """Get all knowledge bases for a user (empty list on error)."""
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().where(self._where(user_id=user_id)).to_pandas()
            return self._unique_knowledge_bases(df)
        except Exception as e:
            logger.error(f"❌ Error fetching knowledge bases: {e}")
            return []

    async def get_kb_documents(self, user_id: str, kb_name: str):
        """Get all document rows of a user's knowledge base.

        Returns:
            One dict per stored row with ``id``, ``filename``,
            ``knowledge_base`` and ``upload_date``; empty list on error.
        """
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().where(
                self._where(user_id=user_id, knowledge_base=kb_name)
            ).to_pandas()
            columns = ["id", "filename", "knowledge_base", "upload_date"]
            return df[columns].to_dict("records")
        except Exception as e:
            logger.error(f"❌ Error fetching documents: {e}")
            return []

    async def delete_document_from_kb(self, user_id: str, kb_name: str, filename: str):
        """Delete a user's document from a knowledge base.

        Returns:
            True if the delete call succeeded, False if it raised.
        """
        try:
            tbl = self.db.open_table("documents")
            tbl.delete(
                self._where(user_id=user_id, knowledge_base=kb_name, filename=filename)
            )
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting document: {e}")
            return False

    # ------------------------------------------------------------------
    # Persona management methods
    # ------------------------------------------------------------------
    async def insert_persona(self, name: str, description: str, icon: str, custom_prompt: str, user_id: str):
        """Insert a new persona and return the stored row as a dict.

        New personas start with ``knowledge_base="none"`` and ``language="en"``.

        Raises:
            Exception: re-raised after logging if the insert fails.
        """
        try:
            now = datetime.now().isoformat()
            persona_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "name": name,
                "description": description,
                "icon": icon,
                "custom_prompt": custom_prompt,
                "knowledge_base": "none",
                "language": "en",
                "created_at": now,
                "updated_at": now,
            }
            tbl = self.db.open_table("personas")
            tbl.add(pd.DataFrame([persona_data]))
            return persona_data
        except Exception as e:
            logger.error(f"❌ Error inserting persona: {e}")
            raise

    async def get_user_personas(self, user_id: str):
        """Get all personas for a user as a list of dicts (empty on error)."""
        try:
            tbl = self.db.open_table("personas")
            df = tbl.search().where(self._where(user_id=user_id)).to_pandas()
            return df.to_dict("records")
        except Exception as e:
            logger.error(f"❌ Error fetching personas: {e}")
            return []

    # ------------------------------------------------------------------
    # MCP Server methods
    # ------------------------------------------------------------------
    async def create_mcp_server(self, user_id: str, name: str, url: str, bearer_token: Optional[str] = None):
        """Create an MCP server entry and return the stored row as a dict.

        Raises:
            Exception: re-raised after logging if the insert fails.
        """
        try:
            server_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "name": name,
                "url": url,
                "bearer_token": bearer_token,
                "created_at": datetime.now().isoformat(),
            }
            tbl = self.db.open_table("mcp_servers")
            tbl.add(pd.DataFrame([server_data]))
            return server_data
        except Exception as e:
            logger.error(f"❌ Error creating MCP server: {e}")
            raise

    async def get_mcp_servers_for_user(self, user_id: str):
        """Get MCP servers for a user as a list of dicts (empty on error)."""
        try:
            tbl = self.db.open_table("mcp_servers")
            df = tbl.search().where(self._where(user_id=user_id)).to_pandas()
            return df.to_dict("records")
        except Exception as e:
            logger.error(f"❌ Error fetching MCP servers: {e}")
            return []

    async def delete_mcp_server(self, user_id: str, server_id: str):
        """Delete one of a user's MCP servers.

        Returns:
            True if the delete call succeeded, False if it raised.
        """
        try:
            tbl = self.db.open_table("mcp_servers")
            tbl.delete(self._where(user_id=user_id, id=server_id))
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting MCP server: {e}")
            return False

    # ------------------------------------------------------------------
    # Session management
    # ------------------------------------------------------------------
    async def upsert_session_summary(self, user_id: str, persona_id: str, persona_source: str, summary: str):
        """Store a session summary and return the stored row (None on error).

        NOTE(review): despite the name, this always appends a new row with a
        fresh id; no existing row is looked up or updated.
        """
        try:
            now = datetime.now().isoformat()
            session_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "persona_id": persona_id,
                "persona_source": persona_source,
                "session_summary": summary,
                "created_at": now,
                "updated_at": now,
            }
            tbl = self.db.open_table("sessions")
            tbl.add(pd.DataFrame([session_data]))
            return session_data
        except Exception as e:
            logger.error(f"❌ Error upserting session: {e}")
            return None

    # ------------------------------------------------------------------
    # System-wide (not user-scoped) document helpers
    # ------------------------------------------------------------------
    async def get_knowledge_bases(self) -> List[str]:
        """Get all unique knowledge bases across users (empty list on error)."""
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().to_pandas()
            return self._unique_knowledge_bases(df)
        except Exception as e:
            logger.error(f"❌ Error getting knowledge bases: {e}")
            return []

    async def get_documents_by_knowledge_base(self, knowledge_base: str) -> List[dict]:
        """Get list of documents in a specific knowledge base.

        Returns:
            One dict per distinct filename with ``filename``,
            ``knowledge_base``, ``chunks`` (row count for that file) and
            ``upload_date`` (taken from the file's first row); empty on error.
        """
        try:
            tbl = self.db.open_table("documents")
            df = tbl.search().where(
                self._where(knowledge_base=knowledge_base)
            ).to_pandas()
            if df.empty:
                return []
            # Group by filename and get document info
            documents = []
            for filename in df["filename"].unique():
                file_docs = df[df["filename"] == filename]
                first_upload = None if file_docs.empty else file_docs["upload_date"].iloc[0]
                documents.append({
                    "filename": filename,
                    "knowledge_base": knowledge_base,
                    "chunks": len(file_docs),
                    "upload_date": first_upload,
                })
            return documents
        except Exception as e:
            logger.error(f"❌ Error getting documents by knowledge base: {e}")
            return []

    async def delete_document(self, filename: str, knowledge_base: str, user_id: Optional[str] = None):
        """Delete a document from the knowledge base.

        Args:
            filename: file whose chunks are removed.
            knowledge_base: knowledge base the file belongs to.
            user_id: when given (truthy), restrict the delete to this user.

        Returns:
            True if the delete call succeeded, False if it raised.
        """
        try:
            tbl = self.db.open_table("documents")
            where_clause = self._where(filename=filename, knowledge_base=knowledge_base)
            if user_id:
                where_clause += " AND " + self._where(user_id=user_id)
            # Delete the document chunks
            tbl.delete(where_clause)
            logger.info(f"βœ… Deleted document {filename} from knowledge base {knowledge_base}")
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting document: {e}")
            return False

    async def search_all_knowledge_bases(self, query: str, k: int = 4):
        """Search across all knowledge bases, without any user filter.

        Returns:
            Up to *k* objects with ``page_content`` and ``metadata``; an empty
            list if the search fails (the error is logged).
        """
        try:
            query_embedding = self.embedding_model.embed_query(query)
            tbl = self.db.open_table("documents")
            # Search without user filters for system-wide search
            results = tbl.search(query_embedding).limit(k).to_list()
            return self._rows_to_documents(results)
        except Exception as e:
            logger.error(f"❌ Error searching all knowledge bases: {e}")
            return []
# Global instance
# Module-level singleton created at import time: constructing it runs
# LanceDBService.__init__, which loads the embedding model, connects to the
# database and creates any missing tables.
lancedb_service = LanceDBService()