| | |
| | """ |
| | FRED ML Lambda Function |
| | AWS Lambda function for processing economic data analysis |
| | """ |
| |
|
| | import json |
| | import os |
| | import boto3 |
| | import pandas as pd |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | import seaborn as sns |
| | import io |
| | import base64 |
| | from datetime import datetime, timedelta |
| | import requests |
| | from typing import Dict, List, Optional, Tuple |
| | import logging |
| |
|
| | |
# Runtime configuration, taken from the environment where available.
FRED_BASE_URL = "https://api.stlouisfed.org/fred"
FRED_API_KEY = os.getenv('FRED_API_KEY')  # None when the variable is unset
S3_BUCKET = os.getenv('S3_BUCKET', 'fredmlv1')

# Root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS clients are created once at import time and shared by the functions below.
s3_client = boto3.client('s3')
lambda_client = boto3.client('lambda')
| |
|
| | |
# Supported indicators: requested name -> FRED series ID. The two are currently
# identical, so the mapping is derived from a single tuple of series IDs.
ECONOMIC_INDICATORS = {
    series_id: series_id
    for series_id in (
        "GDP",
        "UNRATE",
        "CPIAUCSL",
        "FEDFUNDS",
        "DGS10",
        "DEXUSEU",
        "PAYEMS",
        "INDPRO",
        "M2SL",
        "PCE",
    )
}
| |
|
def get_fred_data(series_id: str, start_date: str, end_date: str) -> Optional[pd.Series]:
    """Fetch the observations of one FRED series as a date-indexed Series.

    Args:
        series_id: FRED series identifier, e.g. ``"GDP"``.
        start_date: First observation date to request, ``YYYY-MM-DD``.
        end_date: Last observation date to request, ``YYYY-MM-DD``.

    Returns:
        A ``pd.Series`` of floats named ``series_id`` and indexed by
        observation date, with NaN where FRED reports the value as ``"."``.
        ``None`` when the request fails, the response is not HTTP 200, or no
        observation could be parsed. Errors are logged, never raised.
    """
    try:
        url = f"{FRED_BASE_URL}/series/observations"
        params = {
            "series_id": series_id,
            "api_key": FRED_API_KEY,
            "file_type": "json",
            # The endpoint's documented date filters are observation_start /
            # observation_end; "start_date"/"end_date" are not among its
            # parameters, so the requested window was not being applied.
            "observation_start": start_date,
            "observation_end": end_date,
        }

        # Bound the wait so a stalled connection cannot hold the call open
        # indefinitely.
        response = requests.get(url, params=params, timeout=30)

        if response.status_code == 200:
            observations = response.json().get("observations", [])

            dates = []
            values = []
            for obs in observations:
                try:
                    date = pd.to_datetime(obs["date"])
                    value = float(obs["value"]) if obs["value"] != "." else np.nan
                except (ValueError, KeyError):
                    # Skip malformed rows rather than discarding the series.
                    continue
                dates.append(date)
                values.append(value)

            if dates:
                return pd.Series(values, index=dates, name=series_id)

        logger.error(f"Failed to fetch data for {series_id}")
        return None

    except Exception as e:
        # Best-effort fetch: callers treat None as "indicator unavailable".
        logger.error(f"Error fetching data for {series_id}: {e}")
        return None
| |
|
def create_dataframe(series_data: Dict[str, pd.Series]) -> pd.DataFrame:
    """Combine per-indicator series into one DataFrame on a daily index.

    Args:
        series_data: Mapping of column name to series. ``None`` values are
            skipped.

    Returns:
        A DataFrame whose index, named ``Date``, holds every calendar day from
        the earliest to the latest date found in any series, with one column
        per non-``None`` series. Days on which a series has no observation are
        NaN in that column. An empty DataFrame is returned when the mapping is
        empty or no series contributes any date.
    """
    if not series_data:
        return pd.DataFrame()

    # Distinct loop names: the original rebound ``series_data`` while
    # iterating over it.
    available = {
        name: series for name, series in series_data.items() if series is not None
    }

    # The overall span only needs each series' own first and last date, not a
    # set of every individual timestamp.
    bounds = [
        (series.index.min(), series.index.max())
        for series in available.values()
        if len(series.index)
    ]
    if not bounds:
        return pd.DataFrame()

    first_day = min(low for low, _ in bounds)
    last_day = max(high for _, high in bounds)

    df = pd.DataFrame(index=pd.date_range(first_day, last_day, freq='D'))
    for name, series in available.items():
        # Column assignment aligns on the index; unmatched days become NaN.
        df[name] = series

    df.index.name = 'Date'
    return df
