Spaces:
Sleeping
Sleeping
| # services/agent_crewai.py | |
| """ | |
| CrewAI-based agent for MasterLLM orchestration. | |
| """ | |
| import json | |
| import os | |
| from typing import Optional, Dict, Any, List, Generator | |
| from crewai import Agent, Task, Crew, Process | |
| from crewai.tools import BaseTool | |
| from pydantic import BaseModel, Field | |
| # Import your remote utilities | |
| from utilities.extract_text import extract_text_remote | |
| from utilities.extract_tables import extract_tables_remote | |
| from utilities.describe_images import describe_images_remote | |
| from utilities.summarizer import summarize_remote | |
| from utilities.classify import classify_remote | |
| from utilities.ner import ner_remote | |
| from utilities.translator import translate_remote | |
| from utilities.signature_verification import signature_verification_remote | |
| from utilities.stamp_detection import stamp_detection_remote | |
| # ======================== | |
| # TOOL INPUT SCHEMAS | |
| # ======================== | |
class FileSpanInput(BaseModel):
    # Input schema for tools that operate on a page span of a file on disk.
    # Only file_path is required; both page bounds default to 1.
    file_path: str = Field(
        default=...,
        description="Absolute/local path to the uploaded file",
    )
    start_page: int = Field(
        default=1,
        description="Start page (1-indexed)",
    )
    end_page: int = Field(
        default=1,
        description="End page (inclusive, 1-indexed)",
    )
class TextOrFileInput(BaseModel):
    # Input schema for tools that accept raw text, a file on disk, or both.
    # Both sources are optional at the schema level (default None); the page
    # bounds default to 1.
    text: Optional[str] = Field(
        default=None,
        description="Raw text to process",
    )
    file_path: Optional[str] = Field(
        default=None,
        description="Path to a document on disk (PDF/Image)",
    )
    start_page: int = Field(
        default=1,
        description="Start page (1-indexed)",
    )
    end_page: int = Field(
        default=1,
        description="End page (inclusive, 1-indexed)",
    )
class TranslateInput(TextOrFileInput):
    # Extends TextOrFileInput with the (required) translation target.
    target_lang: str = Field(
        default=...,
        description="Target language code or name (e.g., 'es' or 'Spanish')",
    )
| # ======================== | |
| # HELPER FUNCTIONS | |
| # ======================== | |
| def _base_state(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]: | |
| """Build the base state your utilities expect.""" | |
| filename = os.path.basename(file_path) | |
| return { | |
| "filename": filename, | |
| "temp_files": {filename: file_path}, | |
| "start_page": start_page, | |
| "end_page": end_page, | |
| } | |
| # ======================== | |
| # CREWAI TOOLS | |
| # ======================== | |
class ExtractTextTool(BaseTool):
    # CrewAI tool that forwards a file/page span to extract_text_remote.
    name: str = "extract_text"
    description: str = """Extract text from a document between start_page and end_page (inclusive).
Use this when the user asks to read, analyze, or summarize document text.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Return a JSON string ``{"text": ...}`` built from the utility's reply."""
        response = extract_text_remote(_base_state(file_path, start_page, end_page))
        # The first non-empty value of "text" / "extracted_text" wins; if both
        # are missing or empty the result carries an empty string.
        extracted = ""
        for key in ("text", "extracted_text"):
            candidate = response.get(key)
            if candidate:
                extracted = candidate
                break
        return json.dumps({"text": extracted})
class ExtractTablesTool(BaseTool):
    # CrewAI tool that forwards a file/page span to extract_tables_remote.
    name: str = "extract_tables"
    description: str = """Extract tables from a document between start_page and end_page.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Return a JSON string with the extracted tables and their count."""
        response = extract_tables_remote(_base_state(file_path, start_page, end_page))
        found_tables = response.get("tables", [])
        payload = {
            "tables": found_tables,
            "table_count": len(found_tables),
        }
        return json.dumps(payload)
class DescribeImagesTool(BaseTool):
    # CrewAI tool that forwards a file/page span to describe_images_remote.
    name: str = "describe_images"
    description: str = """Generate captions/descriptions for images in the specified page range.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Return a JSON string ``{"image_descriptions": ...}``."""
        response = describe_images_remote(_base_state(file_path, start_page, end_page))
        # Without an "image_descriptions" key the whole reply is passed through.
        descriptions = response.get("image_descriptions", response)
        return json.dumps({"image_descriptions": descriptions})
