# Spaces:
# Runtime error
# Runtime error
| import csv | |
| import time | |
| from datetime import datetime, timedelta | |
| from ultralytics import YOLO | |
| import cv2 | |
| import mediapipe as mp | |
| import numpy as np | |
| from flask import Flask, render_template, Response, redirect, url_for, request, send_from_directory, flash | |
| import os | |
| import plotly.express as px | |
| import pandas as pd | |
| from werkzeug.utils import secure_filename | |
| import json | |
| import matplotlib.pyplot as plt | |
| import uuid | |
| import random # Make sure to import the random module | |
| from datetime import datetime # Import datetime for timestamp | |
| import string # Add this line to use string.ascii_letters and string.digits | |
| import pyaudio | |
| import wave | |
| import whisper | |
| from transformers import pipeline | |
| import csv | |
| import os | |
| import pandas as pd | |
| from flask import Flask, render_template, request, redirect, url_for, send_from_directory, flash | |
| from flask import jsonify | |
| from transformers import T5Tokenizer, T5ForConditionalGeneration | |
| import torch | |
| from openai import OpenAI | |
| from io import StringIO | |
| import re | |
| from flask import session | |
# Flask app setup
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'  # destination for uploaded CSV/audio files
app.secret_key = 'supersecretkey'  # NOTE(review): hard-coded secret; move to an env var for production
# Custom function to load YOLO model safely
def safe_load_yolo_model(model_path):
    """Load a YOLO model from *model_path*, logging and re-raising on failure."""
    try:
        loaded = YOLO(model_path)
    except Exception as err:
        print(f"Failed to load model: {err}")
        raise
    return loaded
# Load YOLO model
model_path = './best.pt'
model = safe_load_yolo_model(model_path)

# MediaPipe hand-tracking helpers
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Load Whisper model for speech-to-text
whisper_model = whisper.load_model("base")

# Variables to hold CSV data and other states between requests
original_data = None
updated_data = None
csv_filename = None

# Hand detector: static_image_mode re-runs full detection on every frame
hands = mp_hands.Hands(static_image_mode=True, min_detection_confidence=0.3)

# Initialize variables for tracking gestures
previous_gesture = None      # last gesture label seen in the video stream
gesture_start_time = None    # start of the current gesture, seconds since session start
gesture_data_list = []       # accumulated rows of [gesture, start, end, duration]
capture_flag = True          # set False to stop the frame-generation loop
start_recording_time = None  # wall-clock start of the capture session

# Default labels dictionary (class index -> gesture name)
labels_dict = {0: 'fist', 1: 'ok', 2: 'peace', 3: 'stop', 4: 'two up'}
custom_labels_dict = labels_dict.copy()  # To store custom labels set by user

# Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# Function to interact with OpenAI's GPT model using streaming
def get_gpt_instruction_response(instruction, csv_data):
    """Ask the OpenAI chat model to apply *instruction* to *csv_data*.

    Streams the completion and returns the accumulated text, stripped.
    """
    user_content = (
        f"Here is a CSV data:\n\n{csv_data}\n\n"
        f"The user has requested the following change: {instruction}\n\n"
        "Please process the data accordingly and return the modified CSV."
    )
    stream = client.chat.completions.create(
        model="gpt-4o-mini",  # or "gpt-3.5-turbo"
        messages=[
            {"role": "system", "content": "You are a helpful assistant that processes CSV files."},
            {"role": "user", "content": user_content},
        ],
        stream=True,
    )
    pieces = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta is not None:
            pieces.append(delta)
    return "".join(pieces).strip()
# Function to read CSV and convert it to string
def read_csv_to_string(file_path):
    """Read the CSV at *file_path* and return its contents as a CSV-formatted string."""
    return pd.read_csv(file_path).to_csv(index=False)
# Function to write modified CSV string to a file
def write_csv_from_string(csv_string, output_file_path):
    """Write *csv_string* verbatim to *output_file_path* (text mode, overwrite)."""
    with open(output_file_path, 'w') as out:
        out.write(csv_string)