| |
|
def generate_statistics(df: pd.DataFrame) -> Dict:
    """Summarise every column of ``df`` with basic descriptive statistics.

    Args:
        df: Frame whose columns are summarised; a column named ``'Date'`` is
            ignored.

    Returns:
        ``{column: {'mean', 'std', 'min', 'max', 'count', 'missing'}}`` where
        the first four are floats computed over non-null values, ``count`` is
        the number of non-null values and ``missing`` the number of nulls.
        Columns with no non-null value are left out. ``{}`` for an empty frame.
    """
    summary = {}
    if df.empty:
        return summary

    for name in df.columns:
        if name == 'Date':
            continue

        raw = df[name]
        observed = raw.dropna()
        if observed.empty:
            continue

        summary[name] = dict(
            mean=float(observed.mean()),
            std=float(observed.std()),
            min=float(observed.min()),
            max=float(observed.max()),
            count=int(len(observed)),
            missing=int(raw.isna().sum()),
        )

    return summary
| |
|
def create_correlation_matrix(df: pd.DataFrame) -> Dict:
    """Return the pairwise column correlations of ``df`` as nested dicts.

    The result has the shape ``{column: {other_column: correlation}}`` as
    produced by ``DataFrame.corr().to_dict()``; an empty frame yields ``{}``.
    """
    return {} if df.empty else df.corr().to_dict()
| |
|
def _render_and_upload(figsize, draw, s3_bucket: str, key: str) -> None:
    """Draw one figure, upload it to S3 as a PNG, and always close it.

    Args:
        figsize: ``(width, height)`` in inches for the new figure.
        draw: Zero-argument callable that plots onto the current figure.
        s3_bucket: Destination bucket name.
        key: Destination object key.
    """
    fig = plt.figure(figsize=figsize)
    try:
        draw()
        plt.tight_layout()

        buffer = io.BytesIO()
        plt.savefig(buffer, format='png', dpi=300, bbox_inches='tight')
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=key,
            Body=buffer.getvalue(),
            ContentType='image/png'
        )
    finally:
        # Close on failure too, so an error while drawing or uploading does
        # not leave the figure open.
        plt.close(fig)


def _plot_time_series(df: pd.DataFrame, columns: List[str]) -> None:
    """Plot every column in ``columns`` against the index on one set of axes."""
    for column in columns:
        plt.plot(df.index, df[column], label=column, linewidth=2)
    plt.title('Economic Indicators Time Series')
    plt.xlabel('Date')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.xticks(rotation=45)


def _plot_correlation(df: pd.DataFrame) -> None:
    """Plot an annotated heatmap of the column correlations of ``df``."""
    sns.heatmap(df.corr(), annot=True, cmap='coolwarm', center=0)
    plt.title('Correlation Matrix')


def _plot_distribution(df: pd.DataFrame, column: str) -> None:
    """Plot a 30-bin histogram of the non-null values of one column."""
    plt.hist(df[column].dropna(), bins=30, alpha=0.7, edgecolor='black')
    plt.title(f'Distribution of {column}')
    plt.xlabel('Value')
    plt.ylabel('Frequency')
    plt.grid(True, alpha=0.3)


def create_visualizations(df: pd.DataFrame, s3_bucket: str, report_id: str) -> List[str]:
    """Create the report charts and upload them to S3 as PNG files.

    Charts are written under ``visualizations/{report_id}/``: one combined
    time-series plot, a correlation heatmap when ``df`` has more than one
    column, and one histogram per column. A column named ``'Date'`` is not
    plotted as a series or histogram.

    Args:
        df: Data to plot; its index is used as the x axis of the time series.
        s3_bucket: Bucket that receives the images.
        report_id: Identifier used in the object keys.

    Returns:
        The S3 keys of the charts uploaded successfully. This is best effort:
        the first failure is logged, the remaining charts are skipped, and the
        keys uploaded so far are returned. ``[]`` for an empty frame.
    """
    if df.empty:
        return []

    visualization_keys = []
    columns = [column for column in df.columns if column != 'Date']

    try:
        time_series_key = f"visualizations/{report_id}/time_series.png"
        _render_and_upload(
            (12, 8), lambda: _plot_time_series(df, columns), s3_bucket, time_series_key
        )
        visualization_keys.append(time_series_key)

        if len(df.columns) > 1:
            correlation_key = f"visualizations/{report_id}/correlation.png"
            _render_and_upload(
                (10, 8), lambda: _plot_correlation(df), s3_bucket, correlation_key
            )
            visualization_keys.append(correlation_key)

        for column in columns:
            dist_key = f"visualizations/{report_id}/distribution_{column}.png"
            _render_and_upload(
                (8, 6),
                lambda column=column: _plot_distribution(df, column),
                s3_bucket,
                dist_key,
            )
            visualization_keys.append(dist_key)

    except Exception as e:
        logger.error(f"Error creating visualizations: {e}")

    return visualization_keys
