| |
| """ |
| Topology 3: Agent-Controlled tool use — core loop. |
| |
| Additive module. Does NOT modify any existing endpoints or behavior. |
| The agent asks the main LLM to output strict JSON (final answer or tool call), |
| executes tools when requested, injects results, and loops until a final answer. |
| |
| Tools available in v2: |
| - vision.analyze → uses existing multimodal.analyze_image(...) |
| - knowledge.search → queries project RAG knowledge base (ChromaDB) |
| - memory.recall → retrieves long-term persona memory (Memory V2) |
| - web.search → performs web search with summarization |
| - image.index → indexes image into knowledge base via vision analysis (T4) |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import re |
| import uuid |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from .llm import chat as llm_chat, strip_think_tags, _is_reasoning_text, recover_from_reasoning |
| from .multimodal import analyze_image |
| from .storage import add_message, get_messages, get_recent |
| from .config import UPLOAD_DIR |
|
|
| |
# Hard cap on tool invocations per agent turn (prevents runaway loops).
DEFAULT_MAX_TOOL_CALLS = 4
# Default number of prior messages pulled into the agent's context.
DEFAULT_HISTORY_LIMIT = 24


# Greedy first-'{' .. last-'}' match used to pull a JSON object out of a
# model response that has surrounding prose (DOTALL: may span newlines).
_JSON_OBJ_RE = re.compile(r"\{.*\}", re.DOTALL)

# Matches media://persona/<hex-id>/... refs emitted by the LLM:
# either ".../default", an explicit ".../label/<label>", or a bare label segment.
_MEDIA_REF_RE = re.compile(r"media://persona/[a-f0-9\-]+/(?:default\b|label/[^)\"]+|[^)/\"]+)")


# Tool name -> human-readable catalog description shown in the system prompt.
# The matching handler is looked up separately as _run_<name>.
TOOL_REGISTRY: Dict[str, str] = {}
|
|
|
|
def _register_tool(name: str, description: str) -> None:
    """Record a tool in the catalog.

    Maps the tool name to the prompt description injected into the agent's
    system prompt; the executable handler lives separately as ``_run_<name>``.
    """
    TOOL_REGISTRY[name] = description
|
|
|
| |
# ---------------------------------------------------------------------------
# Tool catalog registrations.
# These descriptions are injected verbatim into the agent system prompt via
# _build_tool_catalog(); each registered name must have a matching branch in
# _dispatch_tool(). Keep descriptions short, literal, and model-readable.
# ---------------------------------------------------------------------------
_register_tool(
    "vision.analyze",
    "vision.analyze(image_url, question, mode): analyzes an image and returns useful text.\n"
    " mode is one of: caption | ocr | both",
)
_register_tool(
    "knowledge.search",
    "knowledge.search(query, n_results): searches the project's knowledge base for relevant documents.\n"
    " query: text to search for. n_results: number of results (default 3, max 5).",
)
_register_tool(
    "memory.recall",
    "memory.recall(query): recalls long-term memories about the user (preferences, facts, boundaries).\n"
    " query: topic or keyword to search memories for.",
)
_register_tool(
    "web.search",
    "web.search(query, max_results): searches the web for current information.\n"
    " query: search query. max_results: number of results (default 3, max 5).",
)
_register_tool(
    "image.index",
    "image.index(image_url): indexes an image into the project knowledge base for future search.\n"
    " Extracts visual description + text via vision analysis and stores it.\n"
    " Use this when a user uploads an image and wants it searchable later.",
)
_register_tool(
    "image.generate",
    "image.generate(prompt): generates an image from a text description.\n"
    " prompt: detailed visual description of the image to create.\n"
    " Use this when the user asks to generate, create, draw, imagine, or make a picture/photo/image.",
)
_register_tool(
    "memory.store",
    "memory.store(key, value, importance): stores a fact or preference about the user.\n"
    " key: short label (e.g. 'favorite_color'). value: the fact. importance: 0.0-1.0 (default 0.5).\n"
    " Use this when you learn something important the user would want remembered long-term.",
)
_register_tool(
    "user.profile.get",
    "user.profile.get(): returns the latest user profile/preferences + user-approved memory context.\n"
    " Use this if the user says they changed settings, or you need the freshest preferences in real time.",
)
_register_tool(
    "user.integrations.list",
    "user.integrations.list(): lists configured integration keys (no secret values).\n"
    " Use this to know which services are connected before recommending actions.",
)
_register_tool(
    "persona.avatar.get",
    "persona.avatar.get(project_id): returns the current selected avatar + thumbnail URLs for the persona.\n"
    " Use this when you need the latest avatar image path.",
)
|
|
|
|
@dataclass
class AgentStepResult:
    """One parsed agent step: either a final answer or a single tool call."""
    # "final" or "tool_call", mirroring the JSON protocol's "type" field.
    type: str
    # Final answer text (meaningful when type == "final").
    text: str = ""
    # Tool name to invoke (meaningful when type == "tool_call").
    tool: str = ""
    # Tool arguments parsed from the model's JSON.
    args: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
| def _extract_json_obj(s: str) -> Optional[Dict[str, Any]]: |
| """ |
| Robustly extract the first JSON object from a model response. |
| Returns None if parsing fails. |
| """ |
| if not s: |
| return None |
| s = s.strip() |
| |
| try: |
| obj = json.loads(s) |
| if isinstance(obj, dict): |
| return obj |
| except Exception: |
| pass |
|
|
| |
| m = _JSON_OBJ_RE.search(s) |
| if not m: |
| return None |
| candidate = m.group(0) |
| try: |
| obj = json.loads(candidate) |
| if isinstance(obj, dict): |
| return obj |
| except Exception: |
| return None |
| return None |
|
|
|
|
| def _build_tool_catalog() -> str: |
| """Build the tool catalog section of the system prompt from the registry.""" |
| lines = ["Tool catalog:"] |
| for name, desc in TOOL_REGISTRY.items(): |
| lines.append(f"- {desc}") |
| return "\n".join(lines) |
|
|
|
|
def _build_agent_system_prompt(
    *,
    memory_context: str = "",
    knowledge_hint: str = "",
    user_context: str = "",
    session_context: str = "",
    persona_context: str = "",
) -> str:
    """
    Compose the agent system prompt enforcing a minimal JSON protocol.

    The strict-JSON protocol avoids requiring full function-calling support
    from the backing LLM. All context blocks (persona, user, session,
    memory, knowledge) are optional and injected only when non-empty —
    additive, never destructive.
    """
    sections: List[str] = []

    # Persona text (if present) leads, followed by the role statement.
    if persona_context:
        sections.append(persona_context)
        sections.append("You can optionally call tools when needed.\n")
    else:
        sections.append("You are an agent that can optionally call tools.\n")

    # Protocol specification + tool catalog + usage rules.
    sections.extend(
        [
            "You MUST reply with STRICT JSON ONLY (no markdown, no extra text).\n",
            "Allowed response shapes:\n"
            "1) Final answer:\n"
            '{ "type": "final", "text": "..." }\n',
            "2) Tool call (one at a time):\n"
            '{ "type": "tool_call", "tool": "<tool_name>", "args": { ... } }\n',
            _build_tool_catalog(),
            "\nRules:\n"
            "- Only call a tool if it is necessary to answer correctly.\n"
            "- If no image_url is available for vision.analyze, return a final answer asking the user to upload an image.\n"
            "- Use knowledge.search when the user asks about project documents or uploaded files.\n"
            "- Use memory.recall when you need to remember user preferences, facts, or boundaries.\n"
            "- Use memory.store when you learn an important fact about the user that should be remembered.\n"
            "- Use web.search when the user asks about current events or needs up-to-date information.\n"
            "- Use image.generate when the user asks to generate, create, draw, imagine, or make a picture/photo/image.\n"
            "- Keep tool calls minimal.\n",
        ]
    )

    # Optional context blocks, in fixed order.
    if user_context:
        sections.append(f"\n--- USER CONTEXT ---\n{user_context}\n--- END USER CONTEXT ---\n")
    if session_context:
        sections.append(f"\n{session_context}\n")
    if memory_context:
        sections.append(f"\n--- PERSONA MEMORY ---\n{memory_context}\n--- END MEMORY ---\n")
    if knowledge_hint:
        sections.append(f"\n{knowledge_hint}\n")

    return "\n".join(sections)
|
|
|
|
| def _resolve_media_ref(ref: str) -> Optional[str]: |
| """Resolve a media://persona/<id>/... ref to a real image URL.""" |
| if not ref.startswith("media://"): |
| return None |
| try: |
| from .media_resolver import _build_label_index, _lookup_label |
| parts = ref.replace("media://", "").split("/") |
| if len(parts) < 3: |
| return None |
| kind, project_id, action = parts[0], parts[1], parts[2] |
| if kind != "persona": |
| return None |
| idx = _build_label_index(project_id) |
| if action == "default": |
| return idx.get("default") |
| if action == "label" and len(parts) >= 4: |
| label = "/".join(parts[3:]).strip() |
| else: |
| |
| |
| label = "/".join(parts[2:]).strip() |
| return _lookup_label(idx, label) |
| except Exception: |
| pass |
| return None |
|
|
|
|
def _extract_media_from_text(text: str) -> Optional[Dict[str, Any]]:
    """
    Find media:// refs in LLM output and resolve each to a real image URL.

    Returns {"images": [url1, ...]} or None when nothing resolves.
    URLs are de-duplicated: models sometimes repeat the same ref.
    """
    matches = _MEDIA_REF_RE.findall(text)
    if not matches:
        return None
    seen: set[str] = set()
    resolved: list[str] = []
    for match in matches:
        candidate = _resolve_media_ref(match)
        if candidate and candidate not in seen:
            seen.add(candidate)
            resolved.append(candidate)
    if not resolved:
        return None
    return {"images": resolved}
