Spaces:
Paused
Paused
| from flask import Flask, render_template, request, jsonify | |
| from werkzeug.utils import secure_filename | |
| import os | |
| from parser import parse_python_code | |
| from collections import defaultdict | |
# Flask application object for this module.
app = Flask(__name__)

# Upload directory (relative to the working directory), also published
# through app.config['UPLOAD_FOLDER'].
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# exist_ok=True makes creation idempotent and removes the check-then-create
# race that occurs when several processes import this module at once.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def index():
    """Render the ``index.html`` template and return the result."""
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source -- confirm it is registered with the app.
    page = render_template('index.html')
    return page
def update_node():
    """Acknowledge an edit to a single node.

    Expects a JSON object in the request body; the intended keys are ``id``
    (the node to update) and ``source`` (its new source text). Storing the
    change and re-parsing are not implemented yet, so a well-formed request
    is only acknowledged.

    Returns:
        JSON ``{'status': 'success'}``, or a 400 response carrying an
        ``error`` message when the body is not a JSON object.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source -- confirm it is registered with the app.

    # silent=True turns a missing or malformed body into None instead of an
    # exception; the isinstance check also rejects JSON null, lists and
    # scalars, which previously crashed on ``.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # TODO: update the node identified by data['id'] with data['source'] in
    # the stored nodes (e.g., in-memory or database).
    # TODO: re-parse to validate and update connections.
    return jsonify({'status': 'success'})
def update_program():
    """Re-parse a whole program submitted as JSON.

    Expects a JSON object with a non-empty string under ``code``. The code is
    passed to ``parse_python_code``; updating nodes and connections from the
    parse result is not implemented yet, so the result is discarded.

    Returns:
        JSON ``{'status': 'success'}``, or a 400 response carrying an
        ``error`` message when the body is not a JSON object or ``code`` is
        missing, empty, or not a string.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source -- confirm it is registered with the app.

    # silent=True turns a missing or malformed body into None instead of an
    # exception; non-object payloads previously crashed on ``.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    code = data.get('code')
    if not isinstance(code, str) or not code:
        # Previously a missing ``code`` was handed to the parser as None.
        return jsonify({'error': 'No code provided'}), 400

    # Parsing is kept so that a parser failure still surfaces to the caller.
    parse_python_code(code)
    # TODO: update nodes and connections from the parse result.
    return jsonify({'status': 'success'})
# Part categories that are turned into graph nodes; all others are skipped.
_NODE_CATEGORIES = (
    'function',
    'assigned_variable',
    'input_variable',
    'returned_variable',
    'import',
)


def _scope_of(part):
    """Return the first ' -> ' segment of ``parent_path``, else 'global'."""
    parent_path = part['parent_path']
    if ' -> ' in parent_path:
        return parent_path.split(' -> ')[0]
    return 'global'


def _read_submitted_code():
    """Return the submitted source text, or None when nothing was sent.

    An uploaded ``file`` with a non-empty filename takes precedence over the
    ``code`` form field. The upload is decoded in memory instead of being
    written to a client-named path on disk, which avoids collisions between
    concurrent uploads and leftover files when reading fails.

    Raises:
        UnicodeDecodeError: if the uploaded bytes are not valid UTF-8.
    """
    upload = request.files.get('file')
    if upload is not None and upload.filename:
        # utf-8-sig decodes plain UTF-8 and also drops a leading BOM.
        return upload.read().decode('utf-8-sig')
    return request.form.get('code')


def _build_node(part, index, position):
    """Build the node dict for one parsed part.

    Args:
        part: Parser output entry; the keys ``category``, ``node_id``,
            ``source`` and ``level`` are read, plus the optional ``value``.
        index: Numeric id to give the node.
        position: Dict with the current ``x``/``y`` of the part's scope.
    """
    category = part['category']
    node_id = part['node_id']
    source = part['source'].strip()
    node = {
        'id': index,
        'type': category,
        'label': node_id,
        'source': source,
        'x': position['x'] + part['level'] * 150,  # indent based on level
        'y': position['y'],
        'inputs': [],
        'outputs': [],
        'value': part.get('value'),
    }

    if category == 'function':
        if 'def ' in source:
            # Work only on the text after 'def ' so that parentheses appearing
            # earlier in the source cannot be mistaken for the parameter list.
            pieces = source.split('def ')[1].split('(')
            node['label'] = pieces[0]
            raw_params = pieces[1].split(')')[0] if len(pieces) > 1 else ''
            node['inputs'] = [
                name.strip() for name in raw_params.split(',') if name.strip()
            ]
            node['outputs'] = ['return']
    elif category == 'input_variable':
        name = source.rstrip(',')
        node['label'] = name
        node['outputs'] = [name]
    elif category == 'assigned_variable':
        name = source.split('=')[0].strip()
        node['label'] = name
        node['inputs'] = ['value']
        node['outputs'] = [name]
        value = node['value']
        # Only strings have isdigit(); other value types keep the plain type.
        if isinstance(value, str) and value.isdigit():
            node['type'] = 'number_box'
    elif category == 'returned_variable':
        if 'return ' in source:
            name = source.split('return ')[1].strip()
        else:
            name = node_id
        node['label'] = name
        node['inputs'] = [name]
    elif category == 'import':
        if 'import ' in source:
            name = source.split('import ')[1].split()[0]
        else:
            name = node_id
        node['label'] = name
    return node


