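# NOTE: "Spaces: Sleeping / Sleeping" was web-page status text captured with
# this file during extraction; it is not part of the program.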
import os
import random

import duckdb
import pandas as pd
import PyPDF2
class PDFProcessor:
    """Extract text from a PDF and keep it queryable in a DuckDB table.

    Each PDF page is split into sections on blank lines. Every non-empty
    section is stored as one row of the ``pdf_content`` table together with
    its 1-based page number, the full text of the page it came from, and a
    category (the first line of the section).

    The object can be used as a context manager; the database connection is
    closed when the ``with`` block exits.
    """

    # Dropdown entry meaning "do not filter by category".
    ALL_CATEGORIES_LABEL = 'All'

    # Static category names offered to the UI (see get_all_categories).
    _CATEGORIES = (
        ALL_CATEGORIES_LABEL,
        'CUSTOMER OBSESSION',
        'OWNERSHIP',
        'INVENT AND SIMPLIFY',
        'ARE RIGHT, A LOT',
        'HIRE AND DEVELOP THE BEST',
        'INSIST ON THE HIGHEST STANDARDS',
        'THINK BIG',
        'BIAS FOR ACTION',
        'BEING FRUGAL (FRUGALITY)',
        'EARN TRUST',
        'DIVE DEEP',
        'DELIVER RESULTS',
        'HAVE BACKBONE: DISAGREE AND COMMIT',
        'LEARN & BE CURIOUS',
        'SUCCESS & SCALE BRING BROAD RESPONSIBILITY',
        "STRIVE TO BE EARTH'S BEST",
    )

    def __init__(self, database=':memory:'):
        """Open the DuckDB connection and make sure the table exists.

        Args:
            database: Value passed to ``duckdb.connect``. The default keeps
                the historical behaviour of an in-memory database.
        """
        self.conn = duckdb.connect(database)
        self.setup_database()

    def __enter__(self):
        """Return the processor itself so it can be used in ``with``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()

    def setup_database(self):
        """Create the ``pdf_content`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS pdf_content (
                page_number INTEGER,
                content TEXT,
                section TEXT,
                category TEXT,
                embedding TEXT
            )
        """)

    @staticmethod
    def _iter_sections(text):
        """Yield ``(section, category)`` pairs for one page of text.

        Sections are the blank-line-separated chunks of ``text`` with
        surrounding whitespace removed; whitespace-only chunks are skipped.
        The category is the stripped first line of the section. A falsy
        ``text`` (``None`` or ``''``) yields nothing.
        """
        if not text:
            return
        for chunk in text.split('\n\n'):
            section = chunk.strip()
            if not section:
                continue
            category = section.split('\n', 1)[0].strip()
            yield section, category

    def process_pdf(self, pdf_path):
        """Read a PDF and insert one row per section into ``pdf_content``.

        Args:
            pdf_path: Path of the PDF file to read.

        Each row stores the 1-based page number, the full page text, the
        section text and the section's category. The ``embedding`` column is
        not populated here.
        """
        with open(pdf_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            for page_number, page in enumerate(reader.pages, start=1):
                # Guard against a page for which no text could be extracted
                # so that splitting never runs on None.
                text = page.extract_text() or ''
                for section, category in self._iter_sections(text):
                    self.conn.execute("""
                        INSERT INTO pdf_content (page_number, content, section, category)
                        VALUES (?, ?, ?, ?)
                    """, [page_number, text, section, category])

    def get_relevant_content(self, query, limit=5):
        """Return distinct page texts that contain ``query``.

        This is a plain case-insensitive substring search (``ILIKE``), not a
        similarity search; the ``embedding`` column is not consulted.

        Args:
            query: Text to look for inside the stored page content.
            limit: Maximum number of results.

        Returns:
            A list of matching ``content`` strings.
        """
        rows = self.conn.execute("""
            SELECT DISTINCT content
            FROM pdf_content
            WHERE content ILIKE '%' || ? || '%'
            LIMIT ?
        """, [query, limit]).fetchall()
        return [row[0] for row in rows]

    def get_random_sections(self, limit=3):
        """Return up to ``limit`` distinct sections in random order.

        Only sections longer than 50 characters are considered.
        """
        rows = self.conn.execute("""
            SELECT DISTINCT section
            FROM pdf_content
            WHERE length(section) > 50
            ORDER BY random()
            LIMIT ?
        """, [limit]).fetchall()
        return [row[0] for row in rows]

    def get_random_sections_by_category(self, category='all', limit=3):
        """Return random sections, optionally filtered by a category name.

        Args:
            category: ``'all'`` in any letter case (for example the ``'All'``
                entry of ``get_all_categories``) or ``None`` selects from
                every section. Any other value keeps only sections whose
                text contains it, compared case-insensitively.
            limit: Maximum number of sections to return.

        Returns:
            A list of distinct section strings, each longer than 50
            characters, in random order.
        """
        # The dropdown offers 'All' while the default argument is 'all', so
        # the comparison must ignore letter case.
        if category is None or category.strip().lower() == 'all':
            rows = self.conn.execute("""
                SELECT DISTINCT section
                FROM pdf_content
                WHERE length(section) > 50
            """).fetchall()
            sections = [row[0] for row in rows]
            random.shuffle(sections)
            return sections[:limit]

        query = """
            SELECT DISTINCT section
            FROM pdf_content
            WHERE length(section) > 50 AND lower(section) LIKE ?
            ORDER BY random()
            LIMIT ?
        """
        params = [f'%{category.lower()}%', limit]
        rows = self.conn.execute(query, params).fetchall()
        return [row[0] for row in rows]

    def get_all_categories(self):
        """Return the static list of category names for the dropdown.

        A new list is returned on every call, so callers may modify it.
        """
        return list(self._CATEGORIES)

    def close(self):
        """Close the database connection."""
        self.conn.close()