File size: 6,053 Bytes
cf02b2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from fastapi import UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
import pdfplumber
import os
import asyncio
from typing import List
from lancedb_service import lancedb_service
from config import CHUNK_SIZE, CHUNK_OVERLAP
from datetime import datetime

def read_pdf(file: UploadFile) -> str:
    """Extract and concatenate the text of every page of an uploaded PDF.

    Pages with no extractable text (e.g. scanned images) contribute an
    empty string, so page boundaries are still preserved as newlines.
    """
    pages = []
    with pdfplumber.open(file.file) as pdf:
        for page in pdf.pages:
            pages.append(page.extract_text() or "")
    return "\n".join(pages)

async def process_document_upload(file: UploadFile, userid: str, knowledge_base: str):
    """Extract text from an uploaded PDF, chunk it, and store the chunks in LanceDB.

    Args:
        file: The uploaded file; only ``.pdf`` files are accepted.
        userid: Owner of the document, stored in each chunk's metadata.
        knowledge_base: Logical collection the chunks belong to.

    Returns:
        On success a summary dict (``status``, ``chunks``, ``file``,
        ``knowledge_base``); on any failure a ``{"error": ...}`` dict —
        this function never raises.
    """
    try:
        filename = file.filename
        # Guard against a missing filename as well as non-PDF uploads.
        if not filename or not filename.lower().endswith(".pdf"):
            return {"error": "Only PDF files are supported"}

        # Reuse the module-level reader instead of duplicating its logic.
        text = read_pdf(file)

        # Reject files that yielded no text (e.g. image-only scans) rather
        # than silently inserting zero chunks.
        if not text.strip():
            return {"error": "No text content could be extracted from the PDF"}

        # Chunk text
        splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        chunks = splitter.split_text(text)

        # Batch create Document objects with metadata including knowledge base.
        # One timestamp for the whole upload so all chunks agree.
        upload_date = datetime.now().isoformat()
        docs = [
            Document(
                page_content=chunk,
                metadata={
                    "source": filename,
                    "userid": userid,
                    "knowledge_base": knowledge_base,
                    "upload_date": upload_date
                }
            )
            for chunk in chunks
        ]

        # Batch embed & insert using LanceDB
        await lancedb_service.add_documents(docs, userid, knowledge_base, filename)

        return {
            "status": "uploaded",
            "chunks": len(docs),
            "file": filename,
            "knowledge_base": knowledge_base
        }

    except Exception as e:
        # Boundary handler: callers receive an error payload, never an exception.
        return {"error": str(e)}

def read_pdf_from_path(pdf_path: str) -> str:
    """Read PDF content from file path"""
    try:
        with pdfplumber.open(pdf_path) as pdf:
            # Empty string for pages with no extractable text keeps page
            # boundaries represented as newlines in the joined result.
            extracted = [page.extract_text() or "" for page in pdf.pages]
    except Exception as e:
        print(f"Error reading PDF {pdf_path}: {str(e)}")
        return ""
    return "\n".join(extracted)

async def process_documents_from_folder(folder_path: str, userid: str = "system", knowledge_base: str = "government_docs"):
    """Process all PDF documents from the specified folder.

    Each PDF is read, split into chunks, and inserted into LanceDB under
    *knowledge_base*. Files with no extractable text are skipped.

    Args:
        folder_path: Directory to scan (non-recursive) for ``.pdf`` files.
        userid: Owner recorded in each chunk's metadata.
        knowledge_base: Logical collection name for the chunks.

    Returns:
        A summary dict on success, or ``{"error": ...}`` on failure —
        this function never raises.
    """
    try:
        if not os.path.exists(folder_path):
            return {"error": f"Folder path {folder_path} does not exist"}

        # Sort for a deterministic processing order; os.listdir order is
        # platform/filesystem dependent.
        pdf_files = sorted(f for f in os.listdir(folder_path) if f.lower().endswith('.pdf'))

        if not pdf_files:
            return {"error": "No PDF files found in the folder"}

        # The splitter configuration is loop-invariant — build it once.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )

        processed_files = []
        total_chunks = 0

        for pdf_file in pdf_files:
            pdf_path = os.path.join(folder_path, pdf_file)

            # Read PDF content (returns "" on any read failure).
            text = read_pdf_from_path(pdf_path)

            if not text.strip():
                print(f"Skipping {pdf_file} - no text content extracted")
                continue

            chunks = splitter.split_text(text)

            # Create Document objects with metadata; one timestamp per file.
            upload_date = datetime.now().isoformat()
            docs = [
                Document(
                    page_content=chunk,
                    metadata={
                        "source": pdf_file,
                        "userid": userid,
                        "knowledge_base": knowledge_base,
                        "upload_date": upload_date,
                        "file_path": pdf_path
                    }
                )
                for chunk in chunks
            ]

            # Add documents to LanceDB
            await lancedb_service.add_documents(docs, userid, knowledge_base, pdf_file)

            processed_files.append({
                "file": pdf_file,
                "chunks": len(chunks)
            })
            total_chunks += len(chunks)

            print(f"Processed {pdf_file}: {len(chunks)} chunks")

        return {
            "status": "success",
            "processed_files": len(processed_files),
            "total_chunks": total_chunks,
            "files": processed_files,
            "knowledge_base": knowledge_base
        }

    except Exception as e:
        # Boundary handler: callers receive an error payload, never an exception.
        return {"error": str(e)}

async def initialize_document_database(documents_folder: str = "/Users/abhishekchoudhary/Abhi Project/aa/raw_documents/Documents"):
    """Initialize the document database with documents from a folder.

    Args:
        documents_folder: Directory containing the PDFs to ingest. Defaults
            to the historical hard-coded path for backward compatibility.
            NOTE(review): the default is a machine-specific absolute path —
            callers should pass their own path or move this to config.

    Returns:
        The result dict from :func:`process_documents_from_folder`.
    """
    print("Starting document database initialization...")
    result = await process_documents_from_folder(
        folder_path=documents_folder,
        userid="system",
        knowledge_base="government_docs"
    )

    if "error" in result:
        print(f"Error initializing database: {result['error']}")
    else:
        print(f"Successfully initialized database with {result['total_chunks']} chunks from {result['processed_files']} files")

    return result

async def get_available_knowledge_bases() -> List[str]:
    """Return the names of every knowledge base known to the LanceDB service.

    Any service failure is logged and reported as an empty list.
    """
    try:
        bases = await lancedb_service.get_knowledge_bases()
    except Exception as e:
        print(f"Error getting knowledge bases: {str(e)}")
        return []
    return bases

async def get_documents_by_knowledge_base(knowledge_base: str) -> List[dict]:
    """Fetch the document listing stored under *knowledge_base*.

    Any service failure is logged and reported as an empty list.
    """
    try:
        documents = await lancedb_service.get_documents_by_knowledge_base(knowledge_base)
    except Exception as e:
        print(f"Error getting documents for knowledge base {knowledge_base}: {str(e)}")
        return []
    return documents