# PensionBot / document_service.py
# ChAbhishek28
# Deploy clean Voice Bot backend to HF Spaces
# cf02b2b
from fastapi import UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
import pdfplumber
import os
import asyncio
from typing import List
from lancedb_service import lancedb_service
from config import CHUNK_SIZE, CHUNK_OVERLAP
from datetime import datetime
def read_pdf(file: UploadFile) -> str:
    """Extract the text of every page of an uploaded PDF.

    The upload's underlying file object is opened with pdfplumber. Pages for
    which no text is returned contribute an empty string, and the per-page
    texts are joined with newlines.

    Args:
        file: The uploaded PDF.

    Returns:
        The concatenated page text.
    """
    with pdfplumber.open(file.file) as document:
        page_texts = [page.extract_text() or "" for page in document.pages]
        return "\n".join(page_texts)
async def process_document_upload(file: UploadFile, userid: str, knowledge_base: str):
    """Ingest an uploaded PDF: extract text, chunk it, and store the chunks in LanceDB.

    Args:
        file: The uploaded file. Only names ending in ``.pdf``
            (case-insensitive) are accepted.
        userid: Identifier recorded in each chunk's metadata and passed to
            the LanceDB service.
        knowledge_base: Knowledge base name recorded in each chunk's metadata
            and passed to the LanceDB service.

    Returns:
        On success, a dict with ``status``, ``chunks``, ``file`` and
        ``knowledge_base``. On failure, a dict with a single ``error`` key;
        exceptions raised while processing are caught and reported this way.
    """
    try:
        # An upload may arrive without a filename; treat that as "not a PDF"
        # instead of failing on None.lower().
        filename = file.filename or ""
        if not filename.lower().endswith(".pdf"):
            return {"error": "Only PDF files are supported"}

        # Read PDF (shared helper keeps extraction consistent across the module)
        text = read_pdf(file)

        # Mirror the folder ingester, which skips files with no extractable
        # text, rather than sending an empty batch to LanceDB and reporting
        # a successful upload of zero chunks.
        if not text.strip():
            return {"error": "No text content could be extracted from the PDF"}

        # Chunk text
        splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        chunks = splitter.split_text(text)

        # Batch create Document objects with metadata including knowledge base.
        # One timestamp is shared by every chunk of this upload.
        upload_date = datetime.now().isoformat()
        docs = [
            Document(
                page_content=chunk,
                metadata={
                    "source": filename,
                    "userid": userid,
                    "knowledge_base": knowledge_base,
                    "upload_date": upload_date,
                },
            )
            for chunk in chunks
        ]

        # Batch embed & insert using LanceDB
        await lancedb_service.add_documents(docs, userid, knowledge_base, filename)

        return {
            "status": "uploaded",
            "chunks": len(docs),
            "file": filename,
            "knowledge_base": knowledge_base,
        }
    except Exception as e:
        # Boundary handler: callers receive an error payload, never an exception.
        return {"error": str(e)}
def read_pdf_from_path(pdf_path: str) -> str:
    """Return the newline-joined page text of the PDF at ``pdf_path``.

    Pages with no extractable text contribute an empty string. Any exception
    raised while opening or reading the file is printed and an empty string
    is returned instead.
    """
    try:
        with pdfplumber.open(pdf_path) as document:
            page_texts = [page.extract_text() or "" for page in document.pages]
            return "\n".join(page_texts)
    except Exception as exc:
        print(f"Error reading PDF {pdf_path}: {str(exc)}")
        return ""
