Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import re | |
| import time | |
| import json | |
| import html | |
| import random | |
| import tempfile | |
| from typing import List | |
| import gradio as gr | |
| from openpyxl import load_workbook | |
| from openpyxl.cell.cell import MergedCell | |
| import google.generativeai as genai | |
| import openai | |
# --- CONSTANTS ---
LANG_SOURCE = 'zh'  # source language code (Chinese)
LANG_TARGET = 'vi'  # target language code (Vietnamese)
OUTPUT_SUFFIX = "_Dich_CN"  # appended to translated sheet titles; also used to skip already-translated sheets on re-runs
BATCH_SIZE = 100  # number of cell strings sent to the API per request
SLEEP_BETWEEN_BATCHES = 0.1  # seconds to pause between batches (gentle rate limiting)
MAX_RETRIES = 5  # attempts per API call before the batch is given up
# Model choices offered in the UI dropdowns.
GEMINI_MODELS = ["gemini-flash-latest", "gemini-2.5-pro", "gemini-flash-lite-latest","gemini-3-pro-preview"]
OPENAI_MODELS = ["chatgpt-4o-latest", "gpt-5-2025-08-07", "gpt-5-mini-2025-08-07"]
# Default system prompt template that the user can edit
# ({src}/{tgt} are filled in with LANG_SOURCE/LANG_TARGET at call time).
DEFAULT_SYSTEM_PROMPT = (
    "You are a professional translator. "
    "Translate each input string from {src} to {tgt}. "
    "Return ONLY a valid JSON array of translated strings, same order and length as input. "
    "Do NOT include any extra text, code fences, or explanations."
)
# -----------------
| # --- HELPER FUNCTIONS --- | |
def safe_sheet_title(base: str) -> str:
    """Sanitize *base* into a legal Excel sheet title.

    Excel forbids the characters : \\ / ? * [ ] in sheet names and caps
    the length at 31 characters; offending characters become '_'.
    """
    sanitized = re.sub(r'[:\\/?*\[\]]', '_', base)
    # Slicing is a no-op for short names, so no length check is needed.
    return sanitized[:31]
def excel_quote_sheet(name: str) -> str:
    """Quote a sheet name for use inside an Excel formula reference.

    Excel requires single quotes around sheet names that contain spaces,
    operators, or other special characters — and around names that start
    with a digit (e.g. '2024'!A1). Embedded single quotes are doubled.

    Args:
        name: Raw sheet title.

    Returns:
        The name, quoted and escaped if Excel formula syntax demands it.
    """
    need_quote = (
        bool(re.search(r"[ \-+*/^&=<>,;:@\[\]]", name))
        or "'" in name
        # FIX: names beginning with a digit must also be quoted in formulas.
        or name[:1].isdigit()
    )
    if "'" in name: name = name.replace("'", "''")
    return f"'{name}'" if need_quote else name
| # --- API CALL FUNCTIONS --- | |
def _gemini_translate_call(model, texts: List[str], *, system_prompt_template: str, src: str, tgt: str, max_retries: int = MAX_RETRIES) -> List[str]:
    """Translate *texts* through a Gemini model, retrying on failure.

    The model is asked for a bare JSON array the same length as the
    input; a surrounding markdown code fence, if any, is stripped before
    parsing. On the final failed attempt the last exception propagates.
    """
    if not texts:
        return []
    instruction = system_prompt_template.format(src=src, tgt=tgt)
    input_json = json.dumps({"items": texts}, ensure_ascii=False)
    user_msg = f"{instruction}\n\nINPUT_JSON:\n{input_json}\n\nOUTPUT: JSON array of strings only."
    for attempt in range(1, max_retries + 1):
        try:
            reply = model.generate_content(user_msg)
            body = (reply.text or "").strip()
            # Drop a leading ```json / trailing ``` fence if the model added one.
            body = re.sub(r"^```(?:json)?\s*|\s*```$", "", body, flags=re.IGNORECASE)
            parsed = json.loads(body)
            if not isinstance(parsed, list) or len(parsed) != len(texts):
                raise ValueError("Invalid JSON array response.")
            return [html.unescape(str(item)) for item in parsed]
        except Exception as exc:
            if attempt == max_retries:
                raise
            # Linear backoff with jitter before the next attempt.
            delay = 0.8 * attempt + random.uniform(0, 0.4)
            print(f"⚠️ GEMINI Error (attempt {attempt}/{max_retries}): {exc}. Retrying in {delay:.2f}s...")
            time.sleep(delay)
