Spaces:
Running
Running
| import gradio as gr | |
| import json | |
| import requests | |
| import pandas as pd | |
| import traceback | |
import os

# --- LLM Configuration ---
# Both settings can be overridden with an environment variable of the same
# name, so a deployment can supply its own credential and endpoint without
# editing this file.
# SECURITY NOTE(review): the literal fallback below is a credential committed
# to source. It is kept only so existing deployments keep working; rotate the
# key and remove the fallback once LLM_API_KEY is provided by the environment.
LLM_API_KEY = os.environ.get("LLM_API_KEY") or "sk-fa6c38ce957e4c7b946ccbeed33237ec"
LLM_API_URL = os.environ.get("LLM_API_URL") or "https://api.deepseek.com/v1/chat/completions"
def call_llm(prompt, system_prompt="You are a helpful assistant."):
    """Send one system + user message pair to the configured chat endpoint.

    Args:
        prompt: Text sent as the user message.
        system_prompt: Text sent as the system message.

    Returns:
        The message content of the first choice in the JSON response. If the
        request, the HTTP status check, or the response lookup raises, the
        exception is not propagated; the string ``"Error: <message>"`` is
        returned instead.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
    payload = {"model": "deepseek-chat", "messages": messages, "stream": False}
    auth_headers = {
        "Authorization": f"Bearer {LLM_API_KEY}",
        "Content-Type": "application/json",
    }
    try:
        reply = requests.post(LLM_API_URL, headers=auth_headers, json=payload, timeout=60)
        reply.raise_for_status()
        first_choice = reply.json()['choices'][0]
        return first_choice['message']['content']
    except Exception as exc:
        # Callers expect a string in every case, so failures are reported
        # in-band rather than raised.
        return f"Error: {exc!s}"
def analyze_json_structure(json_input):
    """Ask the LLM to describe the fields of one sample JSON item.

    The input is first parsed as a single JSON document. If that fails, only
    its first line is parsed, so pasted JSONL content works as well.

    Args:
        json_input: Text containing a JSON document or JSONL lines.

    Returns:
        A ``(analysis, message)`` tuple. ``analysis`` is a list of dicts that
        each contain a "field" key (the prompt also requests "type", "sample"
        and "suggestions"); it is empty on failure. ``message`` is
        "Analysis Complete" on success, otherwise a "Parse Error: ..." or
        "LLM Parse Error: ..." description.
    """
    text = json_input if isinstance(json_input, str) else ""
    try:
        data = json.loads(text)
    except ValueError:
        # Not a single JSON document; try the first line (JSONL).
        try:
            data = json.loads(text.strip().split('\n')[0])
        except ValueError as e:
            return [], f"Parse Error: {e}"

    # ensure_ascii=False keeps non-ASCII sample text readable in the prompt
    # instead of turning it into \uXXXX escapes.
    sample = json.dumps(data, indent=2, ensure_ascii=False)
    prompt = f"""
Analyze this JSON item from an SFT dataset:
{sample}
1. Identify all fields, their types, and a short sample value.
2. For each field, suggest 1-3 common data cleaning/modification actions relevant to SFT (e.g., "Normalize score", "Remove 'User:' prefix", "Fix HTML entities", "Delete if empty").
3. Return ONLY a JSON list of objects with keys: "field", "type", "sample", "suggestions" (list of strings).
"""
    response = call_llm(prompt, "You are a data engineering expert.")

    # The model may wrap its answer in a Markdown code fence; keep only the
    # fenced content.
    if "```json" in response:
        response = response.split("```json")[1].split("```")[0]
    elif "```" in response:
        response = response.split("```")[1].split("```")[0]

    try:
        analysis = json.loads(response.strip())
    except ValueError as e:
        return [], f"LLM Parse Error: {e}\nRaw: {response}"

    # The reply is untrusted: accept a single object as a one-element list and
    # reject anything that is not a list of field descriptions.
    if isinstance(analysis, dict):
        analysis = [analysis]
    if not isinstance(analysis, list):
        return [], f"LLM Parse Error: expected a JSON list of field objects\nRaw: {response}"

    entries = [entry for entry in analysis if isinstance(entry, dict) and "field" in entry]
    if analysis and not entries:
        return [], f"LLM Parse Error: no entry contains a 'field' key\nRaw: {response}"
    return entries, "Analysis Complete"
def generate_transform_code(json_sample, rules):
    """Ask the LLM for a ``transform(item)`` function implementing the rules.

    Args:
        json_sample: Text of a sample item, inserted verbatim into the prompt.
        rules: JSON-serializable rule list, e.g.
            ``[{'field': 'x', 'action': 'y', 'custom': 'z'}]``.

    Returns:
        The LLM reply with surrounding whitespace removed. When the reply
        contains a Markdown code fence, only the content of the first fence
        is returned.
    """
    rules_json = json.dumps(rules, indent=2)
    prompt = f"""
I have a JSON item structure like this:
{json_sample}
I need a Python function `transform(item)` that modifies this item based on these rules:
{rules_json}
Requirements:
1. The function must take a dict `item` and return the modified dict.
2. If the item should be filtered out (dropped), return None.
3. Handle missing fields gracefully.
4. Return ONLY the Python code for the function. No markdown.
"""
    code = call_llm(prompt, "You are a Python expert.")
    # Prefer a fence tagged as python; otherwise fall back to any fence.
    for opening_fence in ("```python", "```"):
        if opening_fence in code:
            fenced = code.split(opening_fence)[1]
            code = fenced.split("```")[0]
            break
    return code.strip()
def generate_full_script(transform_code):
    """Embed transform code in a standalone JSON/JSONL processing script.

    Args:
        transform_code: Python source. It may be a complete definition of
            ``transform(item)`` (possibly preceded by imports or helpers), or
            only the statements of the function body. Empty or ``None``
            produces an identity transform.

    Returns:
        Source text of a command-line script taking ``--input`` and
        ``--output``. The script applies ``transform`` to every item and
        writes each non-None result as one JSON line. It imports ``orjson``
        and ``tqdm``.
    """
    import re
    import textwrap

    code = textwrap.dedent(transform_code or "").strip()
    if not code:
        transform_def = "def transform(item):\n    return item"
    elif re.search(r"^def\s+transform\s*\(", code, flags=re.MULTILINE):
        # The code already defines transform(); emitting a second header
        # above it would make the generated script a syntax error.
        transform_def = code
    else:
        # Body-only snippet: supply the header and indent the statements.
        transform_def = "def transform(item):\n" + textwrap.indent(code, "    ")

    template = f"""import orjson
import tqdm
import argparse
import sys


{transform_def}


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help='Input JSON/JSONL file')
    parser.add_argument('--output', required=True, help='Output JSONL file')
    args = parser.parse_args()
    print(f"Processing {{args.input}} -> {{args.output}}")
    with open(args.input, 'rb') as f_in, open(args.output, 'wb') as f_out:
        # Detect format roughly
        first_char = f_in.read(1)
        f_in.seek(0)
        is_jsonl = True  # Default assumption or logic
        # Simple line-by-line processing for JSONL
        # For standard JSON list, we'd need ijson or similar for streaming,
        # but for simplicity let's assume JSONL or small JSON.
        lines = f_in
        if first_char == b'[':
            print("Warning: Standard JSON list detected. Loading full file (memory intensive).")
            data = orjson.loads(f_in.read())
            lines = data
            is_jsonl = False
        processed_count = 0
        for line in tqdm.tqdm(lines):
            if is_jsonl:
                try:
                    item = orjson.loads(line)
                except ValueError:
                    # Blank or malformed line: skip it.
                    continue
            else:
                item = line
            result = transform(item)
            if result is not None:
                f_out.write(orjson.dumps(result) + b'\\n')
                processed_count += 1
    print(f"Done. Wrote {{processed_count}} items.")


if __name__ == "__main__":
    main()