|
|
|
|
| |
# Markdown image tags whose target is a media:// ref, e.g. ![alt](media://...),
# including any trailing whitespace. Stripped so images render only once.
_MD_MEDIA_IMG_RE = re.compile(r'!\[[^\]]*\]\(media://[^)]+\)\s*')
|
|
|
|
def _strip_media_images_from_text(text: str) -> str:
    """
    Drop markdown image tags that point at media:// refs.

    Prevents double-rendering: the media gallery already shows the image,
    so the markdown body must not render it a second time.
    """
    cleaned = _MD_MEDIA_IMG_RE.sub('', text)
    return cleaned.strip()
|
|
|
|
| def _build_persona_photo_index(project_id: str) -> Dict[str, str]: |
| """ |
| Build a mapping from lowercase label -> media:// ref for deterministic |
| photo shortcut resolution. Also includes 'default' key. |
| """ |
| try: |
| from .media_resolver import _build_label_index |
| idx = _build_label_index(project_id) |
| result: Dict[str, str] = {} |
| for key, url in idx.items(): |
| if key == "default": |
| result["default"] = url |
| elif key.startswith("label:"): |
| label = key[len("label:"):] |
| result[label.lower()] = url |
| return result |
| except Exception: |
| return {} |
|
|
|
|
| def _find_last_image_url(conversation_id: str) -> Optional[str]: |
| """ |
| Looks for the most recent message with media.images and returns its first image URL. |
| Uses storage.get_messages which includes media. |
| """ |
| try: |
| msgs = get_messages(conversation_id, limit=200) |
| except Exception: |
| return None |
|
|
| for m in reversed(msgs): |
| media = m.get("media") or {} |
| images = media.get("images") or [] |
| if images and isinstance(images, list): |
| url = images[0] |
| if isinstance(url, str) and url.strip(): |
| return url.strip() |
| return None |
|
|
|
|
| def _format_tool_context(tool_name: str, tool_output: str, meta: Optional[Dict[str, Any]] = None) -> str: |
| """ |
| Injected into the agent context after tool execution. |
| """ |
| lines = [ |
| "TOOL_RESULT", |
| f"tool={tool_name}", |
| "output:", |
| tool_output.strip() if tool_output else "(empty)", |
| ] |
| if meta: |
| try: |
| lines.append("meta:") |
| lines.append(json.dumps(meta, ensure_ascii=False)) |
| except Exception: |
| pass |
| return "\n".join(lines) |
|
|
|
|
| |
| |
| |
|
|
async def _run_vision_analyze(
    *,
    image_url: str,
    question: Optional[str],
    mode: str,
    provider: str,
    base_url: Optional[str],
    model: Optional[str],
    nsfw_mode: bool,
) -> Tuple[str, Dict[str, Any]]:
    """
    Run vision analysis through the existing multimodal module.

    Returns (analysis_text, full_result_json); the text falls back to a
    stub message when the analyzer produced nothing.
    """
    result = await analyze_image(
        image_url=image_url,
        upload_path=Path(UPLOAD_DIR),
        provider=provider,
        base_url=base_url,
        model=model,
        user_prompt=question,
        nsfw_mode=nsfw_mode,
        mode=mode or "both",
    )
    text = (result.get("analysis_text") or "").strip()
    return (text or "No analysis available.", result)