def _openai_translate_call(model_name: str, texts: List[str], *, system_prompt_template: str, src: str, tgt: str, max_retries: int = MAX_RETRIES) -> List[str]:
    """Translate *texts* with an OpenAI chat model, retrying on failure.

    Requests JSON output (``response_format={"type": "json_object"}``) and
    accepts either a bare array, an ``items`` array, or the first
    list-valued field of the returned object.

    Args:
        model_name: OpenAI chat model identifier.
        texts: Strings to translate; order and length are preserved.
        system_prompt_template: Prompt with ``{src}``/``{tgt}`` placeholders.
        src: Source language code.
        tgt: Target language code.
        max_retries: Attempts before the last exception is re-raised.

    Returns:
        Translated strings, same length and order as *texts*.
    """
    if not texts: return []
    system_prompt = system_prompt_template.format(src=src, tgt=tgt)
    payload = {"items": texts}
    user_msg = f"INPUT_JSON:\n{json.dumps(payload, ensure_ascii=False)}\n\nOUTPUT: JSON array of strings only."
    # BUG FIX: openai.OpenAI() (openai>=1.0) does NOT read the legacy
    # module-level `openai.api_key` that the UI handler sets -- it only
    # falls back to the OPENAI_API_KEY env var, so the key typed into the
    # UI was ignored. Pass it explicitly, and build the client once
    # instead of on every retry attempt.
    client = openai.OpenAI(api_key=openai.api_key)
    for attempt in range(1, max_retries + 1):
        try:
            resp = client.chat.completions.create(
                model=model_name,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_msg}
                ],
                temperature=0.2,
                response_format={"type": "json_object"}
            )
            raw_content = resp.choices[0].message.content or "{}"
            data = json.loads(raw_content)
            # The model may wrap the array in different shapes; normalize.
            if isinstance(data, list): out = data
            elif isinstance(data, dict) and "items" in data and isinstance(data['items'], list): out = data['items']
            else: out = next((v for v in data.values() if isinstance(v, list)), None)
            if out is None or len(out) != len(texts): raise ValueError("Could not find a valid JSON array of the correct length.")
            return [html.unescape(str(s)) for s in out]
        except Exception as e:
            if attempt == max_retries: raise
            # Linear backoff with jitter to avoid hammering the API.
            sleep_s = 0.8 * attempt + random.uniform(0, 0.4)
            print(f"⚠️ OpenAI Error (attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_s:.2f}s...")
            time.sleep(sleep_s)
def update_formula_references(sheet, sheet_map):
    """Rewrite formula cells on *sheet* so references to original sheet
    names point at their translated copies.

    Args:
        sheet: openpyxl worksheet whose formula cells are edited in place.
        sheet_map: Mapping of original sheet title -> translated sheet title.
    """
    # Pre-quote both sides once so the inner loop only substitutes.
    escaped_map = {excel_quote_sheet(old): excel_quote_sheet(new) for old, new in sheet_map.items()}
    for row in sheet.iter_rows(values_only=False):
        for cell in row:
            # Only formula cells (data_type 'f') hold sheet references.
            if cell.data_type == 'f' and cell.value:
                formula = cell.value
                for old_q, new_q in escaped_map.items():
                    # FIX: lookbehind now also rejects a preceding word char,
                    # so sheet "Data" no longer matches inside "MyData!A1";
                    # "[Book1]Data!" (external workbook) is still skipped.
                    pattern = re.compile(rf"(?<![\w\[]){re.escape(old_q)}!")
                    formula = pattern.sub(f"{new_q}!", formula)
                cell.value = formula
