Spaces:
Sleeping
Sleeping
Ensure end_page is always greater than or equal to start_page, and make pipeline generation deterministic so that the same input yields the same value output even when the generated pipeline differs.
5cf6369 | # services/pipeline_generator.py | |
| """ | |
| Unified pipeline generator with Bedrock (priority) and Gemini (fallback) | |
| """ | |
import json
import os
import re
from typing import Dict, Any, List, Optional

# For Gemini (plain REST calls)
import requests
from pydantic import BaseModel, Field, model_validator

# For Bedrock (optional dependency)
try:
    from langchain_aws import ChatBedrock
    from langchain_core.prompts import ChatPromptTemplate
    BEDROCK_AVAILABLE = True
except ImportError:
    BEDROCK_AVAILABLE = False
    print("Warning: langchain_aws not available, Bedrock will be disabled")
| # ======================== | |
| # PYDANTIC MODELS | |
| # ======================== | |
class ComponentConfig(BaseModel):
    """Configuration for a single pipeline component.

    Page numbers are 1-indexed and inclusive. ``ge=1`` enforces the rule the
    LLM prompts already state ("start_page and end_page must ALWAYS be
    positive integers >= 1") and rejects the forbidden ``-1`` sentinel.
    """
    tool_name: str = Field(description="Name of the tool to execute")
    start_page: int = Field(default=1, ge=1, description="Starting page number (1-indexed)")
    end_page: int = Field(default=1, ge=1, description="Ending page number (inclusive)")
    params: Dict[str, Any] = Field(default_factory=dict, description="Additional tool-specific parameters")

    @model_validator(mode="after")
    def _clamp_page_order(self) -> "ComponentConfig":
        """Ensure end_page is never below start_page.

        LLM output occasionally inverts the range; clamping (rather than
        raising) keeps pipeline generation from hard-failing on a
        recoverable slip.
        """
        if self.end_page < self.start_page:
            self.end_page = self.start_page
        return self
class PipelineConfig(BaseModel):
    """Complete pipeline configuration.

    This is the schema both LLM generators (Bedrock and Gemini) must emit;
    their raw JSON output is validated against it before execution.
    """
    # Human-readable slug identifying the pipeline.
    pipeline_name: str = Field(description="Name/identifier for the pipeline")
    # Components execute in list order; each entry is one tool invocation.
    components: List[ComponentConfig] = Field(description="Ordered list of components to execute")
    # Language code (e.g. "es", "fr") when a translation step is present.
    target_lang: Optional[str] = Field(default=None, description="Target language for translation (if applicable)")
    reason: str = Field(description="AI's reasoning for this pipeline structure")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