async def process_documents_from_folder(folder_path: str, userid: str = "system", knowledge_base: str = "government_docs"):
    """Process all PDF documents from the specified folder.

    Each ``.pdf`` file (case-insensitive) directly inside ``folder_path`` is
    read, split into chunks, and stored through the LanceDB service. Files
    that yield no text are skipped with a printed notice.

    Args:
        folder_path: Directory to scan (not recursive).
        userid: Identifier recorded in chunk metadata and passed to LanceDB.
        knowledge_base: Knowledge base name recorded in chunk metadata and
            passed to LanceDB.

    Returns:
        On success, a dict with ``status``, ``processed_files`` (count),
        ``total_chunks``, ``files`` (per-file chunk counts) and
        ``knowledge_base``. On failure, a dict with a single ``error`` key;
        exceptions raised while processing are caught and reported this way.
    """
    try:
        if not os.path.exists(folder_path):
            return {"error": f"Folder path {folder_path} does not exist"}
        # A regular file passes the existence check but cannot be listed;
        # report that explicitly instead of surfacing a raw OS error string.
        if not os.path.isdir(folder_path):
            return {"error": f"Folder path {folder_path} is not a directory"}

        # os.listdir returns entries in arbitrary order; sort so that runs
        # are reproducible.
        pdf_files = sorted(f for f in os.listdir(folder_path) if f.lower().endswith('.pdf'))
        if not pdf_files:
            return {"error": "No PDF files found in the folder"}

        # The splitter configuration is the same for every file, so build it once.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )

        processed_files = []
        total_chunks = 0

        for pdf_file in pdf_files:
            pdf_path = os.path.join(folder_path, pdf_file)

            # Read PDF content (returns "" on any read error)
            text = read_pdf_from_path(pdf_path)
            if not text.strip():
                print(f"Skipping {pdf_file} - no text content extracted")
                continue

            # Chunk text
            chunks = splitter.split_text(text)

            # Create Document objects with metadata; one timestamp per file
            upload_date = datetime.now().isoformat()
            docs = [
                Document(
                    page_content=chunk,
                    metadata={
                        "source": pdf_file,
                        "userid": userid,
                        "knowledge_base": knowledge_base,
                        "upload_date": upload_date,
                        "file_path": pdf_path,
                    },
                )
                for chunk in chunks
            ]

            # Add documents to LanceDB
            await lancedb_service.add_documents(docs, userid, knowledge_base, pdf_file)

            processed_files.append({
                "file": pdf_file,
                "chunks": len(chunks),
            })
            total_chunks += len(chunks)
            print(f"Processed {pdf_file}: {len(chunks)} chunks")

        return {
            "status": "success",
            "processed_files": len(processed_files),
            "total_chunks": total_chunks,
            "files": processed_files,
            "knowledge_base": knowledge_base,
        }
    except Exception as e:
        # Boundary handler: callers receive an error payload, never an exception.
        return {"error": str(e)}
async def initialize_document_database(
    documents_folder: str = "/Users/abhishekchoudhary/Abhi Project/aa/raw_documents/Documents",
):
    """Initialize the document database from a folder of PDF documents.

    Args:
        documents_folder: Directory containing the PDFs to ingest. The default
            is the original developer-machine path, kept for backward
            compatibility; deployments should pass their own location.

    Returns:
        The result dict from ``process_documents_from_folder``: either a
        success summary or a dict with an ``error`` key.
    """
    print("Starting document database initialization...")

    # Everything ingested here is owned by the "system" user and stored in
    # the "government_docs" knowledge base.
    result = await process_documents_from_folder(
        folder_path=documents_folder,
        userid="system",
        knowledge_base="government_docs"
    )

    if "error" in result:
        print(f"Error initializing database: {result['error']}")
    else:
        print(f"Successfully initialized database with {result['total_chunks']} chunks from {result['processed_files']} files")

    return result
async def get_available_knowledge_bases() -> List[str]:
    """Return the knowledge bases reported by the LanceDB service.

    If the lookup raises, the error is printed and an empty list is returned.
    """
    try:
        knowledge_bases = await lancedb_service.get_knowledge_bases()
    except Exception as exc:
        print(f"Error getting knowledge bases: {str(exc)}")
        return []
    return knowledge_bases
async def get_documents_by_knowledge_base(knowledge_base: str) -> List[dict]:
    """Return the documents the LanceDB service lists for ``knowledge_base``.

    If the lookup raises, the error is printed and an empty list is returned.
    """
    try:
        documents = await lancedb_service.get_documents_by_knowledge_base(knowledge_base)
    except Exception as exc:
        print(f"Error getting documents for knowledge base {knowledge_base}: {str(exc)}")
        return []
    return documents