Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Document Processing Agent for Worship Program Generation | |
| Extracts and structures content from various document types | |
| """ | |
| import os | |
| import json | |
| from typing import Dict, List, Any | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import asyncio | |
| import aiohttp | |
| import re | |
| # Load environment variables from .env file if available | |
| def load_env_file(): | |
| """Load environment variables from .env file""" | |
| env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env') | |
| if os.path.exists(env_file): | |
| try: | |
| with open(env_file, 'r') as f: | |
| for line in f: | |
| line = line.strip() | |
| if line and not line.startswith('#') and '=' in line: | |
| key, value = line.split('=', 1) | |
| os.environ[key.strip()] = value.strip() | |
| except Exception: | |
| pass # Silently fail if .env can't be read | |
# Prefer python-dotenv when it is installed; otherwise parse .env manually.
# Only the import is guarded, so an ImportError raised while load_dotenv()
# itself runs is not mistaken for "python-dotenv is not installed".
try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv not installed, load .env manually
    load_env_file()
else:
    load_dotenv()
| # Translation support using Hugging Face OPUS-MT and Qwen2.5 | |
# ``platform`` is standard library: import it unconditionally so it stays
# available even when the optional ML stack below is missing.
import platform

try:
    import torch
    from transformers import (
        MarianMTModel,
        MarianTokenizer,
        AutoModelForCausalLM,
        AutoTokenizer,
        StoppingCriteria,
        StoppingCriteriaList,
    )
    HF_TRANSLATION_AVAILABLE = True
    QWEN_TRANSLATION_AVAILABLE = True
except ImportError:
    HF_TRANSLATION_AVAILABLE = False
    QWEN_TRANSLATION_AVAILABLE = False
    print("Warning: transformers or torch not available. Translation will be skipped.")
    # Bind every optional transformers name so a stray reference sees None
    # instead of raising NameError. ``torch`` is left alone: it may have been
    # imported successfully before the transformers import failed.
    MarianMTModel = None
    MarianTokenizer = None
    AutoModelForCausalLM = None
    AutoTokenizer = None
    StoppingCriteria = None
    StoppingCriteriaList = None
@dataclass
class DocumentContent:
    """Structured content extracted from documents.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the processing agent constructs it.
    """
    # Title of the document.
    title: str
    # Full text extracted from the document.
    content: str
    source_type: str  # email, ppt, transcript, pdf, url
    # Free-form details about the extraction.
    metadata: Dict[str, Any]
    # Section name -> extracted data; the regex fallback stores lists of
    # strings here, hence ``Any`` rather than ``str``.
    extracted_sections: Dict[str, Any]
| class DocumentProcessingAgent: | |
| """Agent for processing various document types and extracting structured content""" | |
def __init__(self, gemma_backend_url: str, use_qwen_translation: bool = False):
    """Store configuration and reset the lazily loaded translation state.

    Args:
        gemma_backend_url: Base URL of the Gemma backend.
        use_qwen_translation: Request Qwen2.5 for translation; only takes
            effect when the Qwen dependencies were importable. The default
            (False) selects OPUS-MT, which handles names better.
    """
    self.gemma_backend_url = gemma_backend_url
    self.supported_types = ['email', 'ppt', 'transcript', 'pdf', 'docx', 'doc', 'url']

    # Honour the request only when the optional dependencies are present.
    self.use_qwen_translation = use_qwen_translation and QWEN_TRANSLATION_AVAILABLE

    # OPUS-MT model, tokenizer and device: populated on first use.
    self._translation_model = self._translation_tokenizer = self._translation_device = None
    # Qwen2.5 model, tokenizer and device: populated on first use.
    self._qwen_model = self._qwen_tokenizer = self._qwen_device = None
async def process_documents(self, document_paths: List[str]) -> List[DocumentContent]:
    """Extract and structure every document, skipping the ones that fail.

    Paths ending in ``_bilingual.txt`` are skipped because they are handled
    separately for the Message section. PDFs and other files are still
    processed: they carry scripture references, songs, prayer points and
    announcements. A document that raises during processing is reported on
    stdout and left out of the result.
    """
    processed = []
    for path in document_paths:
        is_bilingual = bool(path) and isinstance(path, str) and path.endswith('_bilingual.txt')
        if is_bilingual:
            continue
        try:
            raw_text = await self._extract_content(path)
            processed.append(await self._structure_content(raw_text))
        except Exception as e:
            print(f"Error processing {path}: {e}")
    return processed
| async def _extract_content(self, doc_path: str) -> str: | |
| """Extract text content from various document types""" | |
| file_ext = Path(doc_path).suffix.lower() | |
| if file_ext == '.pdf': | |
| return await self._extract_pdf(doc_path) | |
| elif file_ext in ['.ppt', '.pptx']: | |
| return await self._extract_powerpoint(doc_path) | |
| elif file_ext in ['.doc', '.docx']: | |
| return await self._extract_word(doc_path) | |
| elif file_ext == '.txt': | |
| return await self._extract_text(doc_path) | |
| elif doc_path.startswith('http'): | |
| return await self._extract_url(doc_path) | |
| else: | |
| return await self._extract_generic(doc_path) | |
async def _extract_pdf(self, pdf_path: str) -> str:
    """Return the text of every page of a PDF, one page per line block.

    Uses PyPDF2 when it can be imported; otherwise defers to
    ``_extract_via_api``.
    """
    try:
        import PyPDF2
        with open(pdf_path, 'rb') as pdf_handle:
            pages = PyPDF2.PdfReader(pdf_handle).pages
            # Each page's text is followed by a newline.
            return "".join(page.extract_text() + "\n" for page in pages)
    except ImportError:
        # Fallback to external service
        return await self._extract_via_api(pdf_path, 'pdf')
async def _extract_powerpoint(self, ppt_path: str) -> str:
    """Return the text of every shape on every slide, one shape per line.

    Uses python-pptx when it can be imported; otherwise defers to
    ``_extract_via_api``.
    """
    try:
        from pptx import Presentation
        deck = Presentation(ppt_path)
        chunks = []
        for slide in deck.slides:
            for shape in slide.shapes:
                # Only shapes exposing a ``text`` attribute contribute.
                if hasattr(shape, "text"):
                    chunks.append(shape.text + "\n")
        return "".join(chunks)
    except ImportError:
        return await self._extract_via_api(ppt_path, 'ppt')
async def _extract_word(self, doc_path: str) -> str:
    """Return the text of a Word document (.doc, .docx).

    Tries three strategies in order:
    1. python-docx: paragraphs (one per line), then tables (cells separated
       by spaces, one row per line).
    2. If python-docx cannot be imported: read ``word/document.xml`` from
       the .docx zip archive and concatenate every element's text and tail.
    3. If the zip/XML route fails: ``_extract_via_api``.

    Any other error from the python-docx route is returned as an
    ``"Error extracting Word document: ..."`` string instead of being raised.
    """
    try:
        from docx import Document
        document = Document(doc_path)
        pieces = [para.text + "\n" for para in document.paragraphs]
        # Also extract text from tables
        for table in document.tables:
            for row in table.rows:
                pieces.extend(cell.text + " " for cell in row.cells)
                pieces.append("\n")
        return "".join(pieces)
    except ImportError:
        # python-docx is missing: a .docx is a zip file containing XML.
        try:
            import zipfile
            import xml.etree.ElementTree as ET
            with zipfile.ZipFile(doc_path, 'r') as archive:
                xml_root = ET.fromstring(archive.read('word/document.xml'))
            fragments = []
            for node in xml_root.iter():
                for piece in (node.text, node.tail):
                    if piece:
                        fragments.append(piece + " ")
            return "".join(fragments)
        except Exception:
            return await self._extract_via_api(doc_path, 'docx')
    except Exception as e:
        return f"Error extracting Word document: {str(e)}"
| async def _extract_text(self, txt_path: str) -> str: | |
| """Extract text from plain text files""" | |
| with open(txt_path, 'r', encoding='utf-8') as file: | |
| return file.read() | |
async def _extract_url(self, url: str) -> str:
    """Download a web page and return its visible text.

    Args:
        url: Address of the page to fetch.

    Returns:
        The page text with HTML markup removed by BeautifulSoup.

    Raises:
        aiohttp.ClientResponseError: The server answered with an error
            status (4xx/5xx), so an error page is never ingested as content.
        asyncio.TimeoutError: The request took longer than 30 seconds.
        ImportError: ``bs4`` is not installed.
    """
    # Imported lazily so bs4 is only required when a URL is processed, and
    # before the request so a missing dependency fails without network I/O.
    from bs4 import BeautifulSoup

    # Same 30 s budget as the Gemma backend call.
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response:
            response.raise_for_status()
            html = await response.text()
    soup = BeautifulSoup(html, 'html.parser')
    return soup.get_text()
| async def _extract_generic(self, file_path: str) -> str: | |
| """Generic text extraction for unknown file types""" | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| return file.read() | |
| except: | |
| return await self._extract_via_api(file_path, 'generic') | |
| async def _extract_via_api(self, file_path: str, file_type: str) -> str: | |
| """Extract content using external API services""" | |
| # This could integrate with Google Document AI, Azure Form Recognizer, etc. | |
| # For now, return placeholder | |
| return f"Content extracted from {file_type} file: {file_path}" | |
async def _structure_content(self, content: str) -> DocumentContent:
    """Ask the Gemma backend to structure ``content``; fall back to regex rules.

    The backend reply is used only when it is a non-empty dict; its
    ``title``, ``type``, ``metadata`` and ``sections`` keys populate the
    result (with defaults when absent). Otherwise the work is delegated to
    ``_structure_content_fallback``.
    """
    prompt = f"""
