|
|
import os |
|
|
import sys |
|
|
import math |
|
|
import numpy as np |
|
|
import torch |
|
|
import torchvision.transforms as T |
|
|
from torchvision.transforms.functional import InterpolationMode |
|
|
from PIL import Image |
|
|
import gradio as gr |
|
|
from transformers import AutoModel, AutoTokenizer |
|
|
from huggingface_hub import login |
|
|
import glob |
|
|
from pathlib import Path |
|
|
import datetime |
|
|
import time |
|
|
import json |
|
|
import re |
|
|
from pdf2image import convert_from_path, convert_from_bytes |
|
|
import tempfile |
|
|
import logging |
|
|
import traceback |
|
|
import io |
|
|
import threading |
|
|
import queue |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
|
|
|
|
|
|
# --- First-pass logging setup: capture startup messages before the
# OUTPUT_DIR/LOGS_DIR constants are defined further down (a second logging
# configuration follows there). ---
os.makedirs("saved_outputs", exist_ok=True)

log_file = f"saved_outputs/debug_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout),
    ],
)

logger = logging.getLogger("internvl_app")
logger.setLevel(logging.DEBUG)

# Startup banner: record environment details useful for later debugging.
banner = "=" * 50
logger.info(banner)
logger.info("InternVL2.5 Image Analyzer starting up")
logger.info(f"Log file: {log_file}")
logger.info(f"Python version: {sys.version}")
logger.info(f"Torch version: {torch.__version__}")
logger.info(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    logger.info(f"CUDA version: {torch.version.cuda}")
    logger.info(f"GPU: {torch.cuda.get_device_name(0)}")
logger.info(banner)
|
|
|
|
|
|
|
|
# Legacy flat counters kept for backward compatibility with any code that
# still reads them; the GUI code below tracks the same data in `gui_stats`.
error_count = 0
warning_count = 0
last_error = "None"
last_error_time = ""

# Shared mutable stats consumed by GUILogHandler.emit(), get_debug_stats()
# and process_pdf().
# NOTE(review): the original file referenced `gui_stats` (first at the log
# handler) without any visible definition before first use, which would raise
# NameError on the first emitted error/warning; defining it here fixes that.
gui_stats = {
    'errors': 0,
    'warnings': 0,
    'last_error': "None",
    'last_error_time': "",
    'last_warning': "None",
    'last_warning_time': "",
    'operations_completed': 0,
    'tensor_issues': 0,
    'start_time': datetime.datetime.now(),
}
|
|
|
|
|
|
|
|
# ImageNet normalization statistics used by the preprocessing transform.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Model / I-O configuration.
MODEL_NAME = "OpenGVLab/InternVL2_5-8B"
IMAGE_SIZE = 448
OUTPUT_DIR = "saved_outputs"
LOGS_DIR = os.path.join(OUTPUT_DIR, "logs")

# Make sure the output locations exist before any file handler touches them.
for _out_dir in (OUTPUT_DIR, LOGS_DIR):
    os.makedirs(_out_dir, exist_ok=True)

# Timestamped log file plus a stable "latest" alias for quick tailing.
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = os.path.join(LOGS_DIR, f"debug_log_{timestamp}.log")
latest_log = os.path.join(LOGS_DIR, "latest_debug.log")
|
|
|
|
|
|
|
|
# Reconfigure root logging to write into LOGS_DIR (timestamped file, a
# "latest" alias, and stdout).
# BUGFIX: force=True is required here — the earlier logging.basicConfig call
# already attached handlers to the root logger, and basicConfig is a silent
# no-op when handlers exist, so without force this whole block did nothing.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.FileHandler(latest_log, mode='w'),
        logging.StreamHandler(sys.stdout),
    ],
    force=True,  # Python 3.8+: replace any existing root handlers
)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
|
|
class GUILogHandler(logging.Handler):
    """Logging handler that mirrors records into an in-memory buffer and a
    queue so the Gradio UI can display recent log output.

    Error/warning records also update the module-level ``gui_stats`` counters
    shown in the debug panel.
    """

    def __init__(self, max_entries=100):
        super().__init__()
        self.log_queue = queue.Queue()      # incremental feed for get_latest()
        self.max_entries = max_entries      # ring-buffer capacity
        self.log_entries = []               # most recent formatted lines
        self.lock = threading.Lock()        # guards log_entries

    def emit(self, record):
        try:
            log_entry = self.format(record)

            # Update the shared stats for the debug panel.
            if record.levelno >= logging.ERROR:
                gui_stats['errors'] += 1
                gui_stats['last_error'] = record.getMessage()
                gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")
                # Heuristic: count the recurring "list has no unsqueeze" tensor bug.
                if "list" in record.getMessage() and "unsqueeze" in record.getMessage():
                    gui_stats['tensor_issues'] += 1
            elif record.levelno >= logging.WARNING:
                gui_stats['warnings'] += 1
                gui_stats['last_warning'] = record.getMessage()
                gui_stats['last_warning_time'] = datetime.datetime.now().strftime("%H:%M:%S")

            with self.lock:
                self.log_entries.append(log_entry)
                # Keep only the newest max_entries lines.
                if len(self.log_entries) > self.max_entries:
                    self.log_entries = self.log_entries[-self.max_entries:]
                self.log_queue.put(log_entry)
        except Exception:
            self.handleError(record)

    def get_logs(self, last_n=None):
        """Return the buffered lines (optionally only the last ``last_n``)."""
        with self.lock:
            chosen = self.log_entries if last_n is None else self.log_entries[-last_n:]
            return "\n".join(chosen)

    def get_latest(self):
        """Pop the next queued entry, or None when the queue is empty."""
        try:
            return self.log_queue.get_nowait()
        except queue.Empty:
            return None

    def clear(self):
        """Drop every buffered entry."""
        with self.lock:
            self.log_entries = []
|
|
|
|
|
|
|
|
def get_debug_stats():
    """Snapshot the global ``gui_stats`` counters for the debug panel."""
    elapsed = datetime.datetime.now() - gui_stats['start_time']
    hrs, rem = divmod(elapsed.seconds, 3600)
    mins, secs = divmod(rem, 60)

    return {
        'errors': gui_stats['errors'],
        'warnings': gui_stats['warnings'],
        'last_error': gui_stats['last_error'],
        'last_error_time': gui_stats['last_error_time'],
        'last_warning': gui_stats['last_warning'],
        'last_warning_time': gui_stats['last_warning_time'],
        'operations': gui_stats['operations_completed'],
        'uptime': f"{hrs}h {mins}m {secs}s",
        'tensor_issues': gui_stats['tensor_issues'],
    }
|
|
|
|
|
|
|
|
def format_debug_stats_html():
    """Render get_debug_stats() as an HTML card for the Gradio debug panel."""
    stats = get_debug_stats()

    # Highlight the counters only when something has actually gone wrong.
    error_color = "#ff5555" if stats['errors'] > 0 else "#555555"
    warning_color = "#ffaa00" if stats['warnings'] > 0 else "#555555"

    return f"""
    <div style="margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 4px; background-color: #f9f9f9;">
        <div style="display: flex; justify-content: space-between;">
            <div style="flex: 1;">
                <p><strong>Errors:</strong> <span style="color: {error_color};">{stats['errors']}</span></p>
                <p><strong>Warnings:</strong> <span style="color: {warning_color};">{stats['warnings']}</span></p>
                <p><strong>Operations:</strong> {stats['operations']}</p>
            </div>
            <div style="flex: 1;">
                <p><strong>Uptime:</strong> {stats['uptime']}</p>
                <p><strong>Tensor Issues:</strong> {stats['tensor_issues']}</p>
            </div>
        </div>
        <div style="margin-top: 10px; border-top: 1px solid #ddd; padding-top: 10px;">
            <p><strong>Last Error:</strong> {stats['last_error_time']} - {stats['last_error']}</p>
            <p><strong>Last Warning:</strong> {stats['last_warning_time']} - {stats['last_warning']}</p>
        </div>
    </div>
    """
|
|
|
|
|
|
|
|
def log_tensor_info(tensor, name="tensor"):
    """Log detailed information about a tensor or list for debugging.

    Accepts None, a list (the first three items are summarized), a
    torch.Tensor (shape/dtype/device plus value statistics), or any other
    object (type only).  Never raises: all failures are logged instead.
    """
    if tensor is None:
        logger.warning(f"{name} is None")
        return

    try:
        if isinstance(tensor, list):
            logger.debug(f"{name} is a list of length {len(tensor)}")
            for i, item in enumerate(tensor[:3]):
                item_type = type(item)
                item_shape = getattr(item, "shape", "unknown")
                item_dtype = getattr(item, "dtype", "unknown")
                logger.debug(f" - Item {i}: type={item_type}, shape={item_shape}, dtype={item_dtype}")
            if len(tensor) > 3:
                logger.debug(f" - ... and {len(tensor)-3} more items")
        elif isinstance(tensor, torch.Tensor):
            logger.debug(f"{name} is a tensor: shape={tensor.shape}, dtype={tensor.dtype}, device={tensor.device}")
            if tensor.numel() > 0:
                try:
                    logger.debug(f" - Stats: min={tensor.min().item():.4f}, max={tensor.max().item():.4f}, "
                                 f"mean={tensor.mean().item():.4f}, std={tensor.std().item():.4f}")
                except Exception:
                    # BUGFIX: was a bare `except:` — only value-stat failures
                    # (e.g. non-numeric dtypes) should be swallowed, not
                    # SystemExit/KeyboardInterrupt.
                    pass
            logger.debug(f" - Requires grad: {tensor.requires_grad}")
        else:
            logger.debug(f"{name} is type {type(tensor)}")
    except Exception as e:
        logger.error(f"Error logging tensor info for {name}: {str(e)}")
|
|
|
|
|
|
|
|
# Reduce CUDA memory fragmentation when loading large models.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"

# Optional Hugging Face authentication (needed for gated/private checkpoints).
hf_token = os.environ.get("HUGGINGFACE_TOKEN", None)
if hf_token:
    logger.info("Logging in to Hugging Face Hub with token...")
    login(token=hf_token)
else:
    logger.info("No Hugging Face token found in environment. Model may not load if it's private.")

# File types accepted by the folder/batch analysis paths.
SUPPORTED_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.pdf']
|
|
|
|
|
|
|
|
def build_transform(input_size):
    """Build the preprocessing pipeline for one image tile.

    Converts to RGB if necessary, resizes to a square of ``input_size``,
    tensorizes, and applies ImageNet mean/std normalization.
    """
    return T.Compose([
        T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])
