# Agentic RAG Time Series Analysis — Gradio demo app.
# (Hugging Face Spaces page residue removed; converted to a comment header.)
| import numpy as np | |
| import pandas as pd | |
| from statsmodels.tsa.seasonal import seasonal_decompose | |
| from statsmodels.tsa.arima.model import ARIMA | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.preprocessing import StandardScaler | |
| import gradio as gr | |
| import traceback | |
| import logging | |
| import matplotlib.pyplot as plt | |
| logging.basicConfig(level=logging.ERROR) | |
class TrendAnalysisAgent:
    """Extracts the trend component of a time series via classical decomposition."""

    def analyze(self, data, period=1):
        """Return the trend component of *data*.

        Parameters
        ----------
        data : array-like
            Raw time series values.
        period : int, optional
            Seasonal period forwarded to ``seasonal_decompose``. The
            original hard-coded value of 1 is kept as the default for
            backward compatibility, but note that with ``period=1`` the
            moving-average filter spans a single point, so the "trend" is
            essentially the raw series; pass the true seasonal period to
            get a smoothed trend.

        Returns
        -------
        The ``trend`` attribute of the decomposition result (NaN-padded at
        the edges when ``period > 1``).
        """
        result = seasonal_decompose(data, model='additive', period=period)
        return result.trend
class SeasonalityDetectionAgent:
    """Extracts the seasonal component of a time series."""

    def detect(self, data, period=12):
        """Return the seasonal component of *data*.

        Parameters
        ----------
        data : array-like
            Raw time series values. ``seasonal_decompose`` requires at
            least two full cycles (``2 * period`` observations) and raises
            ``ValueError`` otherwise.
        period : int, optional
            Seasonal period; defaults to 12 (the value previously
            hard-coded, i.e. monthly data with a yearly cycle).

        Returns
        -------
        The ``seasonal`` attribute of the decomposition result.
        """
        result = seasonal_decompose(data, model='additive', period=period)
        return result.seasonal
class AnomalyDetectionAgent:
    """Flags outliers in a univariate series with an Isolation Forest."""

    def detect(self, data, contamination=0.1):
        """Return a boolean mask marking anomalous points in *data*.

        Parameters
        ----------
        data : array-like
            1-D series of values. Converted via ``np.asarray`` so plain
            Python lists work too (the original ``data.reshape`` required
            a numpy array and crashed on lists).
        contamination : float, optional
            Expected fraction of outliers; defaults to the previously
            hard-coded 0.1.

        Returns
        -------
        numpy.ndarray of bool
            True where IsolationForest labels the point an outlier (-1).
        """
        scaler = StandardScaler()
        # Column-vector shape (n_samples, 1) as required by scikit-learn.
        data_scaled = scaler.fit_transform(np.asarray(data).reshape(-1, 1))
        # Fixed random_state keeps the flagged set reproducible across runs.
        iso_forest = IsolationForest(contamination=contamination, random_state=42)
        anomalies = iso_forest.fit_predict(data_scaled)
        return anomalies == -1
def plot_data(data, title, anomalies=None):
    """Render *data* as a line plot and return the matplotlib figure.

    Parameters
    ----------
    data : array-like
        Values to plot against their positional index.
    title : str
        Figure title.
    anomalies : array-like of bool, optional
        Mask of the same length as *data*; flagged points are overlaid
        as red scatter markers.

    Returns
    -------
    matplotlib.figure.Figure
        The figure is closed before returning so it does not leak into
        pyplot's global state (Gradio renders the returned object).
    """
    figure, axis = plt.subplots(figsize=(10, 6))
    axis.plot(data, label='Data')
    if anomalies is not None:
        flagged = np.flatnonzero(anomalies)
        axis.scatter(flagged, data[flagged], color='red', label='Anomalies')
    axis.set_title(title)
    axis.legend()
    plt.close(figure)
    return figure
