Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel, Field | |
| from typing import Any, Dict, List, Optional | |
| import uvicorn | |
| import httpx | |
| import json | |
| import os | |
| import sys | |
| import time | |
| import re | |
| import base64 | |
| import zipfile | |
| import xml.etree.ElementTree as ET | |
| import uuid | |
| import asyncio | |
| import threading | |
| from io import BytesIO | |
| try: | |
| from workflow_runtime import ( | |
| WorkflowRuntimeError, | |
| WorkflowServices, | |
| continue_interactive_run, | |
| create_interactive_run, | |
| parse_workflow_graph, | |
| serialize_run_response, | |
| ) | |
| except ImportError: | |
| from .workflow_runtime import ( | |
| WorkflowRuntimeError, | |
| WorkflowServices, | |
| continue_interactive_run, | |
| create_interactive_run, | |
| parse_workflow_graph, | |
| serialize_run_response, | |
| ) | |
# FastAPI application instance.
app = FastAPI()

# --- Model / provider configuration (overridable through environment variables) ---
# Ollama model used for main chat calls.
model_name = os.environ.get("OLLAMA_MODEL", "qwen3:14b")
# Ollama model used for role="classifier" calls; defaults to the main model.
classifier_model_name = os.environ.get("CLASSIFIER_MODEL", model_name)
# Directory that contains this file.
backend_dir = os.path.dirname(__file__)
# Provider used when a request leaves `provider` empty (see normalize_provider).
provider_name = os.environ.get("LLM_PROVIDER", "ollama")
# DeepInfra main and classifier models; the classifier defaults to the main model.
deepinfra_model_name = os.environ.get("DEEPINFRA_MODEL", "Qwen/Qwen3-14B")
deepinfra_classifier_model_name = os.environ.get("DEEPINFRA_CLASSIFIER_MODEL", deepinfra_model_name)
# File the DeepInfra API key is read from and saved to.
deepinfra_api_key_path = os.environ.get("DEEPINFRA_API_KEY_FILE", os.path.join(backend_dir, ".deepinfra_api_key"))

# --- Project layout ---
# When Brains.py sits next to this file, the backend directory itself is the
# project root; otherwise its parent directory is used.
default_project_dir = os.path.normpath(os.path.join(backend_dir, ".."))
project_dir = backend_dir if os.path.exists(os.path.join(backend_dir, "Brains.py")) else default_project_dir
presets_path = os.path.join(project_dir, "public", "presets.json")
knowledge_contexts_dir = os.path.join(project_dir, "knowledge_contexts")
test_logs_dir = os.path.join(project_dir, "test_logs")
transcript_logs_dir = os.path.join(project_dir, "transcript_logs")

# --- In-memory process state (not persisted; lost on restart) ---
# context_id -> uploaded knowledge-context metadata and extracted text.
uploaded_contexts = {}
# llm_session_id -> list of logged LLM call records.
llm_session_logs = {}
# NOTE(review): presumably run_id -> workflow runtime state; usage is outside this chunk.
workflow_runtime_sessions = {}
# Re-entrant lock; presumably guards active_workflow_session (usage not visible here).
active_workflow_lock = threading.RLock()
# Snapshot of the single "active" workflow session.
active_workflow_session = {
    "active": False,
    "run_id": "",
    "workflow_name": "",
    "workflow_path": "",
    "provider": "",
    "llm_session_id": "",
    "test_run_id": "",
    "knowledge_context_id": "",
    "last_response": None,
}
# Live-debug flag and connected WebSocket clients (consumers are outside this chunk).
live_debug_enabled = False
live_debug_clients: set[WebSocket] = set()
# Force UTF-8 console output so the Cyrillic prompts and answers printed by the
# handlers never raise UnicodeEncodeError on consoles with a legacy encoding.
# Each stream is checked on its own: the previous guard looked only at stdout,
# so a replaced stderr without ``reconfigure`` (or ``None`` under pythonw)
# crashed the import.
for _console_stream in (sys.stdout, sys.stderr):
    if hasattr(_console_stream, "reconfigure"):
        _console_stream.reconfigure(encoding="utf-8", errors="replace")
del _console_stream
# Allow browser front ends served from another origin to call this API.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True may let
# any site make credentialed requests (check Starlette's CORSMiddleware
# behavior); restrict the list to the front end's origin outside local dev.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Any origin; the original note said this was meant to be the Vite dev server URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class LLMRequest(BaseModel):
    """Free-form chat request: one system prompt plus one user prompt.

    ``provider`` left empty defers to the server default. The session, test-run,
    workflow and node fields are read when the call is logged.
    """

    system_prompt: str
    user_prompt: str
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    workflow_name: str = Field(default="")
    node_id: str = Field(default="")
class ParaphraseTextRequest(BaseModel):
    """Body of the paraphrase call for one assistant line.

    ``message_type == "question"`` switches the prompt's line-type label;
    ``assistant_role`` (optional) is injected into the system prompt.
    """

    text: str
    message_type: str = Field(default="message")
    language: str = Field(default="русский")
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    assistant_role: str = Field(default="")
    test_run_id: str = Field(default="")
    workflow_name: str = Field(default="")
    node_id: str = Field(default="")
class ExtractMemoryFieldRequest(BaseModel):
    """Body for extracting the value of memory ``key`` from a user's ``answer``.

    ``instruction`` and ``question`` give the model extra context; ``language``
    is the scenario's working language.
    """

    answer: str
    key: str
    instruction: str = Field(default="")
    question: str = Field(default="")
    language: str = Field(default="русский")
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    workflow_name: str = Field(default="")
    node_id: str = Field(default="")
class ClassifyRequest(BaseModel):
    """Body for routing ``answer`` to one of ``options``.

    ``options`` is raw text: one option per line when it contains a newline,
    otherwise comma-separated; ``;`` separates alternatives inside an option.
    """

    answer: str
    question: str
    options: str
    language: str = Field(default="русский")
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    workflow_name: str = Field(default="")
    node_id: str = Field(default="")
class PresetRequest(BaseModel):
    """A named preset text (the handler using it is outside this chunk)."""

    name: str = Field(...)
    text: str = Field(...)
class SimulateUserAnswerRequest(BaseModel):
    """Body for a simulated-user-answer request (handler outside this chunk).

    NOTE(review): ``provider`` defaults to "ollama" here, whereas the other
    request models default to "" (which defers to LLM_PROVIDER through
    normalize_provider). Kept for compatibility; confirm it is intended.
    """

    provider: str = "ollama"
    role_prompt: str
    question: str
    # Per-instance empty list, matching the Field(default_factory=...) style
    # used by the other request models instead of a shared ``[]`` literal.
    transcript: List[dict] = Field(default_factory=list)
    llm_session_id: str = ""
    test_run_id: str = ""
    workflow_name: str = ""
class UploadContextRequest(BaseModel):
    """An uploaded knowledge file: original file name plus base64 content."""

    filename: str = Field(...)
    content_base64: str = Field(...)
class ContextAnswerRequest(BaseModel):
    """Body for answering ``question`` from a knowledge context.

    The context is located by ``context_path`` (project-relative) or
    ``context_id`` (an earlier upload); ``source == "rag"`` selects the RAG stub.
    """

    question: str
    context_id: str = Field(default="")
    context_path: str = Field(default="")
    source: str = Field(default="uploaded")
    language: str = Field(default="русский")
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    assistant_role: str = Field(default="")
    test_run_id: str = Field(default="")
    workflow_name: str = Field(default="")
    node_id: str = Field(default="")
class StartTestRunRequest(BaseModel):
    """Body for opening a new test-run log for ``workflow_name``."""

    workflow_name: str = Field(default="workflow")
class StartTranscriptLogRequest(BaseModel):
    """Body for creating a new transcript file for ``workflow_name``."""

    workflow_name: str = Field(default="workflow")
class AppendTranscriptLogRequest(BaseModel):
    """One transcript entry: target file (project-relative), speaker role and text."""

    log_path: str = Field(...)
    role: str = Field(...)
    text: str = Field(...)
class LlmSessionLogRequest(BaseModel):
    """Identifies the LLM session whose in-memory call log is requested."""

    llm_session_id: str = Field(...)
class SaveDeepInfraKeyRequest(BaseModel):
    """Carries a DeepInfra API key to be persisted to the key file."""

    api_key: str = Field(...)
class SaveDefaultWorkflowRequest(BaseModel):
    """A workflow document as a JSON object (handler outside this chunk)."""

    document: Dict[str, Any] = Field(...)
class EvaluateAssistantAnswerRequest(BaseModel):
    """Body for evaluating an assistant answer (handler outside this chunk).

    ``transcript`` carries the preceding conversation turns.
    """

    question: str
    answer: str
    # Per-instance empty list, consistent with the Field(default_factory=...)
    # convention used by the workflow request models.
    transcript: List[dict] = Field(default_factory=list)
    provider: str = ""
    llm_session_id: str = ""
    test_run_id: str = ""
    workflow_name: str = "workflow"
    message_type: str = "answer"
class WorkflowRuntimeStartRequest(BaseModel):
    """Body for starting an interactive run of an inline ``workflow`` document.

    ``external_input_value`` / ``external_input_values`` carry optional external
    inputs (their consumer, workflow_runtime, is outside this chunk).
    """

    workflow: dict = Field(...)
    workflow_name: str = Field(default="workflow")
    graph_path: List[str] = Field(default_factory=list)
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    knowledge_context_id: str = Field(default="")
    external_input_value: Any = Field(default=None)
    external_input_values: Dict[str, Any] = Field(default_factory=dict)
class WorkflowRuntimeInputRequest(BaseModel):
    """Body for feeding the next user answer / external inputs into run ``run_id``."""

    run_id: str = Field(...)
    user_answer: str = Field(default="")
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    knowledge_context_id: str = Field(default="")
    external_input_value: Any = Field(default=None)
    external_input_values: Dict[str, Any] = Field(default_factory=dict)
class WorkflowRuntimeActiveStartRequest(BaseModel):
    """Body for starting the single "active" workflow session.

    The workflow is given inline (``workflow``) or by ``workflow_path``; how the
    two are combined is decided by a handler outside this chunk.
    """

    workflow: Optional[dict] = Field(default=None)
    workflow_path: str = Field(default="")
    workflow_name: str = Field(default="workflow")
    graph_path: List[str] = Field(default_factory=list)
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    knowledge_context_id: str = Field(default="")
    external_input_value: Any = Field(default=None)
    external_input_values: Dict[str, Any] = Field(default_factory=dict)
class WorkflowRuntimeActiveInputRequest(BaseModel):
    """Body for feeding input into the currently active workflow session."""

    user_answer: str = Field(default="")
    provider: str = Field(default="")
    llm_session_id: str = Field(default="")
    test_run_id: str = Field(default="")
    knowledge_context_id: str = Field(default="")
    external_input_value: Any = Field(default=None)
    external_input_values: Dict[str, Any] = Field(default_factory=dict)
