"""People Tag Analyzer.

Gradio application that (1) runs a Qwen vision-language model over uploaded
name-tag images to extract a name, an affiliation and a town, writing the
results to a CSV file, and (2) offers overview / summary / search tools on
such a CSV file.
"""

import csv
import gc
import os
import re
import shutil
import tempfile
import threading
import time
import traceback
from typing import Any, Dict, List, Optional, Tuple, Union

import gradio as gr
import pandas as pd
import torch
from PIL import Image, ImageOps
from tabulate import tabulate

# Import transformers modules
try:
    from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration
except ImportError:
    # Keep the names defined so the failure surfaces as a readable message
    # from load_model_and_processor() instead of a NameError.
    AutoProcessor = None
    Qwen2_5_VLForConditionalGeneration = None
    print("Error: Could not import Qwen2_5_VLForConditionalGeneration")
    print("Please install transformers from source:")
    print("pip install git+https://github.com/huggingface/transformers")

try:
    # Qwen2-VL checkpoints use their own model class.
    from transformers import Qwen2VLForConditionalGeneration
except ImportError:
    Qwen2VLForConditionalGeneration = None

# Global variables for tracking progress
total_images = 0
processed_images = 0
successful_images = 0
failed_images = 0
print_lock = threading.Lock()
model = None
processor = None
# (model id, device, half precision) of the currently loaded model, if any.
_loaded_model_key: Optional[Tuple[str, str, bool]] = None

# Short UI names -> Hugging Face model ids.
MODEL_MAP = {
    "qwen2-vl-2b": "Qwen/Qwen2-VL-2B-Instruct",
    "qwen2.5-vl-3b": "Qwen/Qwen2.5-VL-3B-Instruct",
    "qwen2.5-vl-7b": "Qwen/Qwen2.5-VL-7B-Instruct",
}
# Value passed as max_memory for GPU 0 when loading on CUDA.
GPU_MEMORY_LIMIT = "10GiB"
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')
CSV_FIELDNAMES = [
    'image_path', 'name', 'affiliation', 'town',
    'success', 'error', 'time_taken',
]
REQUIRED_COLUMNS = ('name', 'affiliation', 'town')

_FIELD_LABELS = ("Name", "Affiliation", "Town")
_LABEL_ALTERNATION = "|".join(_FIELD_LABELS)
# A value runs until the next label, the end of the line or the end of the
# text, so both "one field per line" and "all fields on one line" answers
# are parsed. Optional '*' allows markdown-bold labels such as "**Name:**".
_FIELD_PATTERNS = {
    label: re.compile(
        rf"{label}\**\s*:\**\s*(.*?)\s*"
        rf"(?=\**(?:{_LABEL_ALTERNATION})\**\s*:|\n|$)"
    )
    for label in _FIELD_LABELS
}


def _free_memory() -> None:
    """Run the garbage collector, then release cached CUDA memory."""
    # Collect first so tensors that just became unreachable are actually
    # freed before the CUDA cache is emptied.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


def _report_progress(progress) -> None:
    """Send the current global counters to the progress callback, if any."""
    if progress is None:
        return
    # Guard against division by zero when called outside process_directory().
    fraction = processed_images / total_images if total_images else 0.0
    progress(
        fraction,
        f"Processed: {processed_images}/{total_images} "
        f"(Success: {successful_images}, Failed: {failed_images})",
    )


# =============== QWEN BATCH EXTRACTOR FUNCTIONS ===============

def load_image(image_path: str, max_size: int = 1024) -> Image.Image:
    """
    Load an image from a file path and resize it if needed to save memory.

    The image is rotated according to its EXIF orientation tag, converted to
    RGB and fully read into memory so the underlying file is closed before
    returning.

    Args:
        image_path: Path to the image
        max_size: Maximum dimension (width or height) for the image

    Returns:
        PIL Image whose longest side is at most ``max_size``

    Raises:
        ValueError: If the image cannot be opened, decoded or resized
    """
    try:
        with Image.open(image_path) as raw:
            # convert() forces the pixel data to be read while the file is
            # still open.
            image = ImageOps.exif_transpose(raw).convert("RGB")

        # Resize large images to save memory while maintaining aspect ratio
        width, height = image.size
        if width > max_size or height > max_size:
            scale = max_size / max(width, height)
            # Never let a very elongated image collapse to a 0-pixel side.
            new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
            image = image.resize(new_size, Image.LANCZOS)
        return image
    except Exception as e:
        raise ValueError(
            f"Failed to load or resize image {image_path}: {str(e)}"
        ) from e


