"""Flask backend for the ClawBot motion-asset manager.

Routes:
    /             -- renders ``index.html``.
    /api/status   -- mock telemetry.
    /api/motions  -- list (GET) or create (POST) stored motions.
    /api/upload   -- bulk-import motions from an uploaded ``.json`` file.
    /api/export   -- dump all motions as JSON.
    /api/analyze  -- ask the SiliconFlow chat API to turn a text command
                     into joint angles, with a Home-position fallback.
"""
import datetime
import json
import os
import random
import sqlite3

import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload
# The key is read from the environment only; a real key must never be
# committed to source as a fallback default.
SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "")
SILICONFLOW_URL = "https://api.siliconflow.cn/v1/chat/completions"
DB_NAME = "clawbot_assets.db"

# Ensure instance folder exists
os.makedirs(app.instance_path, exist_ok=True)
DB_PATH = os.path.join(app.instance_path, DB_NAME)

_INSERT_MOTION = 'INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)'

_SYSTEM_PROMPT = """
    You are the Kinematic Controller for a 4-DOF Robot Arm.
    Joint Limits:
    - Base: 0-180 (0=Left, 180=Right, 90=Center)
    - Shoulder: 0-180 (0=Down, 180=Up)
    - Elbow: 0-180 (0=Folded, 180=Extended)
    - Claw: 0-100 (0=Open, 100=Closed)

    Output JSON ONLY containing these 4 keys and an 'explanation'.
    Example: {"base": 90, "shoulder": 45, "elbow": 120, "claw": 100, "explanation": "Moving to lower center to grab."}
    """


def get_db():
    """Return the SQLite connection for the current app context.

    The connection is opened on first use and cached on ``flask.g`` so that
    every call within one context shares it.
    """
    db = getattr(g, '_database', None)
    if db is None:
        db = g._database = sqlite3.connect(DB_PATH)
        db.row_factory = sqlite3.Row
    return db


@app.teardown_appcontext
def close_connection(exception):
    """Close the cached SQLite connection, if one was opened."""
    db = getattr(g, '_database', None)
    if db is not None:
        db.close()


def init_db():
    """Create the ``motions`` table and seed three demo rows if it is empty."""
    with app.app_context():
        db = get_db()
        cursor = db.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS motions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                joints TEXT NOT NULL, -- JSON string: {base, shoulder, elbow, claw}
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Add some demo data if empty
        cursor.execute('SELECT count(*) FROM motions')
        if cursor.fetchone()[0] == 0:
            demos = [
                ('Home Position', 'Reset to safe start',
                 json.dumps({'base': 90, 'shoulder': 90, 'elbow': 90, 'claw': 0})),
                ('Grab Low', 'Pick up object from floor',
                 json.dumps({'base': 90, 'shoulder': 45, 'elbow': 135, 'claw': 100})),
                ('Inspect High', 'Lift object for inspection',
                 json.dumps({'base': 90, 'shoulder': 135, 'elbow': 45, 'claw': 100})),
            ]
            cursor.executemany(_INSERT_MOTION, demos)
        db.commit()


