"""File-backed AI summary buffer and cache with optional HF dataset sync.""" import json import os import time from contextlib import contextmanager from datetime import datetime from typing import Dict, Iterable, List, Optional, Tuple try: import fcntl except Exception: # pragma: no cover fcntl = None try: from huggingface_hub import HfApi, snapshot_download except Exception: # pragma: no cover HfApi = None snapshot_download = None CACHE_DIR = os.getenv("AI_SUMMARY_CACHE_DIR", "./ai-summary-cache") BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120")) BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400")) HF_REPO_ID = os.getenv("AI_SUMMARY_HF_REPO", "ResearchEngineering/ai_news_summaries") HF_REPO_TYPE = os.getenv("AI_SUMMARY_HF_REPO_TYPE", "dataset") BUFFER_FILE = "buffer.jsonl" SUMMARIES_FILE = "summaries.jsonl" META_FILE = "meta.json" LOCK_FILE = ".lock" def init_storage(): os.makedirs(CACHE_DIR, exist_ok=True) if snapshot_download and HF_REPO_ID: _maybe_restore_from_hf() _ensure_files() def enqueue_items(items: Iterable[Dict]): init_storage() now = time.time() with _file_lock(): buffer_items = _read_jsonl(BUFFER_FILE) summaries = _read_jsonl(SUMMARIES_FILE) existing_keys = {item.get("item_key") for item in buffer_items if item.get("item_key")} existing_keys.update({item.get("item_key") for item in summaries if item.get("item_key")}) added = 0 for item in items: key = _item_key(item) title = str(item.get("title", "")).strip() if not key or not title or key in existing_keys: continue source = str(item.get("source", "")).strip() buffer_items.append( { "item_key": key, "title": title, "source": source, "created_at": now, } ) existing_keys.add(key) added += 1 if added: _write_jsonl(BUFFER_FILE, buffer_items) def get_status() -> Dict: init_storage() with _file_lock(): buffer_items = _read_jsonl(BUFFER_FILE) summaries = _read_jsonl(SUMMARIES_FILE) buffer_count = len(buffer_items) summaries_count = len(summaries) last_update = None if summaries: last_update = max(item.get("updated_at", 0) for item in summaries) buffer_oldest = None if buffer_items: buffer_oldest = min(item.get("created_at", 0) for item in buffer_items) buffer_remaining = None if buffer_oldest: age = time.time() - buffer_oldest buffer_remaining = max(BUFFER_SECONDS - age, 0) last_update_text = ( datetime.fromtimestamp(last_update).strftime("%Y-%m-%d %H:%M:%S") if last_update else None ) return { "buffer_size": buffer_count, "total_summaries": summaries_count, "last_update": last_update_text, "buffer_remaining_seconds": buffer_remaining, "batch_max_chars": BATCH_MAX_CHARS, "buffer_window_seconds": BUFFER_SECONDS, } def fetch_summaries(limit: int = 50) -> List[Dict]: init_storage() with _file_lock(): summaries = _read_jsonl(SUMMARIES_FILE) summaries.sort(key=lambda x: x.get("updated_at", 0), reverse=True) results = [] for item in summaries[:limit]: results.append( { "title": item.get("title", ""), "source": item.get("source", ""), "summary": item.get("summary", ""), "timestamp": datetime.fromtimestamp(item.get("updated_at", time.time())), } ) return results def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]: init_storage() cutoff = time.time() - buffer_seconds with _file_lock(): buffer_items = _read_jsonl(BUFFER_FILE) eligible = [item for item in buffer_items if item.get("created_at", 0) <= cutoff] eligible.sort(key=lambda x: x.get("created_at", 0)) batches: List[List[Tuple[str, str, str]]] = [] current: List[Tuple[str, str, str]] = [] current_chars = 0 for item in eligible: title = item.get("title", "") source = item.get("source", "") text = _build_input_text(title, source) text_len = len(text) if current and current_chars + text_len > max_chars_total: batches.append(current) current = [] current_chars = 0 current.append((item.get("item_key"), title, source)) current_chars += text_len if current: batches.append(current) return batches def store_summaries(items: List[Tuple[str, str, str, str]]): if not items: return init_storage() now = time.time() with _file_lock(): summaries = _read_jsonl(SUMMARIES_FILE) buffer_items = _read_jsonl(BUFFER_FILE) summaries_by_key = {item.get("item_key"): item for item in summaries if item.get("item_key")} buffer_by_key = {item.get("item_key"): item for item in buffer_items if item.get("item_key")} for item_key, title, source, summary in items: summaries_by_key[item_key] = { "item_key": item_key, "title": title, "source": source, "summary": summary, "updated_at": now, } if item_key in buffer_by_key: del buffer_by_key[item_key] _write_jsonl(SUMMARIES_FILE, list(summaries_by_key.values())) _write_jsonl(BUFFER_FILE, list(buffer_by_key.values())) _write_meta({"last_sync": None, "last_update": now}) _sync_to_hf_if_configured() def _item_key(item: Dict) -> str: if item.get("id") is not None: return str(item.get("id")) title = str(item.get("title", "")).strip() source = str(item.get("source", "")).strip() if not title: return "" return f"{source}|{title}".lower() def _build_input_text(title: str, source: str) -> str: if source: return f"Source: {source}\nTitle: {title}" return f"Title: {title}" def _ensure_files(): for name in (BUFFER_FILE, SUMMARIES_FILE): path = os.path.join(CACHE_DIR, name) if not os.path.exists(path): with open(path, "w", encoding="utf-8") as f: f.write("") def _read_jsonl(filename: str) -> List[Dict]: path = os.path.join(CACHE_DIR, filename) if not os.path.exists(path): return [] items = [] with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: items.append(json.loads(line)) except Exception: continue return items def _write_jsonl(filename: str, items: List[Dict]): path = os.path.join(CACHE_DIR, filename) tmp_path = path + ".tmp" with open(tmp_path, "w", encoding="utf-8") as f: for item in items: f.write(json.dumps(item, ensure_ascii=True) + "\n") os.replace(tmp_path, path) def _write_meta(data: Dict): path = os.path.join(CACHE_DIR, META_FILE) tmp_path = path + ".tmp" with open(tmp_path, "w", encoding="utf-8") as f: json.dump(data, f) os.replace(tmp_path, path) @contextmanager def _file_lock(): os.makedirs(CACHE_DIR, exist_ok=True) lock_path = os.path.join(CACHE_DIR, LOCK_FILE) if fcntl is None: yield return with open(lock_path, "w", encoding="utf-8") as lock_file: fcntl.flock(lock_file, fcntl.LOCK_EX) try: yield finally: fcntl.flock(lock_file, fcntl.LOCK_UN) def _maybe_restore_from_hf(): if not snapshot_download: return if not HF_REPO_ID: return if os.path.exists(os.path.join(CACHE_DIR, SUMMARIES_FILE)): return snapshot_download( repo_id=HF_REPO_ID, repo_type=HF_REPO_TYPE, local_dir=CACHE_DIR, local_dir_use_symlinks=False, ) def _sync_to_hf_if_configured(): if not HfApi or not HF_REPO_ID: return api = HfApi() api.upload_folder( folder_path=CACHE_DIR, repo_id=HF_REPO_ID, repo_type=HF_REPO_TYPE, )