# Function to record audio
def record_audio(filename, duration=10):
    """Record *duration* seconds of mono 44.1 kHz, 16-bit audio and save it as WAV."""
    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 1
    rate = 44100

    audio = pyaudio.PyAudio()
    print('Recording...')
    stream = audio.open(format=sample_format, channels=channels, rate=rate,
                        frames_per_buffer=chunk, input=True)
    # Pull fixed-size chunks until the requested duration has elapsed.
    frames = [stream.read(chunk) for _ in range(int(rate / chunk * duration))]
    stream.stop_stream()
    stream.close()
    audio.terminate()
    print('Finished recording.')

    wf = wave.open(filename, 'wb')
    wf.setnchannels(channels)
    wf.setsampwidth(audio.get_sample_size(sample_format))
    wf.setframerate(rate)
    wf.writeframes(b''.join(frames))
    wf.close()
# Function to transcribe audio using Whisper
def transcribe_audio(file_path):
    """Transcribe the audio file at *file_path* with the preloaded Whisper model."""
    return whisper_model.transcribe(file_path)["text"]
@app.route('/')  # NOTE(review): route decorator was missing; path assumed — confirm
def index():
    """Render the landing page."""
    return render_template('index.html')
@app.route('/set_labels', methods=['GET', 'POST'])  # NOTE(review): decorator missing; path assumed
def set_labels():
    """Let the user rename the five gesture labels; empty labels are dropped."""
    global custom_labels_dict
    if request.method == 'POST':
        # Map form fields label1..label5 onto class indices 0..4.
        for idx in range(5):
            custom_labels_dict[idx] = request.form[f'label{idx + 1}']
        # Remove empty labels so those classes are never reported as detections.
        custom_labels_dict = {k: v for k, v in custom_labels_dict.items() if v}
        return redirect(url_for('recognize'))
    return render_template('set_labels.html')
@app.route('/recognize')  # NOTE(review): decorator missing; path assumed
def recognize():
    """Render the live gesture-recognition page (frames come from video_feed)."""
    return render_template('recognize.html')
@app.route('/video_feed')  # NOTE(review): decorator missing; path assumed
def video_feed():
    """Stream MJPEG frames produced by generate_frames()."""
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
def generate_frames():
    """Yield MJPEG-encoded webcam frames annotated with the detected gesture.

    Side effects: appends [gesture, start, end, duration] rows to the
    module-level gesture_data_list whenever the detected gesture changes.
    NOTE(review): indentation was reconstructed from a whitespace-mangled
    paste — confirm loop nesting against the original file.
    """
    global previous_gesture, gesture_start_time, gesture_data_list, capture_flag, start_recording_time
    # Initialize start recording time; all gesture timestamps are relative to it.
    start_recording_time = datetime.now()
    cap = cv2.VideoCapture(0)  # default webcam
    while capture_flag:
        data_aux = []  # normalized (x, y) landmark offsets (collected but unused below)
        x_ = []        # raw landmark x coordinates (0..1)
        y_ = []        # raw landmark y coordinates (0..1)
        ret, frame = cap.read()
        if not ret:
            break
        H, W, _ = frame.shape
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # MediaPipe expects RGB
        results = hands.process(frame_rgb)
        if results.multi_hand_landmarks:
            # First pass: draw the landmark skeleton for every detected hand.
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(
                    frame,
                    hand_landmarks,
                    mp_hands.HAND_CONNECTIONS,
                    mp_drawing_styles.get_default_hand_landmarks_style(),
                    mp_drawing_styles.get_default_hand_connections_style())
            # Second pass: classify the frame and time-stamp the gesture.
            for hand_landmarks in results.multi_hand_landmarks:
                for i in range(len(hand_landmarks.landmark)):
                    x = hand_landmarks.landmark[i].x
                    y = hand_landmarks.landmark[i].y
                    x_.append(x)
                    y_.append(y)
                for i in range(len(hand_landmarks.landmark)):
                    x = hand_landmarks.landmark[i].x
                    y = hand_landmarks.landmark[i].y
                    data_aux.append(x - min(x_))
                    data_aux.append(y - min(y_))
                # Bounding box in pixel coordinates, padded by 10 px.
                x1 = int(min(x_) * W) - 10
                y1 = int(min(y_) * H) - 10
                x2 = int(max(x_) * W) + 10
                y2 = int(max(y_) * H) + 10
                # Classify the whole frame with YOLO.
                # NOTE(review): prediction[0].probs is only populated for
                # classification checkpoints; for detection models it is None
                # and the next line raises — confirm best.pt is a classifier.
                prediction = model.predict(frame, conf=0.25, iou=0.45)
                probs = prediction[0].probs.data.numpy()
                detected_gesture_index = np.argmax(probs)
                detected_gesture = custom_labels_dict.get(detected_gesture_index, None)
                if detected_gesture is None:
                    continue
                # Get the current timestamp and calculate relative time from the start
                current_time = datetime.now()
                relative_time = current_time - start_recording_time
                # When the gesture changes, close out the previous one and record it.
                if detected_gesture != previous_gesture:
                    if previous_gesture is not None:
                        gesture_end_time = relative_time.total_seconds()
                        gesture_duration = gesture_end_time - gesture_start_time
                        # Store the finished gesture with start, end, and duration.
                        gesture_data_list.append([previous_gesture, gesture_start_time, gesture_end_time, round(gesture_duration, 2)])
                    # Start timing the new gesture.
                    previous_gesture = detected_gesture
                    gesture_start_time = relative_time.total_seconds()
                # Draw the bounding box and label on the outgoing frame.
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 0), 4)
                cv2.putText(frame, detected_gesture, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 1.3, (0, 255, 0), 3, cv2.LINE_AA)
        # Encode and emit the frame as one part of the multipart MJPEG stream.
        ret, buffer = cv2.imencode('.jpg', frame)
        frame = buffer.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    cap.release()
