| """File-backed AI summary buffer and cache with optional HF dataset sync.""" |
|
|
| import json |
| import os |
| import time |
| from contextlib import contextmanager |
| from datetime import datetime |
| from typing import Dict, Iterable, List, Optional, Tuple |
|
|
| try: |
| import fcntl |
| except Exception: |
| fcntl = None |
|
|
| try: |
| from huggingface_hub import HfApi, snapshot_download |
| except Exception: |
| HfApi = None |
| snapshot_download = None |
|
|
# Local cache location and buffering/batching knobs, all overridable via env vars.
CACHE_DIR = os.getenv("AI_SUMMARY_CACHE_DIR", "./ai-summary-cache")
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))  # min item age before batching
BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))  # char budget per batch
HF_REPO_ID = os.getenv("AI_SUMMARY_HF_REPO", "ResearchEngineering/ai_news_summaries")
HF_REPO_TYPE = os.getenv("AI_SUMMARY_HF_REPO_TYPE", "dataset")


# File names inside CACHE_DIR.
BUFFER_FILE = "buffer.jsonl"  # pending items awaiting summarization (JSONL)
SUMMARIES_FILE = "summaries.jsonl"  # completed summaries (JSONL)
META_FILE = "meta.json"  # bookkeeping timestamps
LOCK_FILE = ".lock"  # advisory flock target
|
|
|
|
def init_storage():
    """Ensure the cache directory exists, optionally restore it from HF, and seed files."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    if HF_REPO_ID and snapshot_download:
        _maybe_restore_from_hf()
    _ensure_files()
|
|
|
|
def enqueue_items(items: Iterable[Dict]):
    """Append new, previously unseen items to the pending buffer.

    Items lacking a key or a title, or whose key is already present in
    either the buffer or the stored summaries, are skipped. The buffer
    file is rewritten only when at least one new entry was accepted.
    """
    init_storage()
    timestamp = time.time()

    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
        done = _read_jsonl(SUMMARIES_FILE)

        # Keys already known anywhere in storage, used for deduplication.
        seen = {entry.get("item_key") for entry in pending + done if entry.get("item_key")}

        accepted = 0
        for raw in items:
            key = _item_key(raw)
            title = str(raw.get("title", "")).strip()
            if not key or not title or key in seen:
                continue
            pending.append(
                {
                    "item_key": key,
                    "title": title,
                    "source": str(raw.get("source", "")).strip(),
                    "created_at": timestamp,
                }
            )
            seen.add(key)
            accepted += 1

        if accepted:
            _write_jsonl(BUFFER_FILE, pending)
|
|
|
|
def get_status() -> Dict:
    """Return a snapshot of buffer/summary counts and timing information."""
    init_storage()
    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
        done = _read_jsonl(SUMMARIES_FILE)

    # Timestamp of the most recent summary; None when there are none.
    newest = max((entry.get("updated_at", 0) for entry in done), default=None) if done else None

    # Oldest buffered item drives the countdown until the next batch is eligible.
    oldest = min((entry.get("created_at", 0) for entry in pending), default=None) if pending else None

    remaining = None
    if oldest:  # a falsy 0 timestamp is treated as "no countdown", as before
        remaining = max(BUFFER_SECONDS - (time.time() - oldest), 0)

    formatted = None
    if newest:
        formatted = datetime.fromtimestamp(newest).strftime("%Y-%m-%d %H:%M:%S")

    return {
        "buffer_size": len(pending),
        "total_summaries": len(done),
        "last_update": formatted,
        "buffer_remaining_seconds": remaining,
        "batch_max_chars": BATCH_MAX_CHARS,
        "buffer_window_seconds": BUFFER_SECONDS,
    }
|
|
|
|
def fetch_summaries(limit: int = 50) -> List[Dict]:
    """Return up to ``limit`` stored summaries, newest first.

    Each result carries title/source/summary plus a ``timestamp``
    converted to a local ``datetime``.
    """
    init_storage()
    with _file_lock():
        stored = _read_jsonl(SUMMARIES_FILE)

    newest_first = sorted(stored, key=lambda entry: entry.get("updated_at", 0), reverse=True)
    return [
        {
            "title": entry.get("title", ""),
            "source": entry.get("source", ""),
            "summary": entry.get("summary", ""),
            "timestamp": datetime.fromtimestamp(entry.get("updated_at", time.time())),
        }
        for entry in newest_first[:limit]
    ]
|
|
|
|
def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]:
    """Group buffered items older than ``buffer_seconds`` into char-bounded batches.

    Each batch is a list of ``(item_key, title, source)`` tuples whose
    combined prompt text stays within ``max_chars_total``. A single item
    that alone exceeds the budget still forms its own batch.
    """
    init_storage()
    threshold = time.time() - buffer_seconds

    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)

    # Oldest first, restricted to items that have aged past the window.
    ready = sorted(
        (entry for entry in pending if entry.get("created_at", 0) <= threshold),
        key=lambda entry: entry.get("created_at", 0),
    )

    batches: List[List[Tuple[str, str, str]]] = []
    chunk: List[Tuple[str, str, str]] = []
    chunk_chars = 0

    for entry in ready:
        title = entry.get("title", "")
        source = entry.get("source", "")
        size = len(_build_input_text(title, source))
        # Close the current batch once adding this item would overflow it.
        if chunk and chunk_chars + size > max_chars_total:
            batches.append(chunk)
            chunk, chunk_chars = [], 0
        chunk.append((entry.get("item_key"), title, source))
        chunk_chars += size

    if chunk:
        batches.append(chunk)

    return batches
|
|
|
|
def store_summaries(items: List[Tuple[str, str, str, str]]):
    """Persist completed ``(item_key, title, source, summary)`` tuples.

    Each summary replaces any prior entry with the same key and removes
    the matching item from the pending buffer. Afterwards the meta file
    is refreshed and the cache is pushed to HF when configured.
    """
    if not items:
        return

    init_storage()
    timestamp = time.time()

    with _file_lock():
        # Key-indexed views of both files; entries without keys are dropped.
        done = {e.get("item_key"): e for e in _read_jsonl(SUMMARIES_FILE) if e.get("item_key")}
        pending = {e.get("item_key"): e for e in _read_jsonl(BUFFER_FILE) if e.get("item_key")}

        for key, title, source, summary in items:
            done[key] = {
                "item_key": key,
                "title": title,
                "source": source,
                "summary": summary,
                "updated_at": timestamp,
            }
            pending.pop(key, None)

        _write_jsonl(SUMMARIES_FILE, list(done.values()))
        _write_jsonl(BUFFER_FILE, list(pending.values()))

        _write_meta({"last_sync": None, "last_update": timestamp})
        _sync_to_hf_if_configured()
|
|
|
|
| def _item_key(item: Dict) -> str: |
| if item.get("id") is not None: |
| return str(item.get("id")) |
| title = str(item.get("title", "")).strip() |
| source = str(item.get("source", "")).strip() |
| if not title: |
| return "" |
| return f"{source}|{title}".lower() |
|
|
|
|
| def _build_input_text(title: str, source: str) -> str: |
| if source: |
| return f"Source: {source}\nTitle: {title}" |
| return f"Title: {title}" |
|
|
|
|
def _ensure_files():
    """Create empty buffer/summary files when they do not exist yet."""
    for filename in (BUFFER_FILE, SUMMARIES_FILE):
        target = os.path.join(CACHE_DIR, filename)
        if os.path.exists(target):
            continue
        with open(target, "w", encoding="utf-8") as handle:
            handle.write("")
|
|
|
|
def _read_jsonl(filename: str) -> List[Dict]:
    """Parse a JSONL cache file, silently skipping blank and corrupt lines."""
    path = os.path.join(CACHE_DIR, filename)
    if not os.path.exists(path):
        return []
    records: List[Dict] = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                parsed = json.loads(stripped)
            except Exception:
                continue  # tolerate a partially written or corrupt line
            records.append(parsed)
    return records
|
|
|
|
def _write_jsonl(filename: str, items: List[Dict]):
    """Atomically rewrite a JSONL file via a temp file and os.replace."""
    destination = os.path.join(CACHE_DIR, filename)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        handle.writelines(json.dumps(record, ensure_ascii=True) + "\n" for record in items)
    # Atomic swap so readers never observe a half-written file.
    os.replace(staging, destination)
|
|
|
|
def _write_meta(data: Dict):
    """Atomically rewrite the bookkeeping meta file."""
    destination = os.path.join(CACHE_DIR, META_FILE)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        json.dump(data, handle)
    os.replace(staging, destination)
|
|
|
|
@contextmanager
def _file_lock():
    """Serialize cross-process cache access with flock; no-op where fcntl is missing."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    if fcntl is None:  # e.g. Windows — proceed without inter-process locking
        yield
        return
    with open(os.path.join(CACHE_DIR, LOCK_FILE), "w", encoding="utf-8") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)
