PromiseTrack-AI / app /services /cache_service.py
Harisri
Initial deployment: PromiseTrack AI with 15 companies
80b6680
"""
app/services/cache_service.py
All SQLite read/write operations for the cache layer.
No pipeline logic lives here β€” only DB operations.
"""
import json
import re
from datetime import datetime
from typing import Optional
from db import get_db
# ── Helpers ───────────────────────────────────────────────────────────────────
def _clean_display_name(folder_name: str) -> str:
"""Strip year/quarter suffixes from folder names."""
name = re.sub(r'\s*\d{4}\s*Quarterly\s*Data\s*', '', folder_name, flags=re.IGNORECASE)
name = re.sub(r'\s*Quarterly\s*Data\s*', '', name, flags=re.IGNORECASE)
return name.strip()
# ── Company registry ──────────────────────────────────────────────────────────
def get_all_companies() -> list:
"""Return all companies with their processing status."""
with get_db() as conn:
rows = conn.execute(
"SELECT id, folder_name, display_name, status, processed_at FROM companies ORDER BY display_name"
).fetchall()
return [dict(r) for r in rows]
def get_ready_companies() -> list:
"""Return only companies that have been successfully processed."""
with get_db() as conn:
rows = conn.execute(
"SELECT id, folder_name, display_name, processed_at FROM companies WHERE status = 'ready' ORDER BY display_name"
).fetchall()
return [dict(r) for r in rows]
def get_company_by_folder(folder_name: str) -> Optional[dict]:
with get_db() as conn:
row = conn.execute(
"SELECT * FROM companies WHERE folder_name = ?", (folder_name,)
).fetchone()
return dict(row) if row else None
def upsert_company(folder_name: str, status: str = "pending", error_msg: str = None) -> int:
"""Insert or update a company record. Returns company id."""
display = _clean_display_name(folder_name)
with get_db() as conn:
conn.execute("""
INSERT INTO companies (folder_name, display_name, status, processed_at, error_msg)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(folder_name) DO UPDATE SET
status = excluded.status,
processed_at = excluded.processed_at,
error_msg = excluded.error_msg
""", (folder_name, display, status,
datetime.utcnow().isoformat() if status == "ready" else None,
error_msg))
row = conn.execute(
"SELECT id FROM companies WHERE folder_name = ?", (folder_name,)
).fetchone()
return row["id"]
# ── Analysis cache ────────────────────────────────────────────────────────────
def get_cached_analysis(company_id: int, mode: str) -> Optional[dict]:
"""Return cached analysis result or None if not found."""
with get_db() as conn:
row = conn.execute(
"SELECT result_json FROM analysis_cache WHERE company_id = ? AND mode = ?",
(company_id, mode)
).fetchone()
return json.loads(row["result_json"]) if row else None
def save_analysis(company_id: int, mode: str, result: dict) -> None:
"""Save (overwrite) analysis result for a company+mode."""
with get_db() as conn:
conn.execute("""
INSERT INTO analysis_cache (company_id, mode, result_json, created_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(company_id, mode) DO UPDATE SET
result_json = excluded.result_json,
created_at = excluded.created_at
""", (company_id, mode, json.dumps(result), datetime.utcnow().isoformat()))
# ── Claims ────────────────────────────────────────────────────────────────────
def save_claims(company_id: int, claims: list) -> None:
"""Overwrite all claims for a company."""
with get_db() as conn:
conn.execute("DELETE FROM claims WHERE company_id = ?", (company_id,))
conn.executemany("""
INSERT INTO claims
(company_id, quarter, sentence, metric, direction, magnitude, result, actual_change, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [(
company_id,
c.get("quarter"),
c.get("sentence"),
c.get("metric"),
c.get("direction"),
c.get("magnitude"),
c.get("result"),
c.get("actual_change"),
c.get("confidence"),
) for c in claims])
def get_claims(company_id: int) -> list:
with get_db() as conn:
rows = conn.execute(
"SELECT * FROM claims WHERE company_id = ? ORDER BY quarter", (company_id,)
).fetchall()
return [dict(r) for r in rows]
# ── Timeseries ────────────────────────────────────────────────────────────────
def save_timeseries(company_id: int, ts_records: list) -> None:
"""Overwrite all timeseries rows for a company."""
with get_db() as conn:
conn.execute("DELETE FROM timeseries WHERE company_id = ?", (company_id,))
conn.executemany("""
INSERT OR REPLACE INTO timeseries
(company_id, quarter, revenue, net_profit, operating_profit, profit_margin,
revenue_qoq_change, net_profit_qoq_change, operating_profit_qoq_change, profit_margin_qoq_change,
revenue_yoy_change, net_profit_yoy_change, operating_profit_yoy_change, profit_margin_yoy_change)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [(
company_id,
r.get("quarter"),
r.get("revenue"),
r.get("net_profit"),
r.get("operating_profit"),
r.get("profit_margin"),
r.get("revenue_qoq_change"),
r.get("net_profit_qoq_change"),
r.get("operating_profit_qoq_change"),
r.get("profit_margin_qoq_change"),
r.get("revenue_yoy_change"),
r.get("net_profit_yoy_change"),
r.get("operating_profit_yoy_change"),
r.get("profit_margin_yoy_change"),
) for r in ts_records])
# ── Risk ──────────────────────────────────────────────────────────────────────
def save_risk(company_id: int, risk_records: list) -> None:
"""Overwrite all risk rows for a company."""
with get_db() as conn:
conn.execute("DELETE FROM risk WHERE company_id = ?", (company_id,))
conn.executemany("""
INSERT OR REPLACE INTO risk
(company_id, quarter, total_claims, verification_rate, failure_rate,
partial_rate, direction_mismatch_rate, consistency_score, risk_drift, warning_flag)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [(
company_id,
r.get("quarter"),
r.get("total_claims"),
r.get("verification_rate"),
r.get("failure_rate"),
r.get("partial_rate"),
r.get("direction_mismatch_rate"),
r.get("consistency_score"),
r.get("risk_drift"),
r.get("warning_flag"),
) for r in risk_records])