class SummarizeTextTool(BaseTool):
    # CrewAI tool that forwards raw text and/or a file/page span to
    # summarize_remote.
    name: str = "summarize_text"
    description: str = """Summarize either raw text or a document (by file_path + optional page span).
Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
At least one of text or file_path must be provided."""

    def _run(
        self,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Return a JSON string ``{"summary": ...}``."""
        # File-related keys are only added when a file path was supplied.
        file_state = _base_state(file_path, start_page, end_page) if file_path else {}
        request: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
            **file_state,
        }
        response = summarize_remote(request)
        # Without a "summary" key the whole reply is passed through.
        summary = response.get("summary", response)
        return json.dumps({"summary": summary})
class ClassifyTextTool(BaseTool):
    # CrewAI tool that forwards raw text and/or a file/page span to
    # classify_remote.
    name: str = "classify_text"
    description: str = """Classify a text or document content.
Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
At least one of text or file_path must be provided."""

    def _run(
        self,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Return a JSON string ``{"classification": ...}``."""
        # File-related keys are only added when a file path was supplied.
        file_state = _base_state(file_path, start_page, end_page) if file_path else {}
        request: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
            **file_state,
        }
        response = classify_remote(request)
        # Without a "classification" key the whole reply is passed through.
        label = response.get("classification", response)
        return json.dumps({"classification": label})
class ExtractEntitesTool(BaseTool):
    # CrewAI tool that forwards raw text and/or a file/page span to ner_remote.
    # NOTE(review): the class name is misspelled ("Entites"); it is kept as-is
    # because other code references it by this name.
    name: str = "extract_entities"
    description: str = """Perform Named Entity Recognition (NER) on text or a document.
Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
At least one of text or file_path must be provided."""

    def _run(
        self,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Return a JSON string ``{"ner": ...}``."""
        # File-related keys are only added when a file path was supplied.
        file_state = _base_state(file_path, start_page, end_page) if file_path else {}
        request: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
            **file_state,
        }
        response = ner_remote(request)
        # Without a "ner" key the whole reply is passed through.
        entities = response.get("ner", response)
        return json.dumps({"ner": entities})
class TranslateTextTool(BaseTool):
    # CrewAI tool that forwards raw text and/or a file/page span, plus the
    # target language, to translate_remote.
    name: str = "translate_text"
    description: str = """Translate text or a document to target_lang (e.g., 'es', 'fr', 'de', 'Spanish').
Input should be a JSON object with: target_lang (required), text (optional), file_path (optional),
start_page (default 1), end_page (default 1). At least one of text or file_path must be provided."""

    def _run(
        self,
        target_lang: str,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Return a JSON string with the translation and the requested language."""
        # File-related keys are only added when a file path was supplied.
        file_state = _base_state(file_path, start_page, end_page) if file_path else {}
        request: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
            "target_lang": target_lang,
            **file_state,
        }
        response = translate_remote(request)
        # Without a "translation" key the whole reply is passed through.
        translated = response.get("translation", response)
        return json.dumps({"translation": translated, "target_lang": target_lang})
class SignatureVerificationTool(BaseTool):
    # CrewAI tool that forwards a file/page span to
    # signature_verification_remote.
    name: str = "signature_verification"
    description: str = """Verify signatures/stamps presence and authenticity indicators in specified page range.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Return a JSON string ``{"signature_verification": ...}``."""
        response = signature_verification_remote(
            _base_state(file_path, start_page, end_page)
        )
        # Without a "signature_verification" key the whole reply is passed through.
        verdict = response.get("signature_verification", response)
        return json.dumps({"signature_verification": verdict})
class StampDetectionTool(BaseTool):
    # CrewAI tool that forwards a file/page span to stamp_detection_remote.
    name: str = "stamp_detection"
    description: str = """Detect stamps in a document in the specified page range.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Return a JSON string ``{"stamp_detection": ...}``."""
        response = stamp_detection_remote(_base_state(file_path, start_page, end_page))
        # Without a "stamp_detection" key the whole reply is passed through.
        detections = response.get("stamp_detection", response)
        return json.dumps({"stamp_detection": detections})
| # ======================== | |
| # TOOL REGISTRY | |
| # ======================== | |
def get_master_tools() -> List[BaseTool]:
    """Instantiate every tool defined in this module, in a fixed order.

    A fresh instance of each tool class is created on every call.
    """
    tool_classes = (
        ExtractTextTool,
        ExtractTablesTool,
        DescribeImagesTool,
        SummarizeTextTool,
        ClassifyTextTool,
        ExtractEntitesTool,
        TranslateTextTool,
        SignatureVerificationTool,
        StampDetectionTool,
    )
    return [tool_cls() for tool_cls in tool_classes]
