Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import zipfile | |
| import pandas as pd | |
| from PIL import Image, ImageOps, ImageChops | |
| import shutil | |
| from pathlib import Path | |
| import re | |
| import tempfile | |
| import json | |
| import requests | |
| # --- LLM Configuration --- | |
| LLM_API_KEY = "sk-fa6c38ce957e4c7b946ccbeed33237ec" # Replace with your actual key or use env var | |
| LLM_API_URL = "https://api.deepseek.com/v1/chat/completions" # Example URL | |
| def call_llm_structure_inference(file_list): | |
| """ | |
| Calls DeepSeek API to infer structure from a list of file paths. | |
| """ | |
| if not file_list: | |
| return [] | |
| # Take a sample if too many files to save tokens/context | |
| sample_files = file_list[:50] if len(file_list) > 50 else file_list | |
| prompt = f""" | |
| I have a list of file paths from a research project. I need to organize them into a structured format. | |
| The structure should identify a 'Sample ID' (unique identifier for the experiment/sample) and a 'Type' (category of the image, e.g., input, heatmap, result). | |
| Here are the file paths: | |
| {json.dumps(sample_files, indent=2)} | |
| Please analyze the naming patterns and directory structure. | |
| Return a JSON object with a list of rules or a direct mapping. | |
| For this task, simply return a JSON list where each item corresponds to the input files, with 'path', 'sample_id', and 'type' fields. | |
| If you can infer a regex pattern, please include it in a 'pattern' field in the root of the JSON. | |
| Format: | |
| {{ | |
| "files": [ | |
| {{"path": "path/to/file1.png", "sample_id": "exp1", "type": "input"}}, | |
| ... | |
| ] | |
| }} | |
| Only return the JSON, no markdown formatting. | |
| """ | |
| headers = { | |
| "Authorization": f"Bearer {LLM_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "model": "deepseek-chat", # Or appropriate model name | |
| "messages": [ | |
| {"role": "system", "content": "You are a helpful assistant that parses file paths into structured data."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| "stream": False | |
| } | |
| try: | |
| response = requests.post(LLM_API_URL, headers=headers, json=data, timeout=30) | |
| response.raise_for_status() | |
| result = response.json() | |
| content = result['choices'][0]['message']['content'] | |
| # Clean up markdown code blocks if present | |
| if "```json" in content: | |
| content = content.split("```json")[1].split("```")[0] | |
| elif "```" in content: | |
| content = content.split("```")[1].split("```")[0] | |
| parsed = json.loads(content.strip()) | |
| return parsed.get('files', []) | |
| except Exception as e: | |
| print(f"LLM API Error: {e}") | |
| return [] | |
| class ImageMatrix: | |
| def __init__(self): | |
| self.temp_dir = None | |
| self.file_map = [] # List of {'path': str, 'type': str, 'sample_id': str} | |
| self.types = [] | |
| self.samples = [] | |
| self.use_llm = False # Toggle for LLM usage | |
| def load_zip(self, zip_path, use_llm=False): | |
| self.use_llm = use_llm | |
| self.temp_dir = tempfile.mkdtemp() | |
| with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
| zip_ref.extractall(self.temp_dir) | |
| # Scan files | |
| image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.tiff'} | |
| files = [] | |
| for root, _, filenames in os.walk(self.temp_dir): | |
| for filename in filenames: | |
| if Path(filename).suffix.lower() in image_extensions: | |
| full_path = os.path.join(root, filename) | |
| rel_path = os.path.relpath(full_path, self.temp_dir) | |
| files.append(rel_path) | |
| if self.use_llm: | |
| self._infer_structure_llm(files) | |
| else: | |
| self._infer_structure(files) | |
| return self.get_summary() | |
| def _infer_structure_llm(self, files): | |
| print("Using LLM for structure inference...") | |
| llm_results = call_llm_structure_inference(files) | |
| if not llm_results: | |
| print("LLM failed or returned empty, falling back to heuristic.") | |
| self._infer_structure(files) | |
| return | |
| # Map LLM results back to full file list (if we only sampled) | |
| # For now, let's assume the LLM returns a mapping for the provided files. | |
| # If we sampled, we might need to generalize the pattern. | |
| # To keep it simple for this iteration, we'll trust the LLM for the sample | |
| # and try to apply the logic to the rest if possible, or just use what we have. | |
| # Ideally, we ask LLM for a Regex. | |
| parsed_data = [] | |
| all_types = set() | |
| all_samples = set() | |
| # Create a lookup for the LLM results | |
| llm_lookup = {item['path']: item for item in llm_results} | |
| for f in files: | |
| if f in llm_lookup: | |
| item = llm_lookup[f] | |
| t = item.get('type', 'unknown') | |
| s = item.get('sample_id', 'unknown') | |
| else: | |
| # Fallback for files not in LLM sample | |
| # In a real app, we'd use the Regex returned by LLM | |
| t = 'unknown' | |
| s = 'unknown' | |
| parsed_data.append({ | |
| 'rel_path': f, | |
| 'full_path': os.path.join(self.temp_dir, f), | |
| 'type': t, | |
| 'sample_id': s | |
| }) | |
| all_types.add(t) | |
| all_samples.add(s) | |
| self.types = sorted(list(all_types)) | |
| self.samples = sorted(list(all_samples)) | |
| self.file_map = parsed_data | |
| def _infer_structure(self, files): | |
| # Simple Heuristic: | |
| # 1. Look at directory structure. | |
| # Case A: root/type/sample.png | |
| # Case B: root/sample/type.png | |
| parsed_data = [] | |
| all_types = set() | |
| all_samples = set() | |
| # Naive approach: assume parent folder is one dimension, filename is another | |
| for f in files: | |
| p = Path(f) | |
| parent = p.parent.name | |
| stem = p.stem | |
| # Heuristic: If parent is empty (root), use filename parts | |
| if str(p.parent) == '.': | |
| # Try splitting by _ or - | |
| parts = re.split(r'[_-]', stem) | |
| if len(parts) > 1: | |
| type_guess = parts[-1] # Suffix as type | |
| sample_guess = "_".join(parts[:-1]) | |
| else: | |
| type_guess = "default" | |
| sample_guess = stem | |
| else: | |
| # Check if parent looks like a type (fewer unique values) or sample (more unique values) | |
| # For now, let's just store both and let user swap if needed? | |
| # Let's default to: Parent = Group/Type, Filename = Sample (Common in datasets) | |
| # But in paper writing, often Sample/Type.png is common too. | |
| # We will treat the folder name as "Category A" and filename as "Category B" | |
| # We'll decide which is 'Type' based on which set is smaller. | |
| type_guess = parent | |
| sample_guess = stem | |
| parsed_data.append({ | |
| 'rel_path': f, | |
| 'full_path': os.path.join(self.temp_dir, f), | |
| 'dim1': type_guess, | |
| 'dim2': sample_guess | |
| }) | |
| all_types.add(type_guess) | |
| all_samples.add(sample_guess) | |
| # Decide which dimension is 'Type' (Columns) and which is 'Sample' (Rows) | |
| # Usually Types are fewer than Samples. | |
| if len(all_types) <= len(all_samples): | |
| self.types = sorted(list(all_types)) | |
| self.samples = sorted(list(all_samples)) | |
| for item in parsed_data: | |
| item['type'] = item['dim1'] | |
| item['sample_id'] = item['dim2'] | |
| else: | |
| self.types = sorted(list(all_samples)) | |
| self.samples = sorted(list(all_types)) | |
| for item in parsed_data: | |
| item['type'] = item['dim2'] | |
| item['sample_id'] = item['dim1'] | |
| self.file_map = parsed_data | |
| def get_summary(self): | |
| return pd.DataFrame(self.file_map)[['type', 'sample_id', 'rel_path']] | |
| def get_sample_images(self, sample_id): | |
| # Return a dict of {type: image_path} for a given sample | |
| result = {} | |
| for item in self.file_map: | |
| if item['sample_id'] == sample_id: | |
| result[item['type']] = item['full_path'] | |
| return result | |
| def process_all(self, rules, overrides=None): | |
| # rules: dict of {type: rule_config} | |
| # overrides: dict of {sample_id: {type: rule_config}} | |
| if overrides is None: | |
| overrides = {} | |
| output_zip = tempfile.mktemp(suffix='.zip') | |
| output_dir = tempfile.mkdtemp() | |
| for item in self.file_map: | |
| img_type = item['type'] | |
| sample_id = item['sample_id'] | |
| # Determine rule: Override > Type Rule > Default | |
| rule = {'action': 'None'} | |
| if sample_id in overrides and img_type in overrides[sample_id]: | |
| rule = overrides[sample_id][img_type] | |
| elif img_type in rules: | |
| rule = rules[img_type] | |
| try: | |
| img = Image.open(item['full_path']) | |
| # Apply Rules | |
| img = apply_image_rule(img, rule) | |
| # Save | |
| # Structure: Output/Type/Sample.png | |
| save_dir = os.path.join(output_dir, img_type) | |
| os.makedirs(save_dir, exist_ok=True) | |
| # Use original filename or sample_id? | |
| # Using sample_id ensures consistency | |
| save_path = os.path.join(save_dir, f"{item['sample_id']}.png") | |
| img.save(save_path) | |
| except Exception as e: | |
| print(f"Error processing {item['full_path']}: {e}") | |
| shutil.make_archive(output_zip.replace('.zip', ''), 'zip', output_dir) | |
| return output_zip | |
| def apply_image_rule(img, rule): | |
| # rule: {'action': str, 'params': dict} | |
| action = rule.get('action', 'None') | |
| params = rule.get('params', {}) | |
| if action == 'Auto Trim': | |
| threshold = params.get('threshold', 50) | |
| bg = Image.new(img.mode, img.size, img.getpixel((0,0))) | |
| diff = ImageChops.difference(img, bg) | |
| diff = ImageOps.grayscale(diff) | |
| # Threshold | |
| diff = diff.point(lambda x: 255 if x > threshold else 0) | |
| bbox = diff.getbbox() | |
| if bbox: | |
| img = img.crop(bbox) | |
| elif action == 'Manual Crop': | |
| x = int(params.get('x', 0)) | |
| y = int(params.get('y', 0)) | |
| w = int(params.get('w', 100)) | |
| h = int(params.get('h', 100)) | |
| # Ensure crop is within bounds | |
| img_w, img_h = img.size | |
| x = max(0, min(x, img_w)) | |
| y = max(0, min(y, img_h)) | |
| w = max(1, min(w, img_w - x)) | |
| h = max(1, min(h, img_h - y)) | |
| img = img.crop((x, y, x+w, y+h)) | |
| elif action == 'Resize': | |
| w = int(params.get('w', 512)) | |
| h = int(params.get('h', 512)) | |
| img = img.resize((w, h)) | |
| return img | |
| # Global State | |
| matrix = ImageMatrix() | |
| # Store rules globally for this session (Not multi-user safe, but fits current architecture) | |
| global_rules = {} | |
| # Store overrides: {sample_id: {type: rule}} | |
| global_overrides = {} | |
| def handle_upload(file, use_llm_chk): | |
| if file is None: | |
| return None, gr.update(choices=[]) | |
| df = matrix.load_zip(file.name, use_llm=use_llm_chk) | |
| types = matrix.types | |
| samples = matrix.samples | |
| # Reset rules and overrides | |
| global_rules.clear() | |
| global_overrides.clear() | |
| summary_text = f"Found {len(samples)} samples and {len(types)} types.\nTypes: {', '.join(types)}" | |
| return df, gr.update(choices=samples, value=samples[0] if samples else None), gr.update(choices=types, value=types[0] if types else None), summary_text | |
| def save_rule(type_sel, action, p1, p2, p3, p4): | |
| if not type_sel: | |
| return "No type selected." | |
| params = {} | |
| if action == 'Manual Crop': | |
| params = {'x': p1, 'y': p2, 'w': p3, 'h': p4} | |
| elif action == 'Resize': | |
| params = {'w': p1, 'h': p2} | |
| elif action == 'Auto Trim': | |
| params = {'threshold': p1} | |
| rule = {'action': action, 'params': params} | |
| global_rules[type_sel] = rule | |
| return f"Saved rule for Type: {type_sel} -> {action}" | |
| def save_override(sample_id, type_sel, action, p1, p2, p3, p4): | |
| if not sample_id or not type_sel: | |
| return "Sample or Type not selected." | |
| params = {} | |
| if action == 'Manual Crop': | |
| params = {'x': p1, 'y': p2, 'w': p3, 'h': p4} | |
| elif action == 'Resize': | |
| params = {'w': p1, 'h': p2} | |
| elif action == 'Auto Trim': | |
| params = {'threshold': p1} | |
| rule = {'action': action, 'params': params} | |
| if sample_id not in global_overrides: | |
| global_overrides[sample_id] = {} | |
| global_overrides[sample_id][type_sel] = rule | |
| return f"Saved OVERRIDE for {sample_id} / {type_sel} -> {action}" | |
| def clear_override(sample_id, type_sel): | |
| if sample_id in global_overrides and type_sel in global_overrides[sample_id]: | |
| del global_overrides[sample_id][type_sel] | |
| return f"Cleared override for {sample_id} / {type_sel}" | |
| return "No override found to clear." | |
| def update_preview(sample_id, type_sel, action, p1, p2, p3, p4): | |
| # p1-p4 are generic params mapped based on action | |
| if not sample_id: | |
| return None | |
| images = matrix.get_sample_images(sample_id) | |
| # Construct rule for preview | |
| params = {} | |
| if action == 'Manual Crop': | |
| params = {'x': p1, 'y': p2, 'w': p3, 'h': p4} | |
| elif action == 'Resize': | |
| params = {'w': p1, 'h': p2} | |
| elif action == 'Auto Trim': | |
| params = {'threshold': p1} | |
| rule = {'action': action, 'params': params} | |
| results = [] | |
| # Show the selected type first/highlighted? | |
| # For now just show the selected type processed | |
| if type_sel in images: | |
| path = images[type_sel] | |
| orig_img = Image.open(path) | |
| proc_img = apply_image_rule(orig_img.copy(), rule) | |
| # Check if override exists | |
| is_overridden = sample_id in global_overrides and type_sel in global_overrides[sample_id] | |
| status_text = f"{type_sel} (Processed)" | |
| if is_overridden: | |
| status_text += " [OVERRIDE ACTIVE]" | |
| results.append((orig_img, f"{type_sel} (Original)")) | |
| results.append((proc_img, status_text)) | |
| return results | |
| def run_batch_process(): | |
| if not matrix.file_map: | |
| return None | |
| return matrix.process_all(global_rules, global_overrides) | |
| def generate_code_prompt(df_json, user_req): | |
| # Fallback for complex needs | |
| prompt = f""" | |
| I have a directory of images with the following structure (sample): | |
| {str(df_json)[:1000]}... | |
| My goal is: {user_req} | |
| Please write a Python script using Pillow and os/shutil to process these files. | |
| """ | |
| return prompt | |
| def create_paper_tool(): | |
| # Note: This function is called inside a gr.Blocks context in app.py | |
| # So we don't need to create a new gr.Blocks() here unless we want a nested one. | |
| # To keep it clean and consistent with other tools, we'll just define the layout. | |
| if True: # Placeholder to keep indentation | |
| gr.Markdown(""" | |
| ## ๐ Image Auto Cropper (่ชๅจ่ฃๅพ) | |
| Organize, crop, and batch process research images. Supports structure inference via LLM and specific overrides. | |
| ๆด็ใ่ฃๅชๅๆน้ๅค็ๅฏ่งๅๅพ็ใๆฏๆ้่ฟ LLM ๆจๆญ็ฎๅฝ็ปๆไปฅๅ้ๅฏน็นๅฎๆ ทๆฌ็ไพๅคๅค็ใ | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| zip_input = gr.File(label="Upload Zip", file_types=['.zip']) | |
| use_llm_chk = gr.Checkbox(label="Use LLM for Structure Inference (DeepSeek)", value=False) | |
| analyze_btn = gr.Button("Analyze Structure") | |
| structure_info = gr.Markdown("No data loaded.") | |
| with gr.Column(scale=2): | |
| file_table = gr.Dataframe(label="Detected Structure", headers=['type', 'sample_id', 'rel_path'], interactive=False) | |
| gr.Markdown("### ๐ ๏ธ Rule Configuration & Preview") | |
| with gr.Row(): | |
| with gr.Column(): | |
| sample_selector = gr.Dropdown(label="Preview Sample ID", choices=[]) | |
| type_selector = gr.Dropdown(label="Configure Type", choices=[]) | |
| action_selector = gr.Dropdown(label="Action", choices=["None", "Manual Crop", "Resize", "Auto Trim"], value="None") | |
| # Dynamic inputs | |
| with gr.Group(): | |
| p1 = gr.Number(label="Param 1 (X / Width / Threshold)", value=0) | |
| p2 = gr.Number(label="Param 2 (Y / Height)", value=0) | |
| p3 = gr.Number(label="Param 3 (W)", value=100) | |
| p4 = gr.Number(label="Param 4 (H)", value=100) | |
| with gr.Row(): | |
| preview_btn = gr.Button("Preview Effect") | |
| save_rule_btn = gr.Button("Save Rule for Type (Batch)") | |
| with gr.Row(): | |
| save_override_btn = gr.Button("Save Override (This Sample Only)", variant="secondary") | |
| clear_override_btn = gr.Button("Clear Override", variant="stop") | |
| rule_status = gr.Markdown("") | |
| with gr.Column(): | |
| preview_gallery = gr.Gallery(label="Preview", columns=2) | |
| gr.Markdown("### ๐ Batch Process") | |
| process_btn = gr.Button("Apply Rules & Download", variant="primary") | |
| download_output = gr.File(label="Download Result") | |
| # Event Wiring | |
| analyze_btn.click(handle_upload, inputs=[zip_input, use_llm_chk], outputs=[file_table, sample_selector, type_selector, structure_info]) | |
| preview_btn.click(update_preview, | |
| inputs=[sample_selector, type_selector, action_selector, p1, p2, p3, p4], | |
| outputs=[preview_gallery]) | |
| save_rule_btn.click(save_rule, | |
| inputs=[type_selector, action_selector, p1, p2, p3, p4], | |
| outputs=[rule_status]) | |
| save_override_btn.click(save_override, | |
| inputs=[sample_selector, type_selector, action_selector, p1, p2, p3, p4], | |
| outputs=[rule_status]) | |
| clear_override_btn.click(clear_override, | |
| inputs=[sample_selector, type_selector], | |
| outputs=[rule_status]) | |
| process_btn.click(run_batch_process, inputs=[], outputs=[download_output]) | |
| with gr.Accordion("๐ป Code Gen Helper (Fallback)", open=False): | |
| user_req = gr.Textbox(label="Describe your requirement") | |
| gen_prompt_btn = gr.Button("Generate Prompt") | |
| prompt_output = gr.Code(label="Copy this prompt to an LLM", language="markdown") | |
| gen_prompt_btn.click(generate_code_prompt, inputs=[file_table, user_req], outputs=[prompt_output]) | |
| pass | |
| if __name__ == "__main__": | |
| with gr.Blocks() as demo: | |
| create_paper_tool() | |
| demo.launch() | |