def normalize_classifier_text(value: str) -> str:
    """Canonicalize a raw classifier reply for comparison.

    Trims whitespace and surrounding quote characters, removes a leading run of
    list-marker characters (dashes, asterisks, digits, dots, ")" and spaces) and
    lower-cases the rest. A falsy input becomes "". Because digits count as
    markers, a bare number such as "2" or "-1" collapses to "".
    """
    text = value if value else ""
    text = text.strip().strip("\"'«»")
    without_marker = re.sub(r"^[\-\*\d\.\)\s]+", "", text)
    return without_marker.strip().lower()
def parse_classifier_choice(raw_content: str) -> str:
    """Extract the model's draft choice text from a classifier reply.

    For a JSON object reply, the first non-null value among ``"choice"``,
    ``"choice_index"`` and ``"index"`` is returned as stripped text (``""`` if
    none). The fallback keys keep the draft for replies in the
    ``{"choice_index": N}`` format the classify prompt requests, and a null
    ``choice`` no longer turns into the literal "None". Any other reply is
    returned with surrounding whitespace and quote characters removed.
    """
    text = raw_content or ""
    try:
        parsed = json.loads(text)
    except (json.JSONDecodeError, TypeError):
        parsed = None
    if isinstance(parsed, dict):
        for key in ("choice", "choice_index", "index"):
            value = parsed.get(key)
            if value is not None:
                return str(value).strip()
        return ""
    return text.strip().strip("\"'«»")
def parse_classifier_index(raw_content: str) -> Optional[int]:
    """Parse the option index chosen by the classifier model.

    Recognized forms:
    * a JSON object with ``"choice_index"`` (or ``"index"``): its value as int;
      ``None`` when the key is absent or null;
    * a bare integer, optionally quoted (``2``, ``"-1"``);
    * the word ``unclear`` (compared after normalize_classifier_text): ``-1``.

    Returns ``None`` when nothing usable is found. The bare-integer check looks
    at the raw text because normalize_classifier_text strips leading digits and
    dashes as list markers; the old code normalized first, so plain numeric
    replies like "2" were never recognized.
    """
    text = raw_content or ""
    try:
        parsed = json.loads(text)
    except (json.JSONDecodeError, TypeError):
        parsed = None
    if isinstance(parsed, dict):
        raw_index = parsed.get("choice_index", parsed.get("index", None))
        if raw_index is None:
            return None
        try:
            return int(raw_index)
        except (TypeError, ValueError):
            pass  # Unusable index value: fall through to text parsing, as before.
    bare = text.strip().strip("\"'«»").strip()
    if re.fullmatch(r"-?\d+", bare):
        return int(bare)
    if normalize_classifier_text(text) == "unclear":
        return -1
    return None
def option_from_index(choice_index: Optional[int], options: List[str]) -> str:
    """Map a classifier index to its option text.

    ``None``, negative indices and indices past the end of ``options`` all
    yield ``"unclear"``.
    """
    in_range = choice_index is not None and 0 <= choice_index < len(options)
    if in_range:
        return options[choice_index]
    return "unclear"
def parse_classifier_options(raw_options: str) -> List[str]:
    """Split the raw option list into trimmed, non-empty entries.

    Options are one per line when the text contains a newline; otherwise they
    are comma-separated.
    """
    pieces = raw_options.splitlines() if "\n" in raw_options else raw_options.split(",")
    trimmed = (piece.strip() for piece in pieces)
    return [piece for piece in trimmed if piece]
def read_public_config() -> dict:
    """Load ``public/config.json`` under the project directory.

    Returns ``{}`` when the file cannot be opened, is not valid JSON, or does
    not hold a JSON object.
    """
    config_path = os.path.join(project_dir, "public", "config.json")
    try:
        with open(config_path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except (OSError, json.JSONDecodeError):
        return {}
    if not isinstance(loaded, dict):
        return {}
    return loaded
def get_public_config_value(key: str) -> str:
    """Return the trimmed string stored at ``key`` in the public config.

    Missing keys and non-string values yield ``""``.
    """
    raw = read_public_config().get(key, "")
    if not isinstance(raw, str):
        return ""
    return raw.strip()
def normalize_keyword_text(value: str) -> str:
    """Normalize text for whole-phrase keyword matching.

    Steps: lower-case; fold "ё" to "е"; map "gotek" then "gotech" to "готэк";
    drop quote characters; replace each run of characters outside a-z, а-я and
    0-9 with one space; collapse whitespace and trim. Falsy input becomes "".
    """
    text = str(value or "").lower()
    for source, target in (("ё", "е"), ("gotek", "готэк"), ("gotech", "готэк")):
        text = text.replace(source, target)
    text = re.sub(r"[\"'«»“”]", "", text)
    text = re.sub(r"[^a-zа-я0-9]+", " ", text, flags=re.IGNORECASE)
    return re.sub(r"\s+", " ", text).strip()
def keyword_classifier_index(answer: str, options: List[str]) -> Optional[int]:
    """Pick an option by literal keyword match when exactly one option matches.

    Each option may list ";"-separated alternatives. An option matches when any
    normalized alternative equals the normalized answer or occurs in it as a
    whole-word phrase. Returns that option's index, or ``None`` when the answer
    normalizes to "" or when zero or several options match.
    """
    answer_text = normalize_keyword_text(answer)
    if not answer_text:
        return None
    padded_answer = f" {answer_text} "

    def option_matches(option: str) -> bool:
        for alternative in option.split(";"):
            if not alternative.strip():
                continue
            phrase = normalize_keyword_text(alternative.strip())
            if not phrase:
                continue
            if answer_text == phrase or f" {phrase} " in padded_answer:
                return True
        return False

    hits = sorted({index for index, option in enumerate(options) if option_matches(option)})
    return hits[0] if len(hits) == 1 else None
def extract_docx_text(file_bytes: bytes) -> str:
    """Return the text of a .docx file, one non-blank paragraph per line.

    Reads ``word/document.xml`` from the zip container, joins the ``w:t`` runs
    of every ``w:p`` paragraph, strips each paragraph and skips empty ones.
    NOTE(review): the XML comes from an uploaded file; ElementTree relies on
    expat's own entity-expansion limits for protection.
    """
    ns = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
    with zipfile.ZipFile(BytesIO(file_bytes), "r") as archive:
        document_xml = archive.read("word/document.xml")
    document = ET.fromstring(document_xml)
    lines = []
    for para in document.iterfind(".//w:p", ns):
        joined = "".join(run.text or "" for run in para.iterfind(".//w:t", ns)).strip()
        if joined:
            lines.append(joined)
    return "\n".join(lines)
def extract_context_text(filename: str, file_bytes: bytes) -> str:
    """Extract plain text from an uploaded knowledge-context file.

    * ``.docx``: paragraph text via extract_docx_text.
    * ``.doc``: strict UTF-8, falling back to cp1251 (undecodable bytes
      dropped). Only plain-text content decodes meaningfully; binary Word 97
      files are not parsed.
    * ``.md``, ``.txt`` and anything else: UTF-8 with invalid bytes replaced.

    The old ``.doc`` branch decoded UTF-8 with ``errors="ignore"`` and fell back
    to cp1251 only when that produced an empty string. Any cp1251 file that also
    contained ASCII (digits, spaces, Latin words) therefore lost all its Cyrillic.
    """
    lower_name = filename.lower()
    if lower_name.endswith(".docx"):
        return extract_docx_text(file_bytes)
    if lower_name.endswith(".doc"):
        try:
            return file_bytes.decode("utf-8")
        except UnicodeDecodeError:
            return file_bytes.decode("cp1251", errors="ignore")
    return file_bytes.decode("utf-8", errors="replace")
def safe_filename(filename: str) -> str:
    """Reduce ``filename`` to one safe path component.

    Keeps the base name only, replaces each run of characters other than
    Latin/Cyrillic letters, digits, ".", "_" and "-" with "_", and trims
    leading and trailing dots and underscores. Returns ``"knowledge"`` when
    nothing usable remains.
    """
    base = os.path.basename(filename if filename else "knowledge")
    cleaned = re.sub(r"[^a-zA-Z0-9а-яА-ЯёЁ._-]+", "_", base, flags=re.IGNORECASE)
    cleaned = cleaned.strip("._")
    return cleaned if cleaned else "knowledge"
def relative_project_path(path: str) -> str:
    """Express ``path`` relative to the project root, always with "/" separators."""
    relative = os.path.relpath(path, project_dir)
    return "/".join(relative.split("\\"))
def resolve_project_path(relative_path: str) -> Optional[str]:
    """Turn a project-relative path into an absolute path inside the project.

    Returns ``None`` for an empty path or when the normalized result falls
    outside the project root (``..`` segments, or an absolute path elsewhere).
    Symlinks are not resolved, so a link inside the project can still point
    outside it.
    """
    if not relative_path:
        return None
    root = os.path.abspath(project_dir)
    candidate = os.path.abspath(os.path.normpath(os.path.join(project_dir, relative_path)))
    inside = candidate == root or candidate.startswith(root + os.sep)
    return candidate if inside else None
def default_workflow_path() -> str:
    """Path of the bundled default workflow JSON under ``<project>/workflows``."""
    workflows_dir = os.path.join(project_dir, "workflows")
    return os.path.join(workflows_dir, "rosupack_2026_assistant_workflow.json")
def test_log_path(test_run_id: str, workflow_name: str) -> Optional[str]:
    """Return the JSONL log file of a test run, or ``None`` without a run id.

    Layout: ``<test_logs_dir>/<safe workflow name>/<safe run id>.jsonl``; an
    empty workflow name falls back to ``"workflow"``.
    """
    if not test_run_id:
        return None
    folder = safe_filename(workflow_name if workflow_name else "workflow")
    return os.path.join(test_logs_dir, folder, safe_filename(test_run_id) + ".jsonl")
def transcript_log_path(log_id: str, workflow_name: str) -> str:
    """Return ``<transcript_logs_dir>/<safe workflow name>/<safe log id>.txt``.

    An empty workflow name falls back to ``"workflow"``.
    """
    folder = safe_filename(workflow_name if workflow_name else "workflow")
    file_name = safe_filename(log_id) + ".txt"
    return os.path.join(transcript_logs_dir, folder, file_name)
def append_test_log(test_run_id: str, workflow_name: str, event_type: str, payload: dict):
    """Append one JSON line to the test run's log; does nothing without a run id.

    The record starts with a local-time ``timestamp`` and ``event_type``,
    followed by ``payload``. Payload keys override those two on collision.
    """
    log_path = test_log_path(test_run_id, workflow_name)
    if not log_path:
        return
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    record = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()),
        "event_type": event_type,
    }
    record.update(payload)
    line = json.dumps(record, ensure_ascii=False)
    with open(log_path, "a", encoding="utf-8") as log_file:
        log_file.write(f"{line}\n")