| # --- MAIN PROCESSING FUNCTION --- | |
def process_translation(
    service: str, gemini_api_key: str, gemini_model: str,
    openai_api_key: str, openai_model: str,
    user_prompt: str, input_file,
    progress=gr.Progress(track_tqdm=True)
):
    """Gradio generator: translate every sheet of an uploaded .xlsx workbook.

    Yields ``(log_text, output_path_or_None)`` tuples so the UI can stream
    progress; only the final yield carries the saved file path.

    Args:
        service: "Gemini" or "OpenAI" (from the UI radio).
        gemini_api_key / gemini_model: Gemini credentials and model name.
        openai_api_key / openai_model: OpenAI credentials and model name.
        user_prompt: System prompt template with {src}/{tgt} placeholders.
        input_file: Gradio file object; ``.name`` is the temp path on disk.
        progress: Gradio progress tracker (wraps the sheet loop).

    Raises:
        gr.Error: On missing inputs, or wrapping any unexpected exception.
    """
    if input_file is None: raise gr.Error("No Excel file uploaded.")
    if not user_prompt or not user_prompt.strip(): raise gr.Error("The system prompt cannot be empty.")
    logs = []
    try:
        # 1. Configure service
        if service == "Gemini":
            if not gemini_api_key: raise gr.Error("Gemini API Key is missing.")
            genai.configure(api_key=gemini_api_key.strip())
            # Gemini gets a model object; OpenAI just gets the model name string.
            model_instance = genai.GenerativeModel(gemini_model)
            logs.append(f"✅ Service: Gemini | Model: {gemini_model}")
        elif service == "OpenAI":
            if not openai_api_key: raise gr.Error("OpenAI API Key is missing.")
            openai.api_key = openai_api_key.strip()
            model_instance = openai_model
            logs.append(f"✅ Service: OpenAI | Model: {openai_model}")
        else:
            raise gr.Error("Invalid service selected.")
        yield "\n".join(logs), None
        # 2. Load workbook
        wb = load_workbook(input_file.name)
        # Skip sheets that already carry the output suffix (previous runs).
        original_sheet_names = [name for name in wb.sheetnames if OUTPUT_SUFFIX not in name]
        logs.append(f"✅ Loaded Excel file. Found {len(original_sheet_names)} sheet(s).")
        yield "\n".join(logs), None
        sheet_name_map = {}  # original title -> translated-copy title
        # 3. Translate sheets
        for sheet_name in progress.tqdm(original_sheet_names, desc="Translating Sheets"):
            source_sheet = wb[sheet_name]
            new_sheet_name_raw = f"{sheet_name}{OUTPUT_SUFFIX}"
            new_sheet_name = safe_sheet_title(new_sheet_name_raw)
            sheet_name_map[sheet_name] = new_sheet_name
            # Reuse an existing output sheet, otherwise copy the source sheet.
            if new_sheet_name in wb.sheetnames: new_sheet = wb[new_sheet_name]
            else:
                new_sheet = wb.copy_worksheet(source_sheet)
                new_sheet.title = new_sheet_name
            cells_to_translate = []
            # Per-sheet translation cache: identical source strings are sent
            # to the API only once.
            seen = {}
            for row in source_sheet.iter_rows(values_only=False):
                for cell in row:
                    # MergedCell objects are read-only placeholders; skip them.
                    if isinstance(cell, MergedCell): continue
                    val = cell.value
                    # Translate only non-empty strings that are not formulas.
                    if isinstance(val, str) and val.strip() and cell.data_type != 'f':
                        cells_to_translate.append({'text': val, 'row': cell.row, 'col': cell.column})
            total_cells = len(cells_to_translate)
            logs.append(f"--- Translating Sheet: {sheet_name} ({total_cells} cells) -> {new_sheet_name} ---")
            yield "\n".join(logs), None
            idx = 0
            while idx < total_cells:
                chunk = cells_to_translate[idx:idx+BATCH_SIZE]
                # Only send strings not already translated for this sheet.
                to_send = [item['text'] for item in chunk if item['text'] not in seen]
                if to_send:
                    try:
                        if service == "Gemini":
                            translations = _gemini_translate_call(model_instance, to_send, system_prompt_template=user_prompt, src=LANG_SOURCE, tgt=LANG_TARGET)
                        else: # OpenAI
                            translations = _openai_translate_call(model_instance, to_send, system_prompt_template=user_prompt, src=LANG_SOURCE, tgt=LANG_TARGET)
                        for src_text, dst_text in zip(to_send, translations):
                            seen[src_text] = dst_text
                    except Exception as e:
                        # A failed batch is skipped (its cells keep the source
                        # text); the run continues with the next batch.
                        logs.append(f"❌ API ERROR (batch {idx}-{idx+BATCH_SIZE}): {e}. Skipping batch.")
                        idx += BATCH_SIZE
                        time.sleep(SLEEP_BETWEEN_BATCHES)
                        continue
                for item in chunk:
                    # Fall back to the original text when no translation exists.
                    translated = seen.get(item['text'], item['text'])
                    target_cell = new_sheet.cell(row=item['row'], column=item['col'])
                    if not isinstance(target_cell, MergedCell):
                        target_cell.value = translated
                processed_count = min(idx + len(chunk), total_cells)
                # Overwrite the last log line in place to act as a progress bar.
                logs[-1] = f"--- Translating Sheet: {sheet_name} ({processed_count}/{total_cells} cells) -> {new_sheet_name} ---"
                yield "\n".join(logs), None
                idx += BATCH_SIZE
                time.sleep(SLEEP_BETWEEN_BATCHES)
            logs.append(f"✅ Finished sheet: {sheet_name}")
            yield "\n".join(logs), None
        # 4. Update formula references
        logs.append("\n[STEP 2] Updating formula references...")
        for sheet_name in original_sheet_names:
            new_sheet_name = sheet_name_map.get(sheet_name)
            if new_sheet_name:
                # Repoint formulas on each copy at the translated sheet names.
                update_formula_references(wb[new_sheet_name], sheet_name_map)
                logs.append(f"✅ Updated references for sheet: {new_sheet_name}")
        yield "\n".join(logs), None
        # 5. Save the result
        # delete=False: the temp file must outlive this handler so Gradio
        # can serve it for download.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp:
            wb.save(tmp.name)
            output_file_path = tmp.name
        logs.append(f"\n✅ SUCCESS! Your translated file is ready for download.")
        yield "\n".join(logs), output_file_path
    except Exception as e:
        # Surface any failure as a Gradio error dialog.
        raise gr.Error(f"An error occurred: {e}")