def _build_connections(parts, node_id_map):
    """Return ``{'from', 'to'}`` edges between the nodes in ``node_id_map``.

    Assigned variables are linked to the same-scope nodes listed under their
    name in the part's optional ``var_uses`` mapping. Functions are linked
    from their input variables and to their returned variables.
    """
    connections = []
    for part in parts:
        node_id = part['node_id']
        if node_id not in node_id_map:
            continue
        category = part['category']

        if category == 'assigned_variable':
            scope = _scope_of(part)
            var_name = part['source'].split('=')[0].strip()
            uses = part.get('var_uses', {}).get(var_name, [])
            for use_node_id, use_scope in uses:
                if use_node_id in node_id_map and use_scope == scope:
                    connections.append({
                        'from': node_id_map[node_id],
                        'to': node_id_map[use_node_id],
                    })
        elif category == 'function':
            # NOTE(review): startswith() is a plain prefix match, so one id
            # that is a prefix of another would also match -- confirm against
            # the parser's parent_path format.
            for other in parts:
                if (other['category'] == 'input_variable'
                        and other['parent_path'].startswith(node_id)):
                    connections.append({
                        'from': node_id_map[other['node_id']],
                        'to': node_id_map[node_id],
                    })
            for other in parts:
                if (other['category'] == 'returned_variable'
                        and other['parent_path'].startswith(node_id)):
                    connections.append({
                        'from': node_id_map[node_id],
                        'to': node_id_map[other['node_id']],
                    })
    return connections


def parse_code():
    """Parse submitted Python code into graph nodes and connections.

    The code comes from an uploaded ``file`` or, failing that, from the
    ``code`` form field. It is passed to ``parse_python_code``; each returned
    part whose category is a node category becomes one node, laid out per
    scope (x shifts by 150 per nesting level, y advances by 100 per node).

    Returns:
        JSON ``{'nodes': [...], 'connections': [...]}``, or a 400 response
        carrying an ``error`` message when no code was supplied or the upload
        is not valid UTF-8.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source -- confirm it is registered with the app.
    try:
        code = _read_submitted_code()
    except UnicodeDecodeError:
        return jsonify({'error': 'Uploaded file is not valid UTF-8 text'}), 400
    if not code:
        return jsonify({'error': 'No code or file provided'}), 400

    parts, _ = parse_python_code(code)

    nodes = []
    node_id_map = {}
    # Next free position per scope; every scope starts at (50, 50).
    scope_positions = defaultdict(lambda: {'x': 50, 'y': 50})
    for part in parts:
        if part['category'] not in _NODE_CATEGORIES:
            continue
        position = scope_positions[_scope_of(part)]
        node = _build_node(part, len(nodes), position)
        nodes.append(node)
        node_id_map[part['node_id']] = node['id']
        position['y'] += 100

    connections = _build_connections(parts, node_id_map)
    return jsonify({'nodes': nodes, 'connections': connections})
def save_nodes():
    """Echo the submitted graph back to the caller.

    Expects a JSON object that may contain ``nodes`` and ``connections``
    lists; each defaults to an empty list when absent. Nothing is stored.

    Returns:
        JSON with ``status`` set to ``'success'`` plus the received ``nodes``
        and ``connections``, or a 400 response carrying an ``error`` message
        when the body is not a JSON object.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source -- confirm it is registered with the app.

    # silent=True turns a missing or malformed body into None instead of an
    # exception; non-object payloads previously crashed on ``.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    return jsonify({
        'status': 'success',
        'nodes': data.get('nodes', []),
        'connections': data.get('connections', []),
    })
if __name__ == '__main__':
    # The interactive debugger can execute arbitrary code, so it must not be
    # enabled by default while the server listens on all interfaces. Set
    # FLASK_DEBUG=1 to opt in during local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)