|
|
|
|
|
def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    """Pick the (cols, rows) tiling ratio whose aspect is closest to the image's.

    Ties are broken in favour of the later candidate when the source image is
    large enough (area greater than half the tiled target area).
    """
    best_diff = float('inf')
    best = (1, 1)
    area = width * height
    for cols, rows in target_ratios:
        diff = abs(aspect_ratio - cols / rows)
        if diff < best_diff:
            best_diff = diff
            best = (cols, rows)
        elif diff == best_diff and area > 0.5 * image_size * image_size * cols * rows:
            best = (cols, rows)
    return best
|
|
|
|
|
def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
    """Split ``image`` into a grid of square ``image_size`` tiles.

    Chooses the (cols, rows) grid with a total tile count in
    [min_num, max_num] whose aspect ratio best matches the input, resizes the
    image to exactly fill that grid, and crops it into tiles (row-major).
    When ``use_thumbnail`` is set and more than one tile was produced, a
    square thumbnail of the whole image is appended.
    """
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # Candidate grids (cols, rows), ordered by total tile count.
    target_ratios = sorted(
        {(i, j)
         for n in range(min_num, max_num + 1)
         for i in range(1, n + 1)
         for j in range(1, n + 1)
         if min_num <= i * j <= max_num},
        key=lambda r: r[0] * r[1])

    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio, target_ratios, orig_width, orig_height, image_size)

    cols, rows = target_aspect_ratio
    target_width = image_size * cols
    target_height = image_size * rows
    blocks = cols * rows

    # Resize to exactly fill the grid, then crop tile by tile.
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for idx in range(blocks):
        col = idx % cols
        row = idx // cols
        box = (col * image_size, row * image_size,
               (col + 1) * image_size, (row + 1) * image_size)
        processed_images.append(resized_img.crop(box))
    assert len(processed_images) == blocks

    if use_thumbnail and len(processed_images) != 1:
        processed_images.append(image.resize((image_size, image_size)))
    return processed_images
|
|
|
|
|
|
|
|
def load_image(image_pil, max_num=12):
    """Convert a PIL image into a batched, normalized pixel tensor.

    Returns a tensor of shape (tiles, 3, IMAGE_SIZE, IMAGE_SIZE), moved to
    CUDA as bfloat16 when a GPU is available (float32 on CPU).  Falls back to
    a single-tile conversion, then to an all-zero dummy tensor, so a bad image
    never crashes the caller; returns None only if everything fails.
    """
    try:
        print(f"load_image received image of type: {type(image_pil)}, size: {image_pil.size if hasattr(image_pil, 'size') else 'unknown'}")

        processed_images = dynamic_preprocess(image_pil, image_size=IMAGE_SIZE, max_num=max_num)

        transform = build_transform(IMAGE_SIZE)
        pixel_values = [transform(img) for img in processed_images]

        print(f"After transforms, pixel_values is a list of length {len(pixel_values)}, first element type: {type(pixel_values[0])}")

        try:
            pixel_values = torch.stack(pixel_values)
            print(f"Successfully stacked tensors into shape: {pixel_values.shape}")
        except Exception as stack_error:
            print(f"Error during tensor stacking: {str(stack_error)}")
            # Coerce every element to a torch.Tensor, dropping any we cannot convert.
            fixed_values = []
            for i, val in enumerate(pixel_values):
                if not isinstance(val, torch.Tensor):
                    print(f"Item {i} is not a tensor, type: {type(val)}")
                    try:
                        if not isinstance(val, np.ndarray):
                            if hasattr(val, 'numpy'):
                                val = val.numpy()
                            else:
                                val = np.array(val)
                        val = torch.from_numpy(val).float()
                        fixed_values.append(val)
                    except Exception as convert_err:
                        print(f"Failed to convert item {i}: {str(convert_err)}")
                        continue
                else:
                    fixed_values.append(val)

            if not fixed_values:
                raise ValueError("No valid tensor data could be extracted from the image")
            pixel_values = torch.stack(fixed_values)

        if torch.cuda.is_available():
            pixel_values = pixel_values.cuda().to(torch.bfloat16)
        else:
            pixel_values = pixel_values.to(torch.float32)

        print(f"Final tensor shape: {pixel_values.shape}, dtype: {pixel_values.dtype}")
        return pixel_values
    except Exception as e:
        print(f"Error in load_image: {str(e)}")
        import traceback
        print(traceback.format_exc())

        # First recovery: skip tiling and convert the whole image as one tile.
        try:
            print("Attempting direct tensor conversion...")
            image_pil = image_pil.convert('RGB')
            transform = build_transform(IMAGE_SIZE)
            tensor = transform(image_pil)

            if not isinstance(tensor, torch.Tensor):
                print(f"Warning: transform did not return a tensor, got {type(tensor)}")
                if hasattr(tensor, 'numpy'):
                    tensor = torch.from_numpy(tensor.numpy()).float()
                else:
                    tensor = torch.tensor(tensor, dtype=torch.float32)

            tensor = tensor.unsqueeze(0)  # add the batch/tile dimension

            if torch.cuda.is_available():
                tensor = tensor.cuda().to(torch.bfloat16)
            else:
                tensor = tensor.to(torch.float32)

            print(f"Recovery successful, tensor shape: {tensor.shape}")
            return tensor
        except Exception as recovery_error:
            print(f"Recovery attempt also failed: {str(recovery_error)}")
            print(traceback.format_exc())

        # Last resort: an all-zero tensor so downstream code keeps running.
        try:
            print("Creating fallback dummy tensor...")
            dummy_tensor = torch.zeros((1, 3, IMAGE_SIZE, IMAGE_SIZE),
                                       dtype=torch.float32)
            if torch.cuda.is_available():
                dummy_tensor = dummy_tensor.cuda().to(torch.bfloat16)
            print("Returning dummy tensor as last resort")
            return dummy_tensor
        except Exception:
            # BUGFIX: was a bare `except:`, which would also swallow
            # SystemExit/KeyboardInterrupt.
            print("Even dummy tensor creation failed. Cannot proceed.")
            return None
|
|
|
|
|
|
|
|
def split_model(model_name):
    """Build a module→GPU device map for multi-GPU inference.

    Returns "auto" for single-GPU/CPU setups or unknown model names.  GPU 0
    receives roughly half as many LLM layers as the other GPUs because it
    also hosts the vision tower, embeddings, norm and output head.
    """
    device_map = {}
    world_size = torch.cuda.device_count()
    if world_size <= 1:
        return "auto"

    layer_counts = {
        'InternVL2_5-1B': 24,
        'InternVL2_5-2B': 24,
        'InternVL2_5-4B': 36,
        'InternVL2_5-8B': 32,
        'InternVL2_5-26B': 48,
        'InternVL2_5-38B': 64,
        'InternVL2_5-78B': 80,
    }
    num_layers = layer_counts.get(model_name)
    if num_layers is None:
        # BUGFIX: unknown checkpoint names used to raise KeyError; let HF
        # accelerate place modules itself instead.
        return "auto"

    # Treat GPU 0 as half a GPU when dividing up the LLM layers.
    num_layers_per_gpu = math.ceil(num_layers / (world_size - 0.5))
    num_layers_per_gpu = [num_layers_per_gpu] * world_size
    num_layers_per_gpu[0] = math.ceil(num_layers_per_gpu[0] * 0.5)

    layer_cnt = 0
    for gpu_idx, layer_count in enumerate(num_layers_per_gpu):
        for _ in range(layer_count):
            device_map[f'language_model.model.layers.{layer_cnt}'] = gpu_idx
            layer_cnt += 1

    # Vision tower, embeddings, head and the final layer stay on GPU 0.
    device_map['vision_model'] = 0
    device_map['mlp1'] = 0
    device_map['language_model.model.tok_embeddings'] = 0
    device_map['language_model.model.embed_tokens'] = 0
    device_map['language_model.model.rotary_emb'] = 0
    device_map['language_model.output'] = 0
    device_map['language_model.model.norm'] = 0
    device_map['language_model.lm_head'] = 0
    device_map[f'language_model.model.layers.{num_layers - 1}'] = 0

    return device_map
|
|
|
|
|
|
|
|
def get_model_dtype():
    """Return bfloat16 when CUDA is present, otherwise float32."""
    if torch.cuda.is_available():
        return torch.bfloat16
    return torch.float32
|
|
|
|
|
|
|
|
def load_model():
    """Load the InternVL model and tokenizer.

    Tries MODEL_NAME first; on failure falls back to the 1B checkpoint.
    Returns (model, tokenizer), or (None, None) when both attempts fail.
    """
    print(f"\n=== Loading {MODEL_NAME} ===")
    print(f"CUDA available: {torch.cuda.is_available()}")

    model_dtype = get_model_dtype()
    print(f"Using model dtype: {model_dtype}")

    if torch.cuda.is_available():
        gpu_count = torch.cuda.device_count()
        print(f"GPU count: {gpu_count}")
        for i in range(gpu_count):
            print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        print(f"Total GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
        print(f"Allocated GPU memory: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
        print(f"Reserved GPU memory: {torch.cuda.memory_reserved() / 1e9:.2f} GB")

    # Spread layers across GPUs only when more than one is present.
    device_map = "auto"
    if torch.cuda.is_available() and torch.cuda.device_count() > 1:
        device_map = split_model(MODEL_NAME.split('/')[-1])

    try:
        print(f"Starting model download/loading from {MODEL_NAME}...")
        model = AutoModel.from_pretrained(
            MODEL_NAME,
            torch_dtype=model_dtype,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
            device_map=device_map,
            token=hf_token,
            cache_dir="model_cache",
        )
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_NAME,
            use_fast=False,
            trust_remote_code=True,
            token=hf_token,
        )
        print(f"✓ Model and tokenizer loaded successfully!")
        return model, tokenizer
    except Exception as e:
        print(f"❌ Error loading model: {e}")
        import traceback
        traceback.print_exc()

        # Fall back to the smallest checkpoint in the family.
        try:
            print("Attempting to load smaller model as fallback...")
            fallback_model = "OpenGVLab/InternVL2_5-1B"
            model = AutoModel.from_pretrained(
                fallback_model,
                torch_dtype=model_dtype,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                device_map="auto",
                token=hf_token,
            )
            tokenizer = AutoTokenizer.from_pretrained(
                fallback_model,
                use_fast=False,
                trust_remote_code=True,
                token=hf_token,
            )
            print(f"✓ Fallback model loaded successfully!")
            return model, tokenizer
        except Exception as e2:
            print(f"❌ Error loading fallback model: {e2}")
            traceback.print_exc()
            return None, None
|
|
|
|
|
|
|
|
def analyze_single_image(model, tokenizer, image, prompt):
    """Run one prompt against one PIL image and return the model's answer.

    All failures are caught and returned as an error string so the UI never
    crashes on a bad input.
    """
    try:
        if image is None:
            return "Please upload an image first."

        pixel_values = load_image(image)
        print(f"Image processed: tensor shape {pixel_values.shape}, dtype {pixel_values.dtype}")

        generation_config = {
            "max_new_tokens": 512,
            "do_sample": False,
        }

        # InternVL expects the image placeholder token ahead of the prompt.
        question = f"<image>\n{prompt}"
        response, _ = model.chat(
            tokenizer=tokenizer,
            pixel_values=pixel_values,
            question=question,
            generation_config=generation_config,
            history=None,
            return_history=True,
        )
        return response
    except Exception as e:
        import traceback
        return f"Error analyzing image: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|
|
|
def analyze_dual_images(model, tokenizer, image1, image2, prompt):
    """Analyze up to two images with the same prompt, one at a time.

    Returns the per-image analyses joined by a "---" separator, or an error
    string when something goes wrong.
    """
    try:
        if image1 is None and image2 is None:
            return "Please upload at least one image."

        results = []
        for label, img in (("First", image1), ("Second", image2)):
            if img is None:
                continue
            answer = analyze_single_image(model, tokenizer, img, f"{label} image: {prompt}")
            results.append(f"{label.upper()} IMAGE ANALYSIS:\n{answer}")

        return "\n\n---\n\n".join(results)
    except Exception as e:
        import traceback
        return f"Error analyzing images: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|
|
|
def process_pdf(pdf_path=None, pdf_file=None):
    """Process a PDF file and return a list of PIL images (one per page).

    Accepts either a filesystem path (``pdf_path``) or an uploaded file-like
    object (``pdf_file``).  Conversion errors are logged, recorded in
    ``gui_stats``, and re-raised.  Returns None when neither argument is usable.
    """
    try:
        logger.info(f"Processing PDF: path={pdf_path}, file_upload={pdf_file is not None}")

        if pdf_path is not None and os.path.exists(pdf_path):
            file_size = os.path.getsize(pdf_path) / 1024
            logger.info(f"PDF file details: path={pdf_path}, size={file_size:.2f} KB")
            print(f"[DEBUG] Processing PDF from path: {pdf_path}")
            print(f"[DEBUG] File exists: {os.path.exists(pdf_path)}, Size: {file_size:.2f} KB")

            try:
                logger.debug(f"Converting PDF to images using convert_from_path: {pdf_path}")
                with open(pdf_path, 'rb') as f:
                    file_content = f.read()
                logger.debug(f"PDF file read: {len(file_content)} bytes")

                # Sanity-check the header: a missing %PDF magic usually means
                # a truncated or mislabeled upload.
                if len(file_content) >= 8:
                    header_hex = ' '.join([f'{b:02x}' for b in file_content[:8]])
                    logger.info(f"PDF header hex: {header_hex}")
                    print(f"[DEBUG] PDF header hex: {header_hex}")
                if not file_content.startswith(b'%PDF'):
                    logger.warning(f"File does not have PDF header: {pdf_path}")
                    print(f"[WARNING] File does not have PDF header: {pdf_path}")

                images = convert_from_path(pdf_path)
                logger.info(f"PDF converted successfully using convert_from_path: {len(images)} pages")
                return images
            except Exception as path_err:
                logger.error(f"Error converting PDF using path method: {str(path_err)}")
                logger.error(traceback.format_exc())
                print(f"[ERROR] Convert from path failed: {str(path_err)}")

            # Fallback: feed the raw bytes to pdf2image instead of the path.
            try:
                logger.debug("Falling back to convert_from_bytes method")
                # BUGFIX: the original bound this handle to `pdf_file`,
                # shadowing the function parameter of the same name.
                with open(pdf_path, 'rb') as pdf_handle:
                    pdf_data = pdf_handle.read()
                logger.debug(f"Read {len(pdf_data)} bytes from PDF file")

                images = convert_from_bytes(pdf_data)
                logger.info(f"PDF converted successfully using convert_from_bytes: {len(images)} pages")
                return images
            except Exception as bytes_err:
                logger.error(f"Error converting PDF using bytes method: {str(bytes_err)}")
                logger.error(traceback.format_exc())
                print(f"[ERROR] Convert from bytes also failed: {str(bytes_err)}")
                raise

        elif pdf_file is not None:
            logger.info("Processing uploaded PDF file")
            print(f"[DEBUG] Processing uploaded PDF file")
            if hasattr(pdf_file, 'name'):
                logger.debug(f"Uploaded PDF filename: {pdf_file.name}")

            try:
                # Persist the upload to disk because pdf2image wants a path.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                    temp_file.write(pdf_file.read())
                    temp_path = temp_file.name

                logger.debug(f"Created temporary file: {temp_path}")
                print(f"[DEBUG] Created temp file: {temp_path}")

                try:
                    images = convert_from_path(temp_path)
                    logger.info(f"PDF converted successfully: {len(images)} pages")
                    return images
                finally:
                    # BUGFIX: the original leaked the temp file whenever
                    # conversion raised before the unlink.
                    os.unlink(temp_path)
            except Exception as upload_err:
                logger.error(f"Error processing uploaded PDF: {str(upload_err)}")
                logger.error(traceback.format_exc())
                print(f"[ERROR] Processing uploaded PDF failed: {str(upload_err)}")
                raise
        else:
            error_msg = "No PDF file provided (both pdf_path and pdf_file are None or invalid)"
            logger.error(error_msg)
            print(f"[ERROR] {error_msg}")
            return None

    except Exception as e:
        logger.error(f"Critical error in PDF processing: {str(e)}")
        logger.error(traceback.format_exc())
        print(f"[CRITICAL] PDF processing failed: {str(e)}")
        print(traceback.format_exc())

        # Surface the failure in the GUI debug panel.
        gui_stats['errors'] += 1
        gui_stats['last_error'] = f"PDF processing error: {str(e)}"
        gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")

        raise
|
|
|
|
|
|
|
|
def analyze_with_prompt(image_input, prompt):
    """Analyze images with a specific prompt and InternVL model.

    ``image_input`` may be an uploaded PDF file object, a filesystem path, a
    PIL image, or a list of images.  PDF pages are rasterized and analyzed one
    by one, and the per-page results are concatenated.
    """
    try:
        if image_input is None:
            return "Please provide valid image input."
        if isinstance(image_input, list) and len(image_input) == 0:
            return "No valid images found."

        # Uploaded PDF: persist to a temp file, rasterize, then clean up.
        if hasattr(image_input, 'name') and image_input.name.lower().endswith('.pdf'):
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp_pdf:
                temp_pdf.write(image_input.read())
                temp_pdf_path = temp_pdf.name

            pdf_images = process_pdf(pdf_path=temp_pdf_path)
            if not pdf_images:
                os.unlink(temp_pdf_path)
                return "Failed to process PDF file."

            images = pdf_images
            os.unlink(temp_pdf_path)
        elif isinstance(image_input, (str, Image.Image)):
            images = [Image.open(image_input) if isinstance(image_input, str) else image_input]
        else:
            images = [image_input]

        page_results = []
        for img in images:
            if not isinstance(img, Image.Image):
                img = Image.open(img)
            img = img.convert('RGB')
            page_results.append(process_image_with_text(img, prompt))

        if len(page_results) == 1:
            return page_results[0]

        combined = f"Analysis of {len(page_results)} page(s):\n\n"
        for idx, res in enumerate(page_results):
            combined += f"--- Page {idx+1} ---\n{res}\n\n"
        return combined

    except Exception as e:
        return f"Error analyzing image: {str(e)}"
|
|
|
|
|
|
|
|
def process_image_folder(model, tokenizer, folder_path, prompt):
    """Analyze every supported image in a folder with the same prompt.

    Resolves ``folder_path`` against several likely roots (as given, CWD,
    /app, absolute) to cope with containerized deployments.  Returns a single
    report string; per-file failures are recorded inline instead of aborting.
    """
    if not folder_path:
        return "Please provide a valid folder path."

    print(f"Attempting to access folder: {folder_path}")
    print(f"Current working directory: {os.getcwd()}")
    print(f"Directory contents: {os.listdir('.')}")

    # Try a few plausible locations for the requested folder.
    potential_paths = [
        folder_path,
        os.path.join(os.getcwd(), folder_path),
        os.path.join("/app", folder_path),
        os.path.abspath(folder_path),
    ]

    valid_path = None
    for path in potential_paths:
        if os.path.exists(path) and os.path.isdir(path):
            valid_path = path
            print(f"Found valid path: {valid_path}")
            break

    if not valid_path:
        available_dirs = [d for d in os.listdir('.') if os.path.isdir(d)]
        return f"Error: Could not find valid directory at {folder_path}. Available directories: {', '.join(available_dirs)}"

    folder_path = Path(valid_path)

    # Collect matches for both lower- and upper-case extensions.
    # BUGFIX: use a set — on case-insensitive filesystems *.jpg and *.JPG
    # match the same files, and the original list-based code analyzed each
    # image twice.
    image_files = set()
    for ext in SUPPORTED_EXTENSIONS:
        image_files.update(folder_path.glob(f"*{ext}"))
        image_files.update(folder_path.glob(f"*{ext.upper()}"))
    image_files = sorted(image_files)

    if not image_files:
        return f"No image files found in {folder_path}. Supported formats: {', '.join(SUPPORTED_EXTENSIONS)}"

    results = [f"Found {len(image_files)} images in {folder_path}\n"]

    for i, img_path in enumerate(image_files, 1):
        try:
            image = Image.open(img_path)
            file_prompt = f"Image file {i}/{len(image_files)} - {img_path.name}: {prompt}"
            result = analyze_single_image(model, tokenizer, image, file_prompt)
            results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name}\n{result}\n")
        except Exception as e:
            results.append(f"---\nError processing {img_path.name}: {str(e)}\n")

    return "\n".join(results)
