# UnifiedFinancialPlatform — app/utils/ai_summary_store.py
# Author: Dmitry Beresnev
# History: "init project" (commit e189a31)
"""File-backed AI summary buffer and cache with optional HF dataset sync."""
import json
import os
import time
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple
try:
import fcntl
except Exception: # pragma: no cover
fcntl = None
try:
from huggingface_hub import HfApi, snapshot_download
except Exception: # pragma: no cover
HfApi = None
snapshot_download = None
CACHE_DIR = os.getenv("AI_SUMMARY_CACHE_DIR", "./ai-summary-cache")
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
HF_REPO_ID = os.getenv("AI_SUMMARY_HF_REPO", "ResearchEngineering/ai_news_summaries")
HF_REPO_TYPE = os.getenv("AI_SUMMARY_HF_REPO_TYPE", "dataset")
BUFFER_FILE = "buffer.jsonl"
SUMMARIES_FILE = "summaries.jsonl"
META_FILE = "meta.json"
LOCK_FILE = ".lock"
def init_storage():
    """Ensure the cache directory exists, optionally restore it from the
    configured HF dataset, and create the backing files if missing."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    can_restore = snapshot_download is not None and bool(HF_REPO_ID)
    if can_restore:
        _maybe_restore_from_hf()
    _ensure_files()
def enqueue_items(items: Iterable[Dict]):
    """Append previously unseen items to the pending-summary buffer.

    An item is skipped when it has no usable key or title, or when its key
    already appears in either the buffer or the finished summaries. The
    buffer file is rewritten only if at least one item was added.
    """
    init_storage()
    timestamp = time.time()
    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
        done = _read_jsonl(SUMMARIES_FILE)
        # Keys already known anywhere in the store — used for dedup below.
        seen = {entry.get("item_key") for entry in pending + done if entry.get("item_key")}
        new_count = 0
        for raw in items:
            key = _item_key(raw)
            title = str(raw.get("title", "")).strip()
            if not key or not title or key in seen:
                continue
            pending.append(
                {
                    "item_key": key,
                    "title": title,
                    "source": str(raw.get("source", "")).strip(),
                    "created_at": timestamp,
                }
            )
            seen.add(key)
            new_count += 1
        if new_count:
            _write_jsonl(BUFFER_FILE, pending)
def get_status() -> Dict:
    """Report buffer/summary counts and timing information for display.

    Returns a dict with:
      - buffer_size: number of items awaiting summarization
      - total_summaries: number of finished summaries
      - last_update: formatted timestamp of the newest summary, or None
      - buffer_remaining_seconds: seconds until the oldest buffered item
        becomes eligible, or None when the buffer is empty
      - batch_max_chars / buffer_window_seconds: configured limits
    """
    init_storage()
    with _file_lock():
        buffer_items = _read_jsonl(BUFFER_FILE)
        summaries = _read_jsonl(SUMMARIES_FILE)

    last_update: Optional[float] = None
    if summaries:
        last_update = max(item.get("updated_at", 0) for item in summaries)

    buffer_remaining = None
    if buffer_items:
        # Fix: decide on buffer *presence*, not on the truthiness of the
        # oldest timestamp — a missing/zero "created_at" previously made
        # the countdown disappear even though items were buffered.
        buffer_oldest = min(item.get("created_at", 0) for item in buffer_items)
        age = time.time() - buffer_oldest
        buffer_remaining = max(BUFFER_SECONDS - age, 0)

    last_update_text = (
        datetime.fromtimestamp(last_update).strftime("%Y-%m-%d %H:%M:%S")
        if last_update
        else None
    )
    return {
        "buffer_size": len(buffer_items),
        "total_summaries": len(summaries),
        "last_update": last_update_text,
        "buffer_remaining_seconds": buffer_remaining,
        "batch_max_chars": BATCH_MAX_CHARS,
        "buffer_window_seconds": BUFFER_SECONDS,
    }
def fetch_summaries(limit: int = 50) -> List[Dict]:
    """Return up to *limit* stored summaries, newest first.

    Each result has title, source, summary, and a datetime ``timestamp``
    derived from the record's "updated_at" field.
    """
    init_storage()
    with _file_lock():
        records = _read_jsonl(SUMMARIES_FILE)
    records.sort(key=lambda rec: rec.get("updated_at", 0), reverse=True)
    return [
        {
            "title": rec.get("title", ""),
            "source": rec.get("source", ""),
            "summary": rec.get("summary", ""),
            "timestamp": datetime.fromtimestamp(rec.get("updated_at", time.time())),
        }
        for rec in records[:limit]
    ]
def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]:
    """Group buffered items older than *buffer_seconds* into size-bounded batches.

    Each batch is a list of ``(item_key, title, source)`` tuples whose combined
    prompt text stays within *max_chars_total* characters. A single oversized
    item still forms a batch of its own (the limit is only enforced when the
    current batch is non-empty).
    """
    init_storage()
    cutoff = time.time() - buffer_seconds
    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)

    # Oldest-first ordering of items that have aged past the cutoff.
    ready = sorted(
        (item for item in pending if item.get("created_at", 0) <= cutoff),
        key=lambda item: item.get("created_at", 0),
    )

    batches: List[List[Tuple[str, str, str]]] = []
    batch: List[Tuple[str, str, str]] = []
    used = 0
    for item in ready:
        title = item.get("title", "")
        source = item.get("source", "")
        cost = len(_build_input_text(title, source))
        if batch and used + cost > max_chars_total:
            batches.append(batch)
            batch, used = [], 0
        batch.append((item.get("item_key"), title, source))
        used += cost
    if batch:
        batches.append(batch)
    return batches
def store_summaries(items: List[Tuple[str, str, str, str]]):
    """Persist finished summaries and remove them from the pending buffer.

    *items* is a list of ``(item_key, title, source, summary)`` tuples.
    Existing summaries with the same key are overwritten, metadata is
    refreshed, and the cache directory is then pushed to the configured HF
    dataset (if any).
    """
    if not items:
        return
    init_storage()
    now = time.time()
    with _file_lock():
        summaries_by_key = {
            entry.get("item_key"): entry
            for entry in _read_jsonl(SUMMARIES_FILE)
            if entry.get("item_key")
        }
        buffer_by_key = {
            entry.get("item_key"): entry
            for entry in _read_jsonl(BUFFER_FILE)
            if entry.get("item_key")
        }
        for item_key, title, source, summary in items:
            summaries_by_key[item_key] = {
                "item_key": item_key,
                "title": title,
                "source": source,
                "summary": summary,
                "updated_at": now,
            }
            # Summarized items leave the buffer; absent keys are a no-op.
            buffer_by_key.pop(item_key, None)
        _write_jsonl(SUMMARIES_FILE, list(summaries_by_key.values()))
        _write_jsonl(BUFFER_FILE, list(buffer_by_key.values()))
        _write_meta({"last_sync": None, "last_update": now})
    # Fix: run the (potentially slow) network upload after the file lock is
    # released so other processes are not blocked for the duration of a push.
    _sync_to_hf_if_configured()
def _item_key(item: Dict) -> str:
if item.get("id") is not None:
return str(item.get("id"))
title = str(item.get("title", "")).strip()
source = str(item.get("source", "")).strip()
if not title:
return ""
return f"{source}|{title}".lower()
def _build_input_text(title: str, source: str) -> str:
if source:
return f"Source: {source}\nTitle: {title}"
return f"Title: {title}"
def _ensure_files():
    """Create empty buffer/summaries files in the cache dir if absent."""
    for name in (BUFFER_FILE, SUMMARIES_FILE):
        path = os.path.join(CACHE_DIR, name)
        if os.path.exists(path):
            continue
        with open(path, "w", encoding="utf-8") as handle:
            handle.write("")
def _read_jsonl(filename: str) -> List[Dict]:
    """Load a JSONL cache file, skipping blank and unparseable lines.

    Returns an empty list when the file does not exist.
    """
    path = os.path.join(CACHE_DIR, filename)
    if not os.path.exists(path):
        return []
    records: List[Dict] = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw:
                continue
            try:
                records.append(json.loads(raw))
            except Exception:
                # Corrupt line: drop it rather than failing the whole read.
                continue
    return records
def _write_jsonl(filename: str, items: List[Dict]):
    """Atomically rewrite a JSONL cache file (write to a temp file, then
    os.replace over the target so readers never see a partial file)."""
    path = os.path.join(CACHE_DIR, filename)
    tmp_path = f"{path}.tmp"
    serialized = [json.dumps(item, ensure_ascii=True) for item in items]
    with open(tmp_path, "w", encoding="utf-8") as handle:
        handle.writelines(line + "\n" for line in serialized)
    os.replace(tmp_path, path)
def _write_meta(data: Dict):
    """Atomically persist the metadata JSON file (temp write + replace)."""
    target = os.path.join(CACHE_DIR, META_FILE)
    scratch = f"{target}.tmp"
    with open(scratch, "w", encoding="utf-8") as handle:
        handle.write(json.dumps(data))
    os.replace(scratch, target)
@contextmanager
def _file_lock():
    """Hold an exclusive inter-process flock over the cache directory.

    On platforms without ``fcntl`` (e.g. Windows) this degrades to a no-op,
    matching the module's best-effort import at the top of the file.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    lock_path = os.path.join(CACHE_DIR, LOCK_FILE)
    if fcntl is None:
        yield
        return
    with open(lock_path, "w", encoding="utf-8") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)
def _maybe_restore_from_hf():
    """Best-effort restore of the cache from the configured HF dataset.

    Skipped when huggingface_hub is unavailable, no repo is configured, or a
    local summaries file already exists. Fix: download failures are swallowed
    so that an unreachable/unauthorized Hub cannot crash ``init_storage`` —
    and thereby every public entry point — since the remote copy is only a
    convenience backup of the local cache.
    """
    if not snapshot_download or not HF_REPO_ID:
        return
    if os.path.exists(os.path.join(CACHE_DIR, SUMMARIES_FILE)):
        return
    try:
        snapshot_download(
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
            local_dir=CACHE_DIR,
            local_dir_use_symlinks=False,
        )
    except Exception:  # pragma: no cover - network/auth failures
        # Start with an empty local cache instead of failing startup.
        pass
def _sync_to_hf_if_configured():
    """Upload the entire cache directory to the configured HF repo.

    No-op when huggingface_hub is unavailable or no repo id is set; any
    upload errors propagate to the caller unchanged.
    """
    if HfApi is None or not HF_REPO_ID:
        return
    HfApi().upload_folder(
        folder_path=CACHE_DIR,
        repo_id=HF_REPO_ID,
        repo_type=HF_REPO_TYPE,
    )