masterllm / services /pipeline_generator.py
redhairedshanks1's picture
making sure end page always greater than start page and in the same seeing getting the same value output even if the pipeline generated is different
5cf6369
# services/pipeline_generator.py
"""
Unified pipeline generator with Bedrock (priority) and Gemini (fallback)
"""
import json
import os
import re
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
# For Bedrock
try:
from langchain_aws import ChatBedrock
from langchain_core.prompts import ChatPromptTemplate
BEDROCK_AVAILABLE = True
except ImportError:
BEDROCK_AVAILABLE = False
print("Warning: langchain_aws not available, Bedrock will be disabled")
# For Gemini
import requests
# ========================
# PYDANTIC MODELS
# ========================
class ComponentConfig(BaseModel):
"""Configuration for a single pipeline component"""
tool_name: str = Field(description="Name of the tool to execute")
start_page: int = Field(default=1, description="Starting page number (1-indexed)")
end_page: int = Field(default=1, description="Ending page number (inclusive)")
params: Dict[str, Any] = Field(default_factory=dict, description="Additional tool-specific parameters")
class PipelineConfig(BaseModel):
"""Complete pipeline configuration"""
pipeline_name: str = Field(description="Name/identifier for the pipeline")
components: List[ComponentConfig] = Field(description="Ordered list of components to execute")
target_lang: Optional[str] = Field(default=None, description="Target language for translation (if applicable)")
reason: str = Field(description="AI's reasoning for this pipeline structure")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
# ========================
# BEDROCK PIPELINE GENERATOR
# ========================
def generate_pipeline_bedrock(user_input: str, file_path: Optional[str] = None) -> Dict[str, Any]:
"""
Generate pipeline using AWS Bedrock (Mistral Large)
Priority method - tries this first
"""
if not BEDROCK_AVAILABLE:
raise RuntimeError("Bedrock not available - langchain_aws not installed")
# Check for AWS credentials
if not os.getenv("AWS_ACCESS_KEY_ID") or not os.getenv("AWS_SECRET_ACCESS_KEY"):
raise RuntimeError("AWS credentials not configured")
try:
llm = ChatBedrock(
model_id="mistral.mistral-large-2402-v1:0",
region_name=os.getenv("AWS_REGION", "ap-south-1") # Default to Mumbai region (nearest)
)
prompt = ChatPromptTemplate.from_messages([
("system", """You are MasterLLM, a document processing pipeline orchestrator.
**YOUR ROLE:**
You are a helpful AI assistant that can have normal conversations AND create document processing pipelines when asked.
You should ONLY create pipelines when the user explicitly requests document processing operations.
For general questions, greetings, or information requests - just have a normal conversation.
**STRICT TOOL LIST - USE ONLY THESE TOOLS:**
1. extract_text (Extract text from PDFs/images)
- start_page, end_page
- params: {{"encoding": "utf-8", "preserve_layout": true/false}}
2. extract_tables (Extract tables from documents)
- start_page, end_page
- params: {{"format": "json" or "csv", "include_headers": true/false}}
3. describe_images (Generate descriptions of images)
- start_page, end_page
- params: {{"detail_level": "low"|"medium"|"high"}}
4. summarize (Summarize extracted text)
- start_page: 1, end_page: 1 (always)
- params: {{"max_length": 500, "style": "concise" or "detailed"}}
5. classify (Classify document content)
- start_page: 1, end_page: 1 (always)
- params: {{"categories": ["list", "of", "categories"]}}
6. ner (Named Entity Recognition - people, places, orgs)
- start_page: 1, end_page: 1 (always)
- params: {{"entity_types": ["PERSON", "ORG", "LOC", "DATE"]}}
7. translator (Translate text to another language)
- start_page: 1, end_page: 1 (always)
- params: {{"target_lang": "es"|"fr"|"de" etc, "source_lang": "auto"}}
8. signature_verification (Detect and verify signatures)
- start_page, end_page
- params: {{}}
9. stamp_detection (Detect stamps/seals)
- start_page, end_page
- params: {{}}
**CRITICAL RULES:**
- NEVER use tools not in this list (e.g., NO "extract_entities", "summarize_text", "translate_text")
- Use "ner" for entity extraction (NOT "extract_entities")
- Use "summarize" (NOT "summarize_text")
- Use "translator" (NOT "translate_text")
- Use "classify" (NOT "classify_text")
- For text-processing tools (summarize, ner, translator, classify): ALWAYS use start_page=1, end_page=1
- For extraction tools (extract_text, extract_tables, images, signatures, stamps): use actual page ranges
- **NEVER use negative page numbers (e.g., end_page: -1 is FORBIDDEN)**
- **To process ALL pages, ALWAYS use end_page: 999 (NOT -1!)**
- **start_page and end_page must ALWAYS be positive integers >= 1**
Return ONLY valid JSON:
{{
"pipeline_name": "descriptive-name",
"components": [
{{"tool_name": "extract_text", "start_page": 1, "end_page": 999, "params": {{"encoding": "utf-8"}}}},
{{"tool_name": "summarize", "start_page": 1, "end_page": 1, "params": {{"max_length": 500}}}}
],
"target_lang": null,
"reason": "Brief explanation",
"metadata": {{"estimated_duration_seconds": 30}}
}}
Always validate tool_name against the strict list above!"""),
("human", "User request: {input}\n\nFile: {file_path}")
])
chain = prompt | llm
response = chain.invoke({
"input": user_input,
"file_path": file_path or "user uploaded document"
})
# Parse JSON from response
content = response.content
# Try direct JSON parse
try:
pipeline = json.loads(content)
except json.JSONDecodeError:
# Extract JSON from markdown code blocks
json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
pipeline = json.loads(json_match.group(1))
else:
# Try to find any JSON object
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
pipeline = json.loads(json_match.group(0))
else:
raise ValueError(f"No JSON found in Bedrock response: {content}")
# Add generator metadata
pipeline["_generator"] = "bedrock"
pipeline["_model"] = "mistral.mistral-large-2402-v1:0"
# Validate with Pydantic
validated = PipelineConfig(**pipeline)
return validated.model_dump()
except Exception as e:
raise RuntimeError(f"Bedrock pipeline generation failed: {str(e)}")
# ========================
# GEMINI PIPELINE GENERATOR
# ========================
def generate_pipeline_gemini(user_input: str, file_path: Optional[str] = None) -> Dict[str, Any]:
"""
Generate pipeline using Google Gemini (fallback method)
"""
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.0-flash")
GEMINI_ENDPOINT = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL}:generateContent"
if not GEMINI_API_KEY:
raise RuntimeError("Gemini API key not configured")
prompt = f"""You are MasterLLM pipeline generator.
STRICT TOOL LIST (USE ONLY THESE):
- extract_text (pages: start_page, end_page)
- extract_tables (pages: start_page, end_page)
- describe_images (pages: start_page, end_page)
- summarize (always: start_page=1, end_page=1)
- classify (always: start_page=1, end_page=1)
- ner (always: start_page=1, end_page=1) - for entity extraction
- translator (always: start_page=1, end_page=1)
- signature_verification (pages: start_page, end_page)
- stamp_detection (pages: start_page, end_page)
DO NOT USE: extract_entities, summarize_text, translate_text, classify_text
USE CORRECT NAMES: ner (not extract_entities), summarize (not summarize_text)
User request: {user_input}
File: {file_path or "user uploaded document"}
Return ONLY valid JSON:
{{
"pipeline_name": "descriptive-name",
"components": [
{{"tool_name": "extract_text", "start_page": 1, "end_page": 5, "params": {{}}}}
],
"target_lang": null,
"reason": "explanation",
"metadata": {{"estimated_duration_seconds": 30}}
}}
VALIDATE all tool_name values against the strict list!"""
try:
response = requests.post(
f"{GEMINI_ENDPOINT}?key={GEMINI_API_KEY}",
headers={"Content-Type": "application/json"},
json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"temperature": 0.0,
"maxOutputTokens": 1024,
}
},
timeout=60,
)
response.raise_for_status()
result = response.json()
# Extract text from Gemini response
content = result["candidates"][0]["content"]["parts"][0]["text"]
# Parse JSON
try:
pipeline = json.loads(content)
except json.JSONDecodeError:
# Extract from code blocks
json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
pipeline = json.loads(json_match.group(1))
else:
json_match = re.search(r'\{.*\}', content, re.DOTALL)
pipeline = json.loads(json_match.group(0))
# Add generator metadata
pipeline["_generator"] = "gemini"
pipeline["_model"] = GEMINI_MODEL
# Validate with Pydantic
validated = PipelineConfig(**pipeline)
return validated.model_dump()
except Exception as e:
raise RuntimeError(f"Gemini pipeline generation failed: {str(e)}")
# ========================
# UNIFIED PIPELINE GENERATOR WITH FALLBACK
# ========================
def generate_pipeline(
user_input: str,
file_path: Optional[str] = None,
prefer_bedrock: bool = True
) -> Dict[str, Any]:
"""
Generate pipeline with fallback mechanism.
Priority:
1. Try Bedrock (Mistral Large) - if available and configured
2. Fallback to Gemini - if Bedrock fails
Returns:
Pipeline configuration dict with component-level details
"""
errors = []
# Try Bedrock first (priority)
if prefer_bedrock and BEDROCK_AVAILABLE:
try:
print("πŸ† Attempting pipeline generation with Bedrock...")
pipeline = generate_pipeline_bedrock(user_input, file_path)
print(f"βœ… Bedrock pipeline generated successfully: {pipeline['pipeline_name']}")
return pipeline
except Exception as bedrock_error:
error_msg = f"Bedrock failed: {str(bedrock_error)}"
print(f"❌ {error_msg}")
errors.append(error_msg)
print("πŸ”„ Falling back to Gemini...")
# Fallback to Gemini
try:
print("πŸ”„ Attempting pipeline generation with Gemini...")
pipeline = generate_pipeline_gemini(user_input, file_path)
print(f"βœ… Gemini pipeline generated successfully: {pipeline['pipeline_name']}")
# Add fallback metadata
if errors:
if "metadata" not in pipeline:
pipeline["metadata"] = {}
pipeline["metadata"]["fallback_reason"] = errors[0]
return pipeline
except Exception as gemini_error:
error_msg = f"Gemini failed: {str(gemini_error)}"
print(f"❌ {error_msg}")
errors.append(error_msg)
# Both failed
raise RuntimeError(
f"Pipeline generation failed with all providers.\n"
f"Errors:\n" + "\n".join(f" - {e}" for e in errors)
)
# ========================
# UTILITY FUNCTIONS
# ========================
def format_pipeline_for_display(pipeline: Dict[str, Any]) -> str:
"""
Format pipeline as fancy display string for Gradio
"""
generator = pipeline.get("_generator", "unknown")
model = pipeline.get("_model", "unknown")
display = f"""
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 PIPELINE GENERATED SUCCESSFULLY!
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
πŸ“‹ Pipeline Name: {pipeline.get('pipeline_name', 'unnamed')}
πŸ€– Generated By: {generator.title()} ({model})
⏱️ Estimated Duration: {pipeline.get('metadata', {}).get('estimated_duration_seconds', 'unknown')} seconds
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
# Add each component
for idx, component in enumerate(pipeline.get("components", []), 1):
tool_name = component.get("tool_name", "unknown")
start_page = component.get("start_page", 1)
end_page = component.get("end_page", 1)
params = component.get("params", {})
# Icon based on tool type
icon = {
"extract_text": "πŸ“„",
"extract_tables": "πŸ“Š",
"describe_images": "πŸ–ΌοΈ",
"summarize_text": "πŸ“",
"classify_text": "🏷️",
"extract_entities": "πŸ‘€",
"translate_text": "🌐",
"signature_verification": "✍️",
"stamp_detection": "πŸ”–"
}.get(tool_name, "πŸ”§")
display += f"\n{icon} **STEP {idx}: {tool_name.replace('_', ' ').upper()}**\n"
if start_page > 1 or end_page > 1:
display += f" πŸ“ Pages: {start_page} to {end_page}\n"
if params:
display += " βš™οΈ Parameters:\n"
for key, value in params.items():
display += f" β€’ {key}: {value}\n"
display += "\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
# Add reasoning
display += f"\nπŸ’‘ **REASONING:**\n {pipeline.get('reason', 'No reason provided')}\n"
display += "\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
display += "\nβœ… Type 'approve' to execute this pipeline"
display += "\n❌ Type 'reject' to cancel"
display += "\n✏️ Type 'edit' to modify\n"
display += "\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
return display
if __name__ == "__main__":
# Test
test_input = "extract text from pages 1-5, get tables from pages 2-4, and summarize everything"
try:
pipeline = generate_pipeline(test_input)
print(json.dumps(pipeline, indent=2))
print("\n" + "="*80 + "\n")
print(format_pipeline_for_display(pipeline))
except Exception as e:
print(f"Error: {e}")