"""File-backed AI summary buffer and cache with optional HF dataset sync."""
import json
import os
import time
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple
try:
import fcntl
except Exception: # pragma: no cover
fcntl = None
try:
from huggingface_hub import HfApi, snapshot_download
except Exception: # pragma: no cover
HfApi = None
snapshot_download = None
# Directory holding all cache artifacts (buffer, summaries, meta, lock files).
CACHE_DIR = os.getenv("AI_SUMMARY_CACHE_DIR", "./ai-summary-cache")
# Minimum age (seconds) an item must sit in the buffer before it is batched
# by fetch_ready_batches.
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
# Upper bound on accumulated prompt characters per summarization batch.
BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
# Hugging Face repo used for the optional restore/sync of the cache directory;
# sync is skipped entirely when huggingface_hub is unavailable.
HF_REPO_ID = os.getenv("AI_SUMMARY_HF_REPO", "ResearchEngineering/ai_news_summaries")
HF_REPO_TYPE = os.getenv("AI_SUMMARY_HF_REPO_TYPE", "dataset")
# File names inside CACHE_DIR.
BUFFER_FILE = "buffer.jsonl"  # pending items awaiting summarization (JSONL)
SUMMARIES_FILE = "summaries.jsonl"  # completed summaries keyed by item_key (JSONL)
META_FILE = "meta.json"  # last_update / last_sync bookkeeping
LOCK_FILE = ".lock"  # advisory flock target used by _file_lock
def init_storage():
    """Ensure the cache directory and its storage files exist.

    When the HF hub client is importable and a repo is configured, attempt to
    restore a previous snapshot first, then create any missing empty files.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    hub_available = bool(snapshot_download) and bool(HF_REPO_ID)
    if hub_available:
        _maybe_restore_from_hf()
    _ensure_files()
def enqueue_items(items: Iterable[Dict]):
    """Append previously unseen items to the pending-summary buffer.

    Items without a usable key or title, or whose key already appears in the
    buffer or the stored summaries, are skipped. The buffer file is rewritten
    only when at least one new item was accepted.
    """
    init_storage()
    stamp = time.time()
    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
        done = _read_jsonl(SUMMARIES_FILE)
        # Dedup against everything already known, in either file.
        seen = {entry["item_key"] for entry in pending + done if entry.get("item_key")}
        fresh = []
        for candidate in items:
            key = _item_key(candidate)
            title = str(candidate.get("title", "")).strip()
            if not key or not title or key in seen:
                continue
            fresh.append(
                {
                    "item_key": key,
                    "title": title,
                    "source": str(candidate.get("source", "")).strip(),
                    "created_at": stamp,
                }
            )
            seen.add(key)
        if fresh:
            _write_jsonl(BUFFER_FILE, pending + fresh)
def get_status() -> Dict:
    """Return a snapshot of the buffer and summary store for display.

    Keys: buffer_size, total_summaries, last_update (formatted local time or
    None), buffer_remaining_seconds (seconds until the oldest buffered item is
    eligible, or None), batch_max_chars, buffer_window_seconds.
    """
    init_storage()
    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
        done = _read_jsonl(SUMMARIES_FILE)
    newest = max((entry.get("updated_at", 0) for entry in done), default=None)
    oldest = min((entry.get("created_at", 0) for entry in pending), default=None)
    remaining = None
    if oldest:
        remaining = max(BUFFER_SECONDS - (time.time() - oldest), 0)
    formatted = None
    if newest:
        formatted = datetime.fromtimestamp(newest).strftime("%Y-%m-%d %H:%M:%S")
    return {
        "buffer_size": len(pending),
        "total_summaries": len(done),
        "last_update": formatted,
        "buffer_remaining_seconds": remaining,
        "batch_max_chars": BATCH_MAX_CHARS,
        "buffer_window_seconds": BUFFER_SECONDS,
    }
def fetch_summaries(limit: int = 50) -> List[Dict]:
    """Return up to *limit* stored summaries, newest first.

    Each result dict carries title, source, summary text, and a ``datetime``
    timestamp derived from the record's ``updated_at`` (falling back to now
    when the field is absent).
    """
    init_storage()
    with _file_lock():
        stored = _read_jsonl(SUMMARIES_FILE)
    ordered = sorted(stored, key=lambda entry: entry.get("updated_at", 0), reverse=True)
    return [
        {
            "title": entry.get("title", ""),
            "source": entry.get("source", ""),
            "summary": entry.get("summary", ""),
            "timestamp": datetime.fromtimestamp(entry.get("updated_at", time.time())),
        }
        for entry in ordered[:limit]
    ]
def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]:
    """Group buffered items older than *buffer_seconds* into char-bounded batches.

    Eligible items are ordered oldest-first; a new batch starts whenever adding
    the next item would push the accumulated prompt length (as measured by
    ``_build_input_text``) past *max_chars_total*. A single oversized item
    still gets a batch of its own.

    Returns a list of batches of ``(item_key, title, source)`` tuples.
    """
    init_storage()
    deadline = time.time() - buffer_seconds
    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
    ready = sorted(
        (entry for entry in pending if entry.get("created_at", 0) <= deadline),
        key=lambda entry: entry.get("created_at", 0),
    )
    batches: List[List[Tuple[str, str, str]]] = []
    batch: List[Tuple[str, str, str]] = []
    used = 0
    for entry in ready:
        title = entry.get("title", "")
        source = entry.get("source", "")
        cost = len(_build_input_text(title, source))
        if batch and used + cost > max_chars_total:
            batches.append(batch)
            batch, used = [], 0
        batch.append((entry.get("item_key"), title, source))
        used += cost
    if batch:
        batches.append(batch)
    return batches
def store_summaries(items: List[Tuple[str, str, str, str]]):
    """Persist completed summaries and drop the matching buffer entries.

    *items* is a list of ``(item_key, title, source, summary)`` tuples.
    Existing summaries with the same key are overwritten, the meta file is
    refreshed, and — if configured — the cache is mirrored to the HF dataset.
    No-op for an empty list.
    """
    if not items:
        return
    init_storage()
    stamp = time.time()
    with _file_lock():
        done = {entry["item_key"]: entry for entry in _read_jsonl(SUMMARIES_FILE) if entry.get("item_key")}
        pending = {entry["item_key"]: entry for entry in _read_jsonl(BUFFER_FILE) if entry.get("item_key")}
        for key, title, source, summary in items:
            done[key] = {
                "item_key": key,
                "title": title,
                "source": source,
                "summary": summary,
                "updated_at": stamp,
            }
            # Summarized items no longer belong in the pending buffer.
            pending.pop(key, None)
        _write_jsonl(SUMMARIES_FILE, list(done.values()))
        _write_jsonl(BUFFER_FILE, list(pending.values()))
        _write_meta({"last_sync": None, "last_update": stamp})
    _sync_to_hf_if_configured()
def _item_key(item: Dict) -> str:
if item.get("id") is not None:
return str(item.get("id"))
title = str(item.get("title", "")).strip()
source = str(item.get("source", "")).strip()
if not title:
return ""
return f"{source}|{title}".lower()
def _build_input_text(title: str, source: str) -> str:
if source:
return f"Source: {source}\nTitle: {title}"
return f"Title: {title}"
def _ensure_files():
    """Create empty buffer/summaries files in the cache dir if missing."""
    for filename in (BUFFER_FILE, SUMMARIES_FILE):
        target = os.path.join(CACHE_DIR, filename)
        if not os.path.exists(target):
            with open(target, "w", encoding="utf-8"):
                pass
def _read_jsonl(filename: str) -> List[Dict]:
    """Load records from a JSONL file under CACHE_DIR.

    Returns [] when the file does not exist; blank and unparseable lines are
    silently skipped so one corrupt record never poisons the whole file.
    """
    path = os.path.join(CACHE_DIR, filename)
    if not os.path.exists(path):
        return []
    records: List[Dict] = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            text = raw.strip()
            if not text:
                continue
            try:
                parsed = json.loads(text)
            except Exception:
                continue  # best-effort: skip corrupt lines
            records.append(parsed)
    return records
def _write_jsonl(filename: str, items: List[Dict]):
    """Atomically rewrite a JSONL file: stage to a .tmp file, then os.replace."""
    destination = os.path.join(CACHE_DIR, filename)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        handle.writelines(json.dumps(record, ensure_ascii=True) + "\n" for record in items)
    os.replace(staging, destination)
def _write_meta(data: Dict):
    """Atomically persist the metadata JSON file alongside the cache files."""
    destination = os.path.join(CACHE_DIR, META_FILE)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        handle.write(json.dumps(data))
    os.replace(staging, destination)
@contextmanager
def _file_lock():
    """Serialize cache mutations with an exclusive advisory flock.

    Degrades to a no-op on platforms where fcntl is unavailable (e.g.
    Windows), matching the module's best-effort import fallback.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    lock_path = os.path.join(CACHE_DIR, LOCK_FILE)
    if fcntl is None:
        yield
        return
    with open(lock_path, "w", encoding="utf-8") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)