| # ======================== | |
| # AGENT CONFIGURATION | |
| # ======================== | |
# Prompt text embedded at the top of the agent's backstory by
# create_master_agent. It describes the agent's responsibilities and the
# JSON shape the final answer should take. This is runtime text sent to the
# model: edit with care.
SYSTEM_INSTRUCTIONS = """You are MasterLLM, a precise document processing agent.
Your responsibilities:
- Use tools for any action (extraction, tables, images, summarization, classification, NER, translation, signature verification, stamp detection).
- If a tool requires file_path and the user didn't provide one, use the provided session_file_path.
- Use page spans when relevant (start_page, end_page).
- Combine results when needed (e.g., extract_text -> summarize_text; tables -> summarize_text).
- If a PLAN is provided, follow it strictly unless it's impossible.
- Keep outputs compact - do not include raw base64 or giant blobs.
- Always return a final JSON result with:
{
"steps_executed": [...],
"outputs": { ... },
"errors": [],
"meta": {
"model": "crewai-gemini",
"notes": "short note if needed"
}
}
"""
def create_master_agent(session_file_path: str = "", plan_json: str = "{}") -> Agent:
    """Build the single document-processing agent used by the crew.

    Args:
        session_file_path: Path of the session's file, embedded in the backstory.
        plan_json: JSON-encoded execution plan, embedded in the backstory.

    Returns:
        An ``Agent`` bound to all tools from ``get_master_tools()``.
    """
    agent_tools = get_master_tools()
    backstory = f"""{SYSTEM_INSTRUCTIONS}
Current session file: {session_file_path}
Execution plan: {plan_json}
"""

    # The model is always passed as a LiteLLM-style string ("gemini/" prefix),
    # never as a model object. The block below only configures the Google SDK
    # when it is importable and a key is present, and reports what it found;
    # it does not change which model string is used.
    llm = "gemini/gemini-2.0-flash-exp"
    try:
        import google.generativeai as genai

        api_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")
        if not api_key:
            print("⚠️ No Google API key found")
        else:
            genai.configure(api_key=api_key)
            print(f"✅ CrewAI using Gemini via LiteLLM: {llm}")
    except ImportError as e:
        print(f"⚠️ google.generativeai not available: {e}")
        print("Using LiteLLM format anyway")
    except Exception as e:
        print(f"⚠️ Error configuring Gemini: {e}")

    return Agent(
        role="Document Processing Specialist",
        goal="Process documents according to the given plan using available tools, and return structured JSON results",
        backstory=backstory,
        tools=agent_tools,
        verbose=True,
        allow_delegation=False,
        max_iter=12,
        llm=llm,
    )
def create_master_crew(
    user_input: str,
    session_file_path: str = "",
    plan: Optional[Dict[str, Any]] = None,
) -> Crew:
    """Create a one-agent, one-task crew for a document processing request.

    Args:
        user_input: The user's request, embedded in the task description.
        session_file_path: Path of the session's file, embedded in the task
            description and the agent backstory.
        plan: Optional execution plan. Its ``pipeline``, ``start_page`` and
            ``end_page`` entries (when present) are echoed into the expected
            output template. ``None`` and an empty dict are treated alike.

    Returns:
        A sequential ``Crew`` containing the master agent and its task.
    """
    # Normalise once so that a missing plan never reaches ``.get``. Previously
    # ``plan.get('start_page', 1)`` was evaluated even when ``plan`` was None
    # (AttributeError), and an empty plan rendered the page span as "1-1-1".
    plan_data = plan or {}
    plan_json = json.dumps(plan_data)
    pipeline_name = plan_data.get("pipeline", "")
    page_span = f"{plan_data.get('start_page', 1)}-{plan_data.get('end_page', 1)}"

    agent = create_master_agent(session_file_path, plan_json)
    task_description = f"""
Execute the following document processing request:
User Request: {user_input}
Session File Path: {session_file_path}
Execution Plan: {plan_json}
Instructions:
1. Follow the plan steps in order
2. Use the file path provided for all file-based operations
3. Combine results from multiple tools when appropriate
4. Return a comprehensive JSON result with all outputs
Expected Output Format:
{{
"steps_executed": ["step1", "step2", ...],
"outputs": {{
"text": "...",
"tables": [...],
"summary": "...",
// other outputs based on what was executed
}},
"errors": [],
"meta": {{
"model": "crewai-gemini",
"pipeline": "{pipeline_name}",
"pages_processed": "{page_span}"
}}
}}
"""
    task = Task(
        description=task_description,
        expected_output="A JSON object containing all processed results, executed steps, and any errors",
        agent=agent,
    )
    crew = Crew(
        agents=[agent],
        tasks=[task],
        process=Process.sequential,
        verbose=True,
    )
    return crew