|
|
|
|
|
|
|
|
def generate_filename(prefix="analysis", ext="txt"):
    """Return a timestamped filename like ``analysis_20240101_120000.txt``."""
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{prefix}_{stamp}.{ext}"
|
|
|
|
|
|
|
|
def save_to_file(content, filename=None, prompt=None):
    """Write analysis text (optionally prefixed by the prompt) into OUTPUT_DIR.

    A timestamped name is generated when none is given; a name lacking a
    ``.txt`` extension keeps its stem and gets a timestamped ``.txt`` name
    appended.  Returns a human-readable status string.
    """
    if filename is None:
        filename = generate_filename()
    elif not filename.endswith('.txt'):
        # Keep the caller's stem and append a timestamped .txt suffix.
        # BUGFIX(review): the original interpolated a literal "(unknown)"
        # placeholder here, discarding the caller-supplied name entirely —
        # this looks like template corruption; confirm intended naming.
        filename = f"{filename}_{generate_filename()}"

    filename = filename.strip()
    filepath = os.path.join(OUTPUT_DIR, filename)

    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            if prompt:
                f.write(f"Prompt: {prompt}\n\n")
            f.write(content)
        return f"Results saved to {filepath}"
    except Exception as e:
        return f"Error saving results: {str(e)}"
|
|
|
|
|
|
|
|
def save_to_json(content, source_type, prompt, filename=None):
    """Serialize the structured analysis to a JSON file in OUTPUT_DIR.

    Returns (status_message, filename); filename is None on failure.
    """
    if filename is None:
        filename = generate_filename(prefix=f"{source_type}_analysis", ext="json")

    filename = filename.strip()
    filepath = os.path.join(OUTPUT_DIR, filename)

    # Convert the free-text report into structured data before writing.
    formatted_json = format_analysis_to_json(content)

    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(formatted_json, f, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error saving JSON results: {str(e)}", None
    return f"JSON results saved to {filepath}", filename
|
|
|
|
|
|
|
|
def format_analysis_to_json(content):
    """Parse a plain-text analysis report into a JSON-serializable dict.

    Multi-image reports (those containing "Found ... images in ...") are
    split on "---" separators into per-image entries with optional title,
    key-point and visual-element sections; anything else is stored as a
    single-image description.
    """
    result = {"images": []}

    if "Found" in content and "images in" in content:
        # Each "---"-separated chunk describes one image; index 0 is the header.
        image_sections = content.split("---\n")

        for section in image_sections[1:]:
            if not section.strip():
                continue

            image_data = {}

            # First line carries "Image i/N: filename".
            first_line = section.strip().split("\n")[0]
            if "Image" in first_line and ":" in first_line:
                image_data["filename"] = first_line.split(":")[1].strip()

            # Everything after the first line is the free-form description.
            description_lines = section.strip().split("\n")[1:]
            image_data["description"] = "\n".join(description_lines)

            # Optional structured sections the model may have emitted.
            if "### Title:" in section:
                image_data["title"] = section.split("### Title:")[1].split("###")[0].strip()

            if "### Key Points:" in section:
                key_points_section = section.split("### Key Points:")[1].split("###")[0].strip()
                # Keep only numbered lines like "1. ...".
                points = []
                for line in key_points_section.split("\n"):
                    if line.strip() and line.strip()[0].isdigit() and "." in line:
                        points.append(line.strip())
                image_data["key_points"] = points

            if "### Visual Elements:" in section:
                image_data["visual_elements"] = section.split("### Visual Elements:")[1].split("###")[0].strip()

            result["images"].append(image_data)
    else:
        # Single-image report: store the whole text as the description.
        result["images"] = [{
            "filename": "single_image",
            "description": content,
        }]

    return result
|
|
|
|
|
|
|
|
def list_output_files():
    """Return a markdown listing of files saved in OUTPUT_DIR, newest first."""
    try:
        if not os.path.exists(OUTPUT_DIR):
            return "No saved outputs found."

        files = sorted(os.listdir(OUTPUT_DIR), reverse=True)
        if not files:
            return "No saved outputs found."

        lines = [f"# Saved Analysis Files\n\nFiles are stored in the `{OUTPUT_DIR}` directory.\n\n"]
        for idx, fname in enumerate(files, 1):
            fpath = os.path.join(OUTPUT_DIR, fname)
            size_kb = os.path.getsize(fpath) / 1024
            mtime = datetime.datetime.fromtimestamp(os.path.getmtime(fpath))
            lines.append(f"{idx}. **{fname}** ({size_kb:.1f} KB) - {mtime.strftime('%Y-%m-%d %H:%M:%S')}\n")

        return "".join(lines)
    except Exception as e:
        return f"Error listing files: {str(e)}"
|
|
|
|
|
|
|
|
def convert_to_html(content, title="Image Analysis Results"):
    """Convert analysis text to a standalone formatted HTML document.

    Parameters
    ----------
    content : str
        Plain-text / Markdown-ish analysis output.  Multi-image results
        (as produced by the folder analyzer) are detected by a
        "Found ... images in ..." header and split on "---\\n" separators;
        anything else is rendered as a single-image section.
    title : str
        Used for both the document <title> and the top-level <h1>.

    Returns
    -------
    str
        A complete HTML page.
    """

    def md_to_html(text):
        """Render one line: **bold**, "### " headings, and list items."""
        text = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', text)
        text = re.sub(r'### (.*)', r'<h3>\1</h3>', text)

        stripped = text.strip()
        # Numbered items ("1. ...") keep their number inside the <li>.
        if stripped and stripped[0].isdigit() and ". " in text:
            return f"<li>{text}</li>"
        # Dash items: drop the "- " marker.  Slice the *stripped* text so
        # indented items do not retain part of the marker (previously
        # `text[2:]` was taken from the unstripped line).
        if stripped.startswith("- "):
            return f"<li>{stripped[2:]}</li>"

        return text

    def render_lines(lines):
        """Render text lines to HTML, wrapping runs of list items in <ul>."""
        out = []
        in_list = False
        for line in lines:
            stripped = line.strip()
            is_item = stripped.startswith("- ") or (
                stripped and stripped[0].isdigit() and ". " in line
            )
            if is_item:
                if not in_list:
                    out.append("<ul>\n")
                    in_list = True
                out.append(f"{md_to_html(line)}\n")
                continue
            # Any non-item line closes an open list.
            if in_list:
                out.append("</ul>\n")
                in_list = False
            if stripped.startswith("###"):
                out.append(f"{md_to_html(line)}\n")
            elif stripped == "":
                out.append("<p></p>\n")
            else:
                out.append(f"<p>{md_to_html(line)}</p>\n")
        if in_list:
            out.append("</ul>\n")
        return "".join(out)

    html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>{title}</title>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; max-width: 900px; margin: 0 auto; padding: 20px; }}
