| from __future__ import annotations |
|
|
| import os |
| import sqlite3 |
| import uuid |
| from pathlib import Path |
| from urllib.parse import quote |
|
|
| from fastapi import APIRouter, BackgroundTasks, File, Form, Query, Request, UploadFile |
| from fastapi.responses import PlainTextResponse |
| from loguru import logger |
|
|
| from importer import ExcelParseError, UnsupportedTemplateError, import_excel_bytes |
| from .errors import ApiException |
| from .health import build_health_data |
| from .metrics import build_prometheus_text |
| from .proxy_pool import proxy_pool |
| from .response import ApiResponse, ok |
| from .schemas import ( |
| AccountPoolSnapshotResponse, |
| AccountCooldownRequest, |
| GeneratedPostListResponse, |
| LeadListResponse, |
| ErrorSummaryResponse, |
| ImportExtensionRequest, |
| ImportExtensionResponse, |
| ImportExcelResponse, |
| ProxyPoolSnapshotResponse, |
| SessionPoolLightCheckResponse, |
| TaskCallbackRetryResponse, |
| CleanedNoteListResponse, |
| RawNoteListResponse, |
| TaskCreateRequest, |
| TaskCreateResponse, |
| TaskListResponse, |
| TaskResultResponse, |
| TaskStatusResponse, |
| ) |
| from .storage import LocalJsonStorage |
| from .tasks import IN_PROGRESS_STATUSES, TaskRecord, TaskStatus |
| from .runner import TaskRunner |
|
|
|
|
# Router object on which every endpoint in this module is registered via its decorators.
router = APIRouter()
|
|
|
|
def _open_orchestrator_db(db_path: Path) -> sqlite3.Connection:
    """Open the orchestrator SQLite database in read-only mode.

    Args:
        db_path: Filesystem location of the database file.

    Returns:
        A connection opened through a ``file:<path>?mode=ro`` URI.

    Raises:
        ApiException: With code ``"orchestrator_db"`` when the path is not an
            existing regular file, is not readable, or when anything else
            fails while resolving the path or connecting.
    """
    try:
        # Short-circuit order matters: existence, then file type, then readability.
        usable = db_path.exists() and db_path.is_file() and os.access(db_path, os.R_OK)
        if not usable:
            raise ApiException("orchestrator_db")
        # Percent-encode the absolute path so it is safe inside the URI.
        encoded_path = quote(str(db_path.resolve()))
        return sqlite3.connect(f"file:{encoded_path}?mode=ro", uri=True)
    except ApiException:
        # Already the error we want to surface; do not re-wrap it.
        raise
    except Exception:
        raise ApiException("orchestrator_db")
|
|
|
|
@router.get("/tasks", response_model=ApiResponse)
def list_tasks(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    status: str | None = Query(None),
    task_type: str | None = Query(None),
    engine: str | None = Query(None),
    error_kind: str | None = Query(None),
    sort: str | None = Query(None),
) -> ApiResponse:
    """Return one page of tasks from storage, optionally filtered and sorted.

    ``status``, ``task_type``, ``engine`` and ``error_kind`` are each parsed as
    a comma-separated list; blank entries are dropped and an empty list is
    treated as "no filter". ``sort`` is trimmed and lower-cased and falls back
    to ``"mtime"`` when empty.
    """

    def _split(value: str | None) -> list[str] | None:
        # Comma-separated text -> trimmed, non-blank entries (None when nothing remains).
        if value is None:
            return None
        trimmed = (piece.strip() for piece in str(value).split(","))
        return [piece for piece in trimmed if piece] or None

    sort_key = str(sort or "").strip().lower()
    if not sort_key:
        sort_key = "mtime"

    storage: LocalJsonStorage = request.app.state.storage
    tasks, total = storage.list_tasks(
        limit=limit,
        offset=offset,
        status=_split(status),
        task_type=_split(task_type),
        engine=_split(engine),
        error_kind=_split(error_kind),
        sort=sort_key,
    )
    page = TaskListResponse(tasks=tasks, total=total, limit=limit, offset=offset)
    return ok(page)
