import asyncio
import os
from datetime import datetime
from typing import List

import pdfplumber
from fastapi import UploadFile
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

from lancedb_service import lancedb_service
from config import CHUNK_SIZE, CHUNK_OVERLAP

# Folder scanned by initialize_document_database() when the caller does not
# pass one explicitly.
DEFAULT_DOCUMENTS_FOLDER = "/Users/abhishekchoudhary/Abhi Project/aa/raw_documents/Documents"


def _extract_text(source) -> str:
    """Return the text of every page of a PDF, joined with newlines.

    ``source`` is handed unchanged to ``pdfplumber.open``; this module passes
    either a filesystem path or the binary file object of an upload. A page
    whose ``extract_text()`` result is falsy contributes an empty string.
    """
    with pdfplumber.open(source) as pdf:
        return "\n".join(page.extract_text() or "" for page in pdf.pages)


def _new_splitter() -> RecursiveCharacterTextSplitter:
    """Build the text splitter configured with CHUNK_SIZE and CHUNK_OVERLAP."""
    return RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )


def _build_documents(chunks: List[str], metadata: dict) -> List[Document]:
    """Wrap each chunk in a Document that carries its own copy of ``metadata``."""
    return [
        Document(page_content=chunk, metadata=dict(metadata))
        for chunk in chunks
    ]


