Spaces:
Runtime error
Runtime error
| """Module that defines the TimeSeriesAnalyzer object.""" | |
| import os | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from typing import Any | |
| import numpy as np | |
| import polars as pl | |
| from loguru import logger | |
| from sklearn.ensemble import IsolationForest | |
| from sqlalchemy import Engine, create_engine, text | |
| from data.get_mock import get_df | |
@dataclass
class TimeSeriesConfig:
    """Object with the database connection details.

    Attributes:
        host: address of the database
        port: port of the database
        database: name of the database
        username: username of the database
        password: password of the database (masked in ``repr``)
    """

    host: str
    port: int
    database: str
    username: str
    password: str

    def __repr__(self) -> str:
        # Mask the password so printing or logging the config cannot leak it.
        return (
            f"{type(self).__name__}(host={self.host!r}, port={self.port!r}, "
            f"database={self.database!r}, username={self.username!r}, "
            "password='***')"
        )
class TimeSeriesAnalyzer:
    """Handle connection details and compute insights on time series.

    Attributes:
        use_mock_db: if True, a local SQLite database (``mock.db``) seeded
            from ``get_df()`` is used instead of TimescaleDB. Read from the
            ``USE_MOCK_DB`` environment variable; enabled when unset.
        config: connection details, only set when ``use_mock_db`` is False.
        connection: SQLAlchemy engine; ``None`` until ``connect()`` runs.
    """

    def __init__(self):
        raw_flag = os.getenv("USE_MOCK_DB")
        # os.getenv returns a string and any non-empty string is truthy, so
        # values like "false" or "0" must be parsed explicitly to disable it.
        disabled_values = {"", "0", "false", "no", "off"}
        self.use_mock_db = (
            raw_flag is None or raw_flag.strip().lower() not in disabled_values
        )
        if not self.use_mock_db:
            self.config = TimeSeriesConfig(
                host=os.getenv("DB_HOST", "localhost"),
                port=int(os.getenv("DB_PORT", 5432)),
                database=os.getenv("DB_NAME", "data"),
                username=os.getenv("DB_USER", "postgres"),
                password=os.getenv("DB_PASS", "secretpassword"),
            )
        # Must actually be assigned (a bare annotation creates no attribute);
        # None means "not connected yet" for _ensure_connected().
        self.connection: Engine | None = None

    def connect(self):
        """Instantiate the database engine, seeding the mock DB if used."""
        if self.use_mock_db:
            logger.info("Connecting to mock SQLite database")
            self.connection = create_engine("sqlite:///mock.db")
            self._setup_mock_db()
        else:
            from sqlalchemy.engine import URL

            logger.info(
                f"Connecting to TimescaleDB at {self.config.host}:{self.config.port}"
            )
            # URL.create escapes special characters (e.g. "@", ":", "/") in the
            # credentials; formatting them into a URL string by hand does not.
            url = URL.create(
                "postgresql+psycopg2",
                username=self.config.username,
                password=self.config.password,
                host=self.config.host,
                port=self.config.port,
                database=self.config.database,
            )
            self.connection = create_engine(url)
        logger.info("Connected to database!")

    def _setup_mock_db(self):
        """Write the ``get_df()`` mock data into ``timeseries_data`` once.

        Does nothing (and skips building the DataFrame) when the table
        already exists in the mock database.
        """
        from sqlalchemy import inspect as sa_inspect

        # Checking for the table rather than the file means a mock.db left
        # without the table (e.g. by an interrupted run) is still seeded.
        if sa_inspect(self.connection).has_table("timeseries_data"):
            return
        df = get_df()
        logger.info(
            f"""df shape: {df.shape}, size: {df.estimated_size("kb"):,.3f}kb"""
        )
        logger.debug(df.head(5))
        with self.connection.connect() as conn:
            df.write_database(
                "timeseries_data",
                conn,
                if_table_exists="replace",
                engine_options={},
            )

    def _ensure_connected(self):
        """Call ``connect()`` if no engine has been created yet."""
        # getattr tolerates instances that bypassed __init__.
        if getattr(self, "connection", None) is None:
            self.connect()

    def get_available_metrics(self) -> list[str]:
        """Get the distinct sensor ids, sorted ascending.

        Returns:
            list of sensor ids
        """
        self._ensure_connected()
        sql = "SELECT DISTINCT sensor_id FROM timeseries_data ORDER BY sensor_id ASC"
        with self.connection.connect() as conn:
            # Materialize the rows while the connection is still open.
            return list(conn.execute(text(sql)).scalars())

    def query_metrics(
        self,
        sensor_id: str,
        start_time: str,
        end_time: str,
    ) -> pl.DataFrame:
        """Select one time series between two instants (inclusive).

        Args:
            sensor_id: id of the sensor
            start_time: ISO-8601 datetime string
            end_time: ISO-8601 datetime string

        Returns:
            A polars DataFrame with ``timestamp`` and ``value`` columns,
            ordered by timestamp.

        Raises:
            ValueError: if ``start_time`` or ``end_time`` is not ISO-8601.
        """
        self._ensure_connected()
        start_dt = datetime.fromisoformat(start_time)
        end_dt = datetime.fromisoformat(end_time)
        # Bound parameters instead of string interpolation: sensor_id comes
        # from the caller and would otherwise allow SQL injection.
        query = (
            "SELECT timestamp, value FROM timeseries_data "
            "WHERE sensor_id = :sensor_id "
            "AND timestamp >= :start_time AND timestamp <= :end_time "
            "ORDER BY timestamp ASC"
        )
        params = {
            "sensor_id": sensor_id,
            # str() yields the same literal the query previously embedded,
            # e.g. "2024-01-01 00:00:00".
            "start_time": str(start_dt),
            "end_time": str(end_dt),
        }
        with self.connection.connect() as conn:
            return pl.read_database(
                query, conn, execute_options={"parameters": params}
            )

    def detect_anomalies(
        self, data: pl.DataFrame, threshold: float = 1.0
    ) -> dict[str, Any]:
        """Flag points whose absolute z-score exceeds ``threshold``.

        Args:
            data: a single time series with ``timestamp`` and ``value`` columns
            threshold: minimum absolute z-score to report a point (default 1.0)

        Returns:
            {
                "anomalies_found": int,
                "anomalies": list[dict] with keys "timestamp" (str),
                    "value" (float), "z_score" (float) and
                    "severity" ("high" when z_score > 2.0, else "medium"),
                "statistics": {
                    "mean": float,
                    "std": float,
                    "min": float,
                    "max": float
                },
            }
            A series with fewer than two points or zero spread has no anomalies.
        """
        mean_val = data["value"].mean()
        std_val = data["value"].std()
        if std_val:
            z_expr = (pl.col("value") - mean_val).abs() / std_val
        else:
            # std is None (< 2 points) or 0 (constant series): dividing would
            # produce NaN/inf z-scores, so treat every point as normal.
            z_expr = pl.lit(0.0)
        scored = data.with_columns(z_expr.cast(pl.Float64).alias("z_score"))
        # Use column expressions (not Series from the unfiltered frame) so the
        # selection applies to the filtered rows only.
        anomalies = (
            scored.filter(pl.col("z_score") > threshold)
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("z_score"),
                pl.when(pl.col("z_score") > 2.0)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
        return {
            "anomalies_found": len(anomalies),
            "anomalies": anomalies,
            "statistics": {
                "mean": mean_val,
                "std": std_val,
                "min": data["value"].min(),
                "max": data["value"].max(),
            },
        }

    def calculate_trends(self, data: pl.DataFrame) -> dict[str, Any]:
        """Calculate trend direction, linear slope and percentage change.

        Args:
            data: a single time series with ``timestamp`` and ``value`` columns

        Returns:
            {
                "trend_direction": Literal["increasing", "decreasing", "stable"],
                "trend_slope": float,  # per sample; 0.0 with < 2 points
                "percentage_change": float,  # 0.0 with < 2 points or a zero start
                "start_value": float,
                "end_value": float,
                "time_period": {
                    "start": first timestamp or None,
                    "end": last timestamp or None,
                },
            }
        """
        values = data["value"]
        timestamps = data["timestamp"]
        n_points = len(values)

        # A line needs at least two points; np.polyfit raises on empty input.
        if n_points >= 2:
            x = np.arange(n_points)
            trend_slope = float(np.polyfit(x, values.to_numpy(), 1)[0])
        else:
            trend_slope = 0.0

        start_value = float(values[0]) if n_points > 0 else 0.0
        end_value = float(values[-1]) if n_points > 0 else 0.0

        # Percentage change is undefined from a zero start; report 0.0 rather
        # than raising ZeroDivisionError.
        if n_points > 1 and start_value != 0:
            pct_change = ((end_value - start_value) / start_value) * 100
        else:
            pct_change = 0.0

        if trend_slope > 0:
            direction = "increasing"
        elif trend_slope < 0:
            direction = "decreasing"
        else:
            direction = "stable"

        return {
            "trend_direction": direction,
            "trend_slope": trend_slope,
            "percentage_change": float(pct_change),
            "start_value": start_value,
            "end_value": end_value,
            "time_period": {
                "start": timestamps[0] if n_points > 0 else None,
                "end": timestamps[-1] if n_points > 0 else None,
            },
        }

    def detect_anomalies_isolation_forest(
        self, data: pl.DataFrame, contamination: float = 0.1
    ) -> dict[str, Any]:
        """Detect anomalies in one time series with an Isolation Forest.

        Args:
            data: a single time series with ``timestamp`` and ``value`` columns
            contamination: expected proportion of anomalies in the data
                (default: 0.1)

        Returns:
            {
                "anomalies_found": int,
                "anomalies": list[dict] with keys "timestamp" (str),
                    "value" (float), "anomaly_score" (float, higher = more
                    anomalous) and "severity" ("high" when the score is above
                    the 90th percentile of all scores, else "medium"),
                "statistics": {
                    "mean": float,
                    "std": float,
                    "min": float,
                    "max": float
                }
            }
        """
        statistics = {
            "mean": data["value"].mean(),
            "std": data["value"].std(),
            "min": data["value"].min(),
            "max": data["value"].max(),
        }
        if data.height == 0:
            # IsolationForest cannot be fitted on zero samples.
            return {"anomalies_found": 0, "anomalies": [], "statistics": statistics}

        values = data["value"].to_numpy().reshape(-1, 1)
        iso_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=100
        )
        # Predict anomalies (-1 for anomalies, 1 for normal)
        predictions = iso_forest.fit_predict(values)
        # score_samples is higher for normal points; negate so higher means
        # more anomalous.
        anomaly_scores = -iso_forest.score_samples(values)
        anomaly_mask = predictions == -1
        high_cutoff = float(np.percentile(anomaly_scores, 90))

        logger.debug(f"anomaly_mask: {anomaly_mask}")
        logger.debug(f"anomaly_scores: {anomaly_scores}")
        logger.debug(pl.Series("anomaly_score", anomaly_scores[anomaly_mask]))

        scored = data.select(
            pl.col("timestamp").cast(pl.Utf8),
            pl.col("value").cast(pl.Float64),
        ).with_columns(pl.Series("anomaly_score", anomaly_scores, dtype=pl.Float64))
        # Compare the boolean directly: casting it to "true"/"false" strings
        # first made every severity "high".
        anomalies = (
            scored.with_columns(
                pl.when(pl.col("anomaly_score") > high_cutoff)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity")
            )
            .filter(pl.Series(anomaly_mask))
            .to_dicts()
        )
        logger.debug(f"anomalies: {anomalies}")
        return {
            "anomalies_found": len(anomalies),
            "anomalies": anomalies,
            "statistics": statistics,
        }