class FeatureExtractionAgent:
    """Computes simple summary statistics for a time series."""

    def extract(self, data):
        """Return a one-row DataFrame with the mean, std, min and max of *data*.

        Note: ``np.std`` uses the population formula (ddof=0).
        """
        summary = {
            'mean': [np.mean(data)],
            'std': [np.std(data)],
            'min': [np.min(data)],
            'max': [np.max(data)],
        }
        return pd.DataFrame(summary)
class ForecastingAgent:
    """Produces out-of-sample forecasts with an ARIMA model."""

    def forecast(self, data, steps, order=(1, 1, 1)):
        """Fit an ARIMA model to *data* and forecast *steps* points ahead.

        Parameters
        ----------
        data : array-like
            Historical series values.
        steps : int
            Number of future points to forecast.
        order : tuple of int, optional
            ARIMA (p, d, q) order; defaults to the previously hard-coded
            ``(1, 1, 1)`` for backward compatibility.

        Returns
        -------
        The forecast from ``ARIMAResults.forecast`` (array-like of length
        *steps*).
        """
        model = ARIMA(data, order=order)
        results = model.fit()
        return results.forecast(steps=steps)
class RetrievalMechanism:
    """Minimal in-memory key/value store for pipeline artifacts."""

    def __init__(self):
        # Backing store mapping keys to whatever the caller saved.
        self.database = {}

    def store(self, key, data):
        """Save *data* under *key*, overwriting any previous entry."""
        self.database[key] = data

    def retrieve(self, key):
        """Return the value saved under *key*, or None when absent."""
        return self.database.get(key)
class MockLanguageModel:
    """Stand-in for an LLM: composes a canned natural-language insight."""

    def generate_insight(self, data, trend, seasonality, anomalies, features, forecast):
        """Build a one-paragraph English summary of the analysis results.

        Parameters
        ----------
        data : array-like
            Original series; only the last value is read.
        trend, seasonality : unused
            Accepted for interface compatibility with the pipeline.
        anomalies : array-like of bool
            Anomaly mask; only its sum (the count) is used.
        features : pandas.DataFrame
            One-row frame with at least 'mean' and 'std' columns.
        forecast : array-like
            Forecast values; only the last value is read.

        Returns
        -------
        str
            The composed insight sentence(s).
        """
        count = int(anomalies.sum())
        insight = (
            f"The time series has a mean of {features['mean'].values[0]:.2f} "
            f"and standard deviation of {features['std'].values[0]:.2f}. "
        )
        # Bug fixes vs. the original wording: pluralize on count != 1
        # (the old `> 1` test produced "is 0 anomaly"), and use the
        # correct article ("an upward" / "a downward").
        if count == 1:
            insight += "There is 1 anomaly detected. "
        else:
            insight += f"There are {count} anomalies detected. "
        direction = 'an upward' if forecast[-1] > data[-1] else 'a downward'
        insight += f"The forecast suggests {direction} trend in the near future."
        return insight
class AgenticRAG:
    """Coordinates the specialist agents and composes their outputs."""

    def __init__(self):
        self.trend_agent = TrendAnalysisAgent()
        self.seasonality_agent = SeasonalityDetectionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.feature_agent = FeatureExtractionAgent()
        self.forecasting_agent = ForecastingAgent()
        self.retrieval = RetrievalMechanism()
        self.language_model = MockLanguageModel()

    def process(self, data, forecast_steps):
        """Run every agent on *data* and summarize the results.

        Parameters
        ----------
        data : array-like
            The time series values.
        forecast_steps : int
            Number of steps for the forecasting agent.

        Returns
        -------
        tuple
            (trend, seasonality, anomalies, features, forecast, insight).
        """
        trend = self.trend_agent.analyze(data)
        seasonality = self.seasonality_agent.detect(data)
        anomalies = self.anomaly_agent.detect(data)
        features = self.feature_agent.extract(data)
        forecast = self.forecasting_agent.forecast(data, forecast_steps)
        # Cache the intermediate results for later retrieval. (Previously
        # self.retrieval was constructed but never used — dead code.)
        for key, value in (('data', data), ('trend', trend),
                           ('seasonality', seasonality),
                           ('anomalies', anomalies), ('features', features),
                           ('forecast', forecast)):
            self.retrieval.store(key, value)
        insight = self.language_model.generate_insight(
            data, trend, seasonality, anomalies, features, forecast)
        return trend, seasonality, anomalies, features, forecast, insight
