"""
BI Storyteller Web Interface

Professional HTTP server with REST API - Standard Library Only
"""

import json
import os
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import mimetypes

from main import BIStoryteller


# Body of the landing page served for "/" and "/index.html".
# NOTE(review): the emoji in this text (and in the start-up banner below)
# look mis-encoded (mojibake). They are kept byte-for-byte here; confirm the
# intended characters against the original file before changing them.
_MAIN_PAGE_HTML = """ BI Storyteller - Marketing Analysis Platform

๐Ÿš€ BI Storyteller

Marketing Analysis Automation Platform

๐Ÿ”‘ Step 1: API Key Setup (Optional)

๐Ÿ“ Step 2: Variable Extraction

๐Ÿ“‹ Step 3: Generate Questionnaire

Generate survey questions based on extracted variables.

๐Ÿ”ข Step 4: Generate Sample Data

๐Ÿงน Step 5: Clean Data

Remove outliers, handle missing values, and preprocess data.

๐Ÿ“Š Step 6: Exploratory Data Analysis

Perform statistical analysis and generate insights.

๐Ÿค– Step 7: Predictive Analytics

๐Ÿ“ˆ Step 8: Trend Analysis

๐Ÿ’ญ Step 9: Sentiment Analysis

Analyze customer feedback and sentiment patterns.

๐Ÿงช Step 10: A/B Testing

๐Ÿ’ฌ Step 11: Chat with Your Data

๐Ÿ“ค Step 12: Export Results

"""


