Spaces:
Sleeping
Sleeping
| import sys | |
| from os import walk | |
| import csv | |
| import argparse | |
| from flask import Flask, redirect, url_for, request | |
| from flask import render_template | |
| from flask import send_file | |
| import os | |
| from datasets import load_dataset | |
| from huggingface_hub import hf_hub_download | |
| from io import BytesIO | |
| from PIL import Image | |
| import tempfile | |
| import shutil | |
| import json | |
| from datetime import datetime | |
| import hashlib | |
| import threading | |
| import requests | |
# Flask application setup.
app = Flask(__name__)
# Disable static-file caching so updated images/assets are always re-fetched.
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0

# Analytics configuration - Use absolute path to ensure persistence across rebuilds
# In HuggingFace Spaces, files in the workspace root persist across rebuilds
STATS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "analytics_stats.json")
# Secondary copy used for recovery if the main stats file is missing or corrupt.
STATS_BACKUP_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "analytics_stats_backup.json")
# Serializes read-modify-write cycles on the stats file across request threads.
STATS_LOCK = threading.Lock()
def get_client_ip():
    """Return the client IP address for the current Flask request.

    Honors reverse-proxy headers: prefers the first entry of
    ``X-Forwarded-For``, then ``X-Real-IP``, then falls back to
    ``request.remote_addr``. Returns ``'127.0.0.1'`` when nothing can be
    determined or header access fails (e.g. outside a request context).
    """
    try:
        forwarded = request.headers.get('X-Forwarded-For')
        if forwarded:
            # X-Forwarded-For may be a chain "client, proxy1, proxy2";
            # the first element is the originating client.
            return forwarded.split(',')[0].strip()
        real_ip = request.headers.get('X-Real-IP')
        if real_ip:
            return real_ip
        return request.remote_addr or '127.0.0.1'
    except Exception:
        # Was a bare `except:`; narrowed so KeyboardInterrupt/SystemExit
        # are not swallowed. IP detection must never break a request.
        return '127.0.0.1'
def get_country_from_ip(ip):
    """Get country from IP address using free API"""
    # ip-api.com is a free geolocation service that requires no API key.
    lookup_url = f'http://ip-api.com/json/{ip}'
    try:
        resp = requests.get(lookup_url, timeout=2)
        if resp.status_code == 200:
            payload = resp.json()
            if payload.get('status') == 'success':
                return payload.get('country', 'Unknown')
    except Exception as e:
        print(f"Error getting country for IP {ip}: {e}")
    # Any failure (network, non-200, unsuccessful lookup) maps to 'Unknown'.
    return 'Unknown'
def get_user_agent_hash():
    """Return a short (8 hex chars) MD5 hash of the request's User-Agent.

    Used only to distinguish visitors for analytics, not for security, so
    MD5 is acceptable here. Returns ``'unknown'`` if the header cannot be
    read (e.g. no active request context).
    """
    try:
        ua = request.headers.get('User-Agent', '')
        return hashlib.md5(ua.encode()).hexdigest()[:8]
    except Exception:
        # Was a bare `except:`; narrowed to Exception.
        return 'unknown'
def load_stats():
    """Load statistics from JSON file with backup recovery.

    Returns the stats dict with ``unique_visitors`` normalized back to a
    set (JSON cannot store sets, so save_stats persists a list). Falls back
    to the backup file if the main file is missing or corrupt, and to a
    fresh default structure if both fail.
    """
    # Try to load from main file first
    try:
        if os.path.exists(STATS_FILE):
            with open(STATS_FILE, 'r') as f:
                data = json.load(f)
                # Convert unique_visitors list back to set if needed
                if isinstance(data.get('unique_visitors'), list):
                    data['unique_visitors'] = set(data['unique_visitors'])
                print(f"Loaded stats from {STATS_FILE}: {data.get('total_visits', 0)} visits")
                return data
    except Exception as e:
        print(f"Error loading stats from main file: {e}")
    # Try backup file if main file fails
    try:
        if os.path.exists(STATS_BACKUP_FILE):
            print(f"Attempting to load from backup file: {STATS_BACKUP_FILE}")
            with open(STATS_BACKUP_FILE, 'r') as f:
                data = json.load(f)
                if isinstance(data.get('unique_visitors'), list):
                    data['unique_visitors'] = set(data['unique_visitors'])
                print(f"Recovered stats from backup: {data.get('total_visits', 0)} visits")
                # Restore backup to main file so the next load uses it directly.
                save_stats(data)
                return data
    except Exception as e2:
        print(f"Error loading stats from backup file: {e2}")
    # Return default structure if both files fail
    print("No existing stats found, starting fresh")
    return {
        'total_visits': 0,
        'unique_visitors': set(),
        'countries': {},
        'visits_by_date': {},
        'first_visit': None,
        'last_visit': None,
        'user_agents': {}
    }
def save_stats(stats):
    """Save statistics to JSON file with backup (convert sets to lists for JSON).

    Writes ``STATS_FILE`` and then copies it to ``STATS_BACKUP_FILE`` for
    redundancy. Errors are logged and swallowed, so a failed save can never
    break request handling.
    """
    try:
        # Normalize to JSON-serializable types (sets -> lists).
        stats_to_save = {
            'total_visits': stats.get('total_visits', 0),
            'unique_visitors': list(stats['unique_visitors']) if isinstance(stats.get('unique_visitors'), set) else stats.get('unique_visitors', []),
            'countries': stats.get('countries', {}),
            'visits_by_date': stats.get('visits_by_date', {}),
            'first_visit': stats.get('first_visit'),
            'last_visit': stats.get('last_visit'),
            'user_agents': stats.get('user_agents', {})
        }
        # Save to main file
        with open(STATS_FILE, 'w') as f:
            json.dump(stats_to_save, f, indent=2)
        # Create backup copy for redundancy.
        # (Removed a redundant function-local `import shutil`; shutil is
        # already imported at module level.)
        try:
            shutil.copy2(STATS_FILE, STATS_BACKUP_FILE)
        except Exception as backup_error:
            print(f"Warning: Could not create backup: {backup_error}")
        print(f"Stats saved successfully: {stats_to_save.get('total_visits', 0)} total visits")
    except Exception as e:
        print(f"Error saving stats: {e}")
        import traceback
        traceback.print_exc()
