from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from typing import Any, Dict, List, Optional import uvicorn import httpx import json import os import sys import time import re import base64 import zipfile import xml.etree.ElementTree as ET import uuid import asyncio import threading from io import BytesIO try: from workflow_runtime import ( WorkflowRuntimeError, WorkflowServices, continue_interactive_run, create_interactive_run, parse_workflow_graph, serialize_run_response, ) except ImportError: from .workflow_runtime import ( WorkflowRuntimeError, WorkflowServices, continue_interactive_run, create_interactive_run, parse_workflow_graph, serialize_run_response, ) app = FastAPI() model_name = os.environ.get("OLLAMA_MODEL", "qwen3:14b") classifier_model_name = os.environ.get("CLASSIFIER_MODEL", model_name) backend_dir = os.path.dirname(__file__) provider_name = os.environ.get("LLM_PROVIDER", "ollama") deepinfra_model_name = os.environ.get("DEEPINFRA_MODEL", "Qwen/Qwen3-14B") deepinfra_classifier_model_name = os.environ.get("DEEPINFRA_CLASSIFIER_MODEL", deepinfra_model_name) deepinfra_api_key_path = os.environ.get("DEEPINFRA_API_KEY_FILE", os.path.join(backend_dir, ".deepinfra_api_key")) default_project_dir = os.path.normpath(os.path.join(backend_dir, "..")) project_dir = backend_dir if os.path.exists(os.path.join(backend_dir, "Brains.py")) else default_project_dir presets_path = os.path.join(project_dir, "public", "presets.json") knowledge_contexts_dir = os.path.join(project_dir, "knowledge_contexts") test_logs_dir = os.path.join(project_dir, "test_logs") transcript_logs_dir = os.path.join(project_dir, "transcript_logs") uploaded_contexts = {} llm_session_logs = {} workflow_runtime_sessions = {} active_workflow_lock = threading.RLock() active_workflow_session = { "active": False, "run_id": "", "workflow_name": 
"", "workflow_path": "", "provider": "", "llm_session_id": "", "test_run_id": "", "knowledge_context_id": "", "last_response": None, } live_debug_enabled = False live_debug_clients: set[WebSocket] = set() if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") app.add_middleware( CORSMiddleware, allow_origins=["*"], # URL вашего Vite dev сервера allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class LLMRequest(BaseModel): system_prompt: str user_prompt: str provider: str = "" llm_session_id: str = "" test_run_id: str = "" workflow_name: str = "" node_id: str = "" class ParaphraseTextRequest(BaseModel): text: str message_type: str = "message" language: str = "русский" provider: str = "" llm_session_id: str = "" assistant_role: str = "" test_run_id: str = "" workflow_name: str = "" node_id: str = "" class ExtractMemoryFieldRequest(BaseModel): answer: str key: str instruction: str = "" question: str = "" language: str = "русский" provider: str = "" llm_session_id: str = "" test_run_id: str = "" workflow_name: str = "" node_id: str = "" class ClassifyRequest(BaseModel): answer: str question: str options: str language: str = "русский" provider: str = "" llm_session_id: str = "" test_run_id: str = "" workflow_name: str = "" node_id: str = "" class PresetRequest(BaseModel): name: str text: str class SimulateUserAnswerRequest(BaseModel): provider: str = "ollama" role_prompt: str question: str transcript: List[dict] = [] llm_session_id: str = "" test_run_id: str = "" workflow_name: str = "" class UploadContextRequest(BaseModel): filename: str content_base64: str class ContextAnswerRequest(BaseModel): question: str context_id: str = "" context_path: str = "" source: str = "uploaded" language: str = "русский" provider: str = "" llm_session_id: str = "" assistant_role: str = "" test_run_id: str = "" workflow_name: str = "" node_id: str = "" class 
StartTestRunRequest(BaseModel): workflow_name: str = "workflow" class StartTranscriptLogRequest(BaseModel): workflow_name: str = "workflow" class AppendTranscriptLogRequest(BaseModel): log_path: str role: str text: str class LlmSessionLogRequest(BaseModel): llm_session_id: str class SaveDeepInfraKeyRequest(BaseModel): api_key: str class SaveDefaultWorkflowRequest(BaseModel): document: Dict[str, Any] class EvaluateAssistantAnswerRequest(BaseModel): question: str answer: str transcript: List[dict] = [] provider: str = "" llm_session_id: str = "" test_run_id: str = "" workflow_name: str = "workflow" message_type: str = "answer" class WorkflowRuntimeStartRequest(BaseModel): workflow: dict workflow_name: str = "workflow" graph_path: List[str] = Field(default_factory=list) provider: str = "" llm_session_id: str = "" test_run_id: str = "" knowledge_context_id: str = "" external_input_value: Any = None external_input_values: Dict[str, Any] = Field(default_factory=dict) class WorkflowRuntimeInputRequest(BaseModel): run_id: str user_answer: str = "" provider: str = "" llm_session_id: str = "" test_run_id: str = "" knowledge_context_id: str = "" external_input_value: Any = None external_input_values: Dict[str, Any] = Field(default_factory=dict) class WorkflowRuntimeActiveStartRequest(BaseModel): workflow: Optional[dict] = None workflow_path: str = "" workflow_name: str = "workflow" graph_path: List[str] = Field(default_factory=list) provider: str = "" llm_session_id: str = "" test_run_id: str = "" knowledge_context_id: str = "" external_input_value: Any = None external_input_values: Dict[str, Any] = Field(default_factory=dict) class WorkflowRuntimeActiveInputRequest(BaseModel): user_answer: str = "" provider: str = "" llm_session_id: str = "" test_run_id: str = "" knowledge_context_id: str = "" external_input_value: Any = None external_input_values: Dict[str, Any] = Field(default_factory=dict) def normalize_classifier_text(value: str) -> str: value = (value or 
"").strip().strip("\"'«»") value = re.sub(r"^[\-\*\d\.\)\s]+", "", value) return value.strip().lower() def parse_classifier_choice(raw_content: str) -> str: try: parsed = json.loads(raw_content) if isinstance(parsed, dict): return str(parsed.get("choice", "")).strip() except json.JSONDecodeError: pass return raw_content.strip().strip("\"'«»") def parse_classifier_index(raw_content: str) -> Optional[int]: try: parsed = json.loads(raw_content) if isinstance(parsed, dict): raw_index = parsed.get("choice_index", parsed.get("index", None)) if raw_index is None: return None return int(raw_index) except (json.JSONDecodeError, TypeError, ValueError): pass raw_text = normalize_classifier_text(raw_content) if raw_text in {"unclear", "-1"}: return -1 if raw_text.isdigit() or (raw_text.startswith("-") and raw_text[1:].isdigit()): return int(raw_text) return None def option_from_index(choice_index: Optional[int], options: List[str]) -> str: if choice_index is None or choice_index < 0 or choice_index >= len(options): return "unclear" return options[choice_index] def parse_classifier_options(raw_options: str) -> List[str]: if "\n" in raw_options: return [ option.strip() for option in raw_options.splitlines() if option.strip() ] return [ option.strip() for option in raw_options.split(",") if option.strip() ] def read_public_config() -> dict: config_path = os.path.join(project_dir, "public", "config.json") try: with open(config_path, "r", encoding="utf-8") as config_file: config = json.load(config_file) return config if isinstance(config, dict) else {} except (OSError, json.JSONDecodeError): return {} def get_public_config_value(key: str) -> str: value = read_public_config().get(key, "") return value.strip() if isinstance(value, str) else "" def normalize_keyword_text(value: str) -> str: value = str(value or "").lower().replace("ё", "е") value = value.replace("gotek", "готэк").replace("gotech", "готэк") value = re.sub(r"[\"'«»“”]", "", value) value = re.sub(r"[^a-zа-я0-9]+", " ", 
value, flags=re.IGNORECASE) return re.sub(r"\s+", " ", value).strip() def keyword_classifier_index(answer: str, options: List[str]) -> Optional[int]: normalized_answer = normalize_keyword_text(answer) if not normalized_answer: return None matches = [] for index, option in enumerate(options): alternatives = [part.strip() for part in option.split(";") if part.strip()] for alternative in alternatives: normalized_alternative = normalize_keyword_text(alternative) if not normalized_alternative: continue if ( normalized_answer == normalized_alternative or f" {normalized_alternative} " in f" {normalized_answer} " ): matches.append(index) break unique_matches = sorted(set(matches)) return unique_matches[0] if len(unique_matches) == 1 else None def extract_docx_text(file_bytes: bytes) -> str: with zipfile.ZipFile(BytesIO(file_bytes), "r") as docx: xml_content = docx.read("word/document.xml") root = ET.fromstring(xml_content) namespace = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"} paragraphs = [] for paragraph in root.findall(".//w:p", namespace): text = "".join((node.text or "") for node in paragraph.findall(".//w:t", namespace)).strip() if text: paragraphs.append(text) return "\n".join(paragraphs) def extract_context_text(filename: str, file_bytes: bytes) -> str: lower_name = filename.lower() if lower_name.endswith(".docx"): return extract_docx_text(file_bytes) if lower_name.endswith(".md") or lower_name.endswith(".txt"): return file_bytes.decode("utf-8", errors="replace") if lower_name.endswith(".doc"): return file_bytes.decode("utf-8", errors="ignore") or file_bytes.decode("cp1251", errors="ignore") return file_bytes.decode("utf-8", errors="replace") def safe_filename(filename: str) -> str: name = os.path.basename(filename or "knowledge") name = re.sub(r"[^a-zA-Z0-9а-яА-ЯёЁ._-]+", "_", name, flags=re.IGNORECASE).strip("._") return name or "knowledge" def relative_project_path(path: str) -> str: return os.path.relpath(path, 
project_dir).replace("\\", "/") def resolve_project_path(relative_path: str) -> Optional[str]: if not relative_path: return None normalized = os.path.normpath(os.path.join(project_dir, relative_path)) project_root = os.path.abspath(project_dir) absolute_path = os.path.abspath(normalized) if absolute_path != project_root and not absolute_path.startswith(project_root + os.sep): return None return absolute_path def default_workflow_path() -> str: return os.path.join(project_dir, "workflows", "rosupack_2026_assistant_workflow.json") def test_log_path(test_run_id: str, workflow_name: str) -> Optional[str]: if not test_run_id: return None workflow_folder = safe_filename(workflow_name or "workflow") run_name = safe_filename(test_run_id) return os.path.join(test_logs_dir, workflow_folder, f"{run_name}.jsonl") def transcript_log_path(log_id: str, workflow_name: str) -> str: workflow_folder = safe_filename(workflow_name or "workflow") run_name = safe_filename(log_id) return os.path.join(transcript_logs_dir, workflow_folder, f"{run_name}.txt") def append_test_log(test_run_id: str, workflow_name: str, event_type: str, payload: dict): path = test_log_path(test_run_id, workflow_name) if not path: return os.makedirs(os.path.dirname(path), exist_ok=True) record = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), "event_type": event_type, **payload, } with open(path, "a", encoding="utf-8") as log_file: log_file.write(json.dumps(record, ensure_ascii=False) + "\n") def prepend_test_log(test_run_id: str, workflow_name: str, event_type: str, payload: dict): path = test_log_path(test_run_id, workflow_name) if not path: return os.makedirs(os.path.dirname(path), exist_ok=True) previous_content = "" if os.path.exists(path): with open(path, "r", encoding="utf-8") as log_file: previous_content = log_file.read() record = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), "event_type": event_type, **payload, } with open(path, "w", encoding="utf-8") as 
log_file: log_file.write(json.dumps(record, ensure_ascii=False) + "\n") log_file.write(previous_content) def normalize_provider(provider: str = "") -> str: normalized = (provider or provider_name or "ollama").strip().lower() aliases = { "local": "ollama", "deep-infra": "deepinfra", "deep_infra": "deepinfra", } normalized = aliases.get(normalized, normalized) if normalized not in {"ollama", "deepinfra"}: return "ollama" return normalized def read_secret_file(path: str) -> str: if not path or not os.path.exists(path): return "" with open(path, "r", encoding="utf-8") as secret_file: return secret_file.read().strip() def write_secret_file(path: str, value: str): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as secret_file: secret_file.write(value.strip()) def get_deepinfra_api_key() -> str: return ( os.environ.get("DEEPINFRA_API_KEY", "").strip() or read_secret_file(deepinfra_api_key_path) or get_public_config_value("deepinfraApiKey") ) def model_for_provider(provider: str, role: str = "main") -> str: if provider == "deepinfra": return deepinfra_classifier_model_name if role == "classifier" else deepinfra_model_name return classifier_model_name if role == "classifier" else model_name def extract_chat_content(provider: str, response_payload: dict) -> str: if provider == "ollama": return response_payload.get("message", {}).get("content", "") choices = response_payload.get("choices") or [] if not choices: return "" message = choices[0].get("message") or {} return message.get("content", "") or "" def log_llm_call(request, endpoint: str, provider: str, model: str, messages: list, response_payload: dict): record = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), "endpoint": endpoint, "node_id": getattr(request, "node_id", ""), "provider": provider, "model": model, "messages": messages, "response": response_payload, } append_test_log( getattr(request, "test_run_id", ""), getattr(request, "workflow_name", ""), 
"llm_call", record, ) session_id = getattr(request, "llm_session_id", "") if session_id: llm_session_logs.setdefault(session_id, []).append(record) async def chat_completion( request, endpoint: str, messages: list, role: str = "main", response_format: str = "", temperature: Optional[float] = None, max_tokens: Optional[int] = None, timeout: float = 60.0, ): provider = normalize_provider(getattr(request, "provider", "")) model = model_for_provider(provider, role) async with httpx.AsyncClient() as client: if provider == "deepinfra": api_key = get_deepinfra_api_key() if not api_key: raise RuntimeError("DEEPINFRA_API_KEY is not set and backend/.deepinfra_api_key was not found") payload = { "model": model, "reasoning_effort": "none", "messages": messages, } if response_format == "json": payload["response_format"] = {"type": "json_object"} if temperature is not None: payload["temperature"] = temperature if max_tokens is not None: payload["max_tokens"] = max_tokens response = await client.post( "https://api.deepinfra.com/v1/openai/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, json=payload, timeout=timeout, ) response.raise_for_status() response_payload = response.json() else: payload = { "model": model, "messages": messages, "stream": False, "think": False, } if response_format == "json": payload["format"] = "json" options = {} if temperature is not None: options["temperature"] = temperature if max_tokens is not None: options["num_predict"] = max_tokens if options: payload["options"] = options response = await client.post( "http://localhost:11434/api/chat", json=payload, timeout=timeout, ) response.raise_for_status() response_payload = response.json() log_llm_call(request, endpoint, provider, model, messages, response_payload) return { "provider": provider, "model": model, "response": response_payload, "content": extract_chat_content(provider, response_payload).strip(), } @app.post("/start-test-run") async def 
start_test_run(request: StartTestRunRequest): test_run_id = f"test_{int(time.time() * 1000)}" append_test_log( test_run_id, request.workflow_name, "test_run_start", { "workflow_name": request.workflow_name, }, ) path = test_log_path(test_run_id, request.workflow_name) or "" return { "test_run_id": test_run_id, "log_path": relative_project_path(path) if path else "", } @app.post("/start-transcript-log") async def start_transcript_log(request: StartTranscriptLogRequest): log_id = f"transcript_{int(time.time() * 1000)}" path = transcript_log_path(log_id, request.workflow_name) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as log_file: log_file.write(f"Workflow: {request.workflow_name}\n") log_file.write(f"Started: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}\n") log_file.write("\n") return { "log_id": log_id, "log_path": relative_project_path(path), } @app.post("/append-transcript-log") async def append_transcript_log(request: AppendTranscriptLogRequest): path = resolve_project_path(request.log_path) if not path: return { "success": False, "message": "Invalid transcript log path", } os.makedirs(os.path.dirname(path), exist_ok=True) role = "ASSISTANT" if request.role == "assistant" else "USER" text = (request.text or "").strip() with open(path, "a", encoding="utf-8") as log_file: log_file.write(f"{role}: {text}\n\n") return { "success": True, "log_path": request.log_path, } @app.post("/get-llm-session-log") async def get_llm_session_log(request: LlmSessionLogRequest): return { "llm_session_id": request.llm_session_id, "calls": llm_session_logs.get(request.llm_session_id, []), } @app.get("/deepinfra-key-status") async def deepinfra_key_status(): env_key = os.environ.get("DEEPINFRA_API_KEY", "").strip() file_key = read_secret_file(deepinfra_api_key_path) return { "has_key": bool(env_key or file_key), "source": "env" if env_key else "file" if file_key else "", } @app.post("/save-deepinfra-key") async def 
save_deepinfra_key(request: SaveDeepInfraKeyRequest): api_key = (request.api_key or "").strip() if not api_key: return { "success": False, "has_key": bool(get_deepinfra_api_key()), "message": "DeepInfra API key is empty", } write_secret_file(deepinfra_api_key_path, api_key) return { "success": True, "has_key": True, "source": "file", } @app.post("/request-llm") async def request_llm(request: LLMRequest): print(f"System Prompt: {request.system_prompt}") print(f"User Prompt: {request.user_prompt}") messages = [ {"role": "system", "content": request.system_prompt}, {"role": "user", "content": request.user_prompt} ] result = await chat_completion( request, "/request-llm", messages, role="main", timeout=60.0, ) return { "response": result["content"], "provider": result["provider"], "model": result["model"], } @app.post("/paraphrase-text") async def paraphrase_text(request: ParaphraseTextRequest): source_text = (request.text or "").strip() if not source_text: return { "text": "", "provider": normalize_provider(request.provider), "model": model_for_provider(normalize_provider(request.provider), "main"), } is_question = request.message_type == "question" assistant_role = request.assistant_role.strip() system_prompt = ( "Ты редактор реплик голосового ассистента. Перефразируй текст так, чтобы он звучал естественно " "и немного иначе, но смысл, факты, имена, числа, ссылки, формат и язык остались теми же. " f"{f'Текущая роль ассистента в этой сессии: {assistant_role}. ' if assistant_role else ''}" "Не добавляй новую информацию. Не объясняй изменения. Верни только готовую реплику." 
) user_prompt = "\n".join([ f"Язык: {request.language or 'русский'}", f"Тип реплики: {'вопрос ассистента' if is_question else 'реплика ассистента'}", "Исходный текст:", source_text, ]) result = await chat_completion( request, "/paraphrase-text", [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], role="main", temperature=0.8, timeout=60.0, ) return { "text": result["content"] or source_text, "provider": result["provider"], "model": result["model"], } @app.post("/extract-memory-field") async def extract_memory_field(request: ExtractMemoryFieldRequest): print(f"Extract memory key: {request.key}") print(f"Question: {request.question}") print(f"Answer: {request.answer}") system_prompt = ( "Ты универсальный извлекатель значения для интерактивного сценария. " "Рабочий язык сценария: {language}. " "Извлеки из ответа пользователя значение для указанного ключа памяти. " "Если ответ на другом языке, верни значение на рабочем языке сценария, когда это уместно. " "Если ключ похож на имя, название компании, должность или отрасль, сохраняй естественную краткую форму. " "Если значение нельзя уверенно извлечь, верни null. " "Верни только JSON вида {{\"value\": \"...\"}} или {{\"value\": null}}. Без пояснений." 
).format(language=request.language) user_prompt = ( f"Ключ памяти: {request.key}\n" f"Инструкция: {request.instruction or 'извлеки значение по смыслу'}\n" f"Вопрос ассистента: {request.question}\n" f"Ответ пользователя: {request.answer}\n" ) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] result = await chat_completion( request, "/extract-memory-field", messages, role="classifier", response_format="json", temperature=0, max_tokens=80, timeout=60.0, ) response_content = result["content"] try: payload = json.loads(response_content) value = payload.get("value") except json.JSONDecodeError: value = None if isinstance(value, str): value = value.strip() or None return { "key": request.key, "value": value, "raw_response": response_content, "provider": result["provider"], "model": result["model"] } @app.post("/classify") async def classify(request: ClassifyRequest): print(f"Question: {request.question}") print(f"Answer: {request.answer}") print(f"Options: {request.options}") options = parse_classifier_options(request.options) keyword_index = keyword_classifier_index(request.answer, options) if keyword_index is not None: provider = normalize_provider(request.provider) return { "classification": option_from_index(keyword_index, options), "raw_classification": json.dumps({"choice_index": keyword_index, "source": "keyword"}, ensure_ascii=False), "provider": provider, "model": model_for_provider(provider, "classifier") } options_text = "\n".join(f"{index}: {option}" for index, option in enumerate(options)) answer_for_classification = request.answer normalize_system_prompt = ( "Ты нормализатор пользовательского ответа для semantic router. " "Рабочий язык сценария: {language}. " "Переведи ответ пользователя на рабочий язык сценария, сохранив только смысл ответа. " "Не выбирай вариант из списка и не добавляй пояснений. " "Если ответ бессмысленный или нерелевантный, верни его как есть. 
" "Верни JSON вида {{\"normalized_answer\":\"...\"}}." ).format(language=request.language) system_prompt = ( "Ты универсальный semantic router для интерактивного сценария. " "Рабочий язык сценария: {language}. " "Сопоставь ответ пользователя с одним вариантом по смыслу. " "Если ответ на другом языке, сначала мысленно переведи смысл на рабочий язык сценария. " "Каждая строка списка вариантов — это один вариант маршрута. " "Если внутри варианта есть фразы через точку с запятой, считай их альтернативными формулировками одного и того же варианта, " "а не условиями, которые должны выполниться одновременно. " "Выбирать можно только номер одного варианта из списка. " "Если ответ нерелевантный, шуточный, бессмысленный, не относится к вопросу " "или не позволяет уверенно выбрать вариант, верни -1. " "Верни только JSON вида {{\"choice_index\":0}} или {{\"choice_index\":-1}}. " "Не добавляй пояснений." ).format(language=request.language) repair_system_prompt = ( "Ты нормализатор результата semantic router. " "Тебе дают вопрос, ответ пользователя, черновой выбор модели и список вариантов с номерами. " "Одна строка списка вариантов — один маршрут. Фразы внутри строки через точку с запятой — альтернативы этого маршрута. " "Если по смыслу можно уверенно выбрать один вариант, верни его номер. " "Если нельзя, верни -1. " "Верни только JSON вида {\"choice_index\":0} или {\"choice_index\":-1}. Без пояснений." 
) normalize_messages = [ {"role": "system", "content": normalize_system_prompt}, {"role": "user", "content": f"Ответ пользователя:\n{request.answer}"} ] normalize_result = await chat_completion( request, "/classify/normalize", normalize_messages, role="classifier", response_format="json", temperature=0, max_tokens=80, timeout=60.0, ) normalize_content = normalize_result["content"] try: normalized_payload = json.loads(normalize_content) normalized_answer = str(normalized_payload.get("normalized_answer", "")).strip() if normalized_answer: answer_for_classification = normalized_answer except json.JSONDecodeError: pass user_prompt = ( f"Вопрос:\n{request.question}\n\n" f"Варианты:\n{options_text}\n\n" f"Исходный ответ пользователя:\n{request.answer}\n\n" f"Нормализованный ответ на рабочем языке:\n{answer_for_classification}\n\n" "Верни JSON." ) classify_messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] result = await chat_completion( request, "/classify", classify_messages, role="classifier", response_format="json", temperature=0, max_tokens=80, timeout=60.0, ) raw_classification = result["content"] draft_index = parse_classifier_index(raw_classification) classification = option_from_index(draft_index, options) if classification == "unclear": draft_choice = parse_classifier_choice(raw_classification) repair_prompt = ( f"Вопрос:\n{request.question}\n\n" f"Допустимые варианты:\n{options_text}\n\n" f"Ответ пользователя:\n{request.answer}\n\n" f"Нормализованный ответ:\n{answer_for_classification}\n\n" f"Черновой выбор модели:\n{draft_choice}\n\n" "Верни JSON." 
) repair_messages = [ {"role": "system", "content": repair_system_prompt}, {"role": "user", "content": repair_prompt} ] repair_result = await chat_completion( request, "/classify/repair", repair_messages, role="classifier", response_format="json", temperature=0, max_tokens=80, timeout=60.0, ) repair_raw = repair_result["content"] repaired_index = parse_classifier_index(repair_raw) classification = option_from_index(repaired_index, options) raw_classification = repair_raw return { "classification": classification, "raw_classification": raw_classification, "provider": result["provider"], "model": result["model"] } @app.post("/upload-context") async def upload_context(request: UploadContextRequest): file_bytes = base64.b64decode(request.content_base64) text = extract_context_text(request.filename, file_bytes).strip() context_id = f"context_{int(time.time() * 1000)}" safe_name = safe_filename(request.filename) context_dir = os.path.join(knowledge_contexts_dir, context_id) os.makedirs(context_dir, exist_ok=True) original_path = os.path.join(context_dir, safe_name) text_path = os.path.join(context_dir, f"{os.path.splitext(safe_name)[0] or 'knowledge'}.txt") with open(original_path, "wb") as original_file: original_file.write(file_bytes) with open(text_path, "w", encoding="utf-8") as text_file: text_file.write(text) uploaded_contexts[context_id] = { "filename": request.filename, "text": text, "context_path": relative_project_path(text_path), "original_path": relative_project_path(original_path), "created_at": time.time(), } return { "context_id": context_id, "filename": request.filename, "characters": len(text), "context_path": relative_project_path(text_path), "original_path": relative_project_path(original_path), } @app.post("/answer-from-context") async def answer_from_context(request: ContextAnswerRequest): if request.source == "rag": return { "answer": "RAG пока не подключен. 
Это заглушка для будущего источника знаний.", "source": "rag", } context = None if request.context_path: context_file_path = resolve_project_path(request.context_path) if not context_file_path or not os.path.exists(context_file_path): return { "answer": "Файл с контекстом не найден по пути из workflow. Загрузите файл в Knowledge Answer ноду заново.", "source": "uploaded", } with open(context_file_path, "r", encoding="utf-8", errors="replace") as context_file: context = { "filename": os.path.basename(context_file_path), "text": context_file.read(), "context_path": request.context_path, } elif request.context_id: context = uploaded_contexts.get(request.context_id) if not context: return { "answer": "Файл с контекстом пока не загружен. Загрузите docx, doc, md или txt файл в Knowledge Answer ноду.", "source": "uploaded", } context_text = context["text"][:24000] assistant_role = request.assistant_role.strip() system_prompt = ( "Ты ассистент интерактивного сценария. " f"{f'Текущая роль ассистента в этой сессии: {assistant_role}. ' if assistant_role else ''}" "Отвечай на вопрос пользователя только по предоставленному контексту файла. " "Если в контексте нет ответа, так и скажи. " f"Отвечай на языке сценария: {request.language}. " "Ответ должен быть коротким и полезным, 2-5 предложений. " "Не используй Markdown, списки, заголовки, жирный текст, нумерацию или маркеры. " "Пиши обычным живым текстом в один абзац." ) user_prompt = ( f"Контекст файла ({context['filename']}):\n{context_text}\n\n" f"Вопрос пользователя:\n{request.question}" ) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] result = await chat_completion( request, "/answer-from-context", messages, role="main", temperature=0.2, max_tokens=220, timeout=90.0, ) answer = result["content"] if not answer: answer = "Не смогла сформировать ответ по файлу. Попробуйте переформулировать вопрос или проверьте, что knowledge-файл загружен в ноду." 
return { "answer": answer, "source": "uploaded", "filename": context["filename"], "context_path": context.get("context_path", ""), "provider": result["provider"], "model": result["model"], } @app.post("/evaluate-assistant-answer") async def evaluate_assistant_answer(request: EvaluateAssistantAnswerRequest): message_type = "question" if request.message_type == "question" else "answer" transcript_text = "\n".join( f"{message.get('role', 'unknown')}: {message.get('text', '')}" for message in request.transcript[-16:] ) if message_type == "question": system_prompt = ( "Ты строгий тестировщик интерактивного AI-ассистента. " "Оценивай именно вопрос ассистента пользователю, а не ответ пользователя и не работу модели-симулятора. " "Вопрос адекватен, если он понятный, не пустой, релевантен текущему месту диалога и не противоречит видимому контексту. " "Верни только JSON вида {\"ok\": true, \"reason\": \"...\"} или {\"ok\": false, \"reason\": \"...\"}." ) user_prompt = ( f"Последние сообщения диалога:\n{transcript_text}\n\n" f"Вопрос ассистента:\n{request.answer}\n\n" "Адекватен ли вопрос ассистента пользователю?" ) else: system_prompt = ( "Ты строгий тестировщик интерактивного AI-ассистента. " "Оценивай именно ответ ассистента на вопрос пользователя, а не ответ пользователя и не работу модели-симулятора. " "Ответ адекватен, если он отвечает на вопрос по смыслу, не пустой, не уходит от темы и не противоречит видимому контексту диалога. " "Верни только JSON вида {\"ok\": true, \"reason\": \"...\"} или {\"ok\": false, \"reason\": \"...\"}." ) user_prompt = ( f"Последние сообщения диалога:\n{transcript_text}\n\n" f"Вопрос пользователя:\n{request.question}\n\n" f"Ответ ассистента:\n{request.answer}\n\n" "Адекватен ли ответ ассистента?" 
) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] result = await chat_completion( request, "/evaluate-assistant-answer", messages, role="main", response_format="json", temperature=0, max_tokens=160, timeout=60.0, ) raw_content = result["content"] try: payload = json.loads(raw_content) ok = bool(payload.get("ok", False)) reason = str(payload.get("reason", "")).strip() except (json.JSONDecodeError, TypeError, ValueError): ok = False reason = raw_content or "Evaluator returned an invalid response." if not ok: prepend_test_log( request.test_run_id, request.workflow_name, "evaluation_failure_summary", { "message_type": message_type, "reason": reason, "user_question": request.question, "assistant_text": request.answer, }, ) append_test_log( request.test_run_id, request.workflow_name, "assistant_message_evaluation", { "message_type": message_type, "question": request.question, "answer": request.answer, "ok": ok, "reason": reason, "raw_response": raw_content, }, ) path = test_log_path(request.test_run_id, request.workflow_name) or "" return { "ok": ok, "reason": reason, "raw_response": raw_content, "provider": result["provider"], "model": result["model"], "log_path": relative_project_path(path) if path else "", } @app.post("/simulate-user-answer") async def simulate_user_answer(request: SimulateUserAnswerRequest): transcript_text = "\n".join( f"{message.get('role', 'unknown')}: {message.get('text', '')}" for message in request.transcript[-12:] ) system_prompt = ( f"{request.role_prompt}\n\n" "Ты симулируешь посетителя выставки, который отвечает ассистенту. " "Отвечай от первого лица, естественно и кратко. " "Если вопрос просит конкретику, придумай правдоподобные данные. " "Верни только ответ пользователя, без пояснений." ) user_prompt = ( f"История диалога:\n{transcript_text}\n\n" f"Текущий вопрос ассистента:\n{request.question}\n\n" "Ответь как посетитель." 
) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] try: result = await chat_completion( request, "/simulate-user-answer", messages, role="main", temperature=0.8, timeout=60.0, ) except RuntimeError as error: return { "answer": "", "error": str(error), } return { "answer": result["content"], "provider": result["provider"], "model": result["model"], } class BackendWorkflowServices(WorkflowServices): def __init__( self, provider: str = "", llm_session_id: str = "", test_run_id: str = "", workflow_name: str = "workflow", knowledge_context_id: str = "", ): self.provider = provider self.llm_session_id = llm_session_id self.test_run_id = test_run_id self.workflow_name = workflow_name or "workflow" self.knowledge_context_id = knowledge_context_id def _trace_fields(self, node_id: str) -> dict: return { "provider": self.provider, "llm_session_id": self.llm_session_id, "test_run_id": self.test_run_id, "workflow_name": self.workflow_name, "node_id": node_id, } async def request_llm(self, system: str, user: str, node_id: str = "", context: Optional[dict] = None) -> str: response = await request_llm( LLMRequest( system_prompt=system, user_prompt=user, **self._trace_fields(node_id), ) ) return response.get("response", "") async def classify( self, question: str, answer: str, options: Any, node_id: str = "", context: Optional[dict] = None, ) -> str: options_text = options if isinstance(options, str) else "\n".join(options) + "\n" response = await classify( ClassifyRequest( question=question, answer=answer, options=options_text, language="русский", **self._trace_fields(node_id), ) ) return response.get("classification", "unclear") async def extract_memory_field( self, answer: str, key: str, instruction: str, question: str = "", node_id: str = "", context: Optional[dict] = None, ) -> Dict[str, Any]: response = await extract_memory_field( ExtractMemoryFieldRequest( answer=answer, key=key, instruction=instruction, 
question=question, language="русский", **self._trace_fields(node_id), ) ) return { "value": response.get("value") if isinstance(response, dict) else None, "raw_response": response.get("raw_response", "") if isinstance(response, dict) else "", } async def answer_from_context( self, question: str, source: str = "uploaded", context_path: str = "", node_id: str = "", context: Optional[dict] = None, ) -> str: knowledge_context_id = ( (context or {}).get("knowledge_context_id") or self.knowledge_context_id ) response = await answer_from_context( ContextAnswerRequest( question=question, context_id=knowledge_context_id, context_path=context_path, source=source, language="русский", assistant_role=(context or {}).get("assistant_role", ""), **self._trace_fields(node_id), ) ) return response.get("answer", "") if isinstance(response, dict) else "" async def paraphrase_text( self, text: str, message_type: str = "message", node_id: str = "", context: Optional[dict] = None, ) -> str: response = await paraphrase_text( ParaphraseTextRequest( text=text, message_type=message_type, language="русский", assistant_role=(context or {}).get("assistant_role", ""), **self._trace_fields(node_id), ) ) return response.get("text", text) if isinstance(response, dict) else text def workflow_runtime_context(knowledge_context_id: str = "") -> dict: return { "knowledge_context_id": knowledge_context_id, } def workflow_runtime_services( provider: str = "", llm_session_id: str = "", test_run_id: str = "", workflow_name: str = "workflow", knowledge_context_id: str = "", ) -> BackendWorkflowServices: return BackendWorkflowServices( provider=provider, llm_session_id=llm_session_id, test_run_id=test_run_id, workflow_name=workflow_name, knowledge_context_id=knowledge_context_id, ) def load_workflow_from_path(workflow_path: str) -> tuple[dict, str]: path = os.path.expanduser(workflow_path or "") if not path: raise WorkflowRuntimeError("Workflow path is empty.") if not os.path.exists(path): raise 
WorkflowRuntimeError(f"Workflow JSON was not found: {path}") with open(path, "r", encoding="utf-8") as workflow_file: workflow = json.load(workflow_file) if not isinstance(workflow, dict): raise WorkflowRuntimeError("Workflow JSON must contain an object.") return workflow, os.path.abspath(path) async def start_workflow_runtime_session( workflow: dict, workflow_name: str = "workflow", graph_path: Optional[List[str]] = None, provider: str = "", llm_session_id: str = "", test_run_id: str = "", knowledge_context_id: str = "", external_input_value: Any = None, external_input_values: Optional[Dict[str, Any]] = None, ) -> dict: graph = parse_workflow_graph(workflow, graph_path or []) state = create_interactive_run(graph) run_id = f"workflow_{uuid.uuid4().hex}" workflow_runtime_sessions[run_id] = { "state": state, "workflow_name": workflow_name or "workflow", "provider": provider, "llm_session_id": llm_session_id, "test_run_id": test_run_id, "knowledge_context_id": knowledge_context_id, } events = [] services = workflow_runtime_services( provider=provider, llm_session_id=llm_session_id, test_run_id=test_run_id, workflow_name=workflow_name or "workflow", knowledge_context_id=knowledge_context_id, ) result = await continue_interactive_run( state, services=services, external_input_value=external_input_value, external_input_values=external_input_values or {}, context=workflow_runtime_context(knowledge_context_id), on_event=events.append, ) return serialize_run_response(run_id, result, events) async def continue_workflow_runtime_session( run_id: str, user_answer: str = "", provider: str = "", llm_session_id: str = "", test_run_id: str = "", knowledge_context_id: str = "", external_input_value: Any = None, external_input_values: Optional[Dict[str, Any]] = None, ) -> dict: session = workflow_runtime_sessions.get(run_id) if not session: raise KeyError(run_id) state = session["state"] workflow_name = session.get("workflow_name", "workflow") selected_provider = provider or 
session.get("provider", "") selected_llm_session_id = llm_session_id or session.get("llm_session_id", "") selected_test_run_id = test_run_id or session.get("test_run_id", "") selected_knowledge_context_id = knowledge_context_id or session.get("knowledge_context_id", "") events = [] services = workflow_runtime_services( provider=selected_provider, llm_session_id=selected_llm_session_id, test_run_id=selected_test_run_id, workflow_name=workflow_name, knowledge_context_id=selected_knowledge_context_id, ) result = await continue_interactive_run( state, user_answer=user_answer, services=services, external_input_value=external_input_value, external_input_values=external_input_values or {}, context=workflow_runtime_context(selected_knowledge_context_id), on_event=events.append, ) return serialize_run_response(run_id, result, events) def remember_active_workflow_response(response: dict) -> None: with active_workflow_lock: active_workflow_session["last_response"] = response active_workflow_session["run_id"] = response.get("run_id", active_workflow_session.get("run_id", "")) status = response.get("status") active_workflow_session["active"] = bool(active_workflow_session.get("run_id")) and status not in {"complete", "restart"} def set_active_workflow_session( response: dict, workflow_path: str = "", workflow_name: str = "workflow", provider: str = "", llm_session_id: str = "", test_run_id: str = "", knowledge_context_id: str = "", ) -> None: with active_workflow_lock: old_run_id = active_workflow_session.get("run_id") new_run_id = response.get("run_id", "") if old_run_id and old_run_id != new_run_id: workflow_runtime_sessions.pop(old_run_id, None) active_workflow_session.update({ "active": response.get("status") not in {"complete", "restart"}, "run_id": new_run_id, "workflow_name": workflow_name or "workflow", "workflow_path": workflow_path, "provider": provider, "llm_session_id": llm_session_id, "test_run_id": test_run_id, "knowledge_context_id": knowledge_context_id, 
"last_response": response, }) def clear_active_workflow_session() -> str: with active_workflow_lock: run_id = active_workflow_session.get("run_id", "") active_workflow_session.update({ "active": False, "run_id": "", "workflow_name": "", "workflow_path": "", "provider": "", "llm_session_id": "", "test_run_id": "", "knowledge_context_id": "", "last_response": None, }) return run_id def active_workflow_snapshot() -> dict: with active_workflow_lock: snapshot = dict(active_workflow_session) run_id = snapshot.get("run_id") session = workflow_runtime_sessions.get(run_id) if run_id else None if not session: return { **snapshot, "active": False, "status": "idle", "pending_question": None, "runtime": None, } state = session["state"] status = "paused" if state.pending_question else "complete" if state.completed else "running" runtime_response = serialize_run_response( run_id, { "status": status, "state": state, "pending_question": state.pending_question, }, [], ) return { **snapshot, "active": snapshot.get("active", False) and status != "complete", "status": status, "pending_question": state.pending_question, "runtime": runtime_response, } def set_live_debug_enabled(enabled: bool) -> None: global live_debug_enabled live_debug_enabled = bool(enabled) print(f"[live-debug] {'enabled' if live_debug_enabled else 'disabled'}", flush=True) def live_debug_snapshot_message(event_type: str = "snapshot", events: Optional[List[dict]] = None) -> dict: return { "type": event_type, "live": live_debug_enabled, "state": active_workflow_snapshot(), "events": events or [], } async def broadcast_live_debug(event_type: str = "runtime-update", events: Optional[List[dict]] = None) -> None: if not live_debug_enabled or not live_debug_clients: return message = live_debug_snapshot_message(event_type, events) stale_clients = [] for websocket in list(live_debug_clients): try: await websocket.send_json(message) except Exception: stale_clients.append(websocket) for websocket in stale_clients: 
live_debug_clients.discard(websocket) async def start_active_workflow_from_path(workflow_path: str, provider: str = "") -> dict: workflow, absolute_path = load_workflow_from_path(workflow_path) workflow_name = os.path.basename(absolute_path) or "workflow" response = await start_workflow_runtime_session( workflow, workflow_name=workflow_name, provider=provider, ) set_active_workflow_session( response, workflow_path=absolute_path, workflow_name=workflow_name, provider=provider, ) await broadcast_live_debug("runtime-start", response.get("events") or []) return response @app.post("/workflow-runtime/start") async def workflow_runtime_start(request: WorkflowRuntimeStartRequest): try: return await start_workflow_runtime_session( request.workflow, workflow_name=request.workflow_name, graph_path=request.graph_path, provider=request.provider, llm_session_id=request.llm_session_id, test_run_id=request.test_run_id, knowledge_context_id=request.knowledge_context_id, external_input_value=request.external_input_value, external_input_values=request.external_input_values, ) except WorkflowRuntimeError as error: raise HTTPException(status_code=400, detail=str(error)) @app.post("/workflow-runtime/input") async def workflow_runtime_input(request: WorkflowRuntimeInputRequest): try: return await continue_workflow_runtime_session( request.run_id, user_answer=request.user_answer, provider=request.provider, llm_session_id=request.llm_session_id, test_run_id=request.test_run_id, knowledge_context_id=request.knowledge_context_id, external_input_value=request.external_input_value, external_input_values=request.external_input_values, ) except KeyError: raise HTTPException(status_code=404, detail="Workflow runtime session was not found.") except WorkflowRuntimeError as error: raise HTTPException(status_code=400, detail=str(error)) @app.get("/workflow-runtime/{run_id}") async def workflow_runtime_state(run_id: str): session = workflow_runtime_sessions.get(run_id) if not session: raise 
HTTPException(status_code=404, detail="Workflow runtime session was not found.") return serialize_run_response( run_id, { "status": "paused" if session["state"].pending_question else "complete" if session["state"].completed else "running", "state": session["state"], "pending_question": session["state"].pending_question, }, [], ) @app.delete("/workflow-runtime/{run_id}") async def workflow_runtime_delete(run_id: str): existed = run_id in workflow_runtime_sessions workflow_runtime_sessions.pop(run_id, None) return {"success": existed} @app.post("/workflow-runtime/active/start") async def workflow_runtime_active_start(request: WorkflowRuntimeActiveStartRequest): try: workflow = request.workflow workflow_path = "" workflow_name = request.workflow_name or "workflow" if workflow is None: workflow, workflow_path = load_workflow_from_path(request.workflow_path) workflow_name = os.path.basename(workflow_path) or workflow_name response = await start_workflow_runtime_session( workflow, workflow_name=workflow_name, graph_path=request.graph_path, provider=request.provider, llm_session_id=request.llm_session_id, test_run_id=request.test_run_id, knowledge_context_id=request.knowledge_context_id, external_input_value=request.external_input_value, external_input_values=request.external_input_values, ) set_active_workflow_session( response, workflow_path=workflow_path or request.workflow_path, workflow_name=workflow_name, provider=request.provider, llm_session_id=request.llm_session_id, test_run_id=request.test_run_id, knowledge_context_id=request.knowledge_context_id, ) await broadcast_live_debug("runtime-start", response.get("events") or []) return response except WorkflowRuntimeError as error: raise HTTPException(status_code=400, detail=str(error)) @app.post("/workflow-runtime/active/input") async def workflow_runtime_active_input(request: WorkflowRuntimeActiveInputRequest): with active_workflow_lock: run_id = active_workflow_session.get("run_id", "") provider = request.provider 
or active_workflow_session.get("provider", "") llm_session_id = request.llm_session_id or active_workflow_session.get("llm_session_id", "") test_run_id = request.test_run_id or active_workflow_session.get("test_run_id", "") knowledge_context_id = request.knowledge_context_id or active_workflow_session.get("knowledge_context_id", "") if not run_id: raise HTTPException(status_code=409, detail="No active workflow runtime session.") try: response = await continue_workflow_runtime_session( run_id, user_answer=request.user_answer, provider=provider, llm_session_id=llm_session_id, test_run_id=test_run_id, knowledge_context_id=knowledge_context_id, external_input_value=request.external_input_value, external_input_values=request.external_input_values, ) remember_active_workflow_response(response) await broadcast_live_debug("runtime-update", response.get("events") or []) return response except KeyError: clear_active_workflow_session() await broadcast_live_debug("runtime-missing", []) raise HTTPException(status_code=404, detail="Active workflow runtime session was not found.") except WorkflowRuntimeError as error: raise HTTPException(status_code=400, detail=str(error)) @app.get("/workflow-runtime/active/state") async def workflow_runtime_active_state(): return active_workflow_snapshot() @app.websocket("/workflow-runtime/active/live") async def workflow_runtime_active_live(websocket: WebSocket): await websocket.accept() if not live_debug_enabled: await websocket.send_json(live_debug_snapshot_message("live-disabled")) await websocket.close(code=1008) return live_debug_clients.add(websocket) try: await websocket.send_json(live_debug_snapshot_message("snapshot")) while True: await websocket.receive_text() except WebSocketDisconnect: pass finally: live_debug_clients.discard(websocket) @app.get("/workflow-runtime/live-debug") async def workflow_runtime_live_debug_status(): return { "enabled": live_debug_enabled, "clients": len(live_debug_clients), } 
@app.post("/workflow-runtime/live-debug")
async def workflow_runtime_live_debug_toggle(request: dict):
    """Enable/disable live debug from ``{"enabled": ...}`` and notify connected clients."""
    set_live_debug_enabled(bool(request.get("enabled")))
    await broadcast_live_debug("live-debug", [])
    return {
        "enabled": live_debug_enabled,
        "clients": len(live_debug_clients),
    }


