# Instant-SWOT-Agent / src/utils/analysis_cache.py
# Author: vn6295337
# Commit 53fe655: Refactor: Analyzer handles revisions directly, remove Editor node
"""
Analysis Cache - Supabase PostgreSQL caching for final SWOT analysis results.
Caches final SWOT analysis output with 24h TTL to avoid re-running the full pipeline.
Uses schema: asa.analysis_cache
"""
import os
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv
# Load environment variables (project .env first, then ~/.env for local overrides)
load_dotenv() # Project .env or HF Space secrets
load_dotenv(os.path.expanduser("~/.env")) # Local development overrides
logger = logging.getLogger("analysis-cache")
# Default TTL: 24 hours
DEFAULT_TTL_HOURS = 24
# Supabase PostgreSQL connection string
SUPABASE_DB_URL = os.getenv("PIPELINE_SUPABASE_URL")
def get_connection():
    """Open and return a new PostgreSQL connection to the Supabase database.

    Raises:
        RuntimeError: If the PIPELINE_SUPABASE_URL environment variable is unset.
    """
    if SUPABASE_DB_URL:
        return psycopg2.connect(SUPABASE_DB_URL)
    raise RuntimeError("PIPELINE_SUPABASE_URL not set in environment")
def get_cached_analysis(ticker: str) -> Optional[dict]:
    """
    Get cached analysis for a ticker if it exists and hasn't expired.

    Opportunistically deletes expired rows before the lookup so the table
    stays small without a separate maintenance job.

    Args:
        ticker: Stock ticker symbol (looked up uppercased).

    Returns:
        Cached analysis dict with an added "_cache_info" key, or None if
        not found, expired, or on any database error.
    """
    try:
        conn = get_connection()
        try:
            # Connection context manager commits on success / rolls back on
            # error; cursor context manager closes the cursor either way.
            with conn, conn.cursor(cursor_factory=RealDictCursor) as cursor:
                # Auto-cleanup: delete expired entries before checking cache
                cursor.execute("DELETE FROM asa.analysis_cache WHERE expires_at <= NOW()")
                deleted = cursor.rowcount
                if deleted > 0:
                    logger.info(f"Auto-cleanup: removed {deleted} expired cache entries")
                cursor.execute("""
                    SELECT data, expires_at
                    FROM asa.analysis_cache
                    WHERE ticker = %s AND expires_at > NOW()
                """, (ticker.upper(),))
                row = cursor.fetchone()
        finally:
            # Always release the connection, even if a query raised.
            conn.close()
        if row:
            data = row['data']
            # JSONB columns arrive as dicts; decode if stored as text.
            if isinstance(data, str):
                data = json.loads(data)
            # Add cache metadata
            data["_cache_info"] = {
                "cached": True,
                "expires_at": row['expires_at'].isoformat() if row['expires_at'] else None
            }
            logger.info(f"Cache HIT for {ticker}")
            return data
        logger.info(f"Cache MISS for {ticker}")
        return None
    except Exception as e:
        logger.error(f"Cache read error for {ticker}: {e}")
        return None
def set_cached_analysis(ticker: str, company_name: str, data: dict, ttl_hours: int = DEFAULT_TTL_HOURS):
    """
    Store analysis result in cache (upsert keyed on uppercased ticker).

    Args:
        ticker: Stock ticker symbol.
        company_name: Company name.
        data: Full analysis result dict (swot_data, score, critique, etc.).
            Any "_cache_info" key left over from a cache read is stripped.
        ttl_hours: Time-to-live in hours (default 24).
    """
    try:
        conn = get_connection()
        try:
            # Connection context manager commits the whole transaction
            # (cleanup + upsert) on success, rolls back on error.
            with conn, conn.cursor() as cursor:
                # Auto-cleanup: delete expired entries before inserting new one
                cursor.execute("DELETE FROM asa.analysis_cache WHERE expires_at <= NOW()")
                deleted = cursor.rowcount
                if deleted > 0:
                    logger.info(f"Auto-cleanup: removed {deleted} expired cache entries")
                expires_at = datetime.now(timezone.utc) + timedelta(hours=ttl_hours)
                # Remove cache info before storing
                data_to_store = {k: v for k, v in data.items() if k != "_cache_info"}
                # default=str makes non-JSON types (datetimes, etc.) serializable
                cursor.execute("""
                    INSERT INTO asa.analysis_cache (ticker, company_name, data, created_at, expires_at)
                    VALUES (%s, %s, %s, NOW(), %s)
                    ON CONFLICT (ticker)
                    DO UPDATE SET
                        company_name = EXCLUDED.company_name,
                        data = EXCLUDED.data,
                        created_at = NOW(),
                        expires_at = EXCLUDED.expires_at
                """, (ticker.upper(), company_name, json.dumps(data_to_store, default=str), expires_at))
        finally:
            # Always release the connection, even if a query raised.
            conn.close()
        logger.info(f"Cached analysis for {ticker} (expires: {expires_at})")
    except Exception as e:
        logger.error(f"Cache write error for {ticker}: {e}")
def clear_cache(ticker: Optional[str] = None):
    """
    Clear cache entries.

    Args:
        ticker: If provided, clear only this ticker (uppercased).
            Otherwise clear all entries.
    """
    try:
        conn = get_connection()
        try:
            # Commits on success, rolls back on error; cursor auto-closed.
            with conn, conn.cursor() as cursor:
                if ticker:
                    cursor.execute("DELETE FROM asa.analysis_cache WHERE ticker = %s", (ticker.upper(),))
                    logger.info(f"Cleared cache for {ticker}")
                else:
                    cursor.execute("DELETE FROM asa.analysis_cache")
                    logger.info("Cleared all cache entries")
        finally:
            # Always release the connection, even if the delete raised.
            conn.close()
    except Exception as e:
        logger.error(f"Cache clear error: {e}")
def clear_expired_cache() -> int:
    """Remove all expired cache entries.

    Returns:
        Count of deleted entries; 0 on any database error.
    """
    try:
        conn = get_connection()
        try:
            # Commits on success, rolls back on error; cursor auto-closed.
            with conn, conn.cursor() as cursor:
                cursor.execute("DELETE FROM asa.analysis_cache WHERE expires_at <= NOW()")
                deleted = cursor.rowcount
        finally:
            # Always release the connection, even if the delete raised.
            conn.close()
        logger.info(f"Cleared {deleted} expired cache entries")
        return deleted
    except Exception as e:
        logger.error(f"Cache cleanup error: {e}")
        return 0
def get_cache_stats() -> dict:
    """Get cache statistics.

    Returns:
        Dict with total_entries, valid_entries (expires_at in the future),
        and expired_entries counts; all zeros on any database error.
    """
    try:
        conn = get_connection()
        try:
            with conn, conn.cursor() as cursor:
                # Single table scan with FILTER instead of three separate
                # COUNT queries — also avoids counts drifting between queries.
                cursor.execute("""
                    SELECT COUNT(*),
                           COUNT(*) FILTER (WHERE expires_at > NOW()),
                           COUNT(*) FILTER (WHERE expires_at <= NOW())
                    FROM asa.analysis_cache
                """)
                total, valid, expired = cursor.fetchone()
        finally:
            # Always release the connection, even if the query raised.
            conn.close()
        return {
            "total_entries": total,
            "valid_entries": valid,
            "expired_entries": expired
        }
    except Exception as e:
        logger.error(f"Cache stats error: {e}")
        return {"total_entries": 0, "valid_entries": 0, "expired_entries": 0}