|
|
|
|
@router.get("/errors/summary", response_model=ApiResponse)
def errors_summary(
    request: Request,
    scan_limit: int | None = Query(None, ge=1, le=20000),
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    status: str | None = Query(None),
    error_kind: str | None = Query(None),
) -> ApiResponse:
    """Return the storage layer's summary of recent task errors.

    The scan limit is the request's ``scan_limit`` when given, otherwise the
    config attribute ``error_summary_scan_limit``, otherwise ``1000``.
    ``status`` and ``error_kind`` are parsed as comma-separated lists with
    blank entries dropped.
    """
    storage: LocalJsonStorage = request.app.state.storage
    config = request.app.state.config

    def _split(value: str | None) -> list[str] | None:
        # Comma-separated text -> trimmed, non-blank entries (None when nothing remains).
        if value is None:
            return None
        trimmed = (piece.strip() for piece in str(value).split(","))
        return [piece for piece in trimmed if piece] or None

    if scan_limit:
        scan = int(scan_limit)
    else:
        # A falsy configured value (missing, None, 0) also falls back to 1000.
        scan = int(getattr(config, "error_summary_scan_limit", 1000) or 1000)

    counts, tasks, total, scanned = storage.summarize_recent_errors(
        scan_limit=scan,
        limit=limit,
        offset=offset,
        status=_split(status),
        error_kind=_split(error_kind),
    )
    summary = ErrorSummaryResponse(
        scan_limit=scan,
        scanned=scanned,
        error_kind_counts=counts,
        tasks=tasks,
        total=total,
        limit=limit,
        offset=offset,
    )
    return ok(summary)
|
|
|
|
@router.post("/tasks", response_model=ApiResponse)
def create_task(
    payload: TaskCreateRequest,
    background_tasks: BackgroundTasks,
    request: Request,
) -> ApiResponse:
    """Create a queued task, persist it, and schedule it on the runner.

    ``target`` and the task payload are reconciled in one direction only:

    * when ``target`` is given, it is copied into the payload under the key
      matching the task type, without overwriting keys the caller supplied;
    * when ``target`` is empty, it is derived from the first truthy payload
      key known for the task type.

    Task types other than ``note_url``, ``search`` and ``user_profile``
    (compared case-insensitively) are stored without any reconciliation.
    """
    storage: LocalJsonStorage = request.app.state.storage
    runner: TaskRunner = request.app.state.runner

    task_type = (payload.task_type or "").strip()
    kind = task_type.lower()
    task_payload = dict(payload.payload or {})
    target = str(payload.target or "").strip()

    if target:
        # Explicit target: mirror it into the payload.
        if kind == "note_url":
            task_payload.setdefault("note_url", target)
        elif kind == "search":
            if not ("query" in task_payload or "keyword" in task_payload):
                task_payload["query"] = target
        elif kind == "user_profile":
            looks_like_url = target.startswith(("http://", "https://"))
            task_payload.setdefault("user_url" if looks_like_url else "user_id", target)
    else:
        # No explicit target: take the first truthy payload value, in priority order.
        fallback_keys = {
            "note_url": ("note_url", "url"),
            "search": ("query", "keyword"),
            "user_profile": ("user_id", "uid", "user_url", "url"),
        }.get(kind, ())
        derived = ""
        for key in fallback_keys:
            candidate = task_payload.get(key)
            if candidate:
                derived = candidate
                break
        target = str(derived).strip()

    task = TaskRecord(
        id=uuid.uuid4().hex,
        status=TaskStatus.queued,
        task_type=task_type,
        target=target,
        payload=task_payload,
        engine=payload.engine,
    )
    storage.create_task(task)

    log = logger.bind(
        task_id=task.id,
        task_type=task.task_type,
        engine=str(payload.engine or ""),
        status="queued",
    )
    log.info("task_created")

    background_tasks.add_task(runner.run_task, task.id)
    return ok(TaskCreateResponse(task=task))
