|
|
import fitz |
|
|
import json |
|
|
import os |
|
|
import re |
|
|
from sentence_transformers import SentenceTransformer |
|
|
import pickle |
|
|
|
|
|
class PDFProcessor:
    """Extract, clean, chunk, and embed text from WebAIM PDF files.

    Produces a JSON knowledge base of text chunks plus sentence embeddings
    suitable for semantic search by a downstream chatbot.
    """

    def __init__(self, pdf_directory="/Users/maraksa/Downloads/chatbot/WebAIM/"):
        """Load the embedding model and ensure the PDF directory exists.

        Args:
            pdf_directory: Folder scanned for ``*.pdf`` files.
        """
        self.pdf_directory = pdf_directory
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

        # Create the directory up front so the user has somewhere to drop files.
        if not os.path.exists(pdf_directory):
            os.makedirs(pdf_directory)
            print(f"Created directory: {pdf_directory}")
            print("Please add your WebAIM PDF files to this directory.")

    def clean_text(self, text):
        """Clean extracted text from a PDF page.

        Strips WebAIM header lines and "Page X of Y" footers and collapses
        all runs of whitespace (including newlines) into single spaces.
        """
        # Remove header lines FIRST: the pattern is anchored on a newline,
        # which the whitespace collapse below destroys. (Bug fix: the
        # original collapsed whitespace first, making this pattern dead code.)
        text = re.sub(r'WebAIM.*?\n', '', text)

        # Collapse all whitespace to single spaces.
        text = re.sub(r'\s+', ' ', text)

        # Remove page-number footers.
        text = re.sub(r'Page \d+ of \d+', '', text)

        return text.strip()

    def extract_text_from_pdf(self, pdf_path):
        """Extract cleaned, chunked text from one PDF.

        Returns:
            A list of dicts with keys ``text``, ``source_file``,
            ``page_number`` (1-based), ``chunk_id`` and ``source_type``.
        """
        print(f"Processing: {os.path.basename(pdf_path)}")
        doc = fitz.open(pdf_path)
        pages_content = []

        # try/finally so the document handle is released even if a page
        # fails mid-extraction (bug fix: the original leaked it on error).
        try:
            for page_num in range(len(doc)):
                cleaned_text = self.clean_text(doc[page_num].get_text())

                # Skip near-empty pages (covers, separators, blank pages).
                if len(cleaned_text) < 50:
                    continue

                chunks = self.chunk_text(cleaned_text, chunk_size=500)
                for chunk_idx, chunk in enumerate(chunks):
                    # Drop fragments too short to be useful search results.
                    if len(chunk.strip()) > 30:
                        pages_content.append({
                            'text': chunk,
                            'source_file': os.path.basename(pdf_path),
                            'page_number': page_num + 1,
                            'chunk_id': chunk_idx,
                            'source_type': 'WebAIM',
                        })
        finally:
            doc.close()

        print(f"✅ Extracted {len(pages_content)} chunks from {os.path.basename(pdf_path)}")
        return pages_content

    def chunk_text(self, text, chunk_size=500, overlap=50):
        """Split text into overlapping word-based chunks.

        Args:
            text: Source text; split on whitespace.
            chunk_size: Maximum number of words per chunk.
            overlap: Words shared between consecutive chunks. Should be
                smaller than ``chunk_size``.
        """
        words = text.split()
        chunks = []

        # Guard against a non-positive step, which would make range() raise
        # ValueError when overlap >= chunk_size (bug fix).
        step = max(chunk_size - overlap, 1)

        for i in range(0, len(words), step):
            chunk = ' '.join(words[i:i + chunk_size]).strip()
            if chunk:
                chunks.append(chunk)

        return chunks

    def process_all_pdfs(self):
        """Process every PDF in the directory; return the combined chunk list."""
        all_content = []

        # Case-insensitive suffix match so 'Guide.PDF' is not skipped, and
        # sorted() for a deterministic processing order across platforms.
        pdf_files = sorted(
            f for f in os.listdir(self.pdf_directory)
            if f.lower().endswith('.pdf')
        )

        if not pdf_files:
            print(f"❌ No PDF files found in {self.pdf_directory}")
            print("Please add your WebAIM PDF files to the pdfs/ directory")
            return []

        print(f"Found {len(pdf_files)} PDF files:")
        for pdf_file in pdf_files:
            print(f"  - {pdf_file}")

        for filename in pdf_files:
            pdf_path = os.path.join(self.pdf_directory, filename)
            try:
                all_content.extend(self.extract_text_from_pdf(pdf_path))
            except Exception as e:
                # Bug fix: the original printed the literal '(unknown)'
                # instead of naming the file that failed.
                print(f"❌ Error processing {filename}: {e}")

        return all_content

    def create_knowledge_base(self, output_path="knowledge_base.json"):
        """Create a searchable knowledge base from the PDFs and save it as JSON.

        Args:
            output_path: Destination file for the serialized knowledge base.

        Returns:
            The knowledge-base dict, or None when no content was extracted.
        """
        print("🔄 Starting PDF processing...")
        all_content = self.process_all_pdfs()

        if not all_content:
            print("❌ No content extracted. Please check your PDF files.")
            return None

        print(f"📊 Total chunks extracted: {len(all_content)}")
        print("🧠 Creating embeddings... (this may take a few minutes)")

        texts = [item['text'] for item in all_content]
        embeddings = self.embedder.encode(texts, show_progress_bar=True)

        knowledge_base = {
            'content': all_content,
            # Plain lists (not ndarrays) so the structure is JSON-serializable.
            'embeddings': embeddings.tolist(),
            'metadata': {
                'total_chunks': len(all_content),
                'embedding_model': 'all-MiniLM-L6-v2',
                'chunk_size': 500,
                'overlap': 50,
            },
        }

        with open(output_path, 'w') as f:
            json.dump(knowledge_base, f, indent=2)

        print(f"✅ Knowledge base saved to {output_path}")
        print("📊 Summary:")
        print(f"  - Total chunks: {len(all_content)}")
        print(f"  - Embedding dimensions: {len(embeddings[0])}")
        print(f"  - File size: {os.path.getsize(output_path) / 1024 / 1024:.2f} MB")

        return knowledge_base
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Build the knowledge base from every PDF in the default directory.
    knowledge_base = PDFProcessor().create_knowledge_base()