Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import json | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| import threading | |
| import time | |
| import logging | |
| from scipy import stats | |
| import matplotlib | |
| matplotlib.use('Agg') # Use non-interactive backend | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import io | |
| import base64 | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| import atexit | |
# Logging: INFO level on the root logger, plus a module-level logger used by
# every handler and background job below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application object, with CORS enabled using flask_cors defaults.
app = Flask(__name__)
CORS(app)
# Storage locations, upload limits and retention policy.
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MAX_FILE_SIZE = 512 * 1024 ** 2  # 512MB
ALLOWED_EXTENSIONS = {'csv', 'tsv', 'xlsx', 'xls', 'json', 'parquet'}
FILE_EXPIRY_HOURS = 1

# Both working directories must exist before the first request is served.
for _folder in (UPLOAD_FOLDER, PROCESSED_FOLDER):
    os.makedirs(_folder, exist_ok=True)

# In-memory registry of uploads: {session_id: {file_id: file_info}}.
file_storage = {}
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is everything after the last dot, compared
    case-insensitively. A name without any dot is never allowed.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def get_file_age(filepath):
    """Return the age of *filepath* in hours, based on its modification time.

    Returns ``float('inf')`` when the file cannot be stat'ed (for example
    because it does not exist). The modification time is read directly
    instead of checking for existence first, so a file that disappears
    between the two steps cannot raise here.
    """
    try:
        modified_at = os.path.getmtime(filepath)
    except OSError:
        return float('inf')
    return (time.time() - modified_at) / 3600
def cleanup_old_files():
    """Remove files and registry entries older than FILE_EXPIRY_HOURS.

    Two passes are made:

    1. Every file under UPLOAD_FOLDER and PROCESSED_FOLDER whose age (per
       ``get_file_age``) exceeds FILE_EXPIRY_HOURS is deleted from disk.
    2. Entries in ``file_storage`` whose recorded timestamp is older than
       FILE_EXPIRY_HOURS are dropped, and sessions left without files are
       removed.

    Errors are logged, never raised, so a failed pass does not propagate
    to the caller.
    """
    try:
        for folder in (UPLOAD_FOLDER, PROCESSED_FOLDER):
            for root, _dirs, filenames in os.walk(folder):
                for name in filenames:
                    filepath = os.path.join(root, name)
                    if get_file_age(filepath) <= FILE_EXPIRY_HOURS:
                        continue
                    try:
                        os.remove(filepath)
                    except FileNotFoundError:
                        # Already removed elsewhere (e.g. by a delete
                        # request); nothing left to do for this file.
                        continue
                    except OSError as e:
                        # One undeletable file must not stop the rest of
                        # the pass.
                        logger.error(f"Error during cleanup: {str(e)}")
                        continue
                    logger.info(f"Cleaned up old file: {filepath}")

        # Clean up file_storage entries. Iterate over snapshots: request
        # handlers may add or remove entries while this job is running,
        # and mutating a dict during iteration raises RuntimeError.
        now = datetime.now()
        max_age_seconds = FILE_EXPIRY_HOURS * 3600
        for session_id, session_files in list(file_storage.items()):
            expired_ids = [
                file_id
                for file_id, file_info in list(session_files.items())
                if (now - datetime.fromisoformat(file_info['timestamp'])).total_seconds()
                > max_age_seconds
            ]
            for file_id in expired_ids:
                session_files.pop(file_id, None)
            if not session_files:
                file_storage.pop(session_id, None)
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")
# Periodic cleanup: run cleanup_old_files every 15 minutes via a
# BackgroundScheduler, and shut the scheduler down at interpreter exit.
scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_old_files, "interval", minutes=15)
scheduler.start()
atexit.register(scheduler.shutdown)
def load_data_file(filepath, filename):
    """Load a data file into a pandas DataFrame.

    The reader is selected from the extension of *filename* (text after the
    last dot, case-insensitive); the data itself is read from *filepath*.

    Supported extensions: csv, tsv, xlsx, xls, json, parquet.

    Raises:
        Exception: with a message starting ``"Error loading file: "`` when
            the extension is missing or unsupported, or when the underlying
            pandas reader fails. The original error is chained as
            ``__cause__`` so its traceback is not lost.
    """
    readers = {
        'csv': pd.read_csv,
        'tsv': lambda path: pd.read_csv(path, sep='\t'),
        'xlsx': pd.read_excel,
        'xls': pd.read_excel,
        'json': pd.read_json,
        'parquet': pd.read_parquet,
    }
    try:
        _stem, dot, file_ext = filename.rpartition('.')
        if not dot:
            raise ValueError(f"File has no extension: {filename}")
        file_ext = file_ext.lower()
        reader = readers.get(file_ext)
        if reader is None:
            raise ValueError(f"Unsupported file format: {file_ext}")
        return reader(filepath)
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}") from e
def perform_basic_statistics(df, columns=None):
    """Summarise a DataFrame's numeric and non-numeric columns.

    Args:
        df: the DataFrame to analyse.
        columns: optional column name or list of column names; when given,
            only those columns are analysed.

    Returns:
        A dict with three keys:
        ``numeric_summary`` - ``describe()`` output for numeric columns;
        ``categorical_summary`` - per non-numeric column: number of unique
        values, the 10 most frequent values and the missing-value count;
        ``general_info`` - row/column counts and missing values per column.
    """
    if isinstance(columns, str):
        # A bare column name would select a Series, which has no
        # select_dtypes(); treat it as a one-element list.
        columns = [columns]
    if columns:
        df = df[columns]

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(exclude=[np.number]).columns.tolist()

    result = {
        'numeric_summary': {},
        'categorical_summary': {},
        'general_info': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'numeric_columns': len(numeric_cols),
            'categorical_columns': len(categorical_cols),
            'missing_values': df.isnull().sum().to_dict()
        }
    }

    # Numeric statistics
    if numeric_cols:
        result['numeric_summary'] = df[numeric_cols].describe().to_dict()

    # Categorical statistics
    for col in categorical_cols:
        series = df[col]
        result['categorical_summary'][col] = {
            # int(): .sum()/.nunique() can return numpy integers, which the
            # standard JSON encoder cannot serialise.
            'unique_values': int(series.nunique()),
            'top_values': series.value_counts().head(10).to_dict(),
            'missing_count': int(series.isnull().sum())
        }

    return result
