# masterllm/services/master_tools.py
# Last change (commit 6df13ef, by stellar413): added fixed agent-to-agent communication.
# services/master_tools.py
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field, model_validator
from langchain_core.tools import tool
import os
# Import your remote utilities
from utilities.extract_text import extract_text_remote
from utilities.extract_tables import extract_tables_remote
from utilities.describe_images import describe_images_remote
from utilities.summarizer import summarize_remote
from utilities.classify import classify_remote
from utilities.ner import ner_remote
from utilities.translator import translate_remote
from utilities.signature_verification import signature_verification_remote
from utilities.stamp_detection import stamp_detection_remote
# ---------- Agent Integration (Phase 1) ----------
def _use_agents() -> bool:
    """Report whether agent mode is switched on.

    Reads the ``USE_AGENTS`` environment variable (treated as ``"false"``
    when unset) and compares it case-insensitively against ``"true"``;
    any other value leaves agent mode off.
    """
    raw_flag = os.getenv("USE_AGENTS", "false")
    return raw_flag.lower() == "true"
def _get_agent_if_enabled(agent_name: str):
    """Look up ``agent_name`` in the agent registry when agent mode is on.

    Returns whatever ``get_agent`` returns, or ``None`` when agent mode is
    disabled or when importing/querying the registry raises.
    """
    if _use_agents():
        try:
            # Imported here so the registry is only touched when agents are enabled.
            from services.agents.agent_registry import get_agent

            agent = get_agent(agent_name)
        except Exception as e:
            # Best-effort: report the problem on stdout and hand back None so
            # the calling tool can use its utility fallback instead of failing.
            print(f"Warning: Agent system unavailable ({e}), using utility fallback")
            agent = None
        return agent
    return None