def _json_object_body():
    """Return the request body as a dict, or None if it is absent or not a JSON object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


def _scalar_field(item, key, default, null_ok=False):
    """Read ``item[key]`` for binding into a TEXT column.

    Returns ``default`` when the key is absent, and also when the value is
    null unless ``null_ok`` is set. Raises ValueError for objects and arrays,
    which sqlite3 cannot bind as a parameter.
    """
    value = item.get(key, default)
    if value is None and not null_ok:
        return default
    if isinstance(value, (dict, list)):
        raise ValueError(f"'{key}' must be a string")
    return value


def _import_row(item):
    """Convert one imported motion object into a ``(name, description, joints)`` row.

    ``joints`` may be an object (serialized here) or an already-serialized
    string (stored as-is). Raises ValueError for any other shape.
    """
    if not isinstance(item, dict):
        raise ValueError('each motion must be a JSON object')
    joints = item.get('joints', {})
    if isinstance(joints, dict):
        joints = json.dumps(joints)
    elif not isinstance(joints, str):
        raise ValueError("'joints' must be an object or a JSON string")
    return (
        _scalar_field(item, 'name', 'Imported Motion'),
        _scalar_field(item, 'description', '', null_ok=True),
        joints,
    )


@app.route('/')
def index():
    """Render the single-page UI."""
    return render_template('index.html')


@app.route('/api/status')
def status():
    """Return mock telemetry for "Reliability" monitoring."""
    return jsonify({
        'motor_temp': [random.randint(40, 65) for _ in range(4)],
        'power_draw': random.randint(120, 200),
        'connection': 'ONLINE',
        'uptime': '48h 12m'
    })


@app.route('/api/motions', methods=['GET', 'POST'])
def handle_motions():
    """List stored motions (GET, newest first) or create one (POST).

    POST expects a JSON object with optional ``name``, ``description`` and
    ``joints`` keys and returns the new row id. A missing or non-object body,
    or a non-scalar ``name``/``description``, yields a 400.
    """
    db = get_db()
    if request.method == 'POST':
        data = _json_object_body()
        if data is None:
            return jsonify({'error': 'Expected a JSON object body'}), 400
        try:
            name = _scalar_field(data, 'name', 'Untitled Motion')
            desc = _scalar_field(data, 'description', '', null_ok=True)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400
        joints = json.dumps(data.get('joints', {}))
        cursor = db.execute(_INSERT_MOTION, (name, desc, joints))
        db.commit()
        return jsonify({'status': 'success', 'id': cursor.lastrowid})

    rows = db.execute('SELECT * FROM motions ORDER BY created_at DESC').fetchall()
    return jsonify([dict(row) for row in rows])


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Bulk-import motions from an uploaded ``.json`` file containing a list.

    Client mistakes (no file, wrong extension, malformed JSON, wrong shape)
    return 400; database failures roll back and return 500. All rows are
    validated before any is inserted, so an import is all-or-nothing.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if not file.filename:
        return jsonify({'error': 'No selected file'}), 400
    # Compare case-insensitively so "MOTIONS.JSON" is accepted as well.
    if not file.filename.lower().endswith('.json'):
        return jsonify({'error': 'Unsupported file type'}), 400

    try:
        content = json.load(file)
    except ValueError as e:  # covers JSONDecodeError and UnicodeDecodeError
        return jsonify({'error': f'Invalid JSON: {e}'}), 400
    if not isinstance(content, list):
        return jsonify({'error': 'Invalid JSON format, expected list'}), 400

    try:
        rows = [_import_row(item) for item in content]
    except ValueError as e:
        return jsonify({'error': str(e)}), 400

    db = get_db()
    try:
        db.executemany(_INSERT_MOTION, rows)
        db.commit()
    except sqlite3.Error as e:
        db.rollback()
        return jsonify({'error': str(e)}), 500
    return jsonify({'status': 'success', 'count': len(rows)})


@app.route('/api/export', methods=['GET'])
def export_data():
    """Return every motion as JSON, with ``joints`` decoded to an object when possible."""
    rows = get_db().execute(
        'SELECT name, description, joints, created_at FROM motions'
    ).fetchall()
    data = []
    for row in rows:
        record = dict(row)
        try:
            record['joints'] = json.loads(record['joints'])
        except (TypeError, ValueError):
            # Stored text is not valid JSON; export it unchanged.
            pass
        data.append(record)
    return jsonify(data)


@app.route('/api/analyze', methods=['POST'])
def analyze_command():
    """Translate a natural-language command into joint angles via SiliconFlow.

    Expects a JSON object with a non-empty ``command``; otherwise returns 400.
    If the API key is missing, the request fails, or the reply is not a JSON
    object, the Home position is returned together with the error text.
    """
    data = _json_object_body() or {}
    command = data.get('command', '')
    if not command:
        return jsonify({'error': 'No command provided'}), 400

    # Call SiliconFlow
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": f"Command: {command}"}
        ],
        "response_format": {"type": "json_object"}
    }

    try:
        if not SILICONFLOW_API_KEY:
            # Skip a network round trip that can only end in a 401.
            raise RuntimeError('SILICONFLOW_API_KEY is not configured')
        response = requests.post(
            SILICONFLOW_URL,
            json=payload,
            headers=headers,
            timeout=10
        )
        response.raise_for_status()
        content = response.json()['choices'][0]['message']['content']
        parsed = json.loads(content)
        if not isinstance(parsed, dict):
            raise ValueError('model reply is not a JSON object')
        return jsonify(parsed)
    except Exception as e:
        # Deliberate best-effort boundary: any failure falls back to Home.
        app.logger.warning("AI Error: %s", e)
        # Fallback Mock if AI fails
        return jsonify({
            'base': 90, 'shoulder': 90, 'elbow': 90, 'claw': 0,
            'explanation': f"AI Connection Failed. Resetting to Home. Error: {str(e)}"
        })


if __name__ == '__main__':
    init_db()
    # The Werkzeug debugger permits arbitrary code execution, so it is opt-in
    # (FLASK_DEBUG=1) rather than always on for a server bound to 0.0.0.0.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug)