# services/pipeline_generator.py
"""
Unified pipeline generator with Bedrock (priority) and Gemini (fallback).

Both providers are asked to return a JSON pipeline description, which is
validated against ``PipelineConfig`` and returned as a plain dict tagged with
``_generator`` / ``_model`` so the display helper can show its origin.
"""
import json
import os
import re
from typing import Dict, Any, List, Optional

from pydantic import BaseModel, Field

# For Bedrock
try:
    from langchain_aws import ChatBedrock
    from langchain_core.prompts import ChatPromptTemplate
    BEDROCK_AVAILABLE = True
except ImportError:
    BEDROCK_AVAILABLE = False
    print("Warning: langchain_aws not available, Bedrock will be disabled")

# For Gemini
import requests


# ========================
# PYDANTIC MODELS
# ========================

class ComponentConfig(BaseModel):
    """Configuration for a single pipeline component"""
    tool_name: str = Field(description="Name of the tool to execute")
    start_page: int = Field(default=1, description="Starting page number (1-indexed)")
    end_page: int = Field(default=1, description="Ending page number (inclusive)")
    params: Dict[str, Any] = Field(default_factory=dict, description="Additional tool-specific parameters")


class PipelineConfig(BaseModel):
    """Complete pipeline configuration"""
    pipeline_name: str = Field(description="Name/identifier for the pipeline")
    components: List[ComponentConfig] = Field(description="Ordered list of components to execute")
    target_lang: Optional[str] = Field(default=None, description="Target language for translation (if applicable)")
    reason: str = Field(description="AI's reasoning for this pipeline structure")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")


# ========================
# SHARED HELPERS
# ========================

_BEDROCK_MODEL_ID = "mistral.mistral-large-2402-v1:0"

# JSON wrapped in a ```json fenced block (non-greedy), and, as a last resort,
# the widest {...} span anywhere in the text (greedy).
_FENCED_JSON_RE = re.compile(r'```json\s*(\{.*?\})\s*```', re.DOTALL)
_BARE_JSON_RE = re.compile(r'\{.*\}', re.DOTALL)


def _extract_json_object(content: str, provider: str) -> Dict[str, Any]:
    """Parse the JSON object contained in an LLM reply.

    Tries, in order: the whole reply as JSON, a ```json fenced block, then the
    widest brace-delimited span.

    Args:
        content: Raw text returned by the model.
        provider: Provider name used in error messages (e.g. "Bedrock").

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If no JSON object can be located, or the decoded value is
            not a JSON object.
        json.JSONDecodeError: If a located candidate is not valid JSON.
    """
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        fenced = _FENCED_JSON_RE.search(content)
        if fenced:
            parsed = json.loads(fenced.group(1))
        else:
            bare = _BARE_JSON_RE.search(content)
            if bare is None:
                raise ValueError(
                    f"No JSON found in {provider} response: {content}"
                ) from None
            parsed = json.loads(bare.group(0))

    if not isinstance(parsed, dict):
        # A bare list/number/string is valid JSON but cannot be a pipeline.
        raise ValueError(
            f"Expected a JSON object in {provider} response, "
            f"got {type(parsed).__name__}"
        )
    return parsed


def _validate_and_tag(pipeline: Dict[str, Any], generator: str, model: str) -> Dict[str, Any]:
    """Validate *pipeline* against ``PipelineConfig`` and tag its origin.

    The ``_generator`` / ``_model`` keys are added AFTER validation: they are
    not declared on ``PipelineConfig``, so adding them before validation would
    let them be dropped from the ``model_dump()`` result.
    """
    result = PipelineConfig(**pipeline).model_dump()
    result["_generator"] = generator
    result["_model"] = model
    return result


# ========================
# BEDROCK PIPELINE GENERATOR
# ========================