| # ======================== | |
| # MAIN ENTRY POINTS | |
| # ======================== | |
def run_agent(
    user_input: str,
    session_file_path: Optional[str] = None,
    plan: Optional[Dict[str, Any]] = None,
    chat_history: Optional[List[Any]] = None,
) -> Dict[str, Any]:
    """Run the crew synchronously and return its result under ``"output"``.

    The crew's raw output is interpreted in this order:
      1. the whole output parsed as JSON;
      2. the span from the first ``{`` to the last ``}`` parsed as JSON;
      3. the raw text wrapped as ``{"result": ..., "format": "text"}``.
    Any other failure while interpreting the output is reported as
    ``{"error": ..., "raw_result": ...}``.

    ``chat_history`` is accepted but not used by this function.
    """
    import re

    crew = create_master_crew(
        user_input=user_input,
        session_file_path=session_file_path or "",
        plan=plan,
    )
    result = crew.kickoff()

    try:
        # Prefer the result's ``raw`` attribute when it has one.
        raw_output = result.raw if hasattr(result, "raw") else str(result)

        try:
            return {"output": json.loads(raw_output)}
        except json.JSONDecodeError:
            # Not pure JSON: look for a JSON object embedded in the text.
            embedded = re.search(r'\{.*\}', raw_output, re.DOTALL)

        if embedded is not None:
            try:
                return {"output": json.loads(embedded.group())}
            except json.JSONDecodeError:
                pass

        return {"output": {"result": raw_output, "format": "text"}}
    except Exception as e:
        return {"output": {"error": str(e), "raw_result": str(result)}}
def run_agent_streaming(
    user_input: str,
    session_file_path: Optional[str] = None,
    plan: Optional[Dict[str, Any]] = None,
    chat_history: Optional[List[Any]] = None,
) -> Generator[Dict[str, Any], None, None]:
    """Run the crew in a background thread and yield progress plus the result.

    Yields, in order:
      * one ``{"type": "step", "status": "initializing", ...}`` event;
      * one ``{"type": "step", "status": "executing", ...}`` event per plan
        pipeline step (split on ``-``) until the crew finishes; these are
        simulated progress markers, not live tool callbacks;
      * finally either ``{"type": "final", "data": ...}`` or
        ``{"type": "error", "error": ...}``.

    ``chat_history`` is accepted but not used by this function.
    """
    import queue
    import re
    import threading

    result_queue: queue.Queue = queue.Queue()

    # Yield initial status
    yield {
        "type": "step",
        "step": 0,
        "status": "initializing",
        "tool": "crew_setup",
        "input_preview": f"Setting up pipeline: {plan.get('pipeline', 'unknown') if plan else 'unknown'}"
    }

    def run_crew() -> None:
        # The worker always reports exactly one ("success"|"error", payload)
        # tuple through the queue.
        try:
            crew = create_master_crew(
                user_input=user_input,
                session_file_path=session_file_path or "",
                plan=plan,
            )
            result_queue.put(("success", crew.kickoff()))
        except Exception as e:
            result_queue.put(("error", str(e)))

    # Start crew execution in a separate thread
    thread = threading.Thread(target=run_crew)
    thread.start()

    # Tolerate a missing/None pipeline and skip empty segments so that no
    # progress event is emitted with a blank tool name.
    pipeline_text = (plan.get("pipeline") or "") if plan else ""
    pipeline_steps = [name for name in pipeline_text.split("-") if name]

    # Holds the worker's outcome once it has been taken off the queue. The
    # earlier code discarded the item fetched inside the loop and then saw an
    # empty queue, reporting a timeout for runs that had actually completed.
    outcome = None
    for step_count, step_name in enumerate(pipeline_steps, start=1):
        yield {
            "type": "step",
            "step": step_count,
            "status": "executing",
            "tool": step_name,
            "input_preview": f"Processing: {step_name}"
        }
        try:
            outcome = result_queue.get(timeout=2.0)
            break
        except queue.Empty:
            continue

    if outcome is None:
        # Not finished during the progress loop: wait up to 2 more minutes.
        try:
            outcome = result_queue.get(timeout=120)
        except queue.Empty:
            yield {
                "type": "error",
                "error": "Execution timeout - crew did not complete in time"
            }
            return

    result_type, result_data = outcome
    if result_type == "error":
        yield {
            "type": "error",
            "error": result_data
        }
        return

    # Interpret the crew output: whole-string JSON, then an embedded JSON
    # object, then plain text.
    try:
        raw_output = result_data.raw if hasattr(result_data, "raw") else str(result_data)
        try:
            parsed = json.loads(raw_output)
        except json.JSONDecodeError:
            as_text = {"result": raw_output, "format": "text"}
            json_match = re.search(r'\{.*\}', raw_output, re.DOTALL)
            if json_match is None:
                parsed = as_text
            else:
                try:
                    parsed = json.loads(json_match.group())
                except json.JSONDecodeError:
                    parsed = as_text
    except Exception as e:
        parsed = {"error": str(e), "raw_result": str(result_data)}

    yield {
        "type": "final",
        "data": parsed
    }