def perform_groupby_analysis(df, group_column, target_column, operation='mean', filters=None):
    """Aggregate *target_column* per value of *group_column*.

    Args:
        df: the DataFrame to analyse.
        group_column: column (or list of columns) to group by.
        target_column: column (or list of columns) to aggregate.
        operation: one of 'mean', 'sum', 'count', 'max', 'min', 'std'.
        filters: optional list of dicts with keys 'column', 'operator' and
            'value'; each one keeps only the rows where
            ``df[column] <operator> value`` holds. Supported operators are
            '>', '<', '==', '!=', '>=' and '<='. Filters are applied in
            order, before grouping.

    Returns:
        A dict with the aggregated values ('result'), the echoed
        'operation', 'group_column' and 'target_column', and 'total_groups'.

    Raises:
        ValueError: for an unsupported operation or filter operator, or
            when a referenced column does not exist.
    """
    import operator

    comparators = {
        '>': operator.gt,
        '<': operator.lt,
        '==': operator.eq,
        '!=': operator.ne,
        '>=': operator.ge,
        '<=': operator.le,
    }
    aggregations = ('mean', 'sum', 'count', 'max', 'min', 'std')

    if operation not in aggregations:
        raise ValueError(f"Unsupported operation: {operation}")

    # Fail with a readable message instead of an opaque KeyError/TypeError
    # from deep inside pandas when a column is missing or None.
    required = []
    for spec in (group_column, target_column):
        required.extend(spec if isinstance(spec, (list, tuple)) else [spec])
    missing = [name for name in required if name not in df.columns]
    if missing:
        raise ValueError(f"Column not found: {', '.join(str(name) for name in missing)}")

    # Apply filters if provided
    for f in filters or []:
        col, op, val = f['column'], f['operator'], f['value']
        compare = comparators.get(op)
        if compare is None:
            # Silently skipping an unknown operator would return results
            # for unfiltered data without any indication.
            raise ValueError(f"Unsupported filter operator: {op}")
        df = df[compare(df[col], val)]

    # Perform groupby operation; the name was validated against
    # `aggregations` above, so the attribute lookup is safe.
    grouped = df.groupby(group_column)[target_column]
    result = getattr(grouped, operation)()

    return {
        'result': result.to_dict(),
        'operation': operation,
        'group_column': group_column,
        'target_column': target_column,
        'total_groups': len(result)
    }