def _maybe_restore_from_hf():
    """Best-effort restore of the cache directory from the configured HF dataset.

    Skipped when the hub client is missing, no repo is configured, or a local
    summaries file already exists. Download failures (network, auth, missing
    repo) are swallowed: the restore is opportunistic and must never prevent
    ``init_storage`` — and therefore every public entry point — from working
    offline.
    """
    if not snapshot_download or not HF_REPO_ID:
        return
    if os.path.exists(os.path.join(CACHE_DIR, SUMMARIES_FILE)):
        return
    try:
        snapshot_download(
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
            local_dir=CACHE_DIR,
            local_dir_use_symlinks=False,
        )
    except Exception:
        # Opportunistic restore: fall back to a fresh local cache on any failure.
        pass
def _sync_to_hf_if_configured():
    """Best-effort upload of the cache directory to the configured HF dataset.

    No-op when huggingface_hub is unavailable or no repo id is set. Upload
    failures (network, auth) are swallowed so a sync error can never undo a
    ``store_summaries`` call whose local write already succeeded — local disk
    remains the authoritative copy.
    """
    if not HfApi or not HF_REPO_ID:
        return
    try:
        HfApi().upload_folder(
            folder_path=CACHE_DIR,
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
        )
    except Exception:
        # Opportunistic mirror only; failure here must not propagate to callers.
        pass