#!/usr/bin/env python3
"""
VM Data Server
Exposes local trading bot data via API for dashboard consumption
Runs on the VM alongside your trading bot
"""
import json
import logging
import os
import subprocess
from collections import deque
from datetime import datetime, timedelta

import pandas as pd
from flask import Flask, jsonify, request
from flask_cors import CORS
# Data files read by this server. The paths are relative, so they resolve
# against the directory the server process is started from.
PORTFOLIO_FILE = 'portfolio.txt'
NEW_TICKERS_LOG_FILE = 'new_tickers_log.csv'
SCRIPT_LOG_FILE = 'script.log'
BUY_QUEUE_FILE = 'buy_queue.json'
CURRENT_TICKERS_FILE = 'current_tickers.txt'

# Flask application; CORS is enabled so the dashboard can connect from anywhere.
app = Flask(__name__)
CORS(app)

# Module-level logger used by the loader functions to report failures.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_portfolio_data(portfolio_file=None):
    """Load the portfolio CSV as a list of JSON-safe row dictionaries.

    Args:
        portfolio_file: Path of the CSV file to read. When omitted, the
            module-level ``PORTFOLIO_FILE`` is used.

    Returns:
        One dict per CSV row, keyed by column name. Empty cells are returned
        as ``None``. An empty list is returned when the file does not exist
        or when reading/parsing fails (the failure is logged).
    """
    path = PORTFOLIO_FILE if portfolio_file is None else portfolio_file
    try:
        if not os.path.exists(path):
            return []
        records = pd.read_csv(path).to_dict('records')
        # pandas represents empty cells as NaN, which the JSON encoder emits
        # as the bare token ``NaN`` -- not valid JSON. Map those to None.
        return [
            {column: (None if pd.isna(value) else value)
             for column, value in record.items()}
            for record in records
        ]
    except Exception as e:
        logger.error(f"Error loading portfolio: {e}")
        return []
def _csv_cell(row, column, default):
    """Return ``row[column]``, or *default* when the column or the cell is missing."""
    value = row.get(column, default)
    return default if pd.isna(value) else value


def load_new_tickers_with_decisions():
    """Load IPO discoveries and tag each one with an investment decision.

    Reads ``NEW_TICKERS_LOG_FILE`` and compares every row's ``Symbol`` with
    the ``symbol`` values returned by ``load_portfolio_data()``.

    Returns:
        A list of dicts with the keys ``symbol``, ``security_type``,
        ``trading_price``, ``detected_at``, ``investment_status``,
        ``status_color`` and ``status_emoji``, sorted by ``detected_at`` in
        descending order. ``investment_status`` is:

        * ``INVESTED`` -- the symbol appears in the portfolio;
        * ``ELIGIBLE_NOT_INVESTED`` -- not in the portfolio, type ``CS``;
        * ``WRONG_TYPE`` -- type ``SP``, ``WARRANT`` or ``UNIT``;
        * ``UNKNOWN`` -- anything else.

        An empty list is returned when the file does not exist or when
        loading fails (the failure is logged).
    """
    try:
        if not os.path.exists(NEW_TICKERS_LOG_FILE):
            return []
        df = pd.read_csv(NEW_TICKERS_LOG_FILE)

        # Symbols we actually invested in. Blank/missing symbols are skipped
        # so that a row without a symbol can never be reported as INVESTED.
        invested_symbols = set()
        for trade in load_portfolio_data():
            held = trade.get('symbol')
            if pd.notna(held) and held != '':
                invested_symbols.add(held)

        enriched_ipos = []
        for _, row in df.iterrows():
            # Empty CSV cells arrive as NaN; fall back to the same defaults
            # that are used for a missing column so the output stays valid JSON.
            symbol = _csv_cell(row, 'Symbol', '')
            security_type = _csv_cell(row, 'Security_Type', '')

            if symbol in invested_symbols:
                status = ('INVESTED', 'success', '🟢')
            elif security_type == 'CS':
                status = ('ELIGIBLE_NOT_INVESTED', 'warning', '🟡')
            elif security_type in ('SP', 'WARRANT', 'UNIT'):
                status = ('WRONG_TYPE', 'neutral', '⚪')
            else:
                status = ('UNKNOWN', 'error', '🔴')
            investment_status, status_color, status_emoji = status

            enriched_ipos.append({
                'symbol': symbol,
                'security_type': security_type,
                'trading_price': _csv_cell(row, 'Trading_Price', 'N/A'),
                'detected_at': _csv_cell(row, 'Detected_At', 'N/A'),
                'investment_status': investment_status,
                'status_color': status_color,
                'status_emoji': status_emoji,
            })

        # Newest first. Comparing through str() keeps the sort from raising
        # (and the whole list from being discarded) if a value is not a string.
        enriched_ipos.sort(key=lambda ipo: str(ipo['detected_at']), reverse=True)
        return enriched_ipos
    except Exception as e:
        logger.error(f"Error loading IPO data: {e}")
        return []
def _classify_log_entry(level, message):
    """Return the ``(log_type, emoji)`` pair for one parsed log entry."""
    if 'ERROR' in level:
        return 'error', '🔴'
    if 'WARN' in level:  # also matches 'WARNING'
        return 'warning', '🟡'
    if 'Buy order placed' in message or 'Sold' in message:
        return 'trade', '💰'
    if 'Found' in message and 'new ticker' in message:
        # NOTE(review): this glyph was reconstructed from mis-encoded text; confirm.
        return 'discovery', '🔍'
    if 'INFO' in level:
        return 'info', '🔵'
    return 'default', '⚪'


def load_script_logs(lines=100, log_file=None):
    """Load and classify the most recent lines of the script log.

    Args:
        lines: Maximum number of trailing file lines to inspect. Zero or a
            negative number yields an empty list.
        log_file: Path of the log file. When omitted, the module-level
            ``SCRIPT_LOG_FILE`` is used.

    Returns:
        A list of dicts (oldest first) with the keys ``timestamp``, ``level``,
        ``message``, ``log_type``, ``emoji`` and ``full_line``. Blank lines
        are skipped. Lines that do not split into three ``' - '``-separated
        parts are returned with level ``RAW`` and timestamp ``N/A``. An empty
        list is returned when the file does not exist or when reading fails
        (the failure is logged).
    """
    path = SCRIPT_LOG_FILE if log_file is None else log_file
    try:
        if lines <= 0 or not os.path.exists(path):
            return []

        # A bounded deque keeps only the tail while streaming the file, and
        # errors='replace' stops one undecodable byte from hiding every entry.
        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            recent_lines = deque(f, maxlen=lines)

        logs = []
        for raw_line in recent_lines:
            line = raw_line.strip()
            if not line:
                continue

            # Expected layout: "<timestamp> - <level> - <message>"; the
            # message itself may contain further " - " separators.
            parts = line.split(' - ', 2)
            if len(parts) == 3:
                timestamp, level, message = parts
                log_type, emoji = _classify_log_entry(level, message)
            else:
                # Fallback for unparseable lines
                timestamp, level, message = 'N/A', 'RAW', line
                log_type, emoji = 'default', '⚪'

            logs.append({
                'timestamp': timestamp,
                'level': level,
                'message': message,
                'log_type': log_type,
                'emoji': emoji,
                'full_line': line,
            })
        return logs
    except Exception as e:
        logger.error(f"Error loading logs: {e}")
        return []
def load_buy_queue():
    """Return the parsed contents of the buy-queue JSON file.

    Yields an empty list when ``BUY_QUEUE_FILE`` does not exist, or when it
    cannot be opened or parsed (in which case the error is logged).
    """
    try:
        if not os.path.exists(BUY_QUEUE_FILE):
            return []
        with open(BUY_QUEUE_FILE, 'r') as queue_file:
            queue = json.load(queue_file)
        return queue
    except Exception as err:
        logger.error(f"Error loading buy queue: {err}")
        return []
def get_system_stats():
    """Summarise portfolio and IPO-detection data as a dict of counters.

    Returns:
        A dict with ``total_ipos_detected``, ``ipos_invested``,
        ``current_positions``, ``cs_stocks_detected``,
        ``other_types_detected``, ``investment_rate`` (INVESTED entries as a
        percentage of ``CS`` entries, rounded to one decimal, 0 when there
        are no ``CS`` entries) and ``last_updated`` (local time formatted as
        ``YYYY-MM-DD HH:MM:SS``). An empty dict is returned if anything
        raises; the error is logged.
    """
    try:
        positions = load_portfolio_data()
        discoveries = load_new_tickers_with_decisions()

        detected_count = len(discoveries)
        invested_count = sum(
            1 for ipo in discoveries if ipo['investment_status'] == 'INVESTED'
        )
        # Breakdown of detections by security type
        common_stock_count = sum(
            1 for ipo in discoveries if ipo['security_type'] == 'CS'
        )

        if common_stock_count > 0:
            rate = invested_count / common_stock_count * 100
        else:
            rate = 0

        return {
            'total_ipos_detected': detected_count,
            'ipos_invested': invested_count,
            'current_positions': len(positions),
            'cs_stocks_detected': common_stock_count,
            'other_types_detected': detected_count - common_stock_count,
            'investment_rate': round(rate, 1),
            'last_updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }
    except Exception as err:
        logger.error(f"Error calculating stats: {err}")
        return {}
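# API Endpoints
# NOTE(review): none of the handler functions below carries a route
# registration (e.g. an @app.route decorator) in this view of the file --
# confirm how they are attached to `app`.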
def health_check():
    """Report that the server is up, together with the current local time."""
    payload = {
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
def get_portfolio():
    """Return the rows produced by ``load_portfolio_data()`` as a JSON response."""
    portfolio_rows = load_portfolio_data()
    return jsonify(portfolio_rows)
def get_ipos():
    """Return IPO discoveries with investment decisions as a JSON response.

    Query parameters:
        limit: Maximum number of entries to return (default 50). Zero or a
            negative value yields an empty list.
    """
    limit = request.args.get('limit', 50, type=int)
    ipos = load_new_tickers_with_decisions()
    # A negative slice bound would trim entries off the END of the list
    # instead of limiting the result, so clamp it at zero.
    return jsonify(ipos[:max(limit, 0)])
def get_logs():
    """Return parsed script-log entries as a JSON response.

    Query parameters:
        lines: Passed to ``load_script_logs`` as the line count (default 100).
    """
    line_count = request.args.get('lines', 100, type=int)
    return jsonify(load_script_logs(line_count))
def get_buy_queue():
    """Return whatever ``load_buy_queue()`` yields as a JSON response."""
    queue = load_buy_queue()
    return jsonify(queue)
def get_stats():
    """Return the dict built by ``get_system_stats()`` as a JSON response."""
    stats = get_system_stats()
    return jsonify(stats)
def get_raw_logs():
    """Return the tail of the script log file as raw text.

    Query parameters:
        lines: Maximum number of trailing lines to return (default 200).
            Zero or a negative value returns no lines.

    Returns:
        A JSON response with ``content`` (the joined lines), ``total_lines``
        (lines in the file) and ``showing_lines`` (lines returned). When the
        file does not exist, ``content`` is ``'No log file found'``. When
        reading fails, the response carries ``error`` and an empty ``content``.
    """
    lines = request.args.get('lines', 200, type=int)
    try:
        if not os.path.exists(SCRIPT_LOG_FILE):
            return jsonify({'content': 'No log file found'})

        # Stream the file once: the bounded deque keeps only the tail while
        # the loop counts every line. maxlen=0 keeps nothing, which is the
        # intended result for a zero or negative request.
        tail = deque(maxlen=max(lines, 0))
        total_lines = 0
        # errors='replace' keeps one undecodable byte from failing the request.
        with open(SCRIPT_LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
            for total_lines, line in enumerate(f, start=1):
                tail.append(line)

        return jsonify({
            'content': ''.join(tail),
            'total_lines': total_lines,
            'showing_lines': len(tail),
        })
    except Exception as e:
        logger.error(f"Error reading raw logs: {e}")
        return jsonify({'error': str(e), 'content': ''})
def execute_command():
    """Execute a shell command on the VM and return its output as JSON.

    Expects a JSON request body of the form ``{"command": "<shell string>"}``.

    Returns:
        A JSON response that always contains ``output`` and ``exit_code``:

        * success -- ``output`` (stdout followed by stderr), ``exit_code``
          (the process return code) and ``command``;
        * missing/blank command, blacklisted command, or unexpected error --
          ``error`` plus ``exit_code`` 1;
        * timeout (30 seconds) -- ``error`` plus ``exit_code`` 124.
    """
    # SECURITY: this handler runs an arbitrary shell string supplied by the
    # HTTP client (shell=True). The blacklist below is a plain substring
    # match: it is not a sandbox, and it also rejects harmless commands that
    # merely contain one of the words (e.g. 'rm' inside 'format').
    # NOTE(review): no authentication is visible in this function -- confirm
    # that access to it is restricted elsewhere.
    dangerous_commands = (
        'rm', 'sudo', 'passwd', 'shutdown', 'reboot', 'mkfs', 'fdisk', 'dd',
    )
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body; treat any non-object payload as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_command = data.get('command', '')
        command = raw_command.strip() if isinstance(raw_command, str) else ''
        if not command:
            return jsonify({'error': 'No command provided', 'output': '', 'exit_code': 1})

        lowered = command.lower()
        if any(cmd in lowered for cmd in dangerous_commands):
            # NOTE(review): the emoji literals in this function were
            # reconstructed from mis-encoded text; confirm the glyphs.
            return jsonify({
                'error': 'Command not allowed for security reasons',
                'output': f"❌ Command '{command}' contains restricted operations",
                'exit_code': 1
            })

        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors='replace',  # undecodable output bytes must not fail the call
            timeout=30,  # 30 second timeout
            cwd=os.getcwd()  # Run in the directory the server was started from
        )
        return jsonify({
            'output': result.stdout + result.stderr,
            'exit_code': result.returncode,
            'command': command
        })
    except subprocess.TimeoutExpired:
        return jsonify({
            'error': 'Command timed out',
            'output': '⏰ Command execution timed out (30s limit)',
            'exit_code': 124
        })
    except Exception as e:
        return jsonify({
            'error': str(e),
            'output': f'❌ Error executing command: {str(e)}',
            'exit_code': 1
        })
if __name__ == '__main__':
    # Startup banner.
    # NOTE(review): the emoji and bullet characters below were reconstructed
    # from mis-encoded text in the source; confirm the intended glyphs.
    print("🚀 Starting VM Data Server...")
    print("📡 This exposes your trading bot data via API")
    print("📊 Dashboard can now access:")
    print(" • IPO discoveries with investment decisions")
    print(" • Raw cron logs with color coding")
    print(" • Portfolio data from VM files")
    print(" • Buy queue and system stats")
    print(" • Remote command execution")
    print("-" * 50)

    # Run on all interfaces so dashboard can connect
    app.run(
        host='0.0.0.0',  # Allow external connections
        port=8090,  # Different from dashboard port
        debug=False
    )