"""
    return template
| # --- UI Logic --- | |
def on_analyze(json_text):
    """Run the structure analysis and refresh the field dropdown.

    Args:
        json_text: Sample JSON/JSONL text entered by the user.

    Returns:
        A tuple of the analysis (kept in state), a Gradio update carrying the
        field names as dropdown choices, and the status message.
    """
    analysis, msg = analyze_json_structure(json_text)
    # The analysis originates from an LLM reply, so skip entries that are not
    # dicts or have no "field" key rather than raising in the UI callback.
    entries = analysis if isinstance(analysis, list) else []
    fields = [
        entry["field"]
        for entry in entries
        if isinstance(entry, dict) and "field" in entry
    ]
    return analysis, gr.update(choices=fields), msg
def on_field_select(field, analysis_data):
    """Build the action choices offered for the selected field.

    Args:
        field: Name of the field currently selected in the dropdown.
        analysis_data: Analysis entries held in state; each is expected to be
            a dict with "field" and optionally "suggestions".

    Returns:
        A Gradio update whose choices are the three built-in actions followed
        by the field's suggestions (duplicates removed, order kept), with the
        first choice selected.
    """
    # "Custom" is part of the built-in list, so it is always available.
    choices = ["Keep Unchanged", "Delete Field", "Custom"]
    entries = analysis_data if isinstance(analysis_data, list) else []
    for entry in entries:
        if not isinstance(entry, dict) or entry.get("field") != field:
            continue
        extra = entry.get("suggestions") or []
        if isinstance(extra, str):
            # A lone string would otherwise be added one character at a time.
            extra = [extra]
        elif not isinstance(extra, (list, tuple)):
            extra = []
        for suggestion in extra:
            if suggestion not in choices:
                choices.append(suggestion)
        break
    return gr.update(choices=choices, value=choices[0])
def add_rule(field, action, custom, current_rules):
    """Append a modification rule and rebuild the rows of the rules table.

    Args:
        field: Name of the field the rule applies to.
        action: Selected action; "Custom" means ``custom`` holds the
            instruction.
        custom: Free-text instruction entered by the user.
        current_rules: Existing list of rule dicts, or a falsy value when
            there are none yet. It is not modified.

    Returns:
        A ``(rules, rows)`` tuple: the new rule list and one
        ``[field, action, custom]`` row per rule. When ``field`` or
        ``action`` is empty, no rule is added and the existing rules are
        returned as they were.
    """
    # Work on a copy so the caller's list is never mutated in place.
    rules = list(current_rules) if current_rules else []
    if field and action:
        rule_desc = f"Custom: {custom}" if action == "Custom" else action
        rules.append({
            "field": field,
            "action": action,
            "custom": custom,
            "display": f"{field} -> {rule_desc}",
        })
    display_data = [[r['field'], r['action'], r['custom']] for r in rules]
    return rules, display_data
def run_preview(json_text, rules):
    """Generate transform code for the rules and run it on the sample item.

    Args:
        json_text: Sample JSON text, or JSONL of which the first line is used.
        rules: List of rule dicts produced by ``add_rule``.

    Returns:
        On success, a dict with "original" (the parsed sample), "modified"
        (the value returned by the generated ``transform``) and "code" (the
        generated source). On failure, a dict with an "error" message and,
        once code has been generated, "code". A dict is returned in every
        case because the result is shown in a JSON component, which treats a
        plain string as JSON text to parse.
    """
    import copy

    if not rules:
        return {"error": "No rules defined."}

    # Parse the sample before calling the LLM so bad input fails fast.
    try:
        item = json.loads(json_text)
    except (TypeError, ValueError):
        try:
            item = json.loads(json_text.strip().split('\n')[0])
        except (AttributeError, ValueError) as e:
            return {"error": f"Parse Error: {e}"}

    # 1. Generate Code
    transform_code = generate_transform_code(json_text, rules)

    # 2. Execute locally.
    # SECURITY NOTE(review): this runs LLM-generated code inside the server
    # process with no sandbox. Only acceptable for trusted, local use.
    namespace = {}
    try:
        # One shared namespace: with separate globals and locals, imports and
        # helpers defined by the generated code would not be visible inside
        # transform() when it is called.
        exec(transform_code, namespace)
        transform_func = namespace.get('transform')
        if not callable(transform_func):
            return {
                "error": "Error: Could not find 'transform' function in generated code.",
                "code": transform_code,
            }
        # Pass a copy: transform() may edit its argument in place, which
        # would otherwise make "original" show the modified data.
        result = transform_func(copy.deepcopy(item))
    except Exception as e:
        return {"error": f"Execution Error: {e}", "code": transform_code}

    return {
        "original": item,
        "modified": result,
        "code": transform_code,
    }
def create_ui():
    """Build the JSON editor components and connect their event handlers.

    Creates the sample input and analysis controls, the rule editor, the
    preview / generated-code panels and the full-script output, then wires
    the buttons and dropdowns to the handler functions of this module.
    Returns nothing.
    """
    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from a flattened copy of this file; confirm it against
    # the intended layout.
    gr.Markdown("""