def perform_correlation_analysis(df, columns=None, method='pearson'):
    """Compute the pairwise correlation matrix of the numeric columns.

    When *columns* is given, the frame is restricted to those columns first.
    Non-numeric columns are ignored; if nothing numeric remains a ValueError
    is raised. *method* is passed straight to ``DataFrame.corr``.

    Returns a dict holding the matrix (as nested dicts), the method name and
    the list of numeric columns that were used.
    """
    frame = df[columns] if columns else df

    # Only numeric columns take part in the correlation.
    numeric_only = frame.select_dtypes(include=[np.number])
    if numeric_only.empty:
        raise ValueError("No numeric columns found for correlation analysis")

    matrix = numeric_only.corr(method=method)
    used_columns = numeric_only.columns.tolist()
    return {
        'correlation_matrix': matrix.to_dict(),
        'method': method,
        'columns': used_columns
    }
def detect_outliers(df, columns=None, method='iqr'):
    """Detect outliers in the numeric columns of *df*.

    Args:
        df: the DataFrame to analyse.
        columns: optional column name or list of names to restrict the
            analysis to.
        method: 'iqr' flags values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``;
            'zscore' flags values whose absolute z-score exceeds 3.

    Returns:
        ``{column: {'count', 'indices', 'percentage'}}`` where 'indices'
        holds at most the first 100 index labels of outlier rows and
        'percentage' is relative to the total number of rows.

    Raises:
        ValueError: when *method* is not 'iqr' or 'zscore'.
    """
    if method not in ('iqr', 'zscore'):
        raise ValueError(f"Unsupported outlier detection method: {method}")
    if isinstance(columns, str):
        columns = [columns]
    if columns:
        df = df[columns]

    numeric_df = df.select_dtypes(include=[np.number])
    total_rows = len(numeric_df)
    outliers = {}

    for col in numeric_df.columns:
        # Work on the non-missing values only and keep their index, so the
        # boolean mask always lines up with the rows it was computed from.
        values = numeric_df[col].dropna()

        if values.empty:
            mask = np.zeros(0, dtype=bool)
        elif method == 'iqr':
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            mask = ((values < lower_bound) | (values > upper_bound)).to_numpy()
        else:
            # A constant column has zero variance: the z-scores are NaN and
            # compare False against the threshold, i.e. no outliers.
            with np.errstate(divide='ignore', invalid='ignore'):
                z_scores = np.abs(stats.zscore(values.to_numpy(dtype=float)))
            mask = z_scores > 3

        outlier_indices = values.index[mask].tolist()
        outliers[col] = {
            'count': len(outlier_indices),
            'indices': outlier_indices[:100],  # Limit to first 100
            'percentage': (len(outlier_indices) / total_rows) * 100 if total_rows else 0.0
        }

    return outliers
