Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import sqlite3 | |
| import datetime | |
| import requests | |
| from flask import Flask, render_template, request, jsonify, g | |
| from flask_cors import CORS | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Configuration | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload | |
| SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi") | |
| DB_NAME = "clawbot_assets.db" | |
| # Ensure instance folder exists | |
| os.makedirs(app.instance_path, exist_ok=True) | |
| DB_PATH = os.path.join(app.instance_path, DB_NAME) | |
| def get_db(): | |
| db = getattr(g, '_database', None) | |
| if db is None: | |
| db = g._database = sqlite3.connect(DB_PATH) | |
| db.row_factory = sqlite3.Row | |
| return db | |
| def close_connection(exception): | |
| db = getattr(g, '_database', None) | |
| if db is not None: | |
| db.close() | |
| def init_db(): | |
| with app.app_context(): | |
| db = get_db() | |
| cursor = db.cursor() | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS motions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| description TEXT, | |
| joints TEXT NOT NULL, -- JSON string: {base, shoulder, elbow, claw} | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Add some demo data if empty | |
| cursor.execute('SELECT count(*) FROM motions') | |
| if cursor.fetchone()[0] == 0: | |
| demos = [ | |
| ('Home Position', 'Reset to safe start', json.dumps({'base': 90, 'shoulder': 90, 'elbow': 90, 'claw': 0})), | |
| ('Grab Low', 'Pick up object from floor', json.dumps({'base': 90, 'shoulder': 45, 'elbow': 135, 'claw': 100})), | |
| ('Inspect High', 'Lift object for inspection', json.dumps({'base': 90, 'shoulder': 135, 'elbow': 45, 'claw': 100})) | |
| ] | |
| cursor.executemany('INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)', demos) | |
| db.commit() | |
| def index(): | |
| return render_template('index.html') | |
| def status(): | |
| # Mock telemetry for "Reliability" monitoring | |
| import random | |
| return jsonify({ | |
| 'motor_temp': [random.randint(40, 65) for _ in range(4)], | |
| 'power_draw': random.randint(120, 200), | |
| 'connection': 'ONLINE', | |
| 'uptime': '48h 12m' | |
| }) | |
| def handle_motions(): | |
| db = get_db() | |
| if request.method == 'POST': | |
| data = request.json | |
| name = data.get('name', 'Untitled Motion') | |
| desc = data.get('description', '') | |
| joints = json.dumps(data.get('joints', {})) | |
| cursor = db.cursor() | |
| cursor.execute('INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)', (name, desc, joints)) | |
| db.commit() | |
| return jsonify({'status': 'success', 'id': cursor.lastrowid}) | |
| else: | |
| cursor = db.cursor() | |
| cursor.execute('SELECT * FROM motions ORDER BY created_at DESC') | |
| rows = cursor.fetchall() | |
| return jsonify([dict(row) for row in rows]) | |
| def upload_file(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file part'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No selected file'}), 400 | |
| try: | |
| if file.filename.endswith('.json'): | |
| content = json.load(file) | |
| if isinstance(content, list): | |
| db = get_db() | |
| cursor = db.cursor() | |
| for item in content: | |
| name = item.get('name', 'Imported Motion') | |
| desc = item.get('description', '') | |
| # Handle if joints is dict or string | |
| joints_val = item.get('joints', {}) | |
| if isinstance(joints_val, dict): | |
| joints = json.dumps(joints_val) | |
| else: | |
| joints = joints_val | |
| cursor.execute('INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)', (name, desc, joints)) | |
| db.commit() | |
| return jsonify({'status': 'success', 'count': len(content)}) | |
| else: | |
| return jsonify({'error': 'Invalid JSON format, expected list'}), 400 | |
| return jsonify({'error': 'Unsupported file type'}), 400 | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def export_data(): | |
| db = get_db() | |
| cursor = db.cursor() | |
| cursor.execute('SELECT name, description, joints, created_at FROM motions') | |
| rows = cursor.fetchall() | |
| data = [] | |
| for row in rows: | |
| d = dict(row) | |
| # Ensure joints is parsed back to dict for JSON export, or keep as string? | |
| # Better to keep as valid JSON object | |
| try: | |
| d['joints'] = json.loads(d['joints']) | |
| except: | |
| pass | |
| data.append(d) | |
| return jsonify(data) | |
| def analyze_command(): | |
| data = request.json | |
| command = data.get('command', '') | |
| if not command: | |
| return jsonify({'error': 'No command provided'}), 400 | |
| # Call SiliconFlow | |
| headers = { | |
| "Authorization": f"Bearer {SILICONFLOW_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| system_prompt = """ | |
| You are the Kinematic Controller for a 4-DOF Robot Arm. | |
| Joint Limits: | |
| - Base: 0-180 (0=Left, 180=Right, 90=Center) | |
| - Shoulder: 0-180 (0=Down, 180=Up) | |
| - Elbow: 0-180 (0=Folded, 180=Extended) | |
| - Claw: 0-100 (0=Open, 100=Closed) | |
| Output JSON ONLY containing these 4 keys and an 'explanation'. | |
| Example: {"base": 90, "shoulder": 45, "elbow": 120, "claw": 100, "explanation": "Moving to lower center to grab."} | |
| """ | |
| payload = { | |
| "model": "Qwen/Qwen2.5-7B-Instruct", | |
| "messages": [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": f"Command: {command}"} | |
| ], | |
| "response_format": {"type": "json_object"} | |
| } | |
| try: | |
| response = requests.post( | |
| "https://api.siliconflow.cn/v1/chat/completions", | |
| json=payload, | |
| headers=headers, | |
| timeout=10 | |
| ) | |
| response.raise_for_status() | |
| result = response.json() | |
| content = result['choices'][0]['message']['content'] | |
| parsed = json.loads(content) | |
| return jsonify(parsed) | |
| except Exception as e: | |
| print(f"AI Error: {e}") | |
| # Fallback Mock if AI fails | |
| return jsonify({ | |
| 'base': 90, 'shoulder': 90, 'elbow': 90, 'claw': 0, | |
| 'explanation': f"AI Connection Failed. Resetting to Home. Error: {str(e)}" | |
| }) | |
| if __name__ == '__main__': | |
| init_db() | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |