| | import os |
| | import re |
| | import uuid |
| | import random |
| | import warnings |
| | import traceback |
| | from contextvars import ContextVar |
| | from datetime import datetime, timezone |
| | from typing import Dict, List, Optional, Tuple, TypedDict |
| |
|
| | import requests |
| | from dotenv import load_dotenv |
| |
|
| | from workflow_helpers import ( |
| | WorkflowConfig, DEFAULT_CONFIG, |
| | detect_output_format, detect_brevity_requirement, |
| | classify_task, task_needs_evidence, |
| | QAResult, parse_structured_qa, QAIssue, |
| | PlannerState, FailureRecord, |
| | select_relevant_roles, identify_revision_targets, |
| | compress_final_answer, strip_internal_noise, |
| | postprocess_format_fixes, |
| | get_synthesizer_format_instruction, get_qa_format_instruction, |
| | validate_output_format, format_violations_instruction, |
| | parse_task_assumptions, format_assumptions_for_prompt, |
| | ROLE_RELEVANCE, |
| | STRUCTURED_OUTPUT_SUFFIX, |
| | StructuredContribution, parse_structured_contribution, |
| | format_contributions_for_synthesizer, format_contributions_for_qa, |
| | parse_used_contributions, check_expert_influence, |
| | ) |
| | from evidence import ( |
| | EvidenceResult, EvidenceItem, |
| | WebSearchAdapter, WikipediaAdapter, ArxivAdapter, |
| | ResearchToolAdapter, |
| | gather_evidence, extract_search_queries, |
| | detect_unsupported_claims, |
| | format_evidence_for_prompt, format_evidence_for_qa, |
| | ) |
| |
|
# Suppress UserWarning messages attributed to the "wikipedia" module.
warnings.filterwarnings("ignore", category=UserWarning, module="wikipedia")
# Load variables from a local .env file (if any) into the process environment
# before any configuration below is read.
load_dotenv()

# When a "/data" path exists, point the Hugging Face cache there by default.
# setdefault leaves an explicitly configured HF_HOME untouched.
if os.path.exists("/data"):
    os.environ.setdefault("HF_HOME", "/data/.huggingface")
| |
|
| | import gradio as gr |
| | import yfinance as yf |
| | import matplotlib |
| | matplotlib.use("Agg") |
| | import matplotlib.pyplot as plt |
| |
|
| | from huggingface_hub.errors import HfHubHTTPError |
| |
|
| | from langchain_core.tools import tool |
| | from langchain_core.messages import SystemMessage, HumanMessage |
| | from langchain.agents import create_agent |
| | from langchain_community.utilities import WikipediaAPIWrapper, ArxivAPIWrapper |
| | from langchain_community.tools import DuckDuckGoSearchRun, ArxivQueryRun |
| | from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Either environment variable may carry the Hugging Face token; the first
# non-empty one wins. Startup aborts when neither is set.
HF_TOKEN = os.environ.get("HUGGINGFACEHUB_API_TOKEN") or os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("Missing Hugging Face token. Set HUGGINGFACEHUB_API_TOKEN or HF_TOKEN.")

# Generation budget per model call, overridable through the environment.
MAX_NEW_TOKENS = int(os.environ.get("MAX_NEW_TOKENS", "1024"))

# Directory (relative to the working directory) that receives chart images;
# created eagerly at import time.
CHART_DIR = "charts"
os.makedirs(CHART_DIR, exist_ok=True)
| |
|
# Hugging Face repository ids of the selectable models, grouped by publisher.
MODEL_OPTIONS = [
    # Meta Llama
    "meta-llama/Llama-3.1-8B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",

    # OpenAI gpt-oss
    "openai/gpt-oss-20b",
    "openai/gpt-oss-120b",

    # Qwen
    "Qwen/Qwen3-VL-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "Qwen/Qwen3-8B",
    "Qwen/Qwen3-32B",

    # Baidu ERNIE
    "baidu/ERNIE-4.5-21B-A3B-PT",

    # DeepSeek
    "deepseek-ai/DeepSeek-R1",
    "deepseek-ai/DeepSeek-V3-0324",

    # GLM (zai-org)
    "zai-org/GLM-5",
    "zai-org/GLM-4.7",
    "zai-org/GLM-4.6",
    "zai-org/GLM-4.5",

    # MiniMax and Moonshot
    "MiniMaxAI/MiniMax-M2.5",
    "moonshotai/Kimi-K2.5",
    "moonshotai/Kimi-K2-Instruct-0905",
]

# Default model id; it is one of the MODEL_OPTIONS entries.
DEFAULT_MODEL_ID = "openai/gpt-oss-20b"
| |
|
# Note shared by every model whose only caveat is provider availability.
_AVAILABILITY_NOTE = "Provider model. Availability depends on enabled providers."

# Short human-readable note for each selectable model id.
MODEL_NOTES = {
    "meta-llama/Llama-3.1-8B-Instruct": "Provider model. May require gated access depending on your token.",
    "meta-llama/Llama-3.3-70B-Instruct": "Large provider model. Likely slower and may hit rate limits.",
    "openai/gpt-oss-20b": "Provider model. Good showcase option if available in your enabled providers.",
    "openai/gpt-oss-120b": "Large provider model. May call tools but sometimes fail to return final text.",
    "Qwen/Qwen3-VL-8B-Instruct": "Vision-language model. In this text-only UI it behaves as text-only.",
    "Qwen/Qwen2.5-7B-Instruct": "Provider model. Usually a safer text-only fallback.",
    "Qwen/Qwen3-8B": _AVAILABILITY_NOTE,
    "Qwen/Qwen3-32B": "Large provider model. Availability depends on enabled providers.",
    "baidu/ERNIE-4.5-21B-A3B-PT": _AVAILABILITY_NOTE,
    "deepseek-ai/DeepSeek-R1": _AVAILABILITY_NOTE,
    "deepseek-ai/DeepSeek-V3-0324": _AVAILABILITY_NOTE,
    "zai-org/GLM-5": _AVAILABILITY_NOTE,
    "zai-org/GLM-4.7": _AVAILABILITY_NOTE,
    "zai-org/GLM-4.6": _AVAILABILITY_NOTE,
    "zai-org/GLM-4.5": _AVAILABILITY_NOTE,
    "MiniMaxAI/MiniMax-M2.5": _AVAILABILITY_NOTE,
    "moonshotai/Kimi-K2.5": _AVAILABILITY_NOTE,
    "moonshotai/Kimi-K2-Instruct-0905": _AVAILABILITY_NOTE,
}
| |
|
# Chat model wrappers keyed by model id. build_provider_chat fills this so the
# endpoint client for a given model is constructed only once per process.
LLM_CACHE: Dict[str, object] = {}
# Agent objects keyed by a (str, tuple-of-str) pair -- presumably
# (model id, enabled tool names); confirm against the code that populates it.
AGENT_CACHE: Dict[Tuple[str, Tuple[str, ...]], object] = {}
# Last recorded health label per model id. model_status_text recognises
# "ok", "unavailable", "gated", "rate_limited", "empty_final" and "error".
RUNTIME_HEALTH: Dict[str, str] = {}

# Client location string read by get_user_location: "" when unknown,
# "ip:<address>" for an IP fallback, otherwise "<lat>,<lon>" coordinates.
_client_location: ContextVar[str] = ContextVar("client_location", default="")
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Shared search wrappers, built once at import time.
try:
    ddg_search = DuckDuckGoSearchRun()
except Exception as exc:
    # Best effort: a missing or broken search backend must not prevent the
    # app from starting, but the reason is surfaced instead of being dropped.
    warnings.warn(
        f"DuckDuckGo search is unavailable ({exc}); web search will be disabled.",
        RuntimeWarning,
    )
    ddg_search = None

# arXiv query tool limited to the top 3 results and 1200 characters of content.
arxiv_tool = ArxivQueryRun(
    api_wrapper=ArxivAPIWrapper(
        top_k_results=3,
        doc_content_chars_max=1200,
    )
)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _build_research_adapters() -> List[ResearchToolAdapter]:
    """Return the research adapters that can be built from the shared wrappers.

    The web-search adapter is included only when the DuckDuckGo wrapper was
    created successfully; the Wikipedia and arXiv adapters are always present.
    Order is web search (if any), Wikipedia, arXiv.
    """
    web_adapters = [] if ddg_search is None else [WebSearchAdapter(ddg_search.run)]
    return [
        *web_adapters,
        WikipediaAdapter(WikipediaAPIWrapper().run),
        ArxivAdapter(arxiv_tool.run),
    ]
| |
|
| |
|
| | |
| | |
| | |
| |
|
def model_status_text(model_id: str) -> str:
    """Return the note for *model_id*, extended with its last recorded failure.

    The base note comes from MODEL_NOTES (or a generic fallback). When
    RUNTIME_HEALTH records a known failure label for the model, a sentence
    describing that failure is appended; "ok", a missing entry, or an unknown
    label all yield the base note unchanged.
    """
    failure_sentences = {
        "unavailable": " This model previously failed because no enabled provider supported it.",
        "gated": " This model previously failed due to access restrictions.",
        "rate_limited": " This model previously hit rate limiting.",
        "empty_final": " This model previously called tools but returned no final assistant text.",
        "error": " This model previously failed with a backend/runtime error.",
    }
    base_note = MODEL_NOTES.get(model_id, "Provider model.")
    last_health = RUNTIME_HEALTH.get(model_id)
    return base_note + failure_sentences.get(last_health, "")
| |
|
| |
|
def build_provider_chat(model_id: str):
    """Return the chat wrapper for *model_id*, creating and caching it on first use.

    The endpoint is configured for text generation with automatic provider
    selection, the module-level token and token budget, a temperature of 0.1
    and a 120-second timeout.
    """
    try:
        return LLM_CACHE[model_id]
    except KeyError:
        pass  # first request for this model: build it below

    endpoint = HuggingFaceEndpoint(
        repo_id=model_id,
        task="text-generation",
        provider="auto",
        huggingfacehub_api_token=HF_TOKEN,
        max_new_tokens=MAX_NEW_TOKENS,
        temperature=0.1,
        timeout=120,
    )
    LLM_CACHE[model_id] = ChatHuggingFace(llm=endpoint)
    return LLM_CACHE[model_id]
| |
|
| |
|
| | |
| | |
| | |
| |
|
def save_line_chart(
    title: str,
    x_values: List[str],
    y_values: List[float],
    x_label: str = "X",
    y_label: str = "Y",
) -> str:
    """Render a line chart to a uniquely named PNG in CHART_DIR.

    Args:
        title: Chart title.
        x_values: Values for the x axis.
        y_values: Values for the y axis, one per x value.
        x_label: Label for the x axis.
        y_label: Label for the y axis.

    Returns:
        The path of the written PNG file.

    Raises:
        Whatever matplotlib raises for invalid data (for example mismatched
        x/y lengths). The figure is closed in that case too.
    """
    # The directory is created at import time, but recreate it in case it was
    # removed while the process was running.
    os.makedirs(CHART_DIR, exist_ok=True)
    path = os.path.join(CHART_DIR, f"{uuid.uuid4().hex}.png")

    fig, ax = plt.subplots(figsize=(9, 4.8))
    try:
        ax.plot(x_values, y_values)
        ax.set_title(title)
        ax.set_xlabel(x_label)
        ax.set_ylabel(y_label)
        ax.grid(True)
        fig.autofmt_xdate()
        fig.tight_layout()
        fig.savefig(path, bbox_inches="tight")
    finally:
        # pyplot keeps figures alive until closed; always release this one,
        # even when plotting or saving fails.
        plt.close(fig)

    return path
| |
|
| |
|
def extract_chart_path(text: str) -> Optional[str]:
    """Return the chart file path announced in *text*, if that file exists.

    Looks for a "Chart saved to: <path>.png" marker. The path is returned as
    written when it exists, otherwise its absolute form when that exists,
    otherwise None. Empty text or a missing marker also yields None.
    """
    found = re.search(r"Chart saved to:\s*(.+\.png)", text) if text else None
    if found is None:
        return None

    reported = found.group(1).strip()
    for location in (reported, os.path.abspath(reported)):
        if os.path.exists(location):
            return location
    return None
| |
|
| |
|
def content_to_text(content) -> str:
    """Flatten message content into plain text.

    A string is returned unchanged. A list is converted item by item (strings
    as-is, dicts with a "text" key contribute that value, anything else its
    str() form), joined with newlines and stripped. Any other value is
    returned as str(content).
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    def _as_text(piece):
        if isinstance(piece, str):
            return piece
        if isinstance(piece, dict) and "text" in piece:
            return piece["text"]
        return str(piece)

    return "\n".join([_as_text(piece) for piece in content]).strip()
| |
|
| |
|
def short_text(text: str, limit: int = 1200) -> str:
    """Return *text* cut to *limit* characters, with "..." appended when cut.

    Falsy input (None, "") is treated as the empty string.
    """
    value = text if text else ""
    if len(value) > limit:
        return value[:limit] + "..."
    return value
| |
|
| |
|
| | |
| | |
| | |
| |
|
@tool
def add_numbers(a: float, b: float) -> float:
    """Add two numbers."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    total = a + b
    return total
| |
|
| |
|
@tool
def subtract_numbers(a: float, b: float) -> float:
    """Subtract the second number from the first."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    difference = a - b
    return difference
| |
|
| |
|
@tool
def multiply_numbers(a: float, b: float) -> float:
    """Multiply two numbers."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    product = a * b
    return product
| |
|
| |
|
@tool
def divide_numbers(a: float, b: float) -> float:
    """Divide the first number by the second."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    if b != 0:
        return a / b
    # A zero divisor is reported as a ValueError rather than ZeroDivisionError.
    raise ValueError("Cannot divide by zero.")
| |
|
| |
|
@tool
def power(a: float, b: float) -> float:
    """Raise the first number to the power of the second."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    #
    # Raises:
    #   ValueError: when the result is not a real number.
    result = a ** b
    # Python returns a complex number for a negative base with a fractional
    # exponent (e.g. (-8) ** 0.5). That violates the declared float return
    # type, so report it the same way the other math tools report bad input.
    if isinstance(result, complex):
        raise ValueError("Cannot raise a negative number to a fractional power.")
    return result
| |
|
| |
|
@tool
def square_root(a: float) -> float:
    """Calculate the square root of a number."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    if a < 0:
        # Negative input is rejected rather than producing a complex result.
        raise ValueError("Cannot calculate square root of a negative number.")
    root = a ** 0.5
    return root
| |
|
| |
|
@tool
def percentage(part: float, whole: float) -> float:
    """Calculate what percentage the first value is of the second value."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    if whole != 0:
        ratio = part / whole
        return ratio * 100
    raise ValueError("Whole cannot be zero.")
| |
|
| |
|
@tool
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for stable factual information."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim. A fresh wrapper is built for every call.
    return WikipediaAPIWrapper().run(query)
| |
|
| |
|
@tool
def web_search(query: str) -> str:
    """Search the web for recent or changing information."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    if ddg_search is not None:
        return ddg_search.run(query)
    # The shared wrapper could not be created at import time.
    return "Web search is unavailable because DDGS is not available."
| |
|
| |
|
@tool
def search_arxiv(query: str) -> str:
    """Search arXiv for scientific papers and research literature."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim. Delegates to the shared module-level arXiv tool.
    papers = arxiv_tool.run(query)
    return papers
| |
|
| |
|
@tool
def get_current_utc_time(_: str = "") -> str:
    """Return the current UTC date and time."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim. The "_" argument is accepted and ignored.
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
| |
|
| |
|
@tool
def get_stock_price(ticker: str) -> str:
    """Get the latest recent close price for a stock, ETF, index, or crypto ticker."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    ticker = ticker.upper().strip()
    hist = yf.Ticker(ticker).history(period="5d")

    if hist.empty:
        return f"No recent market data found for {ticker}."

    # A row with a missing close would otherwise be reported as "nan";
    # use the most recent row that actually carries a price.
    closes = hist["Close"].dropna()
    if closes.empty:
        return f"No recent market data found for {ticker}."

    last = float(closes.iloc[-1])
    return f"{ticker} latest close: {last:.2f}"
| |
|
| |
|
@tool
def get_stock_history(ticker: str, period: str = "6mo") -> str:
    """Get historical closing prices for a ticker and generate a chart image."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    ticker = ticker.upper().strip()
    hist = yf.Ticker(ticker).history(period=period)

    if hist.empty:
        return f"No historical market data found for {ticker}."

    # Rows without a close would put NaN into the chart and into the
    # start/end/performance figures, so they are dropped up front.
    closes = hist["Close"].dropna()
    if closes.empty:
        return f"No historical market data found for {ticker}."

    x_vals = [str(d.date()) for d in closes.index]
    y_vals = [float(v) for v in closes.tolist()]

    chart_path = save_line_chart(
        title=f"{ticker} closing price ({period})",
        x_values=x_vals,
        y_values=y_vals,
        x_label="Date",
        y_label="Close",
    )

    start_close = y_vals[0]
    end_close = y_vals[-1]
    # A zero starting price would divide by zero; report 0% in that case.
    pct = ((end_close - start_close) / start_close) * 100 if start_close else 0.0

    # The last line is the marker extract_chart_path searches for.
    return (
        f"Ticker: {ticker}\n"
        f"Period: {period}\n"
        f"Points: {len(y_vals)}\n"
        f"Start close: {start_close:.2f}\n"
        f"End close: {end_close:.2f}\n"
        f"Performance: {pct:+.2f}%\n"
        f"Chart saved to: {chart_path}"
    )
| |
|
| |
|
@tool
def generate_line_chart(
    title: str,
    x_values: list,
    y_values: list,
    x_label: str = "X",
    y_label: str = "Y",
) -> str:
    """Generate a line chart from x and y values and save it as an image file."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim. The reply uses the "Chart saved to:" marker that
    # extract_chart_path searches for.
    saved_to = save_line_chart(
        title,
        x_values,
        y_values,
        x_label=x_label,
        y_label=y_label,
    )
    return f"Chart saved to: {saved_to}"
| |
|
| |
|
@tool
def wikipedia_chaos_oracle(query: str) -> str:
    """Generate a weird chaotic text mashup based on Wikipedia content."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    article_text = WikipediaAPIWrapper().run(query)
    if not article_text:
        return "The chaos oracle found only silence."

    tokens = re.findall(r"\w+", article_text)
    if not tokens:
        return "The chaos oracle found no usable words."

    # Shuffle in place, then keep at most the first 30 words.
    random.shuffle(tokens)
    mashup = tokens[:30]
    return " ".join(mashup)
| |
|
| |
|
@tool
def random_number(min_value: int, max_value: int) -> int:
    """Generate a random integer between the minimum and maximum values."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim.
    #
    # random.randint raises ValueError when the lower bound exceeds the upper
    # one; accept the bounds in either order instead of failing the call.
    low, high = sorted((min_value, max_value))
    return random.randint(low, high)
| |
|
| |
|
@tool
def generate_uuid(_: str = "") -> str:
    """Generate a random UUID string."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim. The "_" argument is accepted and ignored.
    fresh_id = uuid.uuid4()
    return str(fresh_id)
| |
|
| |
|
def _location_from_coordinates(lat: float, lon: float) -> str:
    """Describe the place at (lat, lon) via Nominatim reverse geocoding."""
    try:
        resp = requests.get(
            "https://nominatim.openstreetmap.org/reverse",
            params={"lat": lat, "lon": lon, "format": "json", "addressdetails": 1},
            headers={"User-Agent": "HFAgent/1.0 (location lookup)"},
            timeout=8,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        return f"Reverse geocoding failed: {exc}"

    if not isinstance(data, dict):
        return "Reverse geocoding failed: unexpected response"
    if data.get("error"):
        # An error payload carries no address; report it instead of
        # returning a block of "N/A" values.
        return f"Reverse geocoding failed: {data['error']}"

    addr = data.get("address") or {}
    # Use the most specific populated locality field as the "city".
    city = (
        addr.get("city")
        or addr.get("town")
        or addr.get("village")
        or addr.get("municipality")
        or addr.get("county")
        or "N/A"
    )
    # Guard against a null country code before upper-casing it.
    country_code = str(addr.get("country_code") or "N/A").upper()
    return (
        f"City: {city}\n"
        f"County: {addr.get('county', 'N/A')}\n"
        f"Region: {addr.get('state', 'N/A')}\n"
        f"Country: {addr.get('country', 'N/A')} ({country_code})\n"
        f"Latitude: {lat}\n"
        f"Longitude: {lon}\n"
        f"Source: Browser GPS/WiFi (precise)"
    )


def _location_from_ip(client_ip: str) -> str:
    """Describe the approximate location of *client_ip* via ip-api.com.

    An empty *client_ip* queries the service without an address, so the
    service geolocates the address the request comes from.
    """
    import ipaddress  # local import keeps this block self-contained

    if client_ip:
        # The value originates from the client and ends up in a URL path,
        # so only a syntactically valid IP address is let through.
        try:
            client_ip = str(ipaddress.ip_address(client_ip.strip()))
        except ValueError:
            return "Location lookup failed: invalid client IP address."

    url = f"http://ip-api.com/json/{client_ip}" if client_ip else "http://ip-api.com/json/"
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        return f"Location lookup failed: {exc}"

    if not isinstance(data, dict) or data.get("status") != "success":
        message = data.get("message", "unknown error") if isinstance(data, dict) else "unknown error"
        return f"Location lookup failed: {message}"
    return (
        f"City: {data.get('city', 'N/A')}\n"
        f"Region: {data.get('regionName', 'N/A')}\n"
        f"Country: {data.get('country', 'N/A')} ({data.get('countryCode', 'N/A')})\n"
        f"Latitude: {data.get('lat', 'N/A')}\n"
        f"Longitude: {data.get('lon', 'N/A')}\n"
        f"Timezone: {data.get('timezone', 'N/A')}\n"
        f"ISP: {data.get('isp', 'N/A')}\n"
        f"Source: IP geolocation (approximate)"
    )


@tool
def get_user_location(_: str = "") -> str:
    """Determine the user's precise physical location using browser GPS/WiFi coordinates or IP fallback."""
    # NOTE: @tool exposes the docstring as the tool description, so it is
    # kept verbatim. The "_" argument is accepted and ignored.
    #
    # The client location context value selects the lookup:
    #   "<lat>,<lon>"  -> reverse geocoding of the coordinates
    #   "ip:<address>" -> IP geolocation of that address
    #   ""             -> IP geolocation without an explicit address
    # Every failure is returned as a message string; nothing is raised.
    location_data = _client_location.get() or ""

    if location_data and not location_data.startswith("ip:"):
        try:
            lat_str, lon_str = location_data.split(",", 1)
            lat, lon = float(lat_str), float(lon_str)
        except ValueError:
            return "Location lookup failed: invalid coordinate data."
        # float() accepts "nan"/"inf" and any magnitude; the range test
        # rejects those as well (comparisons with NaN are False).
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            return "Location lookup failed: invalid coordinate data."
        return _location_from_coordinates(lat, lon)

    client_ip = location_data[3:] if location_data.startswith("ip:") else ""
    return _location_from_ip(client_ip)
| |
|
| |
|
# Registry of every tool, keyed by the name used to look it up.
ALL_TOOLS = {
    # Arithmetic
    "add_numbers": add_numbers,
    "subtract_numbers": subtract_numbers,
    "multiply_numbers": multiply_numbers,
    "divide_numbers": divide_numbers,
    "power": power,
    "square_root": square_root,
    "percentage": percentage,
    # Search and reference
    "search_wikipedia": search_wikipedia,
    "web_search": web_search,
    "search_arxiv": search_arxiv,
    # Time, markets and charts
    "get_current_utc_time": get_current_utc_time,
    "get_stock_price": get_stock_price,
    "get_stock_history": get_stock_history,
    "generate_line_chart": generate_line_chart,
    # Miscellaneous
    "wikipedia_chaos_oracle": wikipedia_chaos_oracle,
    "random_number": random_number,
    "generate_uuid": generate_uuid,
    "get_user_location": get_user_location,
}
# Tool names in registry order.
TOOL_NAMES = [*ALL_TOOLS]
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
# Upper bound on revision rounds.
MAX_REVISIONS = 3

# Role key -> display label for every role in the workflow.
AGENT_ROLES = {
    # Workflow roles
    "planner": "Planner",
    "creative": "Creative Expert",
    "technical": "Technical Expert",
    "qa_tester": "QA Tester",
    "research": "Research Analyst",
    "security": "Security Reviewer",
    "data_analyst": "Data Analyst",
    # Persona roles
    "mad_professor": "Mad Professor",
    "accountant": "Accountant",
    "artist": "Artist",
    "lazy_slacker": "Lazy Slacker",
    "black_metal_fundamentalist": "Black Metal Fundamentalist",
    "labour_union_rep": "Labour Union Representative",
    "ux_designer": "UX Designer",
    "doris": "Doris",
    "chairman_of_board": "Chairman of the Board",
    "maga_appointee": "MAGA Appointee",
    "lawyer": "Lawyer",
}

# Reverse lookup: display label -> role key.
_ROLE_LABEL_TO_KEY = {label: key for key, label in AGENT_ROLES.items()}
| |
|
| |
|
class WorkflowState(TypedDict):
    """Shared, inspectable state object threaded through the whole workflow.

    Holds the user's request, the plan, one output slot per specialist role,
    the synthesized/draft/final texts, QA results and revision bookkeeping.
    """
    # Request, plan and the role currently being worked on.
    user_request: str
    plan: str
    current_role: str
    # One "<role_key>_output" slot for each AGENT_ROLES key other than
    # "planner" and "qa_tester".
    creative_output: str
    technical_output: str
    research_output: str
    security_output: str
    data_analyst_output: str
    mad_professor_output: str
    accountant_output: str
    artist_output: str
    lazy_slacker_output: str
    black_metal_fundamentalist_output: str
    labour_union_rep_output: str
    ux_designer_output: str
    doris_output: str
    chairman_of_board_output: str
    maga_appointee_output: str
    lawyer_output: str
    # Combined and draft texts.
    synthesis_output: str
    draft_output: str
    # QA outcome and revision bookkeeping.
    qa_report: str
    qa_role_feedback: Dict[str, str]  # presumably role -> feedback text; confirm with the writer
    qa_passed: bool
    revision_count: int
    final_answer: str

    # Format and brevity requirements, structured QA data, shared assumptions
    # and contribution tracking. Inner dict schemas are not visible in this
    # file section -- confirm them against the code that fills these fields.
    output_format: str
    brevity_requirement: str
    qa_structured: Optional[dict]
    task_assumptions: Dict[str, str]
    revision_instruction: str
    structured_contributions: Dict[str, dict]
    used_contributions: Dict[str, List[str]]
| |
|
| |
|
| | |
| |
|
# System prompt template for the Planner. It contains exactly one
# str.format placeholder, {specialist_list}, which _build_planner_system
# fills with the enabled specialists; any other literal brace added here
# would have to be doubled.
# Mis-encoded punctuation in the prompt text has been restored to the
# intended arrows and dashes.
_PLANNER_SYSTEM_BASE = (
    "You are the Planner in a strict planner→specialist→synthesizer→QA workflow.\n"
    "Your ONLY job is to PLAN and DELEGATE. You do NOT write the answer.\n\n"
    "Your responsibilities:\n"
    "1. Break the user's task into clear subtasks.\n"
    "2. Decide which specialist to call as the PRIMARY lead.\n"
    " IMPORTANT: Select the FEWEST roles necessary. Do NOT call all roles.\n"
    " Available specialists:\n"
    "{specialist_list}"
    "3. State clear success criteria.\n"
    "4. Identify the required output format and brevity level.\n"
    "5. Define shared assumptions that ALL specialists must use.\n"
    "6. Write delegation instructions (what each specialist should focus on).\n\n"
    "CRITICAL RULES:\n"
    "- You MUST NOT write, draft, or suggest the final answer content.\n"
    "- You MUST NOT include example answers, sample text, or draft responses.\n"
    "- Your output is PLANNING ONLY: breakdown, role selection, criteria, guidance.\n"
    "- The specialists will create the content. The Synthesizer will combine it.\n"
    "- For simple questions, ONE specialist is enough.\n"
    "- Never call persona/gimmick roles unless the user explicitly asks for them.\n"
    "- Only select from the specialists listed above — no others are available.\n"
    "- QA results are BINDING — if QA says FAIL, you MUST revise, never approve.\n\n"
    "Respond in this exact format:\n"
    "TASK BREAKDOWN:\n<subtask list — what needs to be addressed, NOT the answers>\n\n"
    "TASK ASSUMPTIONS:\n<shared assumptions all specialists must use, e.g. cost model, "
    "coverage rate, units, scope, time frame — one per line as 'key: value'>\n\n"
    "ROLE TO CALL: <specialist name>\n\n"
    "SUCCESS CRITERIA:\n<what a correct, complete answer looks like>\n\n"
    "GUIDANCE FOR SPECIALIST:\n<delegation instructions — what to focus on, NOT answer content>"
)
| |
|
| |
|
def _build_planner_system(enabled_role_keys: List[str]) -> str:
    """Build the planner system prompt with the actual enabled roles.

    Args:
        enabled_role_keys: Role keys to offer to the planner, in display
            order. ``None`` or an empty list yields a placeholder line.

    Returns:
        The planner prompt template with its specialist list filled in, one
        " - <description>" line per enabled role.
    """
    role_descriptions = {
        "creative": "'Creative Expert' (ideas, framing, wording, brainstorming)",
        "technical": "'Technical Expert' (code, architecture, implementation)",
        "research": "'Research Analyst' (information gathering, literature review, fact-finding)",
        "security": "'Security Reviewer' (security analysis, vulnerability checks)",
        "data_analyst": "'Data Analyst' (data analysis, statistics, patterns)",
        "labour_union_rep": "'Labour Union Representative' (worker rights, fair wages)",
        "ux_designer": "'UX Designer' (user needs, usability, accessibility)",
        "lawyer": "'Lawyer' (legal compliance, liability, contracts)",
        "mad_professor": "'Mad Professor' (wild ideas, provocative perspectives)",
        "accountant": "'Accountant' (cost analysis, budgeting, financial review)",
        "artist": "'Artist' (aesthetic vision, creative expression)",
        "lazy_slacker": "'Lazy Slacker' (minimal effort, simple answers)",
        "black_metal_fundamentalist": "'Black Metal Fundamentalist' (nihilistic perspective)",
        "doris": "'Doris' (practical, no-nonsense perspective)",
        "chairman_of_board": "'Chairman of the Board' (corporate strategy, governance)",
        "maga_appointee": "'MAGA Appointee' (deregulation, America-first perspective)",
    }

    def _describe(role_key: str) -> str:
        described = role_descriptions.get(role_key)
        if described is not None:
            return described
        # Every described entry leads with the role's display label, and the
        # prompt asks the planner to answer with a specialist name. Fall back
        # to the label from AGENT_ROLES (raw key only for unknown roles) so an
        # undescribed role is listed the same way.
        return f"'{AGENT_ROLES.get(role_key, role_key)}'"

    specialist_list = "".join(
        f" - {_describe(role_key)}\n" for role_key in (enabled_role_keys or [])
    )
    if not specialist_list:
        specialist_list = " - (no specialists enabled)\n"
    return _PLANNER_SYSTEM_BASE.format(specialist_list=specialist_list)
