# milestoneme / pdf_processor.py
# luxananda's picture
# Upload pdf_processor.py
# 0952601 verified
import PyPDF2
import duckdb
import pandas as pd
import os
import random
class PDFProcessor:
    """Extract text from a PDF and index it in an in-memory DuckDB table.

    Each page is split into blank-line-separated sections; every non-empty
    section becomes one row of ``pdf_content`` holding the page number, the
    full page text, the section text and the section's first line (used as
    its category). The instance can be used as a context manager so the
    connection is closed on exit.
    """

    # Static dropdown entries returned by get_all_categories(). 'All' is the
    # wildcard entry understood by get_random_sections_by_category().
    _CATEGORIES = (
        'All',
        'CUSTOMER OBSESSION',
        'OWNERSHIP',
        'INVENT AND SIMPLIFY',
        'ARE RIGHT, A LOT',
        'HIRE AND DEVELOP THE BEST',
        'INSIST ON THE HIGHEST STANDARDS',
        'THINK BIG',
        'BIAS FOR ACTION',
        'BEING FRUGAL (FRUGALITY)',
        'EARN TRUST',
        'DIVE DEEP',
        'DELIVER RESULTS',
        'HAVE BACKBONE: DISAGREE AND COMMIT',
        'LEARN & BE CURIOUS',
        'SUCCESS & SCALE BRING BROAD RESPONSIBILITY',
        "STRIVE TO BE EARTH'S BEST",
    )

    def __init__(self):
        """Open an in-memory DuckDB connection and create the schema."""
        # Use in-memory database
        self.conn = duckdb.connect(':memory:')
        self.setup_database()

    def __enter__(self):
        """Return the processor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def setup_database(self):
        """Create the ``pdf_content`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS pdf_content (
                page_number INTEGER,
                content TEXT,
                section TEXT,
                category TEXT,
                embedding TEXT
            )
        """)

    def process_pdf(self, pdf_path):
        """Read the PDF at ``pdf_path`` and store its sections in the table.

        Args:
            pdf_path: Path of the PDF file to open in binary mode.

        For every page the extracted text is split on blank lines. Each
        non-empty section is inserted together with the 1-based page number,
        the full page text and the section's first line as its category.
        The ``embedding`` column is not populated here.
        """
        with open(pdf_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            for page_number, page in enumerate(reader.pages, start=1):
                # Guard against a falsy result so the split below never
                # operates on None.
                text = page.extract_text() or ''
                # Split text into sections (you might want to adjust this
                # based on your PDF structure)
                for raw_section in text.split('\n\n'):
                    section = raw_section.strip()
                    if not section:
                        continue
                    # The category is the first line of the section.
                    category = section.split('\n', 1)[0].strip()
                    self.conn.execute("""
                        INSERT INTO pdf_content (page_number, content, section, category)
                        VALUES (?, ?, ?, ?)
                    """, [page_number, text, section, category])

    def get_relevant_content(self, query, limit=5):
        """Return up to ``limit`` distinct page texts containing ``query``.

        Args:
            query: Text to look for; matched case-insensitively as a
                substring via ``ILIKE``.
            limit: Maximum number of rows to return.

        Returns:
            A list of ``content`` strings.
        """
        # For now, we'll do a simple text search
        # In a production environment, you might want to use proper
        # embeddings and vector similarity
        rows = self.conn.execute("""
            SELECT DISTINCT content
            FROM pdf_content
            WHERE content ILIKE '%' || ? || '%'
            LIMIT ?
        """, [query, limit]).fetchall()
        return [row[0] for row in rows]

    def get_random_sections(self, limit=3):
        """Return up to ``limit`` distinct sections in random order.

        Only sections longer than 50 characters are considered.
        """
        rows = self.conn.execute("""
            SELECT DISTINCT section
            FROM pdf_content
            WHERE length(section) > 50
            ORDER BY random()
            LIMIT ?
        """, [limit]).fetchall()
        return [row[0] for row in rows]

    def get_random_sections_by_category(self, category='all', limit=3):
        """Return up to ``limit`` random sections, optionally filtered.

        Args:
            category: ``'all'`` in any letter case (or ``None``) selects from
                every section; any other value keeps only sections whose
                text contains it, compared in lower case.
            limit: Maximum number of sections to return.

        Returns:
            A list of distinct section strings longer than 50 characters.
        """
        # The dropdown list exposes the wildcard as 'All', so the comparison
        # must not depend on letter case.
        if category is None or category.strip().lower() == 'all':
            rows = self.conn.execute("""
                SELECT DISTINCT section
                FROM pdf_content
                WHERE length(section) > 50
            """).fetchall()
            sections = [row[0] for row in rows]
            random.shuffle(sections)
            return sections[:limit]

        rows = self.conn.execute("""
            SELECT DISTINCT section
            FROM pdf_content
            WHERE length(section) > 50 AND lower(section) LIKE ?
            ORDER BY random()
            LIMIT ?
        """, [f'%{category.lower()}%', limit]).fetchall()
        return [row[0] for row in rows]

    def get_all_categories(self):
        """Return the static list of unique categories for the dropdown."""
        # Hand out a fresh list so callers cannot mutate the shared constant.
        return list(self._CATEGORIES)

    def close(self):
        """Close the database connection."""
        self.conn.close()