| | import os |
| | import re |
| | from pathlib import Path |
| | from typing import List |
| | import google.generativeai as genai |
| | from PyPDF2 import PdfReader |
| | from tqdm import tqdm |
| |
|
| |
|
class GeminiProcessor:
    """Clean up screenplay text extracted from a PDF, scene by scene.

    The pipeline is: extract text from the PDF, normalise it with regular
    expressions, split it into scenes at ``INT.`` / ``EXT.`` / ``INT/EXT.``
    headings, ask a Gemini model to fix spacing in each scene, and write the
    result to a text file.
    """

    # A heading line that carries nothing but the prefix itself.
    _BARE_HEADINGS = frozenset({'INT.', 'EXT.', 'INT/EXT.'})

    # One scene: a heading with some text after the prefix, followed by every
    # line (blank lines included) that does not itself start with a prefix.
    _SCENE_RE = re.compile(
        r'(?:INT\.|EXT\.|INT/EXT\.)[^\n]+\n'
        r'(?:(?!INT\.|EXT\.|INT/EXT\.)[^\n]*\n)*'
    )

    def __init__(self, model_name: str = 'gemini-pro'):
        """Configure the Gemini client from the ``GOOGLE_API_KEY`` variable.

        Args:
            model_name: Name passed to ``genai.GenerativeModel``.

        Raises:
            ValueError: If ``GOOGLE_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            raise ValueError("GOOGLE_API_KEY not found")

        genai.configure(api_key=self.api_key)
        self.model = genai.GenerativeModel(model_name)

    def preprocess_text(self, text: str) -> str:
        """Normalise raw screenplay text with a fixed series of regex passes.

        Removes tag-like markup, bare heading lines, trailing ``<digits>.``
        markers, ``(CONT'D)`` markers, horizontal whitespace before
        punctuation, repeated spaces, blank lines, and consecutive duplicate
        lines.

        Args:
            text: Text to clean.

        Returns:
            The cleaned text, lines joined by ``\\n`` with no trailing newline.
        """
        # Strip anything that looks like an HTML/XML tag.
        text = re.sub(r'<[^>]+>', '', text)

        # Drop heading lines that hold only the prefix. Replace with a single
        # newline so the neighbouring lines stay separate lines.
        text = re.sub(r'\n(INT\.|EXT\.|INT\/EXT\.)\s*\n', '\n', text)

        # Remove "<digits>." at the end of a line.
        # NOTE(review): presumably aimed at page numbers, but this also trims
        # a sentence that ends in a number ("born in 1985.") - confirm intent.
        text = re.sub(r'\d+\.$', '', text, flags=re.MULTILINE)
        text = re.sub(r'\(CONT\'D\)\d*', '', text)

        # Close up space before punctuation. Newlines are excluded so a line
        # that starts with "..." or "!" is not merged into the previous line.
        text = re.sub(r'[^\S\n]+([.,!?])', r'\1', text)

        # Collapse runs of spaces and of blank lines.
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n{3,}', '\n\n', text)

        cleaned_lines = []
        prev_line = None
        for line in text.split('\n'):
            stripped = line.strip()
            # Skip blank lines and exact repeats of the last kept line.
            if not stripped or line == prev_line:
                continue
            if stripped in self._BARE_HEADINGS:
                continue
            cleaned_lines.append(line)
            prev_line = line

        return '\n'.join(cleaned_lines)

    def split_into_scenes(self, text: str) -> list:
        """Split screenplay text into scenes, each starting at its heading.

        Text before the first heading is not part of any scene and is not
        returned.

        Args:
            text: Screenplay text.

        Returns:
            A list of non-empty scene strings with surrounding whitespace
            stripped.
        """
        if not text:
            return []

        # The pattern consumes whole newline-terminated lines; terminate the
        # last line so it is not silently dropped from the final scene.
        if not text.endswith('\n'):
            text += '\n'

        scenes = (match.strip() for match in self._SCENE_RE.findall(text))
        return [scene for scene in scenes if scene]

    def clean_scene(self, scene: str) -> str:
        """Ask the model to fix spacing in one scene, guarding against rewrites.

        The model's answer is accepted only when its whitespace-separated word
        count is within 3 of the input's; otherwise the input is returned
        unchanged. Any exception from the model call is printed and the input
        is returned unchanged.

        Args:
            scene: Text of a single scene.

        Returns:
            The stripped model output, or ``scene`` itself as a fallback.
        """
        # Nothing to fix; avoid a pointless API call.
        if not scene.strip():
            return scene

        prompt = (
            "Fix ONLY spacing and indentation in this screenplay scene.\n"
            "DO NOT modify any words or content. DO NOT add or remove lines.\n"
            "Keep original capitalization and formatting:\n"
            "\n"
            f"{scene}"
        )

        try:
            response = self.model.generate_content(prompt)
            if response.text:
                cleaned = response.text
                # Reject answers whose word count drifted: the model was only
                # supposed to touch whitespace.
                if abs(len(scene.split()) - len(cleaned.split())) <= 3:
                    return cleaned.strip()
            return scene
        except Exception as e:
            # Best-effort step: keep the original scene on any failure.
            print(f"Error cleaning scene: {str(e)}")
            return scene

    def process_screenplay(self, pdf_path: str, output_path: str) -> bool:
        """Run the full pipeline on one PDF and write the cleaned text.

        Args:
            pdf_path: Path of the PDF to read.
            output_path: Path of the UTF-8 text file to write; missing parent
                directories are created.

        Returns:
            True on success, False if any step raised (the error is printed).
        """
        try:
            with open(pdf_path, 'rb') as file:
                pdf = PdfReader(file)
                # extract_text() may yield None for a page with no text layer
                # in some PyPDF2 versions; treat that as an empty page.
                text = '\n'.join(
                    (page.extract_text() or '') for page in pdf.pages
                )

            text = self.preprocess_text(text)

            scenes = self.split_into_scenes(text)
            print(f"Found {len(scenes)} scenes")

            cleaned_scenes = []
            for scene in tqdm(scenes, desc="Processing scenes"):
                cleaned = self.clean_scene(scene)
                if cleaned:
                    # The model may reintroduce spacing artefacts.
                    cleaned = self.preprocess_text(cleaned)
                if cleaned:
                    cleaned_scenes.append(cleaned)

            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write('\n\n'.join(cleaned_scenes))

            return True

        except Exception as e:
            print(f"Error processing screenplay: {str(e)}")
            return False
| |
|