from __future__ import annotations import os import sqlite3 import uuid from pathlib import Path from urllib.parse import quote from fastapi import APIRouter, BackgroundTasks, File, Form, Query, Request, UploadFile from fastapi.responses import PlainTextResponse from loguru import logger from importer import ExcelParseError, UnsupportedTemplateError, import_excel_bytes from .errors import ApiException from .health import build_health_data from .metrics import build_prometheus_text from .proxy_pool import proxy_pool from .response import ApiResponse, ok from .schemas import ( AccountPoolSnapshotResponse, AccountCooldownRequest, GeneratedPostListResponse, LeadListResponse, ErrorSummaryResponse, ImportExtensionRequest, ImportExtensionResponse, ImportExcelResponse, ProxyPoolSnapshotResponse, SessionPoolLightCheckResponse, TaskCallbackRetryResponse, CleanedNoteListResponse, RawNoteListResponse, TaskCreateRequest, TaskCreateResponse, TaskListResponse, TaskResultResponse, TaskStatusResponse, ) from .storage import LocalJsonStorage from .tasks import IN_PROGRESS_STATUSES, TaskRecord, TaskStatus from .runner import TaskRunner router = APIRouter() def _open_orchestrator_db(db_path: Path) -> sqlite3.Connection: try: if not db_path.exists() or not db_path.is_file(): raise ApiException("orchestrator_db") if not os.access(db_path, os.R_OK): raise ApiException("orchestrator_db") uri = f"file:{quote(str(db_path.resolve()))}?mode=ro" return sqlite3.connect(uri, uri=True) except ApiException: raise except Exception: raise ApiException("orchestrator_db") @router.get("/tasks", response_model=ApiResponse) def list_tasks( request: Request, limit: int = Query(20, ge=1, le=500), offset: int = Query(0, ge=0), status: str | None = Query(None), task_type: str | None = Query(None), engine: str | None = Query(None), error_kind: str | None = Query(None), sort: str | None = Query(None), ) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage def _split(value: str | None) -> 
list[str] | None: if value is None: return None parts = [p.strip() for p in str(value).split(",")] parts = [p for p in parts if p != ""] return parts or None tasks, total = storage.list_tasks( limit=limit, offset=offset, status=_split(status), task_type=_split(task_type), engine=_split(engine), error_kind=_split(error_kind), sort=str(sort or "").strip().lower() or "mtime", ) return ok(TaskListResponse(tasks=tasks, total=total, limit=limit, offset=offset)) @router.get("/errors/summary", response_model=ApiResponse) def errors_summary( request: Request, scan_limit: int | None = Query(None, ge=1, le=20000), limit: int = Query(20, ge=1, le=500), offset: int = Query(0, ge=0), status: str | None = Query(None), error_kind: str | None = Query(None), ) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage config = request.app.state.config def _split(value: str | None) -> list[str] | None: if value is None: return None parts = [p.strip() for p in str(value).split(",")] parts = [p for p in parts if p != ""] return parts or None scan = int(scan_limit or getattr(config, "error_summary_scan_limit", 1000) or 1000) counts, tasks, total, scanned = storage.summarize_recent_errors( scan_limit=scan, limit=limit, offset=offset, status=_split(status), error_kind=_split(error_kind), ) return ok( ErrorSummaryResponse( scan_limit=scan, scanned=scanned, error_kind_counts=counts, tasks=tasks, total=total, limit=limit, offset=offset, ) ) @router.post("/tasks", response_model=ApiResponse) def create_task( payload: TaskCreateRequest, background_tasks: BackgroundTasks, request: Request, ) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage runner: TaskRunner = request.app.state.runner task_type = (payload.task_type or "").strip() task_type_lower = task_type.lower() base_payload = dict(payload.payload or {}) target = str(payload.target or "").strip() if target != "": if task_type_lower == "note_url": base_payload.setdefault("note_url", target) elif task_type_lower 
== "search": if "query" not in base_payload and "keyword" not in base_payload: base_payload["query"] = target elif task_type_lower == "user_profile": if target.startswith("http://") or target.startswith("https://"): base_payload.setdefault("user_url", target) else: base_payload.setdefault("user_id", target) if target == "": if task_type_lower == "note_url": target = str(base_payload.get("note_url") or base_payload.get("url") or "").strip() elif task_type_lower == "search": target = str(base_payload.get("query") or base_payload.get("keyword") or "").strip() elif task_type_lower == "user_profile": target = str( base_payload.get("user_id") or base_payload.get("uid") or base_payload.get("user_url") or base_payload.get("url") or "" ).strip() task = TaskRecord( id=uuid.uuid4().hex, status=TaskStatus.queued, task_type=task_type, target=target, payload=base_payload, engine=payload.engine, ) storage.create_task(task) logger.bind(task_id=task.id, task_type=task.task_type, engine=str(payload.engine or ""), status="queued").info( "task_created" ) background_tasks.add_task(runner.run_task, task.id) return ok(TaskCreateResponse(task=task)) @router.get("/tasks/{task_id}", response_model=ApiResponse) def get_task(task_id: str, request: Request) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage task = storage.get_task(task_id) if task is None: raise ApiException("task_not_found") return ok(TaskStatusResponse(task=task)) @router.get("/tasks/{task_id}/result", response_model=ApiResponse) def get_task_result(task_id: str, request: Request) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage task = storage.get_task(task_id) if task is None: raise ApiException("task_not_found") result = storage.read_result(task_id) if result is None: if task.status == TaskStatus.queued or task.status in IN_PROGRESS_STATUSES: raise ApiException( "task_not_ready", data={"task_id": task_id, "status": task.status, "message": "result not ready"}, ) raise ApiException( 
"parse", msg="result missing", data={"task_id": task_id, "status": task.status, "message": "result missing"}, ) raw = None normalized = None meta: dict = {} if isinstance(result, dict): raw = result.get("raw") normalized = result.get("normalized") meta_val = result.get("meta") if isinstance(meta_val, dict): meta = dict(meta_val) else: meta = {} else: raw = result meta.setdefault("task_id", task_id) meta.setdefault("ok", task.status in (TaskStatus.succeeded, TaskStatus.rpa_imported)) return ok(TaskResultResponse(task_id=task_id, status=task.status, raw=raw, normalized=normalized, meta=meta)) @router.post("/import/extension", response_model=ApiResponse) def import_extension(payload: ImportExtensionRequest, request: Request) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage runner: TaskRunner = request.app.state.runner task_id = str(payload.task_id or "").strip() task = storage.get_task(task_id) if task is None: raise ApiException("task_not_found") meta = dict(payload.meta or {}) meta.setdefault("task_id", task_id) meta.setdefault("ok", True) result = {"raw": payload.raw, "normalized": payload.normalized, "meta": meta} try: updated = runner.import_extension_result(task_id, result) except KeyError: raise ApiException("task_not_found") return ok(ImportExtensionResponse(task=updated)) @router.post("/tasks/{task_id}/retry", response_model=ApiResponse) def retry_task(task_id: str, background_tasks: BackgroundTasks, request: Request) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage runner: TaskRunner = request.app.state.runner task = storage.get_task(task_id) if task is None: raise ApiException("task_not_found") task.status = TaskStatus.queued task.retry_count = 0 task.error = None storage.update_task(task) logger.bind(task_id=task.id, status="queued").info("task_retried_manually") background_tasks.add_task(runner.run_task, task.id) return ok(TaskStatusResponse(task=task)) @router.post("/tasks/{task_id}/cancel", 
response_model=ApiResponse) def cancel_task(task_id: str, request: Request) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage task = storage.get_task(task_id) if task is None: raise ApiException("task_not_found") if task.status in IN_PROGRESS_STATUSES or task.status == TaskStatus.queued: task.status = TaskStatus.failed task.error = {"kind": "cancelled", "msg": "Task manually cancelled"} storage.update_task(task) logger.bind(task_id=task.id, status="failed").info("task_cancelled_manually") return ok(TaskStatusResponse(task=task)) @router.post("/tasks/{task_id}/mark-rpa", response_model=ApiResponse) def mark_task_rpa(task_id: str, request: Request) -> ApiResponse: storage: LocalJsonStorage = request.app.state.storage task = storage.get_task(task_id) if task is None: raise ApiException("task_not_found") task.status = TaskStatus.waiting_rpa storage.update_task(task) logger.bind(task_id=task.id, status="waiting_rpa").info("task_marked_waiting_rpa") return ok(TaskStatusResponse(task=task)) @router.post("/resources/accounts/{account_id}/cooldown", response_model=ApiResponse) def cooldown_account(account_id: str, payload: AccountCooldownRequest, request: Request) -> ApiResponse: runner: TaskRunner = request.app.state.runner try: runner._stability.account_pool.trigger_cooldown(account_id, seconds=payload.seconds, reason="manual_cooldown") except Exception as e: raise ApiException("invalid_target", msg=str(e)) return ok({"account_id": account_id, "cooldown_seconds": payload.seconds}) @router.post("/resources/accounts/{account_id}/disable", response_model=ApiResponse) def disable_account(account_id: str, request: Request) -> ApiResponse: runner: TaskRunner = request.app.state.runner try: runner._stability.account_pool.trigger_cooldown(account_id, seconds=315360000, reason="manual_disable") except Exception as e: raise ApiException("invalid_target", msg=str(e)) return ok({"account_id": account_id, "disabled": True}) 
@router.post("/tasks/{task_id}/callback/retry", response_model=ApiResponse)
def retry_task_callback(task_id: str, request: Request) -> ApiResponse:
    """Ask the runner to retry the callback of an existing task.

    Raises:
        ApiException: ``task_not_found`` when the task is unknown;
            ``invalid_target`` when the runner raises ``ValueError``;
            ``task_not_ready`` when it raises ``RuntimeError``;
            ``parse`` for any other exception.
    """
    storage: LocalJsonStorage = request.app.state.storage
    runner: TaskRunner = request.app.state.runner

    task = storage.get_task(task_id)
    if task is None:
        raise ApiException("task_not_found")

    try:
        state = runner.retry_callback(task_id)
    except ValueError as exc:
        raise ApiException("invalid_target", msg=str(exc)) from exc
    except RuntimeError as exc:
        raise ApiException(
            "task_not_ready",
            msg=str(exc),
            data={"task_id": task_id, "status": task.status, "message": str(exc)},
        ) from exc
    except Exception as exc:
        raise ApiException("parse", msg=str(exc)) from exc
    return ok(TaskCallbackRetryResponse(task_id=task_id, callback=state))