|
|
|
|
def _maybe_restore_from_hf():
    """Best-effort restore of the cache directory from the HF dataset repo.

    Skipped when huggingface_hub is unavailable, no repo is configured,
    or a local summaries file already exists. Download failures (missing
    repo, no network, auth errors) are swallowed so that local operation
    still works — previously any HF error crashed init_storage() and,
    through it, every public entry point of this module.
    """
    if not snapshot_download:
        return
    if not HF_REPO_ID:
        return
    if os.path.exists(os.path.join(CACHE_DIR, SUMMARIES_FILE)):
        return
    try:
        snapshot_download(
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
            local_dir=CACHE_DIR,
            local_dir_use_symlinks=False,  # deprecated in newer hub versions, kept for old ones
        )
    except Exception:
        # Restore is opportunistic: fall back to an empty local cache.
        pass
|
|
|
|
def _sync_to_hf_if_configured():
    """Best-effort upload of the cache directory to the configured HF repo.

    Internal files (the flock target and atomic-write temp files) are
    excluded from the upload — previously they were pushed to the repo.
    Failures (network, auth, missing repo) are swallowed: the local
    write in store_summaries() has already succeeded by the time this
    runs, so a transient sync error must not raise out of it.
    """
    if not HfApi or not HF_REPO_ID:
        return
    try:
        HfApi().upload_folder(
            folder_path=CACHE_DIR,
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
            # Don't publish the advisory lock or half-written temp files.
            ignore_patterns=[LOCK_FILE, "*.tmp"],
        )
    except Exception:
        pass
|
|