def process_vision_info(messages: List[Dict[str, Any]]) -> tuple:
    """
    Extract image inputs from chat messages.

    Only messages with role "user" are inspected. Image entries given as a
    string are treated as local file paths and loaded with load_image();
    anything else is passed through unchanged (assumed to be a PIL image).

    Returns:
        Tuple ``(image_inputs, video_inputs)``; ``video_inputs`` is always
        None because videos are not handled here.
    """
    image_inputs = []
    video_inputs = None  # None (not an empty list) signals "no videos"

    for message in messages:
        if message["role"] != "user":
            continue
        content_items = message["content"]
        if isinstance(content_items, str):
            # Plain-text message: nothing to extract.
            continue
        for content in content_items:
            if content["type"] != "image":
                continue
            source = content["image"]
            if isinstance(source, str):
                image_inputs.append(load_image(source))
            else:
                image_inputs.append(source)

    return image_inputs, video_inputs


def extract_fields_from_response(response: str) -> Tuple[str, str, str]:
    """
    Extract name, affiliation, and town from the model's response.

    Each field is introduced by its label ("Name:", "Affiliation:",
    "Town:"). The fields may be on separate lines or all on one line.

    Args:
        response: The response from the model

    Returns:
        Tuple containing (name, affiliation, town); a field that is not
        found is returned as an empty string
    """
    values = []
    for label in _FIELD_LABELS:
        match = _FIELD_PATTERNS[label].search(response or "")
        # Drop surrounding whitespace, markdown asterisks and separators.
        values.append(match.group(1).strip(" \t\r\n*,;") if match else "")
    name, affiliation, town = values
    return name, affiliation, town


def process_single_image(image_path: str, model, processor, device: str,
                         prompt: str, max_image_size: int, max_tokens: int,
                         progress=None) -> Dict:
    """
    Process a single image and extract name, affiliation, and town.

    Errors are caught and reported in the returned dictionary. The global
    processed/successful/failed counters are updated in either case.

    Args:
        image_path: Path to the image
        model: The loaded Qwen model
        processor: The loaded processor
        device: Device to run inference on ("cuda" or "cpu")
        prompt: Text prompt to send to the model
        max_image_size: Maximum dimension for input images
        max_tokens: Maximum number of tokens to generate
        progress: Gradio progress object

    Returns:
        Dictionary with extracted fields and metadata
    """
    global processed_images, successful_images, failed_images

    result = {
        "image_path": image_path,
        "name": "",
        "affiliation": "",
        "town": "",
        "success": False,
        "error": "",
        "time_taken": 0,
        "response": "",
    }

    t0 = time.time()
    inputs = None
    generated_ids = None
    try:
        # Load and prepare image
        image = load_image(image_path, max_size=int(max_image_size))

        # Create message format expected by Qwen models
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": prompt},
                ],
            }
        ]

        # Prepare inputs
        text = processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)

        processor_kwargs = {
            "text": [text],
            "images": image_inputs,
            "padding": True,
            "return_tensors": "pt",
        }
        # Only pass videos when there are some.
        if video_inputs is not None:
            processor_kwargs["videos"] = video_inputs
        inputs = processor(**processor_kwargs).to(device)

        # Free some memory before generation
        _free_memory()

        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=int(max_tokens),  # sliders may deliver floats
                do_sample=False,  # greedy decoding
                use_cache=True,
            )

        # Decode only the newly generated tokens
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        response = processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )[0]  # single prompt -> single response

        name, affiliation, town = extract_fields_from_response(response)
        result.update(
            name=name,
            affiliation=affiliation,
            town=town,
            success=True,
            response=response,
        )
    except Exception as e:
        result["error"] = str(e)
        # Keep the full traceback on the console for debugging.
        with print_lock:
            traceback.print_exc()
    finally:
        result["time_taken"] = time.time() - t0
        with print_lock:
            processed_images += 1
            if result["success"]:
                successful_images += 1
            else:
                failed_images += 1
        _report_progress(progress)

        # Drop references to the tensors before cleaning up.
        del inputs, generated_ids
        _free_memory()

    return result


