File size: 7,771 Bytes
80b6680
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""
app/services/cache_service.py
All SQLite read/write operations for the cache layer.
No pipeline logic lives here — only DB operations.
"""

import json
import re
from datetime import datetime, timezone
from typing import Optional

from db import get_db


# ── Helpers ───────────────────────────────────────────────────────────────────

def _clean_display_name(folder_name: str) -> str:
    """Strip year/quarter suffixes from folder names."""
    name = re.sub(r'\s*\d{4}\s*Quarterly\s*Data\s*', '', folder_name, flags=re.IGNORECASE)
    name = re.sub(r'\s*Quarterly\s*Data\s*', '', name, flags=re.IGNORECASE)
    return name.strip()


# ── Company registry ──────────────────────────────────────────────────────────

def get_all_companies() -> list:
    """Fetch every company row, sorted by display name.

    Returns one dict per row with the columns ``id``, ``folder_name``,
    ``display_name``, ``status`` and ``processed_at``, regardless of status.
    """
    query = "SELECT id, folder_name, display_name, status, processed_at FROM companies ORDER BY display_name"
    with get_db() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))


def get_ready_companies() -> list:
    """Fetch only the companies whose status is ``'ready'``, by display name.

    Returns one dict per row with the columns ``id``, ``folder_name``,
    ``display_name`` and ``processed_at``.
    """
    query = "SELECT id, folder_name, display_name, processed_at FROM companies WHERE status = 'ready' ORDER BY display_name"
    with get_db() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))


def get_company_by_folder(folder_name: str) -> Optional[dict]:
    """Look up a single company by its folder name.

    Returns every column of the matching ``companies`` row as a dict, or
    ``None`` when no row has that ``folder_name``.
    """
    with get_db() as conn:
        match = conn.execute(
            "SELECT * FROM companies WHERE folder_name = ?", (folder_name,)
        ).fetchone()
    if not match:
        return None
    return dict(match)


def upsert_company(folder_name: str, status: str = "pending", error_msg: Optional[str] = None) -> int:
    """Insert or update a company record and return its id.

    A new row gets a display name derived from ``folder_name`` via
    ``_clean_display_name``. When ``folder_name`` already exists, only
    ``status``, ``processed_at`` and ``error_msg`` are overwritten; the
    stored ``display_name`` is left as-is.

    Args:
        folder_name: Unique key of the company row.
        status: New processing status (defaults to ``"pending"``).
        error_msg: Optional error text stored alongside the status.

    Returns:
        The ``id`` of the inserted or updated row.
    """
    display = _clean_display_name(folder_name)
    # processed_at is only stamped for "ready"; any other status writes NULL,
    # which the conflict branch copies over the previous timestamp.
    # Naive-UTC ISO string in the same format datetime.utcnow() produced,
    # without the Python 3.12+ deprecation warning.
    processed_at = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        if status == "ready" else None
    )
    with get_db() as conn:
        conn.execute("""
            INSERT INTO companies (folder_name, display_name, status, processed_at, error_msg)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(folder_name) DO UPDATE SET
                status       = excluded.status,
                processed_at = excluded.processed_at,
                error_msg    = excluded.error_msg
        """, (folder_name, display, status, processed_at, error_msg))
        row = conn.execute(
            "SELECT id FROM companies WHERE folder_name = ?", (folder_name,)
        ).fetchone()
    return row["id"]


# ── Analysis cache ────────────────────────────────────────────────────────────

def get_cached_analysis(company_id: int, mode: str) -> Optional[dict]:
    """Load the cached analysis stored for ``(company_id, mode)``.

    Returns the JSON-decoded ``result_json`` payload, or ``None`` on a
    cache miss.
    """
    with get_db() as conn:
        cached = conn.execute(
            "SELECT result_json FROM analysis_cache WHERE company_id = ? AND mode = ?",
            (company_id, mode)
        ).fetchone()
    if not cached:
        return None
    return json.loads(cached["result_json"])


def save_analysis(company_id: int, mode: str, result: dict) -> None:
    """Save (overwrite) the analysis result for a company+mode pair.

    Upserts into ``analysis_cache``: on a conflicting ``(company_id, mode)``
    the stored ``result_json`` and ``created_at`` are replaced.

    Args:
        company_id: Id of the company the analysis belongs to.
        mode: Analysis mode; forms the cache key together with ``company_id``.
        result: JSON-serializable payload to store.

    Raises:
        TypeError: If ``result`` is not JSON-serializable (raised before any
            DB connection is opened).
    """
    payload = json.dumps(result)
    # Naive-UTC ISO string in the same format datetime.utcnow() produced,
    # without the Python 3.12+ deprecation warning.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with get_db() as conn:
        conn.execute("""
            INSERT INTO analysis_cache (company_id, mode, result_json, created_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(company_id, mode) DO UPDATE SET
                result_json = excluded.result_json,
                created_at  = excluded.created_at
        """, (company_id, mode, payload, created_at))


# ── Claims ────────────────────────────────────────────────────────────────────

def save_claims(company_id: int, claims: list) -> None:
    """Replace every stored claim for ``company_id`` with ``claims``.

    Existing rows for the company are deleted, then one row is inserted per
    claim. Each claim is read with ``.get``, so a missing key is stored as
    NULL.
    """
    fields = (
        "quarter", "sentence", "metric", "direction", "magnitude",
        "result", "actual_change", "confidence",
    )
    with get_db() as conn:
        conn.execute("DELETE FROM claims WHERE company_id = ?", (company_id,))
        # Parameters are built after the DELETE, matching the original order.
        params = [
            (company_id, *(claim.get(field) for field in fields))
            for claim in claims
        ]
        conn.executemany("""
            INSERT INTO claims
              (company_id, quarter, sentence, metric, direction, magnitude, result, actual_change, confidence)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)


