| | import os |
| | import pandas as pd |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | import seaborn as sns |
| | import plotly.express as px |
| | import plotly.graph_objects as go |
| | from plotly.subplots import make_subplots |
| | import warnings |
| | import traceback |
| | import time |
| | import random |
| | import requests |
| | import json |
| | from urllib3.util.retry import Retry |
| | from requests.adapters import HTTPAdapter |
| | warnings.filterwarnings('ignore') |
| |
|
| | from typing import Dict, List, Any, Optional, TypedDict |
| | from datetime import datetime |
| | import logging |
| |
|
| | |
| | from langgraph.graph import StateGraph, END |
| | from langchain_core.messages import HumanMessage, SystemMessage |
| |
|
| | |
# Configure root logging at import time so the agent's INFO progress messages are
# emitted; logging.basicConfig is a no-op when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
logger: logging.Logger = logging.getLogger(name=__name__)
| |
|
class AnalysisState(TypedDict):
    """Mapping handed from one LangGraph analysis node to the next.

    Each workflow node reads the sections it needs, fills in its own section and
    returns the same mapping.
    """

    dataset: pd.DataFrame  # the loaded input table
    dataset_info: Dict[str, Any]  # structural profile: shape, dtypes, null counts, ...
    column_analysis: Dict[str, Any]  # per-column statistics keyed by column label
    insights: List[str]  # generated insight sentences
    visualizations: List[Dict[str, Any]]  # chart specifications (type, columns, title, ...)
    recommendations: List[str]  # generated recommendation sentences
    current_step: str  # name of the last node that completed successfully
    error_messages: List[str]  # errors recorded by nodes that failed