def _select_model_class(model_id: str):
    """Return the transformers class for ``model_id`` (None if unavailable)."""
    if "qwen2-vl" in model_id.lower() and Qwen2VLForConditionalGeneration is not None:
        return Qwen2VLForConditionalGeneration
    return Qwen2_5_VLForConditionalGeneration


def unload_model():
    """Unload the model to free up GPU memory"""
    global model, processor, _loaded_model_key

    model = None
    processor = None
    _loaded_model_key = None
    _free_memory()

    return "Model unloaded successfully"


def load_model_and_processor(model_name, device, half_precision):
    """
    Load model and processor for vision processing.

    On success the module-level ``model`` and ``processor`` are set.

    Args:
        model_name: Hugging Face model id to load
        device: Device to load the model on ("cuda" or "cpu")
        half_precision: Use float16 weights (only applied on "cuda")

    Returns:
        Tuple ``(success, message)``
    """
    global model, processor, _loaded_model_key

    model_class = _select_model_class(model_name)
    if model_class is None or AutoProcessor is None:
        error_msg = (
            "Error loading model: the required transformers classes could "
            "not be imported. Please install transformers from source: "
            "pip install git+https://github.com/huggingface/transformers"
        )
        print(error_msg)
        return False, error_msg

    # Set up dtype for model loading
    dtype = torch.float16 if half_precision and device == "cuda" else "auto"
    attn_implementation = "sdpa" if device == "cuda" else None

    print(f"Loading {model_name} model...")
    t0 = time.time()
    try:
        model = model_class.from_pretrained(
            model_name,
            torch_dtype=dtype,
            device_map=device,
            attn_implementation=attn_implementation,
            max_memory={0: GPU_MEMORY_LIMIT} if device == "cuda" else None,
        )
        processor = AutoProcessor.from_pretrained(model_name)
    except Exception as e:
        # Do not leave a half-initialised model/processor pair behind.
        model = None
        processor = None
        _loaded_model_key = None
        _free_memory()
        error_msg = f"Error loading model: {str(e)}"
        print(error_msg)
        return False, error_msg

    _loaded_model_key = (model_name, device, bool(half_precision))
    elapsed = time.time() - t0
    print(f"Model loaded in {elapsed:.2f} s")
    return True, f"Model loaded successfully in {elapsed:.2f}s"


def process_directory(directory_path: str, output_csv: str, model_name: str,
                      prompt: str, device: str, half_precision: bool,
                      max_image_size: int, max_tokens: int,
                      progress=None) -> Tuple[List[Dict], str]:
    """
    Process all images in a directory and save results to CSV.

    Args:
        directory_path: Path to directory containing images
        output_csv: Path to output CSV file
        model_name: Short name from MODEL_MAP, or a full model id
        prompt: Text prompt to send to the model
        device: Device to run inference on ("auto", "cuda", or "cpu")
        half_precision: Whether to use half precision for model
        max_image_size: Maximum dimension for input images
        max_tokens: Maximum number of tokens to generate
        progress: Gradio progress object

    Returns:
        Tuple ``(results, message)``: the per-image result dictionaries and
        a summary. If the model cannot be loaded, ``results`` is empty,
        ``message`` is the error and no CSV is written.

    Raises:
        ValueError: If the directory does not exist or contains no images
    """
    global total_images, processed_images, successful_images, failed_images

    # Reset counters
    total_images = 0
    processed_images = 0
    successful_images = 0
    failed_images = 0

    # Validate directory
    if not os.path.isdir(directory_path):
        raise ValueError(f"Directory does not exist: {directory_path}")

    # Find all image files in directory (sorted for a reproducible order)
    image_files = sorted(
        os.path.join(directory_path, f)
        for f in os.listdir(directory_path)
        if os.path.isfile(os.path.join(directory_path, f))
        and f.lower().endswith(IMAGE_EXTENSIONS)
    )

    if not image_files:
        raise ValueError(f"No image files found in directory: {directory_path}")

    total_images = len(image_files)
    if progress is not None:
        progress(0, f"Found {total_images} images to process")

    gc.enable()

    # Determine device
    if device == "auto":
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # (Re)load the model when none is loaded or the requested configuration
    # differs from the loaded one; otherwise a changed model selection
    # would silently keep using the previous model.
    model_id = MODEL_MAP.get(model_name, model_name)
    requested_key = (model_id, device, bool(half_precision))
    if model is None or processor is None or _loaded_model_key != requested_key:
        if model is not None or processor is not None:
            unload_model()
        success, message = load_model_and_processor(model_id, device, half_precision)
        if not success:
            return [], message

    # Process images sequentially
    results = []
    for i, image_path in enumerate(image_files):
        if progress is not None:
            progress(
                i / total_images,
                f"Processing image {i+1}/{total_images}: {os.path.basename(image_path)}",
            )
        results.append(
            process_single_image(
                image_path, model, processor, device, prompt,
                max_image_size, max_tokens, progress,
            )
        )

    # Write results to CSV; the raw 'response' field is not a CSV column
    # and is skipped through extrasaction='ignore'.
    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(
            csvfile, fieldnames=CSV_FIELDNAMES, extrasaction='ignore'
        )
        writer.writeheader()
        writer.writerows(results)

    # Create summary
    summary = (
        "\nProcessing complete!\n"
        f"Total images processed: {total_images}\n"
        f"Successful extractions: {successful_images}\n"
        f"Failed extractions: {failed_images}\n"
        f"Results saved to: {output_csv}\n"
    )

    if progress is not None:
        progress(
            1.0,
            f"Complete! Processed {total_images} images: "
            f"{successful_images} successful, {failed_images} failed",
        )

    return results, summary