@app.post("/workflow-runtime/active/reset")
async def workflow_runtime_active_reset():
    """Clear the active workflow, drop its runtime session, and broadcast the reset."""
    run_id = clear_active_workflow_session()
    if run_id:
        workflow_runtime_sessions.pop(run_id, None)
    await broadcast_live_debug("runtime-reset", [])
    return {"success": True}


def _write_json_atomically(path: str, document: Any, trailing_newline: bool = True) -> None:
    """Write ``document`` as indented UTF-8 JSON to ``path`` without partial-file states.

    The data goes to ``<path>.tmp`` first and is moved into place with
    ``os.replace``. A failure mid-write therefore leaves the previous file
    intact. The temp file is removed on failure and the original error is
    re-raised.
    """
    temp_path = f"{path}.tmp"
    try:
        with open(temp_path, "w", encoding="utf-8") as output_file:
            json.dump(document, output_file, ensure_ascii=False, indent=2)
            if trailing_newline:
                output_file.write("\n")
        os.replace(temp_path, path)
    except BaseException:
        # Cleanup must never mask the original error.
        try:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        except OSError:
            pass
        raise


@app.get("/default-workflow")
async def get_default_workflow():
    """Return the default workflow JSON (404 if missing, 500 if unreadable)."""
    path = default_workflow_path()
    if not os.path.exists(path):
        raise HTTPException(status_code=404, detail="Default workflow not found")
    try:
        with open(path, "r", encoding="utf-8") as workflow_file:
            return json.load(workflow_file)
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error)) from error


