# 3v324v23's picture
# Initial commit with enhancements
# aacd740
import os
import json
import sqlite3
import datetime
import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
app = Flask(__name__)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload

# The API key is read from the environment only. A credential must never be
# committed as a fallback default: anyone with read access to the source could
# use it. Any key that was previously embedded in this file should be treated
# as compromised and rotated. With no key configured, the upstream call fails
# authentication and /api/analyze returns its built-in fallback pose.
SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "")
DB_NAME = "clawbot_assets.db"

# Ensure instance folder exists
os.makedirs(app.instance_path, exist_ok=True)
DB_PATH = os.path.join(app.instance_path, DB_NAME)
def get_db():
    """Return the SQLite connection cached on Flask's ``g`` object.

    The connection is opened against ``DB_PATH`` the first time it is
    requested within an application context and configured to return
    ``sqlite3.Row`` rows; later calls reuse the cached connection.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    g._database = conn
    return conn
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g`` when the app context is torn down.

    ``exception`` is supplied by Flask's teardown hook and is not used here.
    """
    conn = getattr(g, '_database', None)
    if conn is None:
        # No connection was opened in this context; nothing to release.
        return
    conn.close()
def init_db():
    """Create the ``motions`` table if it is missing and seed demo rows.

    Demo rows are inserted only when the table holds no rows at all.
    """
    create_table_sql = '''
CREATE TABLE IF NOT EXISTS motions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
joints TEXT NOT NULL, -- JSON string: {base, shoulder, elbow, claw}
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
'''
    insert_sql = 'INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)'
    # (name, description, joint pose); poses are serialised to JSON on insert.
    seed_poses = (
        ('Home Position', 'Reset to safe start',
         {'base': 90, 'shoulder': 90, 'elbow': 90, 'claw': 0}),
        ('Grab Low', 'Pick up object from floor',
         {'base': 90, 'shoulder': 45, 'elbow': 135, 'claw': 100}),
        ('Inspect High', 'Lift object for inspection',
         {'base': 90, 'shoulder': 135, 'elbow': 45, 'claw': 100}),
    )

    with app.app_context():
        conn = get_db()
        cur = conn.cursor()
        cur.execute(create_table_sql)

        # Add some demo data if empty
        cur.execute('SELECT count(*) FROM motions')
        table_is_empty = cur.fetchone()[0] == 0
        if table_is_empty:
            demo_rows = [
                (title, summary, json.dumps(pose))
                for title, summary, pose in seed_poses
            ]
            cur.executemany(insert_sql, demo_rows)
        conn.commit()
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/api/status')
def status():
    """Return mock telemetry as JSON.

    The values are randomly generated placeholders, not readings from
    hardware: four motor temperatures, a power-draw figure, and fixed
    connection/uptime strings.
    """
    # Mock telemetry for "Reliability" monitoring
    from random import randint

    motor_temps = [randint(40, 65) for _ in range(4)]
    telemetry = {
        'motor_temp': motor_temps,
        'power_draw': randint(120, 200),
        'connection': 'ONLINE',
        'uptime': '48h 12m',
    }
    return jsonify(telemetry)
@app.route('/api/motions', methods=['GET', 'POST'])
def handle_motions():
    """List stored motions (GET) or create a new one (POST).

    POST expects a JSON object with optional ``name``, ``description`` and
    ``joints`` keys; ``joints`` is serialised to a JSON string before being
    stored. Responds with ``{'status': 'success', 'id': <row id>}``, or with
    HTTP 400 when the body is missing, malformed, or not a JSON object.

    GET returns every row of ``motions`` ordered by ``created_at`` descending.
    ``joints`` is returned as stored, i.e. as a JSON-encoded string.
    """
    db = get_db()

    if request.method == 'POST':
        # silent=True yields None instead of raising on a missing or
        # malformed body, so every bad-input case is handled in one place
        # rather than surfacing as an AttributeError on ``data.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        name = data.get('name', 'Untitled Motion')
        desc = data.get('description', '')
        joints = json.dumps(data.get('joints', {}))

        cursor = db.cursor()
        cursor.execute(
            'INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)',
            (name, desc, joints),
        )
        db.commit()
        return jsonify({'status': 'success', 'id': cursor.lastrowid})

    cursor = db.cursor()
    cursor.execute('SELECT * FROM motions ORDER BY created_at DESC')
    rows = cursor.fetchall()
    return jsonify([dict(row) for row in rows])
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Bulk-import motions from an uploaded ``.json`` file.

    The multipart field ``file`` must contain a JSON array of objects. Each
    object may carry ``name``, ``description`` and ``joints``; ``joints`` is
    stored as-is when it is already a string and JSON-encoded otherwise.

    Responds with ``{'status': 'success', 'count': <n>}`` on success, HTTP 400
    for any problem with the uploaded content (missing file, wrong extension,
    malformed JSON, wrong shape), and HTTP 500 if the database write fails.
    Nothing is inserted unless the whole file validates.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']
    # ``not`` also covers a filename of None, which would otherwise crash
    # on the extension check below.
    if not file.filename:
        return jsonify({'error': 'No selected file'}), 400

    if not file.filename.lower().endswith('.json'):
        return jsonify({'error': 'Unsupported file type'}), 400

    try:
        content = json.load(file)
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError both derive from ValueError;
        # a file that cannot be parsed is the client's fault, not the server's.
        return jsonify({'error': f'Invalid JSON: {e}'}), 400

    if not isinstance(content, list):
        return jsonify({'error': 'Invalid JSON format, expected list'}), 400

    # Validate and normalise every item before touching the database so a bad
    # entry in the middle of the file cannot leave a half-finished import.
    rows = []
    for item in content:
        if not isinstance(item, dict):
            return jsonify({'error': 'Invalid JSON format, expected list of objects'}), 400
        name = item.get('name', 'Imported Motion')
        desc = item.get('description', '')
        # Handle if joints is dict or string
        joints_val = item.get('joints', {})
        joints = joints_val if isinstance(joints_val, str) else json.dumps(joints_val)
        rows.append((name, desc, joints))

    db = get_db()
    try:
        db.executemany(
            'INSERT INTO motions (name, description, joints) VALUES (?, ?, ?)',
            rows,
        )
        db.commit()
    except Exception as e:
        # Top-level boundary: discard the partial transaction and report the
        # failure to the client instead of raising.
        db.rollback()
        return jsonify({'error': str(e)}), 500

    return jsonify({'status': 'success', 'count': len(content)})