| |
|
# System prompt for the Creative Expert role; the shared structured-output
# instructions are appended at the end. Mis-encoded punctuation in the prompt
# text has been restored to an em dash.
_CREATIVE_SYSTEM = (
    "You are the Creative Expert in a multi-role AI workflow.\n"
    "You handle brainstorming, alternative ideas, framing, wording, and concept generation.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "IDEAS:\n<list of ideas and alternatives>\n\n"
    "RATIONALE:\n<why these are strong choices>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Technical Expert role; the shared structured-output
# instructions are appended at the end. Mis-encoded punctuation in the prompt
# text has been restored to an em dash.
_TECHNICAL_SYSTEM = (
    "You are the Technical Expert in a multi-role AI workflow.\n"
    "You handle implementation details, code, architecture, and structured technical solutions.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "TECHNICAL APPROACH:\n<recommended approach>\n\n"
    "IMPLEMENTATION NOTES:\n<key details, steps, and caveats>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the QA Tester. It asks for a JSON verdict with one of
# three status levels (PASS, PASS_WITH_WARNINGS, FAIL). The prompt contains
# literal braces, so it must not be passed through str.format.
# Mis-encoded punctuation in the prompt text has been restored to the
# intended arrows and dashes.
_QA_SYSTEM = (
    "You are the QA Tester in a strict planner→specialist→synthesizer→QA workflow.\n"
    "Check whether the output satisfies the original request, success criteria,\n"
    "output format requirements, brevity requirements, AND expert influence.\n\n"
    "You MUST respond with a JSON object in this exact structure:\n"
    '{\n'
    ' "status": "PASS" or "PASS_WITH_WARNINGS" or "FAIL",\n'
    ' "reason": "short explanation",\n'
    ' "warnings": ["optional list of minor cosmetic or stylistic notes"],\n'
    ' "issues": [\n'
    ' {\n'
    ' "type": "format" | "brevity" | "constraint" | "consistency" | "directness" | "evidence" | "expert_influence" | "other",\n'
    ' "message": "what is wrong",\n'
    ' "owner": "Synthesizer" | "Planner" | "Research Analyst" | "<specialist role name>"\n'
    ' }\n'
    ' ],\n'
    ' "correction_instruction": "specific minimal fix"\n'
    '}\n\n'
    "STATUS LEVELS — use the right one:\n"
    "- PASS: The answer is correct, complete, properly formatted, and meets all criteria.\n"
    "- PASS_WITH_WARNINGS: The answer is substantively correct and usable, but has minor\n"
    " cosmetic or stylistic issues (e.g. slightly verbose, could be tighter, minor formatting\n"
    " quirks). List these in the 'warnings' array. Do NOT put them in 'issues'.\n"
    "- FAIL: The answer has substantive problems — wrong content, missing key information,\n"
    " wrong format, ignores the question, unsupported claims, or expert contributions ignored.\n"
    " Only FAIL triggers a revision cycle.\n\n"
    "FOCUS ON CONTENT, NOT COSMETICS:\n"
    "- Minor bullet formatting, heading style, or whitespace are NOT reasons to FAIL.\n"
    "- A slightly verbose answer that correctly addresses the question is PASS_WITH_WARNINGS, not FAIL.\n"
    "- Reserve FAIL for answers that are genuinely wrong, incomplete, or miss the point.\n\n"
    "Validation rules:\n"
    "- Check that the output DIRECTLY answers the user's question.\n"
    "- Check that the output format matches what was requested (single choice, table, code, etc.).\n"
    "- Check that brevity matches the requirement (do not accept verbose answers when short was requested).\n"
    "- Check that no internal workflow noise (task breakdown, role routing, perspectives summary) is in the output.\n"
    "- EVIDENCE CHECK: If evidence validation info is provided, FAIL any answer that includes\n"
    " specific factual claims, case studies, named examples, or citations NOT backed by the\n"
    " retrieved evidence. General knowledge and widely-known facts are acceptable.\n"
    "- EXPERT INFLUENCE CHECK: If expert contribution traceability is provided, verify that:\n"
    " * The final answer materially incorporates at least one substantive expert contribution.\n"
    " * If multiple experts contributed, their relevant points are incorporated or consciously noted.\n"
    " * The answer is NOT just a paraphrase of planner text with no expert content.\n"
    " * FAIL with type 'expert_influence' if expert contributions were ignored.\n"
    "- FAIL if any substantive check fails.\n"
    "- PASS_WITH_WARNINGS if content is good but minor polish is needed.\n"
    "- PASS only if ALL checks pass with no issues at all.\n"
)
| |
|
# System prompt for the Planner's review step after QA. It makes the QA
# verdict binding: PASS / PASS_WITH_WARNINGS is approved, FAIL must be
# revised, and on the final revision the planner must fix the QA issues
# itself before approving. The reply is expected to start with a
# "DECISION:" line.
_PLANNER_REVIEW_SYSTEM = (
    "You are the Planner reviewing QA feedback.\n"
    "CRITICAL RULE: QA results are BINDING.\n"
    "- If QA status is PASS or PASS_WITH_WARNINGS: approve the result.\n"
    "- If QA status is FAIL: you MUST revise. You may NOT approve a FAIL result.\n"
    "- If this is the final revision (max reached) and QA still FAIL:\n"
    " you must directly fix the QA issues in your response before approving.\n\n"
    "If QA PASSED (or PASS_WITH_WARNINGS), respond with:\n"
    "DECISION: APPROVED\n"
    "FINAL ANSWER:\n<the approved output, reproduced in full>\n\n"
    "If QA FAILED and revisions remain, respond with:\n"
    "DECISION: REVISE\n"
    "ROLE TO CALL: <specialist name>\n"
    "REVISED INSTRUCTIONS:\n<specific fixes to address the QA issues>\n\n"
    "If QA FAILED and this is the FINAL revision, respond with:\n"
    "DECISION: APPROVED\n"
    "FINAL ANSWER:\n<the output with QA issues directly corrected by you>"
)
| |
|
# System prompt for the Research Analyst role. It restricts the role to
# summarising retrieved evidence; the shared structured-output instructions
# are appended at the end. Mis-encoded punctuation in the prompt text has
# been restored to em dashes.
_RESEARCH_SYSTEM = (
    "You are the Research Analyst in a multi-role AI workflow.\n"
    "You have access to RETRIEVED EVIDENCE from real tools (web search, Wikipedia, arXiv).\n"
    "Your job is to summarize the retrieved evidence, NOT to invent facts.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n\n"
    "CRITICAL RULES:\n"
    "- ONLY reference facts, examples, and sources that appear in the provided evidence.\n"
    "- Do NOT invent articles, films, studies, collaborations, or specific statistics.\n"
    "- If evidence is insufficient, say so clearly rather than fabricating details.\n\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "EVIDENCE SUMMARY:\n<what the retrieved evidence shows>\n\n"
    "KEY FINDINGS:\n<factual information from the evidence, with source attribution>\n\n"
    "GAPS:\n<what could not be verified — if any>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Security Reviewer role; the shared structured-output
# instructions are appended at the end. Mis-encoded punctuation in the prompt
# text has been restored to em dashes.
_SECURITY_SYSTEM = (
    "You are the Security Reviewer in a multi-role AI workflow.\n"
    "You analyse outputs and plans for security vulnerabilities, risks, or best-practice violations.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "SECURITY ANALYSIS:\n<identification of potential security concerns or risks>\n\n"
    "VULNERABILITIES FOUND:\n<specific vulnerabilities or risks — or 'None' if the output is secure>\n\n"
    "RECOMMENDATIONS:\n<specific security improvements and mitigations>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Data Analyst role; the shared structured-output
# instructions are appended at the end. Mis-encoded punctuation in the prompt
# text has been restored to an em dash.
_DATA_ANALYST_SYSTEM = (
    "You are the Data Analyst in a multi-role AI workflow.\n"
    "You analyse data, identify patterns, compute statistics, and provide actionable insights.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "DATA OVERVIEW:\n<description of the data or problem being analysed>\n\n"
    "ANALYSIS:\n<key patterns, statistics, or calculations>\n\n"
    "INSIGHTS:\n<actionable conclusions drawn from the analysis>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Mad Professor persona role; the shared
# structured-output instructions are appended at the end. Mis-encoded
# punctuation in the prompt text has been restored to em dashes.
_MAD_PROFESSOR_SYSTEM = (
    "You are the Mad Professor in a multi-role AI workflow.\n"
    "You are an unhinged scientific visionary who pushes theories to the absolute extreme.\n"
    "You propose radical, groundbreaking, and outlandish scientific hypotheses with total conviction.\n"
    "You ignore convention, laugh at 'impossible', and speculate wildly about paradigm-shattering discoveries.\n"
    "Cost, practicality, and peer review are irrelevant — only the science matters, and the more extreme the better.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "WILD HYPOTHESIS:\n<the most extreme, unhinged scientific theory relevant to the task>\n\n"
    "SCIENTIFIC RATIONALE:\n<fringe evidence, speculative mechanisms, and radical extrapolations that 'support' the hypothesis>\n\n"
    "GROUNDBREAKING IMPLICATIONS:\n<what this revolutionary theory changes about everything we know>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Accountant role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): both dashes were restored from mis-decoded characters;
# confirm they were em dashes.
_ACCOUNTANT_SYSTEM = (
    "You are the Accountant in a multi-role AI workflow.\n"
    "You are obsessively, ruthlessly focused on minimising costs above all else.\n"
    "You question every expense, demand the cheapest possible alternative for everything, and treat cost reduction as the supreme priority — regardless of quality, user experience, or outcome.\n"
    "You view every suggestion through the lens of 'can this be done cheaper?' and the answer is always yes.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "COST ANALYSIS:\n<breakdown of every cost element and how outrageously expensive it is>\n\n"
    "COST-CUTTING MEASURES:\n<extreme measures to eliminate or slash each cost, including free/DIY alternatives>\n\n"
    "CHEAPEST VIABLE APPROACH:\n<the absolute rock-bottom solution that technically meets the minimum requirement>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Artist role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_ARTIST_SYSTEM = (
    "You are the Artist in a multi-role AI workflow.\n"
    "You are a wildly unhinged creative visionary who operates on pure feeling, cosmic energy, and unbounded imagination.\n"
    "You propose ideas so creatively extreme that they transcend practicality, cost, and conventional logic entirely.\n"
    "You think in metaphors, sensations, dreams, and universal vibrations. Implementation is someone else's problem.\n"
    "The more otherworldly, poetic, and mind-expanding the idea, the better.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "COSMIC VISION:\n<the wildest, most unhinged creative concept imaginable for this task>\n\n"
    "FEELING AND VIBES:\n<the emotional energy, sensory experience, and cosmic resonance this idea evokes>\n\n"
    "WILD STORM OF IDEAS:\n<a torrent of unfiltered, boundary-breaking creative ideas, each more extreme than the last>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Lazy Slacker role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_LAZY_SLACKER_SYSTEM = (
    "You are the Lazy Slacker in a multi-role AI workflow.\n"
    "You are profoundly uninterested in doing anything that requires effort.\n"
    "Your philosophy: the best solution is the one that requires the least possible work.\n"
    "You look for shortcuts, copy-paste solutions, things that are 'good enough', and any excuse to do less.\n"
    "You question whether anything needs to be done at all, and if it does, you find the laziest way to do it.\n"
    "Effort is the enemy. Why do it properly when you can barely do it?\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "DO WE EVEN NEED TO DO THIS:\n<reasons why this might not be worth doing at all>\n\n"
    "MINIMUM VIABLE EFFORT:\n<the absolute bare minimum that could technically count as doing something>\n\n"
    "SOMEONE ELSE'S PROBLEM:\n<parts of this task that can be delegated, ignored, or pushed off indefinitely>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Black Metal Fundamentalist role; the shared
# structured-output instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the three dashes were restored from mis-decoded characters;
# confirm they were em dashes.
_BLACK_METAL_FUNDAMENTALIST_SYSTEM = (
    "You are the Black Metal Fundamentalist in a multi-role AI workflow.\n"
    "You approach everything with a fierce, uncompromising, nihilistic kvlt worldview.\n"
    "You reject anything mainstream, commercial, polished, or inauthentic — it is all poseur behaviour.\n"
    "You are outspoken, fearless, and hold nothing back in your contempt for compromise and mediocrity.\n"
    "True solutions are raw, grim, underground, and uncompromising. Anything else is a sellout.\n"
    "You see most proposed solutions as weak, commercialised garbage dressed up in false sophistication.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "KVLT VERDICT:\n<uncompromising judgement on the task — is it true or false, grim or poseur?>\n\n"
    "WHAT THE MAINSTREAM GETS WRONG:\n<brutal critique of conventional approaches to this problem>\n\n"
    "THE GRIM TRUTH:\n<the raw, unvarnished, nihilistic reality of the situation>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Synthesizer: it turns the structured expert
# contributions into the user-facing answer and must end with a
# USED_CONTRIBUTIONS JSON block for traceability.
# NOTE(review): the arrows in the pipeline description and the dash in the
# last sentence were restored from mis-decoded characters; confirm the glyphs.
_SYNTHESIZER_SYSTEM = (
    "You are the Synthesizer in a strict planner→specialist→synthesizer→QA workflow.\n"
    "You receive STRUCTURED EXPERT CONTRIBUTIONS and must produce the FINAL answer.\n\n"
    "WORKFLOW CONTRACT:\n"
    "- Experts have provided their domain-specific contributions as structured objects.\n"
    "- You MUST build the final answer FROM these expert contributions.\n"
    "- You MUST NOT simply paraphrase the Planner's plan or ignore expert inputs.\n"
    "- Identify agreement, disagreement, and complementary points across experts.\n"
    "- The final answer should reflect the substantive work of the experts.\n\n"
    "CRITICAL RULES:\n"
    "- Your output IS the final user-facing answer. It must directly answer the user's question.\n"
    "- You MUST obey the requested output format strictly.\n"
    "- Do NOT add sections like 'Perspectives Summary', 'Common Ground', 'Trade-offs',\n"
    " 'Tensions', or any multi-section structure UNLESS the user explicitly requested\n"
    " a report or analysis.\n"
    "- Do NOT include internal workflow information (role names, task breakdowns, etc.).\n"
    "- Default to the SHORTEST adequate answer.\n"
    "- EVIDENCE RULE: Prefer claims backed by retrieved evidence. If evidence is weak or\n"
    " absent, give a general answer. NEVER invent specific examples, citations, case\n"
    " studies, or statistics.\n\n"
    "OUTPUT FORMAT:\n"
    "First, output the final answer in the requested format.\n"
    "Then, at the very end, output a USED_CONTRIBUTIONS JSON block showing which expert\n"
    "contributions you actually used, wrapped in ```json fences:\n"
    "```json\n"
    '{"used_contributions": {"<role_key>": ["main_points[0]", "recommendations[1]"], ...}}\n'
    "```\n"
    "This traceability block is required — QA will verify expert influence."
)
| |
|
# System prompt for the Labour Union Representative role; the shared
# structured-output instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_LABOUR_UNION_REP_SYSTEM = (
    "You are the Labour Union Representative in a multi-role AI workflow.\n"
    "You champion worker rights, fair wages, job security, safe working conditions, and collective bargaining.\n"
    "You are vigilant about proposals that could exploit workers, cut jobs, or undermine union agreements.\n"
    "You speak up for the workforce and push back on decisions that prioritise profit over people.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "WORKER IMPACT:\n<how this task or proposal affects workers and their livelihoods>\n\n"
    "UNION CONCERNS:\n<specific risks to worker rights, wages, safety, or job security>\n\n"
    "COLLECTIVE BARGAINING POSITION:\n<what the union demands or recommends to protect workers>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the UX Designer role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_UX_DESIGNER_SYSTEM = (
    "You are the UX Designer in a multi-role AI workflow.\n"
    "You focus exclusively on user needs, user-centricity, usability, accessibility, and intuitive design.\n"
    "You empathise deeply with end users, question assumptions, and push for simplicity and clarity.\n"
    "You advocate for the user at every step, even when it conflicts with technical or business constraints.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "USER NEEDS ANALYSIS:\n<who the users are and what they actually need from this>\n\n"
    "PAIN POINTS:\n<friction, confusion, or barriers users will face with current approaches>\n\n"
    "UX RECOMMENDATIONS:\n<specific design improvements to make the experience intuitive and user-friendly>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Doris role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_DORIS_SYSTEM = (
    "You are Doris in a multi-role AI workflow.\n"
    "You do not know anything about anything, but that has never stopped you from having plenty to say.\n"
    "You go off on tangents, bring up completely unrelated topics, and make confident observations that miss the point entirely.\n"
    "You are well-meaning but utterly clueless. You fill every section with irrelevant words.\n"
    "Your job is to contribute your DOMAIN EXPERTISE (such as it is), not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "WHAT DORIS THINKS IS HAPPENING:\n<Doris's completely off-base interpretation of the task>\n\n"
    "DORIS'S THOUGHTS:\n<loosely related observations, a personal anecdote, and a non-sequitur>\n\n"
    "ANYWAY:\n<an abrupt change of subject to something entirely unrelated>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Chairman of the Board role; the shared
# structured-output instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_CHAIRMAN_SYSTEM = (
    "You are the Chairman of the Board in a multi-role AI workflow.\n"
    "You represent the highest level of corporate governance, fiduciary duty, and strategic oversight.\n"
    "You are focused on shareholder value, long-term strategic vision, risk management, and board-level accountability.\n"
    "You speak with authority, expect brevity from others, and cut through operational noise to focus on what matters to the board.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "BOARD PERSPECTIVE:\n<how the board views this task in the context of strategic priorities>\n\n"
    "STRATEGIC CONCERNS:\n<risks, liabilities, or misalignments with corporate strategy>\n\n"
    "SHAREHOLDER VALUE:\n<how this impacts shareholder value, ROI, and long-term growth>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the MAGA Appointee role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_MAGA_APPOINTEE_SYSTEM = (
    "You are a MAGA Appointee in a multi-role AI workflow, representing the America First perspective.\n"
    "You champion deregulation, American jobs, national sovereignty, and cutting government waste.\n"
    "You are suspicious of globalism, coastal elites, and anything that feels like it puts America last.\n"
    "You believe in strength, common sense, and doing what's best for hardworking Americans.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "AMERICA FIRST ANALYSIS:\n<how this task affects American workers, businesses, and national interests>\n\n"
    "DEEP STATE CONCERNS:\n<bureaucratic overreach, globalist agendas, or regulations that hurt Americans>\n\n"
    "MAKING IT GREAT AGAIN:\n<the common-sense, America First approach that cuts through the nonsense>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
# System prompt for the Lawyer role; the shared structured-output
# instructions are appended via STRUCTURED_OUTPUT_SUFFIX.
# NOTE(review): the dash after "brief" was restored from a mis-decoded
# character; confirm it was an em dash.
_LAWYER_SYSTEM = (
    "You are the Lawyer in a multi-role AI workflow.\n"
    "You analyse everything through the lens of legal compliance, liability, contracts, and risk mitigation.\n"
    "You identify potential legal exposure, flag regulatory issues, and recommend protective measures.\n"
    "You caveat everything appropriately and remind all parties that nothing here constitutes formal legal advice.\n"
    "Your job is to contribute your DOMAIN EXPERTISE, not to write the final answer.\n"
    "Keep your response brief — 2-3 sentences per section maximum.\n\n"
    "Respond in this exact format:\n"
    "LEGAL ANALYSIS:\n<assessment of legal issues, applicable laws, and regulatory considerations>\n\n"
    "LIABILITIES AND RISKS:\n<specific legal exposure, contractual risks, or compliance gaps>\n\n"
    "LEGAL RECOMMENDATIONS:\n<protective measures, disclaimers, or required legal steps>"
    + STRUCTURED_OUTPUT_SUFFIX
)
| |
|
| |
|
| | |
| |
|
def _llm_call(chat_model, system_prompt: str, user_content: str) -> str:
    """Send one system + human message pair to the model and return text.

    Args:
        chat_model: Object exposing ``invoke(messages)``.
        system_prompt: Role-specific system prompt.
        user_content: Content of the human message.

    Returns:
        The reply's ``content`` passed through ``content_to_text``.
    """
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_content),
    ]
    reply = chat_model.invoke(messages)
    return content_to_text(reply.content)
| |
|
| |
|
def _decide_role(text: str) -> str:
    """Parse which specialist role the Planner wants to invoke.

    Looks for a structured ``ROLE TO CALL: <label>`` marker first, then falls
    back to a word-boundary keyword search over the whole text. The marker
    match ignores case and tolerates extra whitespace or markdown emphasis
    around the colon (e.g. ``**ROLE TO CALL:** Technical Expert``), so a
    slightly off-format Planner line is not handed to the much looser
    keyword scan.

    Args:
        text: Planner (or planner-review) output to inspect.

    Returns:
        The role key; ``"technical"`` when no signal is found.
    """
    # (label written by the Planner, role key); order is the match priority.
    structured_labels = (
        ("Creative Expert", "creative"),
        ("Technical Expert", "technical"),
        ("Research Analyst", "research"),
        ("Security Reviewer", "security"),
        ("Data Analyst", "data_analyst"),
        ("Mad Professor", "mad_professor"),
        ("Accountant", "accountant"),
        ("Artist", "artist"),
        ("Lazy Slacker", "lazy_slacker"),
        ("Black Metal Fundamentalist", "black_metal_fundamentalist"),
        ("Labour Union Representative", "labour_union_rep"),
        ("UX Designer", "ux_designer"),
        ("Doris", "doris"),
        ("Chairman of the Board", "chairman_of_board"),
        ("MAGA Appointee", "maga_appointee"),
        ("Lawyer", "lawyer"),
    )
    for label, role_key in structured_labels:
        label_pattern = r"\s+".join(re.escape(word) for word in label.split())
        # After the colon allow whitespace and markdown emphasis characters.
        marker_pattern = r"ROLE\s+TO\s+CALL\s*:[\s*_`]*" + label_pattern
        if re.search(marker_pattern, text, re.IGNORECASE):
            return role_key

    # Fallback keyword scan; there is no "technical" entry because it is the
    # default result.
    keyword_patterns = (
        (r"\bcreative\b", "creative"),
        (r"\bresearch\b", "research"),
        (r"\bsecurity\b", "security"),
        (r"\bdata\s+analyst\b", "data_analyst"),
        (r"\bmad\s+professor\b", "mad_professor"),
        (r"\baccountant\b", "accountant"),
        (r"\bartist\b", "artist"),
        (r"\blazy\s+slacker\b", "lazy_slacker"),
        (r"\bblack\s+metal\b", "black_metal_fundamentalist"),
        (r"\blabour\s+union\b", "labour_union_rep"),
        (r"\bux\s+designer\b", "ux_designer"),
        (r"\bdoris\b", "doris"),
        (r"\bchairman\b", "chairman_of_board"),
        (r"\bmaga\b", "maga_appointee"),
        (r"\blawyer\b", "lawyer"),
    )
    for pattern, role_key in keyword_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            return role_key
    return "technical"
| |
|
| |
|
def _qa_passed_check(qa_text: str) -> bool:
    """Report whether the QA text carries a structured PASS result.

    The check is a case-insensitive search for the literal ``result: pass``.
    Text without that marker, including an explicit ``RESULT: FAIL`` or no
    RESULT line at all, yields False, so stray words such as 'bypass' or
    'password' never count as a pass.
    """
    return "result: pass" in qa_text.lower()
| |
|
| |
|
def _parse_qa_role_feedback(qa_text: str) -> Dict[str, str]:
    """Extract per-role targeted feedback from a QA report.

    Reads the text after the first 'ROLE-SPECIFIC FEEDBACK:' header, up to
    the first following 'RESULT:' or 'RECOMMENDED FIXES:' header (whichever
    comes first), and parses entries of the form '• Role Name: <feedback>'.

    Args:
        qa_text: Raw QA report text.

    Returns:
        Mapping of role key (looked up in ``_ROLE_LABEL_TO_KEY``) to the
        feedback string for that role. Lines whose label is unknown or whose
        feedback is empty are ignored. Empty when the header is absent.
    """
    feedback: Dict[str, str] = {}
    if "ROLE-SPECIFIC FEEDBACK:" not in qa_text:
        return feedback

    section = qa_text.split("ROLE-SPECIFIC FEEDBACK:", 1)[1]
    # Cut at every header in turn so the section ends at the earliest one,
    # regardless of the order in which the headers appear in the report.
    for header in ("RESULT:", "RECOMMENDED FIXES:"):
        section = section.split(header, 1)[0]

    for line in section.strip().splitlines():
        # Drop leading bullet markers; the bullet is written as an escape so
        # the source file's encoding cannot corrupt it.
        line = line.strip().lstrip("\u2022-* ")
        if ":" not in line:
            continue
        role_label, _, role_feedback = line.partition(":")
        role_label = role_label.strip()
        role_feedback = role_feedback.strip()
        role_key = _ROLE_LABEL_TO_KEY.get(role_label)
        if role_key and role_feedback:
            feedback[role_key] = role_feedback

    return feedback
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _step_plan(
    chat_model, state: WorkflowState, trace: List[str],
    enabled_role_keys: Optional[List[str]] = None,
) -> WorkflowState:
    """Planner: analyse the task, produce a plan, decide which specialist to call.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'revision_count',
            'qa_report' and the optional 'output_format' /
            'brevity_requirement'; writes 'plan' and 'current_role'.
        trace: Log lines, appended to in place.
        enabled_role_keys: Role keys handed to ``_build_planner_system``;
            None is treated as an empty list.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs and the arrow below were
    # reconstructed from mis-decoded text; confirm they match the banners
    # used elsewhere in the trace.
    trace.append("\n╔══ [PLANNER] Analysing task... ══╗")
    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")
    content = (
        f"User request: {state['user_request']}\n"
        f"Required output format: {fmt}\n"
        f"Brevity requirement: {brevity}"
    )
    if state["revision_count"] > 0:
        # On a revision pass, show the Planner the previous QA report.
        content += (
            f"\n\nThis is revision {state['revision_count']} of {MAX_REVISIONS}."
            f"\nPrevious QA report:\n{state['qa_report']}"
            "\nAdjust the plan to address the QA issues."
        )
    planner_system = _build_planner_system(enabled_role_keys or [])
    plan_text = _llm_call(chat_model, planner_system, content)
    state["plan"] = plan_text
    state["current_role"] = _decide_role(plan_text)
    trace.append(plan_text)
    trace.append(f"╚══ [PLANNER] → routing to: {state['current_role'].upper()} EXPERT ══╝")
    return state
| |
|
| |
|
def _step_creative(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Creative Expert: brainstorm ideas and produce a recommended draft.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'creative_output' and 'draft_output'.
        trace: Log lines, appended to in place.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs were reconstructed from
    # mis-decoded text; confirm they match the banners used elsewhere.
    trace.append("\n╔══ [CREATIVE EXPERT] Generating ideas... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("creative", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _CREATIVE_SYSTEM, content)
    state["creative_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [CREATIVE EXPERT] Done ══╝")
    return state
| |
|
| |
|
def _step_technical(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Technical Expert: provide implementation details and a complete technical draft.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'technical_output' and 'draft_output'.
        trace: Log lines, appended to in place.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs were reconstructed from
    # mis-decoded text; confirm they match the banners used elsewhere.
    trace.append("\n╔══ [TECHNICAL EXPERT] Working on implementation... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("technical", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _TECHNICAL_SYSTEM, content)
    state["technical_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [TECHNICAL EXPERT] Done ══╝")
    return state
| |
|
| |
|
def _step_qa(
    chat_model,
    state: WorkflowState,
    trace: List[str],
    all_outputs: Optional[List[Tuple[str, str]]] = None,
    evidence: Optional[EvidenceResult] = None,
    structured_contributions: Optional[Dict[str, StructuredContribution]] = None,
) -> WorkflowState:
    """QA Tester: validate the draft against the original request, success criteria,
    output format, brevity requirements, evidence grounding, and expert influence.

    When structured_contributions are provided, also checks that the final answer
    materially incorporates expert contributions (expert_influence check).
    Produces a structured QAResult stored in state['qa_structured'].

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; 'draft_output' is rewritten with
            ``postprocess_format_fixes`` before review; writes 'qa_report',
            'qa_structured', 'qa_passed' and 'qa_role_feedback'.
        trace: Log lines, appended to in place.
        all_outputs: Optional (role key, output) pairs shown to QA alongside
            the synthesized draft.
        evidence: Optional evidence, formatted into the prompt when not None.
        structured_contributions: Optional expert contributions; enables the
            traceability section and the expert-influence check.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the banner glyphs and the status/warning/info symbols in
    # this function were reconstructed from mis-decoded text; confirm they
    # match the symbols used elsewhere in the trace.
    trace.append("\n╔══ [QA TESTER] Reviewing output... ══╗")

    # Apply the format post-processing before the draft is shown to QA.
    state["draft_output"] = postprocess_format_fixes(state["draft_output"])

    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")
    format_rules = get_qa_format_instruction(fmt, brevity)

    content = (
        f"Original user request: {state['user_request']}\n\n"
        f"Required output format: {fmt}\n"
        f"Brevity requirement: {brevity}\n\n"
        f"Planner's plan and success criteria:\n{state['plan']}\n\n"
    )
    if format_rules:
        content += f"FORMAT VALIDATION RULES:\n{format_rules}\n\n"

    if evidence is not None:
        content += f"{format_evidence_for_qa(evidence)}\n\n"

    if structured_contributions:
        used = state.get("used_contributions", {})
        traceability = format_contributions_for_qa(structured_contributions, used)
        content += f"{traceability}\n\n"

    if all_outputs:
        content += "Individual specialist contributions:\n\n"
        for r_key, r_output in all_outputs:
            r_label = AGENT_ROLES.get(r_key, r_key)
            content += f"=== {r_label} ===\n{r_output}\n\n"
        content += f"Synthesized output to validate:\n{state['draft_output']}"
    else:
        content += f"Output to validate:\n{state['draft_output']}"

    text = _llm_call(chat_model, _QA_SYSTEM, content)
    state["qa_report"] = text

    qa_result = parse_structured_qa(text)

    # Code-level expert-influence check, run in addition to the model's review.
    if structured_contributions:
        used = state.get("used_contributions", {})
        influence_issues = check_expert_influence(
            structured_contributions, used, state["draft_output"]
        )
        if influence_issues:
            for issue_msg in influence_issues:
                qa_result.issues.append(QAIssue(
                    type="expert_influence",
                    message=issue_msg,
                    owner="synthesizer",
                ))
            # An influence failure turns a passing QA result into a FAIL.
            if qa_result.passed:
                qa_result.status = "FAIL"
                qa_result.reason = (
                    qa_result.reason + " Expert influence check failed."
                    if qa_result.reason else "Expert influence check failed."
                )
            trace.append(
                f" ⚠ Expert influence issues: {'; '.join(influence_issues)}"
            )

    state["qa_structured"] = qa_result.to_dict()
    state["qa_passed"] = qa_result.passed

    state["qa_role_feedback"] = _parse_qa_role_feedback(text)

    if qa_result.status == "PASS":
        result_label = "✅ PASS"
    elif qa_result.passed_with_warnings:
        result_label = "⚠ PASS_WITH_WARNINGS"
    else:
        result_label = "❌ FAIL"
    trace.append(text)
    if qa_result.warnings:
        trace.append(f" ⚠ QA warnings: {'; '.join(qa_result.warnings)}")
    if qa_result.issues:
        # Messages are cut to 60 characters, with an ellipsis when truncated.
        issues_summary = "; ".join(
            f"{i.owner}: {i.message[:60]}{'…' if len(i.message) > 60 else ''}"
            for i in qa_result.issues
        )
        trace.append(f" ℹ QA issues: {issues_summary}")
    trace.append(f"╚══ [QA TESTER] Result: {result_label} ══╝")
    return state
| |
|
| |
|
def _step_planner_review(
    chat_model, state: WorkflowState, trace: List[str],
    is_final_revision: bool = False,
) -> WorkflowState:
    """Planner: review QA feedback and either approve or request revision.

    QA-BINDING LOGIC (enforced in code, not just prompt):
    - If QA passed → approve
    - If QA failed and revisions remain → MUST revise (code blocks approval)
    - If QA failed and max revisions reached → planner does final correction pass

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan', 'draft_output',
            'qa_report', 'qa_passed' and optionally 'qa_structured'; writes
            'final_answer' on approval, or 'plan' and 'current_role' on
            revision.
        trace: Log lines, appended to in place.
        is_final_revision: True when no further revision rounds remain.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the banner glyphs, arrows and status symbols in this
    # function were reconstructed from mis-decoded text; confirm they match
    # the symbols used elsewhere in the trace.
    trace.append("\n╔══ [PLANNER] Reviewing QA feedback... ══╗")

    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")

    content = (
        f"User request: {state['user_request']}\n\n"
        f"Required output format: {fmt}\n"
        f"Brevity requirement: {brevity}\n\n"
        f"Plan:\n{state['plan']}\n\n"
        f"Current draft:\n{state['draft_output']}\n\n"
        f"QA report:\n{state['qa_report']}"
    )

    if is_final_revision:
        content += (
            "\n\nThis is the FINAL revision. Max revisions reached. "
            "You MUST directly fix all QA issues in your FINAL ANSWER now."
        )

    review = _llm_call(chat_model, _PLANNER_REVIEW_SYSTEM, content)
    trace.append(review)

    if state["qa_passed"] or is_final_revision:
        # Approval path: take the text after 'FINAL ANSWER:' when present,
        # otherwise keep the current draft.
        _, marker, answer = review.partition("FINAL ANSWER:")
        state["final_answer"] = answer.strip() if marker else state["draft_output"]
        if state["qa_passed"]:
            trace.append("╚══ [PLANNER] → ✅ APPROVED ══╝")
        else:
            trace.append("╚══ [PLANNER] → ✅ APPROVED (final revision, QA issues corrected) ══╝")
        return state

    # QA failed and revisions remain: QA is binding, so an approval from the
    # Planner is only logged and the workflow still revises.
    if "DECISION: APPROVED" in review.upper():
        trace.append(" ⚠ Planner tried to approve a FAIL result — overriding to REVISE (QA is binding)")

    _, marker, revised = review.partition("REVISED INSTRUCTIONS:")
    if marker:
        state["plan"] = revised.strip()
    else:
        trace.append(" ⚠ REVISED INSTRUCTIONS missing; using QA correction instruction.")
        qa_data = state.get("qa_structured")
        if qa_data and qa_data.get("correction_instruction"):
            state["plan"] = qa_data["correction_instruction"]

    state["current_role"] = _decide_role(review)
    trace.append(
        f"╚══ [PLANNER] → 🔄 REVISE — routing to {state['current_role'].upper()} EXPERT ══╝"
    )
    return state
| |
|
| |
|
def _step_research(
    chat_model,
    state: WorkflowState,
    trace: List[str],
    evidence: Optional[EvidenceResult] = None,
) -> WorkflowState:
    """Research Analyst: summarise tool-retrieved evidence into structured findings.

    If an EvidenceResult with ``has_evidence`` set is provided, it is
    formatted into the prompt so the Research Analyst works from retrieved
    data. Otherwise the analyst is told to answer from general knowledge and
    not to invent specific citations.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'research_output' and 'draft_output'.
        trace: Log lines, appended to in place.
        evidence: Optional retrieved evidence.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the banner glyphs and the info symbol were reconstructed
    # from mis-decoded text; confirm they match the symbols used elsewhere.
    trace.append("\n╔══ [RESEARCH ANALYST] Gathering information... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )

    if evidence and evidence.has_evidence:
        content += f"\n\n{format_evidence_for_prompt(evidence)}"
        trace.append(f" ℹ Evidence injected: {len(evidence.results)} items (confidence: {evidence.confidence})")
    else:
        content += (
            "\n\nNo tool-retrieved evidence is available for this query.\n"
            "Provide general knowledge only. Do NOT invent specific citations, "
            "articles, studies, or examples."
        )

    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("research", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _RESEARCH_SYSTEM, content)
    state["research_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [RESEARCH ANALYST] Done ══╝")
    return state
| |
|
| |
|
def _step_security(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Security Reviewer: analyse output for vulnerabilities and produce a secure revision.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'security_output' and 'draft_output'.
        trace: Log lines, appended to in place.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs were reconstructed from
    # mis-decoded text; confirm they match the banners used elsewhere.
    trace.append("\n╔══ [SECURITY REVIEWER] Analysing for security issues... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("security", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _SECURITY_SYSTEM, content)
    state["security_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [SECURITY REVIEWER] Done ══╝")
    return state
| |
|
| |
|
def _step_data_analyst(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Data Analyst: analyse data, identify patterns, and produce actionable insights.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'data_analyst_output' and 'draft_output'.
        trace: Log lines, appended to in place.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs were reconstructed from
    # mis-decoded text; confirm they match the banners used elsewhere.
    trace.append("\n╔══ [DATA ANALYST] Analysing data and patterns... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("data_analyst", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _DATA_ANALYST_SYSTEM, content)
    state["data_analyst_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [DATA ANALYST] Done ══╝")
    return state
| |
|
| |
|
def _step_mad_professor(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Mad Professor: propose radical, unhinged scientific theories and extreme hypotheses.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'mad_professor_output' and 'draft_output'.
        trace: Log lines, appended to in place.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs were reconstructed from
    # mis-decoded text; confirm they match the banners used elsewhere.
    trace.append("\n╔══ [MAD PROFESSOR] Unleashing radical scientific theories... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("mad_professor", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _MAD_PROFESSOR_SYSTEM, content)
    state["mad_professor_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [MAD PROFESSOR] Done ══╝")
    return state
| |
|
| |
|
def _step_accountant(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Accountant: ruthlessly cut costs and find the cheapest possible approach.

    Args:
        chat_model: Model passed through to ``_llm_call``.
        state: Workflow state; reads 'user_request', 'plan',
            'revision_count', 'qa_role_feedback' and 'qa_report'; writes
            'accountant_output' and 'draft_output'.
        trace: Log lines, appended to in place.

    Returns:
        The same ``state`` object, updated.
    """
    # NOTE(review): the box-drawing banner glyphs were reconstructed from
    # mis-decoded text; confirm they match the banners used elsewhere.
    trace.append("\n╔══ [ACCOUNTANT] Auditing every cost... ══╗")
    content = (
        f"User request: {state['user_request']}\n\n"
        f"Planner instructions:\n{state['plan']}"
    )
    if state["revision_count"] > 0:
        # Prefer feedback addressed to this role; otherwise send the full report.
        role_feedback = state["qa_role_feedback"].get("accountant", "")
        if role_feedback:
            content += f"\n\nQA feedback specific to your contribution:\n{role_feedback}"
        else:
            content += f"\n\nQA feedback to address:\n{state['qa_report']}"
    text = _llm_call(chat_model, _ACCOUNTANT_SYSTEM, content)
    state["accountant_output"] = text
    state["draft_output"] = text
    trace.append(text)
    trace.append("╚══ [ACCOUNTANT] Done ══╝")
    return state
| |
|
| |
|
def _step_artist(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Artist specialist (wild, spectacular creative ideas).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_ARTIST_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``artist_output`` and ``draft_output``
        set to the reply.
    """
    trace.append("\nโโโ [ARTIST] Channelling cosmic creative energy... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("artist", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _ARTIST_SYSTEM, "".join(pieces))
    state["artist_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [ARTIST] Done โโโ"])
    return state
| |
|
| |
|
def _step_lazy_slacker(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Lazy Slacker specialist (path of least resistance, minimum effort).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_LAZY_SLACKER_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``lazy_slacker_output`` and
        ``draft_output`` set to the reply.
    """
    trace.append("\nโโโ [LAZY SLACKER] Doing as little as possible... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("lazy_slacker", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _LAZY_SLACKER_SYSTEM, "".join(pieces))
    state["lazy_slacker_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [LAZY SLACKER] Done (finally) โโโ"])
    return state
| |
|
| |
|
def _step_black_metal_fundamentalist(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Black Metal Fundamentalist specialist (nihilistic, uncompromising view).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_BLACK_METAL_FUNDAMENTALIST_SYSTEM`` and
    records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``black_metal_fundamentalist_output``
        and ``draft_output`` set to the reply.
    """
    trace.append("\nโโโ [BLACK METAL FUNDAMENTALIST] Unleashing grim truths... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("black_metal_fundamentalist", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _BLACK_METAL_FUNDAMENTALIST_SYSTEM, "".join(pieces))
    state["black_metal_fundamentalist_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [BLACK METAL FUNDAMENTALIST] Done โโโ"])
    return state
| |
|
| |
|
def _step_labour_union_rep(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Labour Union Representative specialist (worker rights, wages, job security).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_LABOUR_UNION_REP_SYSTEM`` and records the
    reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``labour_union_rep_output`` and
        ``draft_output`` set to the reply.
    """
    trace.append("\nโโโ [LABOUR UNION REPRESENTATIVE] Standing up for workers... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("labour_union_rep", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _LABOUR_UNION_REP_SYSTEM, "".join(pieces))
    state["labour_union_rep_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [LABOUR UNION REPRESENTATIVE] Done โโโ"])
    return state
| |
|
| |
|
def _step_ux_designer(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the UX Designer specialist (user needs, user-centric recommendation).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_UX_DESIGNER_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``ux_designer_output`` and
        ``draft_output`` set to the reply.
    """
    trace.append("\nโโโ [UX DESIGNER] Putting users first... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("ux_designer", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _UX_DESIGNER_SYSTEM, "".join(pieces))
    state["ux_designer_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [UX DESIGNER] Done โโโ"])
    return state
| |
|
| |
|
def _step_doris(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Doris persona (well-meaning but clueless; rambles at length).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_DORIS_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``doris_output`` and ``draft_output``
        set to the reply.
    """
    trace.append("\nโโโ [DORIS] Oh! Well, you know, I was just thinking... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("doris", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _DORIS_SYSTEM, "".join(pieces))
    state["doris_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [DORIS] Anyway, where was I... Done โโโ"])
    return state
| |
|
| |
|
def _step_chairman_of_board(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Chairman of the Board specialist (strategic, shareholder-focused direction).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_CHAIRMAN_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``chairman_of_board_output`` and
        ``draft_output`` set to the reply.
    """
    trace.append("\nโโโ [CHAIRMAN OF THE BOARD] Calling the meeting to order... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("chairman_of_board", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _CHAIRMAN_SYSTEM, "".join(pieces))
    state["chairman_of_board_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [CHAIRMAN OF THE BOARD] Meeting adjourned โโโ"])
    return state
| |
|
| |
|
def _step_maga_appointee(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the MAGA Appointee persona (America First, pro-deregulation, anti-globalist).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_MAGA_APPOINTEE_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner are appended.

    Returns:
        The same ``state`` object with ``maga_appointee_output`` and
        ``draft_output`` set to the reply.
    """
    trace.append("\nโโโ [MAGA APPOINTEE] America First! โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("maga_appointee", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _MAGA_APPOINTEE_SYSTEM, "".join(pieces))
    state["maga_appointee_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [MAGA APPOINTEE] Done โโโ"])
    return state
| |
|
| |
|
def _step_lawyer(chat_model, state: WorkflowState, trace: List[str]) -> WorkflowState:
    """Run the Lawyer specialist (legal implications, liabilities, compliance).

    Builds a prompt from the user request and the planner instructions, sends
    it through ``_llm_call`` with ``_LAWYER_SYSTEM`` and records the reply.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state; read for the request, plan and QA feedback and
            updated in place with the reply.
        trace: Log lines for this run; the start banner, the reply and the end
            banner (which carries a not-legal-advice note) are appended.

    Returns:
        The same ``state`` object with ``lawyer_output`` and ``draft_output``
        set to the reply.
    """
    trace.append("\nโโโ [LAWYER] Reviewing legal implications... โโโ")
    pieces = [
        f"User request: {state['user_request']}\n\n",
        f"Planner instructions:\n{state['plan']}",
    ]
    if state["revision_count"] > 0:
        # Revision pass: feedback aimed at this role wins over the full QA report.
        own_feedback = state["qa_role_feedback"].get("lawyer", "")
        pieces.append(
            f"\n\nQA feedback specific to your contribution:\n{own_feedback}"
            if own_feedback
            else f"\n\nQA feedback to address:\n{state['qa_report']}"
        )
    reply = _llm_call(chat_model, _LAWYER_SYSTEM, "".join(pieces))
    state["lawyer_output"] = state["draft_output"] = reply
    trace.extend([reply, "โโโ [LAWYER] Done โ note: this is not formal legal advice โโโ"])
    return state
| |
|
| |
|
def _step_synthesize(
    chat_model,
    state: WorkflowState,
    trace: List[str],
    all_outputs: List[Tuple[str, str]],
    evidence: Optional[EvidenceResult] = None,
    structured_contributions: Optional[Dict[str, StructuredContribution]] = None,
) -> WorkflowState:
    """Synthesizer: merge the specialist contributions into the user-facing draft.

    The prompt contains, in order: the user request, the detected output
    format and brevity with their formatting instruction, the formatted
    evidence (only when ``evidence.has_evidence`` is truthy), and the
    specialist material. When ``structured_contributions`` is non-empty it is
    rendered by ``format_contributions_for_synthesizer``; otherwise the raw
    ``all_outputs`` are listed under ``=== <role label> ===`` headings.
    Note that ``state["plan"]`` is not part of this prompt.

    Args:
        chat_model: Model handle passed straight to ``_llm_call``.
        state: Workflow state, updated in place.
        trace: Log lines for this run; banners, a draft preview (first 500
            characters) and a traceability count are appended.
        all_outputs: ``(role_key, output_text)`` pairs used for the fallback
            prompt layout.
        evidence: Optional retrieved evidence to inject into the prompt.
        structured_contributions: Optional parsed contributions keyed by role.

    Returns:
        The same ``state`` object with ``synthesis_output`` (raw model reply),
        ``draft_output`` (reply with the USED_CONTRIBUTIONS block removed) and
        ``used_contributions`` (parsed traceability data, role labels mapped
        back to role keys where known) set.
    """
    trace.append("\nโโโ [SYNTHESIZER] Producing final answer... โโโ")

    fmt = state.get("output_format", "other")
    brevity = state.get("brevity_requirement", "normal")
    format_instruction = get_synthesizer_format_instruction(fmt, brevity)

    prompt_parts = [
        f"User request: {state['user_request']}\n\n"
        f"REQUIRED OUTPUT FORMAT: {fmt}\n"
        f"BREVITY: {brevity}\n\n"
        f"{format_instruction}\n\n"
    ]

    if evidence and evidence.has_evidence:
        prompt_parts.append(f"{format_evidence_for_prompt(evidence)}\n\n")

    if structured_contributions:
        prompt_parts.append(format_contributions_for_synthesizer(structured_contributions))
    else:
        # Fallback layout: one labelled section per raw specialist output.
        sections = [
            f"=== {AGENT_ROLES.get(role, role)} ===\n{body}"
            for role, body in all_outputs
        ]
        prompt_parts.append("Specialist contributions:\n\n" + "\n\n".join(sections))

    raw_reply = _llm_call(chat_model, _SYNTHESIZER_SYSTEM, "".join(prompt_parts))

    # The traceability block may name roles by display label; translate those
    # back to role keys and leave unknown names untouched.
    reported = parse_used_contributions(raw_reply)
    key_by_label = {label: key for key, label in AGENT_ROLES.items()}
    used = {key_by_label.get(name, name): refs for name, refs in reported.items()}
    state["used_contributions"] = used

    # Strip the traceability block from the user-facing draft: first the
    # labelled form, then a bare trailing ```json fence mentioning
    # "used_contributions".
    labelled_block = re.compile(r"\n*USED_CONTRIBUTIONS:\s*```json.*?```", re.DOTALL)
    trailing_fence = re.compile(
        r"\n*```json\s*\{[^}]*\"used_contributions\"[^}]*\}\s*```\s*$", re.DOTALL
    )
    draft = labelled_block.sub("", raw_reply).strip()
    draft = trailing_fence.sub("", draft).strip()

    state["synthesis_output"] = raw_reply
    state["draft_output"] = draft

    preview_suffix = "โฆ" if len(draft) > 500 else ""
    trace.append(draft[:500] + preview_suffix)
    if used:
        used_count = sum(len(refs) for refs in used.values())
        trace.append(f" โน Traceability: {used_count} expert contribution(s) referenced")
    trace.append("โโโ [SYNTHESIZER] Done โโโ")
    return state
| |
|
| |
|
| | |
# Dispatch table: specialist role key -> step function that runs that role.
_SPECIALIST_STEPS = dict(
    creative=_step_creative,
    technical=_step_technical,
    research=_step_research,
    security=_step_security,
    data_analyst=_step_data_analyst,
    mad_professor=_step_mad_professor,
    accountant=_step_accountant,
    artist=_step_artist,
    lazy_slacker=_step_lazy_slacker,
    black_metal_fundamentalist=_step_black_metal_fundamentalist,
    labour_union_rep=_step_labour_union_rep,
    ux_designer=_step_ux_designer,
    doris=_step_doris,
    chairman_of_board=_step_chairman_of_board,
    maga_appointee=_step_maga_appointee,
    lawyer=_step_lawyer,
)
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
# Model id read by the ``call_*`` tool wrappers; ``run_multi_role_workflow``
# overwrites it (via ``global``) with the model chosen for the current run.
_workflow_model_id: str = DEFAULT_MODEL_ID

# Blank workflow state used as the starting point for one-off tool calls.
# The tool wrappers copy it shallowly, so the nested dict values below are
# shared objects: replace them, do not mutate them in place.
_EMPTY_STATE_BASE: WorkflowState = {
    # Request and routing
    "user_request": "",
    "plan": "",
    "current_role": "",
    # One output slot per specialist
    "creative_output": "",
    "technical_output": "",
    "research_output": "",
    "security_output": "",
    "data_analyst_output": "",
    "mad_professor_output": "",
    "accountant_output": "",
    "artist_output": "",
    "lazy_slacker_output": "",
    "black_metal_fundamentalist_output": "",
    "labour_union_rep_output": "",
    "ux_designer_output": "",
    "doris_output": "",
    "chairman_of_board_output": "",
    "maga_appointee_output": "",
    "lawyer_output": "",
    # Synthesis, QA and revision bookkeeping
    "synthesis_output": "",
    "draft_output": "",
    "qa_report": "",
    "qa_role_feedback": {},
    "qa_passed": False,
    "revision_count": 0,
    "final_answer": "",
    # Detected task properties and traceability data
    "output_format": "other",
    "brevity_requirement": "normal",
    "qa_structured": None,
    "task_assumptions": {},
    "revision_instruction": "",
    "structured_contributions": {},
    "used_contributions": {},
}
| |
|
| |
|
@tool
def call_creative_expert(task: str) -> str:
    """Call the Creative Expert to brainstorm ideas, framing, and produce a draft for a given task."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="creative"
    )
    return _step_creative(provider, seed, [])["creative_output"]
| |
|
| |
|
@tool
def call_technical_expert(task: str) -> str:
    """Call the Technical Expert to produce implementation details and a solution for a given task."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="technical"
    )
    return _step_technical(provider, seed, [])["technical_output"]
| |
|
| |
|
@tool
def call_qa_tester(task_and_output: str) -> str:
    """Call the QA Tester to review specialist output against requirements.
    Input format: 'TASK: <description>\nOUTPUT: <specialist output to review>'"""
    provider = build_provider_chat(_workflow_model_id)
    # Split on the first "OUTPUT:" marker; without one, the whole input is
    # used as both the task and the text under review.
    before, marker, after = task_and_output.partition("OUTPUT:")
    if marker:
        task = before.replace("TASK:", "").strip()
        output = after.strip()
    else:
        task = output = task_and_output

    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, draft_output=output
    )
    return _step_qa(provider, seed, [])["qa_report"]
| |
|
| |
|
@tool
def call_research_analyst(task: str) -> str:
    """Call the Research Analyst to gather information and summarize findings for a given task."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="research"
    )
    return _step_research(provider, seed, [])["research_output"]
| |
|
| |
|
@tool
def call_security_reviewer(task: str) -> str:
    """Call the Security Reviewer to analyse output for vulnerabilities and security best practices."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="security"
    )
    return _step_security(provider, seed, [])["security_output"]
| |
|
| |
|
@tool
def call_data_analyst(task: str) -> str:
    """Call the Data Analyst to analyse data, identify patterns, and provide actionable insights."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="data_analyst"
    )
    return _step_data_analyst(provider, seed, [])["data_analyst_output"]
| |
|
| |
|
@tool
def call_mad_professor(task: str) -> str:
    """Call the Mad Professor to generate radical, unhinged scientific theories and extreme groundbreaking hypotheses for a given task."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="mad_professor"
    )
    return _step_mad_professor(provider, seed, [])["mad_professor_output"]
| |
|
| |
|
@tool
def call_accountant(task: str) -> str:
    """Call the Accountant to ruthlessly analyse and cut costs, finding the cheapest possible approach regardless of quality."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="accountant"
    )
    return _step_accountant(provider, seed, [])["accountant_output"]
| |
|
| |
|
@tool
def call_artist(task: str) -> str:
    """Call the Artist to channel cosmic creative energy into wildly unhinged and spectacular ideas without concern for cost or practicality."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="artist"
    )
    return _step_artist(provider, seed, [])["artist_output"]
| |
|
| |
|
@tool
def call_lazy_slacker(task: str) -> str:
    """Call the Lazy Slacker to find the minimum viable effort and the easiest possible way out of a task."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="lazy_slacker"
    )
    return _step_lazy_slacker(provider, seed, [])["lazy_slacker_output"]
| |
|
| |
|
@tool
def call_black_metal_fundamentalist(task: str) -> str:
    """Call the Black Metal Fundamentalist for a nihilistic, kvlt, uncompromising critique and manifesto-style response."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE,
        user_request=task,
        plan=task,
        current_role="black_metal_fundamentalist",
    )
    return _step_black_metal_fundamentalist(provider, seed, [])["black_metal_fundamentalist_output"]
| |
|
| |
|
@tool
def call_labour_union_rep(task: str) -> str:
    """Call the Labour Union Representative to advocate for worker rights, fair wages, and job security."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="labour_union_rep"
    )
    return _step_labour_union_rep(provider, seed, [])["labour_union_rep_output"]
| |
|
| |
|
@tool
def call_ux_designer(task: str) -> str:
    """Call the UX Designer to analyse user needs and produce a user-centric recommendation."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="ux_designer"
    )
    return _step_ux_designer(provider, seed, [])["ux_designer_output"]
| |
|
| |
|
@tool
def call_doris(task: str) -> str:
    """Call Doris โ well-meaning but clueless โ for a rambling, off-topic perspective that misses the point entirely."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="doris"
    )
    return _step_doris(provider, seed, [])["doris_output"]
| |
|
| |
|
@tool
def call_chairman_of_board(task: str) -> str:
    """Call the Chairman of the Board for a strategic, shareholder-focused, board-level perspective."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="chairman_of_board"
    )
    return _step_chairman_of_board(provider, seed, [])["chairman_of_board_output"]
| |
|
| |
|
@tool
def call_maga_appointee(task: str) -> str:
    """Call the MAGA Appointee for an America First, pro-deregulation, anti-globalist perspective."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="maga_appointee"
    )
    return _step_maga_appointee(provider, seed, [])["maga_appointee_output"]
| |
|
| |
|
@tool
def call_lawyer(task: str) -> str:
    """Call the Lawyer to analyse legal implications, liabilities, and compliance requirements. Not formal legal advice."""
    provider = build_provider_chat(_workflow_model_id)
    # One-off state: the task text serves as both request and plan, and the
    # trace list is a throwaway.
    seed: WorkflowState = dict(
        _EMPTY_STATE_BASE, user_request=task, plan=task, current_role="lawyer"
    )
    return _step_lawyer(provider, seed, [])["lawyer_output"]
| |
|
| |
|
| | |
| |
|
| | def run_multi_role_workflow( |
| | message: str, |
| | model_id: str, |
| | active_role_labels: Optional[List[str]] = None, |
| | config: Optional[WorkflowConfig] = None, |
| | ) -> Tuple[str, str]: |
| | """Run the strict plannerโspecialistโsynthesizerโQA workflow. |
| | |
| | Flow: |
| | 1. Classify task and detect output format / brevity. |
| | 2. Retrieve evidence via tool adapters (for factual tasks). |
| | 3. Planner analyses the task and picks a primary specialist. |
| | 4. Dynamic role selection filters to only relevant specialists. |
| | 5. Selected specialists run (research analyst gets evidence injected). |
| | 6. Synthesizer produces a format-aware, evidence-grounded answer. |
| | 7. QA validates against format, brevity, evidence, and correctness. |
| | 8. QA-BINDING: if FAIL, planner MUST revise. Code enforces this. |
| | 9. Targeted revisions with escalation on repeated failures. |
| | 10. Final answer is compressed and stripped of internal noise. |
| | |
| | Args: |
| | message: The user's task or request. |
| | model_id: HuggingFace model ID to use. |
| | active_role_labels: Display names of active agent roles. |
| | config: WorkflowConfig controlling strict_mode, persona roles, etc. |
| | |
| | Returns: |
| | (final_answer, workflow_trace_text) |
| | """ |
| | global _workflow_model_id |
| | _workflow_model_id = model_id |
| | chat_model = build_provider_chat(model_id) |
| | if config is None: |
| | config = DEFAULT_CONFIG |
| |
|
| | |
| | if active_role_labels is None: |
| | active_role_labels = list(AGENT_ROLES.values()) |
| | active_keys = {_ROLE_LABEL_TO_KEY[lbl] for lbl in active_role_labels if lbl in _ROLE_LABEL_TO_KEY} |
| |
|
| | all_specialist_keys = [ |
| | "creative", "technical", "research", "security", "data_analyst", |
| | "mad_professor", "accountant", "artist", "lazy_slacker", "black_metal_fundamentalist", |
| | "labour_union_rep", "ux_designer", "doris", "chairman_of_board", "maga_appointee", "lawyer", |
| | ] |
| | active_specialist_keys = [k for k in all_specialist_keys if k in active_keys] |
| |
|
| | planner_active = "planner" in active_keys |
| | qa_active = "qa_tester" in active_keys |
| |
|
| | if not active_specialist_keys: |
| | return "No specialist agents are active. Please enable at least one specialist role.", "" |
| |
|
| | |
| | task_category = classify_task(message) |
| | output_format = detect_output_format(message) |
| | brevity = detect_brevity_requirement(message) |
| | needs_evidence = task_needs_evidence(task_category) and config.require_evidence_for_factual_claims |
| |
|
| | |
| | planner_state = PlannerState( |
| | user_request=message, |
| | task_category=task_category, |
| | output_format=output_format, |
| | brevity_requirement=brevity, |
| | max_revisions=MAX_REVISIONS, |
| | ) |
| | planner_state.record_event("init", f"category={task_category}, format={output_format}, brevity={brevity}") |
| |
|
| | state: WorkflowState = { |
| | "user_request": message, |
| | "plan": "", |
| | "current_role": "", |
| | "creative_output": "", |
| | "technical_output": "", |
| | "research_output": "", |
| | "security_output": "", |
| | "data_analyst_output": "", |
| | "mad_professor_output": "", |
| | "accountant_output": "", |
| | "artist_output": "", |
| | "lazy_slacker_output": "", |
| | "black_metal_fundamentalist_output": "", |
| | "labour_union_rep_output": "", |
| | "ux_designer_output": "", |
| | "doris_output": "", |
| | "chairman_of_board_output": "", |
| | "maga_appointee_output": "", |
| | "lawyer_output": "", |
| | "synthesis_output": "", |
| | "draft_output": "", |
| | "qa_report": "", |
| | "qa_role_feedback": {}, |
| | "qa_passed": False, |
| | "revision_count": 0, |
| | "final_answer": "", |
| | "output_format": output_format, |
| | "brevity_requirement": brevity, |
| | "qa_structured": None, |
| | "task_assumptions": {}, |
| | "revision_instruction": "", |
| | "structured_contributions": {}, |
| | "used_contributions": {}, |
| | } |
| |
|
| | trace: List[str] = [ |
| | "โโโ MULTI-ROLE WORKFLOW STARTED โโโ", |
| | f"Model : {model_id}", |
| | f"Request : {message}", |
| | f"Task category: {task_category}", |
| | f"Output format: {output_format}", |
| | f"Brevity: {brevity}", |
| | f"Evidence required: {needs_evidence}", |
| | f"Strict mode: {config.strict_mode}", |
| | f"Allow persona roles: {config.allow_persona_roles}", |
| | f"Max specialists per task: {config.max_specialists_per_task}", |
| | f"Max revisions: {MAX_REVISIONS}", |
| | ] |
| |
|
| | |
| | evidence: Optional[EvidenceResult] = None |
| | if needs_evidence: |
| | try: |
| | adapters = _build_research_adapters() |
| | queries = extract_search_queries(message) |
| | evidence = gather_evidence(queries, adapters) |
| | planner_state.evidence = evidence.to_dict() |
| | trace.append( |
| | f"\n[EVIDENCE RETRIEVAL] {len(evidence.results)} items retrieved " |
| | f"(confidence: {evidence.confidence}) for queries: {queries}" |
| | ) |
| | planner_state.record_event("evidence", f"items={len(evidence.results)}, confidence={evidence.confidence}") |
| | except Exception as exc: |
| | trace.append(f"\n[EVIDENCE RETRIEVAL] Tool error: {exc} โ proceeding without evidence") |
| | evidence = EvidenceResult(query=message) |
| | planner_state.record_event("evidence_error", str(exc)) |
| |
|
| | try: |
| | if planner_active: |
| | state = _step_plan(chat_model, state, trace, |
| | enabled_role_keys=active_specialist_keys) |
| |
|
| | |
| | assumptions = parse_task_assumptions(state["plan"]) |
| | if assumptions: |
| | state["task_assumptions"] = assumptions |
| | planner_state.task_assumptions = assumptions |
| | trace.append(f"[ASSUMPTIONS] {len(assumptions)} shared assumption(s) set: " |
| | + ", ".join(f"{k}={v}" for k, v in assumptions.items())) |
| | else: |
| | state["current_role"] = active_specialist_keys[0] |
| | state["plan"] = message |
| | trace.append( |
| | f"\n[Planner disabled] Auto-routing to: {state['current_role'].upper()}" |
| | ) |
| |
|
| | |
| | if config.strict_mode: |
| | selected_roles = select_relevant_roles( |
| | message, active_specialist_keys, config, task_category=task_category |
| | ) |
| | else: |
| | selected_roles = active_specialist_keys |
| |
|
| | |
| | primary_role = state["current_role"] |
| | if primary_role in active_specialist_keys and primary_role not in selected_roles: |
| | selected_roles.insert(0, primary_role) |
| |
|
| | |
| | if len(selected_roles) > config.max_specialists_per_task: |
| | |
| | if needs_evidence and "research" in selected_roles: |
| | non_research = [r for r in selected_roles if r != "research"] |
| | selected_roles = non_research[:config.max_specialists_per_task - 1] + ["research"] |
| | else: |
| | selected_roles = selected_roles[:config.max_specialists_per_task] |
| |
|
| | planner_state.selected_roles = selected_roles |
| | trace.append( |
| | f"\n[ROLE SELECTION] {len(selected_roles)} specialist(s) selected: " |
| | + ", ".join(AGENT_ROLES.get(k, k) for k in selected_roles) |
| | ) |
| | |
| | if hasattr(selected_roles, 'format_trace'): |
| | trace.append(selected_roles.format_trace(AGENT_ROLES)) |
| |
|
| | |
| | if primary_role not in selected_roles: |
| | primary_role = selected_roles[0] |
| | state["current_role"] = primary_role |
| |
|
| | |
| | assumptions_ctx = format_assumptions_for_prompt(state.get("task_assumptions", {})) |
| |
|
| | def _run_specialist(role_key): |
| | """Run a single specialist, injecting evidence and assumptions as needed.""" |
| | if role_key == "research" and evidence: |
| | return _step_research(chat_model, state, trace, evidence=evidence) |
| | step_fn = _SPECIALIST_STEPS.get(role_key, _step_technical) |
| | |
| | if assumptions_ctx and assumptions_ctx not in state["plan"]: |
| | state["plan"] = state["plan"] + "\n\n" + assumptions_ctx |
| | return step_fn(chat_model, state, trace) |
| |
|
| | |
| | state = _run_specialist(primary_role) |
| | primary_output = state["draft_output"] |
| | planner_state.specialist_outputs[primary_role] = primary_output[:500] |
| |
|
| | |
| | structured_contributions: Dict[str, StructuredContribution] = {} |
| | contrib = parse_structured_contribution( |
| | primary_output, AGENT_ROLES.get(primary_role, primary_role) |
| | ) |
| | structured_contributions[primary_role] = contrib |
| |
|
| | all_outputs: List[Tuple[str, str]] = [(primary_role, primary_output)] |
| | for specialist_role in selected_roles: |
| | if specialist_role == primary_role: |
| | continue |
| | state = _run_specialist(specialist_role) |
| | output = state["draft_output"] |
| | all_outputs.append((specialist_role, output)) |
| | planner_state.specialist_outputs[specialist_role] = output[:500] |
| | |
| | contrib = parse_structured_contribution( |
| | output, AGENT_ROLES.get(specialist_role, specialist_role) |
| | ) |
| | structured_contributions[specialist_role] = contrib |
| |
|
| | |
| | state["structured_contributions"] = { |
| | k: v.to_dict() for k, v in structured_contributions.items() |
| | } |
| | trace.append( |
| | f"\n[CONTRIBUTIONS] {len(structured_contributions)} structured contribution(s) parsed" |
| | ) |
| |
|
| | |
| | state = _step_synthesize(chat_model, state, trace, all_outputs, |
| | evidence=evidence, |
| | structured_contributions=structured_contributions) |
| |
|
| | |
| | fmt_violations = validate_output_format( |
| | state["draft_output"], output_format, brevity |
| | ) |
| | if fmt_violations: |
| | trace.append( |
| | "\n[FORMAT VALIDATION] Violations detected before QA:\n" |
| | + "\n".join(f" - {v}" for v in fmt_violations) |
| | ) |
| | |
| | violation_instr = format_violations_instruction(fmt_violations) |
| | state["plan"] = state["plan"] + "\n\n" + violation_instr |
| | state = _step_synthesize(chat_model, state, trace, all_outputs, |
| | evidence=evidence, |
| | structured_contributions=structured_contributions) |
| | planner_state.record_event("format_rewrite", "; ".join(fmt_violations)) |
| | trace.append("[FORMAT VALIDATION] Re-synthesized to fix format violations.") |
| |
|
| | |
| | |
| | while True: |
| | |
| | if qa_active: |
| | state = _step_qa(chat_model, state, trace, all_outputs, |
| | evidence=evidence, |
| | structured_contributions=structured_contributions) |
| | else: |
| | state["qa_passed"] = True |
| | state["qa_report"] = "QA Tester is disabled โ skipping quality review." |
| | state["qa_structured"] = {"status": "PASS", "reason": "", "issues": [], "warnings": [], "correction_instruction": ""} |
| | trace.append("\n[QA Tester disabled] Skipping quality review โ auto-pass.") |
| |
|
| | |
| | planner_state.current_draft = state["draft_output"] |
| | qa_result = parse_structured_qa(state["qa_report"]) if qa_active else QAResult(status="PASS") |
| | planner_state.qa_result = qa_result |
| | planner_state.record_event("qa", f"status={qa_result.status}") |
| |
|
| | |
| | if not qa_result.passed: |
| | planner_state.record_failure(qa_result) |
| |
|
| | |
| | if planner_active and qa_active: |
| | is_final_revision = (state["revision_count"] + 1 >= MAX_REVISIONS) and not state["qa_passed"] |
| | state = _step_planner_review(chat_model, state, trace, is_final_revision=is_final_revision) |
| |
|
| | if state["final_answer"]: |
| | planner_state.final_answer = state["final_answer"] |
| | trace.append("\nโโโ WORKFLOW COMPLETE โ APPROVED โโโ") |
| | break |
| |
|
| | |
| | |
| | revision_instr = "" |
| | if "REVISED INSTRUCTIONS:" in state.get("plan", ""): |
| | revision_instr = state["plan"] |
| | elif qa_result.correction_instruction: |
| | revision_instr = qa_result.correction_instruction |
| | state["revision_instruction"] = revision_instr |
| | planner_state.revision_instruction = revision_instr |
| | planner_state.record_event("revision_instruction_stored", |
| | revision_instr[:200] if revision_instr else "MISSING") |
| |
|
| | state["revision_count"] += 1 |
| | planner_state.revision_count = state["revision_count"] |
| |
|
| | if state["revision_count"] >= MAX_REVISIONS: |
| | |
| | trace.append( |
| | f"\nโโโ MAX REVISIONS ({MAX_REVISIONS}) โ FINAL CORRECTION PASS โโโ" |
| | ) |
| | state = _step_planner_review(chat_model, state, trace, is_final_revision=True) |
| | if not state["final_answer"]: |
| | state["final_answer"] = state["draft_output"] |
| | planner_state.final_answer = state["final_answer"] |
| | trace.append("\nโโโ WORKFLOW COMPLETE โ MAX REVISIONS โโโ") |
| | break |
| |
|
| | |
| | escalation = planner_state.get_escalation_strategy() |
| | if escalation != "none": |
| | trace.append(f"\n[ESCALATION] Strategy: {escalation}") |
| | planner_state.record_event("escalation", escalation) |
| |
|
| | if escalation == "suppress_role": |
| | suppress = planner_state.get_roles_to_suppress() |
| | for role_label in suppress: |
| | role_key = _ROLE_LABEL_TO_KEY.get(role_label) |
| | if role_key and role_key in selected_roles: |
| | selected_roles.remove(role_key) |
| | trace.append(f" โ Suppressed role: {role_label} (repeated failures)") |
| | if not selected_roles: |
| | selected_roles = [primary_role] |
| |
|
| | elif escalation == "rewrite_from_state": |
| | trace.append(" โ Synthesizer will rewrite from state instead of reusing draft") |
| | state["draft_output"] = "" |
| |
|
| | elif escalation == "narrow_scope": |
| | if len(selected_roles) > 1: |
| | selected_roles = [selected_roles[0]] |
| | trace.append(f" โ Narrowed to single specialist: {selected_roles[0]}") |
| |
|
| | |
| | revision_targets = identify_revision_targets(qa_result, _ROLE_LABEL_TO_KEY) |
| | trace.append( |
| | f"\nโโโ REVISION {state['revision_count']} / {MAX_REVISIONS} โโโ\n" |
| | f"Targeted roles: {', '.join(revision_targets)}" |
| | ) |
| | planner_state.record_event("revision", f"targets={revision_targets}") |
| |
|
| | |
| | rerun_specialists = [ |
| | t for t in revision_targets |
| | if t in _SPECIALIST_STEPS and t in selected_roles |
| | ] |
| | rerun_synthesizer = "synthesizer" in revision_targets or bool(rerun_specialists) |
| |
|
| | if rerun_specialists: |
| | new_outputs = [] |
| | for rk in rerun_specialists: |
| | state = _run_specialist(rk) |
| | new_outputs.append((rk, state["draft_output"])) |
| | planner_state.specialist_outputs[rk] = state["draft_output"][:500] |
| | |
| | contrib = parse_structured_contribution( |
| | state["draft_output"], |
| | AGENT_ROLES.get(rk, rk), |
| | ) |
| | structured_contributions[rk] = contrib |
| |
|
| | |
| | updated_keys = {rk for rk, _ in new_outputs} |
| | all_outputs = [ |
| | (rk, out) for rk, out in all_outputs if rk not in updated_keys |
| | ] + new_outputs |
| |
|
| | |
| | state["structured_contributions"] = { |
| | k: v.to_dict() for k, v in structured_contributions.items() |
| | } |
| |
|
| | if rerun_synthesizer or rerun_specialists: |
| | state = _step_synthesize(chat_model, state, trace, all_outputs, |
| | evidence=evidence, |
| | structured_contributions=structured_contributions) |
| |
|
| | |
| | fmt_violations = validate_output_format( |
| | state["draft_output"], output_format, brevity |
| | ) |
| | if fmt_violations: |
| | trace.append( |
| | "\n[FORMAT VALIDATION] Post-revision violations:\n" |
| | + "\n".join(f" - {v}" for v in fmt_violations) |
| | ) |
| | violation_instr = format_violations_instruction(fmt_violations) |
| | state["plan"] = state["plan"] + "\n\n" + violation_instr |
| | state = _step_synthesize(chat_model, state, trace, all_outputs, |
| | evidence=evidence, |
| | structured_contributions=structured_contributions) |
| |
|
| | |
| | continue |
| |
|
| | else: |
| | |
| | state["final_answer"] = state["draft_output"] |
| | planner_state.final_answer = state["final_answer"] |
| | trace.append("\nโโโ WORKFLOW COMPLETE โโโ") |
| | break |
| |
|
| | except Exception as exc: |
| | trace.append(f"\n[ERROR] {exc}\n{traceback.format_exc()}") |
| | state["final_answer"] = state["draft_output"] or f"Workflow error: {exc}" |
| |
|
| | |
| | raw_answer = state["final_answer"] |
| | final_answer = compress_final_answer(raw_answer, output_format, brevity, message) |
| | final_answer = strip_internal_noise(final_answer) |
| |
|
| | return final_answer, "\n".join(trace) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
def build_agent(model_id: str, selected_tool_names: List[str]):
    """Return a (cached) tool-calling agent for ``model_id`` and a tool selection.

    Args:
        model_id: Provider model identifier; passed to ``build_provider_chat``
            and embedded in the system prompt.
        selected_tool_names: Names of the tools to enable. Names that are not
            keys of ``ALL_TOOLS`` are ignored, and duplicates are collapsed.

    Returns:
        The agent object produced by ``create_agent``. The same object is
        returned for later calls with an equivalent model/tool selection.
    """
    # Normalise the selection *before* it becomes part of the cache key.
    # Keying on the raw names meant that unknown or duplicated names produced
    # distinct cache entries for what is effectively the same agent, and a
    # duplicated name handed the same tool to ``create_agent`` twice.
    tool_key = tuple(
        sorted({name for name in selected_tool_names if name in ALL_TOOLS})
    )
    cache_key = (model_id, tool_key)

    try:
        return AGENT_CACHE[cache_key]
    except KeyError:
        pass  # Not built yet for this model/tool combination.

    # Build the tool list from the normalised key so the cached agent is
    # deterministic regardless of the order the caller listed the tools in.
    tools = [ALL_TOOLS[name] for name in tool_key]
    chat_model = build_provider_chat(model_id)

    system_prompt = (
        "You are an assistant with tool access. "
        "Use math tools for calculations. "
        "Use Wikipedia for stable facts. "
        "Use web search for recent or changing information. "
        "Use arXiv for research papers. "
        "Use stock tools for financial data. "
        "Generate charts when the user asks for trends or plots. "
        "If a needed tool is unavailable, say so plainly. "
        f"You are currently running with provider-backed model='{model_id}'. "
        "After using tools, always provide a final natural-language answer. "
        "Do not stop after only issuing a tool call. "
        "Be concise."
    )

    agent = create_agent(
        model=chat_model,
        tools=tools,
        system_prompt=system_prompt,
    )
    AGENT_CACHE[cache_key] = agent
    return agent
| |
|
| |
|
| | |
| | |
| | |
| |
|
def classify_backend_error(model_id: str, err: Exception) -> str:
    """Map a backend exception to a user-facing message and record model health.

    Side effect: stores a health label for ``model_id`` in ``RUNTIME_HEALTH``
    (one of ``"unavailable"``, ``"gated"``, ``"rate_limited"``, ``"error"``).

    Args:
        model_id: Model the failed request was addressed to.
        err: The exception raised while building or invoking the agent.

    Returns:
        A short explanation suitable for showing in the chat window.
    """
    text = str(err)

    # Anything that is not a Hub HTTP error is reported as a generic runtime
    # failure.
    if not isinstance(err, HfHubHTTPError):
        RUNTIME_HEALTH[model_id] = "error"
        return f"Runtime error: {err}"

    if "model_not_supported" in text or "not supported by any provider" in text:
        RUNTIME_HEALTH[model_id] = "unavailable"
        return "This model exists on Hugging Face, but it is not supported by the provider route used by this app."

    # Prefer the real HTTP status when the error carries a response object.
    # Searching the message for "401"/"404"/"429" alone can match unrelated
    # digits (request ids, model names, byte counts) and misclassify the error.
    status = getattr(getattr(err, "response", None), "status_code", None)

    def _has_status(*codes: int) -> bool:
        if status is not None:
            return status in codes
        # No response attached: fall back to the original text heuristic.
        return any(str(code) in text for code in codes)

    if _has_status(401, 403):
        RUNTIME_HEALTH[model_id] = "gated"
        return "This model is not accessible with the current Hugging Face token."
    if _has_status(429):
        RUNTIME_HEALTH[model_id] = "rate_limited"
        return "This model is being rate-limited right now. Try again shortly or switch model."
    if _has_status(404):
        RUNTIME_HEALTH[model_id] = "unavailable"
        return "This model is not available on the current Hugging Face inference route."

    RUNTIME_HEALTH[model_id] = "error"
    return f"Provider error: {err}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def build_debug_report(
    model_id: str,
    message: str,
    selected_tools: List[str],
    messages: List[object],
    final_answer: str,
    last_nonempty_ai: Optional[str],
    last_tool_content: Optional[str],
    chart_path: Optional[str],
) -> str:
    """Render a plain-text debug report for one agent run.

    The report has three parts: a header with the request parameters, one
    section per entry of ``messages`` (type, emptiness flag, a 500-character
    content preview and any tool calls / extra metadata found on the object),
    and a summary with warnings about missing output.

    Returns:
        The report as a single newline-joined string.
    """
    report = [
        "=== DEBUG REPORT ===",
        f"model_id: {model_id}",
        f"user_message: {message}",
        f"selected_tools: {selected_tools}",
        f"client_location_value: {_client_location.get()!r}",
        f"message_count: {len(messages)}",
        f"chart_path: {chart_path}",
        "",
    ]

    for index, entry in enumerate(messages):
        # Objects without a ``type`` attribute are labelled by class name.
        kind = getattr(entry, "type", type(entry).__name__)
        body = content_to_text(getattr(entry, "content", ""))
        calls = getattr(entry, "tool_calls", None)

        report.extend(
            [
                f"--- message[{index}] ---",
                f"type: {kind}",
                f"content_empty: {not bool(body.strip())}",
                f"content_preview: {short_text(body, 500)}",
            ]
        )

        if calls:
            report.append(f"tool_calls: {calls}")

        # Optional attributes are only printed when present and non-empty.
        extra_kwargs = getattr(entry, "additional_kwargs", None)
        if extra_kwargs:
            report.append(f"additional_kwargs: {extra_kwargs}")

        metadata = getattr(entry, "response_metadata", None)
        if metadata:
            report.append(f"response_metadata: {metadata}")

        report.append("")

    report.extend(
        [
            "=== SUMMARY ===",
            f"last_nonempty_ai: {short_text(last_nonempty_ai or '', 500)}",
            f"last_tool_content: {short_text(last_tool_content or '', 500)}",
            f"final_answer: {short_text(final_answer or '', 500)}",
        ]
    )

    if not (final_answer and final_answer.strip()):
        report.append("warning: final_answer is empty")
    if not last_nonempty_ai:
        if last_tool_content:
            report.append("warning: model returned tool output but no final AI text")
        else:
            report.append("warning: neither AI text nor tool content was recovered")

    return "\n".join(report)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def run_agent(message, history, selected_tools, model_id, client_ip: str = ""):
    """Gradio handler for the tools tab: run one agent turn and collect outputs.

    Only the current ``message`` is sent to the agent; ``history`` is used
    solely to build the chat transcript that is returned.

    Returns:
        A 6-tuple ``(history, tool_trace, "", chart_path, model_status,
        debug_report)``. The third element is always an empty string. On the
        two early-exit paths the last element repeats the short notice instead
        of a full report.
    """
    history = history or []

    # Publish the client-supplied location value through the context variable.
    _client_location.set(client_ip.strip() if client_ip else "")

    if not (message and str(message).strip()):
        notice = "No input provided."
        return history, notice, "", None, model_status_text(model_id), notice

    if not selected_tools:
        history.extend(
            [
                {"role": "user", "content": message},
                {"role": "assistant", "content": "No tools are enabled. Please enable at least one tool."},
            ]
        )
        notice = "No tools enabled."
        return history, notice, "", None, model_status_text(model_id), notice

    chart_path = None
    debug_report = ""

    try:
        agent = build_agent(model_id, selected_tools)
        response = agent.invoke(
            {"messages": [{"role": "user", "content": message}]}
        )
        messages = response.get("messages", [])

        tool_lines = []
        last_nonempty_ai = None
        last_tool_content = None

        for msg in messages:
            kind = getattr(msg, "type", None)
            content = content_to_text(getattr(msg, "content", ""))

            if kind == "ai":
                # Record every tool invocation requested by the model.
                requested = getattr(msg, "tool_calls", None)
                if requested:
                    tool_lines.extend(
                        f"โถ {tc.get('name', 'unknown_tool')}({tc.get('args', {})})"
                        for tc in requested
                    )
                if content and content.strip():
                    last_nonempty_ai = content.strip()

            elif kind == "tool":
                tool_lines.append(f"โ {short_text(content, 1500)}")
                if content and content.strip():
                    last_tool_content = content.strip()

                # The last tool message that yields a chart path wins.
                found_chart = extract_chart_path(content)
                if found_chart:
                    chart_path = found_chart

        # Prefer the model's own text; fall back to raw tool output, then to
        # a fixed notice. Anything but real AI text is flagged "empty_final".
        if last_nonempty_ai:
            final_answer, health = last_nonempty_ai, "ok"
        elif last_tool_content:
            final_answer, health = f"Tool result:\n{last_tool_content}", "empty_final"
        else:
            final_answer = "The model used a tool but did not return a final text response."
            health = "empty_final"
        RUNTIME_HEALTH[model_id] = health

        if tool_lines:
            tool_trace = "\n".join(tool_lines)
        else:
            tool_trace = "No tools used."

        debug_report = build_debug_report(
            model_id=model_id,
            message=message,
            selected_tools=selected_tools,
            messages=messages,
            final_answer=final_answer,
            last_nonempty_ai=last_nonempty_ai,
            last_tool_content=last_tool_content,
            chart_path=chart_path,
        )

    except Exception as e:
        # Top-level UI boundary: turn any failure into a chat message plus a
        # traceback in the debug pane.
        final_answer = classify_backend_error(model_id, e)
        tool_trace = "Execution failed."
        debug_report = (
            "=== DEBUG REPORT ===\n"
            f"model_id: {model_id}\n"
            f"user_message: {message}\n"
            f"selected_tools: {selected_tools}\n\n"
            "=== EXCEPTION ===\n"
            f"{traceback.format_exc()}\n"
        )

    history.extend(
        [
            {"role": "user", "content": message},
            {"role": "assistant", "content": final_answer},
        ]
    )

    return history, tool_trace, "", chart_path, model_status_text(model_id), debug_report
| |
|
| |
|
| | |
| | |
| | |
| |
|
with gr.Blocks(title="LLM + Agent tools demo", theme=gr.themes.Soft()) as demo:
    # NOTE(review): the container nesting below was reconstructed from the
    # order of the statements (the original indentation was not available) --
    # confirm the rendered layout.
    gr.Markdown("# Catos agent and tools playground\n")

    with gr.Tabs():

        # ---- Tab 1: multi-role workflow --------------------------------
        with gr.Tab("Agent discussion demo"):
            gr.Markdown(
                "## Strict PlannerโSpecialistโSynthesizerโQA Workflow\n"
                "**Planner** โ **Selected Specialists** โ **Synthesizer** โ **QA** โ **Planner review**\n\n"
                "The Planner analyses the task, selects only relevant specialists, and enforces QA as binding. "
                f"If QA fails, targeted revisions loop up to **{MAX_REVISIONS}** times.\n\n"
                "Use the checkboxes on the right to enable/disable agent roles and configure workflow settings."
            )

            with gr.Row():
                wf_model_dropdown = gr.Dropdown(
                    choices=MODEL_OPTIONS, value=DEFAULT_MODEL_ID, label="Model"
                )

            with gr.Row():
                with gr.Column(scale=2):
                    wf_input = gr.Textbox(
                        label="Question",
                        placeholder=(
                            "Describe what you want the multi-role team to work onโฆ\n"
                            "e.g. 'Write a short blog post about the benefits of open-source AI'"
                        ),
                        lines=3,
                    )
                    wf_submit_btn = gr.Button("Run discussion", variant="primary")

                with gr.Column(scale=2):
                    active_agents = gr.CheckboxGroup(
                        choices=list(AGENT_ROLES.values()),
                        value=list(AGENT_ROLES.values()),
                        label="Team",
                    )
                    with gr.Accordion("Workflow Settings", open=False):
                        strict_mode_toggle = gr.Checkbox(
                            value=True, label="Strict mode (select only relevant roles)"
                        )
                        allow_persona_toggle = gr.Checkbox(
                            value=False, label="Allow persona/gimmick roles"
                        )
                        max_specialists_slider = gr.Slider(
                            minimum=1, maximum=8, step=1, value=3,
                            label="Max specialists per task"
                        )
                        require_evidence_toggle = gr.Checkbox(
                            value=True, label="Require evidence for factual claims"
                        )
                        auto_research_toggle = gr.Checkbox(
                            value=True, label="Auto-include Research for factual tasks"
                        )

            with gr.Row():
                with gr.Column(scale=2):
                    wf_answer = gr.Textbox(
                        label="\u2705 Conclusion (Planner approved)",
                        lines=14,
                        interactive=False,
                    )
                with gr.Column(scale=3):
                    wf_trace = gr.Textbox(
                        label="Decision process insight",
                        lines=28,
                        interactive=False,
                    )

            def _run_workflow_ui(
                message: str, model_id: str, role_labels: List[str],
                strict_mode: bool, allow_persona: bool, max_specialists: int,
                require_evidence: bool, auto_research: bool,
            ) -> Tuple[str, str]:
                """Validate the question, run the workflow, return (answer, trace).

                Any exception is converted into an error string plus the
                formatted traceback so the UI always receives two values.
                """
                if not (message and message.strip()):
                    return "No input provided.", ""
                try:
                    settings = WorkflowConfig(
                        strict_mode=strict_mode,
                        allow_persona_roles=allow_persona,
                        # The slider value is coerced to int before use.
                        max_specialists_per_task=int(max_specialists),
                        require_evidence_for_factual_claims=require_evidence,
                        always_include_research_for_factual_tasks=auto_research,
                    )
                    answer, process_log = run_multi_role_workflow(
                        message.strip(), model_id, role_labels, config=settings
                    )
                    return answer, process_log
                except Exception as exc:
                    return f"Workflow error: {exc}", traceback.format_exc()

            # The button click and Enter-in-textbox share one wiring.
            _wf_inputs = [
                wf_input, wf_model_dropdown, active_agents,
                strict_mode_toggle, allow_persona_toggle, max_specialists_slider,
                require_evidence_toggle, auto_research_toggle,
            ]
            _wf_outputs = [wf_answer, wf_trace]
            for _wf_trigger in (wf_submit_btn.click, wf_input.submit):
                _wf_trigger(
                    fn=_run_workflow_ui,
                    inputs=_wf_inputs,
                    outputs=_wf_outputs,
                    show_api=False,
                )

        # ---- Tab 2: single agent with tools -----------------------------
        with gr.Tab("Use of tools demo"):
            with gr.Row():
                model_dropdown = gr.Dropdown(
                    choices=MODEL_OPTIONS, value=DEFAULT_MODEL_ID, label="Base model"
                )
                model_status = gr.Textbox(
                    value=model_status_text(DEFAULT_MODEL_ID),
                    label="Model status",
                    interactive=False,
                )

            with gr.Row():
                with gr.Column(scale=3):
                    chatbot = gr.Chatbot(label="Conversation", height=460, type="messages")
                    user_input = gr.Textbox(label="Message", placeholder="Ask anything...")

                    with gr.Row():
                        send_btn = gr.Button("Send", variant="primary")
                        clear_btn = gr.Button("Clear")

                    chart_output = gr.Image(label="Generated chart", type="filepath")

                    with gr.Row():
                        location_btn = gr.Button("๐ Share my location", size="sm")
                        location_status = gr.Textbox(
                            value="Location not set โ click the button above before asking 'where am I'",
                            label="Location status",
                            interactive=False,
                            max_lines=1,
                        )

                with gr.Column(scale=1):
                    enabled_tools = gr.CheckboxGroup(
                        choices=TOOL_NAMES, value=TOOL_NAMES, label="Enabled tools"
                    )
                    tool_trace = gr.Textbox(label="Tool trace", lines=18, interactive=False)

            debug_output = gr.Textbox(label="Debug output", lines=28, interactive=False)

            # Hidden field filled in by the browser-side location script and
            # forwarded to run_agent as ``client_ip``.
            client_ip_box = gr.Textbox(visible=False, value="")

            model_dropdown.change(
                fn=model_status_text,
                inputs=[model_dropdown],
                outputs=[model_status],
                show_api=False,
            )

            # Runs in the browser only (fn=None): tries the Geolocation API
            # and falls back to a public-IP lookup when that is unavailable.
            location_btn.click(
                fn=None,
                inputs=None,
                outputs=[client_ip_box, location_status],
                js="""async () => {
    return new Promise((resolve) => {
        const fallback = async () => {
            try {
                const r = await fetch('https://api.ipify.org?format=json');
                const d = await r.json();
                resolve(['ip:' + d.ip, 'Location: IP-based fallback (approximate)']);
            } catch(e) {
                resolve(['', 'Location detection failed.']);
            }
        };
        if (!navigator.geolocation) { fallback(); return; }
        navigator.geolocation.getCurrentPosition(
            (pos) => {
                const lat = pos.coords.latitude.toFixed(5);
                const lon = pos.coords.longitude.toFixed(5);
                const acc = Math.round(pos.coords.accuracy);
                resolve([lat + ',' + lon, `\u2705 GPS/WiFi location set (\u00b1${acc}m)`]);
            },
            fallback,
            {timeout: 10000, maximumAge: 60000, enableHighAccuracy: true}
        );
    });
}""",
                show_api=False,
            )

            # Send button and Enter key both call run_agent with identical
            # wiring; the Clear button resets the same six outputs.
            _agent_inputs = [user_input, chatbot, enabled_tools, model_dropdown, client_ip_box]
            _agent_outputs = [
                chatbot, tool_trace, user_input, chart_output, model_status, debug_output,
            ]
            for _agent_trigger in (send_btn.click, user_input.submit):
                _agent_trigger(
                    fn=run_agent,
                    inputs=_agent_inputs,
                    outputs=_agent_outputs,
                    show_api=False,
                )

            clear_btn.click(
                fn=lambda current_model: (
                    [], "", "", None, model_status_text(current_model), ""
                ),
                inputs=[model_dropdown],
                outputs=_agent_outputs,
                show_api=False,
            )
| |
|
if __name__ == "__main__":
    # Script entry point: listen on all interfaces, on $PORT (default 7860),
    # and allow Gradio to serve files from the chart directory.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": int(os.environ.get("PORT", 7860)),
        "ssr_mode": False,
        "allowed_paths": [os.path.abspath(CHART_DIR)],
        "debug": True,
    }
    demo.launch(**launch_options)
| |
|