# =============== DATA ANALYZER FUNCTIONS ===============

def load_data(file_path):
    """
    Load data from CSV file.

    The CSV must contain 'name', 'affiliation' and 'town' columns (matched
    case-insensitively). Those columns are renamed to lower case and their
    values are converted to lower-case strings; missing values stay missing.

    Returns:
        Tuple ``(df, message)``; ``df`` is None when loading failed
    """
    if not os.path.exists(file_path):
        return None, f"Error: File '{file_path}' not found."

    try:
        df = pd.read_csv(file_path)

        # Ensure expected columns exist
        lowered = {str(col).lower() for col in df.columns}
        if not all(col in lowered for col in REQUIRED_COLUMNS):
            return None, "Error: CSV must contain columns for name, affiliation, and town."

        # Standardize column names (case insensitive)
        column_map = {
            col: str(col).lower()
            for col in df.columns
            if str(col).lower() in REQUIRED_COLUMNS
        }
        df = df.rename(columns=column_map)

        # Lower-case every value for case-insensitive operations. Values are
        # converted to str first so numeric or all-empty columns still
        # support the .str operations used by the search and summaries.
        for col in REQUIRED_COLUMNS:
            series = df[col].astype(object)
            df[col] = series.where(series.isna(), series.astype(str).str.lower())

        return df, "Data loaded successfully"
    except Exception as e:
        return None, f"Error loading CSV file: {e}"


def _section_header(title: str) -> str:
    """Return ``title`` framed by two 50-character '=' rules."""
    rule = "=" * 50
    return f"{rule}\n{title}\n{rule}\n"


def _grouped_summary(df, group_col, other_col, headers, title, detail_title,
                     icon, max_groups=None):
    """
    Build a text report grouping ``df`` by ``group_col``.

    The report has a table (people count and number of distinct
    ``other_col`` values per group) followed by the three most frequent
    ``other_col`` values of each group (limited to the ``max_groups``
    largest groups when given).
    """
    summary = (
        df.groupby(group_col)
        .agg(
            total_people=('name', 'count'),
            # nunique ignores missing values.
            distinct_values=(other_col, 'nunique'),
        )
        .reset_index()
        .sort_values('total_people', ascending=False)
    )

    display_summary = summary.copy()
    display_summary[group_col] = display_summary[group_col].astype(str).str.title()
    display_summary.columns = headers

    parts = [
        "\n",
        _section_header(title),
        tabulate(
            display_summary,
            headers='keys',
            tablefmt='psql',
            showindex=False,
            floatfmt='.0f',
        ),
        "\n\n",
        _section_header(detail_title),
    ]

    groups = summary[group_col]
    if max_groups is not None:
        groups = groups.head(max_groups)

    for group in groups:
        top_values = df.loc[df[group_col] == group, other_col].value_counts().head(3)
        parts.append(f"\n{icon} {str(group).upper()}:\n")
        parts.append("   " + "-" * 30 + "\n")
        parts.extend(
            f"   {rank}. {str(value).title():<20} → {count} people\n"
            for rank, (value, count) in enumerate(top_values.items(), 1)
        )
        if top_values.empty:
            parts.append("   No data available\n")

    return "".join(parts)


def summary_by_town(df):
    """Generate summary statistics by town, with top affiliations per town."""
    if df is None or len(df) == 0:
        return "No data available for summary."

    return _grouped_summary(
        df,
        group_col='town',
        other_col='affiliation',
        headers=['Town', 'People', 'Affiliations'],
        title="SUMMARY BY TOWN",
        detail_title="TOP AFFILIATIONS BY TOWN",
        icon="🏙️",
    )


def summary_by_affiliation(df):
    """
    Generate summary statistics by affiliation.

    The "top towns" section covers only the five largest affiliations.
    """
    if df is None or len(df) == 0:
        return "No data available for summary."

    return _grouped_summary(
        df,
        group_col='affiliation',
        other_col='town',
        headers=['Affiliation', 'People', 'Towns'],
        title="SUMMARY BY AFFILIATION",
        detail_title="TOP TOWNS BY AFFILIATION",
        icon="🏛️",
        max_groups=5,
    )


def search_data(df, search_term, search_field=None):
    """
    Search for records by name, town, or affiliation.

    The match is a case-insensitive literal substring match (the search
    term is not interpreted as a regular expression).

    Args:
        df: DataFrame as returned by load_data()
        search_term: Text to look for
        search_field: 'name', 'town' or 'affiliation' (any case) to search
            one column; anything else searches all three

    Returns:
        Formatted text with the matching rows or an explanatory message
    """
    if df is None or len(df) == 0:
        return "No data available for search."

    if not search_term:
        return "Please enter a search term."

    search_term = search_term.lower()  # data is stored lower-cased

    def contains(column):
        # regex=False: characters such as '(' or '.' are matched literally.
        return df[column].str.contains(search_term, na=False, regex=False)

    if search_field and search_field.lower() in REQUIRED_COLUMNS:
        mask = contains(search_field.lower())
    else:
        mask = contains('name') | contains('town') | contains('affiliation')
    results = df[mask]

    if len(results) == 0:
        return f"No results found for '{search_term}'"

    # Title-case the values for readability and keep only the shown columns.
    display_results = results[['name', 'affiliation', 'town']].copy()
    for col in display_results.columns:
        display_results[col] = display_results[col].str.title()

    result = f"=== SEARCH RESULTS ({len(results)} matches) ===\n"
    result += tabulate(
        display_results, headers='keys', tablefmt='simple', showindex=False
    )
    return result


# =============== GRADIO APP INTERFACE ===============

def _resolve_path(file_obj) -> str:
    """Return the filesystem path of an upload (path string or file object)."""
    return file_obj if isinstance(file_obj, str) else file_obj.name


def copy_to_temp_dir(file_list):
    """
    Copy uploaded files to a temporary directory.

    Files with the same base name get a numeric suffix instead of
    overwriting each other.

    Returns:
        Tuple ``(temp_dir, file_paths)`` with the new directory and the
        paths of the copies. The caller owns the directory.
    """
    temp_dir = tempfile.mkdtemp()
    file_paths = []

    for file in file_list:
        src_path = _resolve_path(file)
        file_name = os.path.basename(src_path)
        stem, ext = os.path.splitext(file_name)
        dst_path = os.path.join(temp_dir, file_name)
        suffix = 1
        while os.path.exists(dst_path):
            dst_path = os.path.join(temp_dir, f"{stem}_{suffix}{ext}")
            suffix += 1
        shutil.copy(src_path, dst_path)
        file_paths.append(dst_path)

    return temp_dir, file_paths