def prepend_test_log(test_run_id: str, workflow_name: str, event_type: str, payload: dict):
    """Insert one JSON line at the top of the test run's log (no-op without a run id).

    The record has the same shape as append_test_log's: ``timestamp`` (local
    time) and ``event_type``, then ``payload``. The whole file is rewritten.
    The new content goes to a sibling ``.tmp`` file that is swapped in with
    os.replace, so a crash mid-write can no longer truncate the existing log.
    The previous version reopened the log with "w" before writing it back.
    """
    path = test_log_path(test_run_id, workflow_name)
    if not path:
        return
    os.makedirs(os.path.dirname(path), exist_ok=True)
    previous_content = ""
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as log_file:
            previous_content = log_file.read()
    record = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()),
        "event_type": event_type,
        **payload,
    }
    temp_path = f"{path}.tmp"
    with open(temp_path, "w", encoding="utf-8") as temp_file:
        temp_file.write(json.dumps(record, ensure_ascii=False) + "\n")
        temp_file.write(previous_content)
    # Atomic on POSIX and Windows when both paths are on the same filesystem.
    os.replace(temp_path, path)
def normalize_provider(provider: str = "") -> str:
    """Resolve a requested provider name to ``"ollama"`` or ``"deepinfra"``.

    An empty value falls back to the configured default (provider_name), then
    to ``"ollama"``. Matching ignores case and surrounding whitespace.
    ``deepinfra``, ``deep-infra`` and ``deep_infra`` select DeepInfra; every
    other name, including the ``local`` alias, resolves to Ollama.
    """
    requested = (provider or provider_name or "ollama").strip().lower()
    if requested in ("deepinfra", "deep-infra", "deep_infra"):
        return "deepinfra"
    return "ollama"
def read_secret_file(path: str) -> str:
    """Return the trimmed contents of ``path``; ``""`` if no path or no such file."""
    if path and os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            return handle.read().strip()
    return ""
def write_secret_file(path: str, value: str):
    """Write ``value`` (whitespace-trimmed) to ``path``, creating parent directories.

    Fixes:
    * A bare file name (no directory part) used to crash, because
      ``os.makedirs("")`` raises FileNotFoundError; parent directories are now
      created only when there is one.
    * A newly created file is owner-read/write only (0o600) instead of taking
      the default umask-derived mode. An existing file keeps its permissions.
    """
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    # O_BINARY (Windows only) keeps the CRT from adding its own newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(path, flags, 0o600)
    with open(fd, "w", encoding="utf-8") as secret_file:
        secret_file.write(value.strip())
def get_deepinfra_api_key() -> str:
    """Return the DeepInfra API key from the first source that provides one.

    Precedence: the DEEPINFRA_API_KEY environment variable, then the key file,
    then ``deepinfraApiKey`` in public/config.json. Returns "" when none is set.
    NOTE(review): a key in public/config.json may be served to browsers if that
    directory is published as static content; confirm this is intended.
    """
    env_key = os.environ.get("DEEPINFRA_API_KEY", "").strip()
    if env_key:
        return env_key
    file_key = read_secret_file(deepinfra_api_key_path)
    if file_key:
        return file_key
    return get_public_config_value("deepinfraApiKey")
def model_for_provider(provider: str, role: str = "main") -> str:
    """Return the configured model name for ``provider`` and ``role``.

    ``role == "classifier"`` selects the classifier model; any other role
    selects the main model. Providers other than ``"deepinfra"`` use the
    Ollama model settings.
    """
    use_classifier = role == "classifier"
    if provider != "deepinfra":
        return classifier_model_name if use_classifier else model_name
    return deepinfra_classifier_model_name if use_classifier else deepinfra_model_name
def extract_chat_content(provider: str, response_payload: dict) -> str:
    """Return the assistant message text from a chat response payload.

    Ollama replies are read as ``{"message": {"content": ...}}``. Every other
    provider is read OpenAI-style: ``{"choices": [{"message": {"content": ...}}]}``.
    Missing or null pieces yield ``""``. The Ollama branch used to crash on
    ``"message": null`` and could return ``None`` for a null content, which
    broke the caller's ``.strip()``. It now mirrors the other branch.
    """
    if provider == "ollama":
        message = response_payload.get("message") or {}
        return message.get("content") or ""
    choices = response_payload.get("choices") or []
    if not choices:
        return ""
    message = choices[0].get("message") or {}
    return message.get("content", "") or ""
def log_llm_call(request, endpoint: str, provider: str, model: str, messages: list, response_payload: dict):
    """Record one LLM call in the test-run log and the in-memory session log.

    ``request`` may be any object. Its optional ``node_id``, ``test_run_id``,
    ``workflow_name`` and ``llm_session_id`` attributes (default "") label and
    route the record. The test-run log is skipped without a run id, and the
    session log is skipped without a session id.
    NOTE(review): llm_session_logs is never pruned in this chunk, so it grows
    for the lifetime of the process.
    """
    def attr(name: str):
        return getattr(request, name, "")

    record = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()),
        "endpoint": endpoint,
        "node_id": attr("node_id"),
        "provider": provider,
        "model": model,
        "messages": messages,
        "response": response_payload,
    }
    append_test_log(attr("test_run_id"), attr("workflow_name"), "llm_call", record)
    session_id = attr("llm_session_id")
    if not session_id:
        return
    llm_session_logs.setdefault(session_id, []).append(record)
async def chat_completion(
    request,
    endpoint: str,
    messages: list,
    role: str = "main",
    response_format: str = "",
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    timeout: float = 60.0,
):
    """Send one non-streaming chat request to the provider chosen by ``request.provider``.

    DeepInfra is called through its OpenAI-compatible endpoint with a bearer
    key. Everything else goes to the local Ollama server. ``response_format ==
    "json"`` requests JSON output. ``temperature`` and ``max_tokens`` are sent
    only when given; ``max_tokens`` maps to Ollama's ``num_predict``. The call
    is recorded with log_llm_call.

    Returns:
        dict with ``provider``, ``model``, the raw ``response`` payload and the
        stripped assistant ``content``.

    Raises:
        RuntimeError: DeepInfra is selected but no API key is configured.
        httpx.HTTPStatusError: the provider replied with a 4xx/5xx status
            (other httpx errors propagate on transport failures).
    """
    provider = normalize_provider(getattr(request, "provider", ""))
    model = model_for_provider(provider, role)
    wants_json = response_format == "json"
    async with httpx.AsyncClient() as client:
        if provider == "deepinfra":
            api_key = get_deepinfra_api_key()
            if not api_key:
                raise RuntimeError("DEEPINFRA_API_KEY is not set and backend/.deepinfra_api_key was not found")
            body = {"model": model, "reasoning_effort": "none", "messages": messages}
            if wants_json:
                body["response_format"] = {"type": "json_object"}
            if temperature is not None:
                body["temperature"] = temperature
            if max_tokens is not None:
                body["max_tokens"] = max_tokens
            url = "https://api.deepinfra.com/v1/openai/chat/completions"
            extra = {
                "headers": {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                }
            }
        else:
            body = {"model": model, "messages": messages, "stream": False, "think": False}
            if wants_json:
                body["format"] = "json"
            sampling = {}
            if temperature is not None:
                sampling["temperature"] = temperature
            if max_tokens is not None:
                sampling["num_predict"] = max_tokens
            if sampling:
                body["options"] = sampling
            url = "http://localhost:11434/api/chat"
            extra = {}
        reply = await client.post(url, json=body, timeout=timeout, **extra)
        reply.raise_for_status()
        response_payload = reply.json()
    log_llm_call(request, endpoint, provider, model, messages, response_payload)
    content = extract_chat_content(provider, response_payload).strip()
    return {
        "provider": provider,
        "model": model,
        "response": response_payload,
        "content": content,
    }
async def start_test_run(request: StartTestRunRequest):
    """Open a new test run and write its ``test_run_start`` record.

    The run id is ``test_<epoch milliseconds>``. Returns the id and the log
    file path relative to the project root.
    """
    workflow_name = request.workflow_name
    millis = int(time.time() * 1000)
    run_id = f"test_{millis}"
    append_test_log(run_id, workflow_name, "test_run_start", {"workflow_name": workflow_name})
    log_file = test_log_path(run_id, workflow_name)
    return {
        "test_run_id": run_id,
        "log_path": relative_project_path(log_file) if log_file else "",
    }
async def start_transcript_log(request: StartTranscriptLogRequest):
    """Create a fresh transcript file with a short header.

    The id is ``transcript_<epoch milliseconds>``. The file at
    transcript_log_path(id, workflow) is (re)written with a "Workflow:" line, a
    "Started:" local timestamp and a blank line. Returns the id and the
    project-relative path.
    """
    millis = int(time.time() * 1000)
    log_id = f"transcript_{millis}"
    path = transcript_log_path(log_id, request.workflow_name)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as log_file:
        started = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        log_file.writelines([
            f"Workflow: {request.workflow_name}\n",
            f"Started: {started}\n",
            "\n",
        ])
    return {"log_id": log_id, "log_path": relative_project_path(path)}
async def append_transcript_log(request: AppendTranscriptLogRequest):
    """Append one "ROLE: text" entry, followed by a blank line, to a transcript file.

    ``role == "assistant"`` is written as ASSISTANT and any other role as USER;
    the text is stripped.

    Security fix: ``log_path`` comes from the client. It used to be accepted for
    any file inside the project, including source files and the DeepInfra key
    file under backend_dir. It must now resolve inside transcript_logs_dir,
    which is where start_transcript_log creates transcripts.
    """
    path = resolve_project_path(request.log_path)
    logs_root = os.path.abspath(transcript_logs_dir)
    if not path or not path.startswith(logs_root + os.sep):
        return {
            "success": False,
            "message": "Invalid transcript log path",
        }
    os.makedirs(os.path.dirname(path), exist_ok=True)
    role = "ASSISTANT" if request.role == "assistant" else "USER"
    text = (request.text or "").strip()
    with open(path, "a", encoding="utf-8") as log_file:
        log_file.write(f"{role}: {text}\n\n")
    return {
        "success": True,
        "log_path": request.log_path,
    }
async def get_llm_session_log(request: LlmSessionLogRequest):
    """Return all LLM calls recorded in memory for a session (an empty list if none)."""
    session_id = request.llm_session_id
    recorded_calls = llm_session_logs.get(session_id, [])
    return {"llm_session_id": session_id, "calls": recorded_calls}
