Spaces:
Sleeping
Sleeping
| from pptx import Presentation | |
| from pptx.enum.shapes import MSO_SHAPE_TYPE | |
| from typing import List, Dict, Any | |
| from PIL import Image | |
| from io import BytesIO | |
| import requests | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import tempfile | |
| import os | |
| from config.config import OCR_SPACE_API_KEY | |
| API_URL = "https://api.ocr.space/parse/image" | |
| assert OCR_SPACE_API_KEY, "OCR_SPACE_API_KEY not set" | |
| def ocr_space_file(filename, api_key=OCR_SPACE_API_KEY, overlay=False, language="eng"): | |
| """Extract text from image file using OCR Space API""" | |
| payload = { | |
| "isOverlayRequired": overlay, | |
| "apikey": api_key, | |
| "language": language, | |
| "detectOrientation": True, | |
| "scale": True, | |
| "isTable": False, | |
| "OCREngine": 2 | |
| } | |
| try: | |
| with open(filename, "rb") as f: | |
| response = requests.post(API_URL, files={filename: f}, data=payload, timeout=30) | |
| if response.status_code != 200: | |
| return filename, f"API Error: HTTP {response.status_code}" | |
| parsed = response.json() | |
| if parsed.get("OCRExitCode") == 1: | |
| parsed_text = parsed.get("ParsedResults", [{}])[0].get("ParsedText", "") | |
| return filename, parsed_text | |
| else: | |
| error_msg = parsed.get("ErrorMessage", ["Unknown error"])[0] if parsed.get("ErrorMessage") else "Unknown OCR error" | |
| return filename, f"OCR Error: {error_msg}" | |
| except requests.exceptions.Timeout: | |
| return filename, "Error: Request timeout" | |
| except requests.exceptions.RequestException as e: | |
| return filename, f"Error: Network error - {str(e)}" | |
| except Exception as e: | |
| return filename, f"Error: {e}" | |
| def batch_ocr_parallel(filenames, max_workers=5): | |
| """Process multiple image files in parallel using OCR Space API""" | |
| results = {} | |
| if not filenames: | |
| return results | |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
| future_to_file = {executor.submit(ocr_space_file, fname): fname for fname in filenames} | |
| for future in as_completed(future_to_file): | |
| fname, text = future.result() | |
| results[fname] = text | |
| return results | |
| def extract_pptx_with_meta(pptx_path: str, tesseract_cmd: str = None) -> List[Dict[str, Any]]: | |
| """Extract content from PPTX with metadata, using OCR Space API for images""" | |
| prs = Presentation(pptx_path) | |
| all_slides_content = [] | |
| # First pass: extract all images and save them temporarily | |
| temp_image_files = [] | |
| image_to_shape_mapping = {} | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| # Extract all images first | |
| print(f"Extracting images from PPTX to temporary directory: {temp_dir}") | |
| for slide_index, slide in enumerate(prs.slides): | |
| for shape_index, shape in enumerate(slide.shapes): | |
| if shape.shape_type == MSO_SHAPE_TYPE.PICTURE: | |
| try: | |
| img = Image.open(BytesIO(shape.image.blob)) | |
| temp_file = os.path.join(temp_dir, f"slide_{slide_index}shape{shape_index}.png") | |
| img.save(temp_file, 'PNG') | |
| temp_image_files.append(temp_file) | |
| image_to_shape_mapping[temp_file] = (slide_index, shape_index) | |
| print(f"Extracted image: slide {slide_index}, shape {shape_index}") | |
| except Exception as e: | |
| print(f"Failed to extract image from slide {slide_index}, shape {shape_index}: {e}") | |
| # Process all images in parallel using OCR Space API | |
| print(f"Processing {len(temp_image_files)} images with OCR Space API...") | |
| ocr_results = batch_ocr_parallel(temp_image_files, max_workers=5) | |
| print(f"OCR processing completed for {len(ocr_results)} images") | |
| # Second pass: build the content structure | |
| for slide_index, slide in enumerate(prs.slides): | |
| slide_data = { | |
| "slide_number": slide_index + 1, | |
| "content_blocks": [] | |
| } | |
| for shape_index, shape in enumerate(slide.shapes): | |
| content_block = {} | |
| if shape.shape_type == MSO_SHAPE_TYPE.TEXT_BOX or shape.has_text_frame: | |
| text = shape.text.strip() | |
| if text: | |
| content_block["type"] = "text" | |
| content_block["content"] = text | |
| elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE: | |
| # Find the corresponding OCR result | |
| temp_file_key = None | |
| for temp_file, (s_idx, sh_idx) in image_to_shape_mapping.items(): | |
| if s_idx == slide_index and sh_idx == shape_index: | |
| temp_file_key = temp_file | |
| break | |
| if temp_file_key and temp_file_key in ocr_results: | |
| ocr_text = ocr_results[temp_file_key].strip() | |
| if ocr_text and not ocr_text.startswith("Error:"): | |
| content_block["type"] = "image" | |
| content_block["content"] = ocr_text | |
| print(f"OCR extracted from slide {slide_index}: {ocr_text[:100]}...") | |
| else: | |
| content_block["type"] = "image" | |
| content_block["content"] = f"[OCR failed: {ocr_text}]" | |
| else: | |
| content_block["type"] = "image" | |
| content_block["content"] = "[OCR processing failed - no result found]" | |
| elif shape.shape_type == MSO_SHAPE_TYPE.TABLE: | |
| try: | |
| table = shape.table | |
| content_block["type"] = "table" | |
| table_content = "---Table---\n" | |
| for row in table.rows: | |
| row_content = ", ".join([cell.text.strip() for cell in row.cells]) | |
| table_content += row_content + "\n" | |
| table_content += "-" * 11 | |
| content_block["content"] = table_content | |
| except Exception as e: | |
| content_block["type"] = "table" | |
| content_block["content"] = f"[Table extraction failed: {str(e)}]" | |
| if content_block: | |
| slide_data["content_blocks"].append(content_block) | |
| # Handle slide notes | |
| if slide.has_notes_slide and slide.notes_slide.notes_text_frame: | |
| notes = slide.notes_slide.notes_text_frame.text.strip() | |
| if notes: | |
| slide_data["content_blocks"].append({ | |
| "type": "notes", | |
| "content": notes | |
| }) | |
| all_slides_content.append(slide_data) | |
| finally: | |
| # Clean up temporary files | |
| print(f"Cleaning up {len(temp_image_files)} temporary files...") | |
| for temp_file in temp_image_files: | |
| try: | |
| if os.path.exists(temp_file): | |
| os.remove(temp_file) | |
| except Exception as e: | |
| print(f"Failed to remove temp file {temp_file}: {e}") | |
| # Remove temp directory | |
| try: | |
| if os.path.exists(temp_dir): | |
| os.rmdir(temp_dir) | |
| print("Temporary directory cleanup completed") | |
| except Exception as e: | |
| print(f"Failed to remove temp directory {temp_dir}: {e}") | |
| return all_slides_content | |
| def extract_pptx(filepath: str) -> str: | |
| """ | |
| Converts extracted pptx content into a plain text string for LLM input. | |
| Removes all metadata like slide numbers, block types, etc. | |
| """ | |
| text_blocks = [] | |
| for slide in extract_pptx_with_meta(filepath): | |
| for block in slide["content_blocks"]: | |
| content = block.get("content", "").strip() | |
| if content: | |
| text_blocks.append(content) | |
| # Optionally separate slides with a line | |
| text_blocks.append("\n--- End of Slide ---\n") | |
| return "\n".join(text_blocks).strip() | |
| import json |