Spaces:
Sleeping
Sleeping
File size: 7,771 Bytes
"""
app/services/cache_service.py
All SQLite read/write operations for the cache layer.
No pipeline logic lives here — only DB operations.
"""
import json
import re
from datetime import datetime, timezone
from typing import Optional

from db import get_db
# ── Helpers ──────────────────────────────────────────────────────────────────
def _clean_display_name(folder_name: str) -> str:
"""Strip year/quarter suffixes from folder names."""
name = re.sub(r'\s*\d{4}\s*Quarterly\s*Data\s*', '', folder_name, flags=re.IGNORECASE)
name = re.sub(r'\s*Quarterly\s*Data\s*', '', name, flags=re.IGNORECASE)
return name.strip()
# ── Company registry ─────────────────────────────────────────────────────────
def get_all_companies() -> list:
    """List every row of ``companies`` as a dict, ordered by display name.

    Each dict carries ``id``, ``folder_name``, ``display_name``, ``status``
    and ``processed_at``.
    """
    query = (
        "SELECT id, folder_name, display_name, status, processed_at "
        "FROM companies ORDER BY display_name"
    )
    with get_db() as conn:
        records = conn.execute(query).fetchall()
        return [dict(record) for record in records]
def get_ready_companies() -> list:
    """List companies whose status is ``'ready'``, ordered by display name.

    Each dict carries ``id``, ``folder_name``, ``display_name`` and
    ``processed_at``.
    """
    query = (
        "SELECT id, folder_name, display_name, processed_at "
        "FROM companies WHERE status = 'ready' ORDER BY display_name"
    )
    with get_db() as conn:
        records = conn.execute(query).fetchall()
        return [dict(record) for record in records]
def get_company_by_folder(folder_name: str) -> Optional[dict]:
    """Look up a single company by its folder name.

    Returns the full ``companies`` row as a dict, or ``None`` when no row
    has the given ``folder_name``.
    """
    query = "SELECT * FROM companies WHERE folder_name = ?"
    with get_db() as conn:
        match = conn.execute(query, (folder_name,)).fetchone()
        if not match:
            return None
        return dict(match)
def upsert_company(folder_name: str, status: str = "pending", error_msg: Optional[str] = None) -> int:
    """Insert or update a company record and return its id.

    The row is keyed on ``folder_name``. On first insert the display name is
    derived from the folder name via ``_clean_display_name``; on conflict only
    ``status``, ``processed_at`` and ``error_msg`` are overwritten.

    Args:
        folder_name: Folder name identifying the company (the conflict key).
        status: Processing status to store. ``processed_at`` is stamped with
            the current UTC time only when this is ``"ready"``; for any other
            status ``None`` is written.
        error_msg: Optional error text stored alongside the status.

    Returns:
        The ``id`` of the inserted or updated ``companies`` row.
    """
    display = _clean_display_name(folder_name)

    processed_at = None
    if status == "ready":
        # Same naive-UTC ISO-8601 text that datetime.utcnow().isoformat()
        # produced, but without the API deprecated since Python 3.12.
        processed_at = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )

    upsert_sql = (
        "\n"
        "INSERT INTO companies (folder_name, display_name, status, processed_at, error_msg)\n"
        "VALUES (?, ?, ?, ?, ?)\n"
        "ON CONFLICT(folder_name) DO UPDATE SET\n"
        "status = excluded.status,\n"
        "processed_at = excluded.processed_at,\n"
        "error_msg = excluded.error_msg\n"
    )

    with get_db() as conn:
        conn.execute(
            upsert_sql,
            (folder_name, display, status, processed_at, error_msg),
        )
        # Re-read the id: it is the same query for both the insert and the
        # update path of the upsert.
        row = conn.execute(
            "SELECT id FROM companies WHERE folder_name = ?", (folder_name,)
        ).fetchone()
        return row["id"]
# ── Analysis cache ───────────────────────────────────────────────────────────
def get_cached_analysis(company_id: int, mode: str) -> Optional[dict]:
    """Fetch the cached analysis for a company/mode pair.

    Returns the decoded ``result_json`` payload, or ``None`` when no cache
    row matches.
    """
    query = "SELECT result_json FROM analysis_cache WHERE company_id = ? AND mode = ?"
    with get_db() as conn:
        hit = conn.execute(query, (company_id, mode)).fetchone()
        if not hit:
            return None
        return json.loads(hit["result_json"])