## ⚡ Fastest JSON Editor (快速 JSON 编辑器)
Intelligent analysis and modification of JSON/JSONL data using LLM.
利用 LLM 智能分析和修改 JSON/JSONL 数据,生成高性能处理脚本。
""")
    with gr.Row():
        with gr.Column(scale=1):
            json_input = gr.Textbox(label="Sample JSON Item", lines=10, placeholder="Paste a single JSON object here...")
            analyze_btn = gr.Button("🔍 Analyze Structure")
            status_msg = gr.Markdown("")
        with gr.Column(scale=1):
            # Field Inspector
            analysis_state = gr.State([])
            rules_state = gr.State([])
            with gr.Group():
                gr.Markdown("### 🛠️ Add Modification Rule")
                field_dropdown = gr.Dropdown(label="Select Field", choices=[])
                action_dropdown = gr.Dropdown(label="Action", choices=["Keep Unchanged", "Delete Field", "Custom"], allow_custom_value=True)
                custom_input = gr.Textbox(label="Custom Instruction (if needed)", placeholder="e.g. Convert to YYYY-MM-DD")
                add_btn = gr.Button("Add Rule")
            rules_table = gr.Dataframe(headers=["Field", "Action", "Custom"], label="Active Rules", interactive=False)
    with gr.Row():
        preview_btn = gr.Button("▶️ Preview & Generate Code", variant="primary")
    with gr.Row():
        with gr.Column():
            preview_json = gr.JSON(label="Preview Result (Diff)")
        with gr.Column():
            code_output = gr.Code(label="Generated Transform Function", language="python")
    with gr.Row():
        gen_script_btn = gr.Button("🚀 Generate Full Script")
    full_script_output = gr.Code(label="Full Production Script", language="python", visible=False)

    def update_code_view(result):
        """Return the "code" entry of a preview result dict, else ""."""
        if isinstance(result, dict):
            return result.get('code', '')
        return ""

    def on_gen_script(code):
        """Reveal the full-script panel filled with the generated script."""
        return gr.update(visible=True, value=generate_full_script(code))

    # Event Wiring
    analyze_btn.click(on_analyze, inputs=[json_input], outputs=[analysis_state, field_dropdown, status_msg])
    field_dropdown.change(on_field_select, inputs=[field_dropdown, analysis_state], outputs=[action_dropdown])
    add_btn.click(
        add_rule,
        inputs=[field_dropdown, action_dropdown, custom_input, rules_state],
        outputs=[rules_state, rules_table],
    )
    # The code view is filled from the preview result, so it must run after
    # run_preview has updated preview_json. Two separate click handlers would
    # both receive the component values from the moment of the click, i.e.
    # the previous preview.
    preview_btn.click(
        run_preview,
        inputs=[json_input, rules_state],
        outputs=[preview_json],
    ).then(
        update_code_view,
        inputs=[preview_json],
        outputs=[code_output],
    )
    gen_script_btn.click(on_gen_script, inputs=[code_output], outputs=[full_script_output])