Spaces:
Sleeping
Sleeping
| """ | |
| PDF Extraction Pipeline using GPT-5.1. | |
| Extracts structured data from Harmonic PDF reports using sentence-level chunking. | |
| """ | |
| import json | |
| import re | |
| import fitz # PyMuPDF | |
| from openai import OpenAI | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| from config import OPENAI_API_KEY | |
# Initialize OpenAI client (module-level singleton; API key comes from the
# local config module, so importing this file requires config to be present)
client = OpenAI(api_key=OPENAI_API_KEY)
# Prompt template sent to the model for each batched chunk of report text.
# It is filled via str.format(page_text=...), so every literal brace in the
# JSON schema example below is doubled ({{ / }}) to survive .format().
EXTRACTION_PROMPT = """You are an expert at extracting structured data from Harmonic research reports.
Analyze this text from a Harmonic company report and extract all relevant information.

TEXT FROM HARMONIC REPORT:
{page_text}

---

Return a JSON object with the following structure (only include fields that have data):

{{
    "company_info": {{
        "name": "company name",
        "website": "website URL",
        "description": "company description",
        "founded_year": 2022,
        "hq_location": "City, Country",
        "headcount": 10,
        "headcount_growth": "growth info"
    }},
    "funding": {{
        "total_funding": "$X.XM",
        "latest_round": "Seed/Series A/etc",
        "latest_round_amount": "$X.XM",
        "latest_round_date": "Month Year",
        "investors": ["investor1", "investor2"]
    }},
    "founders": [
        {{
            "name": "Founder Name",
            "title": "Co-Founder & CEO",
            "background": "Brief background",
            "previous_companies": ["Company1", "Company2"]
        }}
    ],
    "product": {{
        "core_thesis": "Core product thesis",
        "icp": "Ideal customer profile",
        "key_features": ["feature1", "feature2"],
        "differentiation": ["differentiator1", "differentiator2"]
    }},
    "market": {{
        "market_description": "Market overview",
        "competitors": [
            {{
                "name": "Competitor Name",
                "funding": "$XM",
                "description": "Brief description",
                "founded": "Year"
            }}
        ],
        "trends": ["trend1", "trend2"]
    }},
    "traction": {{
        "customers": ["Customer1", "Customer2"],
        "web_traffic": "traffic info",
        "social_growth": "social growth info",
        "recent_news": ["news item 1", "news item 2"]
    }}
}}

IMPORTANT:
- The MAIN COMPANY being analyzed is at the top of the report (e.g., "Roadway research report" means the company is Roadway)
- Other companies mentioned are COMPETITORS, not the main company
- Only include sections that have relevant data
- Use "unknown" or null for missing values
- Extract exact numbers and metrics where available
- Be precise with company names, investor names, and figures
- Return ONLY valid JSON, no additional text
"""
def extract_full_text_from_pdf(pdf_path: str) -> str:
    """
    Extract all text from a PDF as a single string.

    Args:
        pdf_path: Path to the PDF file

    Returns:
        Full text content of the PDF, pages joined by newlines.
        Pages with no extractable text (e.g. pure images) are skipped.
    """
    full_text = []
    # Context manager guarantees the document handle is released even if
    # get_text() raises mid-way (the previous version leaked it on error).
    with fitz.open(pdf_path) as doc:
        for page in doc:
            text = page.get_text()
            if text.strip():
                full_text.append(text)
    return "\n".join(full_text)
def split_into_sentences(text: str) -> list[str]:
    """
    Split raw report text into sentences.

    Whitespace runs are collapsed to single spaces first, then the text is
    broken after sentence-ending punctuation (., !, ?) that is followed by
    a capital letter. Fragments of 20 characters or fewer are dropped as
    likely extraction noise.

    Args:
        text: Full text to split

    Returns:
        List of cleaned sentences longer than 20 characters
    """
    # Normalize: any run of whitespace (newlines, tabs, ...) becomes one space.
    normalized = re.sub(r'\s+', ' ', text)
    # Boundary: punctuation, then whitespace, then an uppercase letter.
    boundary = r'(?<=[.!?])\s+(?=[A-Z])'
    fragments = (piece.strip() for piece in re.split(boundary, normalized))
    return [piece for piece in fragments if len(piece) > 20]