def save_analysis(company_id: int, mode: str, result: dict) -> None:
    """Save (overwrite) the analysis result for a company+mode pair.

    The result is serialized with ``json.dumps`` and stored with a creation
    timestamp. An existing row for the same ``(company_id, mode)`` has its
    ``result_json`` and ``created_at`` replaced.

    Args:
        company_id: Id of the company the analysis belongs to.
        mode: Analysis mode; together with ``company_id`` it is the conflict
            key of the cache row.
        result: JSON-serializable analysis payload.

    Raises:
        TypeError: If ``result`` cannot be serialized by ``json.dumps``.
    """
    payload = json.dumps(result)
    # Same naive-UTC ISO-8601 text that datetime.utcnow().isoformat()
    # produced, but without the API deprecated since Python 3.12.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    upsert_sql = (
        "\n"
        "INSERT INTO analysis_cache (company_id, mode, result_json, created_at)\n"
        "VALUES (?, ?, ?, ?)\n"
        "ON CONFLICT(company_id, mode) DO UPDATE SET\n"
        "result_json = excluded.result_json,\n"
        "created_at = excluded.created_at\n"
    )

    with get_db() as conn:
        conn.execute(upsert_sql, (company_id, mode, payload, created_at))
# ── Claims ───────────────────────────────────────────────────────────────────
def save_claims(company_id: int, claims: list) -> None:
    """Replace every stored claim for ``company_id`` with ``claims``.

    Existing rows for the company are deleted first, then one row is inserted
    per entry. Each entry is read with ``.get``, so a key that is absent
    yields ``None`` for that column.
    """
    fields = (
        "quarter",
        "sentence",
        "metric",
        "direction",
        "magnitude",
        "result",
        "actual_change",
        "confidence",
    )
    insert_sql = (
        "\n"
        "INSERT INTO claims\n"
        "(company_id, quarter, sentence, metric, direction, magnitude, result, actual_change, confidence)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)\n"
    )
    with get_db() as conn:
        conn.execute("DELETE FROM claims WHERE company_id = ?", (company_id,))
        payload = [
            (company_id, *(claim.get(field) for field in fields))
            for claim in claims
        ]
        conn.executemany(insert_sql, payload)
def get_claims(company_id: int) -> list:
    """Return all claim rows for ``company_id`` as dicts, ordered by quarter."""
    query = "SELECT * FROM claims WHERE company_id = ? ORDER BY quarter"
    with get_db() as conn:
        records = conn.execute(query, (company_id,)).fetchall()
        return [dict(record) for record in records]
# ── Timeseries ───────────────────────────────────────────────────────────────
def save_timeseries(company_id: int, ts_records: list) -> None:
    """Replace every stored timeseries row for ``company_id``.

    Existing rows for the company are deleted first, then one row is written
    per record. Each record is read with ``.get``, so a key that is absent
    yields ``None`` for that column.
    """
    fields = (
        "quarter",
        "revenue",
        "net_profit",
        "operating_profit",
        "profit_margin",
        "revenue_qoq_change",
        "net_profit_qoq_change",
        "operating_profit_qoq_change",
        "profit_margin_qoq_change",
        "revenue_yoy_change",
        "net_profit_yoy_change",
        "operating_profit_yoy_change",
        "profit_margin_yoy_change",
    )
    insert_sql = (
        "\n"
        "INSERT OR REPLACE INTO timeseries\n"
        "(company_id, quarter, revenue, net_profit, operating_profit, profit_margin,\n"
        "revenue_qoq_change, net_profit_qoq_change, operating_profit_qoq_change, profit_margin_qoq_change,\n"
        "revenue_yoy_change, net_profit_yoy_change, operating_profit_yoy_change, profit_margin_yoy_change)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n"
    )
    with get_db() as conn:
        conn.execute("DELETE FROM timeseries WHERE company_id = ?", (company_id,))
        payload = [
            (company_id, *(record.get(field) for field in fields))
            for record in ts_records
        ]
        conn.executemany(insert_sql, payload)
# ── Risk ─────────────────────────────────────────────────────────────────────
def save_risk(company_id: int, risk_records: list) -> None:
    """Replace every stored risk row for ``company_id``.

    Existing rows for the company are deleted first, then one row is written
    per record. Each record is read with ``.get``, so a key that is absent
    yields ``None`` for that column.
    """
    fields = (
        "quarter",
        "total_claims",
        "verification_rate",
        "failure_rate",
        "partial_rate",
        "direction_mismatch_rate",
        "consistency_score",
        "risk_drift",
        "warning_flag",
    )
    insert_sql = (
        "\n"
        "INSERT OR REPLACE INTO risk\n"
        "(company_id, quarter, total_claims, verification_rate, failure_rate,\n"
        "partial_rate, direction_mismatch_rate, consistency_score, risk_drift, warning_flag)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n"
    )
    with get_db() as conn:
        conn.execute("DELETE FROM risk WHERE company_id = ?", (company_id,))
        payload = [
            (company_id, *(record.get(field) for field in fields))
            for record in risk_records
        ]
        conn.executemany(insert_sql, payload)