Spaces:
Build error
Build error
| import os | |
| import json | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from dataclasses import dataclass | |
| from fastapi.encoders import jsonable_encoder | |
| import fitz # PyMuPDF | |
| from sentence_transformers import SentenceTransformer | |
| from mlc_llm import MLCEngine | |
# Module-level logger, named after this module so records can be traced back.
logger = logging.getLogger(__name__)

# Configure the root logger so INFO-level progress messages are emitted.
logging.basicConfig(level=logging.INFO)
@dataclass
class ProductSpec:
    """Structured product record extracted from one block of PDF text.

    The ``@dataclass`` decorator generates the keyword constructor that the
    rest of the module relies on (``ProductSpec(name=..., price=...)``);
    without it the class only carries annotations and cannot be instantiated
    with arguments.
    """

    # Product name; the only required field.
    name: str
    # Free-text description, when the source text provides one.
    description: Optional[str] = None
    # Price as a number, when one could be extracted.
    price: Optional[float] = None
    # Arbitrary key/value attributes; None until populated.
    attributes: Optional[Dict[str, str]] = None
    # Tables attached by the caller after construction; None until populated.
    tables: Optional[List[Dict]] = None

    def to_dict(self):
        """Return this record converted by FastAPI's ``jsonable_encoder``."""
        return jsonable_encoder(self)
class PDFProcessor:
    """Extract product specifications and tables from a PDF catalog.

    Each page's text blocks are read with PyMuPDF and sent one at a time to an
    MLC LLM engine together with a JSON-extraction prompt. Replies that parse
    into a non-empty product are collected as dictionaries.
    """

    # Text blocks shorter than this (after stripping) are not sent to the LLM.
    MIN_BLOCK_CHARS = 10

    def __init__(self):
        """Load the embedding model and LLM engine and create ``./output``."""
        self.emb_model = self._initialize_emb_model("all-MiniLM-L6-v2")
        self.llm = self._initialize_llm()
        self.output_dir = Path("./output")
        self.output_dir.mkdir(exist_ok=True)

    def _initialize_emb_model(self, model_name):
        """Load the sentence-embedding model called ``model_name``.

        Tries ``SentenceTransformer`` first. If that fails for any reason the
        failure is logged and a plain ``transformers.AutoModel`` is returned
        instead.

        NOTE(review): the fallback object is a bare transformer, not a
        ``SentenceTransformer``; nothing in this class calls ``emb_model``, so
        confirm what interface downstream users expect.
        """
        model_id = f"sentence-transformers/{model_name}"
        try:
            return SentenceTransformer(model_id)
        except Exception as e:
            logger.warning("SentenceTransformer failed: %s", e)
            # Imported lazily so transformers is only needed on the fallback
            # path. The tokenizer the old code loaded here was discarded, so
            # it is no longer fetched.
            from transformers import AutoModel

            return AutoModel.from_pretrained(model_id)

    def _initialize_llm(self):
        """Initialize the MLC LLM engine.

        Raises:
            Exception: whatever ``MLCEngine`` raised, re-raised after logging.
        """
        try:
            # Alternative model previously tried here:
            # HF://mlc-ai/Llama-3-8B-Instruct-q4f16_1-MLC
            return MLCEngine(model="HF://mlc-ai/Llama-2-7B-q4f16_1-MLC")
        except Exception as e:
            logger.error("Failed to initialize MLC Engine: %s", e)
            raise

    def process_pdf(self, pdf_path: str) -> Dict:
        """Run the full extraction pipeline on one PDF file.

        Args:
            pdf_path: Path of the PDF to open.

        Returns:
            ``{"products": [...], "tables": [...]}`` where each product is the
            dictionary form of a ``ProductSpec`` and each table is a dict as
            produced by ``_extract_tables``.

        Raises:
            RuntimeError: if the PDF cannot be opened.
        """
        start_time = time.time()
        try:
            doc = fitz.open(pdf_path)
        except Exception as e:
            logger.error("Failed to open PDF: %s", e)
            raise RuntimeError("Cannot open PDF file.") from e

        text_blocks = []
        tables = []
        try:
            for page_num, page in enumerate(doc):
                blocks = self._extract_text_blocks(page)
                text_blocks.extend(
                    b for b in blocks if len(b.strip()) >= self.MIN_BLOCK_CHARS
                )
                tables.extend(self._extract_tables(page, page_num))
        finally:
            # Release the file handle even if a page fails to parse.
            doc.close()

        products = []
        for block in text_blocks:
            product = self._process_text_block(block)
            if product and self._is_valid_product(product):
                # Every product carries the document-wide table list.
                product.tables = tables
                products.append(product.to_dict())

        logger.info(
            "Processed %d products in %.2fs",
            len(products),
            time.time() - start_time,
        )
        return {"products": products, "tables": tables}

    def _process_text_block(self, text: str) -> Optional["ProductSpec"]:
        """Ask the LLM to extract a product from ``text``.

        Best-effort: any failure while prompting or parsing is logged as a
        warning and ``None`` is returned so one bad block does not abort the
        whole document.
        """
        try:
            prompt = self._generate_query_prompt(text)
            response = self.llm.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                stream=False,
            )
            return self._parse_response(response.choices[0].message.content)
        except Exception as e:
            logger.warning("Error processing text block: %s", e)
            return None

    def _generate_query_prompt(self, text: str) -> str:
        """Build the JSON-extraction prompt that embeds ``text``."""
        return (
            "Extract product specifications as JSON from this text:\n"
            f"Text: {text}\n"
            "Return valid JSON with exactly these keys:\n"
            "- name (string)\n"
            "- description (string, optional)\n"
            "- price (number, optional)\n"
            "- attributes (object with key-value pairs, optional)\n"
            "Example:\n"
            "{\n"
            '"name": "Example Product",\n'
            '"description": "High-quality example item",\n'
            '"price": 99.99,\n'
            '"attributes": {"color": "red", "size": "XL"}\n'
            "}"
        )

    def _is_valid_product(self, product: "ProductSpec") -> bool:
        """Return True when at least one of the product's fields is truthy."""
        return any(
            (
                product.name,
                product.description,
                product.price,
                product.attributes,
            )
        )

    def _extract_text_blocks(self, page) -> List[str]:
        """Return the non-empty, stripped text of each block on ``page``.

        Uses ``page.get_text("blocks")``; index 4 of each block tuple holds
        the block's text content.
        """
        stripped = (block[4].strip() for block in page.get_text("blocks"))
        return [text for text in stripped if text]

    def _extract_tables(self, page, page_num: int) -> List[Dict]:
        """Extract tables from ``page`` via ``page.find_tables()``.

        Args:
            page: The page object to inspect.
            page_num: Zero-based page index; reported one-based in the output.

        Returns:
            One dict per non-empty table with keys ``page``, ``cells``,
            ``header`` and ``content`` (``cells`` and ``content`` hold the
            same extracted rows). Any error is logged as a warning and the
            tables gathered so far are returned.
        """
        tables = []
        try:
            finder = page.find_tables()
            if finder and hasattr(finder, "tables") and finder.tables:
                for table in finder.tables:
                    rows = table.extract()
                    if not rows:
                        continue
                    tables.append({
                        "page": page_num + 1,
                        "cells": rows,
                        "header": table.header.names if table.header else [],
                        "content": rows,
                    })
        except Exception as e:
            logger.warning(
                "Error extracting tables from page %d: %s", page_num + 1, e
            )
        return tables

    def _parse_response(self, response: str) -> Optional["ProductSpec"]:
        """Parse the LLM reply into a ``ProductSpec``.

        The outermost ``{ ... }`` span of the reply is decoded as JSON.

        Returns:
            A ``ProductSpec``, or ``None`` when the reply has no JSON object,
            the JSON is invalid, or name, description, price and attributes
            are all missing/empty.
        """
        try:
            if not response:
                # The engine may hand back an empty or None message body.
                raise ValueError("No JSON content found in response.")
            json_start = response.find("{")
            json_end = response.rfind("}") + 1
            # find() yields -1 and rfind()+1 yields 0 when a brace is absent;
            # reject those explicitly instead of relying on slice behaviour.
            if json_start == -1 or json_end <= json_start:
                raise ValueError("No JSON content found in response.")
            data = json.loads(response[json_start:json_end])
            # If the returned JSON is essentially empty, return None
            keys = ("name", "description", "price", "attributes")
            if all(not data.get(key) for key in keys):
                return None
            return ProductSpec(
                # JSON null is treated the same as a missing key.
                name=data.get("name") or "",
                description=data.get("description"),
                price=data.get("price"),
                attributes=data.get("attributes") or {},
            )
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            logger.warning("Parse error: %s in response: %s", e, response)
            return None
def process_pdf_catalog(pdf_path: str):
    """Process one PDF catalog and report the outcome as a status message.

    Args:
        pdf_path: Path of the PDF file to process.

    Returns:
        A ``(result, message)`` tuple. On success ``result`` is the dict
        returned by ``PDFProcessor.process_pdf``; on any failure it is ``{}``
        and ``message`` is ``"Error processing PDF"``.
    """
    try:
        # Built inside the try so a model-loading failure is reported through
        # the same error tuple as a processing failure instead of escaping.
        processor = PDFProcessor()
        result = processor.process_pdf(pdf_path)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Processing failed: %s", e)
        return {}, "Error processing PDF"
    return result, "Processing completed successfully!"
if __name__ == "__main__":
    import sys

    # An optional first command-line argument overrides the placeholder path,
    # so the script can be pointed at a real file without editing the source.
    pdf_path = sys.argv[1] if len(sys.argv) > 1 else "path/to/your/pdf_file.pdf"
    result, message = process_pdf_catalog(pdf_path)
    print(json.dumps(result, indent=2), message)