from __future__ import annotations import heapq import json import os import tempfile import threading import time from contextlib import contextmanager from pathlib import Path from typing import Any, Dict, Optional, Tuple, List import os try: import fcntl except ImportError: fcntl = None if os.name == 'nt': import msvcrt from .tasks import ( IN_PROGRESS_STATUSES, TERMINAL_STATUSES, CallbackState, TaskRecord, TaskStatus, canonical_task_status_value, now_ts, ) def _model_to_dict(model: Any) -> Dict[str, Any]: if hasattr(model, "model_dump"): return model.model_dump() return model.dict() def _task_from_dict(data: Dict[str, Any]) -> TaskRecord: validate = getattr(TaskRecord, "model_validate", None) if callable(validate): return validate(data) parse_obj = getattr(TaskRecord, "parse_obj") return parse_obj(data) def _infer_target(task_type: str, payload: Dict[str, Any]) -> str: t = (task_type or "").strip().lower() if t == "note_url": return str(payload.get("note_url") or payload.get("url") or "") if t == "search": return str(payload.get("query") or payload.get("keyword") or "") if t == "user_profile": return str(payload.get("user_id") or payload.get("uid") or payload.get("user_url") or payload.get("url") or "") return "" def _upgrade_task_record_dict(data: Dict[str, Any], *, task_id: str | None = None) -> Dict[str, Any]: changed = False if task_id and data.get("id") != task_id: data["id"] = task_id changed = True payload = data.get("payload") if payload is None: data["payload"] = {} payload = data["payload"] changed = True elif not isinstance(payload, dict): data["payload"] = {} payload = data["payload"] changed = True if "created" not in data and "created_at" in data: data["created"] = data.get("created_at") changed = True if "started" not in data and "started_at" in data: data["started"] = data.get("started_at") changed = True if "finished" not in data and "finished_at" in data: data["finished"] = data.get("finished_at") changed = True if "retry_count" not in data: if "retries" in data: data["retry_count"] = data.get("retries") else: data["retry_count"] = 0 changed = True if "started" in data and data.get("started") is not None and not isinstance(data.get("started"), (int, float)): try: data["started"] = float(data.get("started")) except Exception: data["started"] = None changed = True if "finished" in data and data.get("finished") is not None and not isinstance(data.get("finished"), (int, float)): try: data["finished"] = float(data.get("finished")) except Exception: data["finished"] = None changed = True if "created" in data and data.get("created") is not None and not isinstance(data.get("created"), (int, float)): try: data["created"] = float(data.get("created")) except Exception: data["created"] = now_ts() changed = True err = data.get("error") if isinstance(err, str): if err.strip() == "": data["error"] = None else: data["error"] = {"message": err} changed = True elif err is not None and not isinstance(err, dict): data["error"] = {"message": str(err)} changed = True if not data.get("target"): inferred = _infer_target(str(data.get("task_type") or ""), payload) data["target"] = inferred changed = True status_raw = data.get("status") status_val = canonical_task_status_value(status_raw) if status_val is None: finished_any = data.get("finished") or data.get("finished_at") started_any = data.get("started") or data.get("started_at") if finished_any or data.get("error") not in (None, "", {}): status_val = TaskStatus.failed.value elif started_any: status_val = TaskStatus.running.value else: status_val = TaskStatus.queued.value data["status"] = status_val changed = True else: if hasattr(status_raw, "value"): status_raw = status_raw.value if str(status_raw).strip().lower() != status_val: data["status"] = status_val changed = True if changed: return dict(data) return data def _ensure_task_finished_on_status(data: Dict[str, Any]) -> Dict[str, Any]: status = data.get("status") if hasattr(status, "value"): status = status.value status_val = str(status or "").strip().lower() changed = False if status_val in {s.value for s in IN_PROGRESS_STATUSES} and not data.get("started"): data["started"] = now_ts() changed = True if status_val in {s.value for s in TERMINAL_STATUSES}: if not data.get("finished"): data["finished"] = now_ts() changed = True if changed: return dict(data) return data _UNSET = object() class LocalJsonStorage: def __init__(self, base_dir: Path): self.base_dir = base_dir self._thread_lock = threading.Lock() self.lock_file = self.base_dir / ".storage.lock" self.legacy_tasks_file = self.base_dir / "tasks.json" self.tasks_dir = self.base_dir / "tasks" self.results_dir = self.base_dir / "results" self.callbacks_dir = self.base_dir / "callbacks" self.raw_dir = self.base_dir / "raw" def init(self) -> None: self.base_dir.mkdir(parents=True, exist_ok=True) self.tasks_dir.mkdir(parents=True, exist_ok=True) self.results_dir.mkdir(parents=True, exist_ok=True) self.callbacks_dir.mkdir(parents=True, exist_ok=True) self.raw_dir.mkdir(parents=True, exist_ok=True) with self._locked(exclusive=True): self._migrate_tasks_unlocked() self._upgrade_tasks_unlocked() def create_task(self, task: TaskRecord) -> None: task_path = self._task_path(task.id) self._atomic_write_json(task_path, _model_to_dict(task)) def get_task(self, task_id: str) -> Optional[TaskRecord]: task_path = self._task_path(task_id) if not task_path.exists(): return None data = self._read_json_file(task_path) if not isinstance(data, dict): return None upgraded = _upgrade_task_record_dict(data, task_id=task_id) upgraded = _ensure_task_finished_on_status(upgraded) return _task_from_dict(upgraded if isinstance(upgraded, dict) else data) def update_task( self, task_id: str, *, status: Optional[TaskStatus] = None, error: Any = _UNSET, payload: Any = _UNSET, engine: Optional[str] = None, callback: Optional[CallbackState] = None, retry_count: int | None = None, ) -> Optional[TaskRecord]: task_path = self._task_path(task_id) with self._locked(exclusive=True): if not task_path.exists(): return None data = self._read_json_file(task_path) if not isinstance(data, dict): return None data = _upgrade_task_record_dict(data, task_id=task_id) if status is not None: data["status"] = status.value if hasattr(status, "value") else str(status) if status in IN_PROGRESS_STATUSES and not data.get("started"): data["started"] = now_ts() if status in TERMINAL_STATUSES and not data.get("finished"): data["finished"] = now_ts() if error is not _UNSET: if isinstance(error, str): data["error"] = None if error.strip() == "" else {"message": error} else: data["error"] = error if payload is not _UNSET: if payload is None: data["payload"] = {} elif isinstance(payload, dict): data["payload"] = payload else: data["payload"] = {} if engine is not None: data["engine"] = engine if callback is not None: data["callback"] = _model_to_dict(callback) if retry_count is not None: data["retry_count"] = int(retry_count) self._atomic_write_json(task_path, data) return _task_from_dict(data) def write_result(self, task_id: str, result: Dict[str, Any]) -> Path: result_path = self.results_dir / f"{task_id}.json" self._atomic_write_json(result_path, result) return result_path def read_result(self, task_id: str) -> Optional[Dict[str, Any]]: result_path = self.results_dir / f"{task_id}.json" if not result_path.exists(): return None data = self._read_json_file(result_path) if isinstance(data, dict): return data return None def write_callback_failure(self, task_id: str, record: Dict[str, Any]) -> Path: callback_path = self.callbacks_dir / f"{task_id}.json" self._atomic_write_json(callback_path, record) return callback_path def read_callback_failure(self, task_id: str) -> Optional[Dict[str, Any]]: callback_path = self.callbacks_dir / f"{task_id}.json" if not callback_path.exists(): return None data = self._read_json_file(callback_path) if isinstance(data, dict): return data return None def delete_callback_failure(self, task_id: str) -> None: callback_path = self.callbacks_dir / f"{task_id}.json" try: if callback_path.exists(): callback_path.unlink() except OSError: return def write_raw_html_snapshot(self, task_id: str, html: str) -> Path: raw_path = self.raw_dir / f"{task_id}.html" self._atomic_write_text(raw_path, html) return raw_path def cleanup_raw_html_snapshots(self, retention_days: int) -> int: days = int(retention_days or 0) if days < 0: days = 0 cutoff_ts = time.time() - float(days) * 86400.0 removed = 0 if not self.raw_dir.exists(): return 0 for path in self.raw_dir.glob("*.html"): try: stat = path.stat() except OSError: continue if float(stat.st_mtime) >= cutoff_ts: continue try: path.unlink() removed += 1 except OSError: continue return removed def count_tasks_by_status(self, status: TaskStatus) -> int: wanted = status.value if hasattr(status, "value") else str(status) count = 0 for path in self.tasks_dir.glob("*.json"): data = self._read_json_file(path) if not isinstance(data, dict): continue current = data.get("status") if hasattr(current, "value"): current = current.value if current == wanted: count += 1 return count def count_tasks_by_status_snapshot(self) -> Dict[str, int]: counts: Dict[str, int] = {} for path in self.tasks_dir.glob("*.json"): data = self._read_json_file(path) if not isinstance(data, dict): continue status = data.get("status") if hasattr(status, "value"): status = status.value key = str(status or "").strip().lower() or "unknown" counts[key] = int(counts.get(key, 0) or 0) + 1 return counts def list_tasks( self, *, limit: int, offset: int, status: List[str] | None = None, task_type: List[str] | None = None, engine: List[str] | None = None, error_kind: List[str] | None = None, sort: str = "mtime", ) -> Tuple[List[TaskRecord], int]: lim = int(limit) off = int(offset) if lim <= 0: lim = 20 if lim > 500: lim = 500 if off < 0: off = 0 status_set = {canonical_task_status_value(s) for s in (status or [])} status_set.discard(None) if status is not None and len(status) > 0 and not status_set: return ([], 0) task_type_set = {str(s or "").strip().lower() for s in (task_type or []) if str(s or "").strip() != ""} engine_set = {str(s or "").strip().lower() for s in (engine or []) if str(s or "").strip() != ""} error_kind_set = {str(s or "").strip().lower() for s in (error_kind or []) if str(s or "").strip() != ""} use_created = str(sort or "").strip().lower() in ("created", "created_at", "createdat") items: List[Tuple[float, float, TaskRecord]] = [] for path in self.tasks_dir.glob("*.json"): try: stat = path.stat() except OSError: continue data = self._read_json_file(path) if not isinstance(data, dict): continue upgraded = _upgrade_task_record_dict(data, task_id=path.stem) upgraded = _ensure_task_finished_on_status(upgraded) try: task = _task_from_dict(upgraded if isinstance(upgraded, dict) else data) except Exception: continue if status_set and canonical_task_status_value(task.status) not in status_set: continue if task_type_set and str(task.task_type or "").strip().lower() not in task_type_set: continue if engine_set and str(task.engine or "").strip().lower() not in engine_set: continue if error_kind_set: err = task.error if isinstance(task.error, dict) else {} kind = str(err.get("kind") or "").strip().lower() if kind not in error_kind_set: continue created_ts = float(getattr(task, "created", 0.0) or 0.0) mtime_ts = float(stat.st_mtime) key_ts = created_ts if use_created else mtime_ts items.append((key_ts, mtime_ts, task)) items.sort(key=lambda x: (x[0], x[1]), reverse=True) total = len(items) sliced = items[off : off + lim] return ([t for _, _, t in sliced], total) def summarize_recent_errors( self, *, scan_limit: int, limit: int, offset: int, status: List[str] | None = None, error_kind: List[str] | None = None, ) -> Tuple[Dict[str, int], List[TaskRecord], int, int]: scan_lim = int(scan_limit) if scan_lim <= 0: scan_lim = 1000 if scan_lim > 20000: scan_lim = 20000 lim = int(limit) off = int(offset) if lim <= 0: lim = 20 if lim > 500: lim = 500 if off < 0: off = 0 if status is None: status_set = {TaskStatus.failed.value, TaskStatus.rpa_failed.value} else: status_set = {canonical_task_status_value(s) for s in (status or [])} status_set.discard(None) if len(status) > 0 and not status_set: return ({}, [], 0, 0) error_kind_set = {str(s or "").strip().lower() for s in (error_kind or []) if str(s or "").strip() != ""} heap: list[tuple[float, float, Path]] = [] for path in self.tasks_dir.glob("*.json"): try: stat = path.stat() except OSError: continue mtime_ts = float(stat.st_mtime) item = (mtime_ts, mtime_ts, path) if len(heap) < scan_lim: heapq.heappush(heap, item) elif item > heap[0]: heapq.heapreplace(heap, item) heap.sort(reverse=True) scanned = len(heap) matched: list[TaskRecord] = [] error_kind_counts: Dict[str, int] = {} for _, _, path in heap: data = self._read_json_file(path) if not isinstance(data, dict): continue upgraded = _upgrade_task_record_dict(data, task_id=path.stem) upgraded = _ensure_task_finished_on_status(upgraded) try: task = _task_from_dict(upgraded if isinstance(upgraded, dict) else data) except Exception: continue if status_set and canonical_task_status_value(task.status) not in status_set: continue err = task.error if isinstance(task.error, dict) else {} kind = str(err.get("kind") or "").strip().lower() if error_kind_set and kind not in error_kind_set: continue matched.append(task) key = kind or "unknown" error_kind_counts[key] = int(error_kind_counts.get(key, 0) or 0) + 1 total = len(matched) return (error_kind_counts, matched[off : off + lim], total, scanned) def _task_path(self, task_id: str) -> Path: return self.tasks_dir / f"{task_id}.json" @contextmanager def _locked(self, *, exclusive: bool) -> Any: with self._thread_lock: self.lock_file.parent.mkdir(parents=True, exist_ok=True) f = self.lock_file.open("a+", encoding="utf-8") try: if fcntl: mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH fcntl.flock(f.fileno(), mode) elif os.name == 'nt': # Windows fallback using msvcrt # msvcrt doesn't support shared locks natively the same way, we use a blocking lock for both. mode = msvcrt.LK_LOCK msvcrt.locking(f.fileno(), mode, 1) yield finally: try: if fcntl: fcntl.flock(f.fileno(), fcntl.LOCK_UN) elif os.name == 'nt': msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1) finally: f.close() def _migrate_tasks_unlocked(self) -> None: if not self.legacy_tasks_file.exists(): return try: with self.legacy_tasks_file.open("r", encoding="utf-8") as f: data = json.load(f) except Exception: data = {} if isinstance(data, dict): for task_id, record in data.items(): if not isinstance(task_id, str): continue if not isinstance(record, dict): continue dst = self._task_path(task_id) if dst.exists(): continue upgraded = _upgrade_task_record_dict(record, task_id=task_id) self._atomic_write_json(dst, upgraded) migrated_path = self.base_dir / "tasks.json.migrated" try: os.replace(self.legacy_tasks_file, migrated_path) self._fsync_dir(self.base_dir) except OSError: return def _upgrade_tasks_unlocked(self) -> None: if not self.tasks_dir.exists(): return for path in self.tasks_dir.glob("*.json"): data = self._read_json_file(path) if not isinstance(data, dict): continue task_id = path.stem upgraded = _upgrade_task_record_dict(data, task_id=task_id) upgraded = _ensure_task_finished_on_status(upgraded) if upgraded is data: continue try: self._atomic_write_json(path, upgraded) except Exception: continue def _read_json_file(self, path: Path) -> Any: try: with path.open("r", encoding="utf-8") as f: return json.load(f) except Exception: return None def _atomic_write_json(self, path: Path, data: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp(prefix=path.name, dir=str(path.parent)) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) f.flush() os.fsync(f.fileno()) os.replace(tmp_path, path) self._fsync_dir(path.parent) finally: try: if os.path.exists(tmp_path): os.remove(tmp_path) except OSError: pass def _atomic_write_text(self, path: Path, content: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp(prefix=path.name, dir=str(path.parent)) try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(content or "") f.flush() os.fsync(f.fileno()) os.replace(tmp_path, path) self._fsync_dir(path.parent) finally: try: if os.path.exists(tmp_path): os.remove(tmp_path) except OSError: pass def _fsync_dir(self, path: Path) -> None: try: fd = os.open(str(path), os.O_RDONLY | os.O_DIRECTORY) except OSError: return try: os.fsync(fd) except OSError: return finally: os.close(fd)