|
|
|
|
@router.get("/tasks/{task_id}", response_model=ApiResponse)
def get_task(task_id: str, request: Request) -> ApiResponse:
    """Return the stored record for ``task_id``.

    Raises:
        ApiException: ``"task_not_found"`` when storage has no such task.
    """
    found = request.app.state.storage.get_task(task_id)
    if found is not None:
        return ok(TaskStatusResponse(task=found))
    raise ApiException("task_not_found")
|
|
|
|
@router.get("/tasks/{task_id}/result", response_model=ApiResponse)
def get_task_result(task_id: str, request: Request) -> ApiResponse:
    """Return the stored result of a task, split into raw / normalized / meta.

    A dict result is read through its ``raw``, ``normalized`` and ``meta``
    keys; any other result is returned as ``raw`` unchanged. ``meta`` always
    gets ``task_id`` and ``ok`` filled in when they are absent.

    Raises:
        ApiException: ``"task_not_found"`` when the task does not exist;
            ``"task_not_ready"`` when there is no result and the task is
            queued or in progress; ``"parse"`` when there is no result and
            the task is in any other status.
    """
    storage: LocalJsonStorage = request.app.state.storage
    task = storage.get_task(task_id)
    if task is None:
        raise ApiException("task_not_found")

    result = storage.read_result(task_id)
    if result is None:
        still_pending = task.status == TaskStatus.queued or task.status in IN_PROGRESS_STATUSES
        if still_pending:
            raise ApiException(
                "task_not_ready",
                data={"task_id": task_id, "status": task.status, "message": "result not ready"},
            )
        raise ApiException(
            "parse",
            msg="result missing",
            data={"task_id": task_id, "status": task.status, "message": "result missing"},
        )

    if isinstance(result, dict):
        raw = result.get("raw")
        normalized = result.get("normalized")
        stored_meta = result.get("meta")
        # Copy so the defaults added below never touch the stored object.
        meta: dict = dict(stored_meta) if isinstance(stored_meta, dict) else {}
    else:
        raw, normalized, meta = result, None, {}

    succeeded = task.status in (TaskStatus.succeeded, TaskStatus.rpa_imported)
    meta.setdefault("task_id", task_id)
    meta.setdefault("ok", succeeded)

    response = TaskResultResponse(
        task_id=task_id,
        status=task.status,
        raw=raw,
        normalized=normalized,
        meta=meta,
    )
    return ok(response)
|
|
|
|
@router.post("/import/extension", response_model=ApiResponse)
def import_extension(payload: ImportExtensionRequest, request: Request) -> ApiResponse:
    """Attach an externally supplied result to an existing task.

    The result handed to the runner is ``{"raw", "normalized", "meta"}``,
    where ``meta`` is a copy of the request's meta with ``task_id`` and
    ``ok=True`` filled in when absent.

    Raises:
        ApiException: ``"task_not_found"`` when the task is unknown to storage
            or the runner raises ``KeyError`` for it.
    """
    app_state = request.app.state
    storage: LocalJsonStorage = app_state.storage
    runner: TaskRunner = app_state.runner

    task_id = str(payload.task_id or "").strip()
    if storage.get_task(task_id) is None:
        raise ApiException("task_not_found")

    meta = dict(payload.meta or {})
    for key, default in (("task_id", task_id), ("ok", True)):
        meta.setdefault(key, default)

    result = {"raw": payload.raw, "normalized": payload.normalized, "meta": meta}
    try:
        updated = runner.import_extension_result(task_id, result)
    except KeyError:
        raise ApiException("task_not_found")
    return ok(ImportExtensionResponse(task=updated))