| |
|
def save_report_to_s3(report_data: Dict, s3_bucket: str, report_id: str) -> str:
    """Serialise the report to JSON and store it in S3.

    Non-finite floats (NaN, +/-Infinity) anywhere inside ``report_data`` are
    written as ``null``: ``json.dumps`` would otherwise emit bare ``NaN`` /
    ``Infinity`` tokens, which are not valid JSON and are rejected by strict
    parsers. Values ``json`` cannot encode natively are stringified via
    ``default=str``.

    Args:
        report_data: Report contents; nested dicts, lists and tuples are
            supported.
        s3_bucket: Bucket that receives the report.
        report_id: Identifier used in the object key.

    Returns:
        The object key, ``reports/{report_id}/report.json``.

    Raises:
        Exception: Any serialisation or upload error, re-raised after logging.
    """
    import math  # local import: only this function needs it

    def to_json_safe(value):
        """Return ``value`` with non-finite floats replaced by ``None``."""
        if isinstance(value, float):
            return value if math.isfinite(value) else None
        if isinstance(value, dict):
            return {key: to_json_safe(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [to_json_safe(item) for item in value]
        return value

    try:
        report_key = f"reports/{report_id}/report.json"

        s3_client.put_object(
            Bucket=s3_bucket,
            Key=report_key,
            # allow_nan=False guarantees the stored document is valid JSON.
            Body=json.dumps(to_json_safe(report_data), default=str, allow_nan=False),
            ContentType='application/json'
        )

        return report_key
    except Exception as e:
        logger.error(f"Error saving report to S3: {e}")
        raise
| |
|
def lambda_handler(event: Dict, context) -> Dict:
    """Run an analysis for the requested indicators and store the report in S3.

    The request is read from ``event['body']`` when that is a JSON string or
    an already-decoded dict, otherwise from ``event`` itself. Recognised
    fields, all optional:

    * ``indicators``: names from ``ECONOMIC_INDICATORS``
      (default ``['GDP', 'UNRATE', 'CPIAUCSL']``); unsupported names are
      skipped with a warning.
    * ``start_date`` / ``end_date``: ``YYYY-MM-DD`` (default: the last 365
      days up to today).
    * ``options``: dict; ``options['visualizations']`` (default ``True``)
      controls whether charts are generated.

    Args:
        event: Invocation payload.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': <JSON string>}`` with ``status``,
        ``report_id``, ``report_key`` and ``message`` on success, or
        ``{'statusCode': 500, 'body': <JSON string>}`` with ``status`` and
        ``message`` on any failure. Exceptions are never propagated.
    """
    try:
        # default=str keeps logging from failing on a non-JSON-native event.
        logger.info(f"Received event: {json.dumps(event, default=str)}")

        body = event.get('body')
        if isinstance(body, str):
            payload = json.loads(body)
        elif isinstance(body, dict):
            # Body already decoded by the caller.
            payload = body
        else:
            payload = event
        if not isinstance(payload, dict):
            raise ValueError("Request payload must be a JSON object")

        # One clock reading so the default date range and report id agree.
        now = datetime.now()
        indicators = payload.get('indicators', ['GDP', 'UNRATE', 'CPIAUCSL'])
        start_date = payload.get('start_date', (now - timedelta(days=365)).strftime('%Y-%m-%d'))
        end_date = payload.get('end_date', now.strftime('%Y-%m-%d'))
        # "options": null must behave like a missing key.
        options = payload.get('options') or {}

        report_id = f"report_{now.strftime('%Y%m%d_%H%M%S')}"

        logger.info(f"Processing analysis for indicators: {indicators}")
        logger.info(f"Date range: {start_date} to {end_date}")

        series_data = {}
        for indicator in indicators:
            if indicator not in ECONOMIC_INDICATORS:
                logger.warning(f"Skipping unsupported indicator: {indicator}")
                continue

            data = get_fred_data(ECONOMIC_INDICATORS[indicator], start_date, end_date)
            if data is not None:
                series_data[indicator] = data
                logger.info(f"Successfully fetched data for {indicator}")
            else:
                logger.warning(f"Failed to fetch data for {indicator}")

        df = create_dataframe(series_data)

        if df.empty:
            raise ValueError("No data available for analysis")

        report_data = {
            'report_id': report_id,
            'timestamp': datetime.now().isoformat(),
            'indicators': indicators,
            'start_date': start_date,
            'end_date': end_date,
            'total_observations': len(df),
            'data_shape': df.shape,
            'statistics': generate_statistics(df),
            'correlation_matrix': create_correlation_matrix(df),
            'data': df.reset_index().to_dict('records')
        }

        if options.get('visualizations', True):
            report_data['visualizations'] = create_visualizations(df, S3_BUCKET, report_id)

        report_key = save_report_to_s3(report_data, S3_BUCKET, report_id)

        logger.info(f"Analysis completed successfully. Report saved to: {report_key}")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'status': 'success',
                'report_id': report_id,
                'report_key': report_key,
                'message': 'Analysis completed successfully'
            })
        }

    except Exception as e:
        # Top-level boundary: log with traceback and report the failure to the caller.
        logger.error(f"Error in lambda_handler: {e}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({
                'status': 'error',
                'message': str(e)
            })
        }