async def deepinfra_key_status():
    """Report whether a DeepInfra API key is available and where it comes from.

    Sources are checked in the order get_deepinfra_api_key uses: the
    DEEPINFRA_API_KEY env var ("env"), the key file ("file"), and
    ``deepinfraApiKey`` in public/config.json ("config"). The config source
    used to be ignored here, so the status reported no key even though
    chat_completion could use one.
    """
    if os.environ.get("DEEPINFRA_API_KEY", "").strip():
        source = "env"
    elif read_secret_file(deepinfra_api_key_path):
        source = "file"
    elif get_public_config_value("deepinfraApiKey"):
        source = "config"
    else:
        source = ""
    return {
        "has_key": bool(source),
        "source": source,
    }
async def save_deepinfra_key(request: SaveDeepInfraKeyRequest):
    """Persist a DeepInfra API key to the key file.

    A blank key is rejected without touching the file. The rejection reports
    whether some key is still available from any source.
    NOTE(review): on success "source" is always "file", even when a
    DEEPINFRA_API_KEY env var would take precedence.
    """
    api_key = (request.api_key or "").strip()
    if api_key:
        write_secret_file(deepinfra_api_key_path, api_key)
        return {"success": True, "has_key": True, "source": "file"}
    return {
        "success": False,
        "has_key": bool(get_deepinfra_api_key()),
        "message": "DeepInfra API key is empty",
    }
async def request_llm(request: LLMRequest):
    """Run a free-form chat completion with the caller's system and user prompts.

    Both prompts are printed to stdout first. Returns the reply text plus the
    provider and model that produced it.
    """
    for label, prompt in (("System Prompt", request.system_prompt), ("User Prompt", request.user_prompt)):
        print(f"{label}: {prompt}")
    conversation = [
        {"role": "system", "content": request.system_prompt},
        {"role": "user", "content": request.user_prompt},
    ]
    completion = await chat_completion(request, "/request-llm", conversation, role="main", timeout=60.0)
    return {
        "response": completion["content"],
        "provider": completion["provider"],
        "model": completion["model"],
    }
async def paraphrase_text(request: ParaphraseTextRequest):
    """Reword an assistant line so it sounds natural without changing its meaning.

    Blank input returns at once, with empty ``text`` and the provider/main model
    that would have been used. Otherwise the main model is called at
    temperature 0.8. An empty reply falls back to the original (stripped) text.
    """
    source_text = (request.text or "").strip()
    if not source_text:
        resolved = normalize_provider(request.provider)
        return {"text": "", "provider": resolved, "model": model_for_provider(resolved, "main")}

    assistant_role = request.assistant_role.strip()
    role_sentence = f"Текущая роль ассистента в этой сессии: {assistant_role}. " if assistant_role else ""
    system_prompt = (
        "Ты редактор реплик голосового ассистента. Перефразируй текст так, чтобы он звучал естественно "
        "и немного иначе, но смысл, факты, имена, числа, ссылки, формат и язык остались теми же. "
        + role_sentence
        + "Не добавляй новую информацию. Не объясняй изменения. Верни только готовую реплику."
    )
    line_kind = "вопрос ассистента" if request.message_type == "question" else "реплика ассистента"
    language = request.language or "русский"
    user_prompt = f"Язык: {language}\nТип реплики: {line_kind}\nИсходный текст:\n{source_text}"

    completion = await chat_completion(
        request,
        "/paraphrase-text",
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        role="main",
        temperature=0.8,
        timeout=60.0,
    )
    return {
        "text": completion["content"] or source_text,
        "provider": completion["provider"],
        "model": completion["model"],
    }