|
|
|
|
@router.post("/tasks/{task_id}/retry", response_model=ApiResponse)
def retry_task(task_id: str, background_tasks: BackgroundTasks, request: Request) -> ApiResponse:
    """Reset a task to ``queued`` and schedule it on the runner again.

    The retry counter and the stored error are cleared before the task is
    persisted and handed to the background runner.

    Raises:
        ApiException: ``"task_not_found"`` when storage has no such task.
    """
    app_state = request.app.state
    storage: LocalJsonStorage = app_state.storage
    runner: TaskRunner = app_state.runner

    task = storage.get_task(task_id)
    if task is None:
        raise ApiException("task_not_found")

    # NOTE(review): there is no status check here, so a task that is still
    # queued or in progress is re-queued and scheduled a second time.
    # Confirm this is intended.
    task.status = TaskStatus.queued
    task.retry_count = 0
    task.error = None
    storage.update_task(task)

    log = logger.bind(task_id=task.id, status="queued")
    log.info("task_retried_manually")

    background_tasks.add_task(runner.run_task, task.id)
    return ok(TaskStatusResponse(task=task))
|
|
@router.post("/tasks/{task_id}/cancel", response_model=ApiResponse)
def cancel_task(task_id: str, request: Request) -> ApiResponse:
    """Mark a queued or in-progress task as failed with a ``cancelled`` error.

    Tasks in any other status are returned unchanged. Only the stored record
    is updated; this handler does not call the runner.

    Raises:
        ApiException: ``"task_not_found"`` when storage has no such task.
    """
    storage: LocalJsonStorage = request.app.state.storage
    task = storage.get_task(task_id)
    if task is None:
        raise ApiException("task_not_found")

    cancellable = task.status == TaskStatus.queued or task.status in IN_PROGRESS_STATUSES
    if not cancellable:
        return ok(TaskStatusResponse(task=task))

    task.status = TaskStatus.failed
    task.error = {"kind": "cancelled", "msg": "Task manually cancelled"}
    storage.update_task(task)
    logger.bind(task_id=task.id, status="failed").info("task_cancelled_manually")
    return ok(TaskStatusResponse(task=task))
|
|
@router.post("/tasks/{task_id}/mark-rpa", response_model=ApiResponse)
def mark_task_rpa(task_id: str, request: Request) -> ApiResponse:
    """Set a task's status to ``waiting_rpa``, whatever its current status.

    Raises:
        ApiException: ``"task_not_found"`` when storage has no such task.
    """
    storage: LocalJsonStorage = request.app.state.storage

    record = storage.get_task(task_id)
    if record is None:
        raise ApiException("task_not_found")

    record.status = TaskStatus.waiting_rpa
    storage.update_task(record)

    log = logger.bind(task_id=record.id, status="waiting_rpa")
    log.info("task_marked_waiting_rpa")
    return ok(TaskStatusResponse(task=record))
|
|
@router.post("/resources/accounts/{account_id}/cooldown", response_model=ApiResponse)
def cooldown_account(account_id: str, payload: AccountCooldownRequest, request: Request) -> ApiResponse:
    """Put an account into cooldown for ``payload.seconds`` seconds.

    Raises:
        ApiException: ``"invalid_target"`` carrying the underlying error text
            when reaching the account pool or triggering the cooldown fails.
    """
    runner: TaskRunner = request.app.state.runner
    try:
        # The pool lookup stays inside the try so its failures are mapped as well.
        pool = runner._stability.account_pool
        pool.trigger_cooldown(account_id, seconds=payload.seconds, reason="manual_cooldown")
    except Exception as exc:
        raise ApiException("invalid_target", msg=str(exc))

    body = {"account_id": account_id, "cooldown_seconds": payload.seconds}
    return ok(body)
