import sys from os import walk import csv import argparse from flask import Flask, redirect, url_for, request from flask import render_template from flask import send_file import os from datasets import load_dataset from huggingface_hub import hf_hub_download from io import BytesIO from PIL import Image import tempfile import shutil import json from datetime import datetime import hashlib import threading import requests app = Flask(__name__) app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 # Analytics configuration - Use absolute path to ensure persistence across rebuilds # In HuggingFace Spaces, files in the workspace root persist across rebuilds STATS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "analytics_stats.json") STATS_BACKUP_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "analytics_stats_backup.json") STATS_LOCK = threading.Lock() def get_client_ip(): """Get client IP address from request""" try: if request.headers.get('X-Forwarded-For'): return request.headers.get('X-Forwarded-For').split(',')[0].strip() elif request.headers.get('X-Real-IP'): return request.headers.get('X-Real-IP') else: return request.remote_addr or '127.0.0.1' except: return '127.0.0.1' def get_country_from_ip(ip): """Get country from IP address using free API""" try: # Using ip-api.com (free, no API key required) response = requests.get(f'http://ip-api.com/json/{ip}', timeout=2) if response.status_code == 200: data = response.json() if data.get('status') == 'success': return data.get('country', 'Unknown') except Exception as e: print(f"Error getting country for IP {ip}: {e}") return 'Unknown' def get_user_agent_hash(): """Create a hash of user agent for unique visitor tracking""" try: ua = request.headers.get('User-Agent', '') return hashlib.md5(ua.encode()).hexdigest()[:8] except: return 'unknown' def load_stats(): """Load statistics from JSON file with backup recovery""" # Try to load from main file first try: if os.path.exists(STATS_FILE): with open(STATS_FILE, 'r') as f: data = json.load(f) # Convert unique_visitors list back to set if needed if isinstance(data.get('unique_visitors'), list): data['unique_visitors'] = set(data['unique_visitors']) print(f"Loaded stats from {STATS_FILE}: {data.get('total_visits', 0)} visits") return data except Exception as e: print(f"Error loading stats from main file: {e}") # Try backup file if main file fails try: if os.path.exists(STATS_BACKUP_FILE): print(f"Attempting to load from backup file: {STATS_BACKUP_FILE}") with open(STATS_BACKUP_FILE, 'r') as f: data = json.load(f) if isinstance(data.get('unique_visitors'), list): data['unique_visitors'] = set(data['unique_visitors']) print(f"Recovered stats from backup: {data.get('total_visits', 0)} visits") # Restore backup to main file save_stats(data) return data except Exception as e2: print(f"Error loading stats from backup file: {e2}") # Return default structure if both files fail print("No existing stats found, starting fresh") return { 'total_visits': 0, 'unique_visitors': set(), 'countries': {}, 'visits_by_date': {}, 'first_visit': None, 'last_visit': None, 'user_agents': {} } def save_stats(stats): """Save statistics to JSON file with backup (convert sets to lists for JSON)""" try: stats_to_save = { 'total_visits': stats.get('total_visits', 0), 'unique_visitors': list(stats['unique_visitors']) if isinstance(stats.get('unique_visitors'), set) else stats.get('unique_visitors', []), 'countries': stats.get('countries', {}), 'visits_by_date': stats.get('visits_by_date', {}), 'first_visit': stats.get('first_visit'), 'last_visit': stats.get('last_visit'), 'user_agents': stats.get('user_agents', {}) } # Save to main file with open(STATS_FILE, 'w') as f: json.dump(stats_to_save, f, indent=2) # Create backup copy for redundancy try: import shutil shutil.copy2(STATS_FILE, STATS_BACKUP_FILE) except Exception as backup_error: print(f"Warning: Could not create backup: {backup_error}") print(f"Stats saved successfully: {stats_to_save.get('total_visits', 0)} total visits") except Exception as e: print(f"Error saving stats: {e}") import traceback traceback.print_exc() def get_hf_all_time_visits(space_id="0001AMA/auto_object_annotator_0.0.4"): """Get HuggingFace Space 'All time visits' from metrics API - returns None if not available""" # Get HuggingFace token from environment (automatically provided in Spaces) hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN") # If no token in env, try to get it from huggingface_hub if not hf_token: try: from huggingface_hub import HfApi api = HfApi() hf_token = api.token except: pass # Prepare headers with authentication if token is available headers = {'User-Agent': 'Mozilla/5.0'} if hf_token: headers['Authorization'] = f'Bearer {hf_token}' # Try the metrics API endpoint with authentication try: metrics_url = f"https://huggingface.co/api/spaces/{space_id}/metrics" # Use very short timeout (1 second) to prevent blocking page loads response = requests.get(metrics_url, timeout=1, headers=headers) if response.status_code == 200: data = response.json() # Look for "All time visits" in the response if isinstance(data, dict): # Try various field names for "all time visits" for key in ['all_time_visits', 'allTimeVisits', 'total_visits', 'totalVisits', 'all_time_views', 'allTimeViews', 'total_views', 'totalViews', 'views', 'visits', 'viewCount', 'visitCount']: if key in data: value = data[key] # Only return if it's a valid number > 0 (not blank/null) if value is not None and value != '' and value != '-': try: count = int(value) if isinstance(value, (int, float, str)) else None if count is not None and count > 0: return count except (ValueError, TypeError): continue elif response.status_code == 401: print("HF API: Authentication required but token may be invalid") elif response.status_code == 403: print("HF API: Access forbidden - may need owner permissions") except Exception as e: print(f"HF API request failed: {e}") # Debug logging pass # Silently fail - return None # Return None if not available (don't fallback to app's tracking) return None def track_visit(): """Track a visit - cumulative and persistent""" try: with STATS_LOCK: stats = load_stats() # Convert unique_visitors list back to set if needed if isinstance(stats.get('unique_visitors'), list): stats['unique_visitors'] = set(stats['unique_visitors']) # Get visitor information ip = get_client_ip() ua_hash = get_user_agent_hash() visitor_id = f"{ip}_{ua_hash}" country = get_country_from_ip(ip) current_date = datetime.now().strftime('%Y-%m-%d') current_time = datetime.now().isoformat() # Update statistics stats['total_visits'] = stats.get('total_visits', 0) + 1 if 'unique_visitors' not in stats: stats['unique_visitors'] = set() stats['unique_visitors'].add(visitor_id) # Track countries if 'countries' not in stats: stats['countries'] = {} if country not in stats['countries']: stats['countries'][country] = 0 stats['countries'][country] += 1 # Track visits by date if 'visits_by_date' not in stats: stats['visits_by_date'] = {} if current_date not in stats['visits_by_date']: stats['visits_by_date'][current_date] = 0 stats['visits_by_date'][current_date] += 1 # Track first and last visit if not stats.get('first_visit'): stats['first_visit'] = current_time stats['last_visit'] = current_time # Track user agents if 'user_agents' not in stats: stats['user_agents'] = {} ua = request.headers.get('User-Agent', 'Unknown') if ua not in stats['user_agents']: stats['user_agents'][ua] = 0 stats['user_agents'][ua] += 1 # Save statistics save_stats(stats) except Exception as e: # Don't let tracking errors break the app print(f"Error tracking visit: {e}") import traceback traceback.print_exc() @app.route('/') def index(): """Redirect root URL to tagger""" print("DEBUG: Root route / called") try: # Track visit track_visit() except Exception as e: print(f"Error in track_visit at root: {e}") return redirect(url_for('tagger')) @app.route('/test') def test(): """Simple test route to verify app is running""" return """
If you see this, the Flask app is working.
""" @app.errorhandler(Exception) def handle_exception(e): """Global error handler to prevent blank screens""" print(f"Unhandled exception: {e}") import traceback traceback.print_exc() return f"""An unexpected error occurred.
Error: {str(e)}
Please check the Space logs for more details.
""", 500 @app.route('/tagger') def tagger(): print("DEBUG: tagger() route called") try: # Track visit track_visit() except Exception as e: print(f"Error in track_visit: {e}") # Continue even if tracking fails # Check if dataset was loaded successfully folder_sets = app.config.get("FOLDER_SETS", []) print(f"DEBUG: folder_sets length: {len(folder_sets)}") if not folder_sets: error_msg = app.config.get("DATASET_ERROR", "No folders found with all three required image types (sr_int_full.png, -tr_line.png, -tr_int_full.png)") return f"""{error_msg}
This may be due to:
Please check the Space logs for more details.
An error occurred while accessing folder/image data.
Error: {str(e)}
Please check the Space logs for more details.
""", 500 # Ensure we have valid data before proceeding if current_folder_set is None: return f"""Unable to load folder data.
""", 500 labels = app.config["LABELS"] has_prev_folder = app.config["HEAD"] > 0 has_next_folder = app.config["HEAD"] + 1 < len(app.config["FOLDER_SETS"]) has_prev_set = image_set_index > 0 has_next_set = image_set_index + 1 < max_sets # Get statistics for display try: stats_data = load_stats() total_visits = stats_data.get('total_visits', 0) unique_count = len(stats_data['unique_visitors']) if isinstance(stats_data.get('unique_visitors'), set) else len(stats_data.get('unique_visitors', [])) countries_count = len(stats_data.get('countries', {})) except Exception as e: print(f"Error loading stats: {e}") import traceback traceback.print_exc() total_visits = 0 unique_count = 0 countries_count = 0 # Try to get HF Space "All time visits" from analytics # Only use HF value if available - don't fallback to app's tracking # Make this non-blocking with very short timeout hf_all_time_visits = None try: # Use threading to prevent blocking - but actually just call it directly with timeout # The function already has a 2-second timeout, so it should fail fast hf_all_time_visits = get_hf_all_time_visits() # Only use if we got a valid value if hf_all_time_visits is not None and hf_all_time_visits > 0: pass # Use the value else: hf_all_time_visits = None # Keep blank until HF populates it except Exception as e: # Silently fail - keep as None (blank) - don't let this break the page print(f"HF All time visits fetch failed (keeping blank): {e}") hf_all_time_visits = None print(f"DEBUG: About to render template. current_folder_set: {current_folder_set is not None}, current_images: {len(current_images)}") # Ensure we have all required variables if current_folder_set is None: print("ERROR: current_folder_set is None - cannot render template") return f"""Unable to load folder data. Please check the Space logs.
""", 500 # Validate all required template variables before rendering try: current_folder_name = current_folder_set.get('folder', 'Unknown') if isinstance(current_folder_set, dict) else 'Unknown' # Double-check all variables are valid if not isinstance(current_images, list): current_images = [] if not isinstance(labels, list): labels = [] if not isinstance(total_visits, int): total_visits = 0 if not isinstance(unique_count, int): unique_count = 0 if not isinstance(countries_count, int): countries_count = 0 print(f"DEBUG: Rendering template with {len(current_images)} images, folder: {current_folder_name}") result = render_template( 'tagger.html', has_prev_folder=has_prev_folder, has_next_folder=has_next_folder, has_prev_set=has_prev_set, has_next_set=has_next_set, directory=directory, current_folder_set=current_folder_set, current_folder=current_folder_name, current_images=current_images, labels=labels, head=app.config["HEAD"] + 1, len=len(app.config["FOLDER_SETS"]), image_set_index=image_set_index + 1, max_sets=max_sets, total_visits=total_visits, unique_visitors=unique_count, countries_count=countries_count, hf_all_time_visits=hf_all_time_visits ) print("DEBUG: Template rendered successfully, returning result") return result except Exception as e: # If template rendering fails, return a simple error page print(f"CRITICAL ERROR rendering template: {e}") import traceback traceback.print_exc() return f"""An error occurred while rendering the page.
Error: {str(e)}
Please check the Space logs for more details.
""", 500 def save_annotations_to_csv(): """Save all labeled annotations to CSV file""" # Write CSV with header and all labeled annotations with open(app.config["OUT"], 'w') as f: # Write header f.write("image,id,name,centerX,centerY,width,height\n") # Write ALL labeled annotations from current session current_count = 0 for label in app.config["LABELS"]: print(f"DEBUG: Checking label - Image: {label['image']}, ID: {label.get('id', 'None')}, Name: {label.get('name', 'None')}") if label.get("id") and label.get("name"): f.write( label["image"] + "," + label["id"] + "," + label["name"] + "," + str(round(float(label["centerX"]))) + "," + str(round(float(label["centerY"]))) + "," + str(round(float(label["width"]))) + "," + str(round(float(label["height"]))) + "\n" ) current_count += 1 print(f"DEBUG: Wrote annotation for {label['image']} with class {label['name']} (ID: {label['id']})") f.flush() # Ensure data is written to disk immediately print(f"DEBUG: Saved {current_count} labeled annotations to CSV") @app.route('/save_and_next') def save_and_next(): # Get current folder images to identify which annotations to save if app.config["HEAD"] < len(app.config["FOLDER_SETS"]): current_folder_set = app.config["FOLDER_SETS"][app.config["HEAD"]] current_folder_images = set() for image_set in current_folder_set['image_sets']: current_folder_images.add(image_set['sr_int_full']) current_folder_images.add(image_set['tr_line']) current_folder_images.add(image_set['tr_int_full']) # Read existing CSV content existing_lines = [] if os.path.exists(app.config["OUT"]): with open(app.config["OUT"], 'r') as f: existing_lines = f.readlines() # Write back CSV with header and non-current-folder annotations, plus new current folder annotations with open(app.config["OUT"], 'w') as f: # Write header f.write("image,id,name,centerX,centerY,width,height\n") # Write existing annotations that are NOT from current folder existing_count = 0 for line in existing_lines[1:]: # Skip header line = line.strip() if line: image_name = line.split(',')[0] if image_name not in current_folder_images: f.write(line + "\n") existing_count += 1 print(f"DEBUG: Wrote {existing_count} existing annotations from other folders") # Write ALL labeled annotations from current session (not just current folder) current_count = 0 for label in app.config["LABELS"]: print(f"DEBUG: Checking label - Image: {label['image']}, ID: {label.get('id', 'None')}, Name: {label.get('name', 'None')}") if label.get("id") and label.get("name"): f.write( label["image"] + "," + label["id"] + "," + label["name"] + "," + str(round(float(label["centerX"]))) + "," + str(round(float(label["centerY"]))) + "," + str(round(float(label["width"]))) + "," + str(round(float(label["height"]))) + "\n" ) current_count += 1 print(f"DEBUG: Wrote annotation for {label['image']} with class {label['name']} (ID: {label['id']})") print(f"DEBUG: Wrote {current_count} labeled annotations from all folders") # Remove current folder annotations from memory but keep others app.config["LABELS"] = [label for label in app.config["LABELS"] if label["image"] not in current_folder_images] print(f"Saved annotations for folder: {current_folder_set['folder']}") # Move to next folder, loop back to start if at the end app.config["HEAD"] += 1 if app.config["HEAD"] >= len(app.config["FOLDER_SETS"]): app.config["HEAD"] = 0 # Loop back to first folder app.config["IMAGE_SET_INDEX"] = 0 # Reset image set index print("Reached end of folders, looping back to first folder") return redirect(url_for('tagger')) @app.route('/next_folder') def next_folder(): # Save annotations before moving to next folder save_annotations_to_csv() # Move to next folder (labels persist) app.config["HEAD"] += 1 if app.config["HEAD"] >= len(app.config["FOLDER_SETS"]): app.config["HEAD"] = 0 # Loop back to first folder print("Reached end of folders, looping back to first folder") app.config["IMAGE_SET_INDEX"] = 0 # Reset to first image set # Preserve auto-play parameters if present autoplay = request.args.get('autoplay') interval = request.args.get('interval') if autoplay and interval: return redirect(url_for('tagger', autoplay=autoplay, interval=interval)) return redirect(url_for('tagger')) @app.route('/prev_folder') def prev_folder(): # Move to previous folder (labels persist) app.config["HEAD"] -= 1 if app.config["HEAD"] < 0: app.config["HEAD"] = len(app.config["FOLDER_SETS"]) - 1 # Loop to last folder print("Reached beginning of folders, looping to last folder") app.config["IMAGE_SET_INDEX"] = 0 # Reset to first image set # Preserve auto-play parameters if present autoplay = request.args.get('autoplay') interval = request.args.get('interval') if autoplay and interval: return redirect(url_for('tagger', autoplay=autoplay, interval=interval)) return redirect(url_for('tagger')) @app.route('/next_set') def next_set(): # Save annotations before moving to next set save_annotations_to_csv() # Move to next image set within current folder current_folder_set = app.config["FOLDER_SETS"][app.config["HEAD"]] max_sets = len(current_folder_set['image_sets']) current_index = app.config.get("IMAGE_SET_INDEX", 0) if current_index + 1 < max_sets: app.config["IMAGE_SET_INDEX"] = current_index + 1 else: # Reached end of sets in current folder, move to next folder if app.config["HEAD"] + 1 < len(app.config["FOLDER_SETS"]): app.config["HEAD"] += 1 app.config["IMAGE_SET_INDEX"] = 0 # Reset to first set in new folder print(f"DEBUG: Auto-advanced to next folder: {app.config['FOLDER_SETS'][app.config['HEAD']]['folder']}") else: # Reached end of all folders, loop back to beginning app.config["HEAD"] = 0 app.config["IMAGE_SET_INDEX"] = 0 print("DEBUG: Auto-looped back to first folder for continuous play") # Preserve auto-play parameters if present autoplay = request.args.get('autoplay') interval = request.args.get('interval') if autoplay and interval: return redirect(url_for('tagger', autoplay=autoplay, interval=interval)) return redirect(url_for('tagger')) @app.route('/prev_set') def prev_set(): # Move to previous image set within current folder current_index = app.config.get("IMAGE_SET_INDEX", 0) if current_index > 0: app.config["IMAGE_SET_INDEX"] = current_index - 1 # Preserve auto-play parameters if present autoplay = request.args.get('autoplay') interval = request.args.get('interval') if autoplay and interval: return redirect(url_for('tagger', autoplay=autoplay, interval=interval)) return redirect(url_for('tagger')) @app.route('/reset_annotations') def reset_annotations(): scope = request.args.get('scope', 'folder') if scope == 'all': # Reset all annotations from all folders app.config["LABELS"] = [] app.config["CLASS_TO_ID"] = {} app.config["NEXT_CLASS_ID"] = 1 print("DEBUG: Reset ALL annotations from ALL folders") elif scope == 'folder': # Reset annotations only for current folder current_folder_set = app.config["FOLDER_SETS"][app.config["HEAD"]] folder_name = current_folder_set["folder"] # Remove annotations that belong to the current folder original_count = len(app.config["LABELS"]) app.config["LABELS"] = [ label for label in app.config["LABELS"] if not any(label["image"].startswith(f"{folder_name}/") for folder_name in [folder_name]) ] removed_count = original_count - len(app.config["LABELS"]) print(f"DEBUG: Reset {removed_count} annotations from folder '{folder_name}'") # Save the updated annotations to CSV save_annotations_to_csv() return redirect(url_for('tagger')) @app.route("/bye") def bye(): return """All folders have been processed successfully.
Your annotations have been saved to:
out.csv
The CSV file contains all bounding boxes and labels you created.
You can now use this data for training machine learning models or further analysis.
Start Over| Country | Visits | Percentage |
|---|---|---|
| {country} | {count:,} | {percentage:.1f}% |
| Date | Visits |
|---|---|
| {date} | {count:,} |
| User Agent | Visits |
|---|---|
| {ua_display} | {count:,} |
Last Updated: {stats_data.get('last_visit', 'N/A')}
← Back to Tagger