@app.route('/upload_csv', methods=['POST'])  # NOTE(review): decorator missing; path assumed
def upload_csv():
    """Save an uploaded CSV and stash its rows, columns, and path in the session.

    Always redirects back to the edit_csv page, flashing success or failure.
    """
    try:
        file = request.files.get('csv_file')
        if file:
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(file.filename))
            file.save(file_path)
            flash("CSV file uploaded successfully!", "success")
            # Load the uploaded CSV as the "original" data shown on the edit page.
            original_df = pd.read_csv(file_path)
            session['original_data'] = original_df.to_dict('records')
            session['columns'] = original_df.columns.tolist()
            session['file_path'] = file_path
        else:
            flash("Please upload a CSV file.", "warning")
    except Exception as e:
        app.logger.error(f"Error in upload_csv route: {e}")
        flash("An unexpected error occurred. Please check the logs.", "danger")
    return redirect(url_for('edit_csv'))
@app.route('/edit_csv', methods=['GET', 'POST'])  # NOTE(review): decorator missing; path assumed
def edit_csv():
    """Show the uploaded CSV and apply a natural-language edit instruction via GPT.

    GET renders the current original/updated data from the session.
    POST sends the instruction plus CSV to OpenAI, extracts the returned CSV,
    saves it as modified_gesture_data.csv, and stores the rows in the session.
    """
    original_data = session.get('original_data', None)
    columns = session.get('columns', None)
    if request.method == 'POST':
        try:
            # A CSV must have been uploaded first (path stored by upload_csv).
            file_path = session.get('file_path')
            if not file_path:
                flash("Please upload a CSV file first.", "warning")
                return redirect(url_for('edit_csv'))
            csv_data = read_csv_to_string(file_path)
            instruction = request.form.get('transcription', "").strip()
            if not instruction:
                flash("Please provide an instruction.", "warning")
                return redirect(url_for('edit_csv'))
            # Process the CSV using the OpenAI API.
            raw_output = get_gpt_instruction_response(instruction, csv_data)
            # Extract the CSV part: a fenced ``` block if present, else the whole output.
            csv_pattern = re.compile(r"(?<=```)([\s\S]*?)(?=```)|([\s\S]*)", re.DOTALL)
            match = csv_pattern.search(raw_output)
            if match:
                csv_content = (match.group(1) or match.group(2)).strip()
            else:
                raise ValueError("No valid CSV content found in GPT output.")
            # Drop prose lines that are clearly not CSV rows.
            cleaned_csv_content = "\n".join(
                line for line in csv_content.splitlines()
                if ',' in line and not line.startswith("Here is")
            )
            # Persist the modified CSV for the download routes.
            modified_file_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
            with open(modified_file_path, 'w') as f:
                f.write(cleaned_csv_content)
            # Parse the cleaned CSV and store the rows for rendering.
            session['updated_data'] = pd.read_csv(StringIO(cleaned_csv_content)).to_dict('records')
        except Exception as e:
            app.logger.error(f"Error in edit_csv route: {e}")
            flash("An unexpected error occurred. Please check the logs.", "danger")
    updated_data = session.get('updated_data', None)
    return render_template('edit_csv.html', original_data=original_data, updated_data=updated_data, columns=columns)