def process_images_tab(files, model_name, prompt, device, half_precision,
                       max_image_size, max_tokens, progress=gr.Progress()):
    """
    Function to handle the image processing tab.

    Returns:
        Tuple ``(csv_path, status_text)``; ``csv_path`` is "" when no CSV
        was produced.
    """
    if not files:
        return "", "Please upload some image files."

    temp_dir = None
    try:
        # Copy uploaded files to a temporary directory
        temp_dir, copied_files = copy_to_temp_dir(files)
        output_csv = os.path.join(temp_dir, "name_tags_results.csv")

        # Determine device
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        results, summary = process_directory(
            directory_path=temp_dir,
            output_csv=output_csv,
            model_name=model_name,
            prompt=prompt,
            device=device,
            half_precision=half_precision,
            max_image_size=int(max_image_size),
            max_tokens=int(max_tokens),
            progress=progress,
        )

        if not results:
            # Model loading failed: no CSV was written, so do not hand out
            # a path to a file that does not exist.
            shutil.rmtree(temp_dir, ignore_errors=True)
            return "", summary

        # The image copies are no longer needed; only the CSV is kept.
        for path in copied_files:
            try:
                os.remove(path)
            except OSError:
                pass  # best-effort cleanup

        return output_csv, summary
    except Exception as e:
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return "", f"Error: {str(e)}\n{traceback.format_exc()}"


def analyze_csv_tab(csv_file):
    """Function to handle the CSV analysis tab (returns an overview text)."""
    if not csv_file:
        return "Please upload or generate a CSV file first."

    df, message = load_data(_resolve_path(csv_file))
    if df is None:
        return message

    return (
        "=== DATA OVERVIEW ===\n"
        f"Total records: {len(df)}\n"
        f"Unique towns: {df['town'].nunique()}\n"
        f"Unique affiliations: {df['affiliation'].nunique()}\n"
    )


def search_csv(csv_file, search_term, search_field):
    """Function to search the CSV data"""
    if not csv_file:
        return "Please upload or generate a CSV file first."

    if not search_term:
        return "Please enter a search term."

    df, message = load_data(_resolve_path(csv_file))
    if df is None:
        return message

    return search_data(df, search_term, search_field)


def summary_csv(csv_file, summary_type):
    """Function to generate summaries from the CSV data"""
    if not csv_file:
        return "Please upload or generate a CSV file first."

    df, message = load_data(_resolve_path(csv_file))
    if df is None:
        return message

    if summary_type == "By Town":
        return summary_by_town(df)
    if summary_type == "By Affiliation":
        return summary_by_affiliation(df)
    return "Please select a summary type."


# Kept at column 0 so markdown does not treat indented lines as code blocks.
HELP_MARKDOWN = """
# People Tag Analyzer - User Guide

## Overview
This application uses advanced vision models to extract names, affiliations, and towns from name tag images, then provides powerful analysis tools for the extracted data.

## How to Use

### 1. Process Images Tab

#### Step 1: Upload Images
- Click "Browse files" to select multiple name tag images
- Supported formats: JPG, JPEG, PNG, BMP, GIF, WEBP
- You can upload multiple images at once

#### Step 2: Configure Model
- **Vision Model**: Choose from available Qwen vision models
  - `qwen2-vl-2b`: Fastest, least memory usage
  - `qwen2.5-vl-3b`: Balanced performance (recommended)
  - `qwen2.5-vl-7b`: Best accuracy, requires more memory
- **Device**:
  - `auto`: Automatically detects GPU/CPU
  - `cuda`: Force GPU usage (if available)
  - `cpu`: Force CPU usage
- **Half Precision**: Use FP16 to save GPU memory (recommended for CUDA)
- **Max Image Size**: Resize large images to save memory (768px recommended)
- **Max Output Tokens**: Limit model output length (256 recommended)

#### Step 3: Set Prompt
The default prompt works well for most name tags. You can customize it if needed:
- The prompt tells the model what information to extract
- Format should specify the expected output structure

#### Step 4: Process Images
- Click "Process Images" to start extraction
- Progress will be shown in real-time
"""


