custom_toolbox / apps /paper_image_tool.py
MashiroLn's picture
Upload folder using huggingface_hub
ad6c225 verified
import gradio as gr
import os
import zipfile
import pandas as pd
from PIL import Image, ImageOps, ImageChops
import shutil
from pathlib import Path
import re
import tempfile
import json
import requests
# --- LLM Configuration ---
LLM_API_KEY = "sk-fa6c38ce957e4c7b946ccbeed33237ec" # Replace with your actual key or use env var
LLM_API_URL = "https://api.deepseek.com/v1/chat/completions" # Example URL
def call_llm_structure_inference(file_list):
"""
Calls DeepSeek API to infer structure from a list of file paths.
"""
if not file_list:
return []
# Take a sample if too many files to save tokens/context
sample_files = file_list[:50] if len(file_list) > 50 else file_list
prompt = f"""
I have a list of file paths from a research project. I need to organize them into a structured format.
The structure should identify a 'Sample ID' (unique identifier for the experiment/sample) and a 'Type' (category of the image, e.g., input, heatmap, result).
Here are the file paths:
{json.dumps(sample_files, indent=2)}
Please analyze the naming patterns and directory structure.
Return a JSON object with a list of rules or a direct mapping.
For this task, simply return a JSON list where each item corresponds to the input files, with 'path', 'sample_id', and 'type' fields.
If you can infer a regex pattern, please include it in a 'pattern' field in the root of the JSON.
Format:
{{
"files": [
{{"path": "path/to/file1.png", "sample_id": "exp1", "type": "input"}},
...
]
}}
Only return the JSON, no markdown formatting.
"""
headers = {
"Authorization": f"Bearer {LLM_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "deepseek-chat", # Or appropriate model name
"messages": [
{"role": "system", "content": "You are a helpful assistant that parses file paths into structured data."},
{"role": "user", "content": prompt}
],
"stream": False
}
try:
response = requests.post(LLM_API_URL, headers=headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()
content = result['choices'][0]['message']['content']
# Clean up markdown code blocks if present
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
elif "```" in content:
content = content.split("```")[1].split("```")[0]
parsed = json.loads(content.strip())
return parsed.get('files', [])
except Exception as e:
print(f"LLM API Error: {e}")
return []
class ImageMatrix:
def __init__(self):
self.temp_dir = None
self.file_map = [] # List of {'path': str, 'type': str, 'sample_id': str}
self.types = []
self.samples = []
self.use_llm = False # Toggle for LLM usage
def load_zip(self, zip_path, use_llm=False):
self.use_llm = use_llm
self.temp_dir = tempfile.mkdtemp()
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(self.temp_dir)
# Scan files
image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.tiff'}
files = []
for root, _, filenames in os.walk(self.temp_dir):
for filename in filenames:
if Path(filename).suffix.lower() in image_extensions:
full_path = os.path.join(root, filename)
rel_path = os.path.relpath(full_path, self.temp_dir)
files.append(rel_path)
if self.use_llm:
self._infer_structure_llm(files)
else:
self._infer_structure(files)
return self.get_summary()
def _infer_structure_llm(self, files):
print("Using LLM for structure inference...")
llm_results = call_llm_structure_inference(files)
if not llm_results:
print("LLM failed or returned empty, falling back to heuristic.")
self._infer_structure(files)
return
# Map LLM results back to full file list (if we only sampled)
# For now, let's assume the LLM returns a mapping for the provided files.
# If we sampled, we might need to generalize the pattern.
# To keep it simple for this iteration, we'll trust the LLM for the sample
# and try to apply the logic to the rest if possible, or just use what we have.
# Ideally, we ask LLM for a Regex.
parsed_data = []
all_types = set()
all_samples = set()
# Create a lookup for the LLM results
llm_lookup = {item['path']: item for item in llm_results}
for f in files:
if f in llm_lookup:
item = llm_lookup[f]
t = item.get('type', 'unknown')
s = item.get('sample_id', 'unknown')
else:
# Fallback for files not in LLM sample
# In a real app, we'd use the Regex returned by LLM
t = 'unknown'
s = 'unknown'
parsed_data.append({
'rel_path': f,
'full_path': os.path.join(self.temp_dir, f),
'type': t,
'sample_id': s
})
all_types.add(t)
all_samples.add(s)
self.types = sorted(list(all_types))
self.samples = sorted(list(all_samples))
self.file_map = parsed_data
def _infer_structure(self, files):
# Simple Heuristic:
# 1. Look at directory structure.
# Case A: root/type/sample.png
# Case B: root/sample/type.png
parsed_data = []
all_types = set()
all_samples = set()
# Naive approach: assume parent folder is one dimension, filename is another
for f in files:
p = Path(f)
parent = p.parent.name
stem = p.stem
# Heuristic: If parent is empty (root), use filename parts
if str(p.parent) == '.':
# Try splitting by _ or -
parts = re.split(r'[_-]', stem)
if len(parts) > 1:
type_guess = parts[-1] # Suffix as type
sample_guess = "_".join(parts[:-1])
else:
type_guess = "default"
sample_guess = stem
else:
# Check if parent looks like a type (fewer unique values) or sample (more unique values)
# For now, let's just store both and let user swap if needed?
# Let's default to: Parent = Group/Type, Filename = Sample (Common in datasets)
# But in paper writing, often Sample/Type.png is common too.
# We will treat the folder name as "Category A" and filename as "Category B"
# We'll decide which is 'Type' based on which set is smaller.
type_guess = parent
sample_guess = stem
parsed_data.append({
'rel_path': f,
'full_path': os.path.join(self.temp_dir, f),
'dim1': type_guess,
'dim2': sample_guess
})
all_types.add(type_guess)
all_samples.add(sample_guess)
# Decide which dimension is 'Type' (Columns) and which is 'Sample' (Rows)
# Usually Types are fewer than Samples.
if len(all_types) <= len(all_samples):
self.types = sorted(list(all_types))
self.samples = sorted(list(all_samples))
for item in parsed_data:
item['type'] = item['dim1']
item['sample_id'] = item['dim2']
else:
self.types = sorted(list(all_samples))
self.samples = sorted(list(all_types))
for item in parsed_data:
item['type'] = item['dim2']
item['sample_id'] = item['dim1']
self.file_map = parsed_data
def get_summary(self):
return pd.DataFrame(self.file_map)[['type', 'sample_id', 'rel_path']]
def get_sample_images(self, sample_id):
# Return a dict of {type: image_path} for a given sample
result = {}
for item in self.file_map:
if item['sample_id'] == sample_id:
result[item['type']] = item['full_path']
return result
def process_all(self, rules, overrides=None):
# rules: dict of {type: rule_config}
# overrides: dict of {sample_id: {type: rule_config}}
if overrides is None:
overrides = {}
output_zip = tempfile.mktemp(suffix='.zip')
output_dir = tempfile.mkdtemp()
for item in self.file_map:
img_type = item['type']
sample_id = item['sample_id']
# Determine rule: Override > Type Rule > Default
rule = {'action': 'None'}
if sample_id in overrides and img_type in overrides[sample_id]:
rule = overrides[sample_id][img_type]
elif img_type in rules:
rule = rules[img_type]
try:
img = Image.open(item['full_path'])
# Apply Rules
img = apply_image_rule(img, rule)
# Save
# Structure: Output/Type/Sample.png
save_dir = os.path.join(output_dir, img_type)
os.makedirs(save_dir, exist_ok=True)
# Use original filename or sample_id?
# Using sample_id ensures consistency
save_path = os.path.join(save_dir, f"{item['sample_id']}.png")
img.save(save_path)
except Exception as e:
print(f"Error processing {item['full_path']}: {e}")
shutil.make_archive(output_zip.replace('.zip', ''), 'zip', output_dir)
return output_zip
def apply_image_rule(img, rule):
# rule: {'action': str, 'params': dict}
action = rule.get('action', 'None')
params = rule.get('params', {})
if action == 'Auto Trim':
threshold = params.get('threshold', 50)
bg = Image.new(img.mode, img.size, img.getpixel((0,0)))
diff = ImageChops.difference(img, bg)
diff = ImageOps.grayscale(diff)
# Threshold
diff = diff.point(lambda x: 255 if x > threshold else 0)
bbox = diff.getbbox()
if bbox:
img = img.crop(bbox)
elif action == 'Manual Crop':
x = int(params.get('x', 0))
y = int(params.get('y', 0))
w = int(params.get('w', 100))
h = int(params.get('h', 100))
# Ensure crop is within bounds
img_w, img_h = img.size
x = max(0, min(x, img_w))
y = max(0, min(y, img_h))
w = max(1, min(w, img_w - x))
h = max(1, min(h, img_h - y))
img = img.crop((x, y, x+w, y+h))
elif action == 'Resize':
w = int(params.get('w', 512))
h = int(params.get('h', 512))
img = img.resize((w, h))
return img
# Global State
matrix = ImageMatrix()
# Store rules globally for this session (Not multi-user safe, but fits current architecture)
global_rules = {}
# Store overrides: {sample_id: {type: rule}}
global_overrides = {}
def handle_upload(file, use_llm_chk):
if file is None:
return None, gr.update(choices=[])
df = matrix.load_zip(file.name, use_llm=use_llm_chk)
types = matrix.types
samples = matrix.samples
# Reset rules and overrides
global_rules.clear()
global_overrides.clear()
summary_text = f"Found {len(samples)} samples and {len(types)} types.\nTypes: {', '.join(types)}"
return df, gr.update(choices=samples, value=samples[0] if samples else None), gr.update(choices=types, value=types[0] if types else None), summary_text
def save_rule(type_sel, action, p1, p2, p3, p4):
if not type_sel:
return "No type selected."
params = {}
if action == 'Manual Crop':
params = {'x': p1, 'y': p2, 'w': p3, 'h': p4}
elif action == 'Resize':
params = {'w': p1, 'h': p2}
elif action == 'Auto Trim':
params = {'threshold': p1}
rule = {'action': action, 'params': params}
global_rules[type_sel] = rule
return f"Saved rule for Type: {type_sel} -> {action}"
def save_override(sample_id, type_sel, action, p1, p2, p3, p4):
if not sample_id or not type_sel:
return "Sample or Type not selected."
params = {}
if action == 'Manual Crop':
params = {'x': p1, 'y': p2, 'w': p3, 'h': p4}
elif action == 'Resize':
params = {'w': p1, 'h': p2}
elif action == 'Auto Trim':
params = {'threshold': p1}
rule = {'action': action, 'params': params}
if sample_id not in global_overrides:
global_overrides[sample_id] = {}
global_overrides[sample_id][type_sel] = rule
return f"Saved OVERRIDE for {sample_id} / {type_sel} -> {action}"
def clear_override(sample_id, type_sel):
if sample_id in global_overrides and type_sel in global_overrides[sample_id]:
del global_overrides[sample_id][type_sel]
return f"Cleared override for {sample_id} / {type_sel}"
return "No override found to clear."
def update_preview(sample_id, type_sel, action, p1, p2, p3, p4):
# p1-p4 are generic params mapped based on action
if not sample_id:
return None
images = matrix.get_sample_images(sample_id)
# Construct rule for preview
params = {}
if action == 'Manual Crop':
params = {'x': p1, 'y': p2, 'w': p3, 'h': p4}
elif action == 'Resize':
params = {'w': p1, 'h': p2}
elif action == 'Auto Trim':
params = {'threshold': p1}
rule = {'action': action, 'params': params}
results = []
# Show the selected type first/highlighted?
# For now just show the selected type processed
if type_sel in images:
path = images[type_sel]
orig_img = Image.open(path)
proc_img = apply_image_rule(orig_img.copy(), rule)
# Check if override exists
is_overridden = sample_id in global_overrides and type_sel in global_overrides[sample_id]
status_text = f"{type_sel} (Processed)"
if is_overridden:
status_text += " [OVERRIDE ACTIVE]"
results.append((orig_img, f"{type_sel} (Original)"))
results.append((proc_img, status_text))
return results
def run_batch_process():
if not matrix.file_map:
return None
return matrix.process_all(global_rules, global_overrides)
def generate_code_prompt(df_json, user_req):
# Fallback for complex needs
prompt = f"""
I have a directory of images with the following structure (sample):
{str(df_json)[:1000]}...
My goal is: {user_req}
Please write a Python script using Pillow and os/shutil to process these files.
"""
return prompt
def create_paper_tool():
# Note: This function is called inside a gr.Blocks context in app.py
# So we don't need to create a new gr.Blocks() here unless we want a nested one.
# To keep it clean and consistent with other tools, we'll just define the layout.
if True: # Placeholder to keep indentation
gr.Markdown("""
## ๐Ÿ“‘ Image Auto Cropper (่‡ชๅŠจ่ฃๅ›พ)
Organize, crop, and batch process research images. Supports structure inference via LLM and specific overrides.
ๆ•ด็†ใ€่ฃๅ‰ชๅ’Œๆ‰น้‡ๅค„็†ๅฏ่ง†ๅŒ–ๅ›พ็‰‡ใ€‚ๆ”ฏๆŒ้€š่ฟ‡ LLM ๆŽจๆ–ญ็›ฎๅฝ•็ป“ๆž„ไปฅๅŠ้’ˆๅฏน็‰นๅฎšๆ ทๆœฌ็š„ไพ‹ๅค–ๅค„็†ใ€‚
""")
with gr.Row():
with gr.Column(scale=1):
zip_input = gr.File(label="Upload Zip", file_types=['.zip'])
use_llm_chk = gr.Checkbox(label="Use LLM for Structure Inference (DeepSeek)", value=False)
analyze_btn = gr.Button("Analyze Structure")
structure_info = gr.Markdown("No data loaded.")
with gr.Column(scale=2):
file_table = gr.Dataframe(label="Detected Structure", headers=['type', 'sample_id', 'rel_path'], interactive=False)
gr.Markdown("### ๐Ÿ› ๏ธ Rule Configuration & Preview")
with gr.Row():
with gr.Column():
sample_selector = gr.Dropdown(label="Preview Sample ID", choices=[])
type_selector = gr.Dropdown(label="Configure Type", choices=[])
action_selector = gr.Dropdown(label="Action", choices=["None", "Manual Crop", "Resize", "Auto Trim"], value="None")
# Dynamic inputs
with gr.Group():
p1 = gr.Number(label="Param 1 (X / Width / Threshold)", value=0)
p2 = gr.Number(label="Param 2 (Y / Height)", value=0)
p3 = gr.Number(label="Param 3 (W)", value=100)
p4 = gr.Number(label="Param 4 (H)", value=100)
with gr.Row():
preview_btn = gr.Button("Preview Effect")
save_rule_btn = gr.Button("Save Rule for Type (Batch)")
with gr.Row():
save_override_btn = gr.Button("Save Override (This Sample Only)", variant="secondary")
clear_override_btn = gr.Button("Clear Override", variant="stop")
rule_status = gr.Markdown("")
with gr.Column():
preview_gallery = gr.Gallery(label="Preview", columns=2)
gr.Markdown("### ๐Ÿš€ Batch Process")
process_btn = gr.Button("Apply Rules & Download", variant="primary")
download_output = gr.File(label="Download Result")
# Event Wiring
analyze_btn.click(handle_upload, inputs=[zip_input, use_llm_chk], outputs=[file_table, sample_selector, type_selector, structure_info])
preview_btn.click(update_preview,
inputs=[sample_selector, type_selector, action_selector, p1, p2, p3, p4],
outputs=[preview_gallery])
save_rule_btn.click(save_rule,
inputs=[type_selector, action_selector, p1, p2, p3, p4],
outputs=[rule_status])
save_override_btn.click(save_override,
inputs=[sample_selector, type_selector, action_selector, p1, p2, p3, p4],
outputs=[rule_status])
clear_override_btn.click(clear_override,
inputs=[sample_selector, type_selector],
outputs=[rule_status])
process_btn.click(run_batch_process, inputs=[], outputs=[download_output])
with gr.Accordion("๐Ÿ’ป Code Gen Helper (Fallback)", open=False):
user_req = gr.Textbox(label="Describe your requirement")
gen_prompt_btn = gr.Button("Generate Prompt")
prompt_output = gr.Code(label="Copy this prompt to an LLM", language="markdown")
gen_prompt_btn.click(generate_code_prompt, inputs=[file_table, user_req], outputs=[prompt_output])
pass
if __name__ == "__main__":
with gr.Blocks() as demo:
create_paper_tool()
demo.launch()