from pathlib import Path
from typing import List, Dict, Any, Optional
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import faiss
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime
from config.config import settings


logger = logging.getLogger(__name__)
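
# NOTE: `config.config.settings` is assumed to expose at least CHUNK_SIZE,
# CHUNK_OVERLAP and PDF_FOLDER. The sketch below only illustrates such a
# settings object; it is not the project's actual config:
#
#     class Settings:
#         CHUNK_SIZE: int = 1000
#         CHUNK_OVERLAP: int = 200
#         PDF_FOLDER: Path = Path("data/pdfs")
#
#     settings = Settings()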

class PDFService:
    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        self.index = None        # FAISS index, built lazily by index_pdfs()
        self.chunks = []         # chunk dicts aligned with the index rows
        self.last_update = None
        self.pdf_metadata = {}   # per-file metadata keyed by filename

    def process_pdf(self, pdf_path: Path) -> List[Dict[str, Any]]:
        """Process a single PDF file (synchronous; run in an executor from async code)."""
        try:
            reader = PdfReader(str(pdf_path))
            chunks = []

            # reader.metadata can be None for PDFs without a document info dictionary
            doc_info = reader.metadata or {}
            metadata = {
                'title': doc_info.get('/Title', ''),
                'author': doc_info.get('/Author', ''),
                'creation_date': doc_info.get('/CreationDate', ''),
                'pages': len(reader.pages),
                'filename': pdf_path.name
            }
            self.pdf_metadata[pdf_path.name] = metadata

            for page_num, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text:
                    continue

                page_chunks = self.text_splitter.split_text(text)
                for i, chunk in enumerate(page_chunks):
                    chunks.append({
                        'text': chunk,
                        'source': pdf_path.name,
                        'page': page_num + 1,
                        'chunk_index': i,
                        'metadata': metadata,
                        'timestamp': datetime.now().isoformat()
                    })

            logger.debug("Extracted %d chunks from %s", len(chunks), pdf_path.name)
            return chunks

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {e}")
            return []

    async def index_pdfs(self, pdf_folder: Path = settings.PDF_FOLDER) -> None:
        """Index all PDFs in the specified folder"""
        try:
            pdf_files = list(pdf_folder.glob('*.pdf'))
            if not pdf_files:
                logger.warning(f"No PDF files found in {pdf_folder}")
                return

            # Run the CPU-bound PDF parsing off the event loop
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                chunk_lists = await loop.run_in_executor(
                    executor,
                    lambda: [self.process_pdf(pdf_file) for pdf_file in pdf_files]
                )

            self.chunks = []
            for chunk_list in chunk_lists:
                self.chunks.extend(chunk_list)

            if not self.chunks:
                logger.warning("No text chunks extracted from PDFs")
                return

            # Embed all chunks and build a flat L2 index over the embeddings
            texts = [chunk['text'] for chunk in self.chunks]
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.embedder.encode(
                    texts,
                    convert_to_tensor=True,
                    show_progress_bar=True
                ).cpu().detach().numpy()
            )
            embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)  # FAISS expects float32

            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings)

            self.last_update = datetime.now()

            logger.info(f"Indexed {len(self.chunks)} chunks from {len(pdf_files)} PDFs")

        except Exception as e:
            logger.error(f"Error indexing PDFs: {e}")
            raise

    async def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Search indexed PDFs with debug logs"""
        logger.debug("Search query: %s", query)
        if self.index is None or not self.chunks:
            await self.index_pdfs()
        if self.index is None:
            logger.warning("No PDF index available; returning no results")
            return []

        try:
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()
            logger.debug("Query embedding shape: %s", query_embedding_np.shape)

            distances, indices = self.index.search(query_embedding_np, top_k)
            logger.debug("Distances: %s, indices: %s", distances, indices)

            results = []
            for i, idx in enumerate(indices[0]):
                # FAISS pads with -1 when fewer than top_k vectors are indexed
                if idx < 0 or idx >= len(self.chunks):
                    continue

                # Convert the L2 distance into a similarity-style score (higher is better)
                score = 1 - distances[0][i]
                logger.debug("Chunk %d: distance=%.4f, score=%.4f", idx, distances[0][i], score)
                if score < min_score:
                    logger.debug("Skipping chunk %d below min_score", idx)
                    continue

                chunk = self.chunks[idx].copy()
                chunk['score'] = float(score)
                results.append(chunk)

            results.sort(key=lambda x: x['score'], reverse=True)

            logger.debug("Returning %d results", len(results))

            return results[:top_k]

        except Exception as e:
            logger.error(f"Error searching PDFs: {e}")
            raise
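
# Minimal usage sketch. The embedder interface assumed throughout this module
# (encode(..., convert_to_tensor=True) returning a tensor with .cpu()/.detach())
# matches sentence-transformers; the model name and _DemoModelService wrapper
# below are illustrative assumptions, not part of the real model_service.
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    class _DemoModelService:
        """Stand-in for the real model_service; only provides .embedder."""
        embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def _demo() -> None:
        service = PDFService(_DemoModelService())
        await service.index_pdfs()  # indexes every *.pdf under settings.PDF_FOLDER
        hits = await service.search("example question about the documents", top_k=3)
        for hit in hits:
            print(hit["source"], hit["page"], round(hit["score"], 3))

    asyncio.run(_demo())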