|
|
@router.post("/resources/accounts/{account_id}/disable", response_model=ApiResponse)
def disable_account(account_id: str, request: Request) -> ApiResponse:
    """Disable an account by giving it a cooldown of ten 365-day years.

    Raises:
        ApiException: ``"invalid_target"`` carrying the underlying error text
            when reaching the account pool or triggering the cooldown fails.
    """
    # 10 * 365 days expressed in seconds == 315360000.
    disable_seconds = 10 * 365 * 24 * 60 * 60

    runner: TaskRunner = request.app.state.runner
    try:
        # The pool lookup stays inside the try so its failures are mapped as well.
        pool = runner._stability.account_pool
        pool.trigger_cooldown(account_id, seconds=disable_seconds, reason="manual_disable")
    except Exception as exc:
        raise ApiException("invalid_target", msg=str(exc))

    body = {"account_id": account_id, "disabled": True}
    return ok(body)
|
|
|
|
@router.post("/tasks/{task_id}/callback/retry", response_model=ApiResponse)
def retry_task_callback(task_id: str, request: Request) -> ApiResponse:
    """Ask the runner to retry the callback for a task and return its state.

    Raises:
        ApiException: ``"task_not_found"`` when storage has no such task;
            ``"invalid_target"`` when the runner raises ``ValueError``;
            ``"task_not_ready"`` when it raises ``RuntimeError``;
            ``"parse"`` for any other exception.
    """
    app_state = request.app.state
    storage: LocalJsonStorage = app_state.storage
    runner: TaskRunner = app_state.runner

    task = storage.get_task(task_id)
    if task is None:
        raise ApiException("task_not_found")

    try:
        callback_state = runner.retry_callback(task_id)
    except ValueError as exc:
        raise ApiException("invalid_target", msg=str(exc))
    except RuntimeError as exc:
        details = {"task_id": task_id, "status": task.status, "message": str(exc)}
        raise ApiException("task_not_ready", msg=str(exc), data=details)
    except Exception as exc:
        raise ApiException("parse", msg=str(exc))

    return ok(TaskCallbackRetryResponse(task_id=task_id, callback=callback_state))
|
|
|
|
@router.post("/import/excel", response_model=ApiResponse)
async def import_excel(
    request: Request,
    file: UploadFile = File(...),
    operator: str = Form(...),
    source_name: str | None = Form(None),
) -> ApiResponse:
    """Read an uploaded workbook fully into memory and import it.

    Raises:
        ApiException: ``"invalid_target"`` for ``UnsupportedTemplateError`` or
            ``ExcelParseError``; ``"parse"`` for any other import failure.
    """
    content = await file.read()
    # NOTE(review): the import below is a synchronous call inside an async
    # handler. Confirm it is fast enough not to stall the event loop.
    try:
        summary = import_excel_bytes(
            content,
            operator=operator,
            source_name=source_name,
            filename=file.filename,
        )
    except (UnsupportedTemplateError, ExcelParseError) as exc:
        raise ApiException("invalid_target", msg=str(exc))
    except Exception as exc:
        raise ApiException("parse", msg=str(exc))
    return ok(ImportExcelResponse(**summary))
|
|
|
|
@router.get("/health", response_model=ApiResponse)
def health(request: Request) -> ApiResponse:
    """Return the health data built from the application's runner and config."""
    app_state = request.app.state
    report = build_health_data(runner=app_state.runner, config=app_state.config)
    return ok(report)
|
|
|
|
@router.get("/resources/accounts", response_model=ApiResponse)
def get_resource_accounts(request: Request) -> ApiResponse:
    """Return the runner's account-pool snapshot."""
    account_pool = request.app.state.runner._stability.account_pool
    snapshot = account_pool.snapshot()
    return ok(AccountPoolSnapshotResponse(**snapshot))
|
|
|
|
@router.get("/resources/sessions", response_model=ApiResponse)
def get_resource_sessions(request: Request) -> ApiResponse:
    """Return the result of the session pool's ``check_all_light`` call."""
    session_pool = request.app.state.runner._stability.session_pool
    check_result = session_pool.check_all_light()
    return ok(SessionPoolLightCheckResponse(**check_result))
|
|
|
|
@router.get("/resources/proxies", response_model=ApiResponse)
def get_resource_proxies(_: Request) -> ApiResponse:
    """Return the snapshot of the module-level proxy pool (the request is unused)."""
    return ok(ProxyPoolSnapshotResponse(**proxy_pool.snapshot()))