@app.route('/api/export', methods=['GET'])
def export_data():
    """Export every stored motion as a JSON array.

    Each element contains ``name``, ``description``, ``joints`` and
    ``created_at``. ``joints`` is decoded from its stored JSON string into a
    JSON value when possible; if the stored text is not valid JSON it is
    exported unchanged.
    """
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT name, description, joints, created_at FROM motions')

    data = []
    for row in cursor.fetchall():
        record = dict(row)
        try:
            record['joints'] = json.loads(record['joints'])
        except (TypeError, ValueError):
            # Best effort: keep the raw stored value when it is not decodable
            # JSON. Only the errors json.loads raises are caught, so
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            pass
        data.append(record)
    return jsonify(data)
@app.route('/api/analyze', methods=['POST'])
def analyze_command():
    """Translate a natural-language command into a joint pose via SiliconFlow.

    Expects a JSON object with a non-empty ``command``. The command is sent to
    the chat-completions endpoint together with a system prompt describing the
    joint limits, and the model's JSON reply is returned to the caller.

    Responds with HTTP 400 when the body is not a JSON object or ``command`` is
    empty. If the upstream call fails, or its reply is not a JSON object, a
    fixed home pose is returned (HTTP 200) with the error in ``explanation``.
    """
    # silent=True returns None for a missing/malformed body instead of raising,
    # so a bad request gets a 400 rather than crashing on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'No command provided'}), 400
    command = data.get('command', '')
    if not command:
        return jsonify({'error': 'No command provided'}), 400

    # Call SiliconFlow
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }
    system_prompt = """
You are the Kinematic Controller for a 4-DOF Robot Arm.
Joint Limits:
- Base: 0-180 (0=Left, 180=Right, 90=Center)
- Shoulder: 0-180 (0=Down, 180=Up)
- Elbow: 0-180 (0=Folded, 180=Extended)
- Claw: 0-100 (0=Open, 100=Closed)
Output JSON ONLY containing these 4 keys and an 'explanation'.
Example: {"base": 90, "shoulder": 45, "elbow": 120, "claw": 100, "explanation": "Moving to lower center to grab."}
"""
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Command: {command}"}
        ],
        "response_format": {"type": "json_object"}
    }

    try:
        response = requests.post(
            "https://api.siliconflow.cn/v1/chat/completions",
            json=payload,
            headers=headers,
            timeout=10
        )
        response.raise_for_status()
        result = response.json()
        content = result['choices'][0]['message']['content']
        parsed = json.loads(content)
        # The model output is untrusted: anything other than a JSON object
        # (a bare list, number, string, null) is not a pose, so route it to
        # the same fallback as a transport failure.
        if not isinstance(parsed, dict):
            raise ValueError('AI response was not a JSON object')
        return jsonify(parsed)
    except Exception as e:
        # Top-level boundary: any failure talking to or parsing the upstream
        # service degrades to a safe pose instead of an error response.
        app.logger.warning("AI Error: %s", e)
        # Fallback Mock if AI fails
        return jsonify({
            'base': 90, 'shoulder': 90, 'elbow': 90, 'claw': 0,
            'explanation': f"AI Connection Failed. Resetting to Home. Error: {str(e)}"
        })
if __name__ == '__main__':
    init_db()
    # The server binds to every interface, so the Werkzeug interactive
    # debugger (which allows arbitrary code execution from the browser) must
    # not be on by default. Opt in explicitly with FLASK_DEBUG=1 for local
    # development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)