def get_hf_all_time_visits(space_id="0001AMA/auto_object_annotator_0.0.4"):
    """Get HuggingFace Space 'All time visits' from the metrics API.

    Returns the visit count as a positive int, or ``None`` when the metric
    is unavailable (network error, non-200 status, missing/blank field).
    Uses a 1-second timeout so page loads are never blocked for long.
    """
    # Get HuggingFace token from environment (automatically provided in Spaces)
    hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
    # If no token in env, try to get it from huggingface_hub
    if not hf_token:
        try:
            from huggingface_hub import HfApi
            api = HfApi()
            hf_token = api.token
        except Exception:
            # Was a bare `except:`; narrowed. Token lookup is best-effort.
            pass
    # Prepare headers with authentication if token is available
    headers = {'User-Agent': 'Mozilla/5.0'}
    if hf_token:
        headers['Authorization'] = f'Bearer {hf_token}'
    # Try the metrics API endpoint with authentication
    try:
        metrics_url = f"https://huggingface.co/api/spaces/{space_id}/metrics"
        # Use very short timeout (1 second) to prevent blocking page loads
        response = requests.get(metrics_url, timeout=1, headers=headers)
        if response.status_code == 200:
            data = response.json()
            # Look for "All time visits" in the response
            if isinstance(data, dict):
                # The exact field name is undocumented, so probe likely spellings.
                for key in ['all_time_visits', 'allTimeVisits', 'total_visits', 'totalVisits',
                            'all_time_views', 'allTimeViews', 'total_views', 'totalViews',
                            'views', 'visits', 'viewCount', 'visitCount']:
                    if key in data:
                        value = data[key]
                        # Only return if it's a valid number > 0 (not blank/null)
                        if value is not None and value != '' and value != '-':
                            try:
                                count = int(value) if isinstance(value, (int, float, str)) else None
                                if count is not None and count > 0:
                                    return count
                            except (ValueError, TypeError):
                                continue
        elif response.status_code == 401:
            print("HF API: Authentication required but token may be invalid")
        elif response.status_code == 403:
            print("HF API: Access forbidden - may need owner permissions")
    except Exception as e:
        print(f"HF API request failed: {e}")  # Debug logging
    # Return None if not available (don't fallback to app's tracking)
    return None
def track_visit():
    """Track a visit - cumulative and persistent.

    Under STATS_LOCK: loads the stats file, records this request's visitor
    (IP + User-Agent hash), country, date, timestamps and user agent, then
    saves the file back. Errors are logged and swallowed so analytics can
    never break a page load.
    """
    try:
        with STATS_LOCK:
            stats = load_stats()
            # Convert unique_visitors list back to set if needed
            if isinstance(stats.get('unique_visitors'), list):
                stats['unique_visitors'] = set(stats['unique_visitors'])
            # Get visitor information
            ip = get_client_ip()
            ua_hash = get_user_agent_hash()
            # IP plus UA hash approximates a unique-visitor identity.
            visitor_id = f"{ip}_{ua_hash}"
            country = get_country_from_ip(ip)
            current_date = datetime.now().strftime('%Y-%m-%d')
            current_time = datetime.now().isoformat()
            # Update statistics (each bucket is created lazily so older
            # stats files missing a key still work).
            stats['total_visits'] = stats.get('total_visits', 0) + 1
            if 'unique_visitors' not in stats:
                stats['unique_visitors'] = set()
            stats['unique_visitors'].add(visitor_id)
            # Track countries
            if 'countries' not in stats:
                stats['countries'] = {}
            if country not in stats['countries']:
                stats['countries'][country] = 0
            stats['countries'][country] += 1
            # Track visits by date
            if 'visits_by_date' not in stats:
                stats['visits_by_date'] = {}
            if current_date not in stats['visits_by_date']:
                stats['visits_by_date'][current_date] = 0
            stats['visits_by_date'][current_date] += 1
            # Track first and last visit
            if not stats.get('first_visit'):
                stats['first_visit'] = current_time
            stats['last_visit'] = current_time
            # Track user agents
            if 'user_agents' not in stats:
                stats['user_agents'] = {}
            ua = request.headers.get('User-Agent', 'Unknown')
            if ua not in stats['user_agents']:
                stats['user_agents'][ua] = 0
            stats['user_agents'][ua] += 1
            # Save statistics
            save_stats(stats)
    except Exception as e:
        # Don't let tracking errors break the app
        print(f"Error tracking visit: {e}")
        import traceback
        traceback.print_exc()
def index():
    """Root route: record the visit (best effort) and send users to /tagger."""
    print("DEBUG: Root route / called")
    try:
        track_visit()
    except Exception as e:
        # Analytics must never block the redirect.
        print(f"Error in track_visit at root: {e}")
    return redirect(url_for('tagger'))
def test():
    """Simple test route to verify app is running"""
    # Static page with no template dependency, so it works even when
    # templates or the dataset fail to load.
    page = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Test</title>
    </head>
    <body>
        <h1>App is running!</h1>
        <p>If you see this, the Flask app is working.</p>
        <p><a href="/tagger">Go to Tagger</a></p>
    </body>
    </html>
    """
    return page
def handle_exception(e):
    """Global error handler to prevent blank screens"""
    import traceback
    print(f"Unhandled exception: {e}")
    traceback.print_exc()
    # Hand back a minimal, self-contained error page with HTTP 500.
    error_page = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Application Error</title>
        <meta charset="UTF-8">
    </head>
    <body>
        <h1>Application Error</h1>
        <p>An unexpected error occurred.</p>
        <p>Error: {str(e)}</p>
        <p>Please check the Space logs for more details.</p>
    </body>
    </html>
    """
    return error_page, 500