|
|
|
|
| async def _run_knowledge_search( |
| *, |
| query: str, |
| project_id: Optional[str], |
| n_results: int = 3, |
| allowed_item_ids: Optional[List[str]] = None, |
| ) -> Tuple[str, Dict[str, Any]]: |
| """ |
| Search the project knowledge base (ChromaDB RAG). |
| |
| When allowed_item_ids is provided, retrieval is scoped to only those |
| project_items (persona-scoped Chat-with-Docs). When None/empty, behaves |
| like the standard project-wide search. |
| |
| Returns (formatted_results_text, meta). |
| """ |
| if not project_id: |
| return "No project context available. Knowledge search requires a project.", {"error": "no_project"} |
|
|
| try: |
| from .vectordb import query_project_knowledge, query_project_knowledge_filtered, get_project_document_count, CHROMADB_AVAILABLE |
| except ImportError: |
| return "Knowledge base is not available (ChromaDB not installed).", {"error": "chromadb_missing"} |
|
|
| if not CHROMADB_AVAILABLE: |
| return "Knowledge base is not available (ChromaDB not installed).", {"error": "chromadb_missing"} |
|
|
| n_results = max(1, min(n_results, 5)) |
|
|
| try: |
| doc_count = get_project_document_count(project_id) |
| if doc_count == 0: |
| return "No documents in this project's knowledge base. Upload files to enable knowledge search.", { |
| "doc_count": 0, |
| } |
|
|
| |
| if allowed_item_ids: |
| results = query_project_knowledge_filtered( |
| project_id, query, n_results=n_results, |
| allowed_item_ids=allowed_item_ids, |
| ) |
| else: |
| results = query_project_knowledge(project_id, query, n_results=n_results) |
|
|
| if not results: |
| return f"No relevant results found for: {query}", {"doc_count": doc_count, "results_found": 0} |
|
|
| lines = [f"Found {len(results)} relevant document chunks (out of {doc_count} total):"] |
| for i, doc in enumerate(results, 1): |
| source = doc.get("metadata", {}).get("source", "Unknown") |
| content = doc.get("content", "") |
| similarity = doc.get("similarity", 0.0) |
| lines.append(f"\n[{i}] Source: {source} (relevance: {similarity:.2f})") |
| lines.append(content[:500]) |
|
|
| return "\n".join(lines), {"doc_count": doc_count, "results_found": len(results)} |
|
|
| except Exception as e: |
| return f"Error searching knowledge base: {e}", {"error": str(e)} |
|
|
|
|
| async def _run_memory_recall( |
| *, |
| query: str, |
| project_id: Optional[str], |
| ) -> Tuple[str, Dict[str, Any]]: |
| """ |
| Recall long-term memories from Memory V2. |
| Wraps existing memory_v2.MemoryV2Engine.build_context(). |
| Returns (memory_context_text, meta). |
| """ |
| if not project_id: |
| return "No project context available. Memory recall requires a project.", {"error": "no_project"} |
|
|
| try: |
| from .memory_v2 import get_memory_v2, ensure_v2_columns |
| except ImportError: |
| return "Memory V2 module is not available.", {"error": "memory_v2_missing"} |
|
|
| try: |
| ensure_v2_columns() |
| engine = get_memory_v2() |
| context = engine.build_context(project_id, query) |
| if not context or not context.strip(): |
| return "No memories stored yet for this project.", {"memories_found": 0} |
|
|
| return context, {"memories_found": context.count(" - ")} |
|
|
| except Exception as e: |
| return f"Error recalling memories: {e}", {"error": str(e)} |
|
|
|
|
| async def _run_web_search( |
| *, |
| query: str, |
| max_results: int = 3, |
| ) -> Tuple[str, Dict[str, Any]]: |
| """ |
| Perform web search using existing search module. |
| Wraps existing search.web_search() + search.summarize_results(). |
| Returns (summary_text, meta). |
| """ |
| try: |
| from .search import web_search, summarize_results |
| except ImportError: |
| return "Web search module is not available.", {"error": "search_missing"} |
|
|
| max_results = max(1, min(max_results, 5)) |
|
|
| try: |
| results = await web_search(query, max_results=max_results) |
| if not results: |
| return f"No web results found for: {query}", {"results_found": 0} |
|
|
| |
| lines = [f"Web search results for: {query}"] |
| for i, r in enumerate(results, 1): |
| title = r.get("title", "") |
| snippet = r.get("snippet", "") |
| url = r.get("url", "") |
| lines.append(f"\n[{i}] {title}") |
| lines.append(f" {snippet}") |
| if url: |
| lines.append(f" URL: {url}") |
|
|
| return "\n".join(lines), {"results_found": len(results)} |
|
|
| except Exception as e: |
| return f"Error performing web search: {e}", {"error": str(e)} |
|
|
|
|
| async def _run_image_index( |
| *, |
| image_url: str, |
| project_id: Optional[str], |
| provider: str, |
| base_url: Optional[str], |
| model: Optional[str], |
| nsfw_mode: bool, |
| ) -> Tuple[str, Dict[str, Any]]: |
| """ |
| Index an image into the project knowledge base via vision analysis. |
| Wraps vectordb_images.index_image_from_url(). |
| Returns (result_text, meta). |
| """ |
| if not project_id: |
| return "No project context available. Image indexing requires a project.", {"error": "no_project"} |
|
|
| if not image_url: |
| return "No image URL provided for indexing.", {"error": "no_image"} |
|
|
| try: |
| from .vectordb_images import index_image_from_url |
| except ImportError: |
| return "Image indexing module is not available.", {"error": "module_missing"} |
|
|
| try: |
| result = await index_image_from_url( |
| project_id=project_id, |
| image_url=image_url, |
| provider=provider, |
| base_url=base_url, |
| model=model, |
| nsfw_mode=nsfw_mode, |
| ) |
|
|
| if result.get("ok"): |
| chunks = result.get("chunks_added", 0) |
| preview = result.get("analysis_preview", "") |
| return ( |
| f"Image indexed successfully. {chunks} chunks added to knowledge base.\n" |
| f"Content preview: {preview}" |
| ), result |
| else: |
| return f"Image indexing failed: {result.get('error', 'unknown')}", result |
|
|
| except Exception as e: |
| return f"Error indexing image: {e}", {"error": str(e)} |
|
|
|
|
| async def _run_memory_store( |
| *, |
| key: str, |
| value: str, |
| importance: float, |
| project_id: Optional[str], |
| ) -> Tuple[str, Dict[str, Any]]: |
| """ |
| Store a fact/preference into Memory V2 as a semantic memory. |
| Wraps memory_v2._upsert_memory(). |
| Returns (result_text, meta). |
| """ |
| if not project_id: |
| return "No project context available. Memory store requires a project.", {"error": "no_project"} |
|
|
| if not key or not value: |
| return "Both key and value are required to store a memory.", {"error": "missing_args"} |
|
|
| try: |
| from .memory_v2 import get_memory_v2, ensure_v2_columns, _upsert_memory |
| except ImportError: |
| return "Memory V2 module is not available.", {"error": "memory_v2_missing"} |
|
|
| try: |
| ensure_v2_columns() |
| importance = max(0.0, min(1.0, importance)) |
|
|
| _upsert_memory( |
| project_id=project_id, |
| category="agent_learned", |
| key=key.strip()[:100], |
| value=value.strip()[:600], |
| mem_type="S", |
| source_type="inferred", |
| confidence=0.8, |
| strength=0.6, |
| importance=importance, |
| ) |
|
|
| return ( |
| f"Stored memory: '{key}' = '{value[:80]}{'...' if len(value) > 80 else ''}' (importance: {importance:.1f})" |
| ), {"key": key, "importance": importance} |
|
|
| except Exception as e: |
| return f"Error storing memory: {e}", {"error": str(e)} |
|
|
|
|
async def _run_image_generate(
    *,
    prompt: str,
    conversation_id: Optional[str],
    project_id: Optional[str],
    nsfw_mode: bool,
    user_id: Optional[str],
) -> Tuple[str, Dict[str, Any], Optional[Dict[str, Any]]]:
    """
    Generate an image by routing through the orchestrator's imagine pipeline.

    orchestrate() is called with conversation_id=None, so the pipeline works
    in a throwaway conversation; any file assets it produced are then
    re-homed onto the caller's conversation via a direct DB update
    (best-effort — failures there never break a successful generation).

    Returns (description_text, meta, media_dict) where media_dict is
    {"images": [...]} on success, else None.
    """
    if not prompt:
        return "Please describe what image you'd like me to generate.", {"error": "no_prompt"}, None

    try:
        from .orchestrator import orchestrate
    except ImportError:
        return "Image generation module is not available.", {"error": "orchestrator_missing"}, None

    try:
        result = await orchestrate(
            user_text=prompt,
            conversation_id=None,
            mode="imagine",
            nsfw_mode=nsfw_mode,
            user_id=user_id,
        )
        # Re-attach generated file assets from the throwaway conversation to
        # the real one so they remain visible in the chat history.
        if conversation_id and result.get("media") and result["media"].get("images"):
            try:
                from .files import _db
                con = _db()
                cur = con.cursor()
                throwaway_cid = result.get("conversation_id", "")
                if throwaway_cid and throwaway_cid != conversation_id:
                    cur.execute(
                        "UPDATE file_assets SET conversation_id = ? WHERE conversation_id = ?",
                        (conversation_id, throwaway_cid),
                    )
                con.commit()
                con.close()
            except Exception:
                # Best-effort re-homing; the generation itself succeeded.
                pass
        media = result.get("media")
        text = result.get("text", "")

        if media and media.get("images"):
            urls = media["images"]
            # Fall back to a synthetic caption when the pipeline gave no text.
            text = text or f"Generated {len(urls)} image(s) for: {prompt[:100]}"
            return text, {"prompt": prompt, "images": urls}, media
        else:
            return text or "Image generation completed but no images were returned.", {"prompt": prompt}, None

    except Exception as e:
        return f"Image generation failed: {e}", {"error": str(e)}, None