def generate_visualization(df, chart_type, x_column, y_column=None, group_column=None):
    """Render a chart and return it as a base64-encoded PNG string.

    Args:
        df: the DataFrame to plot.
        chart_type: 'histogram', 'scatter', 'bar', 'line' or 'box'.
        x_column: primary column to plot.
        y_column: second column; required for 'scatter', optional for 'line'.
        group_column: optional grouping column for 'bar' and 'box'.

    Raises:
        Exception: with a message starting
            ``"Error generating visualization: "`` for an unsupported chart
            type, a missing column or any plotting failure; the original
            error is chained as ``__cause__``.

    All drawing goes through an explicit Figure/Axes pair rather than
    pyplot's implicit "current figure", and that figure is always closed,
    so no figure is left open whichever branch is taken.
    """
    supported_charts = ('histogram', 'scatter', 'bar', 'line', 'box')
    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        if chart_type not in supported_charts:
            # Previously an unknown type silently produced a blank image.
            raise ValueError(f"Unsupported chart type: {chart_type}")

        if chart_type == 'histogram':
            # Missing values make the automatic bin range non-finite.
            ax.hist(df[x_column].dropna(), bins=30, alpha=0.7)
            ax.set_xlabel(x_column)
            ax.set_ylabel('Frequency')
            ax.set_title(f'Histogram of {x_column}')
        elif chart_type == 'scatter':
            if not y_column:
                raise ValueError("Y column required for scatter plot")
            ax.scatter(df[x_column], df[y_column], alpha=0.6)
            ax.set_xlabel(x_column)
            ax.set_ylabel(y_column)
            ax.set_title(f'{x_column} vs {y_column}')
        elif chart_type == 'bar':
            x_is_numeric = pd.api.types.is_numeric_dtype(df[x_column])
            if group_column and x_is_numeric:
                bars = df.groupby(group_column)[x_column].mean()
                y_label = f'Mean {x_column}'
            elif group_column:
                bars = df[group_column].value_counts()
                y_label = 'Count'
            else:
                # Without grouping the bars are value counts, so the axis
                # is a count even when the column itself is numeric.
                bars = df[x_column].value_counts().head(20)
                y_label = 'Count'
            bars.plot(kind='bar', ax=ax)
            ax.set_xlabel(group_column or x_column)
            ax.set_ylabel(y_label)
            ax.set_title('Bar Chart')
            ax.tick_params(axis='x', labelrotation=45)
        elif chart_type == 'line':
            if y_column:
                ax.plot(df[x_column], df[y_column])
                ax.set_xlabel(x_column)
                ax.set_ylabel(y_column)
            else:
                df[x_column].plot(ax=ax)
                ax.set_ylabel(x_column)
            ax.set_title('Line Chart')
        else:  # 'box'
            # Passing ax keeps pandas from opening a second figure when
            # `by` is used.
            df.boxplot(column=x_column, by=group_column or None, ax=ax)
            ax.set_title('Box Plot')

        fig.tight_layout()

        # Convert plot to base64 string
        img_buffer = io.BytesIO()
        fig.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
        return base64.b64encode(img_buffer.getvalue()).decode()
    except Exception as e:
        raise Exception(f"Error generating visualization: {str(e)}") from e
    finally:
        plt.close(fig)
def parse_natural_language_query(query, df_columns):
    """Translate a free-text query into an analysis request.

    Args:
        query: the user's question, e.g. "average salary by department".
        df_columns: the column names of the DataFrame being queried.

    Returns:
        ``{'analysisType': ..., 'parameters': {...}}`` where analysisType is
        'groupby' (the word "by" plus at least two mentioned columns),
        'correlation', 'visualization' or, as the fallback, 'statistics'.

    Matching rules:
        * Operation keywords and the word "by" are matched as whole words
          (an optional trailing "s" is accepted for keywords), so "baby" or
          "summary" do not trigger a group-by or a sum.
        * Columns are matched case-insensitively as whole words and are
          ordered by where they appear in the query, not by their order in
          the DataFrame.
        * For a group-by, the grouping column is the first column mentioned
          after "by"; the target is the first other mentioned column.
    """
    import re

    query_lower = query.lower()

    def has_word(word):
        return re.search(rf"\b{re.escape(word)}s?\b", query_lower) is not None

    # Define operation keywords
    operations = {
        'average': 'mean', 'mean': 'mean', 'avg': 'mean',
        'sum': 'sum', 'total': 'sum',
        'count': 'count', 'number': 'count',
        'max': 'max', 'maximum': 'max', 'highest': 'max',
        'min': 'min', 'minimum': 'min', 'lowest': 'min'
    }

    # Find operation: first keyword (in table order) present in the query.
    operation = next(
        (op for keyword, op in operations.items() if has_word(keyword)),
        'mean'  # default
    )

    # Find columns mentioned in query, remembering where each one occurs.
    positions = []
    for col in df_columns:
        name = str(col).lower()  # column labels are not always strings
        if not name:
            continue
        found = re.search(rf"(?<!\w){re.escape(name)}(?!\w)", query_lower)
        if found:
            positions.append((found.start(), col))
    positions.sort(key=lambda item: item[0])
    mentioned_columns = [col for _start, col in positions]

    by_match = re.search(r"\bby\b", query_lower)
    if by_match and len(mentioned_columns) >= 2:
        # Group by analysis. Fall back to the last mentioned column when no
        # column follows "by".
        group_idx = next(
            (i for i, (start, _col) in enumerate(positions) if start >= by_match.end()),
            len(positions) - 1
        )
        target_idx = 1 if group_idx == 0 else 0
        return {
            'analysisType': 'groupby',
            'parameters': {
                'groupByColumn': mentioned_columns[group_idx],
                'targetColumn': mentioned_columns[target_idx],
                'operation': operation
            }
        }

    if 'correlation' in query_lower:
        return {
            'analysisType': 'correlation',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }

    if any(word in query_lower for word in ['chart', 'plot', 'graph', 'visualize']):
        chart_type = 'bar'  # default
        if 'scatter' in query_lower:
            chart_type = 'scatter'
        elif has_word('line'):
            chart_type = 'line'
        elif 'histogram' in query_lower:
            chart_type = 'histogram'
        return {
            'analysisType': 'visualization',
            'parameters': {
                'chartType': chart_type,
                'xColumn': mentioned_columns[0] if mentioned_columns else None,
                'yColumn': mentioned_columns[1] if len(mentioned_columns) > 1 else None
            }
        }

    # Default to basic statistics
    return {
        'analysisType': 'statistics',
        'parameters': {
            'columns': mentioned_columns if mentioned_columns else None
        }
    }