Analyze the following content and extract structured information for a worship program:
Content: {content}
Please extract:
1. Main topic/theme
2. Scripture references
3. Prayer points
4. Key messages
5. Announcements
6. Songs/hymns mentioned
Return as JSON format.
"""
    # Call Gemma backend for content structuring
    structured_data = await self._call_gemma(prompt)

    if structured_data and isinstance(structured_data, dict):
        return DocumentContent(
            title=structured_data.get('title', 'Untitled'),
            content=content,
            source_type=structured_data.get('type', 'unknown'),
            metadata=structured_data.get('metadata', {}),
            extracted_sections=structured_data.get('sections', {}),
        )

    # Backend unavailable or reply unusable.
    return self._structure_content_fallback(content)
def _structure_content_fallback(self, content: str) -> DocumentContent:
    """Structure content with keyword and regex heuristics, without Gemma.

    Classifies the text as a bulletin, sermon or general document from
    keywords, then collects scripture references, prayer points,
    announcements, song titles and a message excerpt.

    Args:
        content: Raw text extracted from a document.

    Returns:
        A DocumentContent whose ``metadata`` is
        ``{'extraction_method': 'fallback'}`` and whose
        ``extracted_sections`` has the keys ``scripture_references`` (max 10),
        ``prayer_points`` (max 7), ``announcements`` (max 10), ``songs``
        (max 5) and ``messages``.
    """
    lines = content.split('\n')
    content_lower = content.lower()

    # --- Document type, decided by the first keyword family that matches ---
    if any(keyword in content_lower for keyword in ('講員', '司會', '領詩', '主日崇拜', '服事同工')):
        doc_type, title = "bulletin", "Worship Bulletin"
    elif any(keyword in content_lower for keyword in ('信息', '講道', 'sermon', 'message', '經文')):
        doc_type, title = "sermon", "Sermon/Message"
    else:
        doc_type, title = "general", "Extracted Document"

    # --- Scripture references (English and Chinese patterns) ---
    scripture_patterns = [
        r'\b\d+\s*[A-Z][a-z]+\s+\d+:\d+(?:-\d+)?',  # e.g., "1 John 3:16" or "1 John 3:16-17"
        r'[A-Z][a-z]+\s+\d+:\d+',  # e.g., "John 3:16"
        r'以弗所書\s*\d+:\d+',  # Chinese: "以弗所書 5:8"
        r'[以約約約羅]+\s*\d+:\d+',  # Chinese book names
        r'第\s*\d+\s*章\s*第\s*\d+\s*節',  # Chinese format
    ]
    scriptures = []
    for pattern in scripture_patterns:
        scriptures.extend(re.findall(pattern, content, re.IGNORECASE))

    # --- Prayer points: numbered items after the "禱告主題" marker ---
    prayer_points = []
    in_prayer_section = False
    for raw_line in lines:
        lowered = raw_line.lower()
        if '禱告主題' in raw_line or ('prayer' in lowered and 'topic' in lowered):
            in_prayer_section = True
            continue
        if not in_prayer_section:
            continue

        line = raw_line.strip()
        if re.match(r'^\d+[\)）]\s+.+', line):
            # "1) text" (ASCII or full-width closing parenthesis)
            prayer_text = re.sub(r'^\d+[\)）]\s+', '', line)
            if len(prayer_text) > 10:
                prayer_points.append(prayer_text)
        elif re.match(r'^\d+[\.]\s+.+', line) and '為' in line:
            # "1. text" is accepted only with the prayer indicator "為"
            prayer_text = re.sub(r'^\d+[\.]\s+', '', line)
            if len(prayer_text) > 10:
                prayer_points.append(prayer_text)
        elif len(line) > 15 and '為' in line:
            # Continuation of the previous prayer point (capped at ~300 chars)
            if prayer_points and len(prayer_points[-1]) < 300:
                prayer_points[-1] += ' ' + line

        # Stop at the limit, or at a near-empty line once something was found.
        if len(prayer_points) >= 7 or (len(line) < 5 and prayer_points):
            break

    # No prayer section found: search the whole text for prayer-like items.
    if not prayer_points:
        prayer_items = re.findall(r'\d+[\)）]\s+([^0-9]{15,200}?)(?=\s+\d+[\)）]|$)', content)
        prayer_points = [item.strip() for item in prayer_items[:7] if '為' in item or '禱告' in item]

    # --- Announcements: numbered items ("1." / "1)") ---
    announcements = []
    announcement_started = False
    for raw_line in lines:
        # Any line mentioning "報告" (including "報告及代禱事項") opens the section.
        if '報告' in raw_line:
            announcement_started = True
            continue
        if announcement_started or re.search(r'^\d+[\.\)]\s+', raw_line):
            line = raw_line.strip()
            if re.match(r'^\d+[\.\)]\s+.+', line):
                ann_text = re.sub(r'^\d+[\.\)]\s+', '', line)
                if len(ann_text) > 10:  # Valid announcement
                    announcements.append(ann_text)
                    announcement_started = True
            elif announcement_started and len(line) > 15:
                # Continuation of the previous announcement (capped at ~300 chars)
                if announcements and len(announcements[-1]) < 300:
                    announcements[-1] += ' ' + line
            # Stop if we hit the prayer section or have enough announcements.
            if '禱告主題' in line or len(announcements) >= 10:
                break

    # No section found: search the entire content for numbered items.
    if not announcements:
        numbered_items = re.findall(
            r'\d+[\.\)]\s+([^0-9]{20,300}?)(?=\s+\d+[\.\)]|\s+[0-9]+\s+[0-9]|$)', content
        )
        announcements = [item.strip() for item in numbered_items[:10] if len(item.strip()) > 15]

    # --- Songs/hymns ---
    songs = []

    def collect_songs(pattern: str, haystack: str) -> None:
        # A captured group may hold several titles separated by ASCII or
        # full-width commas; keep parts with a plausible title length.
        for match in re.findall(pattern, haystack):
            for part in re.split(r'[,，、]', match):
                song = part.strip()
                if 2 <= len(song) <= 30:
                    songs.append(song)

    # The worship order is the marker line plus the four lines after it.
    worship_order_text = ""
    for i, line in enumerate(lines):
        if '主日崇拜程序' in line or ('worship' in line.lower() and 'order' in line.lower()):
            worship_order_text = "".join(entry + " " for entry in lines[i:i + 5])
            break

    if worship_order_text:
        # Songs typically follow "領詩", "詩歌颂贊", "回應詩歌" or "序樂".
        song_patterns = [
            r'領詩\s+([\u4e00-\u9fff,，、\s]+?)(?:\s+進入|\s+為|\s+司會|$)',
            r'詩歌[颂赞贊]*\s+([\u4e00-\u9fff,，、\s]+?)(?:\s+領詩|\s+司會|$)',
            r'回應詩歌\s+([\u4e00-\u9fff,，、\s]+?)(?:\s+領詩|\s+司會|$)',
            r'序樂\s+([\u4e00-\u9fff,，、\s]+?)(?:\s+司琴|$)',
        ]
        for pattern in song_patterns:
            collect_songs(pattern, worship_order_text)

    # Also try direct patterns in the full content.
    direct_patterns = [
        r'領詩\s+([\u4e00-\u9fff,，、\s]{3,40}?)(?:\s+進入|\s+為|\s+司會|\n|$)',
    ]
    for pattern in direct_patterns:
        collect_songs(pattern, content)

    # Deduplicate (keeping first-seen order) and drop role/section words
    # BEFORE truncating, so excluded words do not use up the five slots.
    exclude_words = {'司會', '司琴', '會眾', '牧者', '長老', '牧師', '信息', '講道',
                     '程序', '主日', '崇拜', '領詩', '為奉獻', '禱告'}
    songs = [
        s for s in dict.fromkeys(songs)
        if s not in exclude_words and not s.startswith('為')
    ][:5]

    # --- Message/sermon excerpt ---
    messages = []
    if doc_type == "sermon":
        # Use the first substantial paragraph, limited to 1000 characters.
        paragraphs = [p.strip() for p in content.split('\n\n') if len(p.strip()) > 100]
        if paragraphs:
            messages.append(paragraphs[0][:1000])
    elif doc_type == "bulletin":
        # Look for a "speaker/sermon/message: ..." line (ASCII or full-width colon).
        sermon_match = re.search(r'(講員|講道|信息)[:：]\s*(.+?)(?:\n|$)', content)
        if sermon_match:
            messages.append(sermon_match.group(2).strip())

    # Nothing found: use the first 500 characters of the content.
    if not messages:
        first_paragraph = content[:500].strip()
        if first_paragraph:
            messages.append(first_paragraph)

    return DocumentContent(
        title=title,
        content=content,
        source_type=doc_type,
        metadata={'extraction_method': 'fallback'},
        extracted_sections={
            # dict.fromkeys dedupes in first-seen order, so the ten kept
            # references are the same on every run (a set is hash-ordered).
            'scripture_references': list(dict.fromkeys(scriptures))[:10],
            'prayer_points': prayer_points[:7],
            'announcements': announcements[:10],
            'songs': songs,
            'messages': messages if messages else [content[:500]],
        },
    )
async def _call_gemma(self, prompt: str) -> Dict[str, Any]:
    """POST ``prompt`` to the Gemma backend and return the parsed JSON reply.

    Returns None (so the caller uses its fallback) when the backend answers
    with a non-200 status, returns an empty/``'{}'`` response, or any error
    occurs; errors are reported on stdout.
    """
    try:
        endpoint = f"{self.gemma_backend_url}/api/generate"
        payload = {"model": "gemma3:270m", "prompt": prompt, "stream": False}
        async with aiohttp.ClientSession() as session:
            async with session.post(
                endpoint,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                if response.status != 200:
                    return None
                result = await response.json()
                response_text = result.get('response', '{}')
                if not response_text or response_text == '{}':
                    # Nothing usable: trigger the fallback.
                    return None
                return json.loads(response_text)
    except Exception as e:
        print(f"Gemma backend error (will use fallback): {e}")
        return None
def _get_translation_model(self):
    """Return the OPUS-MT ``(model, tokenizer, device)``, loading it on first use.

    Returns:
        The cached triple, or ``(None, None, None)`` when the translation
        dependencies are unavailable or loading failed. A failed load leaves
        the cached state untouched, so the next call retries from scratch.
    """
    if not HF_TRANSLATION_AVAILABLE:
        return None, None, None

    if self._translation_model is None:
        try:
            model_name = "Helsinki-NLP/opus-mt-zh-en"
            print(f"Loading translation model: {model_name}")
            tokenizer = MarianTokenizer.from_pretrained(model_name)
            model = MarianMTModel.from_pretrained(model_name)
            # Determine device
            device = "cuda" if torch.cuda.is_available() else "cpu"
            model = model.to(device)
            model.eval()  # Set to evaluation mode
        except Exception as e:
            print(f"Error loading translation model: {e}")
            return None, None, None

        # Publish only after every step succeeded; assigning to ``self``
        # step by step could leave a model that was never moved to its
        # device or switched to eval mode, yet is returned as loaded.
        self._translation_tokenizer = tokenizer
        self._translation_model = model
        self._translation_device = device
        print(f"Translation model loaded on {self._translation_device}")

    return self._translation_model, self._translation_tokenizer, self._translation_device
def _get_qwen_model(self):
    """Return the Qwen2.5 ``(model, tokenizer, device)``, loading it on first use.

    Returns ``(None, None, None)`` when the Qwen dependencies are unavailable
    or a load attempt failed. A failed load stores ``False`` in
    ``self._qwen_model`` so it is not retried.
    """
    unavailable = (None, None, None)
    if not QWEN_TRANSLATION_AVAILABLE:
        return unavailable

    if self._qwen_model is None:
        try:
            model_name = "Qwen/Qwen2.5-1.5B-Instruct"
            print(f"Loading Qwen2.5 translation model: {model_name}")
            self._qwen_tokenizer = AutoTokenizer.from_pretrained(model_name)

            # device_map is only passed when the accelerate package imports.
            try:
                import accelerate
                has_accelerate = True
            except ImportError:
                has_accelerate = False
                print("Warning: accelerate package not installed. Qwen2.5 will load without device_map.")

            on_macos = platform.system() == "Darwin"
            if on_macos:
                # Force CPU on macOS to avoid MPS issues
                torch_dtype = torch.float32
                preferred_map = "cpu"
            else:
                torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
                preferred_map = "auto"
            device_map = preferred_map if has_accelerate else None

            load_kwargs = {"torch_dtype": torch_dtype}
            if device_map is not None:
                load_kwargs["device_map"] = device_map
            self._qwen_model = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)
            self._qwen_model.eval()

            if on_macos:
                self._qwen_device = "cpu"
            else:
                self._qwen_device = "cuda" if torch.cuda.is_available() else "cpu"

            # Without device_map the model must be moved to its device by hand.
            if device_map is None:
                self._qwen_model = self._qwen_model.to(self._qwen_device)
            print(f"Qwen2.5 translation model loaded on {self._qwen_device}")
        except Exception as e:
            print(f"Error loading Qwen2.5 translation model: {e}")
            # False (not None) marks a failed load so we don't keep trying.
            self._qwen_model = False
            return unavailable

    # A previous load attempt failed.
    if self._qwen_model is False:
        return unavailable

    return self._qwen_model, self._qwen_tokenizer, self._qwen_device
| def _fix_name_translations(self, translation: str, original_text: str) -> str: | |
| """Fix known name translation errors in OPUS-MT output. | |
| OPUS-MT sometimes incorrectly translates Chinese names. This function | |
| checks for known incorrect translations and replaces them with correct ones. | |
| """ | |
| if not translation: | |
| return translation | |
| # Check if original text contains the Chinese name 章沙雁 | |
| if "章沙雁" not in original_text: | |
| return translation # No need to fix if name not in original | |
| import re | |
| # Fix "章沙雁" (Zhang Shaian) mis-translations | |
| # OPUS-MT translates 章沙雁 as "sand geese" (沙雁 = sand geese) | |
| corrected = translation | |
| # Pattern 1: "elders of the sand geese" -> "Zhang Shaian Elder" | |
| # This handles: "We have the ceremonial ceremony of the elders of the sand geese here" | |
| if "长老" in original_text: | |
| # Replace "elders of the sand geese" with "Zhang Shaian Elder" | |
| corrected = re.sub( | |
| r'\b(?:the\s+)?elders\s+of\s+the\s+sand\s+geese\b', | |
| 'Zhang Shaian Elder', | |
| corrected, | |
| flags=re.IGNORECASE | |
| ) | |
| # Replace "sand geese elder" with "Zhang Shaian Elder" | |
| corrected = re.sub( | |
| r'\b(?:the\s+)?sand\s+geese\s+elder\b', | |
| 'Zhang Shaian Elder', | |
| corrected, | |
| flags=re.IGNORECASE | |
| ) | |
| # Replace remaining "sand geese" with "Zhang Shaian" (if 长老 is present, add Elder) | |
| corrected = re.sub( | |
| r'\bsand\s+geese\b', | |
| 'Zhang Shaian', | |
| corrected, | |
| flags=re.IGNORECASE | |
| ) | |
| # If we have "Zhang Shaian" but original had 长老, make sure we have "Zhang Shaian Elder" | |
| if "Zhang Shaian" in corrected and "Zhang Shaian Elder" not in corrected: | |
| # Only add Elder if it's in a context where it makes sense (not in the middle of a sentence) | |
| corrected = re.sub( | |
| r'\bZhang\s+Shaian\b(?!\s+Elder)', | |
| 'Zhang Shaian Elder', | |
| corrected, | |
| count=1 # Only replace first occurrence to avoid over-correction | |
| ) | |
| else: | |
| # If no 长老, just replace "sand geese" with "Zhang Shaian" | |
| corrected = re.sub( | |
| r'\bsand\s+geese\b', | |
| 'Zhang Shaian', | |
| corrected, | |
| flags=re.IGNORECASE | |
| ) | |
| return corrected | |
| def _validate_translation_quality(self, translation: str, original: str) -> bool: | |
| """Validate translation quality. Returns True if translation is acceptable.""" | |
| if not translation or len(translation.strip()) < 2: | |
| return False | |
| # Check for common failure patterns | |
| failure_patterns = [ | |
| "I cannot", "I'm sorry", "I don't", "I am not able", | |
| "as an AI", "as a language model", "I apologize", | |
| "cannot translate", "unable to translate" | |
| ] | |
| translation_lower = translation.lower() | |
| for pattern in failure_patterns: | |
| if pattern in translation_lower: | |
| return False | |
| # Check if translation is too short compared to original | |
| # Chinese to English ratio is roughly 1:1.5, so translation should be at least 50% of original length | |
| if len(translation) < len(original) * 0.3: | |
| return False | |
| # Check if translation contains only punctuation or special characters | |
| if not re.search(r'[a-zA-Z]', translation): | |
| return False | |
| return True | |
| async def _translate_text_qwen(self, text: str) -> str | None: | |
| """Translate text using Qwen2.5 LLM. Returns None if translation fails.""" | |
| try: | |
| model, tokenizer, device = self._get_qwen_model() | |
| if model is None or tokenizer is None: | |
| return None | |
| # Use Qwen2.5's chat template for better results | |
| # Improve prompt to ensure completeness, especially for titles and multi-sentence paragraphs | |
| # Detect if this is a title/heading (short text ending with colon) | |
| is_title = len(text) < 50 and (text.endswith(':') or text.endswith(':')) | |
| # Import prompt configurations | |
| try: | |
| from translation_prompts import get_title_prompts, get_regular_prompts, get_fallback_prompt | |
| use_prompt_config = True | |
| except ImportError: | |
| use_prompt_config = False | |
| if is_title: | |
| if use_prompt_config: | |
| system_prompt, user_prompt = get_title_prompts(text) | |
| else: | |
| # Fallback concise prompt | |
| system_prompt = "You are a translator for Christian texts. Translate Chinese titles to English. Preserve colons. Use 'enlightened' for 光明的. Output only the translation." | |
| user_prompt = f"Translate: {text}" | |
| else: | |
| if use_prompt_config: | |
| system_prompt, user_prompt = get_regular_prompts(text) | |
| else: | |
| # Fallback concise prompt | |
| system_prompt = """Translate Chinese Christian texts to English. | |
| - Use "enlightened" for 光明的, "Lord" for 主, "brothers and sisters" for 弟兄姐妹 | |
| - Preserve names exactly (e.g., 章沙雁 → Zhang Shaian) | |
| - Output only the translation, no explanations""" | |
| user_prompt = f"Translate to English:\n\n{text}" | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": system_prompt | |
| }, | |
| { | |
| "role": "user", | |
| "content": user_prompt | |
| } | |
| ] | |
| # Apply chat template | |
| try: | |
| prompt = tokenizer.apply_chat_template( | |
| messages, | |
| tokenize=False, | |
| add_generation_prompt=True | |
| ) | |
| except: | |
| # Fallback if chat template not available | |
| try: | |
| from translation_prompts import get_fallback_prompt | |
| prompt = get_fallback_prompt(text) | |
| except ImportError: | |
| prompt = f"""Translate this Chinese text to English. Output only the translation. | |
| Chinese: {text} | |
| English:""" | |
| # Calculate approximate token count for input text | |
| # Chinese characters are roughly 1 token each, English words are ~1.3 tokens each | |
| input_tokens = len(text) # Rough estimate | |
| max_input_length = 1024 # Increased from 512 to handle longer paragraphs | |
| # For very long paragraphs, we need to increase max_new_tokens proportionally | |
| # Estimate: Chinese to English translation is roughly 1:1.5 ratio | |
| estimated_output_tokens = int(input_tokens * 1.5) | |
| max_new_tokens = min(max(estimated_output_tokens + 100, 300), 800) # At least 300, up to 800 tokens | |
| inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_input_length).to(device) | |
| model = model.to(device) | |
| # Get the tokenizer's eos token | |
| eos_token_id = tokenizer.eos_token_id if tokenizer.eos_token_id is not None else tokenizer.pad_token_id | |
| # CRITICAL FIX: Add stop sequences to prevent hallucinations | |
| # Stop sequences tell the model when to stop generating | |
| stop_sequences = [ | |
| "<|im_end|>", # Qwen chat format end marker | |
| "\n\nChinese:", # Prevent continuation prompts | |
| "\n\nEnglish:", # Prevent continuation prompts | |
| "\n\nUser:", # Prevent continuation prompts | |
| "\n\nHuman:", # Prevent continuation prompts | |
| "Translation:", # Prevent continuation prompts | |
| "Here is", # Prevent continuation prompts | |
| ] | |
| # Create stopping criteria if available | |
| stopping_criteria = None | |
| if StoppingCriteria is not None: | |
| try: | |
| # Define stopping criteria class inline | |
| class StopSequenceCriteria(StoppingCriteria): | |
| """Custom stopping criteria for stop sequences""" | |
| def __init__(self, tokenizer, stop_sequences): | |
| super().__init__() | |
| self.tokenizer = tokenizer | |
| self.stop_sequences = stop_sequences | |
| def __call__(self, input_ids, scores, **kwargs): | |
| # Check if any stop sequence appears in the generated tokens | |
| generated_text = self.tokenizer.decode(input_ids[0], skip_special_tokens=False) | |
| for stop_seq in self.stop_sequences: | |
| if stop_seq in generated_text: | |
| return True | |
| return False | |
| stop_criteria = StopSequenceCriteria(tokenizer, stop_sequences) | |
| stopping_criteria = StoppingCriteriaList([stop_criteria]) | |
| except Exception as e: | |
| print(f"Warning: Could not create stopping criteria: {e}") | |
| stopping_criteria = None | |
| with torch.no_grad(): | |
| generate_kwargs = { | |
| **inputs, | |
| "max_new_tokens": max_new_tokens, # Dynamic based on input length | |
| "temperature": 0.1, # Very low temperature for deterministic output | |
| "do_sample": True, | |
| "top_p": 0.9, # Nucleus sampling | |
| "top_k": 40, # Limit to top 40 tokens | |
| "repetition_penalty": 1.2, # Penalty to avoid repetition | |
| "pad_token_id": eos_token_id, | |
| "eos_token_id": eos_token_id, | |
| "no_repeat_ngram_size": 2, # Avoid repeating 2-grams | |
| } | |
| # Add stopping criteria if available | |
| if stopping_criteria is not None: | |
| generate_kwargs["stopping_criteria"] = stopping_criteria | |
| outputs = model.generate(**generate_kwargs) | |
| # Decode response | |
| full_response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # Extract translation from chat format | |
| # CRITICAL: Properly extract only the translation, not continuation text | |
| translation = None | |
| # Method 1: Qwen chat format (most reliable) | |
| if "<|im_start|>assistant" in full_response: | |
| parts = full_response.split("<|im_start|>assistant") | |
| if len(parts) > 1: | |
| translation = parts[-1].strip() | |
| # Remove end marker and anything after it | |
| if "<|im_end|>" in translation: | |
| translation = translation.split("<|im_end|>")[0].strip() | |
| # Also stop at any stop sequences that might have been included | |
| for stop_seq in stop_sequences: | |
| if stop_seq in translation: | |
| translation = translation.split(stop_seq)[0].strip() | |
| break | |
| # Method 2: Fallback to "assistant" keyword | |
| if not translation and "assistant" in full_response.lower(): | |
| # Find last occurrence of "assistant" (most likely the actual response) | |
| parts = full_response.split("assistant") | |
| if len(parts) > 1: | |
| translation = parts[-1].strip() | |
| # Remove any stop sequences | |
| for stop_seq in stop_sequences: | |
| if stop_seq in translation: | |
| translation = translation.split(stop_seq)[0].strip() | |
| break | |
| # Method 3: Fallback to "English:" marker | |
| if not translation and "English:" in full_response: | |
| translation = full_response.split("English:")[-1].strip() | |
| # Remove any stop sequences | |
| for stop_seq in stop_sequences: | |
| if stop_seq in translation: | |
| translation = translation.split(stop_seq)[0].strip() | |
| break | |
| # Method 4: Last resort - remove prompt length | |
| if not translation: | |
| if len(full_response) > len(prompt): | |
| translation = full_response[len(prompt):].strip() | |
| else: | |
| translation = full_response.strip() | |
| # Final safety check: if translation still contains prompt markers, extract more carefully | |
| if translation and prompt in translation: | |
| # Find where prompt ends and translation begins | |
| prompt_end = translation.find(prompt) + len(prompt) | |
| if prompt_end < len(translation): | |
| translation = translation[prompt_end:].strip() | |
| if not translation: | |
| return None | |
| # CRITICAL: EARLY hallucination detection - check IMMEDIATELY after extraction, BEFORE cleanup | |
| # Hallucinated text often starts with phrases that don't correspond to input | |
| import re | |
| hallucination_starters = [ | |
| "the lord has spoken", | |
| "brother or sister,", | |
| "we have gathered here together", | |
| "let us begin now:", | |
| "in light of recent events", | |
| "as believers, it is important", | |
| "please feel free to express", | |
| "thank you for joining us", | |
| "may peace fill each heart", | |
| "may grace flow abundantly", | |
| "i'm sorry", # Apology patterns | |
| "i cannot", # Refusal patterns | |
| "designed primarily" # Model explanation patterns | |
| ] | |
| # Check if translation contains hallucination markers | |
| translation_lower = translation.lower() | |
| text_lower = text.lower() | |
| for starter in hallucination_starters: | |
| if starter in translation_lower: | |
| starter_idx = translation_lower.find(starter) | |
| # If marker appears after reasonable translation length (30% threshold) | |
| # AND doesn't exist in source text, it's likely hallucination | |
| if starter_idx > len(translation) * 0.3: | |
| # Check if input doesn't contain similar content | |
| # Use word-level check to avoid false positives | |
| starter_words = starter.split() | |
| if len(starter_words) >= 2: | |
| # Check if at least 2 words from starter don't appear in source | |
| matching_words = sum(1 for word in starter_words if word in text_lower) | |
| if matching_words < 2: # Less than 2 words match = likely hallucination | |
| # Cut off at hallucination start, find last sentence end | |
| translation = translation[:starter_idx].strip() | |
| # Find last complete sentence | |
| last_period = translation.rfind('.') | |
| last_exclamation = translation.rfind('!') | |
| last_question = translation.rfind('?') | |
| sentence_ends = [i for i in [last_period, last_exclamation, last_question] if i > 0] | |
| if sentence_ends: | |
| max_end = max(sentence_ends) | |
| # Only use if sentence end is in last 70% (not too early) | |
| if max_end > len(translation) * 0.7: | |
| translation = translation[:max_end + 1].strip() | |
| break | |
| # Simplified cleanup: remove prompt leakage and stop markers | |
| # Import cleanup patterns from configuration if available | |
| try: | |
| from translation_prompts import ( | |
| PROMPT_REMOVAL_PATTERNS, STOP_MARKERS, | |
| TRAILING_MARKERS, INSTRUCTION_KEYWORDS | |
| ) | |
| prompt_patterns = PROMPT_REMOVAL_PATTERNS | |
| stop_markers = STOP_MARKERS | |
| trailing_markers = TRAILING_MARKERS | |
| instruction_keywords = INSTRUCTION_KEYWORDS | |
| except ImportError: | |
| # Fallback patterns | |
| prompt_patterns = [ | |
| r"Remember:.*?Good luck!", | |
| r"Remember:.*?Thank you!", | |
| r"Please remember:.*?Thank you!", | |
| r"CRITICAL REQUIREMENTS:.*?Do not add", | |
| r"Translate.*?Output only", | |
| r"I'm sorry.*?Here is", | |
| r"designed primarily.*?Thank you!", | |
| ] | |
| stop_markers = [ | |
| "\n\nChinese:", "\n\nEnglish:", "\n\nHuman:", "\n\nUser:", | |
| "\n翻译", "\nTranslation:", "\n\nThe translation", "\n\nHere is", | |
| "\n\nNote:", "\n\nIf you", "\n\nYou are", "\n\nI am", | |
| "\n\nPlease remember", "\n\nRemember:", "\n\nPlease note", | |
| "\n\nThank you!", "\n\nGood luck!", "\n\nTranslation complete" | |
| ] | |
| trailing_markers = [ | |
| " If you", " Note:", " Here is", " The translation", | |
| " Translation:", " Chinese:", " English:", | |
| " Remember:", " Please remember:", " Please note:", | |
| " Thank you!", " Good luck!", " Translation complete" | |
| ] | |
| instruction_keywords = ['translate', 'output', 'include', 'remember', 'note', 'please', 'thank', 'good luck'] | |
| # Remove prompt-like text using regex patterns | |
| for pattern in prompt_patterns: | |
| translation = re.sub(pattern, "", translation, flags=re.DOTALL | re.IGNORECASE) | |
| # Remove common stop markers | |
| for marker in stop_markers: | |
| if marker in translation: | |
| translation = translation.split(marker)[0].strip() | |
| break | |
| # Remove trailing explanatory text (only if in second half) | |
| for marker in trailing_markers: | |
| idx = translation.find(marker) | |
| if idx > len(translation) * 0.5: # Only if marker is in second half | |
| translation = translation[:idx].strip() | |
| break | |
| # Remove instruction lines | |
| if translation: | |
| lines = translation.split('\n') | |
| cleaned_lines = [] | |
| for line in lines: | |
| line_lower = line.lower().strip() | |
| # Skip lines that are mostly instructions | |
| if any(keyword in line_lower for keyword in instruction_keywords) and len(line_lower) < 200: | |
| instruction_words = sum(1 for kw in instruction_keywords if kw in line_lower) | |
| if instruction_words >= 2: # Multiple instruction keywords = likely an instruction | |
| continue | |
| cleaned_lines.append(line) | |
| translation = '\n'.join(cleaned_lines).strip() | |
| # Preserve colon for titles - don't strip if original ended with colon | |
| original_ends_with_colon = text.endswith(':') or text.endswith(':') | |
| if not original_ends_with_colon: | |
| translation = translation.rstrip(';:') | |
| else: | |
| # Ensure colon is preserved for titles | |
| translation = translation.rstrip(';') | |
| if not translation.endswith(':'): | |
| # Add colon if missing (for titles) | |
| translation = translation.rstrip() + ':' | |
| # Final cleanup | |
| if len(translation) > 2: | |
| if translation.startswith('"') and translation.endswith('"'): | |
| translation = translation[1:-1].strip() | |
| # Restore colon if it was a title | |
| if original_ends_with_colon and not translation.endswith(':'): | |
| translation = translation + ':' | |
| elif translation.startswith("'") and translation.endswith("'"): | |
| translation = translation[1:-1].strip() | |
| # Restore colon if it was a title | |
| if original_ends_with_colon and not translation.endswith(':'): | |
| translation = translation + ':' | |
| # For very short translations (like titles), lower the minimum length requirement | |
| # Titles can be as short as 2 characters (e.g., "Be:" or "As:") | |
| # For titles ending with colon, minimum is even lower | |
| if is_title: | |
| min_length = 2 # Very low threshold for titles | |
| else: | |
| min_length = 3 if len(text) < 10 else 5 # Lower threshold for short inputs (likely titles) | |
| return translation if translation and len(translation) >= min_length else None | |
| except Exception as e: | |
| print(f"Qwen2.5 translation error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return None | |
| async def _translate_text(self, text: str, source_lang: str = 'zh', target_lang: str = 'en') -> str | None: | |
| """Translate text from source language to target language. Uses Qwen2.5 by default, falls back to OPUS-MT.""" | |
| if not text or not text.strip(): | |
| return None | |
| if not HF_TRANSLATION_AVAILABLE and not QWEN_TRANSLATION_AVAILABLE: | |
| return None | |
| try: | |
| # Detect if text is Chinese | |
| chinese_chars = re.findall(r'[\u4e00-\u9fff]+', text) | |
| if not chinese_chars and source_lang == 'zh': | |
| # Text doesn't contain Chinese, return None (no translation needed) | |
| return None | |
| # Only support zh->en for now | |
| if source_lang != 'zh' or target_lang != 'en': | |
| print(f"Translation from {source_lang} to {target_lang} not supported. Only zh->en supported.") | |
| return None | |
| # HYBRID APPROACH: Use both methods strategically | |
| # Strategy 1: Try Qwen2.5 first (better quality for religious texts) | |
| # Strategy 2: Fallback to OPUS-MT if Qwen fails or produces poor results | |
| # Strategy 3: Use OPUS-MT for very short texts (titles) if Qwen is unreliable | |
| qwen_result = None | |
| opus_result = None | |
| # Try Qwen2.5 first if enabled | |
| if self.use_qwen_translation: | |
| try: | |
| qwen_result = await self._translate_text_qwen(text) | |
| # Validate Qwen result quality | |
| if qwen_result and self._validate_translation_quality(qwen_result, text): | |
| return qwen_result | |
| elif qwen_result: | |
| print(f"Qwen2.5 translation quality check failed, trying OPUS-MT...") | |
| else: | |
| print("Qwen2.5 translation returned None, falling back to OPUS-MT...") | |
| except Exception as e: | |
| print(f"Qwen2.5 translation error: {e}, falling back to OPUS-MT...") | |
| # Fallback to OPUS-MT | |
| if not HF_TRANSLATION_AVAILABLE: | |
| # If Qwen failed but we have a result, return it anyway | |
| return qwen_result if qwen_result else None | |
| # Get translation model (lazy loading) | |
| model, tokenizer, device = self._get_translation_model() | |
| if model is None or tokenizer is None: | |
| # If Qwen failed but we have a result, return it anyway | |
| return qwen_result if qwen_result else None | |
| # Tokenize input | |
| inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512) | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| # Translate with OPUS-MT | |
| try: | |
| with torch.no_grad(): | |
| translated = model.generate(**inputs, max_length=512) | |
| # Decode result | |
| opus_result = tokenizer.decode(translated[0], skip_special_tokens=True) | |
| # Fix known name translation errors in OPUS-MT output | |
| opus_result = self._fix_name_translations(opus_result, text) | |
| # HYBRID DECISION: Choose best result | |
| # Prefer Qwen if available and valid, otherwise use OPUS-MT | |
| if qwen_result and self._validate_translation_quality(qwen_result, text): | |
| return qwen_result | |
| elif opus_result and opus_result != text and len(opus_result.strip()) > 0: | |
| return opus_result.strip() | |
| else: | |
| # Last resort: return Qwen result even if validation failed | |
| return qwen_result if qwen_result else None | |
| except Exception as e: | |
| print(f"OPUS-MT translation error: {e}") | |
| # Return Qwen result if available, even if validation failed | |
| return qwen_result if qwen_result else None | |
| except Exception as e: | |
| print(f"Translation error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return None | |
| class WorshipProgramGenerator: | |
| """Main agent for generating worship programs from multiple sources""" | |
| def __init__(self, gemma_backend_url: str, use_qwen_translation: bool = False): | |
| self.doc_processor = DocumentProcessingAgent(gemma_backend_url, use_qwen_translation=use_qwen_translation) | |
| self.template_path = "WORSHIP_PROGRAM_TEMPLATE.md" | |
| def _extract_date_from_pdf(self, document_sources: List[str]) -> str: | |
| """Extract date from PDF filename (format: RCCA-worship-bulletin-YYYY-MM-DD.pdf)""" | |
| import re | |
| from pathlib import Path | |
| for source in document_sources: | |
| if source.endswith('.pdf'): | |
| # Try to extract date from filename | |
| filename = Path(source).name | |
| date_match = re.search(r'(\d{4}-\d{2}-\d{2})', filename) | |
| if date_match: | |
| return date_match.group(1) | |
| # Try to extract from PDF content if filename doesn't have date | |
| try: | |
| import PyPDF2 | |
| with open(source, 'rb') as pdf_file: | |
| reader = PyPDF2.PdfReader(pdf_file) | |
| if reader.pages: | |
| text = reader.pages[0].extract_text() | |
| # Look for date patterns in the PDF | |
| date_patterns = [ | |
| r'(\d{4}[-/]\d{2}[-/]\d{2})', # YYYY-MM-DD or YYYY/MM/DD | |
| r'(\d{1,2}[-/]\d{1,2}[-/]\d{4})', # MM-DD-YYYY or MM/DD/YYYY | |
| ] | |
| for pattern in date_patterns: | |
| match = re.search(pattern, text) | |
| if match: | |
| date_str = match.group(1) | |
| # Normalize to YYYY-MM-DD format | |
| if '/' in date_str: | |
| parts = date_str.split('/') | |
| else: | |
| parts = date_str.split('-') | |
| if len(parts) == 3: | |
| if len(parts[2]) == 4: # MM-DD-YYYY | |
| return f"{parts[2]}-{parts[0].zfill(2)}-{parts[1].zfill(2)}" | |
| else: # YYYY-MM-DD | |
| return f"{parts[0]}-{parts[1].zfill(2)}-{parts[2].zfill(2)}" | |
| except Exception: | |
| pass | |
| return None | |
| def _load_bilingual_document(self, document_sources: List[str] = None) -> str: | |
| """Load the bilingual document if it exists""" | |
| # First, try to find bilingual file from document_sources | |
| if document_sources: | |
| for source in document_sources: | |
| if source and isinstance(source, str) and source.endswith('_bilingual.txt'): | |
| if os.path.exists(source): | |
| try: | |
| with open(source, 'r', encoding='utf-8') as f: | |
| return f.read() | |
| except Exception as e: | |
| print(f"Error loading bilingual document from {source}: {e}") | |
| continue | |
| # Fallback: Try multiple possible locations (for backward compatibility) | |
| possible_paths = [ | |
| "2025-09-28-MQD-RCCA-sript-for-translator_bilingual.txt", | |
| os.path.join(os.path.dirname(os.path.abspath(__file__)), "2025-09-28-MQD-RCCA-sript-for-translator_bilingual.txt"), | |
| ] | |
| for bilingual_file in possible_paths: | |
| if os.path.exists(bilingual_file): | |
| try: | |
| with open(bilingual_file, 'r', encoding='utf-8') as f: | |
| return f.read() | |
| except Exception as e: | |
| print(f"Error loading bilingual document from {bilingual_file}: {e}") | |
| continue | |
| return None | |
| async def generate_program(self, document_sources: List[str]) -> str: | |
| """Generate a complete worship program from multiple sources""" | |
| # Process all documents | |
| processed_docs = await self.doc_processor.process_documents(document_sources) | |
| # Generate the worship program | |
| program_content = await self._fill_template(processed_docs, document_sources) | |
| return program_content | |
| async def _fill_template(self, processed_docs: List[DocumentContent], document_sources: List[str] = None) -> str: | |
| """Fill the worship program template with processed content""" | |
| # Load template | |
| try: | |
| with open(self.template_path, 'r', encoding='utf-8') as f: | |
| template = f.read() | |
| except FileNotFoundError: | |
| template = "# Worship Program\n\n## Generated Content\n\n" | |
| # Aggregate content from all sources | |
| aggregated_content = self._aggregate_content(processed_docs) | |
| # Try to use Gemma to fill the template | |
| prompt = f""" | |
| Fill in the following worship program template with the provided content. | |
| IMPORTANT: Format the content so that each Chinese paragraph is immediately followed by its English translation. | |
| The pattern should be: Chinese paragraph, then English paragraph, repeating. | |
| Template: {template} | |
| Content to fill with: | |
| {json.dumps(aggregated_content, indent=2, ensure_ascii=False)} | |
| Return the complete filled template with bilingual format (Chinese paragraph followed by English paragraph). | |
| """ | |
| filled_template = await self.doc_processor._call_gemma(prompt) | |
| # Fallback if Gemma backend is not available | |
| if not filled_template or filled_template == {}: | |
| return await self._fill_template_fallback(template, processed_docs, aggregated_content, document_sources) | |
| # If filled_template is a dict, extract the content field or convert to string | |
| if isinstance(filled_template, dict): | |
| result = filled_template.get('content', json.dumps(filled_template, indent=2, ensure_ascii=False)) | |
| else: | |
| result = str(filled_template) | |
| # Replace Message section with Bilingual Document Translation if using Gemma backend | |
| # (For fallback, this is already handled in _fill_template_fallback) | |
| result = self._replace_message_with_bilingual(result, document_sources) | |
| return result | |
| def _replace_message_with_bilingual(self, program_content: str, document_sources: List[str] = None) -> str: | |
| """Replace Message section with Bilingual Document Translation""" | |
| bilingual_content = self._load_bilingual_document(document_sources) | |
| if not bilingual_content or not bilingual_content.strip(): | |
| # If bilingual document not available, keep original content | |
| return program_content | |
| # Extract date from PDF | |
| date = self._extract_date_from_pdf(document_sources or []) | |
| if not date: | |
| date = "2025-11-09" # Default fallback from filename | |
| # Format date nicely (e.g., "November 9, 2025") | |
| try: | |
| from datetime import datetime | |
| date_obj = datetime.strptime(date, "%Y-%m-%d") | |
| day = date_obj.day | |
| formatted_date = date_obj.strftime(f"%B {day}, %Y") | |
| except: | |
| formatted_date = date | |
| # Remove the header from bilingual_content if it exists (to avoid duplication) | |
| bilingual_text = bilingual_content.strip() | |
| if bilingual_text.startswith("# Bilingual Document Translation"): | |
| # Skip the header lines | |
| lines = bilingual_text.split('\n') | |
| # Find where the actual content starts (after "============================================================") | |
| start_idx = 0 | |
| for i, line in enumerate(lines): | |
| if '============================================================' in line: | |
| start_idx = i + 1 | |
| break | |
| bilingual_text = '\n'.join(lines[start_idx:]).strip() | |
| # Replace Message section with Bilingual Document Translation content | |
| # Look for "## Message" section and replace its content | |
| import re | |
| # Pattern to match ## Message section and its content until next ## section or end | |
| message_pattern = r'(##\s+Message\s*\n)(.*?)(?=\n##\s+|\Z)' | |
| replacement = f"## Message\n\n*Date: {formatted_date}*\n\n{bilingual_text}\n" | |
| # Replace the Message section | |
| if re.search(message_pattern, program_content, re.DOTALL): | |
| program_content = re.sub( | |
| message_pattern, | |
| lambda m: replacement + (m.group(3) if m.group(3) else ''), | |
| program_content, | |
| flags=re.DOTALL | |
| ) | |
| else: | |
| # If Message section not found, try to find and replace after Prayer section | |
| prayer_pattern = r'(##\s+Prayer.*?\n---\s*\n)(.*?)(?=\n##\s+|\Z)' | |
| if re.search(prayer_pattern, program_content, re.DOTALL): | |
| # Insert Message section with bilingual content after Prayer | |
| program_content = re.sub( | |
| prayer_pattern, | |
| lambda m: m.group(1) + f"\n## Message\n\n*Date: {formatted_date}*\n\n{bilingual_text}\n\n---\n\n" + (m.group(2) if m.group(2) else ''), | |
| program_content, | |
| flags=re.DOTALL | |
| ) | |
| else: | |
| # Append at the end if we can't find the right place | |
| program_content += f"\n\n---\n\n## Message\n\n*Date: {formatted_date}*\n\n{bilingual_text}\n" | |
| return program_content | |
| def _split_into_paragraphs(self, text: str) -> List[str]: | |
| """Split text into paragraphs""" | |
| if not text: | |
| return [] | |
| # Split by double newlines or single newline followed by content | |
| paragraphs = re.split(r'\n\s*\n', text) | |
| # Also split by single newlines if paragraph is too long | |
| result = [] | |
| for para in paragraphs: | |
| para = para.strip() | |
| if para: | |
| # If paragraph is very long, split by single newlines | |
| if len(para) > 500: | |
| sub_paras = para.split('\n') | |
| result.extend([p.strip() for p in sub_paras if p.strip()]) | |
| else: | |
| result.append(para) | |
| return result | |
| def _format_bilingual_content(self, chinese_text: str, english_text: str = None) -> str: | |
| """Format content with Chinese paragraph followed by English paragraph""" | |
| if not chinese_text: | |
| return english_text or "" | |
| chinese_paragraphs = self._split_into_paragraphs(chinese_text) | |
| # If English text is provided, use it; otherwise translate | |
| if english_text: | |
| english_paragraphs = self._split_into_paragraphs(english_text) | |
| else: | |
| english_paragraphs = [] | |
| # Ensure we have translations for all Chinese paragraphs | |
| result = [] | |
| for i, chinese_para in enumerate(chinese_paragraphs): | |
| if chinese_para.strip(): | |
| result.append(chinese_para) | |
| # Get corresponding English paragraph | |
| if i < len(english_paragraphs) and english_paragraphs[i]: | |
| result.append(english_paragraphs[i]) | |
| else: | |
| # Translate if not provided | |
| result.append("") # Placeholder, will be filled by async translation | |
| return "\n\n".join(result) | |
| async def _format_bilingual_content_async(self, chinese_text: str, english_text: str = None) -> str: | |
| """Format content with Chinese paragraph followed by English paragraph (async with translation)""" | |
| if not chinese_text: | |
| return english_text or "" | |
| chinese_paragraphs = self._split_into_paragraphs(chinese_text) | |
| # If English text is provided, use it; otherwise translate | |
| if english_text: | |
| english_paragraphs = self._split_into_paragraphs(english_text) | |
| else: | |
| english_paragraphs = [] | |
| # Ensure we have translations for all Chinese paragraphs | |
| result = [] | |
| for i, chinese_para in enumerate(chinese_paragraphs): | |
| if chinese_para.strip(): | |
| result.append(chinese_para) | |
| # Get corresponding English paragraph | |
| if i < len(english_paragraphs) and english_paragraphs[i]: | |
| result.append(english_paragraphs[i]) | |
| else: | |
| # Translate if not provided | |
| translated = await self.doc_processor._translate_text(chinese_para, 'zh', 'en') | |
| if translated: # Only add if translation succeeded | |
| result.append(translated) | |
| # If translation is None, skip adding English (translation not available) | |
| return "\n\n".join(result) | |
| async def _fill_template_fallback(self, template: str, processed_docs: List[DocumentContent], aggregated_content: Dict[str, Any], document_sources: List[str] = None) -> str: | |
| """Fallback method to fill template without Gemma backend""" | |
| # Extract source document info (for reference, but don't duplicate main sections) | |
| source_info = [] | |
| for doc in processed_docs: | |
| # Use a different format to avoid conflicts with main sections | |
| source_info.append(f"- **{doc.title}** ({doc.source_type})") | |
| # Helper function to safely format lists | |
| def format_list(items, default_msg="To be determined"): | |
| if not items: | |
| return default_msg | |
| items = [str(item).strip() for item in items if item and str(item).strip()] | |
| if not items: | |
| return default_msg | |
| return "\n".join(items[:10]) # Limit to 10 items | |
| # Helper function to format numbered list | |
| def format_numbered_list(items, default_msg="To be determined", max_items=7): | |
| if not items: | |
| return default_msg | |
| items = [str(item).strip() for item in items if item and str(item).strip()] | |
| if not items: | |
| return default_msg | |
| return "\n".join([f"{i+1}. {item}" for i, item in enumerate(items[:max_items])]) | |
| # Get content (exclude messages since they'll come from bilingual file only) | |
| scriptures = format_list(aggregated_content.get('scripture_references', []), "Scripture reading to be determined") | |
| songs = format_list(aggregated_content.get('songs', []), "Worship songs to be selected") | |
| prayer_points = format_numbered_list(aggregated_content.get('prayer_points', []), "Prayer points to be determined") | |
| announcements = format_numbered_list(aggregated_content.get('announcements', []), "Announcements to be added") | |
| # Replace Message section with Bilingual Document Translation | |
| # Load bilingual document and format it | |
| bilingual_content = self._load_bilingual_document(document_sources) | |
| messages_formatted = "Sermon message to be prepared" | |
| if bilingual_content and bilingual_content.strip(): | |
| # Extract date from PDF | |
| date = self._extract_date_from_pdf(document_sources or []) | |
| if not date: | |
| date = "2025-11-09" # Default fallback from filename | |
| # Format date nicely (e.g., "November 9, 2025") | |
| try: | |
| from datetime import datetime | |
| date_obj = datetime.strptime(date, "%Y-%m-%d") | |
| day = date_obj.day | |
| formatted_date = date_obj.strftime(f"%B {day}, %Y") | |
| except: | |
| formatted_date = date | |
| # Remove the header from bilingual_content if it exists (to avoid duplication) | |
| bilingual_text = bilingual_content.strip() | |
| if bilingual_text.startswith("# Bilingual Document Translation"): | |
| # Skip the header lines | |
| lines = bilingual_text.split('\n') | |
| # Find where the actual content starts (after "============================================================") | |
| start_idx = 0 | |
| for i, line in enumerate(lines): | |
| if '============================================================' in line: | |
| start_idx = i + 1 | |
| break | |
| bilingual_text = '\n'.join(lines[start_idx:]).strip() | |
| # Format as Bilingual Document Translation section | |
| # Only use bilingual content - don't mix with extracted messages to avoid duplication | |
| messages_formatted = f"""*Date: {formatted_date}* | |
| {bilingual_text}""" | |
| else: | |
| # No bilingual document available - use fallback message | |
| # Don't use aggregated_content.get('messages') to avoid duplication from PDF processing | |
| messages_formatted = "Sermon message to be prepared" | |
| # Format prayer points with bilingual pattern | |
| prayer_points_formatted = prayer_points | |
| if prayer_items := aggregated_content.get('prayer_points', []): | |
| if prayer_items and isinstance(prayer_items, list) and len(prayer_items) > 0: | |
| prayer_result = [] | |
| for i, item in enumerate(prayer_items[:7]): | |
| item_str = str(item).strip() | |
| if item_str: | |
| # Check if contains Chinese | |
| chinese_chars = re.findall(r'[\u4e00-\u9fff]+', item_str) | |
| if chinese_chars: | |
| prayer_result.append(f"{i+1}. {item_str}") | |
| translated = await self.doc_processor._translate_text(item_str, 'zh', 'en') | |
| if translated: # Only add if translation succeeded | |
| prayer_result.append(f"{i+1}. {translated}") | |
| # If translation is None, skip adding English | |
| else: | |
| prayer_result.append(f"{i+1}. {item_str}") | |
| prayer_points_formatted = "\n".join(prayer_result) if prayer_result else prayer_points | |
| # Format announcements with bilingual pattern | |
| announcements_formatted = announcements | |
| if announcement_items := aggregated_content.get('announcements', []): | |
| if announcement_items and isinstance(announcement_items, list) and len(announcement_items) > 0: | |
| announcement_result = [] | |
| for i, item in enumerate(announcement_items[:10]): | |
| item_str = str(item).strip() | |
| if item_str: | |
| # Check if contains Chinese | |
| chinese_chars = re.findall(r'[\u4e00-\u9fff]+', item_str) | |
| if chinese_chars: | |
| announcement_result.append(f"{i+1}. {item_str}") | |
| translated = await self.doc_processor._translate_text(item_str, 'zh', 'en') | |
| if translated: # Only add if translation succeeded | |
| announcement_result.append(f"{i+1}. {translated}") | |
| # If translation is None, skip adding English | |
| else: | |
| announcement_result.append(f"{i+1}. {item_str}") | |
| announcements_formatted = "\n".join(announcement_result) if announcement_result else announcements | |
| program = f"""# Worship Program | |
| ## Call to Worship | |
| ### Scripture Reference | |
| {scriptures} | |
| --- | |
| ## Songs | |
| {songs} | |
| --- | |
| ## Today's Bible Reading | |
| ### Scripture Reference | |
| {scriptures} | |
| --- | |
| ## Prayer | |
| ### This Week's Prayer Topics | |
| {prayer_points_formatted} | |
| --- | |
| ## Message | |
| {messages_formatted} | |
| --- | |
| ## Announcements | |
| {announcements_formatted} | |
| --- | |
| ## Source Documents | |
| {chr(10).join(source_info) if source_info else "No source documents listed"} | |
| --- | |
| *Note: This program was generated from source documents. Please review and customize as needed.* | |
| """ | |
| return program | |
| def _aggregate_content(self, docs: List[DocumentContent]) -> Dict[str, Any]: | |
| """Aggregate content from multiple documents""" | |
| aggregated = { | |
| 'scripture_references': [], | |
| 'prayer_points': [], | |
| 'messages': [], | |
| 'announcements': [], | |
| 'songs': [] | |
| } | |
| for doc in docs: | |
| sections = doc.extracted_sections | |
| for key, value in sections.items(): | |
| if key in aggregated: | |
| # Handle both list and single value cases | |
| if isinstance(value, list): | |
| aggregated[key].extend(value) | |
| else: | |
| aggregated[key].append(value) | |
| # Flatten and deduplicate | |
| for key in aggregated: | |
| # Flatten nested lists | |
| flattened = [] | |
| for item in aggregated[key]: | |
| if isinstance(item, list): | |
| flattened.extend(item) | |
| else: | |
| flattened.append(item) | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| aggregated[key] = [x for x in flattened if x and str(x).strip() and (x not in seen or seen.add(x) is None)] | |
| return aggregated | |
# Example usage
async def main():
    """Run the generator end to end on a sample set of sources.

    Builds a ``WorshipProgramGenerator`` against a placeholder Gemma backend
    URL, generates a program from the example sources and writes it to
    ``generated_worship_program.md``.
    """
    generator = WorshipProgramGenerator("https://your-gemma-backend-url")

    # Example inputs: local files of several types plus a URL.
    sources = [
        "email_communications.txt",
        "sermon_transcript.pdf",
        "church_announcements.pptx",
        "https://example.com/church-news"
    ]

    program = await generator.generate_program(sources)

    # Persist the generated Markdown next to the working directory.
    Path("generated_worship_program.md").write_text(program, encoding="utf-8")
    print("Worship program generated successfully!")


if __name__ == "__main__":
    asyncio.run(main())