h1 {{ color: #2c3e50; border-bottom: 1px solid #eee; padding-bottom: 10px; }}
h2 {{ color: #3498db; margin-top: 30px; }}
h3 {{ color: #2980b9; }}
.image-section {{ background-color: #f9f9f9; border: 1px solid #ddd; border-radius: 5px; padding: 20px; margin-bottom: 20px; }}
.image-header {{ display: flex; justify-content: space-between; }}
.separator {{ border-top: 1px dashed #ccc; margin: 30px 0; }}
pre {{ background-color: #f8f8f8; padding: 10px; border-radius: 5px; overflow-x: auto; }}
ul, ol {{ padding-left: 25px; }}
</style>
</head>
<body>
<h1>{title}</h1>
"""

    if "Found" in content and "images in" in content:
        # Multi-image (folder analysis) output: summary line first, then one
        # <div class="image-section"> per "---"-separated image section.
        parts = content.split("\n")
        if parts and "Found" in parts[0]:
            html += f"<p>{parts[0]}</p>\n"

        image_sections = content.split("---\n")
        for section in image_sections[1:]:
            if not section.strip():
                continue

            section_lines = section.strip().split("\n")
            html += '<div class="image-section">\n'
            html += f'<div class="image-header"><h2>{section_lines[0]}</h2></div>\n'
            html += render_lines(section_lines[1:])
            html += '</div>\n'
    else:
        # Single-image output: render all of the content in one section.
        html += '<div class="image-section">\n'
        html += '<h2>Single Image Analysis</h2>\n'
        html += render_lines(content.split("\n"))
        html += '</div>\n'

    html += """
<div class="separator"></div>
<p><em>Generated by InternVL2.5 Image Analyzer on """ + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + """</em></p>
</body>
</html>"""

    return html
|
|
|
|
|
|
|
|
def save_to_html(content, source_type, filename=None):
    """Render *content* to HTML and write it into ``OUTPUT_DIR``.

    Parameters
    ----------
    content : str
        The analysis text to convert.
    source_type : str
        Label used both in the generated filename and the page title
        (e.g. "folder", "single").
    filename : str, optional
        Target filename; auto-generated when omitted.

    Returns
    -------
    tuple[str, str | None]
        (status message, filename) — filename is ``None`` on failure.
    """
    if filename is None:
        filename = generate_filename(prefix=f"{source_type}_analysis", ext="html")
    filename = filename.strip()

    target = os.path.join(OUTPUT_DIR, filename)
    try:
        rendered = convert_to_html(
            content, title=f"{source_type.capitalize()} Image Analysis Results"
        )
        with open(target, 'w', encoding='utf-8') as out:
            out.write(rendered)
    except Exception as e:
        return f"Error saving HTML results: {str(e)}", None
    return f"HTML results saved to {target}", filename
|
|
|
|
|
|
|
|
def analyze_folder_images(folder_path: str, prompt: str) -> str:
    """Analyze all images in a folder.

    Resolves *folder_path* against several candidate locations, finds files
    matching SUPPORTED_EXTENSIONS (PDFs are expanded page-by-page), runs
    each through process_image_with_text, and returns one combined report
    string with "---"-separated per-file sections.  Errors for individual
    files are recorded in the report rather than aborting the run.
    """

    # Heavy print() tracing in addition to logging — presumably so progress
    # is visible in the console while the Gradio UI is blocked.
    print(f"\n\n===== FOLDER ANALYSIS STARTED =====")
    print(f"Folder path: {folder_path}")
    print(f"Prompt: {prompt}")
    print(f"Current directory: {os.getcwd()}")
    print(f"Directory exists: {os.path.exists(folder_path)}")

    logger.info(f"analyze_folder_images called with path: '{folder_path}'")

    # Reject empty/blank input before any path resolution.
    if not folder_path or folder_path.strip() == "":
        error_msg = "No folder path provided. Please enter a valid folder path."
        logger.error(error_msg)
        print(f"ERROR: {error_msg}")
        return error_msg

    folder_path = folder_path.strip()
    logger.debug(f"Cleaned folder path: '{folder_path}'")

    # Try several interpretations of the user-supplied path (as given,
    # CWD-relative, normalized, absolute, ~-expanded) and use the first
    # that is an existing directory.
    potential_paths = [
        folder_path,
        os.path.join(os.getcwd(), folder_path),
        os.path.normpath(folder_path),
        os.path.abspath(folder_path),
        os.path.expanduser(folder_path)
    ]

    # Also try under /data when it exists (e.g. a mounted volume).
    if os.path.exists("/data"):
        potential_paths.append(os.path.join("/data", folder_path))

    print(f"Trying the following paths:")
    for i, path in enumerate(potential_paths):
        print(f"  {i+1}. {path} (exists: {os.path.exists(path)})")

    valid_path = None
    for test_path in potential_paths:
        logger.debug(f"Testing path: '{test_path}'")
        if os.path.exists(test_path):
            logger.debug(f"Path exists: '{test_path}'")
            if os.path.isdir(test_path):
                valid_path = test_path
                logger.info(f"Found valid directory path: '{valid_path}'")
                print(f"FOUND VALID PATH: {valid_path}")
                break
            else:
                logger.debug(f"Path exists but is not a directory: '{test_path}'")

    if not valid_path:
        error_msg = f"Could not find a valid directory at '{folder_path}'. Please provide a complete and valid folder path."
        logger.error(error_msg)
        print(f"ERROR: {error_msg}")

        # Best-effort hint: list directories in the CWD to help the user.
        try:
            available_dirs = [d for d in os.listdir('.') if os.path.isdir(d)]
            print(f"Available directories in current location: {', '.join(available_dirs)}")
            if available_dirs:
                return f"Error: {error_msg}\n\nAvailable directories in current location: {', '.join(available_dirs)}"
            else:
                return f"Error: {error_msg}\n\nNo directories found in the current location."
        except Exception as list_err:
            print(f"Error listing directories: {str(list_err)}")
            return f"Error: {error_msg}"

    # Rebind folder_path to a pathlib.Path for globbing below.
    folder_path = Path(valid_path)
    logger.debug(f"Using folder path: {folder_path}")

    # Collect candidate files, matching both lower- and upper-case
    # extensions.  NOTE(review): on case-insensitive filesystems this may
    # yield duplicate entries — TODO confirm whether that matters here.
    image_files = []
    for ext in SUPPORTED_EXTENSIONS:
        logger.debug(f"Searching for files with extension: {ext}")
        print(f"Searching for *{ext} files")
        found_files = list(folder_path.glob(f"*{ext.lower()}"))
        found_files.extend(list(folder_path.glob(f"*{ext.upper()}")))
        image_files.extend(found_files)
        print(f"Found {len(found_files)} files with extension {ext}")

    logger.info(f"Found {len(image_files)} images in {folder_path}")
    print(f"Total files found: {len(image_files)}")

    if not image_files:
        error_msg = f"No supported image files found in '{folder_path}'. Supported formats: {', '.join(SUPPORTED_EXTENSIONS)}"
        logger.warning(error_msg)
        print(f"WARNING: {error_msg}")
        return error_msg

    # Deterministic processing order.
    image_files.sort()

    print("Files to process:")
    for i, file in enumerate(image_files):
        print(f"  {i+1}. {file.name}")

    # The combined report starts with a "Found N images in ..." header —
    # convert_to_html keys off this line to pick its multi-image layout.
    results = []
    results.append(f"Found {len(image_files)} images in {folder_path}\n")

    for i, img_path in enumerate(image_files, 1):
        try:
            logger.info(f"Processing image {i}/{len(image_files)}: {img_path.name}")
            print(f"\nProcessing file {i}/{len(image_files)}: {img_path.name}")

            # PDFs are detected by suffix and expanded to one analysis per page.
            is_pdf = img_path.suffix.lower() == '.pdf'

            if is_pdf:
                logger.info(f"Processing PDF file: {img_path}")
                print(f"This is a PDF file: {img_path}")
                try:
                    logger.debug(f"Converting PDF to images: {img_path}")
                    print(f"Converting PDF to images...")

                    if not os.path.exists(img_path):
                        raise FileNotFoundError(f"PDF file not found: {img_path}")

                    file_size = os.path.getsize(img_path) / 1024
                    print(f"PDF file size: {file_size:.2f} KB")

                    # Sanity-check the magic bytes; a bad header is only a
                    # warning — conversion is still attempted.
                    try:
                        with open(img_path, 'rb') as f:
                            header = f.read(10)
                            print(f"File header (hex): {' '.join([f'{b:02x}' for b in header])}")
                            if not header.startswith(b'%PDF'):
                                print(f"WARNING: File does not have PDF header")
                    except Exception as read_err:
                        print(f"Error reading file header: {str(read_err)}")

                    # Rasterize pages; re-raise so the outer PDF handler
                    # records the failure for this file.
                    try:
                        pdf_images = convert_from_path(str(img_path))
                        print(f"PDF converted to {len(pdf_images)} pages")
                    except Exception as pdf_err:
                        print(f"Error converting PDF: {str(pdf_err)}")
                        print(traceback.format_exc())
                        raise

                    if not pdf_images or len(pdf_images) == 0:
                        error_msg = f"PDF conversion failed for {img_path.name}: No pages extracted"
                        logger.error(error_msg)
                        print(f"ERROR: {error_msg}")
                        results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name}\nError: PDF conversion failed - no pages extracted\n")
                        continue

                    logger.info(f"PDF converted to {len(pdf_images)} pages")
                    pdf_results = []

                    # Analyze each page independently; a failing page is
                    # reported inline without aborting the rest.
                    for page_num, page_img in enumerate(pdf_images, 1):
                        try:
                            logger.debug(f"Processing PDF page {page_num}/{len(pdf_images)}")
                            print(f"Processing PDF page {page_num}/{len(pdf_images)}")
                            page_prompt = f"PDF {img_path.name} - Page {page_num}/{len(pdf_images)}: {prompt}"
                            page_result = process_image_with_text(page_img, page_prompt)
                            pdf_results.append(f"-- Page {page_num} --\n{page_result}")
                        except Exception as page_err:
                            error_msg = f"Error processing PDF page {page_num}: {str(page_err)}"
                            logger.error(error_msg)
                            logger.error(traceback.format_exc())
                            print(f"ERROR: {error_msg}")
                            print(traceback.format_exc())
                            pdf_results.append(f"-- Page {page_num} --\nError: {str(page_err)}")

                    results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name} (PDF with {len(pdf_images)} pages)\n" +
                                   "\n".join(pdf_results) + "\n")

                except Exception as pdf_err:
                    error_msg = f"Error processing PDF {img_path.name}: {str(pdf_err)}"
                    logger.error(error_msg)
                    logger.error(traceback.format_exc())
                    print(f"ERROR: {error_msg}")
                    print(traceback.format_exc())
                    results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name}\nError processing PDF: {str(pdf_err)}\n")
            else:
                # Regular raster image: load, force RGB, analyze once.
                try:
                    print(f"Processing regular image file")
                    image = Image.open(img_path).convert('RGB')
                    logger.debug(f"Image loaded: size={image.size}, mode={image.mode}")
                    print(f"Image loaded: size={image.size}, mode={image.mode}")

                    image_prompt = f"Image {i}/{len(image_files)} - {img_path.name}: {prompt}"
                    logger.debug(f"Processing image with prompt: {image_prompt}")
                    image_result = process_image_with_text(image, image_prompt)

                    results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name}\n{image_result}\n")

                    logger.info(f"Successfully processed image {i}/{len(image_files)}: {img_path.name}")
                    print(f"Successfully processed image {i}/{len(image_files)}: {img_path.name}")
                except Exception as img_err:
                    error_msg = f"Error opening/processing image {img_path.name}: {str(img_err)}"
                    logger.error(error_msg)
                    logger.error(traceback.format_exc())
                    print(f"ERROR: {error_msg}")
                    print(traceback.format_exc())
                    results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name}\nError opening/processing image: {str(img_err)}\n")

        except Exception as e:
            # Catch-all per file so one bad file never stops the batch.
            error_msg = f"Error processing image {img_path.name}: {str(e)}"
            logger.error(error_msg)
            logger.error(traceback.format_exc())
            print(f"ERROR: {error_msg}")
            print(traceback.format_exc())
            results.append(f"---\nImage {i}/{len(image_files)}: {img_path.name}\nError: {str(e)}\n")

    print("===== FOLDER ANALYSIS COMPLETE =====\n\n")
    combined_result = "\n".join(results)
    logger.info(f"Folder analysis complete, processed {len(image_files)} images")
    return combined_result
|
|
|
|
|
|
|
|
def process_image_with_text(image, prompt: str) -> str:
    """Process a single image with the InternVL model and a text prompt.

    Preprocesses *image* (RGB convert, resize to IMAGE_SIZE, normalize,
    batch, move to device, bfloat16 on CUDA) and then tries three
    strategies in order until one succeeds:

      1. model.generate() with the tokenized prompt and pixel tensor,
      2. model.chat() with an "<image>\\n{prompt}" question,
      3. a raw forward pass with argmax decoding.

    Always returns a string — either the model output or a human-readable
    error message.  Mutates the module-level ``gui_stats`` dict (operation
    counter on entry, error fields on each failure).

    NOTE(review): *image* is assumed to be a PIL.Image (duck-typed via
    hasattr checks for convert/resize) — confirm against callers.
    """
    start_time = time.time()

    # Counted at entry, so failed operations are included in the total.
    gui_stats['operations_completed'] += 1

    try:
        logger.info(f"process_image_with_text called with image type: {type(image)}")

        if hasattr(image, 'size'):
            logger.debug(f"Image dimensions: {image.size}")
        if hasattr(image, 'mode'):
            logger.debug(f"Image mode: {image.mode}")

        # Snapshot GPU memory for debugging before the model is touched.
        if torch.cuda.is_available():
            logger.debug(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
            logger.debug(f"GPU memory reserved: {torch.cuda.memory_reserved() / 1e9:.2f} GB")

        # load_model is expected to cache — presumably cheap after first call.
        logger.debug("Loading model")
        model, tokenizer = load_model()
        if model is None or tokenizer is None:
            logger.error("Model failed to load")
            return "Error loading model. Please check the logs for details."

        logger.debug("Model loaded successfully")

        # --- Image preprocessing: PIL -> normalized batched tensor ---
        try:
            logger.debug("Converting image to RGB if needed")
            if hasattr(image, 'convert'):
                image = image.convert('RGB')
                logger.debug(f"After conversion: mode={image.mode}, size={image.size}")
            else:
                logger.error("Image does not have convert method")
                return "Error: Unable to convert image to RGB"

            # Fixed square resize (no aspect-ratio preservation).
            logger.debug(f"Resizing image to {IMAGE_SIZE}x{IMAGE_SIZE}")
            if hasattr(image, 'resize'):
                image_resized = image.resize((IMAGE_SIZE, IMAGE_SIZE))
                logger.debug(f"After resize: size={image_resized.size}")
            else:
                logger.error("Image does not have resize method")
                return "Error: Unable to resize image"

            # Standard ImageNet normalization.
            logger.debug("Creating transform")
            transform = T.Compose([
                T.ToTensor(),
                T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
            ])

            logger.debug("Converting image to tensor")
            tensor = transform(image_resized)

            # Defensive check plus value statistics for debugging.
            if isinstance(tensor, torch.Tensor):
                logger.debug(f"Image transformed to tensor: shape={tensor.shape}, dtype={tensor.dtype}")
                if tensor.numel() > 0:
                    logger.debug(f"Tensor stats: min={tensor.min().item():.4f}, max={tensor.max().item():.4f}, "
                                 f"mean={tensor.mean().item():.4f}, std={tensor.std().item():.4f}")
            else:
                logger.error(f"Transform did not return a tensor: {type(tensor)}")
                raise TypeError(f"Expected torch.Tensor but got {type(tensor)}")

            # (C,H,W) -> (1,C,H,W) batch of one.
            logger.debug("Adding batch dimension if needed")
            if len(tensor.shape) == 3:
                tensor = tensor.unsqueeze(0)
                logger.debug(f"Added batch dimension, new shape: {tensor.shape}")

            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.debug(f"Moving tensor to device: {device}")
            tensor = tensor.to(device)

            # bfloat16 on GPU — presumably to match the model's dtype.
            if torch.cuda.is_available():
                logger.debug("Converting tensor to bfloat16")
                tensor = tensor.to(torch.bfloat16)
                logger.debug(f"Tensor converted to bfloat16, new dtype: {tensor.dtype}")

            logger.info(f"Final tensor prepared: shape={tensor.shape}, device={tensor.device}, dtype={tensor.dtype}")
        except Exception as tensor_err:
            error_msg = f"Error in tensor creation: {str(tensor_err)}"
            logger.error(error_msg)
            logger.error(traceback.format_exc())

            gui_stats['errors'] += 1
            gui_stats['last_error'] = error_msg
            gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")
            return f"Error preparing image for analysis: {str(tensor_err)}"

        logger.debug(f"Tokenizing prompt: {prompt}")
        input_tokens = tokenizer(prompt, return_tensors="pt").to(device)
        logger.debug(f"Input tokens shape: {input_tokens['input_ids'].shape}")

        # --- Inference with three fallbacks, all under inference_mode ---
        with torch.inference_mode():
            try:
                # Strategy 1: positional generate(input_ids, pixel_values).
                logger.info("Attempting direct generation")

                logger.debug(f"Checking input token tensor: shape={input_tokens['input_ids'].shape}, device={input_tokens['input_ids'].device}")
                logger.debug(f"Checking image tensor: shape={tensor.shape}, device={tensor.device}")

                # NOTE(review): temperature is ignored when do_sample=False.
                output_ids = model.generate(
                    input_tokens["input_ids"],
                    tensor,
                    max_new_tokens=512,
                    temperature=0.1,
                    do_sample=False
                )

                logger.info("Direct generation successful")
                logger.debug(f"Output IDs shape: {output_ids.shape}")

                output = tokenizer.decode(output_ids[0], skip_special_tokens=True)
                logger.debug(f"Decoded output length: {len(output)} chars")

                elapsed = time.time() - start_time
                logger.info(f"Image processing completed in {elapsed:.2f} seconds")

                return output.strip()
            except Exception as gen_error:
                error_msg = f"Direct generation failed: {str(gen_error)}"
                logger.error(error_msg)
                logger.error(traceback.format_exc())

                gui_stats['errors'] += 1
                gui_stats['last_error'] = error_msg
                gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")

                # Strategy 2: InternVL's chat() helper.
                try:
                    logger.info("Attempting chat method")
                    question = f"<image>\n{prompt}"
                    logger.debug(f"Chat question: {question}")

                    if not isinstance(tensor, torch.Tensor):
                        logger.error(f"Chat method: expected torch.Tensor but got {type(tensor)}")
                        raise TypeError(f"Expected torch.Tensor but got {type(tensor)}")

                    # return_history=True yields (response, history); the
                    # history is discarded here.
                    response, _ = model.chat(
                        tokenizer=tokenizer,
                        pixel_values=tensor,
                        question=question,
                        generation_config={"max_new_tokens": 512, "do_sample": False},
                        history=None,
                        return_history=True
                    )

                    logger.info("Chat method successful")
                    logger.debug(f"Chat response length: {len(response)} chars")

                    elapsed = time.time() - start_time
                    logger.info(f"Image processing (fallback chat) completed in {elapsed:.2f} seconds")

                    return response.strip()
                except Exception as chat_error:
                    error_msg = f"Chat method failed: {str(chat_error)}"
                    logger.error(error_msg)
                    logger.error(traceback.format_exc())

                    gui_stats['errors'] += 1
                    gui_stats['last_error'] = error_msg
                    gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")

                    # Strategy 3: raw forward pass + greedy argmax decode.
                    # NOTE(review): argmax over logits is not proper
                    # autoregressive generation — output quality is a
                    # last-resort best effort.
                    try:
                        logger.info("Attempting direct model forward call")

                        if hasattr(model, "forward"):
                            logger.debug("Model has forward method")

                            logger.debug("Preparing inputs for direct forward pass")
                            inputs = {
                                "input_ids": input_tokens["input_ids"],
                                "pixel_values": tensor,
                                "return_dict": True,
                            }

                            for k, v in inputs.items():
                                if hasattr(v, 'shape'):
                                    logger.debug(f"Input '{k}' shape: {v.shape}")

                            logger.debug("Calling model.forward")
                            outputs = model(**inputs)

                            if hasattr(outputs, "logits") and outputs.logits is not None:
                                logger.debug(f"Got logits with shape: {outputs.logits.shape}")

                                pred_ids = torch.argmax(outputs.logits, dim=-1)
                                logger.debug(f"Prediction IDs shape: {pred_ids.shape}")

                                response = tokenizer.decode(pred_ids[0], skip_special_tokens=True)
                                logger.debug(f"Decoded response length: {len(response)} chars")

                                elapsed = time.time() - start_time
                                logger.info(f"Image processing (fallback forward) completed in {elapsed:.2f} seconds")

                                return response.strip()
                            else:
                                error_msg = "Model output does not contain logits"
                                logger.error(error_msg)
                                gui_stats['errors'] += 1
                                gui_stats['last_error'] = error_msg
                                gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")
                                return "Failed to analyze image - model output contains no usable data"
                        else:
                            error_msg = "Model does not have forward method"
                            logger.error(error_msg)
                            gui_stats['errors'] += 1
                            gui_stats['last_error'] = error_msg
                            gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")
                            return "Failed to analyze image - model doesn't support direct calling"
                    except Exception as forward_error:
                        error_msg = f"Forward method failed: {str(forward_error)}"
                        logger.error(error_msg)
                        logger.error(traceback.format_exc())

                        gui_stats['errors'] += 1
                        gui_stats['last_error'] = error_msg
                        gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")

                        return f"Error generating analysis: All methods failed to process the image"
    except Exception as e:
        # Catch-all so callers always get a string, never an exception.
        error_msg = f"Fatal error in process_image_with_text: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())

        gui_stats['errors'] += 1
        gui_stats['last_error'] = error_msg
        gui_stats['last_error_time'] = datetime.datetime.now().strftime("%H:%M:%S")

        return f"Error processing image: {str(e)}"
|
|
|
|
|
|
|
|
def get_latest_log_content():
    """Get the content of the latest log file for display in the UI.

    Scans OUTPUT_DIR for ``debug_<timestamp>.log`` files (the naming used
    when logging is configured at startup), picks the newest one — the
    timestamped names sort lexicographically — and returns its last 100
    lines.  Returns an explanatory string instead of raising on any error.
    """
    try:
        # BUG FIX: the previous pattern "debug_log_*.log" never matched the
        # actual files, which are created as "debug_<timestamp>.log", so
        # this function always reported "No log files found."
        log_files = sorted(glob.glob(os.path.join(OUTPUT_DIR, "debug_*.log")))
        if not log_files:
            return "No log files found."

        latest_log = log_files[-1]
        with open(latest_log, 'r') as f:
            lines = f.readlines()

        # Return only the tail so the UI textbox stays responsive.
        last_lines = lines[-100:] if len(lines) > 100 else lines
        return "".join(last_lines)
    except Exception as e:
        return f"Error reading log file: {str(e)}"
|
|
|
|
|
|
|
|
# Module-level runtime statistics shared across handlers.  Mutated in place
# by process_image_with_text and read by the UI; keys are never added or
# removed after creation.
gui_stats = {
    'errors': 0,                 # total errors recorded since startup
    'warnings': 0,               # total warnings recorded since startup
    'last_error': 'None',        # message of the most recent error
    'last_warning': 'None',      # message of the most recent warning
    'last_error_time': '',       # HH:MM:SS timestamp of the most recent error
    'last_warning_time': '',     # HH:MM:SS timestamp of the most recent warning
    'operations_completed': 0,   # incremented at the START of each operation
    'start_time': datetime.datetime.now(),  # process start timestamp
    'tensor_issues': 0           # counter; not updated in the visible code
}
|
|
|
|
|
|
|
|
def read_log_file():
    """Read and return the contents of the current session's log file.

    Always yields a displayable string: the log text itself, or a status
    message when the file is missing, empty, or unreadable.
    """
    try:
        if not os.path.exists(log_file):
            return "Log file not found. The application may have just started."

        with open(log_file, 'r', encoding='utf-8') as handle:
            text = handle.read()

        return text if text else "Log file is empty. Waiting for events..."
    except Exception as e:
        return f"Error reading log file: {str(e)}"
|
|
|
|
|
|
|
|
def main(): |
|
|
|
|
|
model, tokenizer = load_model() |
|
|
|
|
|
if model is None: |
|
|
|
|
|
demo = gr.Interface( |
|
|
fn=lambda x: "Model loading failed. Please check the logs for details.", |
|
|
inputs=gr.Textbox(), |
|
|
outputs=gr.Textbox(), |
|
|
title="InternVL2.5 Image Analyzer - Error", |
|
|
description="The model failed to load. Please check the logs for more information." |
|
|
) |
|
|
return demo |
|
|
|
|
|
|
|
|
prompts = [ |
|
|
"Describe this image in detail.", |
|
|
"What can you tell me about this image?", |
|
|
"Is there any text in this image? If so, can you read it?", |
|
|
"What is the main subject of this image?", |
|
|
"What emotions or feelings does this image convey?", |
|
|
"Describe the composition and visual elements of this image.", |
|
|
"Summarize what you see in this image in one paragraph." |
|
|
] |
|
|
|
|
|
|
|
|
with gr.Blocks(title="InternVL2.5 Image Analyzer", theme=gr.themes.Soft()) as demo: |
|
|
gr.Markdown("# InternVL2.5 Image Analyzer") |
|
|
gr.Markdown("Analyze images using the InternVL2.5 model. You can upload individual images or analyze all images in a folder.") |
|
|
|
|
|
|
|
|
with gr.Tabs() as tabs: |
|
|
|
|
|
with gr.Tab("Debug Logs"): |
|
|
gr.Markdown("## Application Logs") |
|
|
gr.Markdown("View real-time application logs and debug information.") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=3): |
|
|
logs_output = gr.Textbox( |
|
|
label="Application Logs", |
|
|
value=read_log_file(), |
|
|
lines=30, |
|
|
max_lines=50, |
|
|
autoscroll=True |
|
|
) |
|
|
with gr.Column(scale=1): |
|
|
refresh_logs_btn = gr.Button("Refresh Logs") |
|
|
log_info = gr.Markdown(f"Current log file: {log_file}") |
|
|
error_stats = gr.Markdown(f"Error count: {gui_stats['errors']}") |
|
|
|
|
|
refresh_logs_btn.click( |
|
|
fn=read_log_file, |
|
|
inputs=[], |
|
|
outputs=[logs_output] |
|
|
) |
|
|
|
|
|
|
|
|
gr.File(label="Download Complete Log File", value=log_file) |
|
|
|
|
|
|
|
|
with gr.Tab("Single Image Analysis"): |
|
|
with gr.Row(): |
|
|
image_input = gr.Image(type="pil", label="Upload Image or PDF") |
|
|
|
|
|
prompt_single = gr.Dropdown( |
|
|
choices=prompts, |
|
|
value=prompts[0], |
|
|
label="Select a prompt or write your own", |
|
|
allow_custom_value=True |
|
|
) |
|
|
|
|
|
analyze_btn_single = gr.Button("Analyze") |
|
|
output_single = gr.Textbox(label="Analysis Output", lines=20) |
|
|
|
|
|
|
|
|
save_btn_single = gr.Button("Save Results to File") |
|
|
save_status_single = gr.Textbox(label="Save Status", lines=1) |
|
|
|
|
|
analyze_btn_single.click( |
|
|
fn=analyze_with_prompt, |
|
|
inputs=[image_input, prompt_single], |
|
|
outputs=output_single |
|
|
) |
|
|
|
|
|
save_btn_single.click( |
|
|
fn=lambda text: save_to_file(text, f"single_image_{generate_filename()}"), |
|
|
inputs=output_single, |
|
|
outputs=save_status_single |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("Dual Image Analysis"): |
|
|
with gr.Row(): |
|
|
image1_input = gr.Image(type="pil", label="Upload First Image") |
|
|
image2_input = gr.Image(type="pil", label="Upload Second Image") |
|
|
|
|
|
prompt_dual = gr.Dropdown( |
|
|
choices=prompts, |
|
|
value=prompts[0], |
|
|
label="Select a prompt or write your own", |
|
|
allow_custom_value=True |
|
|
) |
|
|
|
|
|
analyze_btn_dual = gr.Button("Analyze Images") |
|
|
output_dual = gr.Textbox(label="Analysis Results", lines=25) |
|
|
|
|
|
|
|
|
save_btn_dual = gr.Button("Save Results to File") |
|
|
save_status_dual = gr.Textbox(label="Save Status", lines=1) |
|
|
|
|
|
analyze_btn_dual.click( |
|
|
fn=lambda img1, img2, prompt: analyze_dual_images(model, tokenizer, img1, img2, prompt), |
|
|
inputs=[image1_input, image2_input, prompt_dual], |
|
|
outputs=output_dual |
|
|
) |
|
|
|
|
|
save_btn_dual.click( |
|
|
fn=lambda text: save_to_file(text, f"dual_images_{generate_filename()}"), |
|
|
inputs=output_dual, |
|
|
outputs=save_status_dual |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("Folder Analysis"): |
|
|
gr.Markdown("## Analyze all images and PDFs in a folder") |
|
|
gr.Markdown(""" |
|
|
Please enter a complete folder path. You can try these options: |
|
|
- Absolute path (e.g., `/home/user/images`) |
|
|
- Relative path from current directory (e.g., `example_images`) |
|
|
- Path with ~ for home directory (e.g., `~/images`) |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=4): |
|
|
folder_path = gr.Textbox( |
|
|
label="Folder Path", |
|
|
placeholder="Enter the complete path to the folder containing images", |
|
|
value="example_images" |
|
|
) |
|
|
with gr.Column(scale=1): |
|
|
example_folders = gr.Dropdown( |
|
|
choices=["example_images", "example_images_2", "example_pdfs", "/data/images", "images"], |
|
|
label="Example Folders", |
|
|
value="example_images" |
|
|
) |
|
|
|
|
|
def set_folder_path(folder): |
|
|
return folder |
|
|
|
|
|
example_folders.change( |
|
|
fn=set_folder_path, |
|
|
inputs=[example_folders], |
|
|
outputs=[folder_path] |
|
|
) |
|
|
|
|
|
prompt_folder = gr.Dropdown( |
|
|
label="Analysis Prompt", |
|
|
choices=prompts, |
|
|
value=prompts[0], |
|
|
allow_custom_value=True |
|
|
) |
|
|
|
|
|
|
|
|
view_folder_btn = gr.Button("View Folder Contents") |
|
|
folder_contents = gr.Markdown("Select a folder and click 'View Folder Contents' to see available images") |
|
|
|
|
|
def view_folder_contents(folder_path): |
|
|
"""List all image files in the folder without analyzing them.""" |
|
|
logger.info(f"Viewing contents of folder: '{folder_path}'") |
|
|
|
|
|
if not folder_path or folder_path.strip() == "": |
|
|
return "Please enter a folder path." |
|
|
|
|
|
|
|
|
folder_path = folder_path.strip() |
|
|
|
|
|
|
|
|
potential_paths = [ |
|
|
folder_path, |
|
|
os.path.join(os.getcwd(), folder_path), |
|
|
os.path.normpath(folder_path), |
|
|
os.path.abspath(folder_path), |
|
|
os.path.expanduser(folder_path) |
|
|
] |
|
|
|
|
|
|
|
|
if os.path.exists("/data"): |
|
|
potential_paths.append(os.path.join("/data", folder_path)) |
|
|
|
|
|
|
|
|
valid_path = None |
|
|
for test_path in potential_paths: |
|
|
if os.path.exists(test_path) and os.path.isdir(test_path): |
|
|
valid_path = test_path |
|
|
break |
|
|
|
|
|
if not valid_path: |
|
|
return f"Could not find a valid directory at '{folder_path}'.\n\nTried the following paths:\n" + "\n".join(f"- {p}" for p in potential_paths) |
|
|
|
|
|
|
|
|
image_files = [] |
|
|
for ext in SUPPORTED_EXTENSIONS: |
|
|
files = glob.glob(os.path.join(valid_path, f"*{ext}")) |
|
|
files.extend(glob.glob(os.path.join(valid_path, f"*{ext.upper()}"))) |
|
|
image_files.extend(files) |
|
|
|
|
|
|
|
|
image_files.sort() |
|
|
|
|
|
if not image_files: |
|
|
return f"No supported image files found in '{valid_path}'.\n\nSupported formats: {', '.join(SUPPORTED_EXTENSIONS)}" |
|
|
|
|
|
|
|
|
output = f"### Found {len(image_files)} images in '{valid_path}'\n\n" |
|
|
for i, file in enumerate(image_files, 1): |
|
|
file_name = os.path.basename(file) |
|
|
file_size = os.path.getsize(file) / 1024 |
|
|
output += f"{i}. **{file_name}** ({file_size:.1f} KB)\n" |
|
|
|
|
|
output += f"\nPath used: `{valid_path}`" |
|
|
return output |
|
|
|
|
|
view_folder_btn.click( |
|
|
fn=view_folder_contents, |
|
|
inputs=[folder_path], |
|
|
outputs=[folder_contents] |
|
|
) |
|
|
|
|
|
gr.Markdown("---") |
|
|
analyze_btn_folder = gr.Button("Analyze All Images in Folder", variant="primary") |
|
|
output_folder = gr.Textbox(label="Analysis Result", lines=20) |
|
|
|
|
|
|
|
|
with gr.Row(): |
|
|
folder_status = gr.Markdown("Ready to analyze folder images") |
|
|
|
|
|
|
|
|
def analyze_with_status(folder_path, prompt):
    """Generator wrapper around analyze_folder_images that streams status text.

    Yields ``(status_message, result_text)`` tuples so the Gradio UI can show
    progress while the (potentially long-running) folder analysis executes.

    Args:
        folder_path: User-supplied directory path containing images.
        prompt: Analysis prompt forwarded to analyze_folder_images.
    """
    yield "Starting folder analysis...", ""

    try:
        # Best-effort pre-count of images so the user knows what to expect.
        # Any failure here is non-fatal: the real validation happens inside
        # analyze_folder_images.
        try:
            folder_path = folder_path.strip()
            folder_obj = Path(folder_path)
            if folder_obj.exists() and folder_obj.is_dir():
                image_count = sum(
                    1 for p in folder_obj.glob("*.*")
                    if p.suffix.lower() in SUPPORTED_EXTENSIONS
                )
                yield f"Found {image_count} images to process. Starting analysis...", ""
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer swallowed; the pre-count is purely cosmetic.
            pass

        yield "Processing images... (this may take several minutes)", ""

        # The actual (blocking) analysis call.
        result = analyze_folder_images(folder_path, prompt)

        yield "Folder analysis complete!", result
    except Exception as e:
        # Surface the failure in the results box instead of crashing the UI.
        yield "Analysis failed! See error message in results.", f"Error analyzing folder: {str(e)}"
|
|
|
|
|
# Run the folder analysis; analyze_with_status is a generator, so the status
# markdown updates while the analysis is still in progress.
analyze_btn_folder.click(
    fn=analyze_with_status,
    inputs=[folder_path, prompt_folder],
    outputs=[folder_status, output_folder]
)

# Buttons to persist the folder-analysis result in different formats.
with gr.Row():
    save_btn_folder = gr.Button("Save Results to Text File")
    save_json_folder = gr.Button("Save Results as JSON")
    save_html_folder = gr.Button("Save Results as HTML")

save_status_folder = gr.Textbox(label="Save Status", lines=1)

# Plain-text save also records the prompt used for the analysis.
save_btn_folder.click(
    fn=lambda text, prompt: save_to_file(text, "folder_analysis", prompt=prompt),
    inputs=[output_folder, prompt_folder],
    outputs=[save_status_folder]
)

# JSON/HTML savers return (status, filename); only the status is shown here.
save_json_folder.click(
    fn=lambda content: save_to_json(content, "folder", "Folder analysis", None)[0],
    inputs=[output_folder],
    outputs=[save_status_folder]
)

save_html_folder.click(
    fn=lambda content: save_to_html(content, "folder")[0],
    inputs=[output_folder],
    outputs=[save_status_folder]
)
|
|
|
|
|
|
|
|
# Tab for browsing files previously written to the output directory.
with gr.Tab("Saved Outputs"):
    refresh_btn = gr.Button("Refresh File List")
    # Populate the listing once at build time.
    file_list = gr.Markdown(value=list_output_files())
|
|
|
|
|
|
|
|
def read_saved_file(filename):
    """Read a previously saved output file from OUTPUT_DIR and return its text.

    Returns an error string (rather than raising) so the result can be shown
    directly in the UI textbox.

    Args:
        filename: Name of a file inside OUTPUT_DIR, as typed by the user.
    """
    try:
        filename = filename.strip()
        filepath = os.path.join(OUTPUT_DIR, filename)
        # SECURITY: reject path traversal (e.g. "../secrets.txt" or an
        # absolute path) — the resolved path must stay inside OUTPUT_DIR.
        base = os.path.abspath(OUTPUT_DIR)
        if os.path.commonpath([base, os.path.abspath(filepath)]) != base:
            return "Error reading file: invalid filename"
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        # Includes commonpath's ValueError for malformed/mixed paths.
        return f"Error reading file: {str(e)}"
|
|
|
|
|
# Viewer: type a filename from the list above and display its contents.
file_selector = gr.Textbox(label="Enter filename to view", placeholder="e.g., single_image_20230322_120000.txt")
view_btn = gr.Button("View File Contents")
file_contents = gr.Textbox(label="File Contents", lines=30)

# Download section: same filename-entry pattern as the viewer.
gr.Markdown("### Download File")
gr.Markdown("Select a file to download from the list above.")
download_selector = gr.Textbox(label="Enter filename to download", placeholder="e.g., single_image_20230322_120000.txt")
|
|
|
|
|
def create_download_link(filename):
    """Return the path of a saved output file for a gr.File widget.

    Returns None for empty input, missing files, traversal attempts, or any
    filesystem error, which leaves the download widget empty.
    """
    if not filename:
        return None
    try:
        filename = filename.strip()
        filepath = os.path.join(OUTPUT_DIR, filename)
        # SECURITY: refuse paths that escape OUTPUT_DIR (path traversal).
        base = os.path.abspath(OUTPUT_DIR)
        if os.path.commonpath([base, os.path.abspath(filepath)]) != base:
            return None
        if not os.path.exists(filepath):
            return None
        return filepath
    except Exception:
        # Narrowed from a bare `except:`; any error just means "no file".
        return None
|
|
|
|
|
# File-based download widget (gr.File) plus an HTML-link alternative below.
download_btn = gr.Button("Show download button")
download_output = gr.File(label="Files available for download")

# Target container for the styled HTML download link.
download_html = gr.HTML("")
|
|
|
|
|
def create_better_download_link(filename):
    """Build a styled HTML download link for a saved output file.

    Returns an HTML snippet (or a plain error string) for display in a
    gr.HTML component.
    """
    if not filename:
        return "Please enter a filename"

    try:
        import html as _html  # local import: only needed for escaping here

        filename = filename.strip()
        filepath = os.path.join(OUTPUT_DIR, filename)
        if not os.path.exists(filepath):
            return "File not found"

        file_size = os.path.getsize(filepath) / 1024  # KB
        # Gradio serves local files through its /file= route.
        file_url = f"/file={filepath}"
        # Escape before embedding in HTML to avoid markup injection.
        safe_name = _html.escape(filename)

        # BUG FIX: the link previously displayed a literal placeholder
        # instead of the selected file's name; show the real filename in
        # both the label and the download attribute.
        return f"""
        <div style="margin: 10px 0; padding: 10px; border: 1px solid #ddd; border-radius: 5px;">
            <p><strong>File:</strong> {safe_name} ({file_size:.1f} KB)</p>
            <a href="{file_url}" download="{safe_name}" target="_blank"
               style="display: inline-block; padding: 8px 16px; background-color: #4CAF50;
                      color: white; text-decoration: none; border-radius: 4px;">
                Download to local computer
            </a>
        </div>
        """
    except Exception:
        # Narrowed from a bare `except:` so system-exiting exceptions pass.
        return "Error creating download link"
|
|
|
|
|
# Refresh the saved-files listing.
refresh_btn.click(
    fn=list_output_files,
    inputs=[],
    outputs=file_list
)

# Show a saved file's text in the viewer box.
view_btn.click(
    fn=read_saved_file,
    inputs=file_selector,
    outputs=file_contents
)

# Expose the selected file through the gr.File download widget.
download_btn.click(
    fn=create_download_link,
    inputs=download_selector,
    outputs=download_output
)

# Alternative: render a styled HTML <a download> link for the same file.
nicer_download_btn = gr.Button("Show nice download button")
nicer_download_btn.click(
    fn=create_better_download_link,
    inputs=download_selector,
    outputs=download_html
)
|
|
|
|
|
|
|
|
# JSON export section: converts the latest analysis text to a JSON file.
gr.Markdown("### Export Analysis as JSON")
gr.Markdown("Convert the most recent analysis to JSON format and download.")

json_result = gr.Textbox(label="JSON Export Status", lines=1)

# One export button per analysis mode (single / dual / folder).
with gr.Row():
    json_single_btn = gr.Button("Export Single Image Analysis to JSON")
    json_dual_btn = gr.Button("Export Dual Image Analysis to JSON")
    json_folder_btn = gr.Button("Export Folder Analysis to JSON")

json_download = gr.File(label="JSON File Download")
|
|
|
|
|
def export_to_json(content, analysis_type, prompt):
    """Persist *content* as JSON via save_to_json.

    Returns a ``(status, filepath)`` pair; ``filepath`` is None when there
    was nothing to export or the save produced no file.
    """
    # Guard clause: nothing to do for empty or whitespace-only content.
    if not (content and content.strip()):
        return "No analysis content to export", None

    status, saved_name = save_to_json(content, analysis_type, prompt)
    full_path = os.path.join(OUTPUT_DIR, saved_name) if saved_name else None
    return status, full_path
|
|
|
|
|
# NOTE: the inline gr.Textbox(value=...) components are a trick to pass a
# constant analysis-type string as an extra input to export_to_json.
json_single_btn.click(
    fn=export_to_json,
    inputs=[output_single, gr.Textbox(value="single"), prompt_single],
    outputs=[json_result, json_download]
)

json_dual_btn.click(
    fn=export_to_json,
    inputs=[output_dual, gr.Textbox(value="dual"), prompt_dual],
    outputs=[json_result, json_download]
)

json_folder_btn.click(
    fn=export_to_json,
    inputs=[output_folder, gr.Textbox(value="folder"), prompt_folder],
    outputs=[json_result, json_download]
)
|
|
|
|
|
|
|
|
# HTML export section, mirroring the JSON export controls.
gr.Markdown("### Export Analysis as HTML")
gr.Markdown("Convert the analysis to formatted HTML and download.")

html_result = gr.Textbox(label="HTML Export Status", lines=1)

# One export button per analysis mode (single / dual / folder).
with gr.Row():
    html_single_btn = gr.Button("Export Single Image Analysis to HTML")
    html_dual_btn = gr.Button("Export Dual Image Analysis to HTML")
    html_folder_btn = gr.Button("Export Folder Analysis to HTML")

html_download = gr.File(label="HTML File Download")
|
|
|
|
|
def export_to_html(content, analysis_type):
    """Persist *content* as HTML via save_to_html.

    Returns a ``(status, filepath)`` pair; ``filepath`` is None when there
    was nothing to export or the save produced no file.
    """
    # Guard clause: skip empty or whitespace-only content.
    if not (content and content.strip()):
        return "No analysis content to export", None

    status, saved_name = save_to_html(content, analysis_type)
    if not saved_name:
        return status, None
    return status, os.path.join(OUTPUT_DIR, saved_name)
|
|
|
|
|
# Inline gr.Textbox components again supply the constant analysis-type value.
html_single_btn.click(
    fn=export_to_html,
    inputs=[output_single, gr.Textbox(value="single")],
    outputs=[html_result, html_download]
)

html_dual_btn.click(
    fn=export_to_html,
    inputs=[output_dual, gr.Textbox(value="dual")],
    outputs=[html_result, html_download]
)

html_folder_btn.click(
    fn=export_to_html,
    inputs=[output_folder, gr.Textbox(value="folder")],
    outputs=[html_result, html_download]
)
|
|
|
|
|
|
|
|
# Footer notes about where results are written.
gr.Markdown(f"## Output Files")
gr.Markdown(f"Analysis results are saved to the '{OUTPUT_DIR}' directory with timestamps. Files can be viewed in the 'Saved Outputs' tab.")

# Pre-canned examples. cache_examples=True runs them once at startup and
# stores the outputs — presumably the example images exist on disk; verify.
gr.Markdown("## Examples")
with gr.Accordion("Click to view examples", open=False):
    gr.Examples(
        examples=[
            ["example_images/example1.jpg", prompts[0]],
            ["example_images/example2.jpg", prompts[2]]
        ],
        inputs=[image_input, prompt_single],
        outputs=output_single,
        fn=lambda img, prompt: analyze_single_image(model, tokenizer, img, prompt),
        cache_examples=True
    )

# Hand the assembled Blocks app back to the caller.
return demo
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        # The model needs a GPU; warn early instead of failing deep inside
        # model loading.
        if not torch.cuda.is_available():
            print("WARNING: CUDA is not available. The model requires a GPU to function properly.")

        # Build the Gradio UI and serve it on all interfaces.
        demo = main()
        demo.launch(server_name="0.0.0.0")
    except Exception as e:
        print(f"Error starting the application: {e}")
        # traceback is already imported at module level; the redundant
        # local `import traceback` has been removed.
        traceback.print_exc()
|
|
|