def tagger():
    """Render the main annotation page for the current folder/image set.

    Navigation state lives in app.config: ``HEAD`` is the folder index into
    FOLDER_SETS, ``IMAGE_SET_INDEX`` is the index within that folder's
    image sets. Every failure path returns a self-contained HTML error page
    with status 500 rather than letting Flask show a blank screen.
    """
    print("DEBUG: tagger() route called")
    try:
        # Track visit
        track_visit()
    except Exception as e:
        print(f"Error in track_visit: {e}")
        # Continue even if tracking fails
    # Check if dataset was loaded successfully
    folder_sets = app.config.get("FOLDER_SETS", [])
    print(f"DEBUG: folder_sets length: {len(folder_sets)}")
    if not folder_sets:
        error_msg = app.config.get("DATASET_ERROR", "No folders found with all three required image types (sr_int_full.png, -tr_line.png, -tr_int_full.png)")
        # Doubled braces ({{ }}) keep the CSS literal inside the f-string.
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Dataset Loading Error</title>
            <meta charset="UTF-8">
            <style>
                body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; background-color: #f0f0f0; }}
                .container {{ background: white; padding: 40px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); max-width: 600px; margin: 0 auto; }}
                h1 {{ color: #dc3545; margin-bottom: 20px; }}
                p {{ font-size: 16px; margin: 15px 0; color: #333; }}
                .error {{ color: #dc3545; font-weight: bold; }}
                .info {{ background: #fff3cd; padding: 15px; border-radius: 5px; margin: 20px 0; border-left: 4px solid #ffc107; }}
            </style>
        </head>
        <body>
            <div class="container">
                <h1>⚠️ Dataset Loading Error</h1>
                <div class="info">
                    <p class="error">{error_msg}</p>
                    <p>This may be due to:</p>
                    <ul style="text-align: left; display: inline-block;">
                        <li>Dataset not fully uploaded yet</li>
                        <li>Network issues loading the dataset</li>
                        <li>Dataset structure doesn't match expected format</li>
                    </ul>
                </div>
                <p>Please check the Space logs for more details.</p>
            </div>
        </body>
        </html>
        """, 500
    # Ensure HEAD is initialized and within bounds
    if "HEAD" not in app.config:
        app.config["HEAD"] = 0
    if app.config["HEAD"] < 0:
        app.config["HEAD"] = 0
    if app.config["HEAD"] >= len(folder_sets):
        # Past the last folder: wrap around to the first folder/set.
        app.config["HEAD"] = 0
        app.config["IMAGE_SET_INDEX"] = 0
        print("Reached end of folders, looping back to first folder")
    # Initialize variables with defaults
    directory = app.config.get('IMAGES', '')
    current_folder_set = None
    image_set_index = 0
    max_sets = 0
    current_images = []
    # Safely access current folder set
    try:
        current_folder_set = folder_sets[app.config["HEAD"]]
        # Validate folder set structure
        if not isinstance(current_folder_set, dict) or 'image_sets' not in current_folder_set:
            raise ValueError(f"Invalid folder set structure at index {app.config['HEAD']}")
        # Get current image set index (default to 0 if not set)
        image_set_index = app.config.get("IMAGE_SET_INDEX", 0)
        if image_set_index < 0:
            image_set_index = 0
            app.config["IMAGE_SET_INDEX"] = 0
        # Get image sets for current folder
        image_sets = current_folder_set['image_sets']
        if not isinstance(image_sets, list) or len(image_sets) == 0:
            raise ValueError(f"No image sets found in folder {current_folder_set.get('folder', 'unknown')}")
        max_sets = len(image_sets)
        # Ensure image_set_index is within bounds
        if image_set_index >= max_sets:
            image_set_index = 0
            app.config["IMAGE_SET_INDEX"] = 0
        # Get current set of 3 images (all with same file ID prefix)
        if image_set_index < max_sets:
            current_set = image_sets[image_set_index]
            if not isinstance(current_set, dict):
                raise ValueError(f"Invalid image set structure at index {image_set_index}")
            # Validate required keys exist
            required_keys = ['sr_int_full', 'tr_line', 'tr_int_full']
            for key in required_keys:
                if key not in current_set:
                    raise ValueError(f"Missing required image key '{key}' in image set {image_set_index}")
            current_images = [
                current_set['sr_int_full'],
                current_set['tr_line'],
                current_set['tr_int_full']
            ]
        else:
            raise ValueError(f"Image set index {image_set_index} out of bounds (max: {max_sets})")
    except (IndexError, KeyError, ValueError) as e:
        print(f"Error accessing folder/image data: {e}")
        import traceback
        traceback.print_exc()
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Data Access Error</title>
            <meta charset="UTF-8">
        </head>
        <body>
            <h1>Data Access Error</h1>
            <p>An error occurred while accessing folder/image data.</p>
            <p>Error: {str(e)}</p>
            <p>Please check the Space logs for more details.</p>
        </body>
        </html>
        """, 500
    # Ensure we have valid data before proceeding
    if current_folder_set is None:
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Data Error</title>
            <meta charset="UTF-8">
        </head>
        <body>
            <h1>Data Error</h1>
            <p>Unable to load folder data.</p>
        </body>
        </html>
        """, 500
    labels = app.config["LABELS"]
    # Navigation flags consumed by the template's prev/next buttons.
    has_prev_folder = app.config["HEAD"] > 0
    has_next_folder = app.config["HEAD"] + 1 < len(app.config["FOLDER_SETS"])
    has_prev_set = image_set_index > 0
    has_next_set = image_set_index + 1 < max_sets
    # Get statistics for display
    try:
        stats_data = load_stats()
        total_visits = stats_data.get('total_visits', 0)
        unique_count = len(stats_data['unique_visitors']) if isinstance(stats_data.get('unique_visitors'), set) else len(stats_data.get('unique_visitors', []))
        countries_count = len(stats_data.get('countries', {}))
    except Exception as e:
        print(f"Error loading stats: {e}")
        import traceback
        traceback.print_exc()
        total_visits = 0
        unique_count = 0
        countries_count = 0
    # Try to get HF Space "All time visits" from analytics
    # Only use HF value if available - don't fallback to app's tracking
    # Make this non-blocking with very short timeout
    hf_all_time_visits = None
    try:
        # Called directly; get_hf_all_time_visits uses a 1-second HTTP
        # timeout, so it fails fast instead of blocking the page.
        hf_all_time_visits = get_hf_all_time_visits()
        # Only use if we got a valid value
        if hf_all_time_visits is not None and hf_all_time_visits > 0:
            pass  # Use the value
        else:
            hf_all_time_visits = None  # Keep blank until HF populates it
    except Exception as e:
        # Silently fail - keep as None (blank) - don't let this break the page
        print(f"HF All time visits fetch failed (keeping blank): {e}")
        hf_all_time_visits = None
    print(f"DEBUG: About to render template. current_folder_set: {current_folder_set is not None}, current_images: {len(current_images)}")
    # Ensure we have all required variables
    if current_folder_set is None:
        print("ERROR: current_folder_set is None - cannot render template")
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Data Error</title>
            <meta charset="UTF-8">
        </head>
        <body>
            <h1>Data Error</h1>
            <p>Unable to load folder data. Please check the Space logs.</p>
        </body>
        </html>
        """, 500
    # Validate all required template variables before rendering
    try:
        current_folder_name = current_folder_set.get('folder', 'Unknown') if isinstance(current_folder_set, dict) else 'Unknown'
        # Double-check all variables are valid
        if not isinstance(current_images, list):
            current_images = []
        if not isinstance(labels, list):
            labels = []
        if not isinstance(total_visits, int):
            total_visits = 0
        if not isinstance(unique_count, int):
            unique_count = 0
        if not isinstance(countries_count, int):
            countries_count = 0
        print(f"DEBUG: Rendering template with {len(current_images)} images, folder: {current_folder_name}")
        # head/image_set_index are exposed 1-based for display purposes.
        result = render_template(
            'tagger.html',
            has_prev_folder=has_prev_folder,
            has_next_folder=has_next_folder,
            has_prev_set=has_prev_set,
            has_next_set=has_next_set,
            directory=directory,
            current_folder_set=current_folder_set,
            current_folder=current_folder_name,
            current_images=current_images,
            labels=labels,
            head=app.config["HEAD"] + 1,
            len=len(app.config["FOLDER_SETS"]),
            image_set_index=image_set_index + 1,
            max_sets=max_sets,
            total_visits=total_visits,
            unique_visitors=unique_count,
            countries_count=countries_count,
            hf_all_time_visits=hf_all_time_visits
        )
        print("DEBUG: Template rendered successfully, returning result")
        return result
    except Exception as e:
        # If template rendering fails, return a simple error page
        print(f"CRITICAL ERROR rendering template: {e}")
        import traceback
        traceback.print_exc()
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Application Error</title>
            <meta charset="UTF-8">
        </head>
        <body>
            <h1>Application Error</h1>
            <p>An error occurred while rendering the page.</p>
            <p>Error: {str(e)}</p>
            <p>Please check the Space logs for more details.</p>
            <p><a href="/test">Test if app is running</a></p>
        </body>
        </html>
        """, 500
def save_annotations_to_csv():
    """Save all labeled annotations to CSV file"""
    # Rewrite the whole CSV: header first, then one row per fully-labeled box.
    with open(app.config["OUT"], 'w') as f:
        f.write("image,id,name,centerX,centerY,width,height\n")
        current_count = 0
        for label in app.config["LABELS"]:
            print(f"DEBUG: Checking label - Image: {label['image']}, ID: {label.get('id', 'None')}, Name: {label.get('name', 'None')}")
            # Only rows that have both an id and a class name are persisted.
            if label.get("id") and label.get("name"):
                cx = round(float(label["centerX"]))
                cy = round(float(label["centerY"]))
                w = round(float(label["width"]))
                h = round(float(label["height"]))
                f.write(f"{label['image']},{label['id']},{label['name']},{cx},{cy},{w},{h}\n")
                current_count += 1
                print(f"DEBUG: Wrote annotation for {label['image']} with class {label['name']} (ID: {label['id']})")
        f.flush()  # Ensure data is written to disk immediately
    print(f"DEBUG: Saved {current_count} labeled annotations to CSV")
def save_and_next():
    """Persist annotations to CSV, then advance to the next folder.

    The CSV is rewritten so rows belonging to other folders are preserved
    while rows for the current folder are replaced by the in-memory labels.
    The current folder's labels are then dropped from memory and HEAD is
    advanced, wrapping to the first folder at the end.
    """
    # Get current folder images to identify which annotations to save
    if app.config["HEAD"] < len(app.config["FOLDER_SETS"]):
        current_folder_set = app.config["FOLDER_SETS"][app.config["HEAD"]]
        # All image filenames belonging to the current folder (three variants per set).
        current_folder_images = set()
        for image_set in current_folder_set['image_sets']:
            current_folder_images.add(image_set['sr_int_full'])
            current_folder_images.add(image_set['tr_line'])
            current_folder_images.add(image_set['tr_int_full'])
        # Read existing CSV content
        existing_lines = []
        if os.path.exists(app.config["OUT"]):
            with open(app.config["OUT"], 'r') as f:
                existing_lines = f.readlines()
        # Write back CSV with header and non-current-folder annotations, plus new current folder annotations
        with open(app.config["OUT"], 'w') as f:
            # Write header
            f.write("image,id,name,centerX,centerY,width,height\n")
            # Write existing annotations that are NOT from current folder
            existing_count = 0
            for line in existing_lines[1:]:  # Skip header
                line = line.strip()
                if line:
                    # First CSV column is the image filename.
                    image_name = line.split(',')[0]
                    if image_name not in current_folder_images:
                        f.write(line + "\n")
                        existing_count += 1
            print(f"DEBUG: Wrote {existing_count} existing annotations from other folders")
            # Write ALL labeled annotations from current session (not just current folder)
            current_count = 0
            for label in app.config["LABELS"]:
                print(f"DEBUG: Checking label - Image: {label['image']}, ID: {label.get('id', 'None')}, Name: {label.get('name', 'None')}")
                if label.get("id") and label.get("name"):
                    f.write(
                        label["image"] + "," +
                        label["id"] + "," +
                        label["name"] + "," +
                        str(round(float(label["centerX"]))) + "," +
                        str(round(float(label["centerY"]))) + "," +
                        str(round(float(label["width"]))) + "," +
                        str(round(float(label["height"]))) + "\n"
                    )
                    current_count += 1
                    print(f"DEBUG: Wrote annotation for {label['image']} with class {label['name']} (ID: {label['id']})")
            print(f"DEBUG: Wrote {current_count} labeled annotations from all folders")
        # Remove current folder annotations from memory but keep others
        app.config["LABELS"] = [label for label in app.config["LABELS"]
                                if label["image"] not in current_folder_images]
        print(f"Saved annotations for folder: {current_folder_set['folder']}")
    # Move to next folder, loop back to start if at the end
    app.config["HEAD"] += 1
    if app.config["HEAD"] >= len(app.config["FOLDER_SETS"]):
        app.config["HEAD"] = 0  # Loop back to first folder
        app.config["IMAGE_SET_INDEX"] = 0  # Reset image set index
        print("Reached end of folders, looping back to first folder")
    return redirect(url_for('tagger'))
def next_folder():
    """Save current annotations and move to the next folder (wraps at the end)."""
    # Save annotations before moving to next folder
    save_annotations_to_csv()
    # Advance the folder pointer, wrapping to the first folder at the end.
    new_head = app.config["HEAD"] + 1
    if new_head >= len(app.config["FOLDER_SETS"]):
        new_head = 0
        print("Reached end of folders, looping back to first folder")
    app.config["HEAD"] = new_head
    app.config["IMAGE_SET_INDEX"] = 0  # Reset to first image set
    # Preserve auto-play parameters if present
    autoplay = request.args.get('autoplay')
    interval = request.args.get('interval')
    if autoplay and interval:
        return redirect(url_for('tagger', autoplay=autoplay, interval=interval))
    return redirect(url_for('tagger'))
def prev_folder():
    """Move to the previous folder (wraps to the last folder from the first)."""
    # Step the folder pointer back, wrapping to the last folder.
    new_head = app.config["HEAD"] - 1
    if new_head < 0:
        new_head = len(app.config["FOLDER_SETS"]) - 1
        print("Reached beginning of folders, looping to last folder")
    app.config["HEAD"] = new_head
    app.config["IMAGE_SET_INDEX"] = 0  # Reset to first image set
    # Preserve auto-play parameters if present
    autoplay = request.args.get('autoplay')
    interval = request.args.get('interval')
    if autoplay and interval:
        return redirect(url_for('tagger', autoplay=autoplay, interval=interval))
    return redirect(url_for('tagger'))
def next_set():
    """Advance to the next image set, rolling into the next folder when the
    current folder is exhausted (and looping to the first folder at the end)."""
    # Save annotations before moving to next set
    save_annotations_to_csv()
    # Move to next image set within current folder
    current_folder_set = app.config["FOLDER_SETS"][app.config["HEAD"]]
    max_sets = len(current_folder_set['image_sets'])
    current_index = app.config.get("IMAGE_SET_INDEX", 0)
    if current_index + 1 < max_sets:
        app.config["IMAGE_SET_INDEX"] = current_index + 1
    else:
        # Reached end of sets in current folder, move to next folder
        if app.config["HEAD"] + 1 < len(app.config["FOLDER_SETS"]):
            app.config["HEAD"] += 1
            app.config["IMAGE_SET_INDEX"] = 0  # Reset to first set in new folder
            print(f"DEBUG: Auto-advanced to next folder: {app.config['FOLDER_SETS'][app.config['HEAD']]['folder']}")
        else:
            # Reached end of all folders, loop back to beginning
            app.config["HEAD"] = 0
            app.config["IMAGE_SET_INDEX"] = 0
            print("DEBUG: Auto-looped back to first folder for continuous play")
    # Preserve auto-play parameters if present
    autoplay = request.args.get('autoplay')
    interval = request.args.get('interval')
    if autoplay and interval:
        return redirect(url_for('tagger', autoplay=autoplay, interval=interval))
    return redirect(url_for('tagger'))
def prev_set():
    """Move to the previous image set within the current folder (no wrap)."""
    idx = app.config.get("IMAGE_SET_INDEX", 0)
    if idx > 0:
        app.config["IMAGE_SET_INDEX"] = idx - 1
    # Preserve auto-play parameters if present
    autoplay = request.args.get('autoplay')
    interval = request.args.get('interval')
    if autoplay and interval:
        return redirect(url_for('tagger', autoplay=autoplay, interval=interval))
    return redirect(url_for('tagger'))
def reset_annotations():
    """Clear annotations for the current folder (default) or for all folders.

    Query param ``scope``: 'all' wipes every label plus the class-id map;
    'folder' (default) drops only labels whose image path starts with the
    current folder's name. The pruned label list is then persisted to CSV
    and the user is redirected back to the tagger.
    """
    scope = request.args.get('scope', 'folder')
    if scope == 'all':
        # Reset all annotations from all folders
        app.config["LABELS"] = []
        app.config["CLASS_TO_ID"] = {}
        app.config["NEXT_CLASS_ID"] = 1
        print("DEBUG: Reset ALL annotations from ALL folders")
    elif scope == 'folder':
        # Reset annotations only for current folder
        current_folder_set = app.config["FOLDER_SETS"][app.config["HEAD"]]
        folder_name = current_folder_set["folder"]
        # Remove annotations that belong to the current folder.
        # (Simplified from `any(... for folder_name in [folder_name])`,
        # a needless any() over a single-element list that also shadowed
        # folder_name; behavior is identical.)
        original_count = len(app.config["LABELS"])
        prefix = f"{folder_name}/"
        app.config["LABELS"] = [
            label for label in app.config["LABELS"]
            if not label["image"].startswith(prefix)
        ]
        removed_count = original_count - len(app.config["LABELS"])
        print(f"DEBUG: Reset {removed_count} annotations from folder '{folder_name}'")
    # Save the updated annotations to CSV
    save_annotations_to_csv()
    return redirect(url_for('tagger'))
def bye():
    """Completion page shown once every folder has been annotated."""
    completion_page = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Annotation Complete</title>
        <meta charset="UTF-8">
        <style>
            body { font-family: Arial, sans-serif; text-align: center; padding: 50px; background-color: #f0f0f0; }
            .container { background: white; padding: 40px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); max-width: 600px; margin: 0 auto; }
            h1 { color: #28a745; margin-bottom: 20px; }
            p { font-size: 18px; margin: 15px 0; color: #333; }
            .success { color: #28a745; font-weight: bold; }
            .info { background: #e7f3ff; padding: 15px; border-radius: 5px; margin: 20px 0; }
            .restart-btn { background: #007bff; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px; display: inline-block; margin-top: 20px; }
            .restart-btn:hover { background: #0056b3; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>🎉 Annotation Complete!</h1>
            <p class="success">All folders have been processed successfully.</p>
            <div class="info">
                <p><strong>Your annotations have been saved to:</strong></p>
                <p><code>out.csv</code></p>
                <p>The CSV file contains all bounding boxes and labels you created.</p>
            </div>
            <p>You can now use this data for training machine learning models or further analysis.</p>
            <a href="/tagger" class="restart-btn">Start Over</a>
        </div>
    </body>
    </html>
    """
    return completion_page
| def stats(): | |
| """Display analytics statistics""" | |
| stats_data = load_stats() | |
| # Convert set to list for display | |
| unique_count = len(stats_data['unique_visitors']) if isinstance(stats_data['unique_visitors'], set) else len(stats_data.get('unique_visitors', [])) | |
| # Sort countries by visits | |
| sorted_countries = sorted(stats_data.get('countries', {}).items(), key=lambda x: x[1], reverse=True) | |
| # Sort dates | |
| sorted_dates = sorted(stats_data.get('visits_by_date', {}).items(), reverse=True)[:30] # Last 30 days | |
| # Get top user agents | |
| sorted_user_agents = sorted(stats_data.get('user_agents', {}).items(), key=lambda x: x[1], reverse=True)[:10] | |
| html = f""" | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <title>Analytics Statistics</title> | |
| <meta charset="UTF-8"> | |
| <style> | |
| body {{ font-family: Arial, sans-serif; padding: 20px; background-color: #f5f5f5; }} | |
| .container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 30px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }} | |
| h1 {{ color: #333; border-bottom: 3px solid #6c757d; padding-bottom: 10px; }} | |
| h2 {{ color: #555; margin-top: 30px; }} | |
| .stat-box {{ background: #f8f9fa; padding: 20px; border-radius: 8px; margin: 15px 0; border-left: 4px solid #6c757d; }} | |
| .stat-number {{ font-size: 36px; font-weight: bold; color: #495057; }} | |
| .stat-label {{ font-size: 14px; color: #666; margin-top: 5px; }} | |
| .table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }} | |
| .table th, .table td {{ padding: 12px; text-align: left; border-bottom: 1px solid #ddd; }} | |
| .table th {{ background-color: #6c757d; color: white; }} | |
| .table tr:hover {{ background-color: #f5f5f5; }} | |
| .back-link {{ display: inline-block; margin-top: 20px; padding: 10px 20px; background: #6c757d; color: white; text-decoration: none; border-radius: 5px; }} | |
| .back-link:hover {{ background: #5a6268; }} | |
| .grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 20px; margin: 20px 0; }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container"> | |
| <h1>📊 Analytics Statistics</h1> | |
| <div class="grid"> | |
| <div class="stat-box"> | |
| <div class="stat-number">{stats_data.get('total_visits', 0):,}</div> | |
| <div class="stat-label">Total Visits</div> | |
| </div> | |
| <div class="stat-box"> | |
| <div class="stat-number">{unique_count:,}</div> | |
| <div class="stat-label">Unique Visitors</div> | |
| </div> | |
| <div class="stat-box"> | |
| <div class="stat-number">{len(stats_data.get('countries', {}))}</div> | |
| <div class="stat-label">Countries</div> | |
| </div> | |
| <div class="stat-box"> | |
| <div class="stat-number">{stats_data.get('first_visit', 'N/A')[:10] if stats_data.get('first_visit') else 'N/A'}</div> | |
| <div class="stat-label">First Visit</div> | |
| </div> | |
| </div> | |
| <h2>🌍 Visits by Country</h2> | |
| <table class="table"> | |
| <thead> | |
| <tr> | |
| <th>Country</th> | |
| <th>Visits</th> | |
| <th>Percentage</th> | |
| </tr> | |
| </thead> | |
| <tbody> | |
| """ | |
| total_visits = stats_data.get('total_visits', 1) | |
| for country, count in sorted_countries: | |
| percentage = (count / total_visits * 100) if total_visits > 0 else 0 | |
| html += f""" | |
| <tr> | |
| <td>{country}</td> | |
| <td>{count:,}</td> | |
| <td>{percentage:.1f}%</td> | |
| </tr> | |
| """ | |
| html += """ | |
| </tbody> | |
| </table> | |
| <h2>📅 Visits by Date (Last 30 Days)</h2> | |
| <table class="table"> | |
| <thead> | |
| <tr> | |
| <th>Date</th> | |
| <th>Visits</th> | |
| </tr> | |
| </thead> | |
| <tbody> | |
| """ | |
| for date, count in sorted_dates: | |
| html += f""" | |
| <tr> | |
| <td>{date}</td> | |
| <td>{count:,}</td> | |
| </tr> | |
| """ | |
| html += """ | |
| </tbody> | |
| </table> | |
| <h2>🖥️ Top User Agents</h2> | |
| <table class="table"> | |
| <thead> | |
| <tr> | |
| <th>User Agent</th> | |
| <th>Visits</th> | |
| </tr> | |
| </thead> | |
| <tbody> | |
| """ | |
| for ua, count in sorted_user_agents: | |
| # Truncate long user agents | |
| ua_display = ua[:80] + '...' if len(ua) > 80 else ua | |
| html += f""" | |
| <tr> | |
| <td>{ua_display}</td> | |
| <td>{count:,}</td> | |
| </tr> | |
| """ | |
| html += f""" | |
| </tbody> | |
| </table> | |
| <p><strong>Last Updated:</strong> {stats_data.get('last_visit', 'N/A')}</p> | |
| <a href="/tagger" class="back-link">← Back to Tagger</a> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| return html | |
def add(temp_id):
    """Record a new, not-yet-labeled bounding box for the current image.

    Reads the image name and the box corners (xMin/xMax/yMin/yMax) from the
    query string, converts the corners to center/width/height form, and
    appends the annotation to ``app.config["LABELS"]`` under a temporary id.
    Redirects back to the tagger view.
    """
    image = request.args.get("image")
    x_min = float(request.args.get("xMin"))
    x_max = float(request.args.get("xMax"))
    y_min = float(request.args.get("yMin"))
    y_max = float(request.args.get("yMax"))

    # Convert corner coordinates to (center, size) representation.
    center_x = (x_min + x_max) / 2
    center_y = (y_min + y_max) / 2
    box_w = x_max - x_min
    box_h = y_max - y_min

    print(f"DEBUG: Coordinates - xMin:{x_min:.1f}, xMax:{x_max:.1f}, yMin:{y_min:.1f}, yMax:{y_max:.1f}")
    print(f"DEBUG: Calculated - centerX:{center_x:.1f}, centerY:{center_y:.1f}, width:{box_w:.1f}, height:{box_h:.1f}")

    # The box keeps a temporary id until a class name is assigned to it.
    app.config["LABELS"].append({
        "image": image,
        "temp_id": temp_id,  # tracking id until the box is labeled
        "id": "",            # class-based id, filled in by label()
        "name": "",
        "centerX": center_x,
        "centerY": center_y,
        "width": box_w,
        "height": box_h
    })
    return redirect(url_for('tagger'))
def remove(temp_id):
    """Delete the annotation(s) on *image* matching *temp_id*.

    The image name comes from the query string.  An entry is removed when it
    belongs to that image and either its ``temp_id`` or its ``id`` equals the
    given id.  Redirects back to the tagger view.
    """
    image = request.args.get("image")
    print(f"DEBUG: Removing - Temp ID: {temp_id}, Image: {image}")

    before = len(app.config["LABELS"])
    kept = []
    for entry in app.config["LABELS"]:
        same_image = entry["image"] == image
        same_id = entry.get("temp_id") == temp_id or entry.get("id") == temp_id
        if same_image and same_id:
            continue  # this is the annotation being removed
        kept.append(entry)
    app.config["LABELS"] = kept

    print(f"DEBUG: Removed {before - len(kept)} labels")
    return redirect(url_for('tagger'))
def label(temp_id):
    """Assign a class name (and numeric class id) to a drawn bounding box.

    Reads ``image`` and ``name`` from the query string.  First-time class
    names are allocated the next free id from ``app.config["NEXT_CLASS_ID"]``;
    the matching annotation in ``app.config["LABELS"]`` is then updated in
    place and its temporary id discarded.  Redirects back to the tagger view.
    """
    image = request.args.get("image")
    # NOTE(review): a missing "name" query arg raises AttributeError here
    # (None.strip()) — presumably the UI always sends it; verify callers.
    name = request.args.get("name").strip().lower()
    print(f"DEBUG: Labeling - Temp ID: {temp_id}, Image: {image}, Name: {name}")

    # Get or assign the class id for this (normalized) class name.
    if name not in app.config["CLASS_TO_ID"]:
        app.config["CLASS_TO_ID"][name] = app.config["NEXT_CLASS_ID"]
        app.config["NEXT_CLASS_ID"] += 1
        print(f"DEBUG: Assigned new class ID {app.config['CLASS_TO_ID'][name]} to class '{name}'")
    class_id = app.config["CLASS_TO_ID"][name]

    found = False
    # Loop variable renamed from ``label`` — it shadowed this function's name.
    for annotation in app.config["LABELS"]:
        # Accept either a temp_id (unlabeled box) or a plain id for compatibility.
        annotation_temp_id = annotation.get("temp_id", annotation.get("id"))
        print(f"DEBUG: Checking label - Temp ID: {annotation_temp_id}, Image: {annotation['image']}")
        if annotation["image"] == image and annotation_temp_id == temp_id:
            annotation["name"] = name
            annotation["id"] = str(class_id)  # class-based id replaces the temp one
            if "temp_id" in annotation:
                del annotation["temp_id"]  # no longer needed once labeled
            print(f"DEBUG: Updated label temp_id {temp_id} with name '{name}' and class ID {class_id}")
            found = True
            break
    if not found:
        print(f"DEBUG: Label not found for temp_id: {temp_id}, Image: {image}")
        print(f"DEBUG: Current class mapping: {app.config['CLASS_TO_ID']}")
    return redirect(url_for('tagger'))
def images(f):
    """Serve an image, preferring the HuggingFace dataset, else local disk.

    Parameters:
        f: image path as referenced by the UI (bare filename or a
           repo-relative path).

    Resolution order when ``USE_HF_DATASET`` is set: direct
    ``hf_hub_download`` into the hub cache, then a manual copy into a local
    cache directory; on any failure it falls through to the local ``IMAGES``
    directory.  Returns a Flask file response or ``("Image not found", 404)``.
    """
    # Check if using HuggingFace dataset
    if app.config.get("USE_HF_DATASET", False):
        # Load image from HuggingFace dataset
        try:
            from huggingface_hub import hf_hub_download
            dataset_name = app.config.get("HF_DATASET_NAME", "0001AMA/multimodal_data_annotator_dataset")
            cache_dir = app.config.get("CACHE_DIR", None)
            # Resolve the requested name to a known repo path.
            file_path = f
            dataset_files = app.config.get("HF_DATASET_FILES", {})
            # Try exact match first
            if f not in dataset_files:
                # Fall back to suffix/substring matching against known paths.
                for path in dataset_files:
                    if path.endswith(f) or f in path:
                        file_path = path
                        break
            # Get HF token for authenticated requests
            hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
            if not hf_token:
                try:
                    from huggingface_hub import HfApi
                    api = HfApi()
                    hf_token = api.token
                # BUGFIX: was a bare ``except:`` which also swallowed
                # SystemExit/KeyboardInterrupt; token lookup stays best-effort.
                except Exception:
                    pass
            # Download file from HuggingFace
            try:
                local_path = hf_hub_download(
                    repo_id=dataset_name,
                    filename=file_path,
                    repo_type="dataset",
                    cache_dir=cache_dir,
                    token=hf_token
                )
                if os.path.exists(local_path):
                    return send_file(local_path)
            except Exception as download_error:
                print(f"Error downloading file {file_path}: {download_error}")
                # Try alternative: download to cache and serve
                try:
                    # Use cache_dir if available; flatten the repo path into
                    # a single cache filename.
                    cache_file = os.path.join(cache_dir or tempfile.gettempdir(), file_path.replace('/', '_'))
                    if not os.path.exists(cache_file):
                        local_path = hf_hub_download(
                            repo_id=dataset_name,
                            filename=file_path,
                            repo_type="dataset",
                            token=hf_token
                        )
                        # Copy to cache
                        os.makedirs(os.path.dirname(cache_file), exist_ok=True)
                        shutil.copy2(local_path, cache_file)
                    else:
                        local_path = cache_file
                    return send_file(local_path)
                except Exception as e2:
                    print(f"Alternative download also failed: {e2}")
        except Exception as e:
            print(f"Error loading image from dataset: {e}")
            import traceback
            traceback.print_exc()
            # Fallback to local file if available
            pass
    # Fallback to local file system
    images_dir = app.config.get('IMAGES', '')
    if images_dir:
        file_path = os.path.join(images_dir, f)
        if os.path.exists(file_path):
            return send_file(file_path)
    return "Image not found", 404
def load_from_huggingface_dataset(dataset_name="0001AMA/multimodal_data_annotator_dataset"):
    """Load and process images from HuggingFace dataset.

    Lists all PNG files in the dataset repo, groups them by top-level folder
    and file-id prefix (text before the first '-'), and keeps only ids for
    which all three required variants exist (``sr_int_full.png``,
    ``-tr_line.png``, ``-tr_int_full.png``).

    Parameters:
        dataset_name: HF dataset repo id to read from.

    Returns:
        list of ``{'folder': str, 'image_sets': [...]}`` dicts; an empty list
        on any failure (errors are printed, not raised).

    Side effects: sets ``CACHE_DIR``, ``HF_DATASET_FILES`` and
    ``HF_DATASET_NAME`` in ``app.config``.
    """
    print(f"Loading dataset from HuggingFace: {dataset_name}")
    try:
        from huggingface_hub import list_repo_files, hf_hub_download
        # Get HF token for authenticated requests
        hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
        if not hf_token:
            try:
                from huggingface_hub import HfApi
                api = HfApi()
                hf_token = api.token
            # BUGFIX: was a bare ``except:`` which also swallowed
            # SystemExit/KeyboardInterrupt; token lookup stays best-effort.
            except Exception:
                pass
        # List all files in the dataset repository
        print("Listing files in dataset repository...")
        repo_files = list_repo_files(repo_id=dataset_name, repo_type="dataset", token=hf_token)
        print(f"Found {len(repo_files)} files in repository")
        # Filter PNG files only
        png_files = [f for f in repo_files if f.endswith('.png')]
        print(f"Found {len(png_files)} PNG files")
        # Create a cache directory for images (used later by images()).
        cache_dir = os.path.join(tempfile.gettempdir(), "hf_dataset_cache")
        os.makedirs(cache_dir, exist_ok=True)
        app.config["CACHE_DIR"] = cache_dir
        # Process files to group by folder and file ID
        folder_sets = []
        required_suffixes = ['sr_int_full.png', '-tr_line.png', '-tr_int_full.png']
        # Group files by folder and file ID
        folder_files = {}  # {folder_name: {file_id: {suffix: file_path}}}
        for file_path in png_files:
            # Extract folder name and filename
            path_parts = file_path.split('/')
            if len(path_parts) < 2:
                continue  # skip repo-root files — a containing folder is required
            folder_name = path_parts[0]
            filename = path_parts[-1]
            # Check if file matches required suffixes
            matched_suffix = None
            for suffix in required_suffixes:
                if filename.endswith(suffix):
                    matched_suffix = suffix
                    break
            if not matched_suffix:
                continue
            # Extract file ID prefix (everything before the first '-')
            if '-' in filename:
                file_id = filename.split('-')[0]
            else:
                continue  # no prefix to group by
            # Initialize folder structure
            if folder_name not in folder_files:
                folder_files[folder_name] = {}
            if file_id not in folder_files[folder_name]:
                folder_files[folder_name][file_id] = {}
            # Store file path
            folder_files[folder_name][file_id][matched_suffix] = file_path
        # Create folder sets with valid image sets
        for folder_name, file_ids in folder_files.items():
            valid_image_sets = []
            for file_id, images in file_ids.items():
                # Check if all three required suffixes are present
                if all(suffix in images for suffix in required_suffixes):
                    valid_image_sets.append({
                        'file_id': file_id,
                        'sr_int_full': images['sr_int_full.png'],
                        'tr_line': images['-tr_line.png'],
                        'tr_int_full': images['-tr_int_full.png']
                    })
                    print(f"DEBUG: Created valid image set for file_id '{file_id}' in folder '{folder_name}'")
            if valid_image_sets:
                folder_sets.append({
                    'folder': folder_name,
                    'image_sets': valid_image_sets
                })
                print(f"DEBUG: Added folder '{folder_name}' with {len(valid_image_sets)} image sets")
        # Store file list for image serving
        app.config["HF_DATASET_FILES"] = {f: f for f in png_files}
        app.config["HF_DATASET_NAME"] = dataset_name
        print(f"Successfully processed {len(folder_sets)} folders with valid image sets")
        return folder_sets
    except Exception as e:
        print(f"Error loading HuggingFace dataset: {e}")
        import traceback
        traceback.print_exc()
        return []
def load_from_local_directory(directory):
    """Load and process images from a local directory tree.

    Walks every subfolder of *directory*, grouping PNG files by their file-id
    prefix (text before the first '-') and keeping only ids that have all
    three required variants: ``sr_int_full.png``, ``-tr_line.png`` and
    ``-tr_int_full.png``.

    Parameters:
        directory: root path to scan; each subfolder becomes a folder entry.

    Returns:
        list of ``{'folder': name, 'image_sets': [...]}`` where each image
        set holds directory-relative paths for the three variants.
    """
    folder_sets = []
    required_suffixes = ['sr_int_full.png', '-tr_line.png', '-tr_int_full.png']
    for (dirpath, dirnames, filenames) in walk(directory):
        if dirpath == directory:  # Skip root directory
            continue
        # Find ALL images with required suffixes in this folder and group by file ID prefix
        found_images = {'sr_int_full.png': [], '-tr_line.png': [], '-tr_int_full.png': []}
        for filename in filenames:
            for suffix in required_suffixes:
                if filename.endswith(suffix):
                    relative_path = os.path.relpath(os.path.join(dirpath, filename), directory)
                    found_images[suffix].append(relative_path)
        # Group images by their file ID prefix (everything before the first '-')
        image_groups = {}
        for suffix in required_suffixes:
            for image_path in found_images[suffix]:
                filename = os.path.basename(image_path)
                # Extract file ID prefix (everything before the first '-')
                if '-' in filename:
                    file_id = filename.split('-')[0]
                    if file_id not in image_groups:
                        image_groups[file_id] = {}
                    image_groups[file_id][suffix] = image_path
                    # BUGFIX: the debug message contained a literal "(unknown)"
                    # placeholder instead of the grouped filename.
                    print(f"DEBUG: Grouped '{filename}' with file_id '{file_id}' for suffix '{suffix}'")
        # Create image sets only for file IDs that have all three image types
        valid_image_sets = []
        for file_id, images in image_groups.items():
            print(f"DEBUG: Checking file_id '{file_id}' - has suffixes: {list(images.keys())}")
            if all(suffix in images for suffix in required_suffixes):
                valid_image_sets.append({
                    'file_id': file_id,
                    'sr_int_full': images['sr_int_full.png'],
                    'tr_line': images['-tr_line.png'],
                    'tr_int_full': images['-tr_int_full.png']
                })
                print(f"DEBUG: Created valid image set for file_id '{file_id}'")
            else:
                print(f"DEBUG: Skipped file_id '{file_id}' - missing suffixes: {[s for s in required_suffixes if s not in images]}")
        # Only include folders that have at least one complete image set
        if valid_image_sets:
            folder_name = os.path.basename(dirpath)
            folder_sets.append({
                'folder': folder_name,
                'image_sets': valid_image_sets
            })
    return folder_sets
if __name__ == "__main__":
    # --- CLI arguments ------------------------------------------------------
    parser = argparse.ArgumentParser()
    parser.add_argument('--dir', type=str, default=None, help='specify the images directory (optional, uses HF dataset if not provided)')
    parser.add_argument("--out")
    args = parser.parse_args()
    # --- In-memory annotation state -----------------------------------------
    app.config["LABELS"] = []
    app.config["CLASS_TO_ID"] = {}  # Maps class names to IDs
    app.config["NEXT_CLASS_ID"] = 1  # Next available class ID
    # Check if running on HuggingFace Spaces or if no local directory specified
    is_hf_space = os.getenv("SPACE_ID") is not None
    use_hf_dataset = args.dir is None or is_hf_space
    if use_hf_dataset:
        # NOTE(review): shells out to `date` just for a startup banner —
        # datetime.now() would do the same without a subprocess.
        print("===== Application Startup at " + str(os.popen('date').read().strip()) + " =====")
        print("Loading from HuggingFace dataset...")
        app.config["USE_HF_DATASET"] = True
        folder_sets = load_from_huggingface_dataset("0001AMA/multimodal_data_annotator_dataset")
        app.config["IMAGES"] = ""  # Not using local directory
    else:
        print("Loading from local directory...")
        app.config["USE_HF_DATASET"] = False
        directory = args.dir
        # Normalize so later joins can assume a trailing separator.
        if directory[-1] != "/":
            directory += "/"
        app.config["IMAGES"] = directory
        folder_sets = load_from_local_directory(directory)
    if not folder_sets:
        error_msg = "No folders found with all three required image types (sr_int_full.png, -tr_line.png, -tr_int_full.png)"
        print(error_msg)
        if use_hf_dataset:
            print("This may be due to:")
            print("1. Dataset not fully uploaded yet")
            print("2. Dataset structure doesn't match expected format")
            print("3. Network issues loading the dataset")
        # Don't exit - allow app to start and show error message in UI
        app.config["FOLDER_SETS"] = []
        app.config["DATASET_ERROR"] = error_msg
    else:
        app.config["FOLDER_SETS"] = folder_sets
        app.config["DATASET_ERROR"] = None
    # Navigation cursors: current folder index and image-set index within it.
    app.config["HEAD"] = 0
    app.config["IMAGE_SET_INDEX"] = 0
    app.config["OUT"] = args.out if args.out else "out.csv"
    # Check if CSV file exists, create header only if it doesn't exist
    # NOTE(review): os is already imported at module top; this re-import is
    # redundant but harmless.
    import os
    if not os.path.exists(app.config["OUT"]):
        with open(app.config["OUT"], 'w') as f:
            f.write("image,id,name,centerX,centerY,width,height\n")
        print(f"Created new CSV file: {app.config['OUT']}")
    else:
        print(f"Using existing CSV file: {app.config['OUT']}")
        # Verify the file has the correct header
        with open(app.config["OUT"], 'r') as f:
            first_line = f.readline().strip()
            if first_line != "image,id,name,centerX,centerY,width,height":
                print("Warning: Existing CSV file has different header format!")
                print(f"Expected: image,id,name,centerX,centerY,width,height")
                print(f"Found: {first_line}")
                # Backup the old file and create new one
                backup_name = app.config["OUT"].replace('.csv', '_backup.csv')
                os.rename(app.config["OUT"], backup_name)
                print(f"Backed up old file to: {backup_name}")
                with open(app.config["OUT"], 'w') as f:
                    f.write("image,id,name,centerX,centerY,width,height\n")
                print(f"Created new CSV file with correct header")
    # Load existing annotations from CSV if file exists and has content
    # NOTE(review): parsed with a naive split(',') — breaks if a field ever
    # contains a comma; the writer appears to emit plain fields, so this
    # presumably matches. Verify against the CSV-writing code.
    if os.path.exists(app.config["OUT"]):
        try:
            with open(app.config["OUT"], 'r') as f:
                lines = f.readlines()[1:]  # Skip header
                for line in lines:
                    line = line.strip()
                    if line:  # Skip empty lines
                        parts = line.split(',')
                        if len(parts) >= 7:  # Ensure we have all required fields
                            class_name = parts[2].lower() if parts[2] else ""
                            class_id = parts[1] if parts[1] else ""
                            # Rebuild class mapping for labeled annotations
                            if class_name and class_id and class_id.isdigit():
                                class_id_int = int(class_id)
                                if class_name not in app.config["CLASS_TO_ID"]:
                                    app.config["CLASS_TO_ID"][class_name] = class_id_int
                                # Keep NEXT_CLASS_ID ahead of every id seen so far.
                                if class_id_int >= app.config["NEXT_CLASS_ID"]:
                                    app.config["NEXT_CLASS_ID"] = class_id_int + 1
                            # For unlabeled annotations, assign a temp_id
                            annotation_data = {
                                "image": parts[0],
                                "name": parts[2],
                                "centerX": float(parts[3]),
                                "centerY": float(parts[4]),
                                "width": float(parts[5]),
                                "height": float(parts[6])
                            }
                            if class_id:
                                annotation_data["id"] = class_id
                            else:
                                # Assign temp_id for unlabeled annotations
                                annotation_data["temp_id"] = str(len(app.config["LABELS"]) + 1)
                            app.config["LABELS"].append(annotation_data)
                if len(app.config["LABELS"]) > 0:
                    print(f"Loaded {len(app.config['LABELS'])} existing annotations from CSV")
        except Exception as e:
            print(f"Error loading existing annotations: {e}")
            # Don't clear LABELS here, keep them empty if loading fails
    print(f"Found {len(folder_sets)} valid folder sets")
    # For HuggingFace Spaces, use 0.0.0.0 and port 7860
    # For local development, you can use 127.0.0.1 and port 7620
    if os.getenv("SPACE_ID"):  # Running on HuggingFace
        app.run(host="0.0.0.0", port=7860, debug=False)
    else:  # Running locally
        app.run(host="127.0.0.1", port=7620, debug=False)