| from __future__ import annotations |
|
|
| import heapq |
| import json |
| import os |
| import tempfile |
| import threading |
| import time |
| from contextlib import contextmanager |
| from pathlib import Path |
| from typing import Any, Dict, Optional, Tuple, List |
|
|
| import os |
| try: |
| import fcntl |
| except ImportError: |
| fcntl = None |
| if os.name == 'nt': |
| import msvcrt |
|
|
| from .tasks import ( |
| IN_PROGRESS_STATUSES, |
| TERMINAL_STATUSES, |
| CallbackState, |
| TaskRecord, |
| TaskStatus, |
| canonical_task_status_value, |
| now_ts, |
| ) |
|
|
|
|
def _model_to_dict(model: Any) -> Dict[str, Any]:
    """Convert a model object into a plain dictionary.

    Objects that expose a ``model_dump`` attribute are serialized through it;
    everything else is serialized through ``dict()``.
    NOTE(review): this looks like a pydantic v2/v1 compatibility shim -- confirm.
    """
    if not hasattr(model, "model_dump"):
        return model.dict()
    return model.model_dump()
|
|
|
|
def _task_from_dict(data: Dict[str, Any]) -> TaskRecord:
    """Build a ``TaskRecord`` from a plain dictionary.

    ``TaskRecord.model_validate`` is used when it exists and is callable;
    otherwise ``TaskRecord.parse_obj`` is called (an ``AttributeError`` is
    raised if that attribute is missing as well).
    """
    builder = getattr(TaskRecord, "model_validate", None)
    if not callable(builder):
        builder = TaskRecord.parse_obj
    return builder(data)
|
|
|
|
def _infer_target(task_type: str, payload: Dict[str, Any]) -> str:
    """Derive a task's target string from its payload.

    ``task_type`` is matched case-insensitively after trimming whitespace.
    For a known type the payload keys are tried in priority order and the
    first truthy value is returned as a string.  Unknown types, or payloads
    with no truthy candidate, yield ``""``.
    """
    # Payload keys to probe, in priority order, for each supported task type.
    keys_by_type = {
        "note_url": ("note_url", "url"),
        "search": ("query", "keyword"),
        "user_profile": ("user_id", "uid", "user_url", "url"),
    }
    normalized_type = (task_type or "").strip().lower()
    for key in keys_by_type.get(normalized_type, ()):
        candidate = payload.get(key)
        if candidate:
            return str(candidate)
    return ""
