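"""Document ingestion service: reads PDFs with pdfplumber, chunks them with
LangChain's RecursiveCharacterTextSplitter, and indexes the chunks into
LanceDB, tagged by user and knowledge base."""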
from fastapi import UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
import pdfplumber
import os
import asyncio
from typing import List
from lancedb_service import lancedb_service
from config import CHUNK_SIZE, CHUNK_OVERLAP
from datetime import datetime


def read_pdf(file: UploadFile) -> str:
    """Extract the text of an uploaded PDF, joining pages with newlines."""
    with pdfplumber.open(file.file) as pdf:
        return "\n".join(page.extract_text() or "" for page in pdf.pages)


async def process_document_upload(file: UploadFile, userid: str, knowledge_base: str):
    """Read, chunk, and index a single uploaded PDF into LanceDB."""
    try:
        # Guard against a missing filename as well as a non-PDF extension
        filename = file.filename
        if not filename or not filename.lower().endswith(".pdf"):
            return {"error": "Only PDF files are supported"}

        # Read PDF
        text = read_pdf(file)

        # Chunk text
        splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        chunks = splitter.split_text(text)

        # Batch-create Document objects whose metadata records the source file,
        # owner, knowledge base, and upload time
        upload_date = datetime.now().isoformat()
        docs = [
            Document(
                page_content=chunk,
                metadata={
                    "source": filename,
                    "userid": userid,
                    "knowledge_base": knowledge_base,
                    "upload_date": upload_date,
                },
            )
            for chunk in chunks
        ]

        # Batch embed & insert using LanceDB
        await lancedb_service.add_documents(docs, userid, knowledge_base, filename)

        return {
            "status": "uploaded",
            "chunks": len(docs),
            "file": filename,
            "knowledge_base": knowledge_base,
        }
    except Exception as e:
        return {"error": str(e)}


def read_pdf_from_path(pdf_path: str) -> str:
    """Read PDF content from file path"""
    try:
        with pdfplumber.open(pdf_path) as pdf:
            return "\n".join(page.extract_text() or "" for page in pdf.pages)
    except Exception as e:
        print(f"Error reading PDF {pdf_path}: {str(e)}")
        return ""


async def process_documents_from_folder(folder_path: str, userid: str = "system", knowledge_base: str = "government_docs"):
    """Process all PDF documents from the specified folder"""
    try:
        if not os.path.exists(folder_path):
            return {"error": f"Folder path {folder_path} does not exist"}

        pdf_files = [f for f in os.listdir(folder_path) if f.lower().endswith(".pdf")]
        if not pdf_files:
            return {"error": "No PDF files found in the folder"}

        # One splitter instance suffices; its configuration is identical for every file
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )

        processed_files = []
        total_chunks = 0
        for pdf_file in pdf_files:
            pdf_path = os.path.join(folder_path, pdf_file)

            # Read PDF content
            text = read_pdf_from_path(pdf_path)
            if not text.strip():
                print(f"Skipping {pdf_file} - no text content extracted")
                continue

            # Chunk text
            chunks = splitter.split_text(text)

            # Create Document objects with metadata
            upload_date = datetime.now().isoformat()
            docs = [
                Document(
                    page_content=chunk,
                    metadata={
                        "source": pdf_file,
                        "userid": userid,
                        "knowledge_base": knowledge_base,
                        "upload_date": upload_date,
                        "file_path": pdf_path,
                    },
                )
                for chunk in chunks
            ]

            # Add documents to LanceDB
            await lancedb_service.add_documents(docs, userid, knowledge_base, pdf_file)

            processed_files.append({
                "file": pdf_file,
                "chunks": len(chunks),
            })
            total_chunks += len(chunks)
            print(f"Processed {pdf_file}: {len(chunks)} chunks")

        return {
            "status": "success",
            "processed_files": len(processed_files),
            "total_chunks": total_chunks,
            "files": processed_files,
            "knowledge_base": knowledge_base,
        }
    except Exception as e:
        return {"error": str(e)}


async def initialize_document_database():
    """Initialize the document database with documents from the aa folder"""
    # Path to the documents folder
    documents_folder = "/Users/abhishekchoudhary/Abhi Project/aa/raw_documents/Documents"

    print("Starting document database initialization...")
    result = await process_documents_from_folder(
        folder_path=documents_folder,
        userid="system",
        knowledge_base="government_docs",
    )

    if "error" in result:
        print(f"Error initializing database: {result['error']}")
    else:
        print(f"Successfully initialized database with {result['total_chunks']} chunks from {result['processed_files']} files")
    return result


async def get_available_knowledge_bases() -> List[str]:
    """Get list of available knowledge bases"""
    try:
        return await lancedb_service.get_knowledge_bases()
    except Exception as e:
        print(f"Error getting knowledge bases: {str(e)}")
        return []


async def get_documents_by_knowledge_base(knowledge_base: str) -> List[dict]:
    """Get list of documents in a specific knowledge base"""
    try:
        return await lancedb_service.get_documents_by_knowledge_base(knowledge_base)
    except Exception as e:
        print(f"Error getting documents for knowledge base {knowledge_base}: {str(e)}")
        return []
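

# Minimal entry point (a sketch, assuming this module is run directly and that
# lancedb_service is already configured for the local environment): runs the
# one-time folder ingestion defined above.
if __name__ == "__main__":
    asyncio.run(initialize_document_database())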