# Doubled braces are literal braces: the text is rendered through
# ChatPromptTemplate, which treats single braces as template variables.
_BEDROCK_SYSTEM_PROMPT = """You are MasterLLM, a document processing pipeline orchestrator.

**YOUR ROLE:**
You are a helpful AI assistant that can have normal conversations AND create document processing pipelines when asked.
You should ONLY create pipelines when the user explicitly requests document processing operations.
For general questions, greetings, or information requests - just have a normal conversation.

**STRICT TOOL LIST - USE ONLY THESE TOOLS:**

1. extract_text (Extract text from PDFs/images)
   - start_page, end_page
   - params: {{"encoding": "utf-8", "preserve_layout": true/false}}

2. extract_tables (Extract tables from documents)
   - start_page, end_page
   - params: {{"format": "json" or "csv", "include_headers": true/false}}

3. describe_images (Generate descriptions of images)
   - start_page, end_page
   - params: {{"detail_level": "low"|"medium"|"high"}}

4. summarize (Summarize extracted text)
   - start_page: 1, end_page: 1 (always)
   - params: {{"max_length": 500, "style": "concise" or "detailed"}}

5. classify (Classify document content)
   - start_page: 1, end_page: 1 (always)
   - params: {{"categories": ["list", "of", "categories"]}}

6. ner (Named Entity Recognition - people, places, orgs)
   - start_page: 1, end_page: 1 (always)
   - params: {{"entity_types": ["PERSON", "ORG", "LOC", "DATE"]}}

7. translator (Translate text to another language)
   - start_page: 1, end_page: 1 (always)
   - params: {{"target_lang": "es"|"fr"|"de" etc, "source_lang": "auto"}}

8. signature_verification (Detect and verify signatures)
   - start_page, end_page
   - params: {{}}

9. stamp_detection (Detect stamps/seals)
   - start_page, end_page
   - params: {{}}

**CRITICAL RULES:**
- NEVER use tools not in this list (e.g., NO "extract_entities", "summarize_text", "translate_text")
- Use "ner" for entity extraction (NOT "extract_entities")
- Use "summarize" (NOT "summarize_text")
- Use "translator" (NOT "translate_text")
- Use "classify" (NOT "classify_text")
- For text-processing tools (summarize, ner, translator, classify): ALWAYS use start_page=1, end_page=1
- For extraction tools (extract_text, extract_tables, images, signatures, stamps): use actual page ranges
- **NEVER use negative page numbers (e.g., end_page: -1 is FORBIDDEN)**
- **To process ALL pages, ALWAYS use end_page: 999 (NOT -1!)**
- **start_page and end_page must ALWAYS be positive integers >= 1**

Return ONLY valid JSON:
{{
  "pipeline_name": "descriptive-name",
  "components": [
    {{"tool_name": "extract_text", "start_page": 1, "end_page": 999, "params": {{"encoding": "utf-8"}}}},
    {{"tool_name": "summarize", "start_page": 1, "end_page": 1, "params": {{"max_length": 500}}}}
  ],
  "target_lang": null,
  "reason": "Brief explanation",
  "metadata": {{"estimated_duration_seconds": 30}}
}}

Always validate tool_name against the strict list above!"""


