Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Gradio Interface for Worship Program Generation | |
| Upload DOCX sermon and PDF bulletin to generate bilingual worship program | |
| """ | |
| import gradio as gr | |
| import asyncio | |
| import os | |
| import sys | |
| import tempfile | |
| import re | |
| import shutil | |
| from pathlib import Path | |
| from datetime import datetime | |
| # Add current directory to path | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| # Import document processing (only essential module) | |
| from document_processing_agent import DocumentProcessingAgent, WorshipProgramGenerator | |
# Base URL of the GEMMA backend, taken from the GEMMA_BACKEND_URL environment
# variable and falling back to a local server on port 8080. The value is passed
# to DocumentProcessingAgent and WorshipProgramGenerator when they are created.
# NOTE(review): the original note states the GEMMA backend is not required for
# the Hugging Face deployment — confirm how those classes behave without it.
GEMMA_BACKEND_URL = os.getenv("GEMMA_BACKEND_URL", "http://localhost:8080")
| # ============================================================================ | |
| # Translation Functions (embedded from translate_document.py) | |
| # ============================================================================ | |
# Matches a run of CJK Unified Ideographs; text without a match is not translated.
_CHINESE_RUN = re.compile(r'[\u4e00-\u9fff]+')

# Markers that suggest the DOCX is an already generated worship program rather
# than a sermon transcript (compared against lower-cased content).
_WORSHIP_PROGRAM_INDICATORS = (
    '## call to worship',
    '## songs',
    '## prayer',
    '## message',
    '## announcements',
    'worship program',
    'scripture reference',
    'today\'s bible reading',
)


def _collect_chinese_paragraphs(content):
    """Return the ordered, de-duplicated chunks of *content* that contain Chinese.

    Content is split on blank lines. When a multi-line paragraph starts with a
    short Chinese title line ending in a colon, the title and the remaining
    lines are returned as two separate chunks.
    """
    collected = []
    seen = set()

    def remember(chunk):
        # Keep first-seen order while dropping exact repeats.
        if chunk not in seen:
            seen.add(chunk)
            collected.append(chunk)

    for para in content.split('\n\n'):
        para = para.strip()
        if not para or not _CHINESE_RUN.search(para):
            continue

        lines = [line.strip() for line in para.split('\n') if line.strip()]
        if len(lines) > 1:
            title = lines[0]
            # A title is short, contains Chinese, and ends with a full-width
            # or ASCII colon.
            is_title = (
                len(title) < 50
                and title.endswith(('：', ':'))
                and _CHINESE_RUN.search(title) is not None
            )
            if is_title:
                remember(title)
                body = '\n'.join(lines[1:]).strip()
                # The body is only kept when it still contains Chinese text.
                if body and _CHINESE_RUN.search(body):
                    remember(body)
                continue

        # Single-line paragraph, or multi-line without a title line.
        remember(para)

    return collected


async def translate_document(docx_path: str, output_path: str = None):
    """Translate the Chinese paragraphs of a DOCX file into a bilingual text file.

    Args:
        docx_path: Path of the sermon/transcript DOCX to translate.
        output_path: Destination of the bilingual text file. When omitted,
            ``<stem>_bilingual.txt`` next to ``docx_path`` is used.

    Returns:
        The output path as a string, or ``None`` when the input file does not
        exist, its name or content looks like a generated worship program, or
        the DOCX content cannot be extracted.
    """
    if not os.path.exists(docx_path):
        return None

    # A generated worship program must not be fed back in as a transcript.
    filename = os.path.basename(docx_path).lower()
    if 'worship_program' in filename or 'worship-program' in filename:
        print(f"Warning: File '{filename}' appears to be a worship program, not a sermon transcript.")
        print("Please upload the original sermon/transcript DOCX file, not a generated worship program.")
        return None

    # OPUS-MT translation (Qwen disabled due to name translation issues).
    processor = DocumentProcessingAgent(GEMMA_BACKEND_URL, use_qwen_translation=False)

    try:
        content = await processor._extract_word(docx_path)
    except Exception as e:
        # Best effort: report the problem and let the caller show a failure.
        print(f"Error extracting content: {e}")
        return None

    # Second line of defence: inspect the content itself for program sections.
    content_lower = content.lower()
    indicator_count = sum(
        1 for indicator in _WORSHIP_PROGRAM_INDICATORS if indicator in content_lower
    )
    if indicator_count >= 3:
        print(f"Warning: The DOCX file appears to be a worship program (found {indicator_count} program indicators), not a sermon transcript.")
        print("Please upload the original sermon/transcript DOCX file for translation.")
        return None

    # Each Chinese chunk is followed by its English translation when one is returned.
    bilingual_content = []
    for chinese_para in _collect_chinese_paragraphs(content):
        translated = await processor._translate_text(chinese_para, 'zh', 'en')
        bilingual_content.append(chinese_para)
        if translated:
            bilingual_content.append(translated)

    if output_path is None:
        input_path = Path(docx_path)
        output_path = input_path.parent / f"{input_path.stem}_bilingual.txt"

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("# Bilingual Document Translation\n\n")
        f.write(f"Source: {docx_path}\n\n")
        f.write("="*60 + "\n\n")
        f.write("\n\n".join(bilingual_content))

    return str(output_path)
| # ============================================================================ | |
| # DOCX Conversion Functions (embedded from markdown_to_docx.py) | |
| # ============================================================================ | |
def add_formatted_text(paragraph, text):
    """Append *text* to *paragraph* as runs, applying inline bold/italic markdown.

    ``**x**`` and ``__x__`` become bold runs, ``*x*`` and ``_x_`` become italic
    runs, and all other text is added unchanged. Markers must enclose at least
    one character, so a literal ``**`` or ``__`` is preserved. Underscore
    markers are only recognised when they are not glued to word characters, so
    names such as ``worship_program_2025`` stay intact.

    Args:
        paragraph: Object providing ``add_run(text)`` whose return value has
            ``bold`` and ``italic`` attributes.
        text: A single line of markdown text.
    """
    inline_markup = (
        r'(\*\*.+?\*\*'
        r'|(?<!\w)__.+?__(?!\w)'
        r'|\*.+?\*'
        r'|(?<!\w)_.+?_(?!\w))'
    )
    # The capture group makes re.split return the markers along with the plain text.
    for part in re.split(inline_markup, text):
        if not part:
            continue
        is_bold = (
            len(part) > 4
            and part.startswith(('**', '__'))
            and part.endswith(part[:2])
        )
        is_italic = (
            len(part) > 2
            and part[0] in ('*', '_')
            and part.endswith(part[0])
        )
        if is_bold:
            paragraph.add_run(part[2:-2]).bold = True
        elif is_italic:
            paragraph.add_run(part[1:-1]).italic = True
        else:
            paragraph.add_run(part)
def markdown_to_docx(markdown_path: str, docx_path: str):
    """Convert a markdown file into a DOCX document.

    Handles blank lines, ``---`` rules, ``#`` headings (levels above 4 are
    written as level 4), numbered lists, bullet lists, and paragraphs with
    inline bold/italic markup.

    Args:
        markdown_path: Path of the UTF-8 markdown file to read.
        docx_path: Path the DOCX document is saved to.

    Returns:
        ``docx_path``, unchanged.
    """
    from docx import Document
    from docx.shared import Pt

    with open(markdown_path, 'r', encoding='utf-8') as f:
        lines = f.read().split('\n')

    doc = Document()
    normal_font = doc.styles['Normal'].font
    normal_font.name = 'Arial'
    normal_font.size = Pt(11)

    numbered_item = re.compile(r'^\d+[\.\)]\s+')
    bullet_prefixes = ('- ', '* ')

    i = 0
    while i < len(lines):
        line = lines[i].strip()

        # Blank lines become empty paragraphs, except a blank final line.
        if not line:
            if i < len(lines) - 1:
                doc.add_paragraph()
            i += 1
            continue

        # Horizontal rule: drawn as a row of box-drawing characters.
        if line.startswith('---'):
            doc.add_paragraph('─' * 50)
            i += 1
            continue

        # Headings: the number of leading '#' gives the level, capped at 4.
        if line.startswith('#'):
            without_hashes = line.lstrip('#')
            level = len(line) - len(without_hashes)
            doc.add_heading(without_hashes.strip(), level=min(level, 4))
            i += 1
            continue

        # Numbered list: consume the whole run of consecutive items.
        if numbered_item.match(line):
            while i < len(lines) and numbered_item.match(lines[i].strip()):
                item_text = numbered_item.sub('', lines[i].strip())
                add_formatted_text(doc.add_paragraph(style='List Number'), item_text)
                i += 1
            continue

        # Bullet list: consume the whole run of consecutive items.
        if line.startswith(bullet_prefixes):
            while i < len(lines) and lines[i].strip().startswith(bullet_prefixes):
                item_text = lines[i].strip()[2:].strip()
                add_formatted_text(doc.add_paragraph(style='List Bullet'), item_text)
                i += 1
            continue

        # Regular paragraph. Whole-line emphasis such as *text* or **text** is
        # handled here too, so bold lines are not mistaken for italic ones.
        add_formatted_text(doc.add_paragraph(), line)
        i += 1

    doc.save(docx_path)
    return docx_path
| # ============================================================================ | |
| # Main Gradio Application | |
| # ============================================================================ | |
def extract_date_from_pdf(pdf_filename: str) -> str:
    """Return the ``YYYY-MM-DD`` date embedded in a bulletin file name.

    Only the final path component is searched (for example
    ``RCCA-worship-bulletin-2025-11-09.pdf``). If the name is empty or holds
    no such pattern, today's date in the same format is returned.
    """
    found = None
    if pdf_filename:
        base_name = Path(pdf_filename).name
        found = re.search(r'(\d{4}-\d{2}-\d{2})', base_name)
    if found is None:
        return datetime.now().strftime("%Y-%m-%d")
    return found.group(1)
def _uploaded_path(upload):
    """Return the filesystem path of an upload.

    Accepts a path (``str`` or ``os.PathLike``) or any object that exposes its
    path through a ``name`` attribute, such as a temporary-file wrapper.
    """
    if isinstance(upload, (str, os.PathLike)):
        return os.fspath(upload)
    return upload.name


async def process_worship_program(docx_file, pdf_file, progress=gr.Progress()):
    """Translate the uploaded DOCX and build the worship program files.

    Args:
        docx_file: Uploaded sermon/transcript DOCX (path or file-like object).
        pdf_file: Uploaded bulletin PDF (path or file-like object); its file
            name supplies the program date.
        progress: Gradio progress callback, called with a fraction and ``desc``.

    Returns:
        A ``(status_message, downloads)`` tuple. ``downloads`` is a list with
        the markdown file and, when conversion succeeded, the DOCX file; it is
        ``None`` when processing failed. Any exception is caught and reported
        in ``status_message`` together with its traceback.
    """
    if docx_file is None:
        return "β Error: Please upload a DOCX file", None
    if pdf_file is None:
        return "β Error: Please upload a PDF file", None

    try:
        docx_source = _uploaded_path(docx_file)
        pdf_source = _uploaded_path(pdf_file)

        progress(0.1, desc="π Extracting content from DOCX file...")

        # All intermediate files live in a temp dir that is removed on exit;
        # the results are copied to the current directory before that.
        with tempfile.TemporaryDirectory() as temp_dir:
            docx_path = os.path.join(temp_dir, os.path.basename(docx_source))
            pdf_path = os.path.join(temp_dir, os.path.basename(pdf_source))
            shutil.copy2(docx_source, docx_path)
            shutil.copy2(pdf_source, pdf_path)

            progress(0.2, desc="π Translating DOCX content (this may take a few minutes)...")
            bilingual_path_temp = await translate_document(docx_path, output_path=None)

            if not bilingual_path_temp or not os.path.exists(bilingual_path_temp):
                error_msg = "β Error: Translation failed. "
                filename = os.path.basename(docx_source)
                if 'worship_program' in filename.lower() or 'worship-program' in filename.lower():
                    error_msg += f"\n\nThe file '{filename}' appears to be a previously generated worship program, not a sermon transcript.\n"
                    error_msg += "Please upload the ORIGINAL sermon/transcript DOCX file for translation."
                else:
                    error_msg += "Please check the DOCX file."
                return error_msg, None

            # Keep a copy of the bilingual file in the current directory.
            bilingual_filename = os.path.basename(bilingual_path_temp)
            bilingual_path = bilingual_filename
            shutil.copy2(bilingual_path_temp, bilingual_path)
            progress(0.5, desc=f"πΎ Saved bilingual translation to {bilingual_filename}...")

            progress(0.6, desc="β Translation complete! Generating worship program...")

            # The generator receives the bilingual file (Message section) and
            # the PDF path (date extraction only).
            generator = WorshipProgramGenerator(GEMMA_BACKEND_URL, use_qwen_translation=False)
            sources = [bilingual_path, pdf_path]
            program_content = await generator.generate_program(sources)

            if not program_content:
                return "β Error: Failed to generate worship program.", None

            progress(0.9, desc="πΎ Saving worship program...")

            # The output name carries the date found in the PDF file name.
            date_str = extract_date_from_pdf(pdf_source)
            output_filename = f"worship_program_{date_str}.md"
            output_path = os.path.join(temp_dir, output_filename)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(str(program_content))

            # Copy to the current directory so the file outlives the temp dir.
            final_output_path = output_filename
            shutil.copy2(output_path, final_output_path)

            progress(0.95, desc="π Converting to DOCX format...")

            docx_filename = output_filename.replace('.md', '.docx')
            docx_path_temp = os.path.join(temp_dir, docx_filename)
            try:
                markdown_to_docx(output_path, docx_path_temp)
                final_docx_path = docx_filename
                shutil.copy2(docx_path_temp, final_docx_path)
                docx_created = True
            except Exception as e:
                # DOCX output is optional: report and continue with markdown only.
                print(f"Warning: DOCX conversion failed: {e}")
                docx_created = False
                final_docx_path = None

            progress(1.0, desc="β Complete!")

            file_size = os.path.getsize(final_output_path)
            content_length = len(str(program_content))
            status_lines = [
                "β Worship program generated successfully!",
                f"π Markdown file: {final_output_path}",
                f"π Content length: {content_length:,} characters",
                f"πΎ File size: {file_size:,} bytes",
                f"π Date: {date_str}",
            ]
            downloads = [final_output_path]
            if docx_created:
                docx_size = os.path.getsize(final_docx_path)
                status_lines.append(f"π DOCX file: {final_docx_path}")
                status_lines.append(f"πΎ DOCX size: {docx_size:,} bytes")
                downloads.append(final_docx_path)
            status_lines.append("")
            status_lines.append("The bilingual document has been integrated into the Message section.")

            # Always a list, matching the multi-file download component.
            return "\n".join(status_lines), downloads

    except Exception as e:
        # Top-level boundary: surface the failure in the status box.
        import traceback
        error_msg = f"β Error: {str(e)}\n\n{traceback.format_exc()}"
        return error_msg, None
def process_worship_program_sync(docx_file, pdf_file, progress=gr.Progress()):
    """Blocking entry point: run the async pipeline and return its result."""
    pipeline = process_worship_program(docx_file, pdf_file, progress)
    return asyncio.run(pipeline)
# Create Gradio interface
# Layout: an intro banner, then one row with two columns (uploads and the
# generate button on one side, status text and downloads on the other),
# followed by usage instructions and a footer.
# The markdown bodies below are kept flush-left so the string contents are
# exactly the text passed to gr.Markdown.
with gr.Blocks(title="Worship Program Generator", theme=gr.themes.Soft()) as demo:
    # Intro banner and feature list
    gr.Markdown("""