|
|
|
|
| |
| |
| |
|
|
async def _run_user_profile_get(
    *,
    project_id: Optional[str],
    user_id: Optional[str],
    nsfw_mode: bool,
) -> Tuple[str, Dict[str, Any]]:
    """Return a human-readable user context block for real-time profile reads.

    Lookup order:
      1. Per-user SQLite profile + memory items (when user_id is present).
      2. Legacy single-user profile.json / user_memory fallback.

    Honors the 'personalization_enabled' preference. Returns (text, meta);
    any failure yields a parenthesized placeholder plus error meta.
    """
    try:
        from .user_context import build_user_context_for_ai
        from .config import NSFW_MODE as GLOBAL_NSFW

        # NOTE(review): nsfw_mode is annotated bool but compared to None —
        # presumably callers may pass None to mean "use the global default";
        # confirm against call sites.
        effective_nsfw = bool(nsfw_mode) if nsfw_mode is not None else bool(GLOBAL_NSFW)

        profile: Dict[str, Any] = {}
        memory: Dict[str, Any] = {"items": []}

        if user_id:
            # Per-user SQLite-backed profile (Bearer-auth mode).
            try:
                from .user_profile_store import _get_user_profile, _get_db_path
                import sqlite3

                profile = _get_user_profile(user_id)

                path = _get_db_path()
                con = sqlite3.connect(path)
                con.row_factory = sqlite3.Row
                cur = con.cursor()
                cur.execute(
                    "SELECT * FROM user_memory_items WHERE user_id = ? ORDER BY pinned DESC, importance DESC",
                    (user_id,),
                )
                rows = cur.fetchall()
                con.close()
                memory = {"items": [dict(r) for r in rows]}
            except Exception:
                # Clear profile so the legacy fallback below kicks in.
                profile = {}

        if not profile:
            # Legacy single-user / API-key mode fallback.
            try:
                from .profile import read_profile
                from .user_memory import _read as read_memory
                profile = read_profile()
                memory = read_memory()
            except Exception:
                profile = {}
                memory = {"items": []}

        if not profile.get("personalization_enabled", True):
            return "(Personalization is disabled in user preferences.)", {"personalization_enabled": False}

        ctx = build_user_context_for_ai(profile, memory, nsfw_mode=effective_nsfw)
        return (ctx or "").strip(), {"personalization_enabled": True}
    except Exception as e:
        return "(Could not load user profile context.)", {"error": str(e)}
