Spaces:
Sleeping
Sleeping
| import os | |
| import sqlite3 | |
| import json | |
| import requests | |
| import random | |
| import time | |
| from flask import Flask, render_template, jsonify, request, g | |
app = Flask(__name__)

# Configuration
# SECURITY: the SiliconFlow key is taken from the environment only; never
# commit a real key as the fallback value (any key that was committed should
# be treated as leaked and rotated). Without the variable, requests go out
# with an empty bearer token, and ai_analyze_asteroid() turns any non-200
# reply into its "offline" message instead of failing.
API_KEY = os.environ.get("SILICONFLOW_API_KEY", "")
BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"
# The SQLite file lives in Flask's instance folder.
DB_PATH = os.path.join(app.instance_path, "stellar.db")
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload

# Ensure instance folder exists (sqlite3.connect does not create directories)
os.makedirs(app.instance_path, exist_ok=True)
def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it on first use.

    The connection is stored as ``g._database`` and configured with
    ``sqlite3.Row`` so rows can be converted with ``dict(row)``.

    Returns:
        sqlite3.Connection: the connection for the current application context.
    """
    if getattr(g, '_database', None) is None:
        conn = sqlite3.connect(DB_PATH)
        conn.row_factory = sqlite3.Row
        g._database = conn
    return g._database
@app.teardown_appcontext
def close_connection(exception):
    """Close the SQLite connection cached on ``flask.g`` when the context ends.

    Registered as an application-context teardown handler; without the
    registration nothing ever calls this function and every connection
    opened by ``get_db()`` stays open.

    Args:
        exception: The unhandled exception that ended the context, or
            ``None``. Not used; the connection is closed either way.
    """
    # pop() rather than getattr() so a closed handle is never left on ``g``.
    db = g.pop('_database', None)
    if db is not None:
        db.close()
def init_db():
    """Create the tables if they are missing and seed the catalog when empty.

    Runs inside an application context so ``get_db()`` can be used. The
    ``asteroids`` table is seeded (and the insert committed) only when it
    contains no rows; otherwise the function returns after the schema check.
    """
    # Asteroids catalog (candidates)
    asteroids_ddl = '''
        CREATE TABLE IF NOT EXISTS asteroids (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            type TEXT,
            distance_au REAL,
            diameter_km REAL,
            estimated_value_t TEXT,
            spectral_class TEXT,
            composition TEXT,
            status TEXT DEFAULT 'scanned' -- scanned, analyzing, claimed
        )
    '''
    # Claims / assets
    assets_ddl = '''
        CREATE TABLE IF NOT EXISTS assets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asteroid_id INTEGER,
            owner TEXT DEFAULT '星际开拓者',
            claim_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            mining_status TEXT DEFAULT 'planning', -- planning, active, depleted
            yield_total REAL DEFAULT 0,
            FOREIGN KEY(asteroid_id) REFERENCES asteroids(id)
        )
    '''
    insert_sql = 'INSERT INTO asteroids (name, type, distance_au, diameter_km, estimated_value_t, spectral_class, composition) VALUES (?, ?, ?, ?, ?, ?, ?)'
    # Columns: name, type, distance_au, diameter_km, estimated_value_t,
    # spectral_class, composition
    catalog = [
        ('Psyche-16', 'M-Type', 2.5, 226.0, '10,000', 'M', '铁, 镍, 黄金'),
        ('Eros-433', 'S-Type', 1.13, 16.8, '500', 'S', '硅酸盐, 镁'),
        ('Bennu-101955', 'C-Type', 0.9, 0.5, '0.8', 'C', '碳, 水冰'),
        ('Ryugu-162173', 'C-Type', 1.0, 0.9, '1.2', 'C', '有机化合物'),
        ('Vesta-4', 'V-Type', 2.36, 525.0, '2,500', 'V', '玄武岩'),
        ('Didymos-65803', 'S-Type', 1.64, 0.78, '0.3', 'S', '硅酸盐'),
        ('Apophis-99942', 'S-Type', 0.9, 0.37, '0.5', 'S', '硅酸盐, 铁'),
        ('Itokawa-25143', 'S-Type', 1.3, 0.35, '0.1', 'S', '球粒陨石'),
    ]

    print("Initializing database check...", flush=True)
    with app.app_context():
        conn = get_db()
        for ddl in (asteroids_ddl, assets_ddl):
            conn.execute(ddl)

        # Seed only an empty catalog.
        row_count = conn.execute('SELECT count(*) FROM asteroids').fetchone()[0]
        if row_count != 0:
            return

        conn.executemany(insert_sql, catalog)
        conn.commit()
        print("Database seeded.", flush=True)
# Import-time side effect: the schema check / seeding runs as soon as this
# module is loaded, not only when it is executed as a script.
init_db()
print("App loaded.", flush=True)
| # --- AI Integration --- | |
def ai_analyze_asteroid(asteroid_data):
    """Call the SiliconFlow chat-completions API to analyze an asteroid.

    Args:
        asteroid_data: Mapping with the keys ``name``, ``type``,
            ``spectral_class``, ``diameter_km`` and ``composition``
            (the columns read below to build the prompt).

    Returns:
        str: The content of the first choice's message when the API answers
        with HTTP 200. Otherwise a Markdown "offline" notice: one variant
        carrying the HTTP status code for non-200 replies, and one with a
        canned simulated analysis when the request fails or the reply does
        not have the expected shape. The function is best-effort and does
        not raise for those failures.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    # The prompt lines are kept flush-left so no indentation is sent to the model.
    prompt = f"""
扮演一位资深天体地质学家和采矿经济学家。分析以下小行星数据的采矿可行性:
名称: {asteroid_data['name']}
类型: {asteroid_data['type']} (光谱类别: {asteroid_data['spectral_class']})
直径: {asteroid_data['diameter_km']} km
成分: {asteroid_data['composition']}
请提供一份结构化的 Markdown 报告(用中文),包含:
1. **资源评估**: 关键元素及其工业用途。
2. **开采难度**: 低/中/高,基于尺寸和成分。
3. **经济前景**: 战略价值(预估万亿美元)。
4. **建议**: '优先目标' 或 '观望'。
保持简洁专业。
"""
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": "你是一个专门负责太空采矿分析的 AI 助手。"},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.7
    }
    try:
        response = requests.post(BASE_URL, json=payload, headers=headers, timeout=10)
        if response.status_code != 200:
            return f"**分析系统离线**: 信号微弱。(API 错误: {response.status_code})"
        return response.json()['choices'][0]['message']['content']
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as exc:
        # Expected failures only: transport errors/timeouts (RequestException),
        # an undecodable body (ValueError) or a reply without
        # choices[0].message.content. Log the cause instead of discarding it,
        # then fall back to the canned report so the caller still gets text.
        app.logger.warning("SiliconFlow analysis failed for %s: %s", asteroid_data['name'], exc)
        return f"**分析系统离线**: 连接超时。使用缓存的启发式数据...\n\n*模拟分析*: 目标 {asteroid_data['name']} 显示出 {asteroid_data['composition']} 的高潜力。预计产量可观。"