def health_check():
    """Liveness probe: report a healthy status plus the current timestamp."""
    payload = {
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
def upload_file():
    """Accept a multipart upload and register it under the caller's session.

    Expects a ``file`` part and a ``sessionId`` form field. The file is
    stored as ``UPLOAD_FOLDER/<sessionId>/<fileId>_<filename>`` and recorded
    in ``file_storage``.

    Responses:
        200 with ``fileId``, ``filename``, ``size`` and ``message`` on success.
        400 when the file or session id is missing or invalid, the
            extension is not allowed, or the file exceeds MAX_FILE_SIZE.
        500 with the error text on any unexpected failure.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        session_id = request.form.get('sessionId')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'File type not supported'}), 400

        # The session id comes from the client and becomes a directory
        # name. Only accept it when it resolves to a direct child of the
        # upload root, so values such as "../x" or "/etc" cannot make the
        # server write outside UPLOAD_FOLDER.
        upload_root = os.path.realpath(UPLOAD_FOLDER)
        try:
            session_dir = os.path.realpath(os.path.join(upload_root, session_id))
        except ValueError:  # e.g. embedded NUL byte
            session_dir = None
        if session_dir is None or os.path.dirname(session_dir) != upload_root:
            return jsonify({'error': 'Invalid session'}), 400

        # Check file size
        file.seek(0, 2)  # Seek to end
        file_size = file.tell()
        file.seek(0)  # Reset to beginning
        if file_size > MAX_FILE_SIZE:
            return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024*1024)}MB'}), 400

        # Generate unique file ID and secure filename
        file_id = str(uuid.uuid4())
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            # secure_filename() can strip the stem and the dot (e.g. for
            # names made of non-ASCII characters), which would leave the
            # stored name without a usable extension. Rebuild it from the
            # already-validated extension.
            extension = file.filename.rsplit('.', 1)[1].lower()
            filename = f"upload.{extension}"

        # Create session directory
        os.makedirs(session_dir, exist_ok=True)

        # Save file
        filepath = os.path.join(session_dir, f"{file_id}_{filename}")
        file.save(filepath)

        # Store file info
        file_storage.setdefault(session_id, {})[file_id] = {
            'filename': filename,
            'filepath': filepath,
            'size': file_size,
            'timestamp': datetime.now().isoformat()
        }

        return jsonify({
            'fileId': file_id,
            'filename': filename,
            'size': file_size,
            'message': 'File uploaded successfully'
        })
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def preview_file(file_id):
    """Return column names, dtypes, shape, the first 5 rows and
    missing-value counts for an uploaded file.

    The session is identified by the ``sessionId`` query parameter.

    Responses:
        200 with the preview payload.
        400 when the session is missing or unknown.
        404 when *file_id* is not registered for that session.
        500 with the error text when the file cannot be loaded.
    """
    try:
        session_id = request.args.get('sessionId')

        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]

        # Load data and get preview
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Missing values (NaN/NaT) would otherwise be emitted as the
        # non-standard JSON token NaN, which strict parsers reject.
        # Replace them with None so they are sent as null.
        head = df.head(5)
        head = head.astype(object).where(head.notna(), None)

        preview_data = {
            'columns': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict(),
            'shape': list(df.shape),
            'head': head.to_dict('records'),
            'missing_values': df.isnull().sum().to_dict()
        }
        return jsonify(preview_data)
    except Exception as e:
        logger.error(f"Preview error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def _json_safe(value):
    """Recursively convert *value* into plain JSON-compatible Python objects.

    numpy scalars/arrays become Python numbers/lists, non-finite floats
    become None, non-string dict keys are stringified (bool/None keys are
    kept) and anything else unknown falls back to ``str(value)``.
    """
    import math

    if isinstance(value, dict):
        return {
            (key if isinstance(key, (str, bool)) or key is None else str(key)): _json_safe(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple, set)):
        return [_json_safe(item) for item in value]
    if isinstance(value, np.ndarray):
        return _json_safe(value.tolist())
    if isinstance(value, np.generic):
        value = value.item()
    if isinstance(value, float):
        return value if math.isfinite(value) else None
    if value is None or isinstance(value, (str, int, bool)):
        return value
    return str(value)


def analyze_data():
    """Run an analysis on an uploaded file and return (and persist) the result.

    Expects a JSON object with ``sessionId``, ``fileId`` and either
    ``analysisType`` (+ optional ``parameters``) or a ``naturalQuery`` that
    is parsed into both. Supported analysis types: 'statistics', 'groupby',
    'correlation', 'outliers', 'visualization'.

    The result is written to
    ``PROCESSED_FOLDER/<sessionId>/<resultId>_result.json`` and also
    returned in the response.

    Responses:
        200 with ``resultId``, ``result``, ``analysisType``, ``timestamp``.
        400 for a missing/invalid body, ids or analysis type.
        404 when the file is not registered for the session.
        500 with the error text when the analysis itself fails.
    """
    try:
        # silent=True: a missing or malformed JSON body is a client error
        # (400), not a server failure.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        analysis_type = data.get('analysisType')
        parameters = data.get('parameters') or {}  # tolerate an explicit null
        natural_query = data.get('naturalQuery')

        if not all([session_id, file_id]) or not isinstance(session_id, str) or not isinstance(file_id, str):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        # The session id becomes a directory name below; only accept it
        # when it resolves to a direct child of the processed root.
        processed_root = os.path.realpath(PROCESSED_FOLDER)
        try:
            result_dir = os.path.realpath(os.path.join(processed_root, session_id))
        except ValueError:  # e.g. embedded NUL byte
            result_dir = None
        if result_dir is None or os.path.dirname(result_dir) != processed_root:
            return jsonify({'error': 'Invalid session'}), 400

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Handle natural language query
        if natural_query and not analysis_type:
            parsed_query = parse_natural_language_query(natural_query, df.columns.tolist())
            analysis_type = parsed_query['analysisType']
            parameters = parsed_query['parameters']

        if analysis_type == 'statistics':
            result = perform_basic_statistics(df, parameters.get('columns'))
        elif analysis_type == 'groupby':
            result = perform_groupby_analysis(
                df,
                parameters.get('groupByColumn'),
                parameters.get('targetColumn'),
                parameters.get('operation', 'mean'),
                parameters.get('filters')
            )
        elif analysis_type == 'correlation':
            result = perform_correlation_analysis(
                df,
                parameters.get('columns'),
                parameters.get('method', 'pearson')
            )
        elif analysis_type == 'outliers':
            result = detect_outliers(
                df,
                parameters.get('columns'),
                parameters.get('method', 'iqr')
            )
        elif analysis_type == 'visualization':
            chart_base64 = generate_visualization(
                df,
                parameters.get('chartType', 'bar'),
                parameters.get('xColumn'),
                parameters.get('yColumn'),
                parameters.get('groupColumn')
            )
            result = {
                'chart': chart_base64,
                'chartType': parameters.get('chartType', 'bar')
            }
        else:
            return jsonify({'error': 'Invalid analysis type'}), 400

        # Normalise once so the saved file and the HTTP response carry the
        # same data, and numpy scalars or NaN cannot break serialisation.
        result = _json_safe(result)

        # Save result to processed folder
        result_id = str(uuid.uuid4())
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_result.json")
        with open(result_filepath, 'w') as f:
            json.dump(result, f, indent=2)

        return jsonify({
            'resultId': result_id,
            'result': result,
            'analysisType': analysis_type,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def list_files(session_id):
    """List the files registered for *session_id* that still exist on disk.

    An unknown session yields an empty list. Any unexpected error is logged
    and returned as a 500 response carrying the error text.
    """
    try:
        files = [
            {
                'fileId': file_id,
                'filename': info['filename'],
                'size': info['size'],
                'timestamp': info['timestamp']
            }
            for file_id, info in file_storage.get(session_id, {}).items()
            # Skip registry entries whose file is no longer on disk.
            if os.path.exists(info['filepath'])
        ]
        return jsonify({'files': files})
    except Exception as e:
        logger.error(f"List files error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def delete_file(file_id):
    """Delete an uploaded file from disk and from the session registry.

    The session is identified by the ``sessionId`` query parameter.

    Responses:
        200 with a confirmation message.
        400 when the session is missing or unknown.
        404 when *file_id* is not registered for that session.
        500 with the error text on any unexpected failure.
    """
    try:
        session_id = request.args.get('sessionId')

        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]

        # Remove file from filesystem. Attempt the removal directly: the
        # periodic cleanup job may delete the file at any moment, and a
        # file that is already gone is not an error for the caller.
        try:
            os.remove(file_info['filepath'])
        except FileNotFoundError:
            pass

        # Remove from storage (the entry may have been dropped concurrently).
        file_storage[session_id].pop(file_id, None)

        return jsonify({'message': 'File deleted successfully'})
    except Exception as e:
        logger.error(f"Delete error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def download_result(result_id):
    """Send a stored analysis result as a file download.

    Query parameters: ``sessionId`` (required) and ``format`` (only 'json'
    is supported; defaults to 'json').

    Responses:
        200 with the result file as an attachment.
        400 when the session id is missing or the format is unsupported.
        404 when no such result exists for the session.
        500 with the error text on any unexpected failure.
    """
    try:
        session_id = request.args.get('sessionId')
        format_type = request.args.get('format', 'json')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400

        # Both ids come from the client and are joined into a filesystem
        # path. Require the resolved file to sit exactly at
        # PROCESSED_FOLDER/<session>/<file>, so "../" sequences or absolute
        # paths cannot be used to read files elsewhere on the server.
        processed_root = os.path.realpath(PROCESSED_FOLDER)
        try:
            session_dir = os.path.realpath(os.path.join(processed_root, session_id))
            result_filepath = os.path.realpath(
                os.path.join(session_dir, f"{result_id}_result.json")
            )
        except ValueError:  # e.g. embedded NUL byte
            return jsonify({'error': 'Result not found'}), 404

        inside_root = (
            os.path.dirname(session_dir) == processed_root
            and os.path.dirname(result_filepath) == session_dir
        )
        if not inside_root or not os.path.isfile(result_filepath):
            return jsonify({'error': 'Result not found'}), 404

        if format_type == 'json':
            return send_file(result_filepath, as_attachment=True,
                             download_name=f"analysis_result_{result_id}.json")
        return jsonify({'error': 'Format not supported'}), 400
    except Exception as e:
        logger.error(f"Download error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def home():
    """Return a short service banner: message, version, endpoint map and timestamp."""
    endpoints = {
        'health': '/api/health',
        'upload': '/api/upload',
        'preview': '/api/preview/<file_id>',
        'analyze': '/api/analyze',
        'files': '/api/files/<session_id>',
        'delete': '/api/file/<file_id>',
        'download': '/api/download/<result_id>'
    }
    banner = {
        'message': 'Data Analytics API is running!',
        'version': '1.0.0',
        'endpoints': endpoints,
        'timestamp': datetime.now().isoformat()
    }
    return jsonify(banner)
def not_found(error):
    """Build the JSON 404 response, listing the endpoints this API offers.

    *error* is accepted but not used.
    """
    available_endpoints = [
        '/',
        '/api/health',
        '/api/upload',
        '/api/preview/<file_id>',
        '/api/analyze',
        '/api/files/<session_id>',
        '/api/file/<file_id>',
        '/api/download/<result_id>'
    ]
    body = {
        'error': 'Endpoint not found',
        'message': 'Please check the API documentation',
        'available_endpoints': available_endpoints
    }
    return jsonify(body), 404
if __name__ == '__main__':
    # The server binds to every interface, so debug mode must not be on by
    # default: the interactive debugger lets anyone who can reach the port
    # execute arbitrary Python code. Opt in locally by setting
    # FLASK_DEBUG=1 (or "true"/"yes"/"on").
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)