|
|
|
|
| async def _run_user_integrations_list( |
| *, |
| user_id: Optional[str], |
| ) -> Tuple[str, Dict[str, Any]]: |
| """Safe integration listing: returns only keys/descriptions, never secret values.""" |
| try: |
| items: list[dict] = [] |
|
|
| if user_id: |
| try: |
| from .user_profile_store import _get_db_path, ensure_user_profile_tables |
| import sqlite3 |
| ensure_user_profile_tables() |
| path = _get_db_path() |
| con = sqlite3.connect(path) |
| con.row_factory = sqlite3.Row |
| cur = con.cursor() |
| cur.execute( |
| "SELECT key, description, updated_at FROM user_secrets WHERE user_id = ? ORDER BY key ASC", |
| (user_id,), |
| ) |
| rows = cur.fetchall() |
| con.close() |
| for r in rows: |
| items.append({"key": r["key"], "description": r["description"] or "", "updated_at": r["updated_at"] or ""}) |
| except Exception: |
| items = [] |
| else: |
| try: |
| from .profile import _data_root, SECRETS_FILE, _read_json |
| root = _data_root() |
| secrets = _read_json(root / SECRETS_FILE, default={}) |
| for k, v in (secrets or {}).items(): |
| items.append({"key": k, "description": (v or {}).get("description", "")}) |
| items.sort(key=lambda x: (x.get("key") or "").lower()) |
| except Exception: |
| items = [] |
|
|
| if not items: |
| return "No integrations are configured.", {"count": 0, "integrations": []} |
|
|
| lines = ["Configured integrations (keys only, no secret values):"] |
| for it in items[:50]: |
| desc = (it.get("description") or "").strip() |
| lines.append(f"- {it['key']}: {desc}" if desc else f"- {it['key']}") |
| return "\n".join(lines), {"count": len(items), "integrations": items} |
| except Exception as e: |
| return "(Could not list integrations.)", {"error": str(e)} |
|
|
|
|
| async def _run_persona_avatar_get( |
| *, |
| project_id: Optional[str], |
| ) -> Tuple[str, Dict[str, Any]]: |
| """Return current persona avatar URL for real-time tool access.""" |
| if not project_id: |
| return "(No project context available.)", {"ok": False} |
| try: |
| from . import projects |
| p = projects.get_project_by_id(project_id) |
| if not p: |
| return "(Persona not found.)", {"ok": False} |
| appearance = p.get("persona_appearance") or {} |
| sel = appearance.get("selected_filename") or "" |
| th = appearance.get("selected_thumb_filename") or "" |
| if not sel: |
| return "(No avatar selected.)", {"ok": True, "selected": None} |
|
|
| text = f"Current avatar:\n- image: /files/{sel}" |
| if th: |
| text += f"\n- thumb: /files/{th}" |
| return text, {"ok": True, "selected_filename": sel, "selected_thumb_filename": th} |
| except Exception as e: |
| return f"(Could not load persona avatar: {e})", {"error": str(e)} |
|
|
|
|
| |
| |
| |
|
|
| def _get_memory_context(project_id: Optional[str], query: str) -> str: |
| """ |
| Build Memory V2 context for system prompt injection. |
| Returns empty string if Memory V2 is unavailable or no memories exist. |
| Non-blocking: failures return empty string. |
| """ |
| if not project_id: |
| return "" |
| try: |
| from .memory_v2 import get_memory_v2, ensure_v2_columns |
| ensure_v2_columns() |
| engine = get_memory_v2() |
| ctx = engine.build_context(project_id, query) |
| return (ctx or "").strip() |
| except Exception: |
| return "" |
|
|
|
|
| def _get_knowledge_hint(project_id: Optional[str]) -> str: |
| """ |
| Build a short hint about available knowledge base docs and project items. |
| Helps the agent decide whether to call knowledge.search. |
| """ |
| if not project_id: |
| return "" |
| parts: list[str] = [] |
|
|
| |
| try: |
| from .vectordb import get_project_document_count, CHROMADB_AVAILABLE |
| if CHROMADB_AVAILABLE: |
| chunk_count = get_project_document_count(project_id) |
| if chunk_count > 0: |
| |
| |
| doc_label = f"{chunk_count} document chunks" |
| try: |
| from .persona_attachments import list_persona_documents as _pa_docs |
| pa_docs = _pa_docs(project_id) |
| if pa_docs: |
| n_docs = len({d.get("id") or d.get("item_id") for d in pa_docs}) |
| doc_label = f"{n_docs} document{'s' if n_docs != 1 else ''} ({chunk_count} chunks)" |
| except Exception: |
| pass |
| parts.append(f"[This project has {doc_label} in its knowledge base. Use knowledge.search to find relevant information.]") |
| except Exception: |
| pass |
|
|
| |
| try: |
| from .project_files import build_item_context |
| item_ctx = build_item_context(project_id) |
| if item_ctx: |
| parts.append(item_ctx) |
| except Exception: |
| pass |
|
|
| return "\n".join(parts) |
|
|
|
|
def _get_user_context(project_id: Optional[str], user_id: Optional[str] = None) -> str:
    """
    Build user context (profile + preferences) for the agent system prompt.

    Uses the per-user SQLite profile when user_id is available (Bearer auth),
    falls back to legacy profile.json for single-user/API-key mode.
    Respects 'personalization_enabled'. Returns "" if unavailable.
    """
    if not project_id:
        return ""
    try:
        from .user_context import build_user_context_for_ai

        profile: dict = {}
        memory: dict = {}

        if user_id:
            # Per-user SQLite profile + memory items (Bearer-auth mode).
            try:
                from .user_profile_store import _get_user_profile, _get_db_path
                import sqlite3, json
                profile = _get_user_profile(user_id)
                # Pull approved memory items, pinned/important first.
                path = _get_db_path()
                con = sqlite3.connect(path)
                con.row_factory = sqlite3.Row
                cur = con.cursor()
                cur.execute(
                    "SELECT * FROM user_memory_items WHERE user_id = ? ORDER BY pinned DESC, importance DESC",
                    (user_id,),
                )
                rows = cur.fetchall()
                con.close()
                memory = {"items": [dict(r) for r in rows]}
            except Exception:
                # Fall through to the legacy file-based profile below.
                pass

        if not profile:
            # Legacy single-user profile.json / user_memory fallback.
            from .profile import read_profile
            from .user_memory import _read as read_memory
            profile = read_profile()
            memory = read_memory()

        if not profile.get("personalization_enabled", True):
            return ""
        # NOTE(review): nsfw_mode is hard-coded False here, unlike
        # _run_user_profile_get which honors the caller/global setting —
        # confirm whether that asymmetry is intentional.
        ctx = build_user_context_for_ai(profile, memory, nsfw_mode=False)
        return (ctx or "").strip()
    except Exception:
        return ""