@router.post("/import/excel", response_model=ApiResponse)
async def import_excel(
    request: Request,
    file: UploadFile = File(...),
    operator: str = Form(...),
    source_name: str | None = Form(None),
) -> ApiResponse:
    """Import an uploaded Excel file via ``import_excel_bytes``.

    Raises:
        ApiException: ``invalid_target`` for ``UnsupportedTemplateError`` or
            ``ExcelParseError``; ``parse`` for any other exception.
    """
    # NOTE(review): the whole upload is read into memory with no size cap,
    # and import_excel_bytes runs synchronously inside this async handler —
    # confirm both are acceptable for expected file sizes.
    data = await file.read()
    try:
        result = import_excel_bytes(
            data, operator=operator, source_name=source_name, filename=file.filename
        )
    except (UnsupportedTemplateError, ExcelParseError) as exc:
        raise ApiException("invalid_target", msg=str(exc)) from exc
    except Exception as exc:
        raise ApiException("parse", msg=str(exc)) from exc
    return ok(ImportExcelResponse(**result))


@router.get("/health", response_model=ApiResponse)
def health(request: Request) -> ApiResponse:
    """Return the health payload built from the runner and config."""
    runner: TaskRunner = request.app.state.runner
    config = request.app.state.config
    return ok(build_health_data(runner=runner, config=config))


