# book-rec-with-LLMs: src/core/rag/freshness_monitor.py
"""
Data Freshness Monitor for RAG.
Provides insights into the freshness of the local book database and helps
decide when to fall back to web search.
"""
from datetime import datetime
from typing import Optional
from src.data.stores.metadata_store import metadata_store
from src.utils import setup_logger
logger = setup_logger(__name__)
class FreshnessMonitor:
    """
    Monitor data freshness and provide staleness detection.

    Statistics are read from the metadata store and cached for a short TTL
    so repeated checks do not hit the database on every query.

    Usage:
        monitor = FreshnessMonitor()
        stats = monitor.get_data_stats()
        if monitor.is_stale_for_query("latest 2025 books"):
            # Trigger web search fallback
    """

    # Years considered "recent" for freshness calculations
    RECENT_YEARS_THRESHOLD = 2

    def __init__(self):
        # Last computed stats payload (empty dict until first successful fetch).
        self._cache: dict = {}
        # When the cache was last populated; None means never.
        self._cache_timestamp: Optional[datetime] = None
        self._cache_ttl_seconds = 300  # 5 minutes

    def _is_cache_valid(self) -> bool:
        """Check if cached stats exist and are still within the TTL window."""
        if not self._cache or not self._cache_timestamp:
            return False
        age = (datetime.now() - self._cache_timestamp).total_seconds()
        return age < self._cache_ttl_seconds

    def get_data_stats(self, force_refresh: bool = False) -> dict:
        """
        Get comprehensive statistics about data freshness.

        Args:
            force_refresh: If True, bypass the TTL cache and re-query the
                metadata store.

        Returns:
            Dict with:
                - total_books: Total number of books in database
                - newest_year: Year of most recently published book
                - oldest_year: Year of oldest book
                - books_by_year: Dict mapping year -> count
                - recent_books_count: Books published in last N years
                - data_cutoff_year: Effective "knowledge cutoff" year
                - freshness_score: 0-100 score indicating data freshness
                - last_checked: ISO timestamp of this computation

            The returned dict is the cached object itself; callers should
            treat it as read-only.
        """
        if not force_refresh and self._is_cache_valid():
            return self._cache

        stats = {
            "total_books": 0,
            "newest_year": None,
            "oldest_year": None,
            "books_by_year": {},
            "recent_books_count": 0,
            "data_cutoff_year": None,
            "freshness_score": 0,
            "last_checked": datetime.now().isoformat(),
        }
        try:
            stats["total_books"] = metadata_store.get_book_count()
            stats["books_by_year"] = (
                metadata_store.get_books_by_year_distribution()
            )
            current_year = datetime.now().year
            if stats["books_by_year"]:
                years = stats["books_by_year"].keys()
                # max/min directly — no need to sort the whole key set.
                stats["newest_year"] = max(years)
                stats["oldest_year"] = min(years)
                stats["data_cutoff_year"] = stats["newest_year"]

                # Count recent books (last N years).
                recent_threshold = current_year - self.RECENT_YEARS_THRESHOLD
                stats["recent_books_count"] = sum(
                    count
                    for year, count in stats["books_by_year"].items()
                    if year >= recent_threshold
                )

                # Freshness score (0-100): lose 25 points per year the
                # newest book lags behind the current year.
                years_behind = current_year - stats["newest_year"]
                stats["freshness_score"] = max(0, 100 - years_behind * 25)

            # Cache unconditionally (previously only cached when books were
            # present, so an empty database triggered a DB round-trip on
            # every single call).
            self._cache = stats
            self._cache_timestamp = datetime.now()
        except Exception as e:
            # Best-effort: log and return whatever was gathered so far
            # (zeroed defaults on total failure).
            logger.error("FreshnessMonitor.get_data_stats failed: %s", e)
        return stats

    def is_stale(self, target_year: Optional[int] = None) -> bool:
        """
        Check if local data is too old for a given target year.

        Args:
            target_year: Year the user is asking about (default: current year)

        Returns:
            True if data is stale and web fallback should be triggered
        """
        if target_year is None:
            target_year = datetime.now().year
        stats = self.get_data_stats()
        newest_year = stats.get("newest_year")
        if newest_year is None:
            return True  # No data at all
        # Stale if target year is newer than our newest data
        return target_year > newest_year

    def is_stale_for_query(self, query: str) -> bool:
        """
        Analyze a query and determine if data is stale for it.

        Args:
            query: User's search query

        Returns:
            True if web fallback should be triggered
        """
        # Local import to avoid a circular dependency with web_search.
        from src.core.rag.web_search import extract_year_from_query

        target_year = extract_year_from_query(query)
        if target_year is None:
            # No explicit year in the query - fall back on the overall
            # freshness score. Score < 50 means data is more than
            # RECENT_YEARS_THRESHOLD years behind (25 points lost per year).
            stats = self.get_data_stats()
            return stats.get("freshness_score", 100) < 50
        return self.is_stale(target_year)

    def get_coverage_for_year(self, year: int) -> dict:
        """
        Get coverage statistics for a specific year.

        Args:
            year: Publication year to inspect.

        Returns:
            Dict with: year, count, percentage of total, is_well_covered
        """
        stats = self.get_data_stats()
        year_count = stats["books_by_year"].get(year, 0)
        # Guard against division by zero when the database is empty.
        total = stats["total_books"] or 1
        return {
            "year": year,
            "count": year_count,
            "percentage": round(year_count / total * 100, 2),
            "is_well_covered": year_count >= 100,  # Arbitrary threshold
        }

    def recommend_update_categories(self) -> list[str]:
        """
        Recommend categories that should be updated.

        Returns:
            List of category names that need fresh data
        """
        # This would require category-level year tracking.
        # For now, return common categories that benefit from freshness.
        return [
            "fiction",
            "thriller",
            "science fiction",
            "fantasy",
            "mystery",
            "self-help",
            "business",
        ]

    def get_summary(self) -> str:
        """
        Get a human-readable summary of data freshness.

        Returns:
            Formatted string describing data freshness status
        """
        stats = self.get_data_stats()
        lines = [
            "Data Freshness Report",
            "=" * 40,
            f"Total books: {stats['total_books']:,}",
            f"Newest book year: {stats['newest_year'] or 'Unknown'}",
            f"Data cutoff: {stats['data_cutoff_year'] or 'Unknown'}",
            f"Recent books (last {self.RECENT_YEARS_THRESHOLD} years): {stats['recent_books_count']:,}",
            f"Freshness score: {stats['freshness_score']}/100",
        ]
        current_year = datetime.now().year
        if stats["newest_year"] and stats["newest_year"] < current_year:
            years_behind = current_year - stats["newest_year"]
            lines.append("")
            lines.append(
                f"WARNING: Data is {years_behind} year(s) behind current year."
            )
            lines.append(
                f"Consider running: python scripts/data/fetch_new_books.py --year {current_year}"
            )
        return "\n".join(lines)
# Module-level singleton shared by all callers so the stats cache is shared.
freshness_monitor = FreshnessMonitor()
def is_data_fresh_enough(query: str) -> bool:
    """
    Quick check if local data is fresh enough for a query.

    Thin convenience wrapper over the module-level ``freshness_monitor``
    singleton that inverts its staleness verdict.

    Args:
        query: User's search query

    Returns:
        True if local data is sufficient, False if web fallback recommended
    """
    needs_fallback = freshness_monitor.is_stale_for_query(query)
    return not needs_fallback
# Explicit public API for `from ... import *` consumers.
__all__ = ["FreshnessMonitor", "freshness_monitor", "is_data_fresh_enough"]