Spaces:
Sleeping
Sleeping
| from pptx import Presentation | |
| from pptx.enum.shapes import MSO_SHAPE_TYPE | |
| from typing import List, Dict, Any | |
| from PIL import Image | |
| from io import BytesIO | |
| import requests | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import tempfile | |
| import os | |
| import sys | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) | |
| from config import config | |
# OCR.space settings: the endpoint that images are posted to, and the API key
# looked up on the project config object (None when the attribute is absent).
API_URL = "https://api.ocr.space/parse/image"
API_KEY = getattr(config, "OCR_SPACE_API_KEY", None)
def ocr_space_file(filename, api_key=API_KEY, overlay=False, language="eng"):
    """Extract text from an image file using the OCR Space API.

    Failures are reported in the returned string rather than raised.

    Args:
        filename: Path of the image file to upload.
        api_key: OCR Space API key. Defaults to the module-level ``API_KEY``.
        overlay: Value sent as ``isOverlayRequired``.
        language: Value sent as ``language``.

    Returns:
        A ``(filename, text)`` tuple. On success ``text`` is the
        ``ParsedText`` of the first parsed result (possibly empty). On
        failure ``text`` is ``"OCR API key not configured"`` or a message
        starting with ``"API Error:"``, ``"OCR Error:"`` or ``"Error:"``.
    """
    if not api_key:
        return filename, "OCR API key not configured"

    payload = {
        "isOverlayRequired": overlay,
        "apikey": api_key,
        "language": language,
        "detectOrientation": True,
        "scale": True,
        "isTable": False,
        "OCREngine": 2,
    }

    try:
        with open(filename, "rb") as f:
            response = requests.post(
                API_URL, files={filename: f}, data=payload, timeout=30
            )

        if response.status_code != 200:
            return filename, f"API Error: HTTP {response.status_code}"

        # Decode separately so a non-JSON body is not reported as a
        # network error by the RequestException handler below.
        try:
            parsed = response.json()
        except ValueError:
            return filename, "Error: Invalid JSON in OCR response"

        # A bare JSON string/number has no .get(); report it as the error.
        if not isinstance(parsed, dict):
            return filename, f"OCR Error: {parsed}"

        if parsed.get("OCRExitCode") == 1:
            # Tolerate a missing, null or empty ParsedResults list.
            results = parsed.get("ParsedResults") or []
            first_result = results[0] if results else {}
            return filename, first_result.get("ParsedText") or ""

        # ErrorMessage may be a list of messages or a single string;
        # indexing a string would keep only its first character.
        error_msg = parsed.get("ErrorMessage")
        if isinstance(error_msg, (list, tuple)):
            error_msg = error_msg[0] if error_msg else None
        if not error_msg:
            error_msg = "Unknown OCR error"
        return filename, f"OCR Error: {error_msg}"
    except requests.exceptions.Timeout:
        return filename, "Error: Request timeout"
    except requests.exceptions.RequestException as e:
        return filename, f"Error: Network error - {str(e)}"
    except Exception as e:
        # Best-effort boundary: callers expect a message, never an exception.
        return filename, f"Error: {e}"
# Prefixes of the failure messages ocr_space_file returns in place of text.
_OCR_FAILURE_PREFIXES = ("Error", "OCR Error", "API Error", "OCR API key not configured")


def _save_picture_to_temp(shape, temp_files):
    """Write a picture shape's image bytes to a temp file and return its path.

    The path is appended to ``temp_files`` before the bytes are written, so
    the caller's cleanup still removes the file if the write fails.
    """
    image_bytes = shape.image.blob
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
        temp_files.append(temp_file.name)
        temp_file.write(image_bytes)
    return temp_file.name


def extract_pptx(pptx_path: str) -> str:
    """Extract text and images from PowerPoint presentations.

    Each slide contributes a ``=== Slide N ===`` header followed by the
    stripped text of every shape that has text. Picture shapes without text
    are written to temporary files and, when ``API_KEY`` is set, sent to
    ``ocr_space_file``; recognised text is added as ``[Image Text]: ...`` in
    the order the pictures appear on the slide. Temporary files are removed
    before returning.

    Args:
        pptx_path: Path of the presentation to read.

    Returns:
        The extracted content, one section per slide separated by blank
        lines, or ``"Error loading PowerPoint file: ..."`` if the file
        cannot be opened.
    """
    try:
        prs = Presentation(pptx_path)
    except Exception as e:
        return f"Error loading PowerPoint file: {str(e)}"

    all_content = []
    temp_files = []
    try:
        for slide_number, slide in enumerate(prs.slides, start=1):
            slide_content = [f"\n=== Slide {slide_number} ===\n"]
            slide_images = []

            for shape in slide.shapes:
                shape_text = (getattr(shape, "text", "") or "").strip()
                if shape_text:
                    slide_content.append(shape_text)
                elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                    try:
                        slide_images.append(
                            _save_picture_to_temp(shape, temp_files)
                        )
                    except Exception as e:
                        slide_content.append(
                            f"[Image extraction error: {str(e)}]"
                        )

            if slide_images and API_KEY:
                try:
                    with ThreadPoolExecutor(max_workers=3) as executor:
                        # map() yields results in submission order, so the
                        # image text keeps the pictures' order on the slide.
                        for _, ocr_result in executor.map(
                            ocr_space_file, slide_images
                        ):
                            ocr_text = (ocr_result or "").strip()
                            # NOTE: failures are signalled in-band, so real
                            # image text starting with one of these prefixes
                            # is dropped as well.
                            if ocr_text and not ocr_text.startswith(
                                _OCR_FAILURE_PREFIXES
                            ):
                                slide_content.append(
                                    f"[Image Text]: {ocr_text}"
                                )
                except Exception as e:
                    slide_content.append(f"[OCR processing error: {str(e)}]")
            elif slide_images:
                slide_content.append(
                    f"[{len(slide_images)} images found - OCR not available]"
                )

            all_content.append("\n".join(slide_content))
    finally:
        # Best-effort cleanup: a file that is already gone or cannot be
        # removed must not mask the extraction result.
        for temp_path in temp_files:
            try:
                os.unlink(temp_path)
            except OSError:
                pass

    return "\n\n".join(all_content)