# π΅ Worship Program Generator
Upload your DOCX sermon/transcript and PDF worship bulletin to generate a complete bilingual worship program.
**Features:**
- β Automatic Chinese-to-English translation using OPUS-MT
- β Structured worship program generation
- β Bilingual content integration
- β Date extraction from PDF filename
- β Markdown and DOCX output formats
""")
    with gr.Row():
        # Input column: the two uploads and the trigger button
        with gr.Column():
            docx_input = gr.File(
                label="π DOCX Sermon/Transcript File",
                file_types=[".docx"],
                type="filepath"
            )
            pdf_input = gr.File(
                label="π PDF Worship Bulletin",
                file_types=[".pdf"],
                type="filepath"
            )
            process_btn = gr.Button("π Generate Worship Program", variant="primary", size="lg")
        # Output column: read-only status text and the generated files
        with gr.Column():
            status_output = gr.Textbox(
                label="Status",
                lines=10,
                interactive=False,
                placeholder="Status messages will appear here..."
            )
            download_output = gr.File(
                label="π₯ Download Worship Program",
                visible=True,
                file_count="multiple"
            )
    # Process button click handler
    # The two uploads are passed to the synchronous wrapper; its
    # (status, files) result fills the status box and the download component.
    process_btn.click(
        fn=process_worship_program_sync,
        inputs=[docx_input, pdf_input],
        outputs=[status_output, download_output],
        show_progress=True
    )
    # Instructions
    gr.Markdown("""
### π Instructions
1. **Upload DOCX File**: Your sermon transcript or message document (Chinese content will be automatically translated)
2. **Upload PDF File**: Your worship bulletin (should contain date in filename like `RCCA-worship-bulletin-2025-11-09.pdf`)
3. **Click Generate**: The system will translate DOCX, process PDF, and generate the worship program
4. **Download**: Get both markdown and DOCX files
**Note**: Translation may take a few minutes depending on document size.
""")
    # Footer
    gr.Markdown("""
---
*Powered by Helsinki-NLP OPUS-MT for translation | Built with Gradio*
""")
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()