async def _run_blocking(func, *args):
    """Run the synchronous ``func(*args)`` in the event loop's default executor.

    PDF parsing is synchronous; awaiting it through the executor keeps the
    event loop free to serve other coroutines while a document is parsed.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


def read_pdf(file: UploadFile) -> str:
    """Extract the text of an uploaded PDF.

    Args:
        file: The upload whose underlying ``file.file`` object is parsed.

    Returns:
        The text of all pages joined with newlines.

    Raises:
        Whatever ``pdfplumber`` raises when the content cannot be opened or
        parsed; no exception is handled here.
    """
    return _extract_text(file.file)


async def process_document_upload(file: UploadFile, userid: str, knowledge_base: str):
    """Chunk an uploaded PDF and store the chunks through ``lancedb_service``.

    Args:
        file: The uploaded file; its name must end in ``.pdf`` (any case).
        userid: Recorded in each chunk's metadata and passed to the service.
        knowledge_base: Recorded in each chunk's metadata and passed to the
            service.

    Returns:
        On success, a dict with ``status`` ("uploaded"), ``chunks``, ``file``
        and ``knowledge_base``. On any failure, a dict with a single ``error``
        key; exceptions are never propagated to the caller.
    """
    try:
        # An upload may arrive without a filename; treat that as "not a PDF"
        # instead of failing on None.lower().
        filename = file.filename or ""
        if not filename.lower().endswith(".pdf"):
            return {"error": "Only PDF files are supported"}

        # Read PDF (off the event loop, parsing is synchronous).
        text = await _run_blocking(read_pdf, file)
        if not text.strip():
            # Mirrors process_documents_from_folder, which skips such files
            # rather than sending an empty batch to the service.
            return {"error": "No text content could be extracted from the PDF"}

        # Chunk text
        chunks = _new_splitter().split_text(text)

        # Batch create Document objects with metadata including knowledge base.
        # NOTE(review): datetime.now() is naive local time - confirm that
        # consumers of upload_date expect that.
        upload_date = datetime.now().isoformat()
        docs = _build_documents(
            chunks,
            {
                "source": filename,
                "userid": userid,
                "knowledge_base": knowledge_base,
                "upload_date": upload_date,
            },
        )

        # Batch embed & insert using LanceDB
        await lancedb_service.add_documents(docs, userid, knowledge_base, filename)

        return {
            "status": "uploaded",
            "chunks": len(docs),
            "file": filename,
            "knowledge_base": knowledge_base,
        }
    except Exception as e:
        # Boundary handler: callers receive an error dict, not an exception.
        return {"error": str(e)}


def read_pdf_from_path(pdf_path: str) -> str:
    """Read PDF content from file path.

    Args:
        pdf_path: Path of the PDF file to parse.

    Returns:
        The text of all pages joined with newlines, or ``""`` when reading
        fails (the failure is printed, not raised).
    """
    try:
        return _extract_text(pdf_path)
    except Exception as e:
        print(f"Error reading PDF {pdf_path}: {str(e)}")
        return ""


async def process_documents_from_folder(
    folder_path: str,
    userid: str = "system",
    knowledge_base: str = "government_docs",
):
    """Process all PDF documents from the specified folder.

    Only direct children of ``folder_path`` whose names end in ``.pdf`` (any
    case) are considered, in sorted name order. Files that yield no text are
    skipped with a printed notice.

    Args:
        folder_path: Directory to scan.
        userid: Recorded in each chunk's metadata and passed to the service.
        knowledge_base: Recorded in each chunk's metadata and passed to the
            service.

    Returns:
        On success, a dict with ``status`` ("success"), ``processed_files``
        (count), ``total_chunks``, ``files`` (per-file name and chunk count)
        and ``knowledge_base``. Otherwise a dict with a single ``error`` key.
    """
    try:
        # isdir (not exists) so that a path naming a regular file is reported
        # here instead of failing later inside os.listdir.
        if not os.path.isdir(folder_path):
            return {"error": f"Folder path {folder_path} does not exist"}

        # Sorted because os.listdir returns entries in arbitrary order.
        pdf_files = sorted(
            f for f in os.listdir(folder_path) if f.lower().endswith('.pdf')
        )
        if not pdf_files:
            return {"error": "No PDF files found in the folder"}

        # The splitter configuration is the same for every file.
        splitter = _new_splitter()
        processed_files = []
        total_chunks = 0

        for pdf_file in pdf_files:
            pdf_path = os.path.join(folder_path, pdf_file)

            # Read PDF content (off the event loop, parsing is synchronous).
            text = await _run_blocking(read_pdf_from_path, pdf_path)
            if not text.strip():
                print(f"Skipping {pdf_file} - no text content extracted")
                continue

            # Chunk text
            chunks = splitter.split_text(text)

            # Create Document objects with metadata
            upload_date = datetime.now().isoformat()
            docs = _build_documents(
                chunks,
                {
                    "source": pdf_file,
                    "userid": userid,
                    "knowledge_base": knowledge_base,
                    "upload_date": upload_date,
                    "file_path": pdf_path,
                },
            )

            # Add documents to LanceDB.
            # NOTE: if this raises, the handler below returns only the error;
            # files handled in earlier iterations are not reported.
            await lancedb_service.add_documents(docs, userid, knowledge_base, pdf_file)

            processed_files.append({
                "file": pdf_file,
                "chunks": len(chunks),
            })
            total_chunks += len(chunks)
            print(f"Processed {pdf_file}: {len(chunks)} chunks")

        return {
            "status": "success",
            "processed_files": len(processed_files),
            "total_chunks": total_chunks,
            "files": processed_files,
            "knowledge_base": knowledge_base,
        }
    except Exception as e:
        # Boundary handler: callers receive an error dict, not an exception.
        return {"error": str(e)}


async def initialize_document_database(documents_folder: str = DEFAULT_DOCUMENTS_FOLDER):
    """Initialize the document database with documents from a folder.

    Args:
        documents_folder: Directory whose PDFs are ingested. Defaults to
            DEFAULT_DOCUMENTS_FOLDER, the path this function previously had
            hard-coded.

    Returns:
        The result dict of ``process_documents_from_folder`` called with
        userid "system" and knowledge base "government_docs".
    """
    print("Starting document database initialization...")

    result = await process_documents_from_folder(
        folder_path=documents_folder,
        userid="system",
        knowledge_base="government_docs",
    )

    if "error" in result:
        print(f"Error initializing database: {result['error']}")
    else:
        print(f"Successfully initialized database with {result['total_chunks']} chunks from {result['processed_files']} files")

    return result


async def get_available_knowledge_bases() -> List[str]:
    """Get list of available knowledge bases.

    Returns:
        The value returned by ``lancedb_service.get_knowledge_bases()``, or an
        empty list when that call raises (the failure is printed).
    """
    try:
        return await lancedb_service.get_knowledge_bases()
    except Exception as e:
        print(f"Error getting knowledge bases: {str(e)}")
        return []


async def get_documents_by_knowledge_base(knowledge_base: str) -> List[dict]:
    """Get list of documents in a specific knowledge base.

    Args:
        knowledge_base: Name forwarded to the service lookup.

    Returns:
        The value returned by
        ``lancedb_service.get_documents_by_knowledge_base(knowledge_base)``,
        or an empty list when that call raises (the failure is printed).
    """
    try:
        return await lancedb_service.get_documents_by_knowledge_base(knowledge_base)
    except Exception as e:
        print(f"Error getting documents for knowledge base {knowledge_base}: {str(e)}")
        return []