class BIStoryteller_WebHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the BI Storyteller web interface.

    GET serves the landing page and ``/api/status``; POST dispatches the
    ``/api/*`` endpoints to the matching ``BIStoryteller`` method and returns
    the result as JSON.
    """

    # One BIStoryteller shared by every request. http.server creates a new
    # handler object per request, so a per-handler instance would discard all
    # analysis state before /api/status or /api/chat could read it.
    _shared_bi = None

    # POST path -> callable(bi, data) returning the value to send as JSON.
    # The defaults passed to ``data.get`` are what the endpoint uses when the
    # key is absent from the request body.
    _POST_ROUTES = {
        '/api/set_api_key': lambda bi, data: bi.set_groq_api_key(
            data.get('api_key', '')
        ),
        '/api/extract_variables': lambda bi, data: bi.extract_variables(
            data.get('business_problem', '')
        ),
        '/api/generate_questionnaire': lambda bi, data: bi.generate_questionnaire(
            data.get('variables', []),
            data.get('business_problem', '')
        ),
        '/api/generate_data': lambda bi, data: bi.generate_sample_data(
            data.get('variables', []),
            data.get('sample_size', 1000)
        ),
        '/api/clean_data': lambda bi, data: bi.clean_data(
            data.get('data', [])
        ),
        '/api/perform_eda': lambda bi, data: bi.perform_eda(
            data.get('data', [])
        ),
        '/api/train_model': lambda bi, data: bi.train_predictive_model(
            data.get('data', []),
            data.get('algorithm', 'Random Forest')
        ),
        '/api/analyze_trends': lambda bi, data: bi.analyze_trends(
            data.get('data', []),
            data.get('time_period', 'Monthly')
        ),
        '/api/analyze_sentiment': lambda bi, data: bi.analyze_sentiment(
            data.get('data', [])
        ),
        '/api/run_ab_test': lambda bi, data: bi.run_ab_test(
            data.get('data', []),
            data.get('test_variable', ''),
            data.get('success_metric', '')
        ),
        '/api/chat': lambda bi, data: bi.chat_with_data(
            data.get('question', '')
        ),
        '/api/export': lambda bi, data: bi.export_results(
            data.get('filename')
        ),
        '/api/import': lambda bi, data: bi.import_results(
            data.get('filename')
        ),
    }

    # (BIStoryteller attribute, module label) in pipeline order. The first
    # entry corresponds to step 2; each later entry is one step further on.
    _STATUS_STEPS = (
        ('variables', "Variable Extraction"),
        ('questionnaire', "Questionnaire Generation"),
        ('sample_data', "Data Generation"),
        ('cleaned_data', "Data Cleaning"),
        ('eda_results', "EDA Analysis"),
        ('model_results', "Predictive Modeling"),
        ('trend_results', "Trend Analysis"),
        ('sentiment_results', "Sentiment Analysis"),
        ('ab_test_results', "A/B Testing"),
        ('chat_history', "Chat Interface"),
    )

    def __init__(self, *args, **kwargs):
        # ``self.bi`` must be set before super().__init__(), because the base
        # constructor handles the request (and so calls do_GET/do_POST).
        self.bi = self._get_shared_bi()
        super().__init__(*args, **kwargs)

    @classmethod
    def _get_shared_bi(cls):
        """Return the shared BIStoryteller, creating it on first use."""
        if BIStoryteller_WebHandler._shared_bi is None:
            BIStoryteller_WebHandler._shared_bi = BIStoryteller()
        return BIStoryteller_WebHandler._shared_bi

    def do_GET(self):
        """Handle GET requests: the main page, ``/api/status``, else 404."""
        # Route on the path component only so a query string does not
        # turn a valid URL into a 404.
        path = urlparse(self.path).path
        if path in ('/', '/index.html'):
            self.serve_main_page()
        elif path.startswith('/api/status'):
            self.get_status()
        else:
            self.send_error(404, "Page not found")

    def do_POST(self):
        """Handle POST requests carrying a JSON object body.

        Responds with 400 when the Content-Length header or the body is
        invalid, 404 when the path is not a known API endpoint, and otherwise
        with the JSON-encoded result of the matching BIStoryteller call.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self.send_error(400, "Invalid Content-Length")
            return
        if content_length < 0:
            # A negative size would make rfile.read() block until EOF.
            self.send_error(400, "Invalid Content-Length")
            return

        post_data = self.rfile.read(content_length)
        try:
            data = json.loads(post_data.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError, RecursionError):
            self.send_error(400, "Invalid JSON")
            return

        handler = self._POST_ROUTES.get(urlparse(self.path).path)
        if handler is None:
            self.send_error(404, "API endpoint not found")
            return
        if not isinstance(data, dict):
            # Every endpoint reads its arguments with ``data.get``.
            self.send_error(400, "JSON body must be an object")
            return

        self._send_json_response(handler(self.bi, data))

    def get_status(self):
        """Send the current analysis status as JSON.

        A module is reported as completed when the corresponding attribute on
        ``self.bi`` is truthy; ``current_step`` is the step following the last
        completed module in pipeline order (1 when nothing is completed).
        """
        completed = []
        current_step = 1
        for step_number, (attr, label) in enumerate(self._STATUS_STEPS, start=2):
            if getattr(self.bi, attr):
                completed.append(label)
                current_step = step_number

        self._send_json_response({
            "modules_completed": completed,
            "current_step": current_step,
            "total_steps": 12
        })

    def serve_main_page(self):
        """Serve the main HTML page."""
        body = _MAIN_PAGE_HTML.encode('utf-8')
        self.send_response(200)
        # Declare the charset: the page contains non-ASCII characters and is
        # encoded as UTF-8.
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_json_response(self, data):
        """Send ``data`` as a 200 JSON response with a permissive CORS header."""
        # Serialize before writing anything so a serialization error cannot
        # leave a half-sent response on the wire.
        body = json.dumps(data).encode('utf-8')
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Override to reduce log noise (per-request logging is suppressed)."""
        pass


def start_web_server(port=8000):
    """Start the web server on all interfaces and serve until interrupted.

    Args:
        port: TCP port to listen on.
    """
    server_address = ('', port)
    httpd = HTTPServer(server_address, BIStoryteller_WebHandler)

    print("๐ŸŒ BI Storyteller Web Interface Starting...")
    print(f"๐Ÿ“ Server running at: http://localhost:{port}")
    print("๐Ÿ”ง Standard Library Only - No External Dependencies")
    print("โšก Ready for Marketing Analysis Automation!")
    print("\n" + "="*50)
    print("Press Ctrl+C to stop the server")
    print("="*50)

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n๐Ÿ›‘ Server stopped by user")
    finally:
        # Release the listening socket however serve_forever() exits.
        httpd.server_close()


if __name__ == "__main__":
    start_web_server()