| # --- Routes --- | |
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def index():
    """Render and return the ``index.html`` template."""
    page = render_template('index.html')
    return page
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def get_asteroids():
    """Return every row of the ``asteroids`` table as a JSON array of objects."""
    rows = get_db().execute('SELECT * FROM asteroids').fetchall()
    catalog = list(map(dict, rows))
    return jsonify(catalog)
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def analyze_asteroid():
    """Run the AI analysis for one asteroid and mark it as analyzed.

    Expects a JSON object body with an ``id`` key.

    Returns:
        A JSON response ``{'report': <markdown>}`` on success;
        ``({'error': ...}, 400)`` when the body is missing, is not a JSON
        object or has no ``id``; ``({'error': ...}, 404)`` when no asteroid
        has that id.
    """
    # silent=True + the dict check make every malformed body (wrong content
    # type, invalid JSON, a JSON list/string) take the same 400 path instead
    # of failing later on data['id'].
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'id' not in data:
        return jsonify({'error': '缺少 ID 参数'}), 400

    asteroid_id = data['id']
    db = get_db()
    asteroid = db.execute('SELECT * FROM asteroids WHERE id = ?', (asteroid_id,)).fetchone()
    if not asteroid:
        return jsonify({'error': '未找到小行星'}), 404

    # Simulate processing time
    time.sleep(1)
    report = ai_analyze_asteroid(dict(asteroid))

    # Update status, but never demote a claimed asteroid: claim_asteroid()
    # sets 'claimed' alongside an assets row, and overwriting it here would
    # leave the catalog disagreeing with the assets table.
    db.execute(
        "UPDATE asteroids SET status = 'analyzed' "
        "WHERE id = ? AND (status IS NULL OR status != 'claimed')",
        (asteroid_id,),
    )
    db.commit()
    return jsonify({'report': report})
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def claim_asteroid():
    """Claim an asteroid: add an ``assets`` row and mark the asteroid claimed.

    Expects a JSON object body with an ``id`` key.

    Returns:
        A JSON response ``{'success': True}`` on success;
        ``({'error': ...}, 400)`` when the body is missing, is not a JSON
        object, has no ``id``, or the asteroid already has an asset row;
        ``({'error': ...}, 404)`` when no asteroid has that id.
    """
    # Same tolerant parsing as analyze_asteroid(): any malformed body is a 400.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'id' not in data:
        return jsonify({'error': '缺少 ID 参数'}), 400

    asteroid_id = data['id']
    db = get_db()

    # Refuse unknown ids. Nothing in this file turns on SQLite foreign-key
    # enforcement, so without this check an asset row pointing at a
    # non-existent asteroid would be inserted.
    target = db.execute('SELECT id FROM asteroids WHERE id = ?', (asteroid_id,)).fetchone()
    if target is None:
        return jsonify({'error': '未找到小行星'}), 404

    # Check if already claimed
    exists = db.execute('SELECT id FROM assets WHERE asteroid_id = ?', (asteroid_id,)).fetchone()
    if exists:
        return jsonify({'error': '该目标已被占据'}), 400

    db.execute('INSERT INTO assets (asteroid_id, mining_status) VALUES (?, ?)', (asteroid_id, 'planning'))
    db.execute("UPDATE asteroids SET status = 'claimed' WHERE id = ?", (asteroid_id,))
    db.commit()
    return jsonify({'success': True})
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def upload_data():
    """Accept an uploaded file and acknowledge it without storing anything.

    This is a mock import step: the upload under the form field ``file`` is
    read only to measure its size, and a success message naming the file
    and its size in KB is returned.

    Returns:
        A JSON response ``{'success': True, 'message': ...}``, or
        ``({'error': ...}, 400)`` when the ``file`` field is absent or its
        filename is empty.
    """
    # Compare against None explicitly instead of relying on the object's
    # truthiness, so only a genuinely absent field takes this branch.
    upload = request.files.get('file')
    if upload is None:
        return jsonify({'error': '未检测到文件'}), 400
    if upload.filename == '':
        return jsonify({'error': '文件名为空'}), 400

    # Mock processing: the content is read once and discarded.
    byte_count = len(upload.read())
    size_kb = byte_count / 1024
    message = f'文件 {upload.filename} ({size_kb:.1f} KB) 上传成功并已加入处理队列。数据已整合至数据集。'
    return jsonify({'success': True, 'message': message})
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def get_assets():
    """Return all claimed assets joined with their asteroid's details.

    Each JSON object holds every ``assets`` column plus the asteroid's
    ``name``, ``estimated_value_t`` and ``composition``. The inner join
    leaves out asset rows whose ``asteroid_id`` matches no asteroid.
    """
    rows = get_db().execute('''
        SELECT assets.*, asteroids.name, asteroids.estimated_value_t, asteroids.composition
        FROM assets
        JOIN asteroids ON assets.asteroid_id = asteroids.id
    ''').fetchall()
    return jsonify([dict(row) for row in rows])
# NOTE(review): no @app.route decorator is visible above this view in this
# listing — confirm the view is registered with the app.
def get_stats():
    """Return dashboard statistics as JSON.

    ``total_targets`` and ``claimed_assets`` are row counts of the
    ``asteroids`` and ``assets`` tables. ``fleet_status`` is a fixed mock
    value and ``market_index`` is a fresh random integer in [9000, 12000]
    on every call.
    """
    conn = get_db()

    def count_rows(sql):
        # Each query is a single-row, single-column count(*).
        return conn.execute(sql).fetchone()[0]

    # estimated_value_t is stored as text such as '10,000', so no total value
    # is computed; the fleet and market figures are mock data.
    return jsonify({
        'total_targets': count_rows('SELECT count(*) FROM asteroids'),
        'claimed_assets': count_rows('SELECT count(*) FROM assets'),
        'fleet_status': {'active': 3, 'idle': 2, 'maintenance': 1},
        'market_index': random.randint(9000, 12000),
    })
if __name__ == '__main__':
    # The server binds to all interfaces, so the interactive debugger must
    # not be on by default: anyone who can reach it can execute code in this
    # process. Debug mode is opt-in via FLASK_DEBUG=1 (or true/yes/on).
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)