# snowman-ai / cache.py
# feat: Snowman AI - MCP Literature Review Assistant for Hackathon (commit 04fa10d)
"""
Sistema de cache para evitar buscas repetidas de referências.
Usa SQLite para persistência local.
"""
import hashlib
import json
import sqlite3
from dataclasses import asdict
from typing import List, Optional, Union

from search_services import SearchResult
# Sentinel returned by ReferenceCache.get() for a cached *negative* result
# (the query was searched before and nothing was found).
CACHE_NOT_FOUND = "__NOT_FOUND__"
class ReferenceCache:
    """SQLite-backed cache for reference searches and PDF reference parsing.

    Two tables live in a single database file:

    * ``search_cache`` — one row per search query, including *negative*
      results (query searched before, nothing found), so repeated misses
      do not trigger new remote lookups;
    * ``pdf_parse_cache`` — reference lists extracted from PDFs, keyed by
      a hash of the PDF's text content.

    Rows older than ``expiry_days`` are treated as absent on read and can
    be physically removed with :meth:`clear_expired`.
    """

    def __init__(self, db_path: str = "reference_cache.db", expiry_days: int = 30):
        """Open (creating if needed) the cache database.

        Args:
            db_path: Path of the SQLite file used for persistence.
            expiry_days: Age in days after which entries are considered stale.
        """
        self.db_path = db_path
        self.expiry_days = expiry_days
        self._init_db()

    # ==================== Internal helpers ====================

    def _expiry_modifier(self) -> str:
        """Return the SQLite ``datetime()`` modifier for the expiry cutoff."""
        return f"-{self.expiry_days} days"

    def _init_db(self):
        """Create the cache tables and indexes if they do not exist yet."""
        with sqlite3.connect(self.db_path) as conn:
            # Reference-search result cache.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS search_cache (
                    query_hash TEXT PRIMARY KEY,
                    query_text TEXT,
                    result_json TEXT,
                    found INTEGER,
                    source TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    hits INTEGER DEFAULT 0
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_created_at
                ON search_cache(created_at)
            """)
            # PDF reference-parsing cache.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS pdf_parse_cache (
                    content_hash TEXT PRIMARY KEY,
                    pdf_name TEXT,
                    references_json TEXT,
                    ref_count INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    hits INTEGER DEFAULT 0
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pdf_created_at
                ON pdf_parse_cache(created_at)
            """)
            conn.commit()

    def _hash_query(self, query: str) -> str:
        """Return a stable 32-hex-char key for a case/whitespace-normalized query."""
        normalized = query.lower().strip()
        return hashlib.sha256(normalized.encode()).hexdigest()[:32]

    def _hash_content(self, content: str) -> str:
        """Return a stable 32-hex-char key for raw text content (e.g. a PDF)."""
        return hashlib.sha256(content.encode()).hexdigest()[:32]

    # ==================== Reference-search cache ====================

    def get(self, query: str) -> Optional[Union[SearchResult, str]]:
        """Look up a cached search result for *query*.

        Returns:
            * a ``SearchResult`` on a positive hit;
            * the ``CACHE_NOT_FOUND`` sentinel string when the query is
              cached as "nothing found" (negative hit);
            * ``None`` on a cache miss or when the entry has expired.
        """
        query_hash = self._hash_query(query)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """
                SELECT * FROM search_cache
                WHERE query_hash = ?
                AND created_at > datetime('now', ?)
                """,
                (query_hash, self._expiry_modifier()),
            )
            row = cursor.fetchone()
            if row:
                # Track how often this entry is reused.
                conn.execute(
                    "UPDATE search_cache SET hits = hits + 1 WHERE query_hash = ?",
                    (query_hash,),
                )
                conn.commit()
                if row["found"]:
                    data = json.loads(row["result_json"])
                    return SearchResult(**data)
                # Negative result cached earlier: signal "known not found".
                return CACHE_NOT_FOUND
            return None

    def set(self, query: str, result: Optional[SearchResult]):
        """Store a search result (or ``None`` for "not found") for *query*."""
        query_hash = self._hash_query(query)
        if result is None:
            # Cache the miss so the query is not retried until expiry.
            result_json = "{}"
            found = False
            source = None
        else:
            result_json = json.dumps(asdict(result), ensure_ascii=False)
            found = True
            source = result.source
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO search_cache
                (query_hash, query_text, result_json, found, source, created_at, hits)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, 0)
                """,
                (query_hash, query[:500], result_json, found, source),
            )
            conn.commit()

    def get_stats(self) -> dict:
        """Return aggregate statistics for the search cache."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            # Total number of cached queries.
            total = conn.execute("SELECT COUNT(*) as cnt FROM search_cache").fetchone()[
                "cnt"
            ]
            # Positive vs. negative entries.
            found = conn.execute(
                "SELECT COUNT(*) as cnt FROM search_cache WHERE found = 1"
            ).fetchone()["cnt"]
            # SUM() is NULL on an empty table; coalesce to 0.
            hits = (
                conn.execute("SELECT SUM(hits) as total FROM search_cache").fetchone()[
                    "total"
                ]
                or 0
            )
            # Breakdown of positive entries per search source.
            sources = conn.execute(
                """
                SELECT source, COUNT(*) as cnt
                FROM search_cache
                WHERE found = 1
                GROUP BY source
                """
            ).fetchall()
            return {
                "total_entries": total,
                "found": found,
                "not_found": total - found,
                "total_hits": hits,
                "by_source": {row["source"]: row["cnt"] for row in sources},
            }

    def clear_expired(self) -> int:
        """Delete expired search-cache rows; return how many were removed."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                """
                DELETE FROM search_cache
                WHERE created_at < datetime('now', ?)
                """,
                (self._expiry_modifier(),),
            )
            conn.commit()
            return cursor.rowcount

    def clear_all(self) -> int:
        """Empty both cache tables; return the total number of rows removed."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("DELETE FROM search_cache")
            count1 = cursor.rowcount
            cursor = conn.execute("DELETE FROM pdf_parse_cache")
            count2 = cursor.rowcount
            conn.commit()
            return count1 + count2

    # ==================== PDF parsing cache ====================

    def get_pdf_refs(
        self, text_content: str, pdf_name: str = ""
    ) -> Optional[List[str]]:
        """Return cached references for a PDF, keyed by its content hash.

        Args:
            text_content: Extracted text of the PDF (hashed as the cache key).
            pdf_name: Unused on reads; kept for call-site symmetry with
                :meth:`set_pdf_refs`.

        Returns:
            The cached list of reference strings, or ``None`` when absent
            or expired.
        """
        content_hash = self._hash_content(text_content)
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """
                SELECT * FROM pdf_parse_cache
                WHERE content_hash = ?
                AND created_at > datetime('now', ?)
                """,
                (content_hash, self._expiry_modifier()),
            )
            row = cursor.fetchone()
            if row:
                # Track how often this entry is reused.
                conn.execute(
                    "UPDATE pdf_parse_cache SET hits = hits + 1 WHERE content_hash = ?",
                    (content_hash,),
                )
                conn.commit()
                return json.loads(row["references_json"])
            return None

    def set_pdf_refs(
        self, text_content: str, references: List[str], pdf_name: str = ""
    ):
        """Cache the reference list extracted from a PDF's text content."""
        content_hash = self._hash_content(text_content)
        refs_json = json.dumps(references, ensure_ascii=False)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO pdf_parse_cache
                (content_hash, pdf_name, references_json, ref_count, created_at, hits)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, 0)
                """,
                (content_hash, pdf_name[:200], refs_json, len(references)),
            )
            conn.commit()

    def get_pdf_cache_stats(self) -> dict:
        """Return aggregate statistics for the PDF parsing cache."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            total = conn.execute(
                "SELECT COUNT(*) as cnt FROM pdf_parse_cache"
            ).fetchone()["cnt"]
            # SUM() is NULL on an empty table; coalesce to 0.
            hits = (
                conn.execute(
                    "SELECT SUM(hits) as total FROM pdf_parse_cache"
                ).fetchone()["total"]
                or 0
            )
            refs = (
                conn.execute(
                    "SELECT SUM(ref_count) as total FROM pdf_parse_cache"
                ).fetchone()["total"]
                or 0
            )
            return {"pdfs_cached": total, "total_hits": hits, "total_refs_cached": refs}
# Module-level singleton holding the shared cache instance.
_cache_instance: Optional[ReferenceCache] = None
def get_cache() -> ReferenceCache:
    """Lazily create and return the process-wide ReferenceCache singleton."""
    global _cache_instance
    if _cache_instance is not None:
        return _cache_instance
    _cache_instance = ReferenceCache()
    return _cache_instance