|
|
|
|
def _upgrade_task_record_dict(data: Dict[str, Any], *, task_id: str | None = None) -> Dict[str, Any]:
    """Bring a stored task dictionary up to the current record layout.

    ``data`` is modified in place.  If anything was changed a shallow copy is
    returned, otherwise ``data`` itself is returned -- so ``result is data``
    tells the caller that the record was already up to date.

    Normalizations, in order:
      * force ``id`` to ``task_id`` when one is given;
      * replace a missing or non-dict ``payload`` with ``{}``;
      * copy legacy ``*_at`` timestamps to ``created`` / ``started`` / ``finished``;
      * default ``retry_count`` from legacy ``retries`` (or ``0``);
      * coerce non-numeric timestamps to ``float``;
      * wrap a non-dict ``error`` into ``{"message": ...}`` (blank string -> ``None``);
      * infer ``target`` from the payload when it is empty;
      * canonicalize ``status``, or infer it when it is not recognized.
    """
    dirty = False

    if task_id and data.get("id") != task_id:
        data["id"] = task_id
        dirty = True

    # ``None`` is not a dict either, so one check covers "missing" and "wrong type".
    payload = data.get("payload")
    if not isinstance(payload, dict):
        payload = {}
        data["payload"] = payload
        dirty = True

    # Legacy field names are copied over only when the current name is absent.
    legacy_names = (
        ("created", "created_at"),
        ("started", "started_at"),
        ("finished", "finished_at"),
    )
    for current_name, legacy_name in legacy_names:
        if current_name not in data and legacy_name in data:
            data[current_name] = data.get(legacy_name)
            dirty = True

    if "retry_count" not in data:
        data["retry_count"] = data.get("retries", 0)
        dirty = True

    # Timestamps that are present but not numeric are coerced to float.  An
    # unparseable ``created`` falls back to the current time; the other two
    # fall back to ``None``.
    for field in ("started", "finished", "created"):
        raw_ts = data.get(field)
        if raw_ts is None or isinstance(raw_ts, (int, float)):
            continue
        try:
            data[field] = float(raw_ts)
        except Exception:
            data[field] = now_ts() if field == "created" else None
        dirty = True

    err = data.get("error")
    if isinstance(err, str):
        data["error"] = {"message": err} if err.strip() else None
        dirty = True
    elif not (err is None or isinstance(err, dict)):
        data["error"] = {"message": str(err)}
        dirty = True

    if not data.get("target"):
        data["target"] = _infer_target(str(data.get("task_type") or ""), payload)
        dirty = True

    raw_status = data.get("status")
    canonical = canonical_task_status_value(raw_status)
    if canonical is not None:
        # Recognized status: rewrite it only if the stored spelling differs.
        shown = raw_status.value if hasattr(raw_status, "value") else raw_status
        if str(shown).strip().lower() != canonical:
            data["status"] = canonical
            dirty = True
    else:
        # Unrecognized status: infer one from the timestamps and the error.
        has_finished = data.get("finished") or data.get("finished_at")
        has_started = data.get("started") or data.get("started_at")
        if has_finished or data.get("error") not in (None, "", {}):
            canonical = TaskStatus.failed.value
        elif has_started:
            canonical = TaskStatus.running.value
        else:
            canonical = TaskStatus.queued.value
        data["status"] = canonical
        dirty = True

    return dict(data) if dirty else data
|
|
|
|
def _ensure_task_finished_on_status(data: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in timestamps that the record's status implies but that are missing.

    A status in ``IN_PROGRESS_STATUSES`` with a falsy ``started`` gets
    ``started = now_ts()``; a status in ``TERMINAL_STATUSES`` with a falsy
    ``finished`` gets ``finished = now_ts()``.

    ``data`` is modified in place.  A shallow copy is returned when something
    was filled in; otherwise ``data`` itself is returned.
    """
    raw_status = data.get("status")
    if hasattr(raw_status, "value"):
        raw_status = raw_status.value
    label = str(raw_status or "").strip().lower()

    in_progress_values = {member.value for member in IN_PROGRESS_STATUSES}
    terminal_values = {member.value for member in TERMINAL_STATUSES}

    touched = False
    if label in in_progress_values and not data.get("started"):
        data["started"] = now_ts()
        touched = True
    if label in terminal_values and not data.get("finished"):
        data["finished"] = now_ts()
        touched = True

    return dict(data) if touched else data
|
|
|
|
# Sentinel default: lets ``update_task`` tell "argument omitted" apart from an
# explicit ``None`` for its ``error`` and ``payload`` parameters.
_UNSET = object()


class LocalJsonStorage:
    """Store task records and related artifacts as files under one directory.

    Layout below ``base_dir``:

    * ``tasks/<task_id>.json``     -- one JSON document per task record
    * ``results/<task_id>.json``   -- task results
    * ``callbacks/<task_id>.json`` -- callback-failure records
    * ``raw/<task_id>.html``       -- raw HTML snapshots
    * ``tasks.json``               -- legacy single-file task store (migrated by ``init``)
    * ``.storage.lock``            -- lock file used by ``_locked``

    Only ``init`` and ``update_task`` take the storage lock; all other methods
    read and write files directly.  Writes go through a temp-file + rename so
    readers never observe a partially written file.
    """

    def __init__(self, base_dir: Path):
        """Record the directory layout; nothing is created on disk until ``init``."""
        self.base_dir = base_dir
        self._thread_lock = threading.Lock()
        self.lock_file = self.base_dir / ".storage.lock"
        self.legacy_tasks_file = self.base_dir / "tasks.json"
        self.tasks_dir = self.base_dir / "tasks"
        self.results_dir = self.base_dir / "results"
        self.callbacks_dir = self.base_dir / "callbacks"
        self.raw_dir = self.base_dir / "raw"

    def init(self) -> None:
        """Create the directory tree, then migrate and upgrade stored tasks under the lock."""
        for directory in (
            self.base_dir,
            self.tasks_dir,
            self.results_dir,
            self.callbacks_dir,
            self.raw_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)
        with self._locked(exclusive=True):
            self._migrate_tasks_unlocked()
            self._upgrade_tasks_unlocked()

    # ------------------------------------------------------------------ tasks

    def create_task(self, task: TaskRecord) -> None:
        """Write ``task`` to ``tasks/<task.id>.json``, replacing any existing file."""
        self._atomic_write_json(self._task_path(task.id), _model_to_dict(task))

    def get_task(self, task_id: str) -> Optional[TaskRecord]:
        """Return the task stored under ``task_id``.

        Returns ``None`` when the file is missing, unreadable, or does not
        contain a JSON object.  The record is upgraded in memory only; nothing
        is written back.  Validation errors from building the ``TaskRecord``
        propagate to the caller.
        """
        record = self._read_task_dict(self._task_path(task_id), task_id)
        if record is None:
            return None
        return _task_from_dict(record)

    def update_task(
        self,
        task_id: str,
        *,
        status: Optional[TaskStatus] = None,
        error: Any = _UNSET,
        payload: Any = _UNSET,
        engine: Optional[str] = None,
        callback: Optional[CallbackState] = None,
        retry_count: int | None = None,
    ) -> Optional[TaskRecord]:
        """Apply the given field changes to a stored task under the storage lock.

        Args:
            task_id: Identifier of the task to modify.
            status: New status.  Entering an in-progress status stamps
                ``started`` and entering a terminal status stamps ``finished``
                when those fields are still empty.
            error: New error value.  A string is wrapped as
                ``{"message": ...}`` (a blank string clears the error); any
                other value, including ``None``, is stored as given.  Omit the
                argument to leave the error untouched.
            payload: New payload.  Anything that is not a dict is stored as
                ``{}``.  Omit the argument to leave the payload untouched.
            engine: New engine name; ``None`` leaves it untouched.
            callback: New callback state; ``None`` leaves it untouched.
            retry_count: New retry count; ``None`` leaves it untouched.

        Returns:
            The updated record, or ``None`` when the task file is missing or
            does not contain a JSON object.
        """
        task_path = self._task_path(task_id)
        with self._locked(exclusive=True):
            data = self._read_json_dict(task_path)
            if data is None:
                return None
            data = _upgrade_task_record_dict(data, task_id=task_id)

            if status is not None:
                data["status"] = status.value if hasattr(status, "value") else str(status)
                if status in IN_PROGRESS_STATUSES and not data.get("started"):
                    data["started"] = now_ts()
                if status in TERMINAL_STATUSES and not data.get("finished"):
                    data["finished"] = now_ts()
            if error is not _UNSET:
                if isinstance(error, str):
                    data["error"] = {"message": error} if error.strip() else None
                else:
                    data["error"] = error
            if payload is not _UNSET:
                data["payload"] = payload if isinstance(payload, dict) else {}
            if engine is not None:
                data["engine"] = engine
            if callback is not None:
                data["callback"] = _model_to_dict(callback)
            if retry_count is not None:
                data["retry_count"] = int(retry_count)

            self._atomic_write_json(task_path, data)
            return _task_from_dict(data)

    # ------------------------------------------------- results and callbacks

    def write_result(self, task_id: str, result: Dict[str, Any]) -> Path:
        """Write ``result`` to ``results/<task_id>.json`` and return that path."""
        result_path = self.results_dir / f"{task_id}.json"
        self._atomic_write_json(result_path, result)
        return result_path

    def read_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored result dict, or ``None`` if missing, unreadable or not an object."""
        return self._read_json_dict(self.results_dir / f"{task_id}.json")

    def write_callback_failure(self, task_id: str, record: Dict[str, Any]) -> Path:
        """Write ``record`` to ``callbacks/<task_id>.json`` and return that path."""
        callback_path = self.callbacks_dir / f"{task_id}.json"
        self._atomic_write_json(callback_path, record)
        return callback_path

    def read_callback_failure(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored callback-failure dict, or ``None`` if missing, unreadable or not an object."""
        return self._read_json_dict(self.callbacks_dir / f"{task_id}.json")

    def delete_callback_failure(self, task_id: str) -> None:
        """Remove ``callbacks/<task_id>.json``; a missing file or any ``OSError`` is ignored."""
        callback_path = self.callbacks_dir / f"{task_id}.json"
        try:
            callback_path.unlink()
        except OSError:
            # Best effort: covers "already gone" (FileNotFoundError) as well.
            return

    # --------------------------------------------------------- raw snapshots

    def write_raw_html_snapshot(self, task_id: str, html: str) -> Path:
        """Write ``html`` to ``raw/<task_id>.html`` and return that path."""
        raw_path = self.raw_dir / f"{task_id}.html"
        self._atomic_write_text(raw_path, html)
        return raw_path

    def cleanup_raw_html_snapshots(self, retention_days: int) -> int:
        """Delete ``raw/*.html`` files whose mtime is older than ``retention_days`` days.

        Negative or falsy values are treated as ``0`` days.  Files that cannot
        be inspected or removed are skipped.  Returns the number of files
        actually removed.
        """
        days = max(int(retention_days or 0), 0)
        cutoff_ts = time.time() - float(days) * 86400.0
        if not self.raw_dir.exists():
            return 0
        removed = 0
        for path in self.raw_dir.glob("*.html"):
            try:
                if float(path.stat().st_mtime) >= cutoff_ts:
                    continue
                path.unlink()
            except OSError:
                continue
            removed += 1
        return removed

    # ------------------------------------------------------------- counters

    def count_tasks_by_status(self, status: TaskStatus) -> int:
        """Count task files whose stored ``status`` equals ``status`` exactly.

        The stored value is compared as written on disk (no trimming or
        case-folding).
        """
        wanted = status.value if hasattr(status, "value") else str(status)
        count = 0
        for data in self._iter_task_dicts():
            current = data.get("status")
            if hasattr(current, "value"):
                current = current.value
            if current == wanted:
                count += 1
        return count

    def count_tasks_by_status_snapshot(self) -> Dict[str, int]:
        """Return ``{status: count}`` over all task files.

        Statuses are trimmed and lower-cased; a missing or empty status is
        counted under ``"unknown"``.
        """
        counts: Dict[str, int] = {}
        for data in self._iter_task_dicts():
            status = data.get("status")
            if hasattr(status, "value"):
                status = status.value
            key = str(status or "").strip().lower() or "unknown"
            counts[key] = counts.get(key, 0) + 1
        return counts

    # -------------------------------------------------------------- listing

    def list_tasks(
        self,
        *,
        limit: int,
        offset: int,
        status: List[str] | None = None,
        task_type: List[str] | None = None,
        engine: List[str] | None = None,
        error_kind: List[str] | None = None,
        sort: str = "mtime",
    ) -> Tuple[List[TaskRecord], int]:
        """Return one page of tasks matching the filters, newest first.

        Args:
            limit: Page size; values ``<= 0`` become 20, values above 500 become 500.
            offset: Number of matches to skip; negative values become 0.
            status: Status names to keep.  If names are given but none of them
                is recognized, the result is empty.
            task_type: Task types to keep (case-insensitive).
            engine: Engine names to keep (case-insensitive).
            error_kind: Values of ``error["kind"]`` to keep (case-insensitive).
            sort: ``"created"``, ``"created_at"`` or ``"createdat"`` orders by
                the record's ``created`` field; anything else orders by file
                mtime.  File mtime is always the tie-breaker.

        Returns:
            ``(page, total)`` where ``total`` counts all matches before paging.
        """
        lim, off = self._clamp_page(limit, offset)

        status_set = self._canonical_status_set(status)
        if status is not None and len(status) > 0 and not status_set:
            return ([], 0)
        task_type_set = self._normalized_set(task_type)
        engine_set = self._normalized_set(engine)
        error_kind_set = self._normalized_set(error_kind)
        use_created = str(sort or "").strip().lower() in ("created", "created_at", "createdat")

        items: List[Tuple[float, float, TaskRecord]] = []
        for path in self.tasks_dir.glob("*.json"):
            try:
                mtime_ts = float(path.stat().st_mtime)
            except OSError:
                continue
            task = self._load_task_or_none(path)
            if task is None:
                continue

            if status_set and canonical_task_status_value(task.status) not in status_set:
                continue
            if task_type_set and str(task.task_type or "").strip().lower() not in task_type_set:
                continue
            if engine_set and str(task.engine or "").strip().lower() not in engine_set:
                continue
            if error_kind_set and self._error_kind(task) not in error_kind_set:
                continue

            created_ts = float(getattr(task, "created", 0.0) or 0.0)
            items.append((created_ts if use_created else mtime_ts, mtime_ts, task))

        items.sort(key=lambda entry: (entry[0], entry[1]), reverse=True)
        page = [task for _, _, task in items[off : off + lim]]
        return (page, len(items))

    def summarize_recent_errors(
        self,
        *,
        scan_limit: int,
        limit: int,
        offset: int,
        status: List[str] | None = None,
        error_kind: List[str] | None = None,
    ) -> Tuple[Dict[str, int], List[TaskRecord], int, int]:
        """Summarize error kinds among the most recently modified task files.

        Args:
            scan_limit: How many of the newest files (by mtime) to inspect;
                values ``<= 0`` become 1000, values above 20000 become 20000.
            limit: Page size; values ``<= 0`` become 20, values above 500 become 500.
            offset: Number of matches to skip; negative values become 0.
            status: Status names to keep.  ``None`` selects the ``failed`` and
                ``rpa_failed`` statuses; an empty list disables the status
                filter; names that are all unrecognized give an empty result.
            error_kind: Values of ``error["kind"]`` to keep (case-insensitive).

        Returns:
            ``(counts, page, total, scanned)``: matches per error kind (empty
            kinds are counted as ``"unknown"``), one page of matching tasks
            ordered newest first, the number of matches, and the number of
            files selected for scanning.
        """
        scan_lim = int(scan_limit)
        if scan_lim <= 0:
            scan_lim = 1000
        scan_lim = min(scan_lim, 20000)
        lim, off = self._clamp_page(limit, offset)

        if status is None:
            status_set = {TaskStatus.failed.value, TaskStatus.rpa_failed.value}
        else:
            status_set = self._canonical_status_set(status)
            if len(status) > 0 and not status_set:
                return ({}, [], 0, 0)
        error_kind_set = self._normalized_set(error_kind)

        def stamped_paths() -> Any:
            # Yield (mtime, path) for every task file that can be stat'ed.
            for candidate in self.tasks_dir.glob("*.json"):
                try:
                    yield (float(candidate.stat().st_mtime), candidate)
                except OSError:
                    continue

        # The newest ``scan_lim`` files, already ordered newest first.
        newest = heapq.nlargest(scan_lim, stamped_paths())
        scanned = len(newest)

        matched: List[TaskRecord] = []
        error_kind_counts: Dict[str, int] = {}
        for _, path in newest:
            task = self._load_task_or_none(path)
            if task is None:
                continue
            if status_set and canonical_task_status_value(task.status) not in status_set:
                continue
            kind = self._error_kind(task)
            if error_kind_set and kind not in error_kind_set:
                continue
            matched.append(task)
            key = kind or "unknown"
            error_kind_counts[key] = error_kind_counts.get(key, 0) + 1

        return (error_kind_counts, matched[off : off + lim], len(matched), scanned)

    # ------------------------------------------------------ private helpers

    @staticmethod
    def _clamp_page(limit: Any, offset: Any) -> Tuple[int, int]:
        """Coerce paging arguments to ints and clamp them to the supported range."""
        lim = int(limit)
        off = int(offset)
        if lim <= 0:
            lim = 20
        return (min(lim, 500), max(off, 0))

    @staticmethod
    def _normalized_set(values: Any) -> set:
        """Return the trimmed, lower-cased, non-empty strings from ``values``."""
        cleaned = (str(value or "").strip().lower() for value in (values or []))
        return {value for value in cleaned if value}

    @staticmethod
    def _canonical_status_set(values: Any) -> set:
        """Return the canonical status values for ``values``, dropping unrecognized ones."""
        result = {canonical_task_status_value(value) for value in (values or [])}
        result.discard(None)
        return result

    @staticmethod
    def _error_kind(task: Any) -> str:
        """Return the trimmed, lower-cased ``error["kind"]`` of ``task`` or ``""``."""
        err = task.error if isinstance(task.error, dict) else {}
        return str(err.get("kind") or "").strip().lower()

    def _iter_task_dicts(self) -> Any:
        """Yield the parsed content of every ``tasks/*.json`` file that holds a JSON object."""
        for path in self.tasks_dir.glob("*.json"):
            data = self._read_json_file(path)
            if isinstance(data, dict):
                yield data

    def _read_json_dict(self, path: Path) -> Optional[Dict[str, Any]]:
        """Read ``path`` and return its content only if it is a JSON object."""
        data = self._read_json_file(path)
        return data if isinstance(data, dict) else None

    def _read_task_dict(self, path: Path, task_id: str) -> Optional[Dict[str, Any]]:
        """Read a task file and return its upgraded dictionary, or ``None`` if unusable."""
        data = self._read_json_dict(path)
        if data is None:
            return None
        upgraded = _upgrade_task_record_dict(data, task_id=task_id)
        return _ensure_task_finished_on_status(upgraded)

    def _load_task_or_none(self, path: Path) -> Optional[TaskRecord]:
        """Load the task stored at ``path`` (id taken from the file stem).

        Unlike ``get_task``, a record that fails to build is reported as
        ``None`` so that one bad file does not break a directory scan.
        """
        record = self._read_task_dict(path, path.stem)
        if record is None:
            return None
        try:
            return _task_from_dict(record)
        except Exception:
            return None

    def _task_path(self, task_id: str) -> Path:
        """Return the path of the JSON file that stores ``task_id``."""
        return self.tasks_dir / f"{task_id}.json"

    @contextmanager
    def _locked(self, *, exclusive: bool) -> Any:
        """Hold the in-process lock and the lock-file lock for the ``with`` body.

        With ``fcntl`` available the file lock is exclusive or shared as
        requested.  On Windows ``msvcrt.locking`` locks one byte and is always
        exclusive.  When neither applies only the thread lock is taken.

        The thread lock is a plain ``threading.Lock``, so this must not be
        entered again from inside a locked section on the same thread.
        """
        with self._thread_lock:
            self.lock_file.parent.mkdir(parents=True, exist_ok=True)
            with self.lock_file.open("a+", encoding="utf-8") as f:
                fd = f.fileno()
                # Acquire outside the try/finally below: if acquiring fails
                # there is nothing to release, and a failing release must not
                # mask the original error.
                if fcntl:
                    fcntl.flock(fd, fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
                elif os.name == "nt":
                    msvcrt.locking(fd, msvcrt.LK_LOCK, 1)
                try:
                    yield
                finally:
                    if fcntl:
                        fcntl.flock(fd, fcntl.LOCK_UN)
                    elif os.name == "nt":
                        msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)

    def _migrate_tasks_unlocked(self) -> None:
        """Split the legacy ``tasks.json`` into per-task files.

        Existing per-task files are never overwritten.  Afterwards the legacy
        file is renamed to ``tasks.json.migrated`` -- also when it could not
        be parsed.  The caller must hold the storage lock.
        """
        if not self.legacy_tasks_file.exists():
            return
        try:
            with self.legacy_tasks_file.open("r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            data = {}
        if isinstance(data, dict):
            for task_id, record in data.items():
                if not isinstance(task_id, str) or not isinstance(record, dict):
                    continue
                dst = self._task_path(task_id)
                if dst.exists():
                    continue
                self._atomic_write_json(dst, _upgrade_task_record_dict(record, task_id=task_id))
        migrated_path = self.base_dir / "tasks.json.migrated"
        try:
            os.replace(self.legacy_tasks_file, migrated_path)
            self._fsync_dir(self.base_dir)
        except OSError:
            return

    def _upgrade_tasks_unlocked(self) -> None:
        """Rewrite every task file whose content changes when upgraded.

        Files that fail to rewrite are skipped.  The caller must hold the
        storage lock.
        """
        if not self.tasks_dir.exists():
            return
        for path in self.tasks_dir.glob("*.json"):
            data = self._read_json_file(path)
            if not isinstance(data, dict):
                continue
            upgraded = _upgrade_task_record_dict(data, task_id=path.stem)
            upgraded = _ensure_task_finished_on_status(upgraded)
            if upgraded is data:
                # Both helpers return their argument unchanged when nothing was modified.
                continue
            try:
                self._atomic_write_json(path, upgraded)
            except Exception:
                continue

    def _read_json_file(self, path: Path) -> Any:
        """Parse ``path`` as UTF-8 JSON; return ``None`` on any failure (missing file, bad JSON, ...)."""
        try:
            with path.open("r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            return None

    def _atomic_write(self, path: Path, write: Any) -> None:
        """Atomically replace ``path`` with whatever ``write(file)`` produces.

        The content goes to a temp file in the same directory, is flushed and
        fsynced, and is then renamed over ``path``.  The temp file is removed
        if any step fails.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(prefix=path.name, dir=str(path.parent))
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                write(f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, path)
            self._fsync_dir(path.parent)
        finally:
            # After a successful rename the temp name no longer exists.
            try:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            except OSError:
                pass

    def _atomic_write_json(self, path: Path, data: Any) -> None:
        """Atomically write ``data`` to ``path`` as indented, non-ASCII-preserving JSON."""
        self._atomic_write(path, lambda f: json.dump(data, f, ensure_ascii=False, indent=2))

    def _atomic_write_text(self, path: Path, content: str) -> None:
        """Atomically write ``content`` to ``path`` as UTF-8 text (falsy content -> empty file)."""
        self._atomic_write(path, lambda f: f.write(content or ""))

    def _fsync_dir(self, path: Path) -> None:
        """Best-effort fsync of a directory so a preceding rename is durable.

        Every ``OSError`` is swallowed.  ``os.O_DIRECTORY`` is looked up with
        ``getattr`` because the flag is not defined on every platform (for
        example Windows); reading it unconditionally would raise
        ``AttributeError`` there, which the ``OSError`` handling would not catch.
        """
        flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
        try:
            fd = os.open(str(path), flags)
        except OSError:
            return
        try:
            os.fsync(fd)
        except OSError:
            pass
        finally:
            os.close(fd)
|
|