# Route: Download Modified CSV
@app.route('/download_csv_updated')  # NOTE(review): decorator missing; path assumed
def download_csv_updated():
    """Send the GPT-modified CSV as an attachment, or bounce back with a warning."""
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    if not os.path.isfile(file_path):
        flash("Updated CSV file not found!", "warning")
        return redirect(url_for('edit_csv'))
    return send_from_directory(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv', as_attachment=True)
# Process uploaded audio using Whisper
@app.route('/process_audio', methods=['POST'])  # NOTE(review): decorator missing; path assumed
def process_audio():
    """Accept an uploaded audio blob, transcribe it with Whisper, return JSON."""
    if 'audio' not in request.files:
        return jsonify({'error': 'No audio file provided'}), 400
    audio_file = request.files['audio']
    # NOTE(review): fixed filename — concurrent requests overwrite each other.
    audio_file_path = 'recorded_audio.wav'
    audio_file.save(audio_file_path)
    # Transcribe audio using Whisper
    transcription = transcribe_audio(audio_file_path)
    return jsonify({'transcription': transcription})
@app.route('/data_view')  # NOTE(review): decorator missing; path assumed
def data_view():
    """Show gesture data from a CSV (query param csv_file) with a pie chart."""
    csv_file = request.args.get('csv_file', 'static/gesture_data.csv')
    # NOTE(review): load_csv_data is redefined later in this file; this call
    # passes an argument, so the parameterized version must be in effect.
    gesture_data = load_csv_data(csv_file)
    df = pd.DataFrame(gesture_data, columns=['Gesture', 'Start Time', 'End Time', 'Duration'])
    # Count occurrences of each gesture for the pie chart.
    gesture_counts = df['Gesture'].value_counts().reset_index()
    gesture_counts.columns = ['Gesture', 'Count']
    fig = px.pie(gesture_counts, values='Count', names='Gesture', title='Gesture Distribution')
    html_chart = fig.to_html(full_html=False)
    return render_template('data.html', gesture_data=gesture_data, html_chart=html_chart)
| import pandas as pd | |
| from flask import render_template | |
@app.route('/datadiff')  # NOTE(review): decorator missing; path assumed
def datadiff():
    """Render a side-by-side comparison of the original and modified CSVs."""
    original_csv_path = os.path.join(app.config['UPLOAD_FOLDER'], 'gesture_data.csv')
    modified_csv_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    # Read the CSVs into pandas DataFrames for the template to iterate.
    original_csv = pd.read_csv(original_csv_path)
    modified_csv = pd.read_csv(modified_csv_path)
    return render_template('datadiff.html', original_csv=original_csv, modified_csv=modified_csv)
def load_csv_data(file_path):
    """Read gesture rows from the CSV at *file_path*, skipping the header row."""
    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # discard header
        return list(reader)
@app.route('/save_gesture_data')  # NOTE(review): decorator missing; path assumed
def save_gesture_data():
    """Stop the capture loop and persist the recorded gestures to JSON and CSV."""
    global capture_flag
    capture_flag = False  # stops the generate_frames() loop
    # Ensure gesture data is actually populated
    print("Saving gesture data:", gesture_data_list)
    # Ensure the static directory exists
    os.makedirs('static', exist_ok=True)
    # Label Studio-compatible JSON export.
    json_file_path = os.path.join('static', 'gesture_data_labelstudio.json')
    save_label_studio_json(gesture_data_list, json_file_path)
    # CSV export used by the /data visualization.
    csv_file_path = os.path.join('static', 'gesture_data.csv')
    save_gesture_csv(gesture_data_list, csv_file_path)
    return redirect(url_for('data'))
| import random # Make sure to import the random module | |
| import uuid # Make sure to import uuid for unique IDs | |
| from datetime import datetime # Import datetime for timestamp | |
def generate_alphanumeric_id(length=5):
    """Generates a random alphanumeric ID of *length* characters."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
def save_label_studio_json(gesture_data, file_path):
    """Write *gesture_data* to *file_path* as a Label Studio-compatible export.

    gesture_data: iterable of [gesture, start_time, end_time, duration] rows.
    All gestures become results of a single annotation on a single task.
    """
    current_time = datetime.utcnow().isoformat() + "Z"
    # Create a single task with all annotations.
    # NOTE(review): the file_upload / video_url / project ids below are
    # hard-coded fixtures — confirm they match the target Label Studio project.
    annotations = {
        "id": 1,  # Task ID
        "annotations": [
            {
                "id": 1,  # Annotation ID
                "completed_by": 1,
                "result": [],  # filled below, one entry per gesture row
                "was_cancelled": False,
                "ground_truth": False,
                "created_at": current_time,
                "updated_at": current_time,
                "draft_created_at": current_time,
                # Total labelled time: sum of all gesture durations.
                "lead_time": sum(duration for _, _, _, duration in gesture_data),
                "prediction": {},
                "result_count": 0,
                "unique_id": str(uuid.uuid4()),
                "import_id": None,
                "last_action": None,
                "task": 1,
                "project": 25,
                "updated_by": 1,
                "parent_prediction": None,
                "parent_annotation": None,
                "last_created_by": None
            }
        ],
        "file_upload": "1212df4d-HandyLabels.MP4",
        "drafts": [],
        "predictions": [],
        "data": {
            "video_url": "/data/upload/30/030cca83-Video_1.mp4"
        },
        "meta": {},
        "created_at": current_time,
        "updated_at": current_time,
        "inner_id": 1,
        "total_annotations": 1,
        "cancelled_annotations": 0,
        "total_predictions": 0,
        "comment_count": 0,
        "unresolved_comment_count": 0,
        "last_comment_updated_at": None,
        "project": 25,
        "updated_by": 1,
        "comment_authors": []
    }
    # Add each gesture as an individual result within the single annotation.
    for gesture, start_time, end_time, duration in gesture_data:
        annotation_result = {
            "original_length": end_time - start_time,
            "value": {
                "start": start_time,
                "end": end_time,
                "channel": 0,
                "labels": [gesture]
            },
            "id": generate_alphanumeric_id(),  # unique 5-char alphanumeric ID per result
            "from_name": "tricks",
            "to_name": "audio",
            "type": "labels",
            "origin": "manual"
        }
        annotations["annotations"][0]["result"].append(annotation_result)
    # Save the consolidated JSON to the file (Label Studio expects a list of tasks).
    with open(file_path, 'w') as json_file:
        json.dump([annotations], json_file, indent=2)
    print(f"Label Studio JSON saved to: {file_path}")
def save_gesture_csv(gesture_data, file_path):
    """Write gesture rows to *file_path* as CSV with a fixed four-column header."""
    with open(file_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Gesture', 'Start Time', 'End Time', 'Duration'])
        for row in gesture_data:
            gesture, start, end, duration = row
            writer.writerow([gesture, start, end, duration])
@app.route('/data')  # NOTE(review): decorator missing; path assumed
def data():
    """Visualize the most recently saved gesture session as a pie chart."""
    gesture_data = load_csv_data()
    # Convert to DataFrame for easier manipulation.
    df = pd.DataFrame(gesture_data, columns=['Gesture', 'Start Time', 'End Time', 'Duration'])
    # Count occurrences of each gesture.
    gesture_counts = df['Gesture'].value_counts().reset_index()
    gesture_counts.columns = ['Gesture', 'Count']
    # Create the pie chart using Plotly and embed it as an HTML fragment.
    fig = px.pie(gesture_counts, values='Count', names='Gesture', title='Gesture Distribution')
    html_chart = fig.to_html(full_html=False)
    return render_template('data.html', gesture_data=gesture_data, html_chart=html_chart)
def load_csv_data(file_path='static/gesture_data.csv'):
    """Read gesture rows from *file_path*, skipping the header row.

    Bug fix: this redefinition shadowed an earlier parameterized version, but
    data_view() still calls load_csv_data(csv_file) with an argument, which
    raised TypeError. Accepting an optional file_path (defaulting to the
    original hard-coded path) keeps zero-argument callers working too.
    """
    gesture_data = []
    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # Skip the header row
        for row in reader:
            gesture_data.append(row)
    return gesture_data
@app.route('/download_json')  # NOTE(review): decorator missing; path assumed
def download_json():
    """Send the Label Studio JSON export as an attachment, or 404 if absent."""
    file_path = os.path.join('static', 'gesture_data_labelstudio.json')
    if not os.path.isfile(file_path):
        return "JSON file not found!", 404
    return send_from_directory('static', 'gesture_data_labelstudio.json', as_attachment=True)
@app.route('/download_csv')  # NOTE(review): decorator missing; path assumed
def download_csv():
    """Send the 'original' or 'updated' gesture CSV chosen via ?filename=."""
    filename = request.args.get('filename')
    if filename == 'original':
        path = os.path.join(app.config['UPLOAD_FOLDER'], 'gesture_data.csv')
    elif filename == 'updated':
        path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    else:
        flash('Invalid file requested')
        return redirect(url_for('edit_csv'))
    if not os.path.exists(path):
        flash('File not found!')
        return redirect(url_for('edit_csv'))
    return send_from_directory(app.config['UPLOAD_FOLDER'], os.path.basename(path), as_attachment=True)
# New route to download the modified CSV
@app.route('/download_csv_modified')  # NOTE(review): decorator missing; path assumed
def download_csv_modified():
    """Send the modified CSV as an attachment, or 404 if it does not exist."""
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv')
    if not os.path.isfile(file_path):
        return "Modified CSV file not found!", 404
    return send_from_directory(app.config['UPLOAD_FOLDER'], 'modified_gesture_data.csv', as_attachment=True)
# Import Data Functionality to Visualize Imported CSV
@app.route('/import_data', methods=['GET', 'POST'])  # NOTE(review): decorator missing; path assumed
def import_data():
    """Upload an external gesture CSV and redirect to its visualization."""
    if request.method == 'POST':
        if 'file' not in request.files:
            flash('No file part')
            return redirect(request.url)
        file = request.files['file']
        if file.filename == '':
            flash('No selected file')
            return redirect(request.url)
        if file:
            filename = secure_filename(file.filename)
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(file_path)
            return redirect(url_for('visualize_data', file_path=file_path))
    return render_template('import_data.html')
@app.route('/visualize_data')  # NOTE(review): decorator missing; path assumed
def visualize_data():
    """Visualize the CSV whose path arrives in the file_path query parameter."""
    file_path = request.args.get('file_path')
    # NOTE(review): user-controlled path is read from disk — consider
    # restricting it to the upload folder to prevent path traversal.
    if not os.path.exists(file_path):
        return "The file could not be found.", 404
    return visualize_csv(file_path)
def visualize_csv(file_path):
    """Render data.html with a gesture-distribution pie chart built from *file_path*.

    Returns (message, status) tuples on validation or processing failure.
    """
    try:
        frame = pd.read_csv(file_path)
        # The CSV must carry exactly these gesture columns.
        required_columns = ['Gesture', 'Start Time', 'End Time', 'Duration']
        if not set(required_columns).issubset(frame.columns):
            return f"The uploaded CSV must contain the following columns: {required_columns}", 400
        gesture_df = frame[required_columns]
        # Tally gestures and turn the counts into a Plotly pie chart.
        counts = gesture_df['Gesture'].value_counts().reset_index()
        counts.columns = ['Gesture', 'Count']
        chart = px.pie(counts, values='Count', names='Gesture', title='Gesture Distribution')
        return render_template(
            'data.html',
            gesture_data=gesture_df.to_dict('records'),
            html_chart=chart.to_html(full_html=False),
        )
    except Exception as e:
        return f"An error occurred while processing the file: {str(e)}", 500
if __name__ == '__main__':
    # Bind to all interfaces; the PORT env var overrides the default 5000.
    port = int(os.environ.get("PORT", 5000))
    app.run(host='0.0.0.0', port=port, debug=True)  # NOTE(review): disable debug in production