| # ======================== | |
| # BEDROCK PIPELINE GENERATOR | |
| # ======================== | |
def generate_pipeline_bedrock(user_input: str, file_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Generate pipeline using AWS Bedrock (Mistral Large).

    Priority method - tries this first.

    Args:
        user_input: Natural-language request describing the processing to run.
        file_path: Optional path of the uploaded document (shown to the model).

    Returns:
        Validated pipeline configuration dict (``PipelineConfig.model_dump()``
        plus ``_generator``/``_model`` provenance keys).

    Raises:
        RuntimeError: If Bedrock is unavailable, credentials are missing, or
            generation/parsing/validation fails (original cause chained).
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock not available - langchain_aws not installed")
    # Check for AWS credentials
    if not os.getenv("AWS_ACCESS_KEY_ID") or not os.getenv("AWS_SECRET_ACCESS_KEY"):
        raise RuntimeError("AWS credentials not configured")

    def _extract_json(content: str) -> Dict[str, Any]:
        # The model may return bare JSON, a ```json fenced block, or JSON
        # embedded in prose - try each shape in turn.
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            pass
        match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
        if match:
            return json.loads(match.group(1))
        match = re.search(r'\{.*\}', content, re.DOTALL)
        if match:
            return json.loads(match.group(0))
        raise ValueError(f"No JSON found in Bedrock response: {content}")

    try:
        llm = ChatBedrock(
            model_id="mistral.mistral-large-2402-v1:0",
            region_name=os.getenv("AWS_REGION", "ap-south-1")  # Default to Mumbai region (nearest)
        )
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are MasterLLM, a document processing pipeline orchestrator.
**YOUR ROLE:**
You are a helpful AI assistant that can have normal conversations AND create document processing pipelines when asked.
You should ONLY create pipelines when the user explicitly requests document processing operations.
For general questions, greetings, or information requests - just have a normal conversation.
**STRICT TOOL LIST - USE ONLY THESE TOOLS:**
1. extract_text (Extract text from PDFs/images)
- start_page, end_page
- params: {{"encoding": "utf-8", "preserve_layout": true/false}}
2. extract_tables (Extract tables from documents)
- start_page, end_page
- params: {{"format": "json" or "csv", "include_headers": true/false}}
3. describe_images (Generate descriptions of images)
- start_page, end_page
- params: {{"detail_level": "low"|"medium"|"high"}}
4. summarize (Summarize extracted text)
- start_page: 1, end_page: 1 (always)
- params: {{"max_length": 500, "style": "concise" or "detailed"}}
5. classify (Classify document content)
- start_page: 1, end_page: 1 (always)
- params: {{"categories": ["list", "of", "categories"]}}
6. ner (Named Entity Recognition - people, places, orgs)
- start_page: 1, end_page: 1 (always)
- params: {{"entity_types": ["PERSON", "ORG", "LOC", "DATE"]}}
7. translator (Translate text to another language)
- start_page: 1, end_page: 1 (always)
- params: {{"target_lang": "es"|"fr"|"de" etc, "source_lang": "auto"}}
8. signature_verification (Detect and verify signatures)
- start_page, end_page
- params: {{}}
9. stamp_detection (Detect stamps/seals)
- start_page, end_page
- params: {{}}
**CRITICAL RULES:**
- NEVER use tools not in this list (e.g., NO "extract_entities", "summarize_text", "translate_text")
- Use "ner" for entity extraction (NOT "extract_entities")
- Use "summarize" (NOT "summarize_text")
- Use "translator" (NOT "translate_text")
- Use "classify" (NOT "classify_text")
- For text-processing tools (summarize, ner, translator, classify): ALWAYS use start_page=1, end_page=1
- For extraction tools (extract_text, extract_tables, images, signatures, stamps): use actual page ranges
- **NEVER use negative page numbers (e.g., end_page: -1 is FORBIDDEN)**
- **To process ALL pages, ALWAYS use end_page: 999 (NOT -1!)**
- **start_page and end_page must ALWAYS be positive integers >= 1**
Return ONLY valid JSON:
{{
"pipeline_name": "descriptive-name",
"components": [
{{"tool_name": "extract_text", "start_page": 1, "end_page": 999, "params": {{"encoding": "utf-8"}}}},
{{"tool_name": "summarize", "start_page": 1, "end_page": 1, "params": {{"max_length": 500}}}}
],
"target_lang": null,
"reason": "Brief explanation",
"metadata": {{"estimated_duration_seconds": 30}}
}}
Always validate tool_name against the strict list above!"""),
            ("human", "User request: {input}\n\nFile: {file_path}")
        ])
        chain = prompt | llm
        response = chain.invoke({
            "input": user_input,
            "file_path": file_path or "user uploaded document"
        })
        # Parse JSON from response
        pipeline = _extract_json(response.content)
        # Validate with Pydantic
        validated = PipelineConfig(**pipeline)
        result = validated.model_dump()
        # FIX: attach generator provenance AFTER validation/dump. Previously
        # these keys were set on the raw dict first; pydantic treats unknown
        # underscore keys as extras, so model_dump() silently dropped them
        # and the display layer never saw who generated the pipeline.
        result["_generator"] = "bedrock"
        result["_model"] = "mistral.mistral-large-2402-v1:0"
        return result
    except Exception as e:
        # Chain the cause so the real traceback survives the wrap.
        raise RuntimeError(f"Bedrock pipeline generation failed: {str(e)}") from e
| # ======================== | |
| # GEMINI PIPELINE GENERATOR | |
| # ======================== | |
def generate_pipeline_gemini(user_input: str, file_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Generate pipeline using Google Gemini (fallback method).

    Args:
        user_input: Natural-language request describing the processing to run.
        file_path: Optional path of the uploaded document (shown to the model).

    Returns:
        Validated pipeline configuration dict (``PipelineConfig.model_dump()``
        plus ``_generator``/``_model`` provenance keys).

    Raises:
        RuntimeError: If no API key is configured, or the request/parsing/
            validation fails (original cause chained).
    """
    GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.0-flash")
    GEMINI_ENDPOINT = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL}:generateContent"
    if not GEMINI_API_KEY:
        raise RuntimeError("Gemini API key not configured")
    prompt = f"""You are MasterLLM pipeline generator.
STRICT TOOL LIST (USE ONLY THESE):
- extract_text (pages: start_page, end_page)
- extract_tables (pages: start_page, end_page)
- describe_images (pages: start_page, end_page)
- summarize (always: start_page=1, end_page=1)
- classify (always: start_page=1, end_page=1)
- ner (always: start_page=1, end_page=1) - for entity extraction
- translator (always: start_page=1, end_page=1)
- signature_verification (pages: start_page, end_page)
- stamp_detection (pages: start_page, end_page)
DO NOT USE: extract_entities, summarize_text, translate_text, classify_text
USE CORRECT NAMES: ner (not extract_entities), summarize (not summarize_text)
User request: {user_input}
File: {file_path or "user uploaded document"}
Return ONLY valid JSON:
{{
"pipeline_name": "descriptive-name",
"components": [
{{"tool_name": "extract_text", "start_page": 1, "end_page": 5, "params": {{}}}}
],
"target_lang": null,
"reason": "explanation",
"metadata": {{"estimated_duration_seconds": 30}}
}}
VALIDATE all tool_name values against the strict list!"""
    try:
        response = requests.post(
            f"{GEMINI_ENDPOINT}?key={GEMINI_API_KEY}",
            headers={"Content-Type": "application/json"},
            json={
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    # temperature 0 keeps output deterministic across retries
                    "temperature": 0.0,
                    "maxOutputTokens": 1024,
                }
            },
            timeout=60,
        )
        response.raise_for_status()
        result = response.json()
        # Extract text from Gemini response
        content = result["candidates"][0]["content"]["parts"][0]["text"]
        # Parse JSON (bare, fenced, or embedded in prose)
        try:
            pipeline = json.loads(content)
        except json.JSONDecodeError:
            # Extract from code blocks
            json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
            if json_match:
                pipeline = json.loads(json_match.group(1))
            else:
                json_match = re.search(r'\{.*\}', content, re.DOTALL)
                # FIX: previously json_match.group(0) was called without a
                # None check, raising AttributeError on JSON-free responses;
                # raise the same clear ValueError the Bedrock path uses.
                if json_match is None:
                    raise ValueError(f"No JSON found in Gemini response: {content}")
                pipeline = json.loads(json_match.group(0))
        # Validate with Pydantic
        validated = PipelineConfig(**pipeline)
        out = validated.model_dump()
        # FIX: attach generator provenance AFTER validation/dump so that
        # pydantic's extra-key handling cannot strip these fields (they are
        # not PipelineConfig fields and the UI reads them from the result).
        out["_generator"] = "gemini"
        out["_model"] = GEMINI_MODEL
        return out
    except Exception as e:
        # Chain the cause so the real traceback survives the wrap.
        raise RuntimeError(f"Gemini pipeline generation failed: {str(e)}") from e