|
|
|
|
|
|
@router.get("/metrics")
def metrics(request: Request) -> PlainTextResponse:
    """Return the metrics text built from the application's runner and config."""
    app_state = request.app.state
    body = build_prometheus_text(runner=app_state.runner, config=app_state.config)
    return PlainTextResponse(body, media_type="text/plain; version=0.0.4; charset=utf-8")
|
|
|
|
@router.get("/content/raw-notes", response_model=ApiResponse)
def list_raw_notes(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    query: str | None = Query(None),
) -> ApiResponse:
    """Return one page of ``raw_note`` rows, ordered by ``id`` descending.

    Args:
        request: Incoming request; the database path is read from
            ``request.app.state.config.orchestrator_db_path``.
        limit: Page size.
        offset: Number of rows to skip.
        query: Optional search text. After trimming, it is matched as a
            substring (``LIKE %text%``) against ``author``, ``url`` and
            ``content``.

    Raises:
        ApiException: ``"orchestrator_db"`` when the database cannot be opened
            or when a query fails (for example a missing table or a file that
            is not a valid SQLite database).
    """
    config = request.app.state.config
    db_path = Path(getattr(config, "orchestrator_db_path"))
    q = str(query or "").strip()

    # Only constant SQL fragments are interpolated; user input goes through bound parameters.
    where = ""
    params: list = []
    if q:
        pat = f"%{q}%"
        where = " WHERE author LIKE ? OR url LIKE ? OR content LIKE ?"
        params = [pat, pat, pat]

    conn = _open_orchestrator_db(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        total = int(cur.execute(f"SELECT COUNT(1) AS c FROM raw_note{where}", params).fetchone()["c"])
        rows = cur.execute(
            f"SELECT id, source_platform, content, author, url, created_at FROM raw_note{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        notes = [dict(r) for r in rows]
    except sqlite3.Error as exc:
        # SQLite opens lazily, so a corrupt file or missing table only fails
        # here. Report it with the same error code as an open failure.
        raise ApiException("orchestrator_db") from exc
    finally:
        conn.close()

    return ok(RawNoteListResponse(notes=notes, total=total, limit=limit, offset=offset, query=q or None))
|
|
|
|
@router.get("/content/cleaned-notes", response_model=ApiResponse)
def list_cleaned_notes(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    query: str | None = Query(None),
) -> ApiResponse:
    """Return one page of ``cleaned_note`` rows joined to their raw notes.

    Rows are ordered by ``cleaned_note.id`` descending. Each row carries the
    author and URL of the linked ``raw_note`` (left join, so both may be
    null).

    Args:
        request: Incoming request; the database path is read from
            ``request.app.state.config.orchestrator_db_path``.
        limit: Page size.
        offset: Number of rows to skip.
        query: Optional search text. After trimming, it is matched as a
            substring (``LIKE %text%``) against the cleaned content and the
            raw note's ``author``, ``url`` and ``content``.

    Raises:
        ApiException: ``"orchestrator_db"`` when the database cannot be opened
            or when a query fails (for example a missing table or a file that
            is not a valid SQLite database).
    """
    config = request.app.state.config
    db_path = Path(getattr(config, "orchestrator_db_path"))
    q = str(query or "").strip()

    # Only constant SQL fragments are interpolated; user input goes through bound parameters.
    where = ""
    params: list = []
    if q:
        pat = f"%{q}%"
        where = " WHERE cn.cleaned_content LIKE ? OR rn.author LIKE ? OR rn.url LIKE ? OR rn.content LIKE ?"
        params = [pat, pat, pat, pat]

    conn = _open_orchestrator_db(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        total = int(
            cur.execute(
                f"SELECT COUNT(1) AS c FROM cleaned_note cn LEFT JOIN raw_note rn ON rn.id = cn.raw_note_id{where}",
                params,
            ).fetchone()["c"]
        )
        rows = cur.execute(
            "SELECT cn.id, cn.raw_note_id, cn.cleaned_content, cn.created_at, rn.author AS raw_author, rn.url AS raw_url "
            "FROM cleaned_note cn LEFT JOIN raw_note rn ON rn.id = cn.raw_note_id"
            f"{where} ORDER BY cn.id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        notes = [dict(r) for r in rows]
    except sqlite3.Error as exc:
        # SQLite opens lazily, so a corrupt file or missing table only fails
        # here. Report it with the same error code as an open failure.
        raise ApiException("orchestrator_db") from exc
    finally:
        conn.close()

    return ok(CleanedNoteListResponse(notes=notes, total=total, limit=limit, offset=offset, query=q or None))
|
|
@router.get("/business/posts", response_model=ApiResponse)
def list_generated_posts(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    status: str | None = Query(None),
) -> ApiResponse:
    """Return one page of ``generated_post`` rows, ordered by ``id`` descending.

    Args:
        request: Incoming request; the database path is read from
            ``request.app.state.config.orchestrator_db_path``.
        limit: Page size.
        offset: Number of rows to skip.
        status: Optional exact-match filter on the ``status`` column; an
            empty value means no filter.

    Raises:
        ApiException: ``"orchestrator_db"`` when the database cannot be opened
            or when a query fails (for example a missing table or a file that
            is not a valid SQLite database).
    """
    config = request.app.state.config
    db_path = Path(getattr(config, "orchestrator_db_path"))

    # Only a constant SQL fragment is interpolated; the status value is a bound parameter.
    where = ""
    params: list = []
    if status:
        where = " WHERE status = ?"
        params.append(status)

    conn = _open_orchestrator_db(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        total = int(cur.execute(f"SELECT COUNT(1) AS c FROM generated_post{where}", params).fetchone()["c"])
        rows = cur.execute(
            f"SELECT * FROM generated_post{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        posts = [dict(r) for r in rows]
    except sqlite3.Error as exc:
        # SQLite opens lazily, so a corrupt file or missing table only fails
        # here. Report it with the same error code as an open failure.
        raise ApiException("orchestrator_db") from exc
    finally:
        conn.close()

    return ok(GeneratedPostListResponse(posts=posts, total=total, limit=limit, offset=offset))
|
|
@router.get("/business/leads", response_model=ApiResponse)
def list_leads(
    request: Request,
    limit: int = Query(20, ge=1, le=500),
    offset: int = Query(0, ge=0),
    status: str | None = Query(None),
) -> ApiResponse:
    """Return one page of ``lead`` rows, ordered by ``id`` descending.

    Args:
        request: Incoming request; the database path is read from
            ``request.app.state.config.orchestrator_db_path``.
        limit: Page size.
        offset: Number of rows to skip.
        status: Optional exact-match filter on the ``status`` column; an
            empty value means no filter.

    Raises:
        ApiException: ``"orchestrator_db"`` when the database cannot be opened
            or when a query fails (for example a missing table or a file that
            is not a valid SQLite database).
    """
    config = request.app.state.config
    db_path = Path(getattr(config, "orchestrator_db_path"))

    # Only a constant SQL fragment is interpolated; the status value is a bound parameter.
    where = ""
    params: list = []
    if status:
        where = " WHERE status = ?"
        params.append(status)

    conn = _open_orchestrator_db(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        total = int(cur.execute(f"SELECT COUNT(1) AS c FROM lead{where}", params).fetchone()["c"])
        rows = cur.execute(
            f"SELECT * FROM lead{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            [*params, int(limit), int(offset)],
        ).fetchall()
        leads = [dict(r) for r in rows]
    except sqlite3.Error as exc:
        # SQLite opens lazily, so a corrupt file or missing table only fails
        # here. Report it with the same error code as an open failure.
        raise ApiException("orchestrator_db") from exc
    finally:
        conn.close()

    return ok(LeadListResponse(leads=leads, total=total, limit=limit, offset=offset))
|
|