async def extract_memory_field(request: ExtractMemoryFieldRequest):
    """Ask the classifier model to extract the value of memory ``key`` from an answer.

    The model must reply with ``{"value": ...}`` (JSON mode, temperature 0, up to
    80 tokens). String values are stripped, and a blank string becomes ``None``.
    Returns the key, the extracted value (``None`` if none), the raw reply, and
    the provider/model used.

    Fix: a reply that is valid JSON but not an object (a bare string, list or
    number) used to crash on ``payload.get``. It now yields ``value=None``, the
    same as unparsable JSON.
    """
    print(f"Extract memory key: {request.key}")
    print(f"Question: {request.question}")
    print(f"Answer: {request.answer}")
    system_prompt = (
        "Ты универсальный извлекатель значения для интерактивного сценария. "
        "Рабочий язык сценария: {language}. "
        "Извлеки из ответа пользователя значение для указанного ключа памяти. "
        "Если ответ на другом языке, верни значение на рабочем языке сценария, когда это уместно. "
        "Если ключ похож на имя, название компании, должность или отрасль, сохраняй естественную краткую форму. "
        "Если значение нельзя уверенно извлечь, верни null. "
        "Верни только JSON вида {{\"value\": \"...\"}} или {{\"value\": null}}. Без пояснений."
    ).format(language=request.language)
    user_prompt = (
        f"Ключ памяти: {request.key}\n"
        f"Инструкция: {request.instruction or 'извлеки значение по смыслу'}\n"
        f"Вопрос ассистента: {request.question}\n"
        f"Ответ пользователя: {request.answer}\n"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    result = await chat_completion(
        request,
        "/extract-memory-field",
        messages,
        role="classifier",
        response_format="json",
        temperature=0,
        max_tokens=80,
        timeout=60.0,
    )
    response_content = result["content"]
    try:
        payload = json.loads(response_content)
    except json.JSONDecodeError:
        payload = None
    # Only a JSON object can carry the "value" key.
    value = payload.get("value") if isinstance(payload, dict) else None
    if isinstance(value, str):
        value = value.strip() or None
    return {
        "key": request.key,
        "value": value,
        "raw_response": response_content,
        "provider": result["provider"],
        "model": result["model"]
    }
async def classify(request: ClassifyRequest):
    """Route a user's answer to one of the configured options.

    Pipeline:
    1. Keyword shortcut (keyword_classifier_index): if exactly one option's
       keyword occurs in the answer, return it without calling the model.
    2. Normalize: the classifier model restates the answer in the scenario
       language (``{"normalized_answer": ...}``).
    3. Classify: the model picks an option index (``{"choice_index": N}``).
    4. Repair: when step 3 yields no valid index, the model is asked once more,
       shown its draft choice.

    Returns ``classification`` (option text or ``"unclear"``),
    ``raw_classification`` (the model reply, or a JSON note for the shortcuts),
    ``provider`` and ``model``.

    Fixes:
    * An empty option list now returns "unclear" without any model call. No
      index can be valid then, so all the calls were wasted.
    * A normalizer reply that is valid JSON but not an object no longer crashes.
    * A null ``normalized_answer`` is no longer used as the literal text "None".
    """
    print(f"Question: {request.question}")
    print(f"Answer: {request.answer}")
    print(f"Options: {request.options}")
    options = parse_classifier_options(request.options)
    keyword_index = keyword_classifier_index(request.answer, options)
    if keyword_index is not None:
        provider = normalize_provider(request.provider)
        return {
            "classification": option_from_index(keyword_index, options),
            "raw_classification": json.dumps({"choice_index": keyword_index, "source": "keyword"}, ensure_ascii=False),
            "provider": provider,
            "model": model_for_provider(provider, "classifier")
        }
    if not options:
        provider = normalize_provider(request.provider)
        return {
            "classification": "unclear",
            "raw_classification": json.dumps({"choice_index": -1, "source": "no_options"}, ensure_ascii=False),
            "provider": provider,
            "model": model_for_provider(provider, "classifier")
        }
    options_text = "\n".join(f"{index}: {option}" for index, option in enumerate(options))
    answer_for_classification = request.answer
    normalize_system_prompt = (
        "Ты нормализатор пользовательского ответа для semantic router. "
        "Рабочий язык сценария: {language}. "
        "Переведи ответ пользователя на рабочий язык сценария, сохранив только смысл ответа. "
        "Не выбирай вариант из списка и не добавляй пояснений. "
        "Если ответ бессмысленный или нерелевантный, верни его как есть. "
        "Верни JSON вида {{\"normalized_answer\":\"...\"}}."
    ).format(language=request.language)
    system_prompt = (
        "Ты универсальный semantic router для интерактивного сценария. "
        "Рабочий язык сценария: {language}. "
        "Сопоставь ответ пользователя с одним вариантом по смыслу. "
        "Если ответ на другом языке, сначала мысленно переведи смысл на рабочий язык сценария. "
        "Каждая строка списка вариантов — это один вариант маршрута. "
        "Если внутри варианта есть фразы через точку с запятой, считай их альтернативными формулировками одного и того же варианта, "
        "а не условиями, которые должны выполниться одновременно. "
        "Выбирать можно только номер одного варианта из списка. "
        "Если ответ нерелевантный, шуточный, бессмысленный, не относится к вопросу "
        "или не позволяет уверенно выбрать вариант, верни -1. "
        "Верни только JSON вида {{\"choice_index\":0}} или {{\"choice_index\":-1}}. "
        "Не добавляй пояснений."
    ).format(language=request.language)
    # Not passed through .format(), so its braces are literal JSON.
    repair_system_prompt = (
        "Ты нормализатор результата semantic router. "
        "Тебе дают вопрос, ответ пользователя, черновой выбор модели и список вариантов с номерами. "
        "Одна строка списка вариантов — один маршрут. Фразы внутри строки через точку с запятой — альтернативы этого маршрута. "
        "Если по смыслу можно уверенно выбрать один вариант, верни его номер. "
        "Если нельзя, верни -1. "
        "Верни только JSON вида {\"choice_index\":0} или {\"choice_index\":-1}. Без пояснений."
    )
    normalize_messages = [
        {"role": "system", "content": normalize_system_prompt},
        {"role": "user", "content": f"Ответ пользователя:\n{request.answer}"}
    ]
    normalize_result = await chat_completion(
        request,
        "/classify/normalize",
        normalize_messages,
        role="classifier",
        response_format="json",
        temperature=0,
        max_tokens=80,
        timeout=60.0,
    )
    normalize_content = normalize_result["content"]
    try:
        normalized_payload = json.loads(normalize_content)
    except json.JSONDecodeError:
        normalized_payload = None
    if isinstance(normalized_payload, dict):
        normalized_value = normalized_payload.get("normalized_answer")
        normalized_answer = "" if normalized_value is None else str(normalized_value).strip()
        if normalized_answer:
            answer_for_classification = normalized_answer
    user_prompt = (
        f"Вопрос:\n{request.question}\n\n"
        f"Варианты:\n{options_text}\n\n"
        f"Исходный ответ пользователя:\n{request.answer}\n\n"
        f"Нормализованный ответ на рабочем языке:\n{answer_for_classification}\n\n"
        "Верни JSON."
    )
    classify_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    result = await chat_completion(
        request,
        "/classify",
        classify_messages,
        role="classifier",
        response_format="json",
        temperature=0,
        max_tokens=80,
        timeout=60.0,
    )
    raw_classification = result["content"]
    draft_index = parse_classifier_index(raw_classification)
    classification = option_from_index(draft_index, options)
    if classification == "unclear":
        # Second chance: show the model its own draft and ask for a clean index.
        draft_choice = parse_classifier_choice(raw_classification)
        repair_prompt = (
            f"Вопрос:\n{request.question}\n\n"
            f"Допустимые варианты:\n{options_text}\n\n"
            f"Ответ пользователя:\n{request.answer}\n\n"
            f"Нормализованный ответ:\n{answer_for_classification}\n\n"
            f"Черновой выбор модели:\n{draft_choice}\n\n"
            "Верни JSON."
        )
        repair_messages = [
            {"role": "system", "content": repair_system_prompt},
            {"role": "user", "content": repair_prompt}
        ]
        repair_result = await chat_completion(
            request,
            "/classify/repair",
            repair_messages,
            role="classifier",
            response_format="json",
            temperature=0,
            max_tokens=80,
            timeout=60.0,
        )
        repair_raw = repair_result["content"]
        repaired_index = parse_classifier_index(repair_raw)
        classification = option_from_index(repaired_index, options)
        raw_classification = repair_raw
    return {
        "classification": classification,
        "raw_classification": raw_classification,
        "provider": result["provider"],
        "model": result["model"]
    }
async def upload_context(request: UploadContextRequest):
    """Store an uploaded knowledge file together with its extracted text.

    Decodes the base64 content, extracts text (extract_context_text), writes the
    original bytes and a ``.txt`` of the text into
    ``knowledge_contexts/<context_id>/``, and registers the context in memory.
    Returns the id, file name, text length and project-relative paths.

    The id is ``context_<epoch ms>``. Fix: if that id is already registered or
    its directory exists (e.g. two uploads in the same millisecond), a random
    suffix is appended. Previously the later upload silently overwrote the
    earlier one. binascii.Error from malformed base64 propagates.
    NOTE(review): for a ``.txt`` upload the original and text paths coincide,
    so the stored original is replaced by the stripped text.
    """
    file_bytes = base64.b64decode(request.content_base64)
    text = extract_context_text(request.filename, file_bytes).strip()
    context_id = f"context_{int(time.time() * 1000)}"
    while context_id in uploaded_contexts or os.path.exists(os.path.join(knowledge_contexts_dir, context_id)):
        context_id = f"context_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
    safe_name = safe_filename(request.filename)
    context_dir = os.path.join(knowledge_contexts_dir, context_id)
    os.makedirs(context_dir, exist_ok=True)
    original_path = os.path.join(context_dir, safe_name)
    text_path = os.path.join(context_dir, f"{os.path.splitext(safe_name)[0] or 'knowledge'}.txt")
    with open(original_path, "wb") as original_file:
        original_file.write(file_bytes)
    with open(text_path, "w", encoding="utf-8") as text_file:
        text_file.write(text)
    uploaded_contexts[context_id] = {
        "filename": request.filename,
        "text": text,
        "context_path": relative_project_path(text_path),
        "original_path": relative_project_path(original_path),
        "created_at": time.time(),
    }
    return {
        "context_id": context_id,
        "filename": request.filename,
        "characters": len(text),
        "context_path": relative_project_path(text_path),
        "original_path": relative_project_path(original_path),
    }
async def answer_from_context(request: ContextAnswerRequest):
    """Answer ``request.question`` using only a stored knowledge context.

    Context resolution order:

    * ``source == "rag"`` short-circuits with a fixed placeholder answer and
      no model call.
    * ``context_path`` reads the text file it resolves to. A fixed "not found"
      answer is returned if the file is missing.
    * Otherwise ``context_id`` is looked up in ``uploaded_contexts``.

    When no context is found, a fixed "not loaded" answer is returned. When
    one is found, at most its first 24000 characters are sent to the main
    model together with the question.
    """
    if request.source == "rag":
        return {
            "answer": "RAG пока не подключен. Это заглушка для будущего источника знаний.",
            "source": "rag",
        }

    context = None
    if request.context_path:
        resolved_path = resolve_project_path(request.context_path)
        if not (resolved_path and os.path.exists(resolved_path)):
            return {
                "answer": "Файл с контекстом не найден по пути из workflow. Загрузите файл в Knowledge Answer ноду заново.",
                "source": "uploaded",
            }
        with open(resolved_path, "r", encoding="utf-8", errors="replace") as handle:
            context = {
                "filename": os.path.basename(resolved_path),
                "text": handle.read(),
                "context_path": request.context_path,
            }
    elif request.context_id:
        context = uploaded_contexts.get(request.context_id)

    if not context:
        return {
            "answer": "Файл с контекстом пока не загружен. Загрузите docx, doc, md или txt файл в Knowledge Answer ноду.",
            "source": "uploaded",
        }

    # Cap the prompt size: only the head of long documents is sent.
    excerpt = context["text"][:24000]
    role = request.assistant_role.strip()
    role_clause = f"Текущая роль ассистента в этой сессии: {role}. " if role else ""
    system_prompt = "".join([
        "Ты ассистент интерактивного сценария. ",
        role_clause,
        "Отвечай на вопрос пользователя только по предоставленному контексту файла. ",
        "Если в контексте нет ответа, так и скажи. ",
        f"Отвечай на языке сценария: {request.language}. ",
        "Ответ должен быть коротким и полезным, 2-5 предложений. ",
        "Не используй Markdown, списки, заголовки, жирный текст, нумерацию или маркеры. ",
        "Пиши обычным живым текстом в один абзац.",
    ])
    user_prompt = (
        f"Контекст файла ({context['filename']}):\n{excerpt}\n\n"
        f"Вопрос пользователя:\n{request.question}"
    )
    result = await chat_completion(
        request,
        "/answer-from-context",
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        role="main",
        temperature=0.2,
        max_tokens=220,
        timeout=90.0,
    )
    # An empty model reply is replaced with a fixed apology.
    answer = result["content"] or "Не смогла сформировать ответ по файлу. Попробуйте переформулировать вопрос или проверьте, что knowledge-файл загружен в ноду."
    return {
        "answer": answer,
        "source": "uploaded",
        "filename": context["filename"],
        "context_path": context.get("context_path", ""),
        "provider": result["provider"],
        "model": result["model"],
    }
def _parse_evaluation_verdict(raw_content: Any) -> tuple[bool, str]:
    """Convert the evaluator model's raw reply into an ``(ok, reason)`` pair.

    The reply is expected to be a JSON object such as
    ``{"ok": true, "reason": "..."}``. Anything else counts as a failed
    evaluation whose reason is the raw reply, or a generic message when the
    reply is empty. "Anything else" covers unparsable text, ``None``, and
    valid JSON that is not an object (a list, bare string or number).

    A string ``"ok"`` value is interpreted by its text, so ``"false"`` is a
    failure rather than a truthy non-empty string. A missing or null
    ``"reason"`` yields ``""``.
    """
    invalid_reason = raw_content or "Evaluator returned an invalid response."
    try:
        payload = json.loads(raw_content)
    except (json.JSONDecodeError, TypeError, ValueError):
        return False, invalid_reason
    if not isinstance(payload, dict):
        # e.g. "[true]" or "\"ok\"": parsable, but .get() would raise.
        return False, invalid_reason
    ok_value = payload.get("ok", False)
    if isinstance(ok_value, str):
        ok = ok_value.strip().lower() in {"true", "1", "yes"}
    else:
        ok = bool(ok_value)
    reason_value = payload.get("reason")
    reason = "" if reason_value is None else str(reason_value).strip()
    return ok, reason


async def evaluate_assistant_answer(request: EvaluateAssistantAnswerRequest):
    """Ask the main model to judge one assistant message and log the verdict.

    ``request.message_type == "question"`` evaluates an assistant question
    put to the user. Any other value evaluates an assistant answer to
    ``request.question``. The last 16 transcript entries are included as
    dialogue context.

    Every verdict is appended to the test log. Failures are also prepended
    as an ``evaluation_failure_summary`` entry.

    Returns:
        dict with ``ok``, ``reason``, the ``raw_response`` text, the
        ``provider`` / ``model`` used and the project-relative ``log_path``
        ("" when no log path is available).
    """
    message_type = "question" if request.message_type == "question" else "answer"
    transcript_text = "\n".join(
        f"{message.get('role', 'unknown')}: {message.get('text', '')}"
        for message in request.transcript[-16:]
    )
    if message_type == "question":
        system_prompt = (
            "Ты строгий тестировщик интерактивного AI-ассистента. "
            "Оценивай именно вопрос ассистента пользователю, а не ответ пользователя и не работу модели-симулятора. "
            "Вопрос адекватен, если он понятный, не пустой, релевантен текущему месту диалога и не противоречит видимому контексту. "
            "Верни только JSON вида {\"ok\": true, \"reason\": \"...\"} или {\"ok\": false, \"reason\": \"...\"}."
        )
        user_prompt = (
            f"Последние сообщения диалога:\n{transcript_text}\n\n"
            f"Вопрос ассистента:\n{request.answer}\n\n"
            "Адекватен ли вопрос ассистента пользователю?"
        )
    else:
        system_prompt = (
            "Ты строгий тестировщик интерактивного AI-ассистента. "
            "Оценивай именно ответ ассистента на вопрос пользователя, а не ответ пользователя и не работу модели-симулятора. "
            "Ответ адекватен, если он отвечает на вопрос по смыслу, не пустой, не уходит от темы и не противоречит видимому контексту диалога. "
            "Верни только JSON вида {\"ok\": true, \"reason\": \"...\"} или {\"ok\": false, \"reason\": \"...\"}."
        )
        user_prompt = (
            f"Последние сообщения диалога:\n{transcript_text}\n\n"
            f"Вопрос пользователя:\n{request.question}\n\n"
            f"Ответ ассистента:\n{request.answer}\n\n"
            "Адекватен ли ответ ассистента?"
        )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    result = await chat_completion(
        request,
        "/evaluate-assistant-answer",
        messages,
        role="main",
        response_format="json",
        temperature=0,
        max_tokens=160,
        timeout=60.0,
    )
    raw_content = result["content"]
    ok, reason = _parse_evaluation_verdict(raw_content)
    if not ok:
        prepend_test_log(
            request.test_run_id,
            request.workflow_name,
            "evaluation_failure_summary",
            {
                "message_type": message_type,
                "reason": reason,
                "user_question": request.question,
                "assistant_text": request.answer,
            },
        )
    append_test_log(
        request.test_run_id,
        request.workflow_name,
        "assistant_message_evaluation",
        {
            "message_type": message_type,
            "question": request.question,
            "answer": request.answer,
            "ok": ok,
            "reason": reason,
            "raw_response": raw_content,
        },
    )
    path = test_log_path(request.test_run_id, request.workflow_name) or ""
    return {
        "ok": ok,
        "reason": reason,
        "raw_response": raw_content,
        "provider": result["provider"],
        "model": result["model"],
        "log_path": relative_project_path(path) if path else "",
    }
async def simulate_user_answer(request: SimulateUserAnswerRequest):
    """Generate a simulated visitor reply to the assistant's current question.

    The caller's ``role_prompt`` is prefixed to fixed persona instructions.
    The last 12 transcript entries are sent as history.

    A ``RuntimeError`` from ``chat_completion`` is reported in-band as
    ``{"answer": "", "error": ...}`` instead of being raised.
    """
    recent_lines = [
        f"{entry.get('role', 'unknown')}: {entry.get('text', '')}"
        for entry in request.transcript[-12:]
    ]
    transcript_text = "\n".join(recent_lines)
    persona_rules = (
        "Ты симулируешь посетителя выставки, который отвечает ассистенту. "
        "Отвечай от первого лица, естественно и кратко. "
        "Если вопрос просит конкретику, придумай правдоподобные данные. "
        "Верни только ответ пользователя, без пояснений."
    )
    system_prompt = f"{request.role_prompt}\n\n{persona_rules}"
    user_prompt = "\n\n".join([
        f"История диалога:\n{transcript_text}",
        f"Текущий вопрос ассистента:\n{request.question}",
        "Ответь как посетитель.",
    ])
    try:
        result = await chat_completion(
            request,
            "/simulate-user-answer",
            [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            role="main",
            temperature=0.8,
            timeout=60.0,
        )
    except RuntimeError as error:
        return {"answer": "", "error": str(error)}
    else:
        return {
            "answer": result["content"],
            "provider": result["provider"],
            "model": result["model"],
        }
class BackendWorkflowServices(WorkflowServices):
    """Workflow-runtime services backed by this module's endpoint coroutines.

    Each hook:

    * builds the matching request model;
    * tags it with the trace fields captured at construction (provider, LLM
      session, test run, workflow name) plus the calling node id;
    * awaits the module-level function of the same name directly.

    Requests that carry a language are always sent with
    ``language="русский"``.
    """

    def __init__(
        self,
        provider: str = "",
        llm_session_id: str = "",
        test_run_id: str = "",
        workflow_name: str = "workflow",
        knowledge_context_id: str = "",
    ):
        # Trace metadata copied onto every outgoing request.
        self.provider = provider
        self.llm_session_id = llm_session_id
        self.test_run_id = test_run_id
        # An empty name falls back to the generic "workflow" label.
        self.workflow_name = workflow_name or "workflow"
        # Default knowledge context when the runtime context names none.
        self.knowledge_context_id = knowledge_context_id

    def _trace_fields(self, node_id: str) -> dict:
        """Return the trace keyword arguments shared by all request models."""
        return dict(
            provider=self.provider,
            llm_session_id=self.llm_session_id,
            test_run_id=self.test_run_id,
            workflow_name=self.workflow_name,
            node_id=node_id,
        )

    async def request_llm(self, system: str, user: str, node_id: str = "", context: Optional[dict] = None) -> str:
        """Run a free-form prompt and return the ``"response"`` text ("" if absent)."""
        llm_request = LLMRequest(system_prompt=system, user_prompt=user, **self._trace_fields(node_id))
        reply = await request_llm(llm_request)
        return reply.get("response", "")

    async def classify(
        self,
        question: str,
        answer: str,
        options: Any,
        node_id: str = "",
        context: Optional[dict] = None,
    ) -> str:
        """Classify *answer* against *options*; return the label, or "unclear" if absent.

        *options* may already be a newline-separated string. Otherwise it is
        joined one entry per line, with a trailing newline.
        """
        if isinstance(options, str):
            options_text = options
        else:
            options_text = "\n".join(options) + "\n"
        classify_request = ClassifyRequest(
            question=question,
            answer=answer,
            options=options_text,
            language="русский",
            **self._trace_fields(node_id),
        )
        reply = await classify(classify_request)
        return reply.get("classification", "unclear")

    async def extract_memory_field(
        self,
        answer: str,
        key: str,
        instruction: str,
        question: str = "",
        node_id: str = "",
        context: Optional[dict] = None,
    ) -> Dict[str, Any]:
        """Extract one memory field from *answer*.

        Returns ``{"value": ..., "raw_response": ...}``. A non-dict reply
        yields ``{"value": None, "raw_response": ""}``.
        """
        extraction_request = ExtractMemoryFieldRequest(
            answer=answer,
            key=key,
            instruction=instruction,
            question=question,
            language="русский",
            **self._trace_fields(node_id),
        )
        reply = await extract_memory_field(extraction_request)
        if not isinstance(reply, dict):
            return {"value": None, "raw_response": ""}
        return {"value": reply.get("value"), "raw_response": reply.get("raw_response", "")}

    async def answer_from_context(
        self,
        question: str,
        source: str = "uploaded",
        context_path: str = "",
        node_id: str = "",
        context: Optional[dict] = None,
    ) -> str:
        """Answer *question* from a knowledge context; "" when the reply is not a dict.

        The context id comes from ``context["knowledge_context_id"]`` when
        truthy, otherwise from the instance default. ``context["assistant_role"]``
        is forwarded as the assistant role.
        """
        runtime_context = context or {}
        selected_context_id = runtime_context.get("knowledge_context_id") or self.knowledge_context_id
        answer_request = ContextAnswerRequest(
            question=question,
            context_id=selected_context_id,
            context_path=context_path,
            source=source,
            language="русский",
            assistant_role=runtime_context.get("assistant_role", ""),
            **self._trace_fields(node_id),
        )
        reply = await answer_from_context(answer_request)
        if not isinstance(reply, dict):
            return ""
        return reply.get("answer", "")

    async def paraphrase_text(
        self,
        text: str,
        message_type: str = "message",
        node_id: str = "",
        context: Optional[dict] = None,
    ) -> str:
        """Paraphrase *text*; fall back to the original text when no paraphrase comes back."""
        runtime_context = context or {}
        paraphrase_request = ParaphraseTextRequest(
            text=text,
            message_type=message_type,
            language="русский",
            assistant_role=runtime_context.get("assistant_role", ""),
            **self._trace_fields(node_id),
        )
        reply = await paraphrase_text(paraphrase_request)
        if isinstance(reply, dict):
            return reply.get("text", text)
        return text
def workflow_runtime_context(knowledge_context_id: str = "") -> dict:
    """Build the per-run context dict handed to the workflow runtime.

    Currently carries only the knowledge context id.
    """
    return dict(knowledge_context_id=knowledge_context_id)
def workflow_runtime_services(
    provider: str = "",
    llm_session_id: str = "",
    test_run_id: str = "",
    workflow_name: str = "workflow",
    knowledge_context_id: str = "",
) -> BackendWorkflowServices:
    """Create the service adapter the workflow runtime uses for LLM-backed nodes.

    All arguments are forwarded unchanged as keyword arguments to
    ``BackendWorkflowServices``.
    """
    settings = {
        "provider": provider,
        "llm_session_id": llm_session_id,
        "test_run_id": test_run_id,
        "workflow_name": workflow_name,
        "knowledge_context_id": knowledge_context_id,
    }
    return BackendWorkflowServices(**settings)
def load_workflow_from_path(workflow_path: str) -> tuple[dict, str]:
    """Load a workflow JSON document from disk.

    Args:
        workflow_path: Path to the workflow file. ``~`` is expanded and
            ``None`` is treated as empty.

    Returns:
        ``(workflow, absolute_path)``: the parsed top-level JSON object and
        the absolute path it was read from.

    Raises:
        WorkflowRuntimeError: if the path is empty, does not name a regular
            file, cannot be read or decoded as UTF-8, is not valid JSON, or
            its top-level value is not an object. All load failures use this
            one type, so HTTP callers map them to 400 instead of leaking a 500.
    """
    path = os.path.expanduser(workflow_path or "")
    if not path:
        raise WorkflowRuntimeError("Workflow path is empty.")
    # isfile (not exists): a directory would otherwise fail later in open().
    if not os.path.isfile(path):
        raise WorkflowRuntimeError(f"Workflow JSON was not found: {path}")
    try:
        with open(path, "r", encoding="utf-8") as workflow_file:
            workflow = json.load(workflow_file)
    except json.JSONDecodeError as error:
        raise WorkflowRuntimeError(f"Workflow JSON is not valid JSON: {path}: {error}") from error
    except (OSError, UnicodeDecodeError) as error:
        raise WorkflowRuntimeError(f"Workflow JSON could not be read: {path}: {error}") from error
    if not isinstance(workflow, dict):
        raise WorkflowRuntimeError("Workflow JSON must contain an object.")
    return workflow, os.path.abspath(path)
async def start_workflow_runtime_session(
    workflow: dict,
    workflow_name: str = "workflow",
    graph_path: Optional[List[str]] = None,
    provider: str = "",
    llm_session_id: str = "",
    test_run_id: str = "",
    knowledge_context_id: str = "",
    external_input_value: Any = None,
    external_input_values: Optional[Dict[str, Any]] = None,
) -> dict:
    """Create a new interactive workflow run and perform its first advance.

    The run is registered in ``workflow_runtime_sessions`` under a fresh
    ``workflow_<uuid4 hex>`` id so later calls can resume it by that id. It
    is then advanced once via ``continue_interactive_run``; events emitted
    during that call are returned in the serialized response.

    If that first advance raises (including cancellation), the
    just-registered session is removed before re-raising. The caller never
    received the run id, so the session could otherwise never be resumed or
    deleted and would leak.
    """
    graph = parse_workflow_graph(workflow, graph_path or [])
    state = create_interactive_run(graph)
    run_id = f"workflow_{uuid.uuid4().hex}"
    resolved_name = workflow_name or "workflow"
    workflow_runtime_sessions[run_id] = {
        "state": state,
        "workflow_name": resolved_name,
        "provider": provider,
        "llm_session_id": llm_session_id,
        "test_run_id": test_run_id,
        "knowledge_context_id": knowledge_context_id,
    }
    events = []
    services = workflow_runtime_services(
        provider=provider,
        llm_session_id=llm_session_id,
        test_run_id=test_run_id,
        workflow_name=resolved_name,
        knowledge_context_id=knowledge_context_id,
    )
    try:
        result = await continue_interactive_run(
            state,
            services=services,
            external_input_value=external_input_value,
            external_input_values=external_input_values or {},
            context=workflow_runtime_context(knowledge_context_id),
            on_event=events.append,
        )
    except BaseException:
        workflow_runtime_sessions.pop(run_id, None)
        raise
    return serialize_run_response(run_id, result, events)
async def continue_workflow_runtime_session(
    run_id: str,
    user_answer: str = "",
    provider: str = "",
    llm_session_id: str = "",
    test_run_id: str = "",
    knowledge_context_id: str = "",
    external_input_value: Any = None,
    external_input_values: Optional[Dict[str, Any]] = None,
) -> dict:
    """Resume an existing interactive run with a user answer and/or external inputs.

    Non-empty trace arguments override the values stored when the session
    was created; empty ones fall back to the stored values. The overrides
    are not written back to the session.

    Raises:
        KeyError: if ``run_id`` has no registered session.
    """
    session = workflow_runtime_sessions.get(run_id)
    if not session:
        raise KeyError(run_id)

    def pick(override: str, key: str) -> str:
        # Explicit argument wins; otherwise reuse what the session was started with.
        return override or session.get(key, "")

    context_id = pick(knowledge_context_id, "knowledge_context_id")
    services = workflow_runtime_services(
        provider=pick(provider, "provider"),
        llm_session_id=pick(llm_session_id, "llm_session_id"),
        test_run_id=pick(test_run_id, "test_run_id"),
        workflow_name=session.get("workflow_name", "workflow"),
        knowledge_context_id=context_id,
    )
    collected_events = []
    result = await continue_interactive_run(
        session["state"],
        user_answer=user_answer,
        services=services,
        external_input_value=external_input_value,
        external_input_values=external_input_values or {},
        context=workflow_runtime_context(context_id),
        on_event=collected_events.append,
    )
    return serialize_run_response(run_id, result, collected_events)
def remember_active_workflow_response(response: dict) -> None:
    """Record the latest runtime response on the shared active-workflow session.

    Fields updated:

    * ``last_response``: always set to *response*.
    * ``run_id``: taken from the response when it has that key; otherwise the
      current value is kept, or "" if none.
    * ``active``: true only while a run id is set and the response status is
      neither ``"complete"`` nor ``"restart"``.
    """
    with active_workflow_lock:
        session = active_workflow_session
        session["last_response"] = response
        current_run_id = session.get("run_id", "")
        session["run_id"] = response.get("run_id", current_run_id)
        finished = response.get("status") in {"complete", "restart"}
        session["active"] = bool(session.get("run_id")) and not finished
def set_active_workflow_session(
    response: dict,
    workflow_path: str = "",
    workflow_name: str = "workflow",
    provider: str = "",
    llm_session_id: str = "",
    test_run_id: str = "",
    knowledge_context_id: str = "",
) -> None:
    """Make the run described by *response* the shared active workflow session.

    If a different run was active, its runtime session is dropped from
    ``workflow_runtime_sessions``. The session is marked active unless the
    response status is ``"complete"`` or ``"restart"``. An empty
    *workflow_name* is stored as ``"workflow"``.
    """
    with active_workflow_lock:
        new_run_id = response.get("run_id", "")
        previous_run_id = active_workflow_session.get("run_id")
        replacing_other_run = bool(previous_run_id) and previous_run_id != new_run_id
        if replacing_other_run:
            workflow_runtime_sessions.pop(previous_run_id, None)
        fresh_fields = {
            "active": response.get("status") not in {"complete", "restart"},
            "run_id": new_run_id,
            "workflow_name": workflow_name or "workflow",
            "workflow_path": workflow_path,
            "provider": provider,
            "llm_session_id": llm_session_id,
            "test_run_id": test_run_id,
            "knowledge_context_id": knowledge_context_id,
            "last_response": response,
        }
        active_workflow_session.update(fresh_fields)
def clear_active_workflow_session() -> str:
    """Reset the shared active-workflow session to its idle defaults.

    Returns the run id that was active before the reset ("" if none). The
    runtime session itself is not removed from ``workflow_runtime_sessions``
    here; that is left to the caller.
    """
    idle_state = {"active": False, "run_id": ""}
    for field in ("workflow_name", "workflow_path", "provider", "llm_session_id", "test_run_id", "knowledge_context_id"):
        idle_state[field] = ""
    idle_state["last_response"] = None
    with active_workflow_lock:
        previous_run_id = active_workflow_session.get("run_id", "")
        active_workflow_session.update(idle_state)
    return previous_run_id
def active_workflow_snapshot() -> dict:
    """Describe the active workflow session together with its live runtime state.

    The shared session dict is copied under ``active_workflow_lock``. If its
    run id has no runtime session, the copy is returned with
    ``status="idle"``, ``active=False`` and no question or runtime.

    Otherwise ``status`` is derived from the run state: ``"paused"`` when a
    question is pending, ``"complete"`` when finished, else ``"running"``.
    The serialized runtime is attached, and ``active`` is forced off once the
    run is complete.
    """
    with active_workflow_lock:
        snapshot = dict(active_workflow_session)
    run_id = snapshot.get("run_id")
    session = workflow_runtime_sessions.get(run_id) if run_id else None
    if not session:
        idle_view = dict(snapshot)
        idle_view.update(active=False, status="idle", pending_question=None, runtime=None)
        return idle_view

    state = session["state"]
    if state.pending_question:
        status = "paused"
    elif state.completed:
        status = "complete"
    else:
        status = "running"
    runtime_payload = {
        "status": status,
        "state": state,
        "pending_question": state.pending_question,
    }
    runtime_response = serialize_run_response(run_id, runtime_payload, [])
    live_view = dict(snapshot)
    live_view.update(
        active=snapshot.get("active", False) and status != "complete",
        status=status,
        pending_question=state.pending_question,
        runtime=runtime_response,
    )
    return live_view
def set_live_debug_enabled(enabled: bool) -> None:
    """Switch live-debug broadcasting on or off and log the new state to stdout."""
    global live_debug_enabled
    live_debug_enabled = True if enabled else False
    state_label = "enabled" if live_debug_enabled else "disabled"
    print(f"[live-debug] {state_label}", flush=True)
def live_debug_snapshot_message(event_type: str = "snapshot", events: Optional[List[dict]] = None) -> dict:
    """Build the JSON message sent to live-debug websocket clients.

    It contains the event type, the current live-debug flag, the
    active-workflow snapshot and the given events ([] when none or empty).
    """
    message = {"type": event_type, "live": live_debug_enabled}
    message["state"] = active_workflow_snapshot()
    message["events"] = events if events else []
    return message
async def broadcast_live_debug(event_type: str = "runtime-update", events: Optional[List[dict]] = None) -> None:
    """Send a live-debug snapshot to every connected websocket client.

    Does nothing while live debug is disabled or nobody is connected.
    Clients whose send fails for any reason are treated as disconnected and
    removed afterwards. Sending is best-effort, so failures are not raised.
    """
    if live_debug_enabled and live_debug_clients:
        message = live_debug_snapshot_message(event_type, events)
        failed_clients = []
        # Iterate over a copy: the set may change while we await each send.
        for client in tuple(live_debug_clients):
            try:
                await client.send_json(message)
            except Exception:
                failed_clients.append(client)
        for client in failed_clients:
            live_debug_clients.discard(client)
async def start_active_workflow_from_path(workflow_path: str, provider: str = "") -> dict:
    """Load a workflow file, start it, and make it the active session.

    The workflow is named after the file's basename ("workflow" if empty).
    Live-debug clients receive a ``runtime-start`` broadcast with the start
    events. Returns the serialized start response.
    """
    workflow, absolute_path = load_workflow_from_path(workflow_path)
    display_name = os.path.basename(absolute_path) or "workflow"
    response = await start_workflow_runtime_session(
        workflow,
        workflow_name=display_name,
        provider=provider,
    )
    set_active_workflow_session(
        response,
        workflow_path=absolute_path,
        workflow_name=display_name,
        provider=provider,
    )
    start_events = response.get("events") or []
    await broadcast_live_debug("runtime-start", start_events)
    return response
async def workflow_runtime_start(request: WorkflowRuntimeStartRequest):
    """Start a new (non-active) workflow run from an inline workflow document.

    Returns the serialized run response. A ``WorkflowRuntimeError`` is
    reported as an HTTP 400 with the error text as detail.
    """
    run_options = {
        "workflow_name": request.workflow_name,
        "graph_path": request.graph_path,
        "provider": request.provider,
        "llm_session_id": request.llm_session_id,
        "test_run_id": request.test_run_id,
        "knowledge_context_id": request.knowledge_context_id,
        "external_input_value": request.external_input_value,
        "external_input_values": request.external_input_values,
    }
    try:
        return await start_workflow_runtime_session(request.workflow, **run_options)
    except WorkflowRuntimeError as error:
        raise HTTPException(status_code=400, detail=str(error))
async def workflow_runtime_input(request: WorkflowRuntimeInputRequest):
    """Feed a user answer and/or external inputs into an existing workflow run.

    Returns the serialized run response.

    Raises:
        HTTPException: 404 when ``request.run_id`` has no runtime session;
            400 on ``WorkflowRuntimeError``.
    """
    # Check existence up front instead of catching KeyError around the whole
    # run. A KeyError raised by a bug inside the runtime would otherwise be
    # misreported as "session not found".
    if not workflow_runtime_sessions.get(request.run_id):
        raise HTTPException(status_code=404, detail="Workflow runtime session was not found.")
    try:
        return await continue_workflow_runtime_session(
            request.run_id,
            user_answer=request.user_answer,
            provider=request.provider,
            llm_session_id=request.llm_session_id,
            test_run_id=request.test_run_id,
            knowledge_context_id=request.knowledge_context_id,
            external_input_value=request.external_input_value,
            external_input_values=request.external_input_values,
        )
    except WorkflowRuntimeError as error:
        raise HTTPException(status_code=400, detail=str(error)) from error
async def workflow_runtime_state(run_id: str):
    """Return the serialized current state of a workflow run without advancing it.

    Status is ``"paused"`` while a question is pending, ``"complete"`` once
    finished, else ``"running"``. Raises HTTP 404 for an unknown *run_id*.
    """
    session = workflow_runtime_sessions.get(run_id)
    if not session:
        raise HTTPException(status_code=404, detail="Workflow runtime session was not found.")
    state = session["state"]
    if state.pending_question:
        status = "paused"
    elif state.completed:
        status = "complete"
    else:
        status = "running"
    payload = {
        "status": status,
        "state": state,
        "pending_question": state.pending_question,
    }
    return serialize_run_response(run_id, payload, [])
async def workflow_runtime_delete(run_id: str):
    """Forget a workflow run; ``success`` reports whether it existed."""
    missing = object()
    removed = workflow_runtime_sessions.pop(run_id, missing)
    return {"success": removed is not missing}
async def workflow_runtime_active_start(request: WorkflowRuntimeActiveStartRequest):
    """Start a workflow run and make it the shared active session.

    The workflow comes from ``request.workflow`` when given. Otherwise it is
    loaded from ``request.workflow_path``, and the file's basename becomes
    its name.

    Live-debug clients receive a ``runtime-start`` broadcast. Returns the
    serialized start response. A ``WorkflowRuntimeError`` is reported as
    HTTP 400.
    """
    try:
        if request.workflow is not None:
            workflow = request.workflow
            loaded_path = ""
            display_name = request.workflow_name or "workflow"
        else:
            workflow, loaded_path = load_workflow_from_path(request.workflow_path)
            display_name = os.path.basename(loaded_path) or request.workflow_name or "workflow"
        response = await start_workflow_runtime_session(
            workflow,
            workflow_name=display_name,
            graph_path=request.graph_path,
            provider=request.provider,
            llm_session_id=request.llm_session_id,
            test_run_id=request.test_run_id,
            knowledge_context_id=request.knowledge_context_id,
            external_input_value=request.external_input_value,
            external_input_values=request.external_input_values,
        )
        set_active_workflow_session(
            response,
            workflow_path=loaded_path or request.workflow_path,
            workflow_name=display_name,
            provider=request.provider,
            llm_session_id=request.llm_session_id,
            test_run_id=request.test_run_id,
            knowledge_context_id=request.knowledge_context_id,
        )
        await broadcast_live_debug("runtime-start", response.get("events") or [])
        return response
    except WorkflowRuntimeError as error:
        raise HTTPException(status_code=400, detail=str(error))
async def workflow_runtime_active_input(request: WorkflowRuntimeActiveInputRequest):
    """Feed user input to the currently active workflow run.

    Empty trace fields in the request fall back to those stored on the
    active session. On success the response is recorded on the active
    session and broadcast to live-debug clients as ``runtime-update``.

    Raises:
        HTTPException: 409 when no run is active. 404 when the active run id
            has no runtime session; the active session is then cleared and
            clients are sent ``runtime-missing``. 400 on ``WorkflowRuntimeError``.
    """
    with active_workflow_lock:
        run_id = active_workflow_session.get("run_id", "")
        provider = request.provider or active_workflow_session.get("provider", "")
        llm_session_id = request.llm_session_id or active_workflow_session.get("llm_session_id", "")
        test_run_id = request.test_run_id or active_workflow_session.get("test_run_id", "")
        knowledge_context_id = request.knowledge_context_id or active_workflow_session.get("knowledge_context_id", "")
    if not run_id:
        raise HTTPException(status_code=409, detail="No active workflow runtime session.")
    # Check for the runtime session up front rather than catching KeyError
    # around the whole run. A KeyError raised by a bug inside a node handler
    # would otherwise wipe the active session and be reported as "not found".
    if not workflow_runtime_sessions.get(run_id):
        clear_active_workflow_session()
        await broadcast_live_debug("runtime-missing", [])
        raise HTTPException(status_code=404, detail="Active workflow runtime session was not found.")
    try:
        response = await continue_workflow_runtime_session(
            run_id,
            user_answer=request.user_answer,
            provider=provider,
            llm_session_id=llm_session_id,
            test_run_id=test_run_id,
            knowledge_context_id=knowledge_context_id,
            external_input_value=request.external_input_value,
            external_input_values=request.external_input_values,
        )
    except WorkflowRuntimeError as error:
        raise HTTPException(status_code=400, detail=str(error)) from error
    remember_active_workflow_response(response)
    await broadcast_live_debug("runtime-update", response.get("events") or [])
    return response
async def workflow_runtime_active_state():
    """Return the current active-workflow snapshot (session fields plus runtime status)."""
    snapshot = active_workflow_snapshot()
    return snapshot
async def workflow_runtime_active_live(websocket: WebSocket):
    """Websocket stream of live-debug updates for the active workflow.

    When live debug is disabled, the client gets a single ``live-disabled``
    message and the socket is closed with code 1008. Otherwise the client is
    registered for broadcasts and sent an initial ``snapshot``. Incoming text
    is read and ignored, only to keep the connection open until the client
    disconnects. Registration is always undone on exit.
    """
    await websocket.accept()
    if live_debug_enabled:
        live_debug_clients.add(websocket)
        try:
            await websocket.send_json(live_debug_snapshot_message("snapshot"))
            while True:
                await websocket.receive_text()
        except WebSocketDisconnect:
            pass
        finally:
            live_debug_clients.discard(websocket)
        return
    await websocket.send_json(live_debug_snapshot_message("live-disabled"))
    await websocket.close(code=1008)
async def workflow_runtime_live_debug_status():
    """Report whether live debug is on and how many websocket clients are registered."""
    client_count = len(live_debug_clients)
    return {"enabled": live_debug_enabled, "clients": client_count}
async def workflow_runtime_live_debug_toggle(request: dict):
    """Enable or disable live debug from ``request["enabled"]`` (truthiness).

    A ``live-debug`` broadcast is sent afterwards; it only reaches clients if
    live debug ended up enabled. Returns the resulting flag and the
    registered client count.
    """
    requested_state = bool(request.get("enabled"))
    set_live_debug_enabled(requested_state)
    await broadcast_live_debug("live-debug", [])
    client_count = len(live_debug_clients)
    return {"enabled": live_debug_enabled, "clients": client_count}
async def workflow_runtime_active_reset():
    """Drop the active workflow run and its runtime session, then notify live-debug clients."""
    previous_run_id = clear_active_workflow_session()
    if previous_run_id:
        workflow_runtime_sessions.pop(previous_run_id, None)
    await broadcast_live_debug("runtime-reset", [])
    return {"success": True}
async def get_default_workflow():
    """Return the parsed default workflow JSON.

    Raises HTTP 404 when the file does not exist, and HTTP 500 (chained to
    the cause) when it cannot be opened or parsed.
    """
    workflow_file_path = default_workflow_path()
    if not os.path.exists(workflow_file_path):
        raise HTTPException(status_code=404, detail="Default workflow not found")
    try:
        with open(workflow_file_path, "r", encoding="utf-8") as handle:
            document = json.load(handle)
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error)) from error
    return document
async def save_default_workflow(request: SaveDefaultWorkflowRequest):
    """Overwrite the default workflow file with ``request.document``.

    The JSON is serialized as UTF-8 with non-ASCII kept, a 2-space indent
    and a trailing newline. It is written to ``<path>.tmp`` and then moved
    over the target with ``os.replace``. On any failure the temp file is
    removed and HTTP 500 is raised.

    Raises HTTP 400 when the document is not a JSON object. Returns the
    project-relative path on success.
    """
    document = request.document
    if not isinstance(document, dict):
        raise HTTPException(status_code=400, detail="Workflow document must be an object")
    target_path = default_workflow_path()
    staging_path = f"{target_path}.tmp"
    try:
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        serialized = json.dumps(document, ensure_ascii=False, indent=2) + "\n"
        with open(staging_path, "w", encoding="utf-8") as handle:
            handle.write(serialized)
        os.replace(staging_path, target_path)
        return {"success": True, "path": relative_project_path(target_path)}
    except Exception as error:
        if os.path.exists(staging_path):
            os.remove(staging_path)
        raise HTTPException(status_code=500, detail=str(error)) from error
async def save_preset(request: PresetRequest):
    """Append a user-defined preset to ``presets.json``.

    The file is read and a new ``{"id", "name", "text"}`` entry is appended
    to its ``custom_presets`` list, which is created if missing. The id is
    ``custom_<epoch ms>``.

    The document is written to a temporary sibling file and moved into place
    with ``os.replace``. A failure mid-write therefore cannot leave a
    truncated ``presets.json``, which would lose every existing preset; the
    previous code rewrote the file in place.

    Returns ``{"success": True, "message": ..., "preset": {...}}`` on
    success. On any failure, including a missing or malformed presets file,
    it returns ``{"success": False, "message": <error text>}``.
    """
    print(f"Saving preset: {request.name}")
    temp_path = f"{presets_path}.tmp"
    try:
        with open(presets_path, "r", encoding="utf-8") as presets_file:
            data = json.load(presets_file)
        if "custom_presets" not in data:
            data["custom_presets"] = []
        new_preset = {
            "id": f"custom_{int(time.time() * 1000)}",
            "name": request.name,
            "text": request.text,
        }
        data["custom_presets"].append(new_preset)
        with open(temp_path, "w", encoding="utf-8") as presets_file:
            json.dump(data, presets_file, ensure_ascii=False, indent=2)
        os.replace(temp_path, presets_path)
        return {
            "success": True,
            "message": "Preset saved successfully",
            "preset": new_preset,
        }
    except Exception as e:
        print(f"Error saving preset: {e}")
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                # Best-effort cleanup; the original error is what gets reported.
                pass
        return {
            "success": False,
            "message": str(e),
        }