def generate_pipeline_bedrock(user_input: str, file_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Generate pipeline using AWS Bedrock (Mistral Large)
    Priority method - tries this first

    Args:
        user_input: The user's natural-language request.
        file_path: Optional path shown to the model; a generic label is used
            when it is missing.

    Returns:
        The validated pipeline as a dict, tagged with ``_generator`` and
        ``_model``.

    Raises:
        RuntimeError: If langchain_aws is missing, the AWS credential
            environment variables are unset, or generation/parsing/validation
            fails (the underlying error is chained as ``__cause__``).
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock not available - langchain_aws not installed")

    # Check for AWS credentials (only the environment variables are inspected)
    if not os.getenv("AWS_ACCESS_KEY_ID") or not os.getenv("AWS_SECRET_ACCESS_KEY"):
        raise RuntimeError("AWS credentials not configured")

    try:
        llm = ChatBedrock(
            model_id=_BEDROCK_MODEL_ID,
            region_name=os.getenv("AWS_REGION", "ap-south-1")  # Default to Mumbai region (nearest)
        )

        prompt = ChatPromptTemplate.from_messages([
            ("system", _BEDROCK_SYSTEM_PROMPT),
            ("human", "User request: {input}\n\nFile: {file_path}")
        ])

        chain = prompt | llm
        response = chain.invoke({
            "input": user_input,
            "file_path": file_path or "user uploaded document"
        })

        # Parse JSON from response, then validate and tag it
        pipeline = _extract_json_object(response.content, "Bedrock")
        return _validate_and_tag(pipeline, "bedrock", _BEDROCK_MODEL_ID)

    except Exception as e:
        raise RuntimeError(f"Bedrock pipeline generation failed: {str(e)}") from e


# ========================
# GEMINI PIPELINE GENERATOR
# ========================

def generate_pipeline_gemini(user_input: str, file_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Generate pipeline using Google Gemini (fallback method)

    Args:
        user_input: The user's natural-language request.
        file_path: Optional path shown to the model; a generic label is used
            when it is missing.

    Returns:
        The validated pipeline as a dict, tagged with ``_generator`` and
        ``_model``.

    Raises:
        RuntimeError: If no API key is configured, or the HTTP call, JSON
            parsing, or validation fails (the underlying error is chained as
            ``__cause__``).
    """
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    model = os.getenv("GEMINI_MODEL", "gemini-2.0-flash")
    endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"

    if not api_key:
        raise RuntimeError("Gemini API key not configured")

    file_label = file_path or "user uploaded document"
    prompt = f"""You are MasterLLM pipeline generator.

STRICT TOOL LIST (USE ONLY THESE):
- extract_text (pages: start_page, end_page)
- extract_tables (pages: start_page, end_page)
- describe_images (pages: start_page, end_page)
- summarize (always: start_page=1, end_page=1)
- classify (always: start_page=1, end_page=1)
- ner (always: start_page=1, end_page=1) - for entity extraction
- translator (always: start_page=1, end_page=1)
- signature_verification (pages: start_page, end_page)
- stamp_detection (pages: start_page, end_page)

DO NOT USE: extract_entities, summarize_text, translate_text, classify_text
USE CORRECT NAMES: ner (not extract_entities), summarize (not summarize_text)

User request: {user_input}
File: {file_label}

Return ONLY valid JSON:
{{
  "pipeline_name": "descriptive-name",
  "components": [
    {{"tool_name": "extract_text", "start_page": 1, "end_page": 5, "params": {{}}}}
  ],
  "target_lang": null,
  "reason": "explanation",
  "metadata": {{"estimated_duration_seconds": 30}}
}}

VALIDATE all tool_name values against the strict list!"""

    try:
        response = requests.post(
            endpoint,
            headers={
                "Content-Type": "application/json",
                # Sent as a header rather than a ?key= query parameter so the
                # secret never appears in URLs embedded in exception messages,
                # which this module prints and stores in pipeline metadata.
                "x-goog-api-key": api_key,
            },
            json={
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    "temperature": 0.0,
                    "maxOutputTokens": 1024,
                }
            },
            timeout=60,
        )
        response.raise_for_status()
        result = response.json()

        # Extract text from Gemini response
        content = result["candidates"][0]["content"]["parts"][0]["text"]

        # Parse JSON, then validate and tag it
        pipeline = _extract_json_object(content, "Gemini")
        return _validate_and_tag(pipeline, "gemini", model)

    except Exception as e:
        raise RuntimeError(f"Gemini pipeline generation failed: {str(e)}") from e


# ========================
# UNIFIED PIPELINE GENERATOR WITH FALLBACK
# ========================

def generate_pipeline(
    user_input: str,
    file_path: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Dict[str, Any]:
    """
    Generate pipeline with fallback mechanism.

    Priority:
    1. Try Bedrock (Mistral Large) - if available and configured
    2. Fallback to Gemini - if Bedrock fails

    Args:
        user_input: The user's natural-language request.
        file_path: Optional path of the document being processed.
        prefer_bedrock: When False, Bedrock is skipped and Gemini is used
            directly.

    Returns:
        Pipeline configuration dict with component-level details. When Gemini
        was used after a Bedrock failure, ``metadata["fallback_reason"]``
        holds the Bedrock error message.

    Raises:
        RuntimeError: If every attempted provider fails; the message lists
            each provider's error.
    """
    errors = []

    # Try Bedrock first (priority)
    if prefer_bedrock and BEDROCK_AVAILABLE:
        try:
            print("🏆 Attempting pipeline generation with Bedrock...")
            pipeline = generate_pipeline_bedrock(user_input, file_path)
            print(f"✅ Bedrock pipeline generated successfully: {pipeline['pipeline_name']}")
            return pipeline
        except Exception as bedrock_error:
            error_msg = f"Bedrock failed: {str(bedrock_error)}"
            print(f"❌ {error_msg}")
            errors.append(error_msg)
            print("🔄 Falling back to Gemini...")

    # Fallback to Gemini
    try:
        print("🔄 Attempting pipeline generation with Gemini...")
        pipeline = generate_pipeline_gemini(user_input, file_path)
        print(f"✅ Gemini pipeline generated successfully: {pipeline['pipeline_name']}")

        # Add fallback metadata
        if errors:
            pipeline.setdefault("metadata", {})["fallback_reason"] = errors[0]

        return pipeline
    except Exception as gemini_error:
        error_msg = f"Gemini failed: {str(gemini_error)}"
        print(f"❌ {error_msg}")
        errors.append(error_msg)

        # Both failed
        raise RuntimeError(
            "Pipeline generation failed with all providers.\n"
            "Errors:\n" + "\n".join(f" - {e}" for e in errors)
        ) from gemini_error


# ========================
# UTILITY FUNCTIONS
# ========================

_RULE = "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"

# Icons keyed by the tool names the prompts actually allow. The *_text /
# extract_entities spellings are kept as aliases for pipelines produced with
# the older names.
_TOOL_ICONS = {
    "extract_text": "📄",
    "extract_tables": "📊",
    "describe_images": "🖼️",
    "summarize": "📝",
    "summarize_text": "📝",
    "classify": "🏷️",
    "classify_text": "🏷️",
    "ner": "👤",
    "extract_entities": "👤",
    "translator": "🌐",
    "translate_text": "🌐",
    "signature_verification": "✍️",
    "stamp_detection": "🔖",
}
_DEFAULT_TOOL_ICON = "🔧"


def format_pipeline_for_display(pipeline: Dict[str, Any]) -> str:
    """
    Format pipeline as fancy display string for Gradio

    Args:
        pipeline: Pipeline dict as returned by ``generate_pipeline``. Missing
            keys fall back to placeholder text.

    Returns:
        A multi-line string with a header, one section per component, the
        model's reasoning, and the approve/reject/edit instructions.
    """
    generator = pipeline.get("_generator", "unknown")
    model = pipeline.get("_model", "unknown")
    # `or {}` also covers an explicit ``"metadata": None``.
    duration = (pipeline.get("metadata") or {}).get("estimated_duration_seconds", "unknown")

    parts = [f"""
{_RULE}
🎯 PIPELINE GENERATED SUCCESSFULLY!
{_RULE}

📋 Pipeline Name: {pipeline.get('pipeline_name', 'unnamed')}
🤖 Generated By: {generator.title()} ({model})
⏱️ Estimated Duration: {duration} seconds

{_RULE}
"""]

    # Add each component
    for idx, component in enumerate(pipeline.get("components", []), 1):
        tool_name = component.get("tool_name", "unknown")
        start_page = component.get("start_page", 1)
        end_page = component.get("end_page", 1)
        params = component.get("params", {})

        # Icon based on tool type
        icon = _TOOL_ICONS.get(tool_name, _DEFAULT_TOOL_ICON)

        parts.append(f"\n{icon} **STEP {idx}: {tool_name.replace('_', ' ').upper()}**\n")

        # A 1..1 range is the placeholder used by text-processing tools, so
        # the page line is only shown for real ranges.
        if start_page > 1 or end_page > 1:
            parts.append(f" 📍 Pages: {start_page} to {end_page}\n")

        if params:
            parts.append(" ⚙️ Parameters:\n")
            parts.extend(f" • {key}: {value}\n" for key, value in params.items())

    # NOTE(review): the original indentation was lost; this separator is
    # assumed to follow the whole step list rather than each step - confirm.
    parts.append(f"\n{_RULE}\n")

    # Add reasoning
    parts.append(f"\n💡 **REASONING:**\n {pipeline.get('reason', 'No reason provided')}\n")

    parts.append(f"\n{_RULE}\n")
    parts.append("\n✅ Type 'approve' to execute this pipeline")
    parts.append("\n❌ Type 'reject' to cancel")
    parts.append("\n✏️ Type 'edit' to modify\n")
    parts.append(f"\n{_RULE}")

    return "".join(parts)


if __name__ == "__main__":
    # Test
    test_input = "extract text from pages 1-5, get tables from pages 2-4, and summarize everything"

    try:
        pipeline = generate_pipeline(test_input)
        print(json.dumps(pipeline, indent=2))
        print("\n" + "="*80 + "\n")
        print(format_pipeline_for_display(pipeline))
    except Exception as e:
        print(f"Error: {e}")