| |
|
class DataAnalysisAgent:
    """Exploratory data-analysis agent: a LangGraph pipeline backed by the Groq chat API.

    The compiled workflow runs six nodes in a fixed order: dataset profiling,
    per-column statistics, insights, visualization planning, chart rendering
    and recommendations. Each node records failures in
    ``state["error_messages"]`` instead of aborting, so a run always returns a
    (possibly partial) result. Charts (``chart_<n>_<type>.png``) and
    ``analysis_report.md`` are written to the current working directory.
    """

    GROQ_CHAT_URL = "https://api.groq.com/openai/v1/chat/completions"
    # HTTP statuses worth retrying: rate limiting and transient server errors.
    RETRYABLE_STATUSES = (429, 500, 502, 503, 504)
    # Chart types _create_charts can render; other types in an LLM plan are dropped.
    SUPPORTED_CHART_TYPES = ("histogram", "bar", "scatter", "heatmap")
    # Number of insights / recommendations produced per run.
    NUM_ITEMS = 5
    # Cap on column names embedded in a prompt, so very wide tables keep prompts bounded.
    MAX_PROMPT_COLUMNS = 30

    _FALLBACK_INSIGHTS = (
        "Dataset contains substantial missing values that may impact analysis accuracy and require data cleaning strategies",
        "Distribution patterns show significant variation across variables, indicating diverse data characteristics requiring tailored analysis approaches",
        "Strong correlations exist between key variables, suggesting potential predictive relationships and analytical opportunities",
        "Data quality metrics indicate areas for improvement in collection processes and validation procedures",
        "Business value can be enhanced through targeted analysis of high-impact variables and strategic data utilization",
    )

    _FALLBACK_RECOMMENDATIONS = (
        "Implement comprehensive data quality monitoring and validation processes to identify and address missing or inconsistent data values before analysis",
        "Develop automated reporting dashboards that provide real-time visibility into key business metrics and performance indicators for stakeholder decision-making",
        "Establish regular data collection workflows and governance protocols to ensure consistent, accurate, and timely data capture across all business processes",
        "Consider implementing advanced analytics and machine learning models to uncover predictive insights that can drive proactive business strategies and competitive advantage",
        "Create standardized data documentation and metadata management practices to improve data discoverability, understanding, and collaborative analysis across teams",
    )

    def __init__(self, groq_api_key: str, model_name: str = "llama3-70b-8192"):
        """Initialize with direct Groq API calls (no LangChain LLM client).

        Args:
            groq_api_key: API key sent as a Bearer token to the Groq API.
            model_name: Groq chat model used for every completion.
        """
        self.groq_api_key = groq_api_key
        self.model_name = model_name
        # Hugging Face Spaces sets SPACE_ID; there the agent retries more and varies headers.
        self.is_hf_spaces = os.environ.get('SPACE_ID') is not None
        self.session = self._build_session()

        if self.is_hf_spaces:
            logger.info("🚀 HF Spaces: Using direct Groq API calls")
        else:
            logger.info("💻 Local: Using direct Groq API calls")

        self.workflow = self._create_workflow()

    # ------------------------------------------------------------------ HTTP

    def _build_session(self) -> requests.Session:
        """Create an HTTP session with transport-level retries and curl-like default headers.

        Used both at construction time and when ``_direct_groq_call`` replaces the
        session after repeated connection errors, so a replacement keeps the same
        retry adapter and headers.
        """
        session = requests.Session()
        retry_strategy = Retry(
            # Kept small on purpose: _direct_groq_call retries on top of this, and the
            # two layers multiply.
            total=2,
            backoff_factor=1,
            status_forcelist=list(self.RETRYABLE_STATUSES),
            allowed_methods=["POST"],
            # Return the final 429/5xx response instead of raising RetryError, so the
            # status-code handling in _direct_groq_call is actually reached.
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        session.headers.update({
            "User-Agent": "curl/7.68.0",
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate",
            "Connection": "close",
        })
        return session

    @staticmethod
    def _retry_after_seconds(response: requests.Response, default: float, cap: float = 60.0) -> float:
        """Return a numeric ``Retry-After`` header in seconds, clamped to [0, cap]; else ``default``."""
        try:
            return min(cap, max(0.0, float(response.headers.get("Retry-After"))))
        except (TypeError, ValueError):
            # Header missing (TypeError on None) or in HTTP-date form (ValueError).
            return default

    def _direct_groq_call(self, prompt: str) -> str:
        """Send ``prompt`` as one user message to Groq and return the reply text.

        Rate limits (429), server errors (5xx), timeouts and connection errors are
        retried with exponential backoff plus jitter; on 429 a numeric
        ``Retry-After`` header is honoured (default 10 s). Other 4xx responses
        (invalid key, unknown model, malformed request) fail immediately because
        resending the identical request cannot succeed.

        Args:
            prompt: Complete prompt text.

        Returns:
            The ``message.content`` of the first returned choice.

        Raises:
            RuntimeError: On a non-retryable API error, or an unexpected
                status / malformed body on the final attempt.
            ConnectionError: When every attempt failed with a retryable error.
            requests.exceptions.RequestException: An unexpected transport error
                on the final attempt.
        """
        headers = {
            "Authorization": f"Bearer {self.groq_api_key}",
            "Content-Type": "application/json",
            "User-Agent": "curl/7.68.0",
            "Accept": "*/*",
            "Connection": "close",
        }
        data = {
            "messages": [{"role": "user", "content": prompt}],
            "model": self.model_name,
            "max_tokens": 1500,
            "temperature": 0.1,
            "stream": False,
        }
        max_attempts = 5 if self.is_hf_spaces else 3

        for attempt in range(max_attempts):
            is_last_attempt = attempt == max_attempts - 1
            if attempt > 0:
                # Exponential backoff with jitter so concurrent clients do not retry in lockstep.
                delay = (2 ** attempt) + random.uniform(1, 3)
                logger.info(f"⏳ Waiting {delay:.1f}s before attempt {attempt + 1}")
                time.sleep(delay)

            logger.info(f"🤖 Direct Groq API attempt {attempt + 1}/{max_attempts}")

            if self.is_hf_spaces and attempt > 1:
                # Vary client headers on later attempts inside HF Spaces.
                # NOTE(review): a spoofed X-Forwarded-For is unlikely to matter to a
                # remote API; consider removing it.
                headers["User-Agent"] = f"DataAnalysisAgent/1.{attempt}"
                headers["X-Forwarded-For"] = "127.0.0.1"

            try:
                response = self.session.post(
                    self.GROQ_CHAT_URL,
                    headers=headers,
                    json=data,
                    timeout=120,
                    verify=True,
                    allow_redirects=True,
                )
            except requests.exceptions.Timeout as e:
                # Covers both ConnectTimeout and ReadTimeout.
                logger.warning(f"⚠️ Timeout on attempt {attempt + 1}: {str(e)}")
                continue
            except requests.exceptions.ConnectionError as e:
                logger.warning(f"⚠️ Connection error on attempt {attempt + 1}: {str(e)}")
                if self.is_hf_spaces and attempt > 2:
                    logger.info("🔄 Creating new session...")
                    # Rebuild through _build_session so retries and default headers survive.
                    self.session = self._build_session()
                continue
            except requests.exceptions.RequestException as e:
                logger.error(f"❌ Unexpected error on attempt {attempt + 1}: {str(e)}")
                if is_last_attempt:
                    raise
                continue

            status = response.status_code
            logger.info(f"📡 Response status: {status}")

            if status == 200:
                try:
                    content = response.json()["choices"][0]["message"]["content"]
                except (ValueError, KeyError, IndexError, TypeError) as e:
                    logger.error(f"❌ Malformed Groq response on attempt {attempt + 1}: {str(e)}")
                    if is_last_attempt:
                        raise RuntimeError(f"Malformed Groq API response: {e}") from e
                    continue
                logger.info("✅ Direct Groq API call successful")
                return content

            if status == 429:
                logger.warning("⚠️ Rate limited, retrying...")
                if not is_last_attempt:
                    time.sleep(self._retry_after_seconds(response, default=10.0))
                continue

            if status in self.RETRYABLE_STATUSES:
                logger.warning(f"⚠️ Server error {status}, retrying...")
                continue

            logger.error(f"❌ API error {status}: {response.text}")
            # Client errors will not change on retry; anything else gets retried
            # until the final attempt.
            if 400 <= status < 500 or is_last_attempt:
                raise RuntimeError(f"Groq API error: {status}")

        raise ConnectionError(f"Failed to connect to Groq API after {max_attempts} attempts")

    # -------------------------------------------------------------- workflow

    def _create_workflow(self) -> Any:
        """Build and compile the linear LangGraph workflow.

        Returns:
            The compiled graph (the result of ``StateGraph.compile()``), which
            exposes ``invoke``.
        """
        workflow = StateGraph(AnalysisState)

        workflow.add_node("data_profiler", self._profile_dataset)
        workflow.add_node("column_analyzer", self._analyze_columns)
        workflow.add_node("insight_generator", self._generate_insights)
        workflow.add_node("visualization_planner", self._plan_visualizations)
        workflow.add_node("chart_creator", self._create_charts)
        workflow.add_node("recommendation_engine", self._generate_recommendations)

        workflow.add_edge("data_profiler", "column_analyzer")
        workflow.add_edge("column_analyzer", "insight_generator")
        workflow.add_edge("insight_generator", "visualization_planner")
        workflow.add_edge("visualization_planner", "chart_creator")
        workflow.add_edge("chart_creator", "recommendation_engine")
        workflow.add_edge("recommendation_engine", END)

        workflow.set_entry_point("data_profiler")
        return workflow.compile()

    # --------------------------------------------------------- small helpers

    @staticmethod
    def _is_numeric(series: pd.Series) -> bool:
        """True for numeric columns, excluding booleans (matching ``select_dtypes(np.number)``)."""
        return pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series)

    @staticmethod
    def _is_text(series: pd.Series) -> bool:
        """True for object, pandas string and categorical columns."""
        return (
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
            or isinstance(series.dtype, pd.CategoricalDtype)
        )

    @staticmethod
    def _float_or_none(value: Any) -> Optional[float]:
        """Convert a scalar statistic to ``float``, mapping NaN/NA to ``None``."""
        return None if pd.isna(value) else float(value)

    @staticmethod
    def _percent(count: int, total: int) -> float:
        """Return ``count`` as a percentage of ``total``; 0.0 when ``total`` is 0."""
        return float(count / total * 100) if total else 0.0

    @staticmethod
    def _shape(dataset_info: Dict[str, Any]) -> tuple:
        """Return ``(rows, columns)`` from a profile, treating a missing or short shape as zeros."""
        padded = list(dataset_info.get("shape") or []) + [0, 0]
        return int(padded[0]), int(padded[1])

    @staticmethod
    def _total_missing(dataset_info: Dict[str, Any]) -> int:
        """Total number of null cells recorded in a profile's ``null_counts``."""
        return int(sum(dataset_info.get("null_counts", {}).values()))

    @staticmethod
    def _ensure_column_groups(dataset_info: Dict[str, Any], df: pd.DataFrame) -> None:
        """Backfill ``numeric_columns`` / ``categorical_columns`` when profiling did not set them."""
        if "numeric_columns" not in dataset_info:
            dataset_info["numeric_columns"] = df.select_dtypes(include=[np.number]).columns.tolist()
        if "categorical_columns" not in dataset_info:
            dataset_info["categorical_columns"] = df.select_dtypes(include=['object', 'category']).columns.tolist()

    @staticmethod
    def _parse_numbered_list(text: Optional[str], max_items: int = 5, min_length: int = 15) -> List[str]:
        """Extract items from an LLM reply formatted as ``1. ...``, ``2) ...`` or ``3: ...``.

        Numbers of one or two digits are accepted, optionally wrapped in markdown
        bold (``**1.**``). Lines that do not start a new item are appended to the
        current one, so wrapped items stay whole, even when the wrapped line
        starts with a digit. Items of ``min_length`` characters or fewer are
        discarded; at most ``max_items`` items are returned.
        """
        import re

        item_start = re.compile(r"^(?:\*\*)?\d{1,2}(?:\*\*)?[.):](?:\*\*)?\s+(.+)$")
        items: List[str] = []
        current: Optional[str] = None
        for raw_line in (text or "").strip().splitlines():
            line = raw_line.strip()
            if not line:
                continue
            match = item_start.match(line)
            if match:
                if current is not None:
                    items.append(current.strip())
                current = match.group(1).strip()
            elif current is not None:
                current += " " + line
        if current is not None:
            items.append(current.strip())
        return [item for item in items if len(item) > min_length][:max_items]

    @staticmethod
    def _split_sentences(text: Optional[str], max_items: int = 5) -> List[str]:
        """Last-resort parse: strip list numbering and return sentences longer than 20 characters."""
        import re

        cleaned = re.sub(r'^\d+\.?\s*', '', text or "", flags=re.MULTILINE)
        return [s.strip() for s in cleaned.split('.') if len(s.strip()) > 20][:max_items]

    def _pad_with_fallbacks(self, items: List[str], fallbacks: tuple) -> List[str]:
        """Fill the slots after ``items`` with the matching fallbacks, up to ``NUM_ITEMS`` entries."""
        return (list(items) + list(fallbacks[len(items):]))[:self.NUM_ITEMS]

    # ------------------------------------------------------------ profiling

    @staticmethod
    def _basic_dataset_info(df: pd.DataFrame) -> Dict[str, Any]:
        """Compute the LLM-independent structural profile of ``df``."""
        try:
            duplicate_rows = int(df.duplicated().sum())
        except TypeError:
            # Unhashable cells (e.g. lists from nested JSON) cannot be hashed
            # row-wise; compare their string forms instead.
            duplicate_rows = int(df.astype(str).duplicated().sum())
        return {
            "shape": df.shape,
            "columns": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "memory_usage": int(df.memory_usage(deep=True).sum()),
            "null_counts": df.isnull().sum().to_dict(),
            "duplicate_rows": duplicate_rows,
            "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
            "categorical_columns": df.select_dtypes(include=['object', 'category']).columns.tolist(),
            "datetime_columns": df.select_dtypes(include=['datetime64']).columns.tolist(),
        }

    def _profile_dataset(self, state: AnalysisState) -> AnalysisState:
        """Profile the dataset's structure, then ask the LLM for a short quality assessment.

        If the LLM call fails, the structural statistics are kept and
        ``llm_profile`` is set to ``"Basic profile completed"``. If the statistics
        themselves cannot be computed, an all-empty profile marked
        ``"Profile failed"`` is stored. Either failure is appended to
        ``state["error_messages"]``.
        """
        logger.info("Profiling dataset...")
        state.setdefault("error_messages", [])

        try:
            dataset_info = self._basic_dataset_info(state["dataset"])
        except Exception as e:
            logger.error(f"Error in data profiling: {str(e)}")
            state["dataset_info"] = {
                "shape": [0, 0],
                "columns": [],
                "dtypes": {},
                "numeric_columns": [],
                "categorical_columns": [],
                "datetime_columns": [],
                "null_counts": {},
                "duplicate_rows": 0,
                "memory_usage": 0,
                "llm_profile": "Profile failed",
            }
            state["error_messages"].append(f"Data profiling error: {str(e)}")
            return state

        try:
            rows, cols = self._shape(dataset_info)
            prompt = (
                "Analyze this dataset profile:\n\n"
                f"Dataset: {rows} rows × {cols} columns\n"
                f"Missing values: {self._total_missing(dataset_info)} total\n"
                f"Duplicates: {dataset_info['duplicate_rows']}\n"
                f"Numeric columns: {len(dataset_info['numeric_columns'])}\n"
                f"Categorical columns: {len(dataset_info['categorical_columns'])}\n\n"
                "Provide a brief professional assessment of data quality and analysis potential in 2-3 sentences."
            )
            dataset_info["llm_profile"] = self._direct_groq_call(prompt)
            state["current_step"] = "data_profiler"
        except Exception as e:
            logger.error(f"Error in data profiling: {str(e)}")
            dataset_info["llm_profile"] = "Basic profile completed"
            state["error_messages"].append(f"Data profiling error: {str(e)}")

        state["dataset_info"] = dataset_info
        return state

    # -------------------------------------------------------- column stats

    @classmethod
    def _analyze_series(cls, col_data: pd.Series) -> Dict[str, Any]:
        """Compute null/unique counts plus numeric or text statistics for one column."""
        n_rows = len(col_data)
        null_count = int(col_data.isnull().sum())
        try:
            unique_count = int(col_data.nunique())
        except TypeError:
            # Unhashable cells (e.g. lists): count distinct string forms instead.
            unique_count = int(col_data.astype(str).nunique())

        analysis: Dict[str, Any] = {
            "dtype": str(col_data.dtype),
            "null_count": null_count,
            "null_percentage": cls._percent(null_count, n_rows),
            "unique_count": unique_count,
            "unique_percentage": cls._percent(unique_count, n_rows),
        }

        if cls._is_numeric(col_data):
            raw_stats = {
                "mean": col_data.mean(),
                "median": col_data.median(),
                "std": col_data.std(),
                "min": col_data.min(),
                "max": col_data.max(),
                "skewness": col_data.skew(),
                "kurtosis": col_data.kurtosis(),
            }
            analysis.update({name: cls._float_or_none(value) for name, value in raw_stats.items()})
        elif cls._is_text(col_data):
            try:
                lengths = col_data.astype(str).str.len()
                analysis.update({
                    "top_values": col_data.value_counts().head(5).to_dict(),
                    "avg_length": float(lengths.mean()),
                    "max_length": int(lengths.max()),
                })
            except Exception:
                # Best effort: unhashable or empty columns just get neutral text stats.
                analysis.update({"top_values": {}, "avg_length": 0, "max_length": 0})
        return analysis

    def _analyze_columns(self, state: AnalysisState) -> AnalysisState:
        """Compute per-column statistics and ask the LLM for a short interpretation.

        The statistics are stored in ``state["column_analysis"]`` before the LLM
        call, so they survive an API failure. The interpretation is added under
        the ``"llm_interpretation"`` key on success.
        """
        logger.info("Analyzing columns...")
        state.setdefault("error_messages", [])
        state.setdefault("column_analysis", {})

        try:
            df = state["dataset"]
            # DataFrame.items() always yields a Series, even for repeated labels
            # (later duplicates overwrite earlier ones in this dict).
            column_analysis = {column: self._analyze_series(col_data) for column, col_data in df.items()}
            state["column_analysis"] = column_analysis

            numeric_count = sum(1 for stats in column_analysis.values() if 'mean' in stats)
            text_count = sum(1 for stats in column_analysis.values() if 'top_values' in stats)
            prompt = (
                "Analyze these column statistics and identify key patterns:\n\n"
                f"Total columns analyzed: {len(column_analysis)}\n"
                f"Numeric columns: {numeric_count}\n"
                f"Text columns: {text_count}\n\n"
                "Provide 2-3 key observations about data patterns and quality issues."
            )
            column_analysis["llm_interpretation"] = self._direct_groq_call(prompt)
            state["current_step"] = "column_analyzer"
        except Exception as e:
            logger.error(f"Error in column analysis: {str(e)}")
            state["error_messages"].append(f"Column analysis error: {str(e)}")

        return state

    # ------------------------------------------------------------- insights

    @staticmethod
    def _high_correlations(df: pd.DataFrame, numeric_cols: List[Any], threshold: float = 0.7) -> List[Dict[str, Any]]:
        """Return column pairs whose absolute Pearson correlation exceeds ``threshold``."""
        if len(numeric_cols) < 2:
            return []
        corr_matrix = df[numeric_cols].corr()
        labels = list(corr_matrix.columns)
        pairs = []
        for i in range(len(labels)):
            for j in range(i + 1, len(labels)):
                value = corr_matrix.iat[i, j]
                if not pd.isna(value) and abs(value) > threshold:
                    pairs.append({"col1": labels[i], "col2": labels[j], "correlation": float(value)})
        return pairs

    def _generate_insights(self, state: AnalysisState) -> AnalysisState:
        """Ask the LLM for exactly five insights and pad with generic ones if it returns fewer.

        On failure ``state["insights"]`` is left as it was (empty by default) and
        the error is recorded.
        """
        logger.info("Generating insights...")
        state.setdefault("error_messages", [])
        state.setdefault("insights", [])

        try:
            df = state["dataset"]
            dataset_info = state["dataset_info"]
            self._ensure_column_groups(dataset_info, df)

            numeric_cols = dataset_info.get("numeric_columns", [])
            high_correlations = self._high_correlations(df, numeric_cols)
            corr_details = "; ".join(
                f"{c['col1']} ~ {c['col2']} (r={c['correlation']:.2f})" for c in high_correlations[:5]
            )
            corr_line = f"- Strong correlations found: {len(high_correlations)}"
            if corr_details:
                corr_line += f" ({corr_details})"
            rows, cols = self._shape(dataset_info)

            prompt = (
                "Generate exactly 5 specific insights for this dataset.\n\n"
                "Dataset Overview:\n"
                f"- Rows: {rows:,}\n"
                f"- Columns: {cols}\n"
                f"- Missing values: {self._total_missing(dataset_info):,}\n"
                f"- Numeric variables: {len(numeric_cols)}\n"
                f"- Categorical variables: {len(dataset_info.get('categorical_columns', []))}\n"
                f"{corr_line}\n\n"
                "IMPORTANT: Respond with EXACTLY this format:\n\n"
                "1. [First specific insight about data quality or patterns]\n\n"
                "2. [Second specific insight about distribution or trends]\n\n"
                "3. [Third specific insight about relationships or correlations]\n\n"
                "4. [Fourth specific insight about business implications]\n\n"
                "5. [Fifth specific insight about opportunities or recommendations]\n\n"
                "Each insight should be:\n"
                "- Specific and data-focused\n"
                "- Business-relevant\n"
                "- At least 15 words long\n"
                "- Complete on its own line\n\n"
                "Do not include any other text or formatting."
            )

            response_content = self._direct_groq_call(prompt)
            insights = self._parse_numbered_list(response_content, self.NUM_ITEMS)
            state["insights"] = self._pad_with_fallbacks(insights, self._FALLBACK_INSIGHTS)
            state["current_step"] = "insight_generator"
        except Exception as e:
            logger.error(f"Error in insight generation: {str(e)}")
            state["error_messages"].append(f"Insight generation error: {str(e)}")

        return state

    # ------------------------------------------------------- visualization

    def _format_columns_for_prompt(self, columns: List[Any]) -> str:
        """Render column names as a JSON list (capped at ``MAX_PROMPT_COLUMNS``) for a prompt."""
        names = [str(c) for c in columns[:self.MAX_PROMPT_COLUMNS]]
        extra = len(columns) - len(names)
        suffix = f" (+{extra} more)" if extra > 0 else ""
        return json.dumps(names, ensure_ascii=False) + suffix

    @staticmethod
    def _extract_json_array(text: Optional[str]) -> Optional[List[Any]]:
        """Parse the outermost ``[...]`` span of ``text`` as JSON; ``None`` if absent or invalid."""
        if not text:
            return None
        start, end = text.find('['), text.rfind(']') + 1
        if start < 0 or end <= start:
            return None
        try:
            parsed = json.loads(text[start:end])
        except ValueError:
            return None
        return parsed if isinstance(parsed, list) else None

    def _validate_viz_plan(self, plan: Optional[List[Any]], df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Keep only well-formed chart specs whose type is supported and whose columns exist.

        Column names are matched by their string form (the LLM only ever sees
        strings). Unknown names are dropped, and specs left with too few columns
        for their chart type are discarded. Missing text fields get neutral
        defaults.
        """
        if not plan:
            return []
        by_name = {str(col): col for col in df.columns}
        valid: List[Dict[str, Any]] = []
        for spec in plan:
            if not isinstance(spec, dict):
                continue
            chart_type = str(spec.get("type", "")).strip().lower()
            if chart_type not in self.SUPPORTED_CHART_TYPES:
                continue
            requested = spec.get("columns")
            if isinstance(requested, str):
                requested = [requested]
            if not isinstance(requested, list):
                continue

            resolved: List[Any] = []
            for name in requested:
                key = str(name)
                if key in by_name and by_name[key] not in resolved:
                    resolved.append(by_name[key])

            min_columns = 2 if chart_type in ("scatter", "heatmap") else 1
            if len(resolved) < min_columns:
                continue

            label = ", ".join(str(c) for c in resolved)
            valid.append({
                "type": chart_type,
                "columns": resolved,
                "title": str(spec.get("title") or f"{chart_type.title()} of {label}"),
                "description": str(spec.get("description") or ""),
                "purpose": str(spec.get("purpose") or ""),
            })
        return valid

    def _plan_visualizations(self, state: AnalysisState) -> AnalysisState:
        """Ask the LLM for a chart plan using the real column names, then validate it.

        Falls back to ``_create_default_viz_plan`` when the reply has no usable
        JSON, when no proposed chart references existing columns, or when any
        error occurs.
        """
        logger.info("Planning visualizations...")
        state.setdefault("error_messages", [])
        dataset_info = state.setdefault("dataset_info", {})

        try:
            df = state["dataset"]
            self._ensure_column_groups(dataset_info, df)
            numeric_cols = dataset_info.get("numeric_columns", [])
            categorical_cols = dataset_info.get("categorical_columns", [])

            # The plain strings below contain literal JSON braces, so they are
            # deliberately not f-strings.
            prompt = (
                "Plan 5 effective visualizations for this dataset.\n\n"
                f"Numeric columns ({len(numeric_cols)}): {self._format_columns_for_prompt(numeric_cols)}\n"
                f"Categorical columns ({len(categorical_cols)}): {self._format_columns_for_prompt(categorical_cols)}\n\n"
                "Use only the exact column names listed above.\n\n"
                "Return as JSON array:\n"
                "[\n"
                '  {"type": "histogram", "columns": ["<numeric column>"], "title": "Distribution of <numeric column>", "description": "Shows distribution", "purpose": "Understand patterns"},\n'
                '  {"type": "bar", "columns": ["<categorical column>"], "title": "Frequency of <categorical column>", "description": "Shows counts", "purpose": "Category analysis"}\n'
                "]\n\n"
                "Use types: histogram (1 numeric column), bar (1 column), "
                "scatter (2 columns), heatmap (2 or more numeric columns)"
            )

            response_content = self._direct_groq_call(prompt)
            viz_plan = self._validate_viz_plan(self._extract_json_array(response_content), df)
            if not viz_plan:
                logger.warning("LLM visualization plan unusable; using default plan")
                viz_plan = self._create_default_viz_plan(dataset_info)

            state["visualizations"] = viz_plan
            state["current_step"] = "visualization_planner"
        except Exception as e:
            logger.error(f"Error in visualization planning: {str(e)}")
            state["error_messages"].append(f"Visualization planning error: {str(e)}")
            state["visualizations"] = self._create_default_viz_plan(dataset_info)

        return state

    def _create_default_viz_plan(self, dataset_info: Dict) -> List[Dict]:
        """Deterministic plan: up to 3 histograms, up to 2 bar charts, and a correlation heatmap.

        The heatmap is included only when there are at least two numeric columns.
        """
        numeric_columns = dataset_info.get("numeric_columns", [])
        categorical_columns = dataset_info.get("categorical_columns", [])
        viz_plan = []

        for col in numeric_columns[:3]:
            viz_plan.append({
                "type": "histogram",
                "columns": [col],
                "title": f"Distribution of {col}",
                "description": f"Shows the distribution pattern of {col}",
                "purpose": "Understand data distribution",
            })

        for col in categorical_columns[:2]:
            viz_plan.append({
                "type": "bar",
                "columns": [col],
                "title": f"Frequency of {col}",
                "description": f"Shows the frequency of different {col} values",
                "purpose": "Understand categorical distribution",
            })

        if len(numeric_columns) > 1:
            viz_plan.append({
                "type": "heatmap",
                "columns": numeric_columns,
                "title": "Correlation Matrix",
                "description": "Shows correlations between numeric variables",
                "purpose": "Identify relationships",
            })

        return viz_plan

    # --------------------------------------------------------------- charts

    def _draw_histogram(self, ax: Any, df: pd.DataFrame, viz: Dict[str, Any]) -> bool:
        """Draw a 30-bin histogram of the first column; False if it is missing, non-numeric or empty."""
        columns = viz.get("columns") or []
        if not columns or columns[0] not in df.columns or not self._is_numeric(df[columns[0]]):
            return False
        col = columns[0]
        values = df[col].dropna()
        if values.empty:
            return False
        values.hist(bins=30, ax=ax, alpha=0.7)
        ax.set_title(viz.get("title", ""))
        ax.set_xlabel(col)
        ax.set_ylabel('Frequency')
        return True

    def _draw_bar(self, ax: Any, df: pd.DataFrame, viz: Dict[str, Any]) -> bool:
        """Draw counts of the 10 most frequent values of the first column; False if unusable."""
        columns = viz.get("columns") or []
        if not columns or columns[0] not in df.columns:
            return False
        col = columns[0]
        value_counts = df[col].value_counts().head(10)
        if value_counts.empty:
            return False
        value_counts.plot(kind='bar', ax=ax)
        ax.set_title(viz.get("title", ""))
        ax.set_xlabel(col)
        ax.set_ylabel('Count')
        ax.tick_params(axis='x', labelrotation=45)
        return True

    def _draw_heatmap(self, ax: Any, df: pd.DataFrame, viz: Dict[str, Any]) -> bool:
        """Draw the correlation matrix of the listed numeric columns; False if fewer than two."""
        numeric_cols = [c for c in viz.get("columns") or [] if c in df.columns and self._is_numeric(df[c])]
        if len(numeric_cols) < 2:
            return False
        corr_matrix = df[numeric_cols].corr()
        # Fix the colour scale to the full correlation range so colours are comparable across charts.
        im = ax.imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        ax.set_xticks(range(len(corr_matrix.columns)))
        ax.set_yticks(range(len(corr_matrix.columns)))
        ax.set_xticklabels(corr_matrix.columns, rotation=45, ha='right')
        ax.set_yticklabels(corr_matrix.columns)
        ax.set_title(viz.get("title", ""))
        ax.figure.colorbar(im, ax=ax)
        return True

    def _draw_scatter(self, ax: Any, df: pd.DataFrame, viz: Dict[str, Any]) -> bool:
        """Scatter the first two columns against each other; False if either is missing or no rows remain."""
        columns = viz.get("columns") or []
        if len(columns) < 2:
            return False
        col1, col2 = columns[0], columns[1]
        if col1 not in df.columns or col2 not in df.columns:
            return False
        clean_data = df[[col1, col2]].dropna()
        if clean_data.empty:
            return False
        ax.scatter(clean_data[col1], clean_data[col2], alpha=0.6)
        ax.set_xlabel(col1)
        ax.set_ylabel(col2)
        ax.set_title(viz.get("title", ""))
        return True

    def _create_charts(self, state: AnalysisState) -> AnalysisState:
        """Render each planned chart to ``chart_<n>_<type>.png`` in the working directory.

        A chart whose type is unsupported or whose columns are unusable is skipped
        rather than saved as an empty figure. Individual chart failures are
        logged and do not stop the remaining charts.
        """
        logger.info("Creating charts...")
        state.setdefault("error_messages", [])

        try:
            df = state["dataset"]
            viz_plans = state.get("visualizations") or []

            try:
                plt.style.use('default')
            except (OSError, ValueError) as style_error:
                logger.debug(f"Could not apply matplotlib style: {style_error}")

            drawers = {
                "histogram": self._draw_histogram,
                "bar": self._draw_bar,
                "heatmap": self._draw_heatmap,
                "scatter": self._draw_scatter,
            }

            for i, viz in enumerate(viz_plans):
                chart_type = viz.get("type", "unknown") if isinstance(viz, dict) else "unknown"
                drawer = drawers.get(chart_type)
                if drawer is None:
                    logger.warning(f"Skipping unsupported chart type: {chart_type}")
                    continue

                fig, ax = plt.subplots(figsize=(10, 6))
                try:
                    if drawer(ax, df, viz):
                        fig.tight_layout()
                        fig.savefig(f'chart_{i+1}_{chart_type}.png', dpi=300, bbox_inches='tight')
                    else:
                        logger.warning(f"Skipping {chart_type} chart '{viz.get('title', '')}': no usable data")
                except Exception as e:
                    logger.warning(f"Failed to create {chart_type} chart: {str(e)}")
                finally:
                    # Always release the figure so long runs do not accumulate open figures.
                    plt.close(fig)

            state["current_step"] = "chart_creator"
        except Exception as e:
            logger.error(f"Error in chart creation: {str(e)}")
            state["error_messages"].append(f"Chart creation error: {str(e)}")

        return state

    # ------------------------------------------------------ recommendations

    def _generate_recommendations(self, state: AnalysisState) -> AnalysisState:
        """Ask the LLM for exactly five recommendations, with layered parsing fallbacks.

        The numbered-list parser is tried first. If it yields fewer than three
        items, a sentence split is tried and kept only if it finds more. Remaining
        slots are filled from ``_FALLBACK_RECOMMENDATIONS``, which is also used in
        full when anything fails.
        """
        logger.info("Generating recommendations...")
        state.setdefault("error_messages", [])

        try:
            insights = state.get("insights") or []
            dataset_info = state.get("dataset_info") or {}
            rows, cols = self._shape(dataset_info)

            prompt = (
                "Based on the complete data analysis, generate specific and exactly 5 actionable business recommendations.\n\n"
                "Dataset Overview:\n"
                f"- Rows: {rows:,}\n"
                f"- Columns: {cols}\n"
                f"- Missing values: {self._total_missing(dataset_info):,}\n"
                f"- Numeric variables: {len(dataset_info.get('numeric_columns', []))}\n"
                f"- Categorical variables: {len(dataset_info.get('categorical_columns', []))}\n\n"
                f"Key insights found: {len(insights)} insights\n\n"
                "IMPORTANT: Respond with EXACTLY this format:\n\n"
                "1. [First specific recommendation for actionable decision-making in business growth]\n\n"
                "2. [Second specific recommendation for strategic decision-making in business growth]\n\n"
                "3. [Third specific recommendation for operational efficiency or performance optimization]\n\n"
                "4. [Fourth specific recommendation for further data analysis or reporting improvements]\n\n"
                "5. [Fifth specific recommendation for action items to stakeholders]\n\n"
                "Each recommendation should be:\n"
                "- Specific and actionable\n"
                "- Business-focused\n"
                "- Based on the data characteristics\n"
                "- At least 15 words long\n"
                "- Complete on its own line\n\n"
                "Do not include any other text, explanations, or formatting."
            )

            response_content = self._direct_groq_call(prompt) or ""
            logger.debug("Full Groq response for recommendations:\n%s", response_content)

            recommendations = self._parse_numbered_list(response_content, self.NUM_ITEMS)
            if len(recommendations) < 3:
                logger.warning("Numbered-list parsing failed, trying sentence-based approach...")
                sentences = self._split_sentences(response_content, self.NUM_ITEMS)
                if len(sentences) > len(recommendations):
                    recommendations = sentences

            recommendations = self._pad_with_fallbacks(recommendations, self._FALLBACK_RECOMMENDATIONS)

            logger.info(f"FINAL RECOMMENDATIONS COUNT: {len(recommendations)}")
            for i, rec in enumerate(recommendations, 1):
                logger.debug(f"FINAL REC {i}: {rec}")

            state["recommendations"] = recommendations
            state["current_step"] = "recommendation_engine"
        except Exception as e:
            logger.error(f"Error in recommendation generation: {str(e)}")
            state["recommendations"] = list(self._FALLBACK_RECOMMENDATIONS)
            state["error_messages"].append(f"Recommendation generation error: {str(e)}")

        return state

    # ---------------------------------------------------------- entry point

    def analyze_dataset(self, dataset_path: str) -> Dict[str, Any]:
        """Load a dataset file, run the full workflow and write ``analysis_report.md``.

        Args:
            dataset_path: Path to a ``.csv``, ``.xlsx``/``.xls`` or ``.json`` file.
                The extension is matched case-insensitively.

        Returns:
            A dict with ``dataset_info``, ``column_analysis``, ``insights``,
            ``visualizations``, ``recommendations``, ``analysis_timestamp`` and
            ``errors``, or ``{"error": message}`` if loading or the workflow failed.
        """
        logger.info(f"Starting analysis of dataset: {dataset_path}")
        readers = {
            '.csv': pd.read_csv,
            '.xlsx': pd.read_excel,
            '.xls': pd.read_excel,
            '.json': pd.read_json,
        }

        try:
            reader = readers.get(os.path.splitext(dataset_path)[1].lower())
            if reader is None:
                raise ValueError("Unsupported file format. Use CSV, Excel, or JSON.")
            df = reader(dataset_path)

            initial_state = AnalysisState(
                dataset=df,
                dataset_info={},
                column_analysis={},
                insights=[],
                visualizations=[],
                recommendations=[],
                current_step="",
                error_messages=[],
            )
            final_state = self.workflow.invoke(initial_state)

            results = {
                "dataset_info": final_state.get("dataset_info", {}),
                "column_analysis": final_state.get("column_analysis", {}),
                "insights": final_state.get("insights", []),
                "visualizations": final_state.get("visualizations", []),
                "recommendations": final_state.get("recommendations", []),
                "analysis_timestamp": datetime.now().isoformat(),
                "errors": final_state.get("error_messages", []),
            }

            self._generate_report(results, dataset_path)
            logger.info("Analysis completed successfully!")
            return results
        except Exception as e:
            logger.error(f"Error in dataset analysis: {str(e)}")
            return {"error": str(e)}

    def _generate_report(self, results: Dict[str, Any], dataset_path: str):
        """Write a Markdown summary of ``results`` to ``analysis_report.md`` (UTF-8).

        Errors collected during the run are listed in an extra section. Failures
        while writing are logged, not raised.
        """
        try:
            info = results.get('dataset_info', {})
            lines = [
                "",
                "# Data Analysis Report",
                f"## Dataset: {dataset_path}",
                f"## Analysis Date: {results['analysis_timestamp']}",
                "",
                "### Dataset Overview",
                f"- Shape: {info.get('shape', 'N/A')}",
                f"- Columns: {len(info.get('columns', []))}",
                f"- Missing Values: {sum(info.get('null_counts', {}).values())}",
                f"- Duplicate Rows: {info.get('duplicate_rows', 'N/A')}",
                "",
                "### Key Insights",
            ]
            lines += [f"{i}. {insight}" for i, insight in enumerate(results.get('insights', []), 1)]
            lines += ["", "### Recommendations"]
            lines += [f"{i}. {rec}" for i, rec in enumerate(results.get('recommendations', []), 1)]

            errors = results.get('errors') or []
            if errors:
                lines += ["", "### Errors"]
                lines += [f"- {error}" for error in errors]

            # Explicit UTF-8: LLM text and column names may contain non-ASCII characters.
            with open('analysis_report.md', 'w', encoding='utf-8') as f:
                f.write("\n".join(lines) + "\n")

            print("Analysis report saved as 'analysis_report.md'")
        except Exception as e:
            logger.error(f"Error generating report: {str(e)}")
| |
|
| | |
class DataAnalysisConfig:
    """Runtime settings for the analysis agent, read from the environment."""

    # NOTE(review): Groq retires model ids over time; confirm this one is still served.
    DEFAULT_MODEL_NAME = "llama3-70b-8192"

    def __init__(self):
        # Required; validate() rejects a missing or empty key.
        self.groq_api_key = os.environ.get('GROQ_API_KEY')
        # Optional GROQ_MODEL override, so deployments can switch models without code changes.
        self.model_name = os.environ.get('GROQ_MODEL') or self.DEFAULT_MODEL_NAME
        self.output_directory = "analysis_output"
        self.chart_style = "default"

    def validate(self):
        """Check required settings and make sure the output directory exists.

        Raises:
            ValueError: If ``GROQ_API_KEY`` is unset or empty.
            OSError: If the output directory cannot be created, for example
                because a regular file already occupies that path.
        """
        if not self.groq_api_key:
            raise ValueError("GROQ_API_KEY environment variable is required")

        # exist_ok avoids the race between a separate exists-check and makedirs.
        os.makedirs(self.output_directory, exist_ok=True)
| |
|
def main(dataset_path: Optional[str] = None):
    """Run the data analysis agent end to end and print a short summary.

    Args:
        dataset_path: Dataset file to analyze. Defaults to ``"your_dataset.csv"``.
            When the module is run as a script, the first command-line argument
            is used instead, if one is given.
    """
    config = DataAnalysisConfig()

    try:
        config.validate()
    except ValueError as e:
        print(f"Configuration error: {e}")
        print("Please set the GROQ_API_KEY environment variable")
        return

    if dataset_path is None:
        dataset_path = "your_dataset.csv"

    # Check the input before building the agent, so a typo does not cost a workflow build.
    if not os.path.exists(dataset_path):
        print(f"Dataset file not found: {dataset_path}")
        print("Please provide a valid dataset path")
        return

    agent = DataAnalysisAgent(
        groq_api_key=config.groq_api_key,
        model_name=config.model_name,
    )
    results = agent.analyze_dataset(dataset_path)

    if "error" in results:
        print(f"Analysis failed: {results['error']}")
        return

    print("Analysis completed successfully!")
    print(f"Generated {len(results['insights'])} insights")
    print(f"Created {len(results['visualizations'])} visualizations")
    print(f"Provided {len(results['recommendations'])} recommendations")


if __name__ == "__main__":
    import sys

    # Usage: python <this file> [dataset_path]
    main(sys.argv[1] if len(sys.argv) > 1 else None)