@router.get("/resources/accounts", response_model=ApiResponse)
def get_resource_accounts(request: Request) -> ApiResponse:
    """Return a snapshot of the account pool."""
    runner: TaskRunner = request.app.state.runner
    data = runner._stability.account_pool.snapshot()
    return ok(AccountPoolSnapshotResponse(**data))


@router.get("/resources/sessions", response_model=ApiResponse)
def get_resource_sessions(request: Request) -> ApiResponse:
    """Return the session pool's ``check_all_light`` result."""
    runner: TaskRunner = request.app.state.runner
    data = runner._stability.session_pool.check_all_light()
    return ok(SessionPoolLightCheckResponse(**data))


@router.get("/resources/proxies", response_model=ApiResponse)
def get_resource_proxies(_: Request) -> ApiResponse:
    """Return a snapshot of the module-level proxy pool."""
    data = proxy_pool.snapshot()
    return ok(ProxyPoolSnapshotResponse(**data))


@router.get("/metrics")
def metrics(request: Request) -> PlainTextResponse:
    """Serve metrics text built by ``build_prometheus_text`` as plain text."""
    runner: TaskRunner = request.app.state.runner
    config = request.app.state.config
    text = build_prometheus_text(runner=runner, config=config)
    return PlainTextResponse(text, media_type="text/plain; version=0.0.4; charset=utf-8")


def _connect_orchestrator_rows(request: Request) -> sqlite3.Connection:
    """Open the configured orchestrator DB with ``sqlite3.Row`` rows.

    The path is read from ``request.app.state.config.orchestrator_db_path``.
    The caller is responsible for closing the returned connection.
    """
    config = request.app.state.config
    conn = _open_orchestrator_db(Path(config.orchestrator_db_path))
    conn.row_factory = sqlite3.Row
    return conn