|
|
|
|
| def _get_session_context(project_id: Optional[str]) -> str: |
| """ |
| Build session awareness context for the agent. |
| Shows active session info and last session summary. |
| Returns empty string if no session data available. |
| """ |
| if not project_id: |
| return "" |
| try: |
| from .sessions import list_sessions, resolve_session |
| active = resolve_session(project_id) |
| if not active: |
| return "" |
| lines = [] |
| title = active.get("title", "Untitled") |
| msg_count = active.get("message_count", 0) |
| lines.append(f"[Active session: \"{title}\" ({msg_count} messages)]") |
|
|
| |
| sessions = list_sessions(project_id, limit=3) |
| for s in sessions: |
| if s.get("ended_at") and s.get("summary"): |
| lines.append(f"[Previous session summary: {s['summary'][:200]}]") |
| break |
|
|
| return "\n".join(lines) |
| except Exception: |
| return "" |
|
|
|
|
| |
| |
| |
|
|
| async def _dispatch_tool( |
| tool: str, |
| args: Dict[str, Any], |
| *, |
| |
| conversation_id: str, |
| project_id: Optional[str], |
| user_text: str, |
| last_image_url: Optional[str], |
| vision_provider: str, |
| vision_base_url: Optional[str], |
| vision_model: Optional[str], |
| nsfw_mode: bool, |
| user_id: Optional[str] = None, |
| ) -> Tuple[str, Dict[str, Any], Optional[Dict[str, Any]]]: |
| """ |
| Dispatch a tool call to the appropriate handler. |
| Returns (output_text, meta_dict, media_dict_or_none). |
| """ |
| if tool == "vision.analyze": |
| |
| image_url = (args.get("image_url") or "").strip() or (last_image_url or "") |
| question = (args.get("question") or "").strip() or user_text |
| mode = (args.get("mode") or "both").strip().lower() |
| if mode not in ("caption", "ocr", "both"): |
| mode = "both" |
|
|
| if not image_url: |
| return ( |
| "I don't have an image to analyze. Please upload an image first.", |
| {"error": "no_image"}, |
| None, |
| ) |
|
|
| try: |
| text, full = await _run_vision_analyze( |
| image_url=image_url, |
| question=question, |
| mode=mode, |
| provider=vision_provider, |
| base_url=vision_base_url, |
| model=vision_model, |
| nsfw_mode=nsfw_mode, |
| ) |
| except FileNotFoundError: |
| return ( |
| "Could not load the image — the file may no longer exist. Please upload it again.", |
| {"error": "image_load_failed", "image_url": image_url}, |
| None, |
| ) |
| return text, {"image_url": image_url, "mode": mode}, {"images": [image_url]} |
|
|
| elif tool == "knowledge.search": |
| query = (args.get("query") or "").strip() or user_text |
| n_results = int(args.get("n_results", 3)) |
| |
| _allowed_ids: Optional[List[str]] = None |
| if project_id: |
| try: |
| from .persona_attachments import get_allowed_document_item_ids_for_chat |
| _ids = get_allowed_document_item_ids_for_chat(project_id) |
| if _ids: |
| _allowed_ids = _ids |
| except Exception: |
| pass |
| text, meta = await _run_knowledge_search( |
| query=query, |
| project_id=project_id, |
| n_results=n_results, |
| allowed_item_ids=_allowed_ids, |
| ) |
| return text, meta, None |
|
|
| elif tool == "memory.recall": |
| query = (args.get("query") or "").strip() or user_text |
| text, meta = await _run_memory_recall( |
| query=query, |
| project_id=project_id, |
| ) |
| return text, meta, None |
|
|
| elif tool == "web.search": |
| query = (args.get("query") or "").strip() or user_text |
| max_results = int(args.get("max_results", 3)) |
| text, meta = await _run_web_search( |
| query=query, |
| max_results=max_results, |
| ) |
| return text, meta, None |
|
|
| elif tool == "image.index": |
| image_url = (args.get("image_url") or "").strip() or (last_image_url or "") |
| if not image_url: |
| return ( |
| "No image URL provided. Please specify an image to index.", |
| {"error": "no_image"}, |
| None, |
| ) |
| text, meta = await _run_image_index( |
| image_url=image_url, |
| project_id=project_id, |
| provider=vision_provider, |
| base_url=vision_base_url, |
| model=vision_model, |
| nsfw_mode=nsfw_mode, |
| ) |
| return text, meta, {"images": [image_url]} |
|
|
| elif tool == "image.generate": |
| prompt = (args.get("prompt") or "").strip() or user_text |
| text, meta, media = await _run_image_generate( |
| prompt=prompt, |
| conversation_id=conversation_id, |
| project_id=project_id, |
| nsfw_mode=nsfw_mode, |
| user_id=user_id, |
| ) |
| return text, meta, media |
|
|
| elif tool == "memory.store": |
| key = (args.get("key") or "").strip() |
| value = (args.get("value") or "").strip() |
| importance = float(args.get("importance", 0.5)) |
| text, meta = await _run_memory_store( |
| key=key, |
| value=value, |
| importance=importance, |
| project_id=project_id, |
| ) |
| return text, meta, None |
|
|
| elif tool == "user.profile.get": |
| text, meta = await _run_user_profile_get( |
| project_id=project_id, |
| user_id=user_id, |
| nsfw_mode=nsfw_mode, |
| ) |
| return text, meta, None |
|
|
| elif tool == "user.integrations.list": |
| text, meta = await _run_user_integrations_list(user_id=user_id) |
| return text, meta, None |
|
|
| elif tool == "persona.avatar.get": |
| text, meta = await _run_persona_avatar_get(project_id=project_id) |
| return text, meta, None |
|
|
| else: |
| return f"Requested tool '{tool}' is not available.", {"error": "unknown_tool"}, None |
|
|
|
|
| |
| |
| |
|
|
async def agent_chat(
    *,
    user_text: str,
    conversation_id: Optional[str],
    project_id: Optional[str],

    llm_provider: str,
    llm_base_url: Optional[str],
    llm_model: Optional[str],
    temperature: float,
    max_tokens: int,

    vision_provider: str,
    vision_base_url: Optional[str],
    vision_model: Optional[str],
    nsfw_mode: bool,

    max_tool_calls: int = DEFAULT_MAX_TOOL_CALLS,
    history_limit: int = DEFAULT_HISTORY_LIMIT,

    user_id: Optional[str] = None,

    image_url: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Topology 3: Agent-Controlled tool use.
    Additive-only. Does not change /chat.

    Core loop:
      1. Persist the user turn (and any uploaded image) to storage.
      2. Fast path: for persona projects, try to answer "show me your photo"
         style requests directly from the persona photo index, skipping the LLM.
      3. Build the system prompt from memory / knowledge / user / session /
         persona context, replay recent history, and append the new user turn.
      4. Ask the main LLM for strict JSON. A ``{"type": "final"}`` step ends
         the loop; a ``{"type": "tool_call"}`` step is dispatched via
         ``_dispatch_tool`` and its output injected back as a system message,
         until ``max_tool_calls`` is exhausted.

    Args:
        user_text: Raw user message (may be empty when only an image is sent).
        conversation_id: Existing conversation id, or None to start a new one.
        project_id: Optional project scope for memory/knowledge/persona lookups.
        llm_provider / llm_base_url / llm_model / temperature / max_tokens:
            Settings for the main (planning) LLM.
        vision_provider / vision_base_url / vision_model / nsfw_mode:
            Settings forwarded to vision-capable tools.
        max_tool_calls: Hard cap on tool invocations per request.
        history_limit: Max number of prior turns replayed into the prompt.
        user_id: Optional user identity for memory/profile tools.
        image_url: Optional image uploaded alongside this message.

    Returns:
        Dict with keys: ``conversation_id``, ``text``, ``media`` (or None),
        and ``agent`` (``tool_calls_used`` + ``tools_invoked`` telemetry).
    """

    cid = conversation_id or str(uuid.uuid4())
    text_in = (user_text or "").strip()

    # Persist the user turn. NOTE(review): an image-only upload with no text
    # is never stored as a user message — confirm that is intentional.
    _user_media = {"images": [image_url]} if image_url else None
    if text_in:
        add_message(cid, "user", text_in, media=_user_media, project_id=project_id)

    # ---- Fast path: persona photo requests ---------------------------------
    # For persona projects, answer "show me your photo" / outfit requests (and
    # simple confirmations of a previously offered outfit) straight from the
    # photo index without spending an LLM round-trip. Best-effort: any failure
    # falls through to the normal agent loop.
    if text_in and project_id:
        _lower = text_in.lower().strip()
        _default_triggers = [
            "show me your photo", "show me your picture", "show your photo",
            "show your picture", "your photo", "your picture",
            "what do you look like", "how do you look", "show me yourself",
            "let me see you", "show me your photos",
        ]
        _outfit_triggers = [
            "show me your", "show your", "show me the",
            "let me see your", "i want to see your",
        ]
        # Short affirmatives that accept an outfit the assistant just offered.
        _confirm_triggers = [
            "yes", "yes please", "sure", "yeah", "yep", "ok", "okay",
            "go ahead", "show me", "please", "absolutely", "of course",
            "do it", "let's see", "i'd love to", "yes show me",
        ]
        try:
            from .projects import get_project_by_id
            proj = get_project_by_id(project_id)
            if proj and proj.get("project_type") == "persona":
                photo_idx = _build_persona_photo_index(project_id)
                pa = proj.get("persona_agent") or {}
                p_label = pa.get("label", "")

                matched_url: Optional[str] = None
                matched_outfit_label: Optional[str] = None

                # 1) Generic "show me your photo" → default look.
                if any(t in _lower for t in _default_triggers):
                    matched_url = photo_idx.get("default")
                    matched_outfit_label = "Default Look"

                # 2) "show me your <outfit>" → fuzzy-match against labels.
                if not matched_url:
                    for trigger in _outfit_triggers:
                        if trigger in _lower:
                            # Take the text after the trigger as the label hint.
                            after = _lower.split(trigger, 1)[1].strip()
                            # Strip filler words (substring replace — may also
                            # clip words containing these, e.g. "lookbook").
                            for suffix in ["look", "outfit", "photo", "picture", "style", "please"]:
                                after = after.replace(suffix, "").strip()
                            if after:
                                # Substring match in either direction.
                                for label_key, url in photo_idx.items():
                                    if label_key == "default":
                                        continue
                                    if label_key in after or after in label_key:
                                        matched_url = url
                                        matched_outfit_label = label_key.title()
                                        break

                # 3) Bare confirmation → look back at what the assistant offered.
                if not matched_url and _lower in _confirm_triggers:
                    try:
                        history = get_recent(cid, limit=4)
                        for role, content in reversed(history):
                            if role == "assistant" and content:
                                content_lower = content.lower()
                                # Prefer explicit [Bracketed] offers.
                                offered_labels = re.findall(r'\[([^\]]+)\]', content)
                                for offered_label in offered_labels:
                                    ol = offered_label.lower().strip()
                                    if ol in photo_idx:
                                        matched_url = photo_idx[ol]
                                        matched_outfit_label = offered_label
                                        break
                                # Fall back to any label mentioned in prose.
                                if not matched_url:
                                    for label_key, label_url in photo_idx.items():
                                        if label_key in content_lower and label_key != "default":
                                            matched_url = label_url
                                            matched_outfit_label = label_key.title()
                                            break
                                if matched_url:
                                    break
                    except Exception:
                        pass

                if matched_url:
                    if matched_outfit_label and matched_outfit_label.lower() != "default look":
                        reply = f"Here's my {matched_outfit_label} look! 😘"
                    elif p_label:
                        # Fixed F541: no placeholders, so plain string literal.
                        reply = "Here I am! 😊"
                    else:
                        reply = "Here's my current photo."
                    media = {"images": [matched_url]}
                    add_message(cid, "assistant", reply, media=media, project_id=project_id)
                    return {
                        "conversation_id": cid,
                        "text": reply,
                        "media": media,
                        "agent": {"tool_calls_used": 0, "tools_invoked": []},
                    }
        except Exception:
            pass

    # ---- Passive memory ingestion (best-effort) ----------------------------
    if text_in and project_id:
        try:
            from .memory_v2 import get_memory_v2, ensure_v2_columns
            ensure_v2_columns()
            get_memory_v2().ingest_user_text(project_id, text_in, user_id=user_id)
        except Exception:
            pass

    # ---- Build the prompt ---------------------------------------------------
    history_pairs: List[Tuple[str, str]] = []
    try:
        history_pairs = get_recent(cid, limit=history_limit)
    except Exception:
        history_pairs = []

    memory_ctx = _get_memory_context(project_id, text_in)
    knowledge_hint = _get_knowledge_hint(project_id)
    user_ctx = _get_user_context(project_id, user_id=user_id)
    session_ctx = _get_session_context(project_id)

    persona_ctx = ""
    if project_id:
        try:
            from .projects import build_persona_context
            persona_ctx = build_persona_context(project_id, nsfw_mode=nsfw_mode)
        except Exception:
            pass

    messages: List[Dict[str, Any]] = [
        {"role": "system", "content": _build_agent_system_prompt(
            memory_context=memory_ctx,
            knowledge_hint=knowledge_hint,
            user_context=user_ctx,
            session_context=session_ctx,
            persona_context=persona_ctx,
        )}
    ]

    # Replay recent history (slice is defensive; get_recent already limits).
    for role, content in history_pairs[-history_limit:]:
        if role in ("system", "user", "assistant") and content:
            messages.append({"role": role, "content": content})

    # Append the current user turn, annotating any uploaded image so the
    # text-only planner LLM knows an image exists.
    if text_in:
        user_content = text_in
        if image_url:
            user_content = f"[The user uploaded an image: {image_url}]\n{text_in}"
        messages.append({"role": "user", "content": user_content})

    tool_calls_used = 0
    tools_invoked: List[str] = []
    last_media: Optional[Dict[str, Any]] = None
    # Image tools default to the freshest image when the LLM omits a URL.
    last_image_url = image_url or _find_last_image_url(cid)

    # ---- Agent loop ----------------------------------------------------------
    while True:
        llm_res = await llm_chat(
            messages,
            provider=llm_provider,
            base_url=llm_base_url,
            model=llm_model,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        # Tolerate multiple provider response shapes.
        raw_text = ""
        if isinstance(llm_res, dict):
            raw_text = (llm_res.get("text") or llm_res.get("content") or "").strip()
            if not raw_text and "choices" in llm_res:
                try:
                    raw_text = llm_res["choices"][0]["message"]["content"].strip()
                except Exception:
                    pass

        # Remove <think> blocks some reasoning models emit.
        raw_text = strip_think_tags(raw_text)

        parsed = _extract_json_obj(raw_text)
        if not parsed:
            # Not valid JSON — treat the raw text as the final answer.
            final_text = raw_text or "I couldn't parse the agent response. Please try again."
            # Guard against leaked chain-of-thought.
            if _is_reasoning_text(final_text):
                recovered = recover_from_reasoning(final_text)
                if recovered:
                    final_text = recovered
                else:
                    final_text = "I'm here. What would you like?"
            text_media = _extract_media_from_text(final_text)
            if text_media:
                final_text = _strip_media_images_from_text(final_text)
            add_message(cid, "assistant", final_text, media=text_media, project_id=project_id)
            return {"conversation_id": cid, "text": final_text, "media": text_media, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        step_type = (parsed.get("type") or "").strip().lower()

        if step_type == "final":
            final_text = (parsed.get("text") or "").strip()
            if not final_text:
                final_text = "No answer provided."
            # Guard against leaked chain-of-thought.
            if _is_reasoning_text(final_text):
                recovered = recover_from_reasoning(final_text)
                if recovered:
                    final_text = recovered
                else:
                    final_text = "I'm here. What would you like?"
            # Merge media produced by tools with media referenced in the text,
            # de-duplicating image URLs while preserving order.
            merged_media = last_media
            text_media = _extract_media_from_text(final_text)
            if text_media:
                final_text = _strip_media_images_from_text(final_text)
            if text_media and not merged_media:
                merged_media = text_media
            elif text_media and merged_media:
                merged_media = dict(merged_media)
                existing = merged_media.get("images") or []
                seen = set(existing)
                new_imgs = [u for u in (text_media.get("images") or []) if u not in seen]
                merged_media["images"] = existing + new_imgs
            add_message(cid, "assistant", final_text, media=merged_media, project_id=project_id)
            return {"conversation_id": cid, "text": final_text, "media": merged_media, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        if step_type != "tool_call":
            # Unknown step type — salvage whatever text we have and finish.
            final_text = (parsed.get("text") or "").strip() or raw_text or "I couldn't complete the request."
            text_media = _extract_media_from_text(final_text)
            if text_media:
                final_text = _strip_media_images_from_text(final_text)
            add_message(cid, "assistant", final_text, media=text_media, project_id=project_id)
            return {"conversation_id": cid, "text": final_text, "media": text_media, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        # step_type == "tool_call": enforce the budget before dispatching.
        if tool_calls_used >= max_tool_calls:
            final_text = "I reached the maximum number of tool calls for this request. Please refine your question."
            add_message(cid, "assistant", final_text, media=None, project_id=project_id)
            return {"conversation_id": cid, "text": final_text, "media": None, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        tool = (parsed.get("tool") or "").strip()
        args = parsed.get("args") or {}
        if not isinstance(args, dict):
            args = {}

        if tool not in TOOL_REGISTRY:
            final_text = f"Requested tool '{tool}' is not available. Available tools: {', '.join(TOOL_REGISTRY.keys())}"
            add_message(cid, "assistant", final_text, media=None, project_id=project_id)
            return {"conversation_id": cid, "text": final_text, "media": None, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        tool_calls_used += 1
        tools_invoked.append(tool)

        output_text, meta, media = await _dispatch_tool(
            tool,
            args,
            conversation_id=cid,
            project_id=project_id,
            user_text=text_in,
            last_image_url=last_image_url,
            vision_provider=vision_provider,
            vision_base_url=vision_base_url,
            vision_model=vision_model,
            nsfw_mode=nsfw_mode,
            user_id=user_id,
        )

        # Unrecoverable tool errors end the turn immediately.
        # (meta or {}) hardens against a tool branch returning meta=None.
        if (meta or {}).get("error") in ("no_image", "unknown_tool"):
            add_message(cid, "assistant", output_text, media=None, project_id=project_id)
            return {"conversation_id": cid, "text": output_text, "media": None, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        # image.generate is terminal: its caption + image IS the answer.
        if tool == "image.generate" and media:
            add_message(cid, "assistant", output_text, media=media, project_id=project_id)
            return {"conversation_id": cid, "text": output_text, "media": media, "agent": {"tool_calls_used": tool_calls_used, "tools_invoked": tools_invoked}}

        # Remember tool media so the final answer can attach it.
        if media:
            last_media = media

        # Human-readable label for the intermediate transcript entry.
        tool_label = {
            "vision.analyze": "Image Analysis",
            "knowledge.search": "Knowledge Search",
            "memory.recall": "Memory Recall",
            "web.search": "Web Search",
            "image.index": "Image Indexed",
            "memory.store": "Memory Stored",
            "image.generate": "Image Generated",
        }.get(tool, tool)

        add_message(
            cid,
            "assistant",
            f"[{tool_label}]\n{output_text}",
            media=media,
            project_id=project_id,
        )

        # Feed the tool output back to the LLM and loop for the next step.
        tool_ctx = _format_tool_context(tool, output_text, meta=meta)
        messages.append({"role": "system", "content": tool_ctx})
|
|
| |
|
|