| """ | |
| High-level API for historical PDF data and predictions. | |
| Provides a unified interface for: | |
| - Storing PDF snapshots | |
| - Retrieving historical data | |
| - Pattern matching | |
| - Prediction tracking | |
| """ | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import numpy as np | |
| from .db_config import DatabaseManager | |
| from .pdf_archive import PDFArchive | |
| from .vector_store import PDFVectorStore, HybridPatternMatcher | |
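
# Typical usage (illustrative; a runnable end-to-end demo lives under
# __main__ at the bottom of this file):
#
#     api = HistoryAPI()
#     sid = api.save_pdf_analysis(...)              # persist one analysis
#     recent = api.get_pdf_history('SPY', days=30)  # last 30 days of snapshots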


class HistoryAPI:
    """
    High-level API for PDF history and prediction tracking.

    This is the main interface that the Streamlit app will use
    for all database operations.
    """

    def __init__(
        self,
        db_manager: Optional[DatabaseManager] = None,
        use_vector_store: bool = True
    ):
        """
        Initialize the History API.

        Args:
            db_manager: DatabaseManager instance (creates a default one if None)
            use_vector_store: Whether to use ChromaDB for fast similarity search
        """
        self.db_manager = db_manager or DatabaseManager()
        self.archive = PDFArchive(self.db_manager)

        # Initialize the vector store if requested; otherwise pattern matching
        # falls back to a database-only search (see find_similar_patterns).
        self.use_vector_store = use_vector_store
        if use_vector_store:
            self.vector_store = PDFVectorStore()
            self.hybrid_matcher = HybridPatternMatcher(
                self.vector_store,
                self.archive
            )
        else:
            self.vector_store = None
            self.hybrid_matcher = None

    # ========================================================================
    # PDF Snapshot Operations
    # ========================================================================

    def save_pdf_analysis(
        self,
        ticker: str,
        spot_price: float,
        days_to_expiry: int,
        expiration_date: datetime,
        risk_free_rate: float,
        strikes: np.ndarray,
        pdf_values: np.ndarray,
        statistics: Dict[str, Any],
        sabr_params: Optional[Dict[str, float]] = None,
        interpolation_method: Optional[str] = None,
        interpretation: Optional[str] = None,
        interpretation_mode: Optional[str] = None,
        model_used: Optional[str] = None,
        store_in_vector_db: bool = True
    ) -> int:
        """
        Save a complete PDF analysis to the database.

        Args:
            ticker: Stock ticker
            spot_price: Current spot price
            days_to_expiry: Days to expiration
            expiration_date: Option expiration date
            risk_free_rate: Risk-free rate used
            strikes: Strike prices array
            pdf_values: PDF values array
            statistics: Dictionary of PDF statistics
            sabr_params: SABR parameters (optional)
            interpolation_method: Method used ('sabr' or 'spline')
            interpretation: AI interpretation text
            interpretation_mode: Mode used for interpretation
            model_used: Model used ('ollama' or 'fallback')
            store_in_vector_db: Whether to also store in ChromaDB

        Returns:
            Snapshot ID
        """
        # Store in SQLite
        snapshot = self.archive.store_snapshot(
            ticker=ticker,
            spot_price=spot_price,
            days_to_expiry=days_to_expiry,
            expiration_date=expiration_date,
            risk_free_rate=risk_free_rate,
            strikes=strikes,
            pdf_values=pdf_values,
            statistics=statistics,
            sabr_params=sabr_params,
            interpolation_method=interpolation_method,
            interpretation=interpretation,
            interpretation_mode=interpretation_mode,
            model_used=model_used
        )

        # Also store in ChromaDB for fast similarity search
        if store_in_vector_db and self.vector_store:
            metadata = {
                'ticker': ticker,
                'date': snapshot.timestamp.strftime('%Y-%m-%d'),
                'spot': spot_price,
                'dte': days_to_expiry,
                **statistics
            }
            self.vector_store.add_snapshot(
                snapshot_id=snapshot.id,
                pdf=pdf_values,
                strikes=strikes,
                metadata=metadata
            )

        return snapshot.id

    def get_pdf_snapshot(self, snapshot_id: int) -> Optional[Dict[str, Any]]:
        """
        Retrieve a PDF snapshot by ID.

        Args:
            snapshot_id: Snapshot ID

        Returns:
            Dictionary with snapshot data, or None if not found
        """
        snapshot = self.archive.get_snapshot_by_id(snapshot_id)
        return snapshot.to_dict() if snapshot else None

    def get_latest_pdf(
        self,
        ticker: str = 'SPY',
        days_to_expiry: Optional[int] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get the most recent PDF for a ticker.

        Args:
            ticker: Stock ticker
            days_to_expiry: Filter by DTE (optional)

        Returns:
            Dictionary with snapshot data, or None if not found
        """
        snapshot = self.archive.get_latest_snapshot(ticker, days_to_expiry)
        return snapshot.to_dict() if snapshot else None

    def get_pdf_history(
        self,
        ticker: str,
        days: int = 30,
        days_to_expiry: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Get PDF snapshots from the last N days.

        Args:
            ticker: Stock ticker
            days: Number of days to look back
            days_to_expiry: Filter by DTE (optional)

        Returns:
            List of snapshot dictionaries
        """
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=days)
        snapshots = self.archive.get_snapshots_by_date_range(
            ticker=ticker,
            start_date=start_date,
            end_date=end_date,
            days_to_expiry=days_to_expiry
        )
        return [s.to_dict() for s in snapshots]

    # ========================================================================
    # Pattern Matching Operations
    # ========================================================================

    def find_similar_patterns(
        self,
        current_pdf: np.ndarray,
        current_strikes: np.ndarray,
        current_stats: Dict[str, Any],
        ticker: str = 'SPY',
        n_results: int = 10,
        min_similarity: float = 0.7,
        days_to_expiry_range: Tuple[int, int] = (20, 40)
    ) -> List[Dict[str, Any]]:
        """
        Find historically similar PDF patterns.

        Uses the hybrid approach (ChromaDB + SQLite) if available;
        otherwise falls back to a database-only search.

        Args:
            current_pdf: Current PDF values
            current_strikes: Current strikes
            current_stats: Current PDF statistics
            ticker: Stock ticker
            n_results: Number of results to return
            min_similarity: Minimum similarity threshold
            days_to_expiry_range: Filter by DTE range

        Returns:
            List of similar patterns with similarity scores
        """
        if self.hybrid_matcher:
            # Hybrid approach: ChromaDB narrows candidates quickly, SQLite
            # supplies the full snapshot details
            matches = self.hybrid_matcher.find_similar_patterns(
                current_pdf=current_pdf,
                current_strikes=current_strikes,
                current_stats=current_stats,
                ticker=ticker,
                n_results=n_results,
                min_similarity=min_similarity,
                days_to_expiry_range=days_to_expiry_range
            )
        else:
            # Fallback: database-only search (slower, but requires no vector store).
            # Imported lazily so the module works without the patterns package.
            from src.core.patterns import PDFPatternMatcher

            historical_data = self.archive.get_snapshots_for_pattern_matching(
                ticker=ticker,
                max_snapshots=100,
                days_to_expiry_range=days_to_expiry_range
            )
            matcher = PDFPatternMatcher(
                similarity_threshold=min_similarity,
                max_matches=n_results
            )
            matches = matcher.find_similar_patterns(
                current_pdf=current_pdf,
                current_strikes=current_strikes,
                current_stats=current_stats,
                historical_data=historical_data
            )

        return matches

    def save_pattern_matches(
        self,
        snapshot_id: int,
        matches: List[Dict[str, Any]]
    ):
        """
        Save pattern-matching results to the database.

        Args:
            snapshot_id: Current snapshot ID
            matches: List of match dictionaries
        """
        self.archive.store_pattern_matches(snapshot_id, matches)

    # ========================================================================
    # Prediction Tracking Operations
    # ========================================================================

    def create_prediction(
        self,
        snapshot_id: int,
        target_date: datetime,
        ticker: str,
        condition: str,
        target_level: float,
        predicted_probability: float,
        target_level_upper: Optional[float] = None,
        notes: Optional[str] = None
    ) -> int:
        """
        Create a prediction from a PDF snapshot.

        Args:
            snapshot_id: ID of the snapshot used for the prediction
            target_date: Date on which to evaluate the prediction
            ticker: Stock ticker
            condition: 'above', 'below', or 'between'
            target_level: Strike or price level
            predicted_probability: Forecasted probability (0-1)
            target_level_upper: Upper level for the 'between' condition
            notes: Additional notes

        Returns:
            Prediction ID
        """
        prediction = self.archive.store_prediction(
            snapshot_id=snapshot_id,
            forecast_date=datetime.utcnow(),
            target_date=target_date,
            ticker=ticker,
            condition=condition,
            target_level=target_level,
            predicted_probability=predicted_probability,
            target_level_upper=target_level_upper,
            notes=notes
        )
        return prediction.id

    def evaluate_prediction(
        self,
        prediction_id: int,
        actual_price: float
    ) -> Dict[str, Any]:
        """
        Evaluate a prediction against the actual outcome.

        Args:
            prediction_id: Prediction ID
            actual_price: Actual price at the target date

        Returns:
            Dictionary with prediction evaluation results
        """
        prediction = self.archive.evaluate_prediction(
            prediction_id=prediction_id,
            actual_price=actual_price
        )
        return prediction.to_dict()

    def get_pending_predictions(
        self,
        ticker: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get predictions that still need to be evaluated.

        Args:
            ticker: Filter by ticker (optional)

        Returns:
            List of pending prediction dictionaries
        """
        predictions = self.archive.get_pending_predictions(ticker=ticker)
        return [p.to_dict() for p in predictions]

    def get_prediction_accuracy(
        self,
        ticker: str = 'SPY',
        days: int = 90
    ) -> Dict[str, Any]:
        """
        Get prediction accuracy statistics.

        Args:
            ticker: Stock ticker
            days: Number of days to look back

        Returns:
            Dictionary with accuracy metrics
        """
        start_date = datetime.utcnow() - timedelta(days=days)
        return self.archive.get_prediction_accuracy_stats(
            ticker=ticker,
            start_date=start_date
        )

    # ========================================================================
    # Database Management
    # ========================================================================

    def get_stats(self) -> Dict[str, Any]:
        """
        Get overall database statistics.

        Returns:
            Dictionary with database stats
        """
        db_stats = self.archive.get_database_stats()
        if self.vector_store:
            db_stats['vector_store_count'] = self.vector_store.get_count()
        return db_stats

    def clear_database(self, confirm: bool = False):
        """
        Clear all data from the database (use with caution!).

        Args:
            confirm: Must be True to proceed
        """
        if not confirm:
            raise ValueError("Must set confirm=True to clear database")

        # Clear SQLite by dropping and recreating all tables
        self.db_manager.drop_tables()
        self.db_manager.create_tables()

        # Clear ChromaDB
        if self.vector_store:
            self.vector_store.clear()

        print("⚠️ Database cleared!")

    def export_snapshot_to_dict(self, snapshot_id: int) -> Dict[str, Any]:
        """
        Export a snapshot to a complete dictionary (for backup/export).

        Args:
            snapshot_id: Snapshot ID

        Returns:
            Complete snapshot data as a dictionary
        """
        return self.get_pdf_snapshot(snapshot_id)


# Convenience singleton for global access
_api_instance = None


def get_history_api() -> HistoryAPI:
    """
    Get the global HistoryAPI instance.

    Returns:
        HistoryAPI singleton
    """
    global _api_instance
    if _api_instance is None:
        _api_instance = HistoryAPI()
    return _api_instance
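
# Illustrative use of the singleton from app code (the import path below is
# hypothetical; adjust it to wherever this module lives in your package):
#
#     from database.history_api import get_history_api
#
#     api = get_history_api()
#     latest = api.get_latest_pdf('SPY', days_to_expiry=30)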


if __name__ == "__main__":
    # Smoke-test the History API end to end
    print("Testing History API...")

    # Create the API (requires ChromaDB to be available)
    api = HistoryAPI(use_vector_store=True)
    print("✅ History API created")

    # Build synthetic test data: a Gaussian PDF centered on spot
    ticker = 'SPY'
    spot = 450.0
    dte = 30
    exp_date = datetime.utcnow() + timedelta(days=dte)
    r = 0.05

    strikes = np.linspace(400, 500, 100)
    pdf = np.exp(-0.5 * ((strikes - spot) / 15)**2)
    # Normalize so the density integrates to 1 (np.trapezoid in NumPy >= 2.0)
    pdf = pdf / np.trapz(pdf, strikes)

    stats = {
        'mean': 450.5,
        'std': 15.0,
        'skewness': -0.1,
        'excess_kurtosis': 0.3,
        'implied_move_pct': 3.5,
        'prob_up_5pct': 0.25,
        'prob_down_5pct': 0.20
    }

    # Save a PDF analysis
    snapshot_id = api.save_pdf_analysis(
        ticker=ticker,
        spot_price=spot,
        days_to_expiry=dte,
        expiration_date=exp_date,
        risk_free_rate=r,
        strikes=strikes,
        pdf_values=pdf,
        statistics=stats,
        interpretation="Test interpretation",
        model_used="test"
    )
    print(f"✅ Saved PDF analysis: ID={snapshot_id}")

    # Retrieve the snapshot by ID
    retrieved = api.get_pdf_snapshot(snapshot_id)
    print(f"✅ Retrieved snapshot: {retrieved['ticker']} @ {retrieved['timestamp']}")

    # Get the latest snapshot for the ticker
    latest = api.get_latest_pdf(ticker)
    print(f"✅ Latest PDF: ID={latest['id']}")

    # Create a prediction
    pred_id = api.create_prediction(
        snapshot_id=snapshot_id,
        target_date=exp_date,
        ticker=ticker,
        condition='above',
        target_level=455.0,
        predicted_probability=0.35,
        notes="Test prediction"
    )
    print(f"✅ Created prediction: ID={pred_id}")

    # Get database stats (named db_stats to avoid shadowing the statistics dict)
    db_stats = api.get_stats()
    print("✅ Database stats:")
    for key, value in db_stats.items():
        print(f"  - {key}: {value}")
| print("\nβ All History API tests passed!") | |