def _list_rows_by_status(
    request: Request,
    table: str,
    limit: int,
    offset: int,
    status: str | None,
) -> tuple[list[dict], int]:
    """Return one page of rows from *table* plus the total matching count.

    *table* is interpolated into the SQL text, so it must be a trusted
    constant and never request data. Rows are ordered by ``id`` descending
    and filtered on ``status`` when it is non-empty.
    """
    conn = _connect_orchestrator_rows(request)
    try:
        cur = conn.cursor()
        where = ""
        params: list = []
        if status:
            where = " WHERE status = ?"
            params.append(status)
        total = int(cur.execute(f"SELECT COUNT(1) AS c FROM {table}{where}", params).fetchone()["c"])
        rows = cur.execute(
            f"SELECT * FROM {table}{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        return [dict(row) for row in rows], total
    finally:
        conn.close()


@router.get("/content/raw-notes", response_model=ApiResponse)
def list_raw_notes(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    query: str | None = Query(None),
) -> ApiResponse:
    """List ``raw_note`` rows, newest id first.

    A non-empty ``query`` is matched as a substring (SQL ``LIKE``) against
    ``author``, ``url`` and ``content``.
    """
    q = str(query or "").strip()
    # NOTE(review): "%" and "_" in the user's query are not escaped, so they
    # act as LIKE wildcards — confirm whether that is intended.
    pat = f"%{q}%" if q else None

    conn = _connect_orchestrator_rows(request)
    try:
        cur = conn.cursor()
        where = ""
        params: list = []
        if pat is not None:
            where = " WHERE author LIKE ? OR url LIKE ? OR content LIKE ?"
            params.extend([pat, pat, pat])
        total = int(cur.execute(f"SELECT COUNT(1) AS c FROM raw_note{where}", params).fetchone()["c"])
        rows = cur.execute(
            f"SELECT id, source_platform, content, author, url, created_at FROM raw_note{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        notes = [dict(row) for row in rows]
        return ok(RawNoteListResponse(notes=notes, total=total, limit=limit, offset=offset, query=q or None))
    finally:
        conn.close()


@router.get("/content/cleaned-notes", response_model=ApiResponse)
def list_cleaned_notes(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    query: str | None = Query(None),
) -> ApiResponse:
    """List ``cleaned_note`` rows left-joined to their ``raw_note``.

    A non-empty ``query`` is matched as a substring (SQL ``LIKE``) against
    the cleaned content and the raw note's author, url and content.
    """
    q = str(query or "").strip()
    # NOTE(review): "%" and "_" in the user's query are not escaped, so they
    # act as LIKE wildcards — confirm whether that is intended.
    pat = f"%{q}%" if q else None

    conn = _connect_orchestrator_rows(request)
    try:
        cur = conn.cursor()
        where = ""
        params: list = []
        if pat is not None:
            where = " WHERE cn.cleaned_content LIKE ? OR rn.author LIKE ? OR rn.url LIKE ? OR rn.content LIKE ?"
            params.extend([pat, pat, pat, pat])
        total = int(
            cur.execute(
                f"SELECT COUNT(1) AS c FROM cleaned_note cn LEFT JOIN raw_note rn ON rn.id = cn.raw_note_id{where}",
                params,
            ).fetchone()["c"]
        )
        rows = cur.execute(
            "SELECT cn.id, cn.raw_note_id, cn.cleaned_content, cn.created_at, rn.author AS raw_author, rn.url AS raw_url "
            "FROM cleaned_note cn LEFT JOIN raw_note rn ON rn.id = cn.raw_note_id"
            f"{where} ORDER BY cn.id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        notes = [dict(row) for row in rows]
        return ok(CleanedNoteListResponse(notes=notes, total=total, limit=limit, offset=offset, query=q or None))
    finally:
        conn.close()


@router.get("/business/posts", response_model=ApiResponse)
def list_generated_posts(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    status: str | None = Query(None),
) -> ApiResponse:
    """List ``generated_post`` rows, optionally filtered by exact ``status``."""
    posts, total = _list_rows_by_status(request, "generated_post", limit, offset, status)
    return ok(GeneratedPostListResponse(posts=posts, total=total, limit=limit, offset=offset))


@router.get("/business/leads", response_model=ApiResponse)
def list_leads(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    status: str | None = Query(None),
) -> ApiResponse:
    """List ``lead`` rows, optionally filtered by exact ``status``."""
    leads, total = _list_rows_by_status(request, "lead", limit, offset, status)
    return ok(LeadListResponse(leads=leads, total=total, limit=limit, offset=offset))