| | |
| | from pathlib import Path |
| | from typing import List, Dict, Any, Optional |
| | from PyPDF2 import PdfReader |
| | from langchain.text_splitter import RecursiveCharacterTextSplitter |
| | import faiss |
| | import numpy as np |
| | import asyncio |
| | from concurrent.futures import ThreadPoolExecutor |
| | import logging |
| | from datetime import datetime |
| | from config.config import settings |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
class PDFService:
    """Index PDF documents into a FAISS vector store and search them.

    Text is extracted with PyPDF2, split into overlapping chunks with a
    ``RecursiveCharacterTextSplitter``, embedded by the injected model
    service's sentence embedder, and served via nearest-neighbour search
    over a FAISS L2 flat index. The index is built lazily on first
    ``search`` if ``index_pdfs`` has not been called.
    """

    def __init__(self, model_service) -> None:
        """
        Args:
            model_service: Object exposing an ``embedder`` attribute with a
                sentence-transformers-style ``encode`` method.
        """
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        # faiss.IndexFlatL2, built lazily by index_pdfs().
        self.index = None
        # Chunk dicts; list position i corresponds to index vector id i.
        self.chunks: List[Dict[str, Any]] = []
        # Timestamp of the last successful indexing run.
        self.last_update: Optional[datetime] = None
        # Per-file PDF metadata keyed by filename.
        self.pdf_metadata: Dict[str, Dict[str, Any]] = {}

    def process_pdf(self, pdf_path: Path) -> List[Dict[str, Any]]:
        """Extract and chunk the text of a single PDF (synchronous).

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            A list of chunk dicts with keys ``text``, ``source``, ``page``
            (1-based), ``chunk_index``, ``metadata`` and ``timestamp``;
            an empty list if the file cannot be processed.
        """
        try:
            reader = PdfReader(str(pdf_path))

            # reader.metadata is None for PDFs without an info dictionary;
            # the old unguarded .get() raised AttributeError on such files.
            info = reader.metadata or {}
            metadata = {
                'title': info.get('/Title', ''),
                'author': info.get('/Author', ''),
                'creation_date': info.get('/CreationDate', ''),
                'pages': len(reader.pages),
                'filename': pdf_path.name
            }
            self.pdf_metadata[pdf_path.name] = metadata

            chunks: List[Dict[str, Any]] = []
            for page_num, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text:
                    # Image-only/scanned pages yield no extractable text.
                    continue

                for i, chunk in enumerate(self.text_splitter.split_text(text)):
                    chunks.append({
                        'text': chunk,
                        'source': pdf_path.name,
                        'page': page_num + 1,  # 1-based for display
                        'chunk_index': i,
                        'metadata': metadata,
                        'timestamp': datetime.now().isoformat()
                    })

            logger.debug("Extracted %d chunks from %s", len(chunks), pdf_path.name)
            return chunks

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {e}")
            return []

    async def index_pdfs(self, pdf_folder: Optional[Path] = None) -> None:
        """Index all ``*.pdf`` files in *pdf_folder* into a FAISS index.

        Args:
            pdf_folder: Folder to scan; defaults to ``settings.PDF_FOLDER``.

        Raises:
            Exception: Re-raises any failure during embedding/indexing
            after logging it.
        """
        # Resolve the default at call time. The previous signature
        # (`= settings.PDF_FOLDER`) froze the value at class-definition
        # time, ignoring later configuration changes.
        if pdf_folder is None:
            pdf_folder = settings.PDF_FOLDER

        try:
            pdf_files = list(pdf_folder.glob('*.pdf'))
            if not pdf_files:
                logger.warning(f"No PDF files found in {pdf_folder}")
                return

            loop = asyncio.get_running_loop()
            # Submit one executor task per file so PDFs are parsed in
            # parallel. The previous code submitted a single lambda that
            # looped over every file inside one worker thread — serial.
            with ThreadPoolExecutor() as executor:
                chunk_lists = await asyncio.gather(*[
                    loop.run_in_executor(executor, self.process_pdf, pdf_file)
                    for pdf_file in pdf_files
                ])

            self.chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]
            if not self.chunks:
                logger.warning("No text chunks extracted from PDFs")
                return

            texts = [chunk['text'] for chunk in self.chunks]
            # Encoding is CPU/GPU heavy; keep the event loop free.
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.embedder.encode(
                    texts,
                    convert_to_tensor=True,
                    show_progress_bar=True
                ).cpu().detach().numpy()
            )

            # FAISS requires float32, C-contiguous input.
            embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
            self.index.add(embeddings)

            self.last_update = datetime.now()
            logger.info(f"Indexed {len(self.chunks)} chunks from {len(pdf_files)} PDFs")

        except Exception as e:
            logger.error(f"Error indexing PDFs: {e}")
            raise

    async def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Return up to *top_k* indexed chunks matching *query*, best first.

        Builds the index on first use. Each result is a copy of the stored
        chunk dict with a ``score`` key added; chunks scoring below
        *min_score* are dropped.

        NOTE(review): score is computed as ``1 - L2_distance``, which is a
        bounded similarity only if embeddings are normalised — confirm
        against the embedder's configuration.

        Raises:
            Exception: Re-raises any search failure after logging it.
        """
        if self.index is None or not self.chunks:
            await self.index_pdfs()

        try:
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()

            distances, indices = self.index.search(query_embedding_np, top_k)

            results = []
            for i, idx in enumerate(indices[0]):
                # FAISS pads results with id -1 when fewer than top_k
                # vectors exist; the old `idx >= len(...)` check let -1
                # through and silently returned the LAST chunk.
                if idx < 0 or idx >= len(self.chunks):
                    continue

                score = 1 - distances[0][i]
                # Bug fix: the original printed "skipped" here but then
                # appended the chunk anyway — min_score was never enforced.
                if score < min_score:
                    continue

                chunk = self.chunks[idx].copy()
                chunk['score'] = score
                results.append(chunk)

            results.sort(key=lambda x: x['score'], reverse=True)
            return results[:top_k]

        except Exception as e:
            logger.error(f"Error searching PDFs: {e}")
            raise
| |
|