# option-pdf-vis/src/database/history_api.py
"""
High-level API for historical PDF data and predictions.
Provides a unified interface for:
- Storing PDF snapshots
- Retrieving historical data
- Pattern matching
- Prediction tracking
"""
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from .db_config import DatabaseManager
from .pdf_archive import PDFArchive
from .vector_store import PDFVectorStore, HybridPatternMatcher
class HistoryAPI:
"""
High-level API for PDF history and prediction tracking.
This is the main interface that the Streamlit app will use
for all database operations.
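Example (illustrative sketch; the ticker, prices, and statistics below are
made-up values, not a real option chain):

    import numpy as np
    from datetime import datetime, timedelta

    api = HistoryAPI(use_vector_store=False)  # skip ChromaDB for a quick local test
    strikes = np.linspace(400, 500, 100)
    pdf = np.exp(-0.5 * ((strikes - 450.0) / 15) ** 2)
    pdf = pdf / np.trapz(pdf, strikes)  # normalize to a proper density
    snapshot_id = api.save_pdf_analysis(
        ticker='SPY', spot_price=450.0, days_to_expiry=30,
        expiration_date=datetime.utcnow() + timedelta(days=30),
        risk_free_rate=0.05, strikes=strikes, pdf_values=pdf,
        statistics={'mean': 450.5, 'std': 15.0},
    )
    latest = api.get_latest_pdf('SPY')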
"""
def __init__(
self,
db_manager: Optional[DatabaseManager] = None,
use_vector_store: bool = True
):
"""
Initialize History API.
Args:
db_manager: DatabaseManager instance (creates default if None)
use_vector_store: Whether to use ChromaDB for fast search
"""
self.db_manager = db_manager or DatabaseManager()
self.archive = PDFArchive(self.db_manager)
# Initialize vector store if requested
self.use_vector_store = use_vector_store
if use_vector_store:
self.vector_store = PDFVectorStore()
self.hybrid_matcher = HybridPatternMatcher(
self.vector_store,
self.archive
)
else:
self.vector_store = None
self.hybrid_matcher = None
# ========================================================================
# PDF Snapshot Operations
# ========================================================================
def save_pdf_analysis(
self,
ticker: str,
spot_price: float,
days_to_expiry: int,
expiration_date: datetime,
risk_free_rate: float,
strikes: np.ndarray,
pdf_values: np.ndarray,
statistics: Dict[str, Any],
sabr_params: Optional[Dict[str, float]] = None,
interpolation_method: Optional[str] = None,
interpretation: Optional[str] = None,
interpretation_mode: Optional[str] = None,
model_used: Optional[str] = None,
store_in_vector_db: bool = True
) -> int:
"""
Save a complete PDF analysis to the database.
Args:
ticker: Stock ticker
spot_price: Current spot price
days_to_expiry: Days to expiration
expiration_date: Option expiration date
risk_free_rate: Risk-free rate used
strikes: Strike prices array
pdf_values: PDF values array
statistics: Dictionary of PDF statistics
sabr_params: SABR parameters (optional)
interpolation_method: Method used ('sabr' or 'spline')
interpretation: AI interpretation text
interpretation_mode: Mode used for interpretation
model_used: Model used ('ollama' or 'fallback')
store_in_vector_db: Whether to also store in ChromaDB
Returns:
Snapshot ID
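Example (illustrative values; ``strikes``, ``pdf``, and ``exp_date`` are
assumed to exist from the current analysis):

    snapshot_id = api.save_pdf_analysis(
        ticker='SPY',
        spot_price=450.0,
        days_to_expiry=30,
        expiration_date=exp_date,
        risk_free_rate=0.05,
        strikes=strikes,
        pdf_values=pdf,
        statistics={'mean': 450.5, 'std': 15.0, 'skewness': -0.1},
        interpolation_method='sabr',
        store_in_vector_db=False,  # set True to also index in ChromaDB
    )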
"""
# Store in SQLite
snapshot = self.archive.store_snapshot(
ticker=ticker,
spot_price=spot_price,
days_to_expiry=days_to_expiry,
expiration_date=expiration_date,
risk_free_rate=risk_free_rate,
strikes=strikes,
pdf_values=pdf_values,
statistics=statistics,
sabr_params=sabr_params,
interpolation_method=interpolation_method,
interpretation=interpretation,
interpretation_mode=interpretation_mode,
model_used=model_used
)
# Store in ChromaDB for fast similarity search
if store_in_vector_db and self.vector_store:
metadata = {
'ticker': ticker,
'date': snapshot.timestamp.strftime('%Y-%m-%d'),
'spot': spot_price,
'dte': days_to_expiry,
**statistics
}
self.vector_store.add_snapshot(
snapshot_id=snapshot.id,
pdf=pdf_values,
strikes=strikes,
metadata=metadata
)
return snapshot.id
def get_pdf_snapshot(self, snapshot_id: int) -> Optional[Dict[str, Any]]:
"""
Retrieve a PDF snapshot by ID.
Args:
snapshot_id: Snapshot ID
Returns:
Dictionary with snapshot data or None
"""
snapshot = self.archive.get_snapshot_by_id(snapshot_id)
return snapshot.to_dict() if snapshot else None
def get_latest_pdf(
self,
ticker: str = 'SPY',
days_to_expiry: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""
Get the most recent PDF for a ticker.
Args:
ticker: Stock ticker
days_to_expiry: Filter by DTE (optional)
Returns:
Dictionary with snapshot data or None
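Example (illustrative; dictionary keys come from the snapshot's
``to_dict()`` output):

    latest = api.get_latest_pdf('SPY', days_to_expiry=30)
    if latest is not None:
        print(latest['ticker'], latest['timestamp'])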
"""
snapshot = self.archive.get_latest_snapshot(ticker, days_to_expiry)
return snapshot.to_dict() if snapshot else None
def get_pdf_history(
self,
ticker: str,
days: int = 30,
days_to_expiry: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Get PDF snapshots for the last N days.
Args:
ticker: Stock ticker
days: Number of days to look back
days_to_expiry: Filter by DTE (optional)
Returns:
List of snapshot dictionaries
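Example (illustrative sketch):

    history = api.get_pdf_history('SPY', days=30, days_to_expiry=30)
    print(f"{len(history)} snapshots in the last 30 days")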
"""
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
snapshots = self.archive.get_snapshots_by_date_range(
ticker=ticker,
start_date=start_date,
end_date=end_date,
days_to_expiry=days_to_expiry
)
return [s.to_dict() for s in snapshots]
# ========================================================================
# Pattern Matching Operations
# ========================================================================
def find_similar_patterns(
self,
current_pdf: np.ndarray,
current_strikes: np.ndarray,
current_stats: Dict[str, Any],
ticker: str = 'SPY',
n_results: int = 10,
min_similarity: float = 0.7,
days_to_expiry_range: Tuple[int, int] = (20, 40)
) -> List[Dict[str, Any]]:
"""
Find historically similar PDF patterns.
Uses hybrid approach (ChromaDB + SQLite) if available,
otherwise falls back to database-only search.
Args:
current_pdf: Current PDF values
current_strikes: Current strikes
current_stats: Current PDF statistics
ticker: Stock ticker
n_results: Number of results to return
min_similarity: Minimum similarity threshold
days_to_expiry_range: Filter by DTE range
Returns:
List of similar patterns with similarity scores
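Example (illustrative sketch; ``current_pdf``, ``current_strikes``, and
``current_stats`` are assumed to come from the live analysis):

    matches = api.find_similar_patterns(
        current_pdf=current_pdf,
        current_strikes=current_strikes,
        current_stats=current_stats,
        ticker='SPY',
        n_results=5,
        min_similarity=0.75,
    )
    api.save_pattern_matches(snapshot_id, matches)  # snapshot_id from save_pdf_analysis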
"""
if self.hybrid_matcher:
# Use hybrid approach (fast)
matches = self.hybrid_matcher.find_similar_patterns(
current_pdf=current_pdf,
current_strikes=current_strikes,
current_stats=current_stats,
ticker=ticker,
n_results=n_results,
min_similarity=min_similarity,
days_to_expiry_range=days_to_expiry_range
)
else:
# Fallback to database-only (slower but works)
from src.core.patterns import PDFPatternMatcher
historical_data = self.archive.get_snapshots_for_pattern_matching(
ticker=ticker,
max_snapshots=100,
days_to_expiry_range=days_to_expiry_range
)
matcher = PDFPatternMatcher(
similarity_threshold=min_similarity,
max_matches=n_results
)
matches = matcher.find_similar_patterns(
current_pdf=current_pdf,
current_strikes=current_strikes,
current_stats=current_stats,
historical_data=historical_data
)
return matches
def save_pattern_matches(
self,
snapshot_id: int,
matches: List[Dict[str, Any]]
):
"""
Save pattern matching results to database.
Args:
snapshot_id: Current snapshot ID
matches: List of match dictionaries
"""
self.archive.store_pattern_matches(snapshot_id, matches)
# ========================================================================
# Prediction Tracking Operations
# ========================================================================
def create_prediction(
self,
snapshot_id: int,
target_date: datetime,
ticker: str,
condition: str,
target_level: float,
predicted_probability: float,
target_level_upper: Optional[float] = None,
notes: Optional[str] = None
) -> int:
"""
Create a prediction from a PDF snapshot.
Args:
snapshot_id: ID of snapshot used for prediction
target_date: Date to evaluate prediction
ticker: Stock ticker
condition: 'above', 'below', or 'between'
target_level: Strike or price level
predicted_probability: Forecasted probability (0-1)
target_level_upper: Upper level for 'between' condition
notes: Additional notes
Returns:
Prediction ID
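Example (illustrative values; ``snapshot_id`` and ``exp_date`` are assumed
to exist from an earlier save_pdf_analysis call):

    pred_id = api.create_prediction(
        snapshot_id=snapshot_id,
        target_date=exp_date,
        ticker='SPY',
        condition='between',
        target_level=440.0,
        target_level_upper=460.0,  # only needed for 'between'
        predicted_probability=0.55,
    )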
"""
prediction = self.archive.store_prediction(
snapshot_id=snapshot_id,
forecast_date=datetime.utcnow(),
target_date=target_date,
ticker=ticker,
condition=condition,
target_level=target_level,
predicted_probability=predicted_probability,
target_level_upper=target_level_upper,
notes=notes
)
return prediction.id
def evaluate_prediction(
self,
prediction_id: int,
actual_price: float
) -> Dict[str, Any]:
"""
Evaluate a prediction with actual outcome.
Args:
prediction_id: Prediction ID
actual_price: Actual price at target date
Returns:
Dictionary with prediction evaluation results
"""
prediction = self.archive.evaluate_prediction(
prediction_id=prediction_id,
actual_price=actual_price
)
return prediction.to_dict()
def get_pending_predictions(
self,
ticker: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Get predictions that need to be evaluated.
Args:
ticker: Filter by ticker (optional)
Returns:
List of pending prediction dictionaries
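Example (illustrative sketch; ``latest_close`` is an assumed price source,
and the dictionary keys depend on the prediction's ``to_dict()`` output):

    for pred in api.get_pending_predictions('SPY'):
        result = api.evaluate_prediction(pred['id'], actual_price=latest_close)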
"""
predictions = self.archive.get_pending_predictions(ticker=ticker)
return [p.to_dict() for p in predictions]
def get_prediction_accuracy(
self,
ticker: str = 'SPY',
days: int = 90
) -> Dict[str, Any]:
"""
Get prediction accuracy statistics.
Args:
ticker: Stock ticker
days: Number of days to look back
Returns:
Dictionary with accuracy metrics
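Example (illustrative; metric names depend on the archive layer):

    accuracy = api.get_prediction_accuracy('SPY', days=90)
    for metric, value in accuracy.items():
        print(f"{metric}: {value}")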
"""
start_date = datetime.utcnow() - timedelta(days=days)
return self.archive.get_prediction_accuracy_stats(
ticker=ticker,
start_date=start_date
)
# ========================================================================
# Database Management
# ========================================================================
def get_stats(self) -> Dict[str, Any]:
"""
Get overall database statistics.
Returns:
Dictionary with database stats
"""
db_stats = self.archive.get_database_stats()
if self.vector_store:
db_stats['vector_store_count'] = self.vector_store.get_count()
return db_stats
def clear_database(self, confirm: bool = False):
"""
Clear all data from database (use with caution!).
Args:
confirm: Must be True to proceed
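Example (destructive; illustrative only):

    api.clear_database(confirm=True)  # wipes SQLite tables and the ChromaDB collection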
"""
if not confirm:
raise ValueError("Must set confirm=True to clear database")
# Clear SQLite
self.db_manager.drop_tables()
self.db_manager.create_tables()
# Clear ChromaDB
if self.vector_store:
self.vector_store.clear()
print("⚠️ Database cleared!")
def export_snapshot_to_dict(self, snapshot_id: int) -> Dict[str, Any]:
"""
Export a snapshot to a complete dictionary (for backup/export).
Args:
snapshot_id: Snapshot ID
Returns:
Complete snapshot data as dictionary
"""
return self.get_pdf_snapshot(snapshot_id)
# Convenience singleton for global access
_api_instance = None
def get_history_api() -> HistoryAPI:
"""
Get the global HistoryAPI instance.
Returns:
HistoryAPI singleton
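Example (illustrative sketch):

    api = get_history_api()  # returns the same instance on every call
    stats = api.get_stats()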
"""
global _api_instance
if _api_instance is None:
_api_instance = HistoryAPI()
return _api_instance
if __name__ == "__main__":
# Test History API
print("Testing History API...")
# Create API
api = HistoryAPI(use_vector_store=True)
print("βœ… History API created")
# Create test data
ticker = 'SPY'
spot = 450.0
dte = 30
exp_date = datetime.utcnow() + timedelta(days=dte)
r = 0.05
strikes = np.linspace(400, 500, 100)
pdf = np.exp(-0.5 * ((strikes - spot) / 15)**2)  # Gaussian-shaped test density
pdf = pdf / np.trapz(pdf, strikes)  # normalize so the density integrates to 1
stats = {
'mean': 450.5,
'std': 15.0,
'skewness': -0.1,
'excess_kurtosis': 0.3,
'implied_move_pct': 3.5,
'prob_up_5pct': 0.25,
'prob_down_5pct': 0.20
}
# Save PDF analysis
snapshot_id = api.save_pdf_analysis(
ticker=ticker,
spot_price=spot,
days_to_expiry=dte,
expiration_date=exp_date,
risk_free_rate=r,
strikes=strikes,
pdf_values=pdf,
statistics=stats,
interpretation="Test interpretation",
model_used="test"
)
print(f"βœ… Saved PDF analysis: ID={snapshot_id}")
# Retrieve snapshot
retrieved = api.get_pdf_snapshot(snapshot_id)
print(f"βœ… Retrieved snapshot: {retrieved['ticker']} @ {retrieved['timestamp']}")
# Get latest
latest = api.get_latest_pdf(ticker)
print(f"βœ… Latest PDF: ID={latest['id']}")
# Create prediction
pred_id = api.create_prediction(
snapshot_id=snapshot_id,
target_date=exp_date,
ticker=ticker,
condition='above',
target_level=455.0,
predicted_probability=0.35,
notes="Test prediction"
)
print(f"βœ… Created prediction: ID={pred_id}")
# Get database stats
db_stats = api.get_stats()  # renamed to avoid shadowing the PDF statistics dict
print("✅ Database stats:")
for key, value in db_stats.items():
print(f" - {key}: {value}")
print("\nβœ… All History API tests passed!")