@app.post("/default-workflow")
async def save_default_workflow(request: SaveDefaultWorkflowRequest):
    """Atomically overwrite the default workflow with ``request.document``.

    Responds 400 when the document is not a JSON object and 500 on I/O errors.
    """
    document = request.document
    if not isinstance(document, dict):
        raise HTTPException(status_code=400, detail="Workflow document must be an object")
    path = default_workflow_path()
    try:
        # dirname() is "" for a bare filename, and makedirs("") would raise.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        _write_json_atomically(path, document)
        return {"success": True, "path": relative_project_path(path)}
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error)) from error


@app.post("/save-preset")
async def save_preset(request: PresetRequest):
    """Append a custom preset to ``presets.json``.

    Errors are reported in the response body (``success: False``) rather than
    as an HTTP error status. The file is rewritten atomically, so a failed
    write cannot truncate the existing presets.
    """
    print(f"Saving preset: {request.name}")
    try:
        # Read the current presets file.
        with open(presets_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Create the custom_presets array if it does not exist yet.
        if 'custom_presets' not in data:
            data['custom_presets'] = []

        # Build the new preset; the id is derived from the current time in ms.
        new_preset = {
            "id": f"custom_{int(time.time() * 1000)}",
            "name": request.name,
            "text": request.text
        }

        # Append it to the array.
        data['custom_presets'].append(new_preset)

        # Write the file back, pretty-printed, via a temp file + os.replace.
        _write_json_atomically(presets_path, data, trailing_newline=False)

        return {
            "success": True,
            "message": "Preset saved successfully",
            "preset": new_preset
        }
    except Exception as e:
        print(f"Error saving preset: {e}")
        return {
            "success": False,
            "message": str(e)
        }


# Serve the built frontend (if present) at the root; mounted after all API routes.
dist_dir = os.path.join(project_dir, "dist")
if os.path.isdir(dist_dir):
    app.mount("/", StaticFiles(directory=dist_dir, html=True), name="frontend")


def cli_has(*flags: str) -> bool:
    """Return True if any of ``flags`` appears verbatim in ``sys.argv``."""
    return any(flag in sys.argv for flag in flags)


def cli_arg_value(name: str, default: str = "") -> str:
    """Read ``name value`` or ``name=value`` from ``sys.argv``; return ``default`` if absent."""
    prefix = f"{name}="
    for index, arg in enumerate(sys.argv):
        if arg == name and index + 1 < len(sys.argv):
            return sys.argv[index + 1]
        if arg.startswith(prefix):
            return arg[len(prefix):]
    return default


if __name__ == "__main__":
    # --port beats the PORT env var, which beats the 8000 default.
    server_port = int(cli_arg_value("--port", os.environ.get("PORT", "8000")))
    if cli_has("--reload"):
        # Reload mode requires an import string rather than the app object.
        uvicorn.run("main:app", host="0.0.0.0", port=server_port, reload=True)
    else:
        uvicorn.run(app, host="0.0.0.0", port=server_port)