| # ======================== | |
| # UNIFIED PIPELINE GENERATOR WITH FALLBACK | |
| # ======================== | |
def generate_pipeline(
    user_input: str,
    file_path: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Dict[str, Any]:
    """
    Generate pipeline with fallback mechanism.

    Priority:
        1. Try Bedrock (Mistral Large) - if available and configured
        2. Fallback to Gemini - if Bedrock fails

    Returns:
        Pipeline configuration dict with component-level details.
    """
    failures: List[str] = []

    # Primary provider: Bedrock (only when installed and preferred)
    if prefer_bedrock and BEDROCK_AVAILABLE:
        try:
            print("π Attempting pipeline generation with Bedrock...")
            result = generate_pipeline_bedrock(user_input, file_path)
            print(f"β Bedrock pipeline generated successfully: {result['pipeline_name']}")
            return result
        except Exception as bedrock_error:
            message = f"Bedrock failed: {str(bedrock_error)}"
            print(f"β {message}")
            failures.append(message)
            print("π Falling back to Gemini...")

    # Secondary provider: Gemini
    try:
        print("π Attempting pipeline generation with Gemini...")
        result = generate_pipeline_gemini(user_input, file_path)
        print(f"β Gemini pipeline generated successfully: {result['pipeline_name']}")
        # Record why the primary provider was skipped over
        if failures:
            result.setdefault("metadata", {})
            result["metadata"]["fallback_reason"] = failures[0]
        return result
    except Exception as gemini_error:
        message = f"Gemini failed: {str(gemini_error)}"
        print(f"β {message}")
        failures.append(message)

    # Every provider failed - surface all collected errors at once
    raise RuntimeError(
        f"Pipeline generation failed with all providers.\n"
        f"Errors:\n" + "\n".join(f" - {e}" for e in failures)
    )
| # ======================== | |
| # UTILITY FUNCTIONS | |
| # ======================== | |
def format_pipeline_for_display(pipeline: Dict[str, Any]) -> str:
    """
    Format pipeline as fancy display string for Gradio.

    Args:
        pipeline: Pipeline dict as returned by generate_pipeline().

    Returns:
        Multi-line, emoji-decorated string ready to render in the chat UI.
    """
    generator = pipeline.get("_generator", "unknown")
    model = pipeline.get("_model", "unknown")
    display = f"""
βββββββββββββββββββββββββββββββββββββββββββββββββ
π― PIPELINE GENERATED SUCCESSFULLY!
βββββββββββββββββββββββββββββββββββββββββββββββββ
π Pipeline Name: {pipeline.get('pipeline_name', 'unnamed')}
π€ Generated By: {generator.title()} ({model})
β±οΈ Estimated Duration: {pipeline.get('metadata', {}).get('estimated_duration_seconds', 'unknown')} seconds
βββββββββββββββββββββββββββββββββββββββββββββββββ
"""
    # Add each component
    for idx, component in enumerate(pipeline.get("components", []), 1):
        tool_name = component.get("tool_name", "unknown")
        start_page = component.get("start_page", 1)
        end_page = component.get("end_page", 1)
        params = component.get("params", {})
        # Icon based on tool type.
        # FIX: the dict previously keyed on the legacy tool names the prompts
        # explicitly forbid (summarize_text, classify_text, extract_entities,
        # translate_text), so the real tools (summarize, classify, ner,
        # translator) never matched and always fell back to the default icon.
        icon = {
            "extract_text": "π",
            "extract_tables": "π",
            "describe_images": "πΌοΈ",
            "summarize": "π",
            "classify": "π·οΈ",
            "ner": "π€",
            "translator": "π",
            "signature_verification": "βοΈ",
            "stamp_detection": "π"
        }.get(tool_name, "π§")
        display += f"\n{icon} **STEP {idx}: {tool_name.replace('_', ' ').upper()}**\n"
        # Page range is only interesting when it is not the trivial 1..1
        if start_page > 1 or end_page > 1:
            display += f" π Pages: {start_page} to {end_page}\n"
        if params:
            display += " βοΈ Parameters:\n"
            for key, value in params.items():
                display += f" β’ {key}: {value}\n"
    display += "\nβββββββββββββββββββββββββββββββββββββββββββββββββ\n"
    # Add reasoning
    display += f"\nπ‘ **REASONING:**\n {pipeline.get('reason', 'No reason provided')}\n"
    display += "\nβββββββββββββββββββββββββββββββββββββββββββββββββ\n"
    display += "\nβ Type 'approve' to execute this pipeline"
    display += "\nβ Type 'reject' to cancel"
    display += "\nβοΈ Type 'edit' to modify\n"
    display += "\nβββββββββββββββββββββββββββββββββββββββββββββββββ"
    return display
if __name__ == "__main__":
    # Smoke test: generate a pipeline for a representative request and
    # render it the way the Gradio UI would.
    sample_request = (
        "extract text from pages 1-5, get tables from pages 2-4, "
        "and summarize everything"
    )
    try:
        generated = generate_pipeline(sample_request)
        print(json.dumps(generated, indent=2))
        print("\n" + "=" * 80 + "\n")
        print(format_pipeline_for_display(generated))
    except Exception as exc:
        print(f"Error: {exc}")