# Spaces: Sleeping (page-status banner captured with this listing; not program text)
import json
import os
import pickle
import re

import fitz  # PyMuPDF
from sentence_transformers import SentenceTransformer
class PDFProcessor:
    """Build an embedded, JSON-serialised knowledge base from a directory of PDFs.

    Each PDF page is extracted with PyMuPDF, cleaned, split into overlapping
    word chunks, and every chunk is embedded with a SentenceTransformer model.

    NOTE(review): the status emoji in the printed messages were reconstructed
    from mis-decoded characters in the captured listing; confirm the glyphs.
    """

    # Single source of truth for the model name, so the metadata written to
    # disk always matches the model that was actually loaded.
    EMBEDDING_MODEL = 'all-MiniLM-L6-v2'

    # Class-level defaults; __init__ overrides them per instance.
    chunk_size = 500
    overlap = 50

    def __init__(self, pdf_directory="/Users/maraksa/Downloads/chatbot/WebAIM/",
                 *, chunk_size=500, overlap=50):
        """Load the embedding model and make sure the PDF directory exists.

        Args:
            pdf_directory: Directory scanned for PDF files. It is created if
                missing. The default is a machine-specific absolute path kept
                only for backward compatibility; pass your own.
            chunk_size: Maximum number of words per chunk.
            overlap: Number of words shared by consecutive chunks.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                not in ``[0, chunk_size)``.
        """
        # Fail fast, before the (slow) model load.
        self._check_chunking(chunk_size, overlap)
        self.pdf_directory = pdf_directory
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.embedder = SentenceTransformer(self.EMBEDDING_MODEL)
        # Check if directory exists
        if not os.path.exists(pdf_directory):
            os.makedirs(pdf_directory, exist_ok=True)
            print(f"Created directory: {pdf_directory}")
            print("Please add your WebAIM PDF files to this directory.")

    @staticmethod
    def _check_chunking(chunk_size, overlap):
        """Raise ValueError unless chunk_size > 0 and 0 <= overlap < chunk_size."""
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= overlap < chunk_size:
            raise ValueError(
                f"overlap must satisfy 0 <= overlap < chunk_size, "
                f"got overlap={overlap}, chunk_size={chunk_size}"
            )

    def clean_text(self, text):
        """Clean extracted text from PDF.

        Drops lines that start with ``WebAIM`` (treated as page headers or
        footers) and ``Page N of M`` markers, then collapses every run of
        whitespace to a single space.

        Args:
            text: Raw text of one page.

        Returns:
            The cleaned, single-line text with no leading/trailing whitespace.
        """
        # Line-based artifacts must be removed while newlines still exist;
        # collapsing whitespace first would leave nothing for them to match.
        # Anchoring to the line start keeps mid-sentence mentions of WebAIM.
        text = re.sub(r'^[ \t]*WebAIM[^\n]*\n?', '', text, flags=re.MULTILINE)
        text = re.sub(r'Page\s+\d+\s+of\s+\d+', '', text)
        # Collapse last so the removals above cannot leave double spaces.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def extract_text_from_pdf(self, pdf_path):
        """Extract text from PDF with page information.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            A list of dicts, one per kept chunk, with keys ``text``,
            ``source_file``, ``page_number`` (1-based), ``chunk_id`` (index of
            the chunk within its page) and ``source_type``.
        """
        source_file = os.path.basename(pdf_path)
        print(f"Processing: {source_file}")
        pages_content = []
        # The context manager closes the document even if a page fails.
        with fitz.open(pdf_path) as doc:
            for page_number, page in enumerate(doc, start=1):
                cleaned_text = self.clean_text(page.get_text())
                # Skip pages with very little content
                if len(cleaned_text) < 50:
                    continue
                chunks = self.chunk_text(
                    cleaned_text,
                    chunk_size=self.chunk_size,
                    overlap=self.overlap,
                )
                for chunk_idx, chunk in enumerate(chunks):
                    if len(chunk.strip()) > 30:  # Only keep substantial chunks
                        pages_content.append({
                            'text': chunk,
                            'source_file': source_file,
                            'page_number': page_number,
                            'chunk_id': chunk_idx,
                            'source_type': 'WebAIM',
                        })
        print(f"✅ Extracted {len(pages_content)} chunks from {source_file}")
        return pages_content

    def chunk_text(self, text, chunk_size=500, overlap=50):
        """Split text into overlapping chunks of whitespace-separated words.

        Args:
            text: Text to split.
            chunk_size: Maximum number of words per chunk.
            overlap: Number of words repeated at the start of the next chunk.

        Returns:
            A list of chunk strings; empty when ``text`` contains no words.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                not in ``[0, chunk_size)``.
        """
        self._check_chunking(chunk_size, overlap)
        words = text.split()
        step = chunk_size - overlap
        chunks = []
        for start in range(0, len(words), step):
            chunks.append(' '.join(words[start:start + chunk_size]))
            # Stop once a chunk reaches the last word; a further window would
            # only repeat the tail already contained in this chunk.
            if start + chunk_size >= len(words):
                break
        return chunks

    def process_all_pdfs(self):
        """Process all PDFs in the directory.

        Files are handled in sorted name order. A file that fails to process
        is reported and skipped so the remaining files are still handled.

        Returns:
            The combined chunk dicts of every PDF that could be read, or an
            empty list when the directory contains no PDF files.
        """
        all_content = []
        # Case-insensitive match so ".PDF" files are found too; sorted because
        # os.listdir returns entries in arbitrary order.
        pdf_files = sorted(
            name for name in os.listdir(self.pdf_directory)
            if name.lower().endswith('.pdf')
        )
        if not pdf_files:
            print(f"❌ No PDF files found in {self.pdf_directory}")
            print(f"Please add your WebAIM PDF files to {self.pdf_directory}")
            return []
        print(f"Found {len(pdf_files)} PDF files:")
        for pdf_file in pdf_files:
            print(f" - {pdf_file}")
        for filename in pdf_files:
            pdf_path = os.path.join(self.pdf_directory, filename)
            try:
                content = self.extract_text_from_pdf(pdf_path)
            except Exception as e:
                # Deliberate best-effort: one bad file must not abort the run.
                print(f"❌ Error processing {filename}: {e}")
            else:
                all_content.extend(content)
        return all_content

    def create_knowledge_base(self, output_path="knowledge_base.json"):
        """Create searchable knowledge base from PDFs.

        Args:
            output_path: Path of the JSON file to write.

        Returns:
            The knowledge-base dict (keys ``content``, ``embeddings``,
            ``metadata``) that was written, or ``None`` when no content could
            be extracted.
        """
        print("🚀 Starting PDF processing...")
        all_content = self.process_all_pdfs()
        if not all_content:
            print("❌ No content extracted. Please check your PDF files.")
            return None
        print(f"📊 Total chunks extracted: {len(all_content)}")
        print("🧠 Creating embeddings... (this may take a few minutes)")
        texts = [item['text'] for item in all_content]
        # presumably a numpy array (it is converted with .tolist() below)
        embeddings = self.embedder.encode(texts, show_progress_bar=True)
        # Save knowledge base; metadata is read from the instance so it
        # cannot drift from the settings actually used.
        knowledge_base = {
            'content': all_content,
            'embeddings': embeddings.tolist(),
            'metadata': {
                'total_chunks': len(all_content),
                'embedding_model': self.EMBEDDING_MODEL,
                'chunk_size': self.chunk_size,
                'overlap': self.overlap,
            },
        }
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(knowledge_base, f, indent=2)
        print(f"✅ Knowledge base saved to {output_path}")
        print("📋 Summary:")
        print(f" - Total chunks: {len(all_content)}")
        print(f" - Embedding dimensions: {len(embeddings[0])}")
        print(f" - File size: {os.path.getsize(output_path) / 1024 / 1024:.2f} MB")
        return knowledge_base
# Usage: build the knowledge base from the default PDF directory.
if __name__ == "__main__":
    processor = PDFProcessor()
    knowledge_base = processor.create_knowledge_base()
    # create_knowledge_base returns None when nothing could be extracted;
    # exit non-zero so shells and CI can detect the failed build.
    if knowledge_base is None:
        raise SystemExit(1)