def get_claims(company_id: int) -> list:
    """Return every stored claim for ``company_id`` as a dict, ordered by ``quarter``."""
    with get_db() as conn:
        cursor = conn.execute(
            "SELECT * FROM claims WHERE company_id = ? ORDER BY quarter", (company_id,)
        )
        claim_rows = cursor.fetchall()
    return list(map(dict, claim_rows))


# ── Timeseries ────────────────────────────────────────────────────────────────

def save_timeseries(company_id: int, ts_records: list) -> None:
    """Replace all timeseries rows for ``company_id`` with ``ts_records``.

    Existing rows for the company are deleted, then one row is inserted per
    record. Each record is read with ``.get``, so a missing metric is stored
    as NULL.
    """
    fields = (
        "quarter",
        "revenue", "net_profit", "operating_profit", "profit_margin",
        "revenue_qoq_change", "net_profit_qoq_change",
        "operating_profit_qoq_change", "profit_margin_qoq_change",
        "revenue_yoy_change", "net_profit_yoy_change",
        "operating_profit_yoy_change", "profit_margin_yoy_change",
    )
    with get_db() as conn:
        conn.execute("DELETE FROM timeseries WHERE company_id = ?", (company_id,))
        # Parameters are built after the DELETE, matching the original order.
        params = [
            (company_id, *(record.get(field) for field in fields))
            for record in ts_records
        ]
        conn.executemany("""
            INSERT OR REPLACE INTO timeseries
              (company_id, quarter, revenue, net_profit, operating_profit, profit_margin,
               revenue_qoq_change, net_profit_qoq_change, operating_profit_qoq_change, profit_margin_qoq_change,
               revenue_yoy_change, net_profit_yoy_change, operating_profit_yoy_change, profit_margin_yoy_change)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)


# ── Risk ──────────────────────────────────────────────────────────────────────

def save_risk(company_id: int, risk_records: list) -> None:
    """Replace all risk rows for ``company_id`` with ``risk_records``.

    Existing rows for the company are deleted, then one row is inserted per
    record. Each record is read with ``.get``, so a missing field is stored
    as NULL.
    """
    fields = (
        "quarter", "total_claims", "verification_rate", "failure_rate",
        "partial_rate", "direction_mismatch_rate", "consistency_score",
        "risk_drift", "warning_flag",
    )
    with get_db() as conn:
        conn.execute("DELETE FROM risk WHERE company_id = ?", (company_id,))
        # Parameters are built after the DELETE, matching the original order.
        params = [
            (company_id, *(record.get(field) for field in fields))
            for record in risk_records
        ]
        conn.executemany("""
            INSERT OR REPLACE INTO risk
              (company_id, quarter, total_claims, verification_rate, failure_rate,
               partial_rate, direction_mismatch_rate, consistency_score, risk_drift, warning_flag)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)