import os
import csv
import pandas as pd
import gradio as gr
import torch
import re
import time
import gc
from PIL import Image
import traceback
from typing import List, Dict, Any, Union, Optional, Tuple
import threading
from tabulate import tabulate
import tempfile
import shutil

# Import transformers modules
try:
    from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
except ImportError:
    print("Error: Could not import Qwen2_5_VLForConditionalGeneration")
    print("Please install transformers from source:")
    print("pip install git+https://github.com/huggingface/transformers")

# Global variables for tracking progress
total_images = 0
processed_images = 0
successful_images = 0
failed_images = 0
print_lock = threading.Lock()
model = None
processor = None

# =============== QWEN BATCH EXTRACTOR FUNCTIONS ===============
def load_image(image_path: str, max_size: int = 1024) -> Image.Image:
    """
    Load an image from a file path and resize it if needed to save memory.

    Args:
        image_path: Path to the image
        max_size: Maximum dimension (width or height) for the image

    Returns:
        Resized PIL Image
    """
    try:
        image = Image.open(image_path)
        # Resize large images to save memory while maintaining aspect ratio
        width, height = image.size
        if width > max_size or height > max_size:
            scale = max_size / max(width, height)
            new_width = int(width * scale)
            new_height = int(height * scale)
            image = image.resize((new_width, new_height), Image.LANCZOS)
        return image
    except Exception as e:
        raise ValueError(f"Failed to load or resize image {image_path}: {str(e)}")
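
# Example (hypothetical path): a 4000x3000 photo with max_size=1024 is scaled
# by 1024/4000 = 0.256 to 1024x768, preserving the aspect ratio:
#   thumb = load_image("tags/IMG_0001.jpg", max_size=1024)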

def process_vision_info(messages: List[Dict[str, Any]]) -> tuple:
    """Extract image inputs from messages."""
    image_inputs = []
    video_inputs = None  # Setting to None instead of empty list
    for message in messages:
        if message["role"] != "user":
            continue
        for content in message["content"]:
            if content["type"] == "image":
                if isinstance(content["image"], str):
                    # Load image if it's a file path
                    image = load_image(content["image"])
                    image_inputs.append(image)
                else:
                    # Assume it's already a PIL Image
                    image_inputs.append(content["image"])
    return image_inputs, video_inputs
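
# Sketch of the expected message shape (all values illustrative):
#   msgs = [{"role": "user", "content": [{"type": "image", "image": "a.jpg"},
#                                        {"type": "text", "text": "Describe"}]}]
#   images, videos = process_vision_info(msgs)  # -> ([<PIL.Image>], None)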

def extract_fields_from_response(response: str) -> Tuple[str, str, str]:
    """
    Extract name, affiliation, and town from the model's response.

    Args:
        response: The response from the model

    Returns:
        Tuple containing (name, affiliation, town)
    """
    # Initialize default values
    name = ""
    affiliation = ""
    town = ""
    # Use regex to extract fields
    name_match = re.search(r"Name:\s*([^\n]+)", response)
    affiliation_match = re.search(r"Affiliation:\s*([^\n]+)", response)
    town_match = re.search(r"Town:\s*([^\n]+)", response)
    # Extract fields if matches found
    if name_match:
        name = name_match.group(1).strip()
    if affiliation_match:
        affiliation = affiliation_match.group(1).strip()
    if town_match:
        town = town_match.group(1).strip()
    return name, affiliation, town
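
# Example with a made-up model response:
#   extract_fields_from_response("Name: Jane Doe\nAffiliation: Acme Corp\nTown: Springfield")
#   -> ("Jane Doe", "Acme Corp", "Springfield")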

def process_single_image(image_path: str, model, processor, device: str,
                         prompt: str, max_image_size: int, max_tokens: int,
                         progress=None) -> Dict:
    """
    Process a single image and extract name, affiliation, and town.

    Args:
        image_path: Path to the image
        model: The loaded Qwen model
        processor: The loaded processor
        device: Device to run inference on ("cuda" or "cpu")
        prompt: Text prompt to send to the model
        max_image_size: Maximum dimension for input images
        max_tokens: Maximum number of tokens to generate
        progress: Gradio progress object

    Returns:
        Dictionary with extracted fields and metadata
    """
    global processed_images, successful_images, failed_images
    result = {
        "image_path": image_path,
        "name": "",
        "affiliation": "",
        "town": "",
        "success": False,
        "error": "",
        "time_taken": 0,
        "response": ""
    }
    t0 = time.time()
    try:
        # Load and prepare image
        image = load_image(image_path, max_size=max_image_size)
        # Create message format expected by Qwen models
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": prompt}
                ]
            }
        ]
        # Prepare inputs
        text = processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        # Only pass videos to the processor when there are any
        if video_inputs is None:
            inputs = processor(
                text=[text],
                images=image_inputs,
                padding=True,
                return_tensors="pt"
            )
        else:
            inputs = processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt"
            )
        # Move inputs to the appropriate device
        inputs = inputs.to(device)
        # Free some memory before generation
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
        # Generate response with memory optimizations
        with torch.no_grad():
            generate_kwargs = {
                "max_new_tokens": max_tokens,
                "do_sample": False,  # Use greedy decoding to save memory
                "use_cache": True,
            }
            generated_ids = model.generate(
                **inputs,
                **generate_kwargs
            )
        # Decode only the newly generated tokens
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        response = processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False
        )[0]  # Get first (and only) response
        time_taken = time.time() - t0
        # Extract fields from response
        name, affiliation, town = extract_fields_from_response(response)
        # Update result dictionary
        result["name"] = name
        result["affiliation"] = affiliation
        result["town"] = town
        result["success"] = True
        result["time_taken"] = time_taken
        result["response"] = response
        with print_lock:
            processed_images += 1
            successful_images += 1
            if progress is not None:
                progress(processed_images / total_images, f"Processed: {processed_images}/{total_images} (Success: {successful_images}, Failed: {failed_images})")
    except Exception as e:
        error_msg = str(e)
        with print_lock:
            processed_images += 1
            failed_images += 1
            print(f"Error processing {image_path}: {error_msg}")
            print(traceback.format_exc())
            if progress is not None:
                progress(processed_images / total_images, f"Processed: {processed_images}/{total_images} (Success: {successful_images}, Failed: {failed_images})")
        result["error"] = error_msg
        result["time_taken"] = time.time() - t0
    # Clean up to free memory
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()
    return result
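
# Standalone usage sketch (assumes the model is loaded and the hypothetical
# file "tags/IMG_0001.jpg" exists; with progress=None no progress is reported):
#   r = process_single_image("tags/IMG_0001.jpg", model, processor, "cuda",
#                            prompt="...", max_image_size=768, max_tokens=256)
#   print(r["name"], r["affiliation"], r["town"])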

def load_model_and_processor(model_name, device, half_precision):
    """Load model and processor for vision processing"""
    global model, processor
    # Set up dtype for model loading
    if half_precision and device == "cuda":
        dtype = torch.float16
    else:
        dtype = "auto"
    # Low memory options for CUDA
    attn_implementation = "sdpa" if device == "cuda" else None
    # Load model and processor
    print(f"Loading {model_name} model...")
    t0 = time.time()
    try:
        model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype=dtype,
            device_map=device,
            attn_implementation=attn_implementation,
            max_memory={0: "10GiB"} if device == "cuda" else None,  # Limit GPU memory usage
        )
        processor = AutoProcessor.from_pretrained(model_name)
        print(f"Model loaded in {time.time() - t0:.2f} s")
        return True, f"Model loaded successfully in {time.time() - t0:.2f}s"
    except Exception as e:
        error_msg = f"Error loading model: {str(e)}"
        print(error_msg)
        return False, error_msg
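
# Example call (assumes a CUDA GPU with enough free memory):
#   ok, msg = load_model_and_processor("Qwen/Qwen2.5-VL-3B-Instruct", "cuda",
#                                      half_precision=True)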

def process_directory(directory_path: str, output_csv: str, model_name: str,
                      prompt: str, device: str, half_precision: bool,
                      max_image_size: int, max_tokens: int, progress=None) -> Tuple[List[Dict], str]:
    """
    Process all images in a directory and save results to CSV.

    Args:
        directory_path: Path to directory containing images
        output_csv: Path to output CSV file
        model_name: Name of the Qwen model to use
        prompt: Text prompt to send to the model
        device: Device to run inference on ("auto", "cuda", or "cpu")
        half_precision: Whether to use half precision for model
        max_image_size: Maximum dimension for input images
        max_tokens: Maximum number of tokens to generate
        progress: Gradio progress object

    Returns:
        Tuple of (list of per-image results, summary string)
    """
    global total_images, processed_images, successful_images, failed_images, model, processor
    # Reset counters
    total_images = 0
    processed_images = 0
    successful_images = 0
    failed_images = 0
    # Validate directory
    if not os.path.isdir(directory_path):
        raise ValueError(f"Directory does not exist: {directory_path}")
    # Find all image files in directory
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')
    image_files = [
        os.path.join(directory_path, f) for f in os.listdir(directory_path)
        if os.path.isfile(os.path.join(directory_path, f)) and
        f.lower().endswith(image_extensions)
    ]
    if not image_files:
        raise ValueError(f"No image files found in directory: {directory_path}")
    total_images = len(image_files)
    if progress is not None:
        progress(0, f"Found {total_images} images to process")
    # Enable garbage collection
    gc.enable()
    # Determine device
    if device == "auto":
        device = "cuda" if torch.cuda.is_available() else "cpu"
    # Check if model is already loaded
    if model is None or processor is None:
        model_map = {
            "qwen2-vl-2b": "Qwen/Qwen2-VL-2B-Instruct",
            "qwen2.5-vl-3b": "Qwen/Qwen2.5-VL-3B-Instruct",
            "qwen2.5-vl-7b": "Qwen/Qwen2.5-VL-7B-Instruct",
        }
        success, message = load_model_and_processor(model_map[model_name], device, half_precision)
        if not success:
            return [], message
    results = []
    # Process images sequentially
    for i, image_path in enumerate(image_files):
        if progress is not None:
            progress(i / total_images, f"Processing image {i+1}/{total_images}: {os.path.basename(image_path)}")
        result = process_single_image(
            image_path, model, processor, device,
            prompt, max_image_size, max_tokens, progress
        )
        results.append(result)
    # Write results to CSV
    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['image_path', 'name', 'affiliation', 'town', 'success', 'error', 'time_taken']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result in results:
            # Drop the raw 'response' field for CSV output
            csv_result = {k: v for k, v in result.items() if k != 'response'}
            writer.writerow(csv_result)
    # Create summary
    summary = f"""
Processing complete!
Total images processed: {total_images}
Successful extractions: {successful_images}
Failed extractions: {failed_images}
Results saved to: {output_csv}
"""
    if progress is not None:
        progress(1.0, f"Complete! Processed {total_images} images: {successful_images} successful, {failed_images} failed")
    return results, summary
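
# End-to-end usage sketch (hypothetical paths):
#   results, summary = process_directory(
#       "name_tags/", "results.csv", "qwen2.5-vl-3b", prompt="...",
#       device="auto", half_precision=True, max_image_size=768, max_tokens=256)
#   print(summary)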

# =============== DATA ANALYZER FUNCTIONS ===============
def load_data(file_path):
    """Load data from CSV file."""
    if not os.path.exists(file_path):
        return None, f"Error: File '{file_path}' not found."
    try:
        # Load CSV file with headers for name, affiliation, town
        df = pd.read_csv(file_path)
        # Ensure expected columns exist (case insensitive)
        required_columns = ['name', 'affiliation', 'town']
        columns_lower = {col.lower() for col in df.columns}
        if not all(col in columns_lower for col in required_columns):
            return None, "Error: CSV must contain columns for name, affiliation, and town."
        # Standardize column names (case insensitive)
        column_map = {}
        for col in df.columns:
            if col.lower() in required_columns:
                column_map[col] = col.lower()
        df = df.rename(columns=column_map)
        # Convert all string columns to lowercase for case-insensitive operations
        for col in required_columns:
            if df[col].dtype == object:  # Check if column contains strings
                df[col] = df[col].str.lower()
        return df, "Data loaded successfully"
    except Exception as e:
        return None, f"Error loading CSV file: {e}"

def summary_by_town(df):
    """Generate summary statistics by town."""
    if df is None or len(df) == 0:
        return "No data available for summary."
    town_summary = df.groupby('town').agg(
        total_people=('name', 'count'),
        affiliations=('affiliation', lambda x: len(set(x)))
    ).reset_index()
    town_summary = town_summary.sort_values('total_people', ascending=False)
    # Title-case the town names and rename columns for display
    display_summary = town_summary.copy()
    display_summary['town'] = display_summary['town'].str.title()
    display_summary.columns = ['Town', 'People', 'Affiliations']
    result = "\n" + "=" * 50 + "\n"
    result += "SUMMARY BY TOWN\n"
    result += "=" * 50 + "\n"
    result += tabulate(
        display_summary,
        headers='keys',
        tablefmt='psql',
        showindex=False,
        floatfmt='.0f'
    )
    # Display top affiliations for each town
    result += "\n\n" + "=" * 50 + "\n"
    result += "TOP AFFILIATIONS BY TOWN\n"
    result += "=" * 50 + "\n"
    for town in town_summary['town']:
        town_data = df[df['town'] == town]
        top_affiliations = town_data['affiliation'].value_counts().head(3)
        result += f"\n🏙️ {town.upper()}:\n"
        result += "  " + "-" * 30 + "\n"
        for rank, (affiliation, count) in enumerate(top_affiliations.items(), 1):
            result += f"  {rank}. {affiliation.title():<20} → {count} people\n"
        if len(top_affiliations) == 0:
            result += "  No data available\n"
    return result
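
# summary_by_town renders a psql-style bordered table followed by a top-3
# affiliation list per town, roughly like this (values illustrative):
#   +-------------+----------+----------------+
#   | Town        |   People |   Affiliations |
#   |-------------+----------+----------------|
#   | Springfield |       12 |              4 |
#   +-------------+----------+----------------+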

def summary_by_affiliation(df):
    """Generate summary statistics by affiliation."""
    if df is None or len(df) == 0:
        return "No data available for summary."
    affiliation_summary = df.groupby('affiliation').agg(
        total_people=('name', 'count'),
        towns=('town', lambda x: len(set(x)))
    ).reset_index()
    affiliation_summary = affiliation_summary.sort_values('total_people', ascending=False)
    # Title-case the affiliation names and rename columns for display
    display_summary = affiliation_summary.copy()
    display_summary['affiliation'] = display_summary['affiliation'].str.title()
    display_summary.columns = ['Affiliation', 'People', 'Towns']
    result = "\n" + "=" * 50 + "\n"
    result += "SUMMARY BY AFFILIATION\n"
    result += "=" * 50 + "\n"
    # Use 'psql' format for better readability
    result += tabulate(
        display_summary,
        headers='keys',
        tablefmt='psql',
        showindex=False,
        floatfmt='.0f'
    )
    # Display top towns for each of the five largest affiliations
    result += "\n\n" + "=" * 50 + "\n"
    result += "TOP TOWNS BY AFFILIATION\n"
    result += "=" * 50 + "\n"
    for affiliation in affiliation_summary['affiliation'].head(5).tolist():
        affiliation_data = df[df['affiliation'] == affiliation]
        top_towns = affiliation_data['town'].value_counts().head(3)
        result += f"\n🏛️ {affiliation.upper()}:\n"
        result += "  " + "-" * 30 + "\n"
        for rank, (town, count) in enumerate(top_towns.items(), 1):
            result += f"  {rank}. {town.title():<20} → {count} people\n"
        if len(top_towns) == 0:
            result += "  No data available\n"
    return result

def search_data(df, search_term, search_field=None):
    """Search for records by name, town, or affiliation."""
    if df is None or len(df) == 0:
        return "No data available for search."
    if not search_term:
        return "Please enter a search term."
    search_term = search_term.lower()  # Lowercase the term for case-insensitive matching
    if search_field and search_field.lower() in ['name', 'town', 'affiliation']:
        # Search in specific field (regex=False treats the term literally)
        field = search_field.lower()
        results = df[df[field].str.contains(search_term, na=False, regex=False)]
    else:
        # Search in all fields
        results = df[
            df['name'].str.contains(search_term, na=False, regex=False) |
            df['town'].str.contains(search_term, na=False, regex=False) |
            df['affiliation'].str.contains(search_term, na=False, regex=False)
        ]
    if len(results) == 0:
        return f"No results found for '{search_term}'"
    # Format results for display, converting back to title case for readability
    display_results = results.copy()
    for col in ['name', 'town', 'affiliation']:
        display_results[col] = display_results[col].str.title()
    # Only select the columns we want to display
    display_results = display_results[['name', 'affiliation', 'town']]
    result = f"=== SEARCH RESULTS ({len(results)} matches) ===\n"
    result += tabulate(display_results, headers='keys', tablefmt='simple', showindex=False)
    return result
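
# Example (hypothetical data): search_data(df, "Acme", "Affiliation") matches
# any affiliation containing "acme"; search_data(df, "jane") scans all fields.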

# =============== GRADIO APP INTERFACE ===============
def copy_to_temp_dir(file_list):
    """Copy uploaded files to a temporary directory"""
    temp_dir = tempfile.mkdtemp()
    file_paths = []
    for file in file_list:
        file_name = os.path.basename(file.name)
        dst_path = os.path.join(temp_dir, file_name)
        shutil.copy(file.name, dst_path)
        file_paths.append(dst_path)
    return temp_dir, file_paths

def unload_model():
    """Unload the model to free up GPU memory"""
    global model, processor
    if model is not None:
        del model
        model = None
    if processor is not None:
        del processor
        processor = None
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()
    return "Model unloaded successfully"

def process_images_tab(files, model_name, prompt, device, half_precision, max_image_size, max_tokens, progress=gr.Progress()):
    """Function to handle the image processing tab"""
    if not files:
        return "", "Please upload some image files."
    try:
        # Copy uploaded files to a temporary directory
        temp_dir, _ = copy_to_temp_dir(files)
        # Process the directory of images
        output_csv = os.path.join(temp_dir, "name_tags_results.csv")
        # Determine device
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        # Process images
        results, summary = process_directory(
            directory_path=temp_dir,
            output_csv=output_csv,
            model_name=model_name,
            prompt=prompt,
            device=device,
            half_precision=half_precision,
            max_image_size=max_image_size,
            max_tokens=max_tokens,
            progress=progress
        )
        return output_csv, summary
    except Exception as e:
        return "", f"Error: {str(e)}\n{traceback.format_exc()}"

def analyze_csv_tab(csv_file):
    """Function to handle the CSV analysis tab"""
    if not csv_file:
        return "Please upload or generate a CSV file first."
    # Get the file path from the file object or string
    if isinstance(csv_file, str):
        file_path = csv_file
    else:
        file_path = csv_file.name
    # Load data from CSV
    df, message = load_data(file_path)
    if df is None:
        return message
    # Generate overview
    overview = f"""=== DATA OVERVIEW ===
Total records: {len(df)}
Unique towns: {df['town'].nunique()}
Unique affiliations: {df['affiliation'].nunique()}
"""
    return overview

def search_csv(csv_file, search_term, search_field):
    """Function to search the CSV data"""
    if not csv_file:
        return "Please upload or generate a CSV file first."
    if not search_term:
        return "Please enter a search term."
    # Get the file path from the file object or string
    if isinstance(csv_file, str):
        file_path = csv_file
    else:
        file_path = csv_file.name
    # Load data from CSV
    df, message = load_data(file_path)
    if df is None:
        return message
    # Search the data
    result = search_data(df, search_term, search_field)
    return result

def summary_csv(csv_file, summary_type):
    """Function to generate summaries from the CSV data"""
    if not csv_file:
        return "Please upload or generate a CSV file first."
    # Get the file path from the file object or string
    if isinstance(csv_file, str):
        file_path = csv_file
    else:
        file_path = csv_file.name
    # Load data from CSV
    df, message = load_data(file_path)
    if df is None:
        return message
    # Generate appropriate summary
    if summary_type == "By Town":
        result = summary_by_town(df)
    elif summary_type == "By Affiliation":
        result = summary_by_affiliation(df)
    else:
        result = "Please select a summary type."
    return result

# Create the Gradio interface
with gr.Blocks(title="People Tag Analyzer") as app:
    gr.Markdown("# People Tag Analyzer")
    gr.Markdown("This app processes images of name tags to extract information and provides analysis tools.")
    # Store CSV file path between tabs
    csv_file_path = gr.State("")
    with gr.Tabs():
        # Image Processing Tab
        with gr.Tab("Process Images"):
            gr.Markdown("### Step 1: Upload Images")
            with gr.Row():
                image_files = gr.File(file_count="multiple", label="Upload Name Tag Images")
            gr.Markdown("### Step 2: Configure Model")
            with gr.Row():
                with gr.Column():
                    model_name = gr.Dropdown(
                        choices=["qwen2-vl-2b", "qwen2.5-vl-3b", "qwen2.5-vl-7b"],
                        value="qwen2.5-vl-3b",
                        label="Vision Model"
                    )
                    device = gr.Dropdown(
                        choices=["auto", "cuda", "cpu"],
                        value="auto",
                        label="Device"
                    )
                with gr.Column():
                    half_precision = gr.Checkbox(
                        value=True,
                        label="Use Half Precision (FP16)"
                    )
                    max_image_size = gr.Slider(
                        minimum=256,
                        maximum=2048,
                        value=768,
                        step=64,
                        label="Max Image Size"
                    )
                    max_tokens = gr.Slider(
                        minimum=64,
                        maximum=512,
                        value=256,
                        step=32,
                        label="Max Output Tokens"
                    )
| gr.Markdown("### Step 3: Set Prompt") | |
| prompt = gr.Textbox( | |
| value="Extract 'name of the person', 'affiliation of the attendee' and also extract the town name you have to get it from the affiliation, then return the results in the format 'Name: Affiliation: Town:'", | |
| label="Prompt", | |
| lines=3 | |
| ) | |
| gr.Markdown("### Step 4: Process Images") | |
| process_button = gr.Button("Process Images") | |
| unload_button = gr.Button("Unload Model (Free Memory)") | |
| with gr.Row(): | |
| output_csv = gr.Textbox(label="Output CSV Path") | |
| processing_output = gr.Textbox(label="Processing Status", lines=10) | |
| # Connect the process button | |
| process_button.click( | |
| fn=process_images_tab, | |
| inputs=[image_files, model_name, prompt, device, half_precision, max_image_size, max_tokens], | |
| outputs=[output_csv, processing_output], | |
| api_name="process_images" | |
| ) | |
| # Connect the unload button | |
| unload_button.click( | |
| fn=unload_model, | |
| inputs=[], | |
| outputs=[processing_output] | |
| ) | |
| # Update state when CSV is generated | |
| output_csv.change( | |
| fn=lambda x: x, | |
| inputs=[output_csv], | |
| outputs=[csv_file_path] | |
| ) | |
        # Data Analysis Tab
        with gr.Tab("Analyze Data"):
            gr.Markdown("### Data Input")
            with gr.Row():
                csv_input = gr.File(label="Upload CSV File")
                use_processed = gr.Button("Use Processed CSV")
            csv_status = gr.Textbox(label="CSV Status", lines=5)
            # Analyze data when CSV is uploaded or selected
            csv_input.change(
                fn=analyze_csv_tab,
                inputs=[csv_input],
                outputs=[csv_status]
            )
            # Use processed CSV from first tab
            use_processed.click(
                fn=lambda x: x,
                inputs=[csv_file_path],
                outputs=[csv_input]
            ).then(
                fn=analyze_csv_tab,
                inputs=[csv_file_path],
                outputs=[csv_status]
            )
            gr.Markdown("### Summary")
            with gr.Row():
                summary_type = gr.Radio(
                    choices=["By Town", "By Affiliation"],
                    value="By Town",
                    label="Summary Type"
                )
                summary_button = gr.Button("Generate Summary")
            summary_output = gr.Textbox(label="Summary Results", lines=20)
            # Generate summary when button is clicked
            summary_button.click(
                fn=summary_csv,
                inputs=[csv_input, summary_type],
                outputs=[summary_output]
            )
            gr.Markdown("### Search")
            with gr.Row():
                with gr.Column():
                    search_term = gr.Textbox(label="Search Term")
                    search_field = gr.Dropdown(
                        choices=["All Fields", "Name", "Town", "Affiliation"],
                        value="All Fields",
                        label="Search In"
                    )
                with gr.Column():
                    search_button = gr.Button("Search")
            search_output = gr.Textbox(label="Search Results", lines=15)
            # Search when button is clicked
            search_button.click(
                fn=search_csv,
                inputs=[csv_input, search_term, search_field],
                outputs=[search_output]
            )
            # Also search when Enter is pressed in search term
            search_term.submit(
                fn=search_csv,
                inputs=[csv_input, search_term, search_field],
                outputs=[search_output]
            )
        # Help/Instructions Tab
        with gr.Tab("Help & Instructions"):
            gr.Markdown("""
# People Tag Analyzer - User Guide

## Overview
This application uses vision-language models to extract names, affiliations, and towns from name tag images, then provides analysis tools for the extracted data.

## How to Use

### 1. Process Images Tab

#### Step 1: Upload Images
- Click "Browse files" to select multiple name tag images
- Supported formats: JPG, JPEG, PNG, BMP, GIF, WEBP
- You can upload multiple images at once

#### Step 2: Configure Model
- **Vision Model**: Choose from available Qwen vision models
  - `qwen2-vl-2b`: Fastest, least memory usage
  - `qwen2.5-vl-3b`: Balanced performance (recommended)
  - `qwen2.5-vl-7b`: Best accuracy, requires more memory
- **Device**:
  - `auto`: Automatically detects GPU/CPU
  - `cuda`: Force GPU usage (if available)
  - `cpu`: Force CPU usage
- **Half Precision**: Use FP16 to save GPU memory (recommended for CUDA)
- **Max Image Size**: Resize large images to save memory (768px recommended)
- **Max Output Tokens**: Limit model output length (256 recommended)

#### Step 3: Set Prompt
The default prompt works well for most name tags. You can customize it if needed:
- The prompt tells the model what information to extract
- The prompt should specify the expected output structure (see the example below)

#### Step 4: Process Images
- Click "Process Images" to start extraction
- Progress will be shown in real-time
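
#### Expected Output Format
The extractor looks for three labelled lines in the model's reply, for example (values are illustrative):

```
Name: Jane Doe
Affiliation: Acme Corp
Town: Springfield
```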
| """) | |

# Main execution block
if __name__ == "__main__":
    app.launch()