# ---------- Shared helpers ----------
def _base_state(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Assemble the minimal state dict handed to the utilities and agents.

    ``filename`` holds the base name of ``file_path`` and ``temp_files`` maps
    that base name back to the full path. The page bounds are copied through
    unchanged.
    """
    name = os.path.basename(file_path)
    state: Dict[str, Any] = {"filename": name}
    state["temp_files"] = {name: file_path}
    state["start_page"] = start_page
    state["end_page"] = end_page
    return state
# ---------- Arg Schemas ----------
class FileSpanArgs(BaseModel):
    # Argument schema for tools that work on a page range of a file on disk.
    # Both page bounds must be >= 1 and default to the first page; the path is
    # required. (Kept as comments rather than a class docstring so the
    # generated schema is unchanged.)
    file_path: str = Field(default=..., description="Absolute/local path to the uploaded file")
    start_page: int = Field(default=1, ge=1, description="Start page (1-indexed)")
    end_page: int = Field(default=1, ge=1, description="End page (inclusive, 1-indexed)")
class TextOrFileArgs(BaseModel):
    # Argument schema for tools that accept either inline text or a document
    # on disk, plus a page span whose bounds must be >= 1 and default to 1.
    # (Kept as comments rather than a class docstring so the generated schema
    # is unchanged.)
    text: Optional[str] = Field(default=None, description="Raw text to process")
    file_path: Optional[str] = Field(default=None, description="Path to a document on disk (PDF/Image)")
    start_page: int = Field(default=1, ge=1, description="Start page (1-indexed)")
    end_page: int = Field(default=1, ge=1, description="End page (inclusive, 1-indexed)")

    @model_validator(mode="after")
    def validate_sources(self):
        # At least one source must be truthy; an empty string counts as missing.
        if self.text or self.file_path:
            return self
        raise ValueError("Provide either text or file_path.")
class TranslateArgs(TextOrFileArgs):
    # Text-or-file arguments plus the required translation target.
    target_lang: str = Field(
        default=...,
        description="Target language code or name (e.g., 'es' or 'Spanish')",
    )
class FinalizeArgs(BaseModel):
    # Single required field: the JSON-like payload the finalize tool hands back.
    content: Dict[str, Any] = Field(
        default=...,
        description="JSON payload to return directly to the user",
    )
# ---------- Tools ----------
@tool("extract_text", args_schema=FileSpanArgs)
def extract_text_tool(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Extract text from a document between start_page and end_page (inclusive).
    Use this when the user asks to read, analyze, or summarize document text.
    Returns: {"text": "..."}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("extract_text")
    state = _base_state(file_path, start_page, end_page)
    payload = agent.run(state) if agent else extract_text_remote(state)
    # The text is accepted under either "text" or "extracted_text"; a missing
    # or empty value collapses to "".
    return {"text": payload.get("text") or payload.get("extracted_text") or ""}
@tool("extract_tables", args_schema=FileSpanArgs)
def extract_tables_tool(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Extract tables from a document between start_page and end_page.
    Returns: {"tables": [...], "table_count": int}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("extract_tables")
    state = _base_state(file_path, start_page, end_page)
    payload = agent.run(state) if agent else extract_tables_remote(state)
    # dict.get's default only applies when the key is absent. A backend that
    # answers {"tables": None} would otherwise make len() raise TypeError, so
    # an explicit null is treated the same as "no tables".
    tables = payload.get("tables")
    if tables is None:
        tables = []
    return {"tables": tables, "table_count": len(tables)}
@tool("describe_images", args_schema=FileSpanArgs)
def describe_images_tool(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Generate captions/descriptions for images in the specified page range.
    Returns: {"image_descriptions": ...}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("describe_images")
    state = _base_state(file_path, start_page, end_page)
    payload = agent.run(state) if agent else describe_images_remote(state)
    # When the expected key is absent, the whole backend response is returned instead.
    return {"image_descriptions": payload.get("image_descriptions", payload)}
@tool("summarize_text", args_schema=TextOrFileArgs)
def summarize_text_tool(text: Optional[str] = None, file_path: Optional[str] = None,
                        start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Summarize either raw text or a document (by file_path + optional page span).
    Returns: {"summary": "..."}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # File-related keys are only added when a path was supplied.
    file_state = _base_state(file_path, start_page, end_page) if file_path else {}
    state: Dict[str, Any] = {
        "text": text,
        "start_page": start_page,
        "end_page": end_page,
        **file_state,
    }
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("summarize")
    payload = agent.run(state) if agent else summarize_remote(state)
    # When the expected key is absent, the whole backend response is returned instead.
    return {"summary": payload.get("summary", payload)}
@tool("classify_text", args_schema=TextOrFileArgs)
def classify_text_tool(text: Optional[str] = None, file_path: Optional[str] = None,
                       start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Classify a text or document content.
    Returns: {"classification": ...}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # File-related keys are only added when a path was supplied.
    file_state = _base_state(file_path, start_page, end_page) if file_path else {}
    state: Dict[str, Any] = {
        "text": text,
        "start_page": start_page,
        "end_page": end_page,
        **file_state,
    }
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("classify")
    payload = agent.run(state) if agent else classify_remote(state)
    # When the expected key is absent, the whole backend response is returned instead.
    return {"classification": payload.get("classification", payload)}
@tool("extract_entities", args_schema=TextOrFileArgs)
def extract_entities_tool(text: Optional[str] = None, file_path: Optional[str] = None,
                          start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Perform Named Entity Recognition (NER) on text or a document.
    Returns: {"ner": ...}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # File-related keys are only added when a path was supplied.
    file_state = _base_state(file_path, start_page, end_page) if file_path else {}
    state: Dict[str, Any] = {
        "text": text,
        "start_page": start_page,
        "end_page": end_page,
        **file_state,
    }
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("ner")
    payload = agent.run(state) if agent else ner_remote(state)
    # When the expected key is absent, the whole backend response is returned instead.
    return {"ner": payload.get("ner", payload)}
@tool("translate_text", args_schema=TranslateArgs)
def translate_text_tool(target_lang: str,
                        text: Optional[str] = None, file_path: Optional[str] = None,
                        start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Translate text or a document to target_lang (e.g., 'es', 'fr', 'de', 'Spanish').
    Returns: {"translation": "...", "target_lang": "..."}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # File-related keys are only added when a path was supplied.
    file_state = _base_state(file_path, start_page, end_page) if file_path else {}
    state: Dict[str, Any] = {
        "text": text,
        "start_page": start_page,
        "end_page": end_page,
        "target_lang": target_lang,
        **file_state,
    }
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("translate")
    payload = agent.run(state) if agent else translate_remote(state)
    # The requested language is echoed back next to the translation; when the
    # "translation" key is absent, the whole backend response is returned instead.
    translation = payload.get("translation", payload)
    return {"translation": translation, "target_lang": target_lang}
@tool("signature_verification", args_schema=FileSpanArgs)
def signature_verification_tool(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Verify signatures/stamps presence and authenticity indicators in specified page range.
    Returns: {"signature_verification": ...}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("signature_verification")
    state = _base_state(file_path, start_page, end_page)
    payload = agent.run(state) if agent else signature_verification_remote(state)
    # When the expected key is absent, the whole backend response is returned instead.
    return {"signature_verification": payload.get("signature_verification", payload)}
@tool("stamp_detection", args_schema=FileSpanArgs)
def stamp_detection_tool(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
    """
    Detect stamps in a document in the specified page range.
    Returns: {"stamp_detection": ...}
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # Use the registered agent when one is available, otherwise the remote utility.
    agent = _get_agent_if_enabled("stamp_detection")
    state = _base_state(file_path, start_page, end_page)
    payload = agent.run(state) if agent else stamp_detection_remote(state)
    # When the expected key is absent, the whole backend response is returned instead.
    return {"stamp_detection": payload.get("stamp_detection", payload)}
@tool("finalize", args_schema=FinalizeArgs, return_direct=True)
def finalize_tool(content: Dict[str, Any]) -> Dict[str, Any]:
    """
    FINAL STEP ONLY. Call this at the end to return a concise JSON result to the UI.
    Whatever you pass in 'content' is returned directly and ends the run.
    """
    # NOTE: the docstring above doubles as the tool description, so it is kept verbatim.
    # Pure pass-through: the payload is handed back untouched. Ending the run
    # is left to the return_direct=True flag on the decorator.
    return content
def get_master_tools() -> List[Any]:
    """
    Return every tool defined in this module as one list for agent binding.

    A fresh list is built on each call; ``finalize_tool`` comes last.
    """
    # Grouped only for readability; the flattened order is what callers receive.
    extraction_tools = [extract_text_tool, extract_tables_tool, describe_images_tool]
    text_tools = [
        summarize_text_tool,
        classify_text_tool,
        extract_entities_tool,
        translate_text_tool,
    ]
    verification_tools = [signature_verification_tool, stamp_detection_tool]
    return [*extraction_tools, *text_tools, *verification_tools, finalize_tool]