# Serve the built frontend bundle (<project_dir>/dist) from the site root when
# it exists. With html=True, StaticFiles serves index.html for directory
# requests. The "/" mount matches every path, so any routes that should take
# precedence over it must already be registered on `app` when this runs.
# NOTE(review): confirm the API routes are registered before this point.
dist_dir = os.path.join(project_dir, "dist")
if os.path.isdir(dist_dir):
    app.mount("/", StaticFiles(directory=dist_dir, html=True), name="frontend")
def cli_has(*flags: str) -> bool:
    """Return True if any of *flags* appears verbatim as an argument in ``sys.argv``."""
    present = set(sys.argv)
    return not present.isdisjoint(flags)
def cli_arg_value(name: str, default: str = "") -> str:
    """Look up a command-line option value in ``sys.argv``.

    Both ``--name value`` and ``--name=value`` forms are accepted, and the
    first matching argument wins. Returns *default* when the option is
    absent, or when it appears as the last argument with no value after it.
    """
    prefix = name + "="
    args = sys.argv
    for position, token in enumerate(args):
        # A token can't both equal `name` and start with `name=`, so check order is irrelevant.
        if token.startswith(prefix):
            return token[len(prefix):]
        if token == name and position + 1 < len(args):
            return args[position + 1]
    return default
if __name__ == "__main__":
    # Port precedence: --port CLI option, then $PORT, then 8000.
    server_port = int(cli_arg_value("--port", os.environ.get("PORT", "8000")))
    reload_enabled = cli_has("--reload")
    # Auto-reload needs an import string so uvicorn can re-import the app.
    # NOTE(review): "main:app" assumes this module is importable as `main`.
    # Without reload, the already-built app object is served directly.
    uvicorn.run(
        "main:app" if reload_enabled else app,
        host="0.0.0.0",
        port=server_port,
        reload=reload_enabled,
    )