| # --- GRADIO UI --- | |
def update_provider_ui(service):
    """Show the settings group for the chosen provider and hide the other."""
    show_gemini = service == "Gemini"
    return {
        gemini_ui_group: gr.Group(visible=show_gemini),
        openai_ui_group: gr.Group(visible=not show_gemini),
    }
# Build the Gradio UI: inputs (service, keys, model, file, prompt) on the
# left, streaming logs and the download link on the right.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 批量Excel翻译工具 (ZH-VI) | Excel Translator (ZH-VI)")
    with gr.Row():
        with gr.Column(scale=1):
            service_selector = gr.Radio(["Gemini", "OpenAI"], label="1. Select Translation Service", value="Gemini")
            # Only one of these two groups is visible at a time (toggled below).
            with gr.Group(visible=True) as gemini_ui_group:
                gemini_api_key_input = gr.Textbox(label="2. Google Gemini API Key", type="password")
                gemini_model_selector = gr.Dropdown(label="3. Select Gemini Model", choices=GEMINI_MODELS, value=GEMINI_MODELS[0])
            with gr.Group(visible=False) as openai_ui_group:
                openai_api_key_input = gr.Textbox(label="2. OpenAI API Key", type="password")
                openai_model_selector = gr.Dropdown(label="3. Select OpenAI Model", choices=OPENAI_MODELS, value=OPENAI_MODELS[0])
            file_input = gr.File(label="4. Upload Excel File (.xlsx)", file_types=[".xlsx"])
            with gr.Accordion("Advanced Options", open=False):
                prompt_input = gr.Textbox(
                    label="System Prompt Template",
                    value=DEFAULT_SYSTEM_PROMPT,
                    lines=5,
                    info="Edit the prompt for the AI model. Use {src} for source language and {tgt} for target language."
                )
            translate_button = gr.Button("5. Start Translation", variant="primary")
        with gr.Column(scale=2):
            log_output = gr.Textbox(label="Progress & Logs", interactive=False, lines=25, autoscroll=True)
            file_output = gr.File(label="Download Translated File")
    # Swap the visible provider settings when the radio selection changes.
    service_selector.change(fn=update_provider_ui, inputs=service_selector, outputs=[gemini_ui_group, openai_ui_group])
    # process_translation is a generator, so logs stream into log_output.
    translate_button.click(
        fn=process_translation,
        inputs=[
            service_selector, gemini_api_key_input, gemini_model_selector,
            openai_api_key_input, openai_model_selector,
            prompt_input, file_input
        ],
        outputs=[log_output, file_output]
    )
if __name__ == "__main__":
    demo.launch()