# Create the Gradio interface
with gr.Blocks(title="People Tag Analyzer") as app:
    gr.Markdown("# People Tag Analyzer")
    gr.Markdown("This app processes images of name tags to extract information and provides analysis tools.")

    # Store CSV file path between tabs
    csv_file_path = gr.State("")

    with gr.Tabs():
        # Image Processing Tab
        with gr.Tab("Process Images"):
            gr.Markdown("### Step 1: Upload Images")
            with gr.Row():
                image_files = gr.File(file_count="multiple", label="Upload Name Tag Images")

            gr.Markdown("### Step 2: Configure Model")
            with gr.Row():
                with gr.Column():
                    model_name = gr.Dropdown(
                        choices=["qwen2-vl-2b", "qwen2.5-vl-3b", "qwen2.5-vl-7b"],
                        value="qwen2.5-vl-3b",
                        label="Vision Model"
                    )
                    device = gr.Dropdown(
                        choices=["auto", "cuda", "cpu"],
                        value="auto",
                        label="Device"
                    )
                with gr.Column():
                    half_precision = gr.Checkbox(
                        value=True,
                        label="Use Half Precision (FP16)"
                    )
                    max_image_size = gr.Slider(
                        minimum=256,
                        maximum=2048,
                        value=768,
                        step=64,
                        label="Max Image Size"
                    )
                    max_tokens = gr.Slider(
                        minimum=64,
                        maximum=512,
                        value=256,
                        step=32,
                        label="Max Output Tokens"
                    )

            gr.Markdown("### Step 3: Set Prompt")
            prompt = gr.Textbox(
                value="Extract 'name of the person', 'affiliation of the attendee' and also extract the town name you have to get it from the affiliation, then return the results in the format 'Name: Affiliation: Town:'",
                label="Prompt",
                lines=3
            )

            gr.Markdown("### Step 4: Process Images")
            process_button = gr.Button("Process Images")
            unload_button = gr.Button("Unload Model (Free Memory)")

            with gr.Row():
                output_csv = gr.Textbox(label="Output CSV Path")
                processing_output = gr.Textbox(label="Processing Status", lines=10)

            # Connect the process button
            process_button.click(
                fn=process_images_tab,
                inputs=[image_files, model_name, prompt, device, half_precision, max_image_size, max_tokens],
                outputs=[output_csv, processing_output],
                api_name="process_images"
            )

            # Connect the unload button
            unload_button.click(
                fn=unload_model,
                inputs=[],
                outputs=[processing_output]
            )

            # Update state when CSV is generated
            output_csv.change(
                fn=lambda x: x,
                inputs=[output_csv],
                outputs=[csv_file_path]
            )

        # Data Analysis Tab
        with gr.Tab("Analyze Data"):
            gr.Markdown("### Data Input")
            with gr.Row():
                csv_input = gr.File(label="Upload CSV File")
                use_processed = gr.Button("Use Processed CSV")

            csv_status = gr.Textbox(label="CSV Status", lines=5)

            # Analyze data when CSV is uploaded or selected
            csv_input.change(
                fn=analyze_csv_tab,
                inputs=[csv_input],
                outputs=[csv_status]
            )

            # Use processed CSV from first tab. An empty path is mapped to
            # None so the File component is cleared rather than given "".
            use_processed.click(
                fn=lambda path: path or None,
                inputs=[csv_file_path],
                outputs=[csv_input]
            ).then(
                fn=analyze_csv_tab,
                inputs=[csv_file_path],
                outputs=[csv_status]
            )

            gr.Markdown("### Summary")
            with gr.Row():
                summary_type = gr.Radio(
                    choices=["By Town", "By Affiliation"],
                    value="By Town",
                    label="Summary Type"
                )
                summary_button = gr.Button("Generate Summary")

            summary_output = gr.Textbox(label="Summary Results", lines=20)

            # Generate summary when button is clicked
            summary_button.click(
                fn=summary_csv,
                inputs=[csv_input, summary_type],
                outputs=[summary_output]
            )

            gr.Markdown("### Search")
            with gr.Row():
                with gr.Column():
                    search_term = gr.Textbox(label="Search Term")
                    search_field = gr.Dropdown(
                        choices=["All Fields", "Name", "Town", "Affiliation"],
                        value="All Fields",
                        label="Search In"
                    )
                with gr.Column():
                    search_button = gr.Button("Search")

            search_output = gr.Textbox(label="Search Results", lines=15)

            # Search when button is clicked
            search_button.click(
                fn=search_csv,
                inputs=[csv_input, search_term, search_field],
                outputs=[search_output]
            )

            # Also search when Enter is pressed in search term
            search_term.submit(
                fn=search_csv,
                inputs=[csv_input, search_term, search_field],
                outputs=[search_output]
            )

        # Help/Instructions Tab
        with gr.Tab("Help & Instructions"):
            gr.Markdown(HELP_MARKDOWN)

# Main execution block
if __name__ == "__main__":
    app.launch()