def create_sentence_chunks_with_context(sentences: list[str], context_window: int = 1) -> list[str]:
    """
    Create one chunk per sentence, padded with neighbouring sentences.

    Chunk i contains sentences[i - context_window : i + context_window + 1],
    clamped to the list boundaries, joined with single spaces.

    Args:
        sentences: List of sentences
        context_window: Number of sentences before and after to include (default: 1)

    Returns:
        List of chunks (same length as `sentences`)
    """
    # Slicing past the end of a list is safe in Python, so only the lower
    # bound needs clamping via max().
    return [
        " ".join(sentences[max(0, i - context_window):i + context_window + 1])
        for i in range(len(sentences))
    ]
def batch_chunks(chunks: list[str], batch_size: int = 20) -> list[str]:
    """
    Concatenate chunks into batches to reduce API calls.

    Each batch joins up to `batch_size` consecutive chunks with a
    "---" separator so chunk boundaries stay visible to the model.

    Args:
        chunks: List of individual chunks
        batch_size: Number of chunks per batch

    Returns:
        List of batched chunk strings
    """
    separator = "\n\n---\n\n"
    return [
        separator.join(chunks[start:start + batch_size])
        for start in range(0, len(chunks), batch_size)
    ]
def extract_chunk_data(chunk_text: str, chunk_num: int) -> dict:
    """
    Extract structured data from a text chunk using GPT-5.1.

    Best-effort: any API or JSON-parsing failure is printed and an empty
    dict is returned so the pipeline keeps going.

    Args:
        chunk_text: Text content of the chunk (sentences with context)
        chunk_num: Chunk number for logging

    Returns:
        Dictionary of extracted data, or {} on any failure
    """
    # Format the prompt with the chunk text
    prompt = EXTRACTION_PROMPT.format(page_text=chunk_text)
    try:
        # Use GPT-5.1 with medium reasoning for reliable extraction
        response = client.responses.create(
            model="gpt-5.1",
            input=prompt,
            reasoning={"effort": "medium"}
        )
        content = response.output_text
        # Clean up response - extract JSON from various formats.
        # First preference: an explicit ```json fenced block.
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            # Unlabelled fences: take the first fenced segment that looks
            # like a JSON object.
            parts = content.split("```")
            for part in parts:
                part = part.strip()
                if part.startswith("{"):
                    content = part
                    break
        # Last resort: carve out the span from the first "{" to the last "}".
        content = content.strip()
        if not content.startswith("{"):
            start = content.find("{")
            end = content.rfind("}") + 1
            if start != -1 and end > start:
                content = content[start:end]
        return json.loads(content)
    except json.JSONDecodeError as e:
        # Model returned non-JSON despite the prompt; skip this chunk.
        print(f"Warning: Could not parse JSON from chunk {chunk_num}: {e}")
        return {}
    except Exception as e:
        # NOTE(review): broad catch keeps the pipeline alive on API errors.
        # The `tenacity` imports at the top of the file are unused — retries
        # were presumably intended here; confirm and either apply @retry or
        # drop the import.
        print(f"Error on chunk {chunk_num}: {e}")
        return {}
def merge_extracted_data(data_list: list[dict]) -> dict:
    """
    Merge extracted data from multiple chunks into a single consolidated object.

    Scalar fields from later chunks override earlier ones; list fields
    accumulate and are de-duplicated with first-seen order preserved.
    Founders and competitors are de-duplicated by name. Values that are
    falsy or the literal "unknown" placeholder are ignored, and malformed
    sections coming back from the LLM (None / wrong type) are skipped
    instead of crashing the merge.

    Args:
        data_list: List of extracted data dictionaries from each chunk

    Returns:
        Merged dictionary with all extracted data
    """
    merged = {
        "company_info": {},
        "funding": {"investors": []},
        "founders": [],
        "product": {"key_features": [], "differentiation": []},
        "market": {"competitors": [], "trends": []},
        "traction": {"customers": [], "recent_news": []}
    }

    def _useful(value):
        # A value worth keeping: truthy and not the "unknown" placeholder.
        return bool(value) and value != "unknown"

    def _merge_section(section, target: dict, list_keys: set) -> None:
        # Generic merge: extend list-valued keys, overwrite useful scalars.
        if not isinstance(section, dict):
            return
        for key, value in section.items():
            if key in list_keys and isinstance(value, list):
                target[key].extend(value)
            elif _useful(value):
                target[key] = value

    seen_founders = set()
    seen_competitors = set()

    for data in data_list:
        if not isinstance(data, dict) or not data:
            continue

        # company_info / funding / product / traction share the same shape.
        _merge_section(data.get("company_info"), merged["company_info"], set())
        _merge_section(data.get("funding"), merged["funding"], {"investors"})
        _merge_section(data.get("product"), merged["product"],
                       {"key_features", "differentiation"})
        _merge_section(data.get("traction"), merged["traction"],
                       {"customers", "recent_news"})

        # Founders: deduplicate by name, first occurrence wins.
        founders = data.get("founders")
        if isinstance(founders, list):
            for founder in founders:
                if not isinstance(founder, dict):
                    continue
                name = founder.get("name")
                if name and name not in seen_founders:
                    seen_founders.add(name)
                    merged["founders"].append(founder)

        # Market: description overwrites (same "unknown" filter as elsewhere),
        # competitors deduplicate by name, trends accumulate.
        market = data.get("market")
        if isinstance(market, dict):
            if _useful(market.get("market_description")):
                merged["market"]["market_description"] = market["market_description"]
            competitors = market.get("competitors")
            if isinstance(competitors, list):
                for comp in competitors:
                    if not isinstance(comp, dict):
                        continue
                    name = comp.get("name")
                    if name and name not in seen_competitors:
                        seen_competitors.add(name)
                        merged["market"]["competitors"].append(comp)
            trends = market.get("trends")
            if isinstance(trends, list):
                merged["market"]["trends"].extend(trends)

    # Deduplicate accumulated lists. dict.fromkeys preserves first-seen
    # order, unlike list(set(...)) which reorders nondeterministically.
    # recent_news is now deduplicated too, consistent with the other lists.
    for section, key in (
        ("funding", "investors"),
        ("product", "key_features"),
        ("product", "differentiation"),
        ("market", "trends"),
        ("traction", "customers"),
        ("traction", "recent_news"),
    ):
        merged[section][key] = list(dict.fromkeys(merged[section][key]))

    return merged
def extract_from_pdf(pdf_path: str, progress_callback=None) -> tuple[dict, list[str]]:
    """
    Main function to extract structured data from a Harmonic PDF report.

    Pipeline: PDF text -> sentences -> per-sentence chunks with a 1-sentence
    context window -> batches of 20 chunks -> GPT extraction -> merge.
    NOTE: only the first 5 batches (i.e. ~100 chunks) are sent to the model
    to cap API cost; the full chunk list is still returned for indexing.

    Args:
        pdf_path: Path to the Harmonic PDF file
        progress_callback: Optional callback function for progress updates (chunk, total)

    Returns:
        Tuple of (extracted_data dict, list of sentence chunks for Pinecone storage)
    """
    # Step 1: Extract full text from PDF
    print("Extracting text from PDF...")
    full_text = extract_full_text_from_pdf(pdf_path)
    # Step 2: Split into sentences
    print("Splitting into sentences...")
    sentences = split_into_sentences(full_text)
    print(f"Found {len(sentences)} sentences")
    # Step 3: Create chunks with context window (1 sentence before, 1 after)
    print("Creating sentence chunks with context window...")
    chunks = create_sentence_chunks_with_context(sentences, context_window=1)
    print(f"Created {len(chunks)} chunks with context window")
    # Step 4: Batch chunks to reduce API calls (20 chunks per batch)
    print("Batching chunks for extraction...")
    batches = batch_chunks(chunks, batch_size=20)
    total_batches = len(batches)
    print(f"Created {total_batches} batches for processing")
    # Step 5: Extract data from each batch (failed batches yield {} and are
    # harmless in the merge)
    extracted_data = []
    max_batches = min(total_batches, 5)  # Limit to 5 batches to save cost
    for i, batch_text in enumerate(batches[:max_batches]):
        if progress_callback:
            progress_callback(i + 1, max_batches)
        print(f"Extracting batch {i + 1}/{max_batches}...")
        batch_data = extract_chunk_data(batch_text, i + 1)
        extracted_data.append(batch_data)
    # Step 6: Merge all extracted data
    print("Merging extracted data...")
    merged_data = merge_extracted_data(extracted_data)
    # Return both the merged data AND the sentence chunks for Pinecone storage
    return merged_data, chunks
if __name__ == "__main__":
    # Test extraction with sample PDF
    import sys

    if len(sys.argv) > 1:
        pdf_path = sys.argv[1]
        # extract_from_pdf returns (merged_data, chunks); the original code
        # dumped the whole tuple, which serialized every raw chunk alongside
        # the structured data. Print only the merged data here.
        data, _chunks = extract_from_pdf(pdf_path)
        print(json.dumps(data, indent=2))
    else:
        print("Usage: python extraction.py <pdf_path>", file=sys.stderr)