def analyze_time_series(data, forecast_steps):
    """Gradio handler: parse the input, run the agent pipeline, build plots.

    Parameters
    ----------
    data : str
        Comma-separated numeric values typed into the textbox.
    forecast_steps : float or int
        Number of steps to forecast. ``gr.Number`` delivers a float, so it
        is cast to int here (the original passed the float straight into
        ``ARIMA.forecast``).

    Returns
    -------
    tuple
        (trend_plot, seasonality_plot, anomalies_plot, features_dict,
        forecast_plot, insight, error_message). On failure every output is
        None/empty except the error message.
    """
    try:
        series = np.array([float(x) for x in data.split(',')])
        if len(series) < 2:
            raise ValueError("Input data must contain at least two values.")
        # NOTE(review): seasonal_decompose with period=12 needs >= 24
        # points; shorter input surfaces here as a caught ValueError.
        steps = int(forecast_steps)  # gr.Number yields a float; ARIMA wants int
        if steps < 1:
            raise ValueError("Number of forecast steps must be at least 1.")
        agentic_rag = AgenticRAG()
        trend, seasonality, anomalies, features, forecast, insight = \
            agentic_rag.process(series, steps)
        trend_plot = plot_data(trend, "Trend")
        seasonality_plot = plot_data(seasonality, "Seasonality")
        anomalies_plot = plot_data(series, "Anomalies", anomalies)
        # Plot history + forecast as one series, marking where the
        # forecast begins with a dashed vertical line.
        full_data = np.concatenate([series, forecast])
        forecast_plot = plot_data(full_data, "Forecast")
        ax = forecast_plot.axes[0]
        ax.axvline(x=len(series) - 1, color='r', linestyle='--', label='Forecast Start')
        ax.legend()
        return (
            trend_plot,
            seasonality_plot,
            anomalies_plot,
            features.to_dict(orient='records')[0],
            forecast_plot,
            insight,
            ""  # Empty string for the error output
        )
    except Exception as e:
        # Top-level UI boundary: report the failure in the (hidden) error
        # textbox instead of crashing the app.
        error_msg = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_msg)
        return (None, None, None, None, None, "", error_msg)
# A 60-point upward-trending sample series used to pre-populate the input box.
example_input = "120,125,130,140,135,145,150,160,155,165,170,180,175,185,190,200,195,205,210,220,215,225,230,240,235,245,250,260,255,265,270,280,275,285,290,300,295,305,310,320,315,325,330,340,335,345,350,360,355,365,370,380,375,385,390,400,395,405,410,420"
# Gradio UI wiring: one textbox + one number in, plots/JSON/text out.
# Output order must match the tuple returned by analyze_time_series.
iface = gr.Interface(
    fn=analyze_time_series,
    inputs=[
        gr.Textbox(label="Enter comma-separated time series data", value=example_input),
        gr.Number(label="Number of steps to forecast", value=5)
    ],
    outputs=[
        gr.Plot(label="Trend"),
        gr.Plot(label="Seasonality"),
        gr.Plot(label="Anomalies"),
        gr.JSON(label="Features"),
        gr.Plot(label="Forecast"),
        gr.Textbox(label="Insight"),
        # Hidden component; populated only when the handler catches an error.
        gr.Textbox(label="Error", visible=False)
    ],
    title="Agentic RAG Time Series Analysis",
    description="Enter a comma-separated list of numbers representing your time series data, and specify the number of steps to forecast."
)
if __name__ == "__main__":
    iface.launch()