# QA_sRAG / app.py — batch Excel translator (ZH -> VI) via Gemini/OpenAI
# Author: Duc Trung — "Update app.py (#1)", commit fbffabe (verified)
# app.py
import os
import re
import time
import json
import html
import random
import tempfile
from typing import List
import gradio as gr
from openpyxl import load_workbook
from openpyxl.cell.cell import MergedCell
import google.generativeai as genai
import openai
# --- CONSTANTS ---
LANG_SOURCE = 'zh'
LANG_TARGET = 'vi'
OUTPUT_SUFFIX = "_Dich_CN"
BATCH_SIZE = 100
SLEEP_BETWEEN_BATCHES = 0.1
MAX_RETRIES = 5
GEMINI_MODELS = ["gemini-flash-latest", "gemini-2.5-pro", "gemini-flash-lite-latest","gemini-3-pro-preview"]
OPENAI_MODELS = ["chatgpt-4o-latest", "gpt-5-2025-08-07", "gpt-5-mini-2025-08-07"]
# Default system prompt template that the user can edit
DEFAULT_SYSTEM_PROMPT = (
"You are a professional translator. "
"Translate each input string from {src} to {tgt}. "
"Return ONLY a valid JSON array of translated strings, same order and length as input. "
"Do NOT include any extra text, code fences, or explanations."
)
# -----------------
# --- HELPER FUNCTIONS ---
def safe_sheet_title(base: str) -> str:
cleaned = re.sub(r'[:\\/?*\[\]]', '_', base)
return cleaned[:31] if len(cleaned) > 31 else cleaned
def excel_quote_sheet(name: str) -> str:
need_quote = bool(re.search(r"[ \-+*/^&=<>,;:@\[\]]", name)) or "'" in name
if "'" in name: name = name.replace("'", "''")
return f"'{name}'" if need_quote else name
# --- API CALL FUNCTIONS ---
def _gemini_translate_call(model, texts: List[str], *, system_prompt_template: str, src: str, tgt: str, max_retries: int = MAX_RETRIES) -> List[str]:
if not texts: return []
system_prompt = system_prompt_template.format(src=src, tgt=tgt)
payload = {"items": texts}
user_msg = f"{system_prompt}\n\nINPUT_JSON:\n{json.dumps(payload, ensure_ascii=False)}\n\nOUTPUT: JSON array of strings only."
for attempt in range(1, max_retries + 1):
try:
resp = model.generate_content(user_msg)
raw = (resp.text or "").strip()
raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.IGNORECASE)
out = json.loads(raw)
if not isinstance(out, list) or len(out) != len(texts): raise ValueError("Invalid JSON array response.")
return [html.unescape(str(s)) for s in out]
except Exception as e:
if attempt == max_retries: raise
sleep_s = 0.8 * attempt + random.uniform(0, 0.4)
print(f"⚠️ GEMINI Error (attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_s:.2f}s...")
time.sleep(sleep_s)
def _openai_translate_call(model_name: str, texts: List[str], *, system_prompt_template: str, src: str, tgt: str, max_retries: int = MAX_RETRIES) -> List[str]:
if not texts: return []
system_prompt = system_prompt_template.format(src=src, tgt=tgt)
payload = {"items": texts}
user_msg = f"INPUT_JSON:\n{json.dumps(payload, ensure_ascii=False)}\n\nOUTPUT: JSON array of strings only."
for attempt in range(1, max_retries + 1):
try:
client = openai.OpenAI()
resp = client.chat.completions.create(
model=model_name,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_msg}
],
temperature=0.2,
response_format={"type": "json_object"}
)
raw_content = resp.choices[0].message.content or "{}"
data = json.loads(raw_content)
if isinstance(data, list): out = data
elif isinstance(data, dict) and "items" in data and isinstance(data['items'], list): out = data['items']
else: out = next((v for v in data.values() if isinstance(v, list)), None)
if out is None or len(out) != len(texts): raise ValueError("Could not find a valid JSON array of the correct length.")
return [html.unescape(str(s)) for s in out]
except Exception as e:
if attempt == max_retries: raise
sleep_s = 0.8 * attempt + random.uniform(0, 0.4)
print(f"⚠️ OpenAI Error (attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_s:.2f}s...")
time.sleep(sleep_s)
def update_formula_references(sheet, sheet_map):
escaped_map = {excel_quote_sheet(old): excel_quote_sheet(new) for old, new in sheet_map.items()}
for row in sheet.iter_rows(values_only=False):
for cell in row:
if cell.data_type == 'f' and cell.value:
formula = cell.value
for old_q, new_q in escaped_map.items():
pattern = re.compile(rf"(?<!\[){re.escape(old_q)}!")
formula = pattern.sub(f"{new_q}!", formula)
cell.value = formula
# --- MAIN PROCESSING FUNCTION ---
def process_translation(
service: str, gemini_api_key: str, gemini_model: str,
openai_api_key: str, openai_model: str,
user_prompt: str, input_file,
progress=gr.Progress(track_tqdm=True)
):
if input_file is None: raise gr.Error("No Excel file uploaded.")
if not user_prompt or not user_prompt.strip(): raise gr.Error("The system prompt cannot be empty.")
logs = []
try:
# 1. Configure service
if service == "Gemini":
if not gemini_api_key: raise gr.Error("Gemini API Key is missing.")
genai.configure(api_key=gemini_api_key.strip())
model_instance = genai.GenerativeModel(gemini_model)
logs.append(f"✅ Service: Gemini | Model: {gemini_model}")
elif service == "OpenAI":
if not openai_api_key: raise gr.Error("OpenAI API Key is missing.")
openai.api_key = openai_api_key.strip()
model_instance = openai_model
logs.append(f"✅ Service: OpenAI | Model: {openai_model}")
else:
raise gr.Error("Invalid service selected.")
yield "\n".join(logs), None
# 2. Load workbook
wb = load_workbook(input_file.name)
original_sheet_names = [name for name in wb.sheetnames if OUTPUT_SUFFIX not in name]
logs.append(f"✅ Loaded Excel file. Found {len(original_sheet_names)} sheet(s).")
yield "\n".join(logs), None
sheet_name_map = {}
# 3. Translate sheets
for sheet_name in progress.tqdm(original_sheet_names, desc="Translating Sheets"):
source_sheet = wb[sheet_name]
new_sheet_name_raw = f"{sheet_name}{OUTPUT_SUFFIX}"
new_sheet_name = safe_sheet_title(new_sheet_name_raw)
sheet_name_map[sheet_name] = new_sheet_name
if new_sheet_name in wb.sheetnames: new_sheet = wb[new_sheet_name]
else:
new_sheet = wb.copy_worksheet(source_sheet)
new_sheet.title = new_sheet_name
cells_to_translate = []
seen = {}
for row in source_sheet.iter_rows(values_only=False):
for cell in row:
if isinstance(cell, MergedCell): continue
val = cell.value
if isinstance(val, str) and val.strip() and cell.data_type != 'f':
cells_to_translate.append({'text': val, 'row': cell.row, 'col': cell.column})
total_cells = len(cells_to_translate)
logs.append(f"--- Translating Sheet: {sheet_name} ({total_cells} cells) -> {new_sheet_name} ---")
yield "\n".join(logs), None
idx = 0
while idx < total_cells:
chunk = cells_to_translate[idx:idx+BATCH_SIZE]
to_send = [item['text'] for item in chunk if item['text'] not in seen]
if to_send:
try:
if service == "Gemini":
translations = _gemini_translate_call(model_instance, to_send, system_prompt_template=user_prompt, src=LANG_SOURCE, tgt=LANG_TARGET)
else: # OpenAI
translations = _openai_translate_call(model_instance, to_send, system_prompt_template=user_prompt, src=LANG_SOURCE, tgt=LANG_TARGET)
for src_text, dst_text in zip(to_send, translations):
seen[src_text] = dst_text
except Exception as e:
logs.append(f"❌ API ERROR (batch {idx}-{idx+BATCH_SIZE}): {e}. Skipping batch.")
idx += BATCH_SIZE
time.sleep(SLEEP_BETWEEN_BATCHES)
continue
for item in chunk:
translated = seen.get(item['text'], item['text'])
target_cell = new_sheet.cell(row=item['row'], column=item['col'])
if not isinstance(target_cell, MergedCell):
target_cell.value = translated
processed_count = min(idx + len(chunk), total_cells)
logs[-1] = f"--- Translating Sheet: {sheet_name} ({processed_count}/{total_cells} cells) -> {new_sheet_name} ---"
yield "\n".join(logs), None
idx += BATCH_SIZE
time.sleep(SLEEP_BETWEEN_BATCHES)
logs.append(f"✅ Finished sheet: {sheet_name}")
yield "\n".join(logs), None
# 4. Update formula references
logs.append("\n[STEP 2] Updating formula references...")
for sheet_name in original_sheet_names:
new_sheet_name = sheet_name_map.get(sheet_name)
if new_sheet_name:
update_formula_references(wb[new_sheet_name], sheet_name_map)
logs.append(f"✅ Updated references for sheet: {new_sheet_name}")
yield "\n".join(logs), None
# 5. Save the result
with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp:
wb.save(tmp.name)
output_file_path = tmp.name
logs.append(f"\n✅ SUCCESS! Your translated file is ready for download.")
yield "\n".join(logs), output_file_path
except Exception as e:
raise gr.Error(f"An error occurred: {e}")
# --- GRADIO UI ---
def update_provider_ui(service):
is_gemini = service == "Gemini"
return {
gemini_ui_group: gr.Group(visible=is_gemini),
openai_ui_group: gr.Group(visible=not is_gemini),
}
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# 批量Excel翻译工具 (ZH-VI) | Excel Translator (ZH-VI)")
with gr.Row():
with gr.Column(scale=1):
service_selector = gr.Radio(["Gemini", "OpenAI"], label="1. Select Translation Service", value="Gemini")
with gr.Group(visible=True) as gemini_ui_group:
gemini_api_key_input = gr.Textbox(label="2. Google Gemini API Key", type="password")
gemini_model_selector = gr.Dropdown(label="3. Select Gemini Model", choices=GEMINI_MODELS, value=GEMINI_MODELS[0])
with gr.Group(visible=False) as openai_ui_group:
openai_api_key_input = gr.Textbox(label="2. OpenAI API Key", type="password")
openai_model_selector = gr.Dropdown(label="3. Select OpenAI Model", choices=OPENAI_MODELS, value=OPENAI_MODELS[0])
file_input = gr.File(label="4. Upload Excel File (.xlsx)", file_types=[".xlsx"])
with gr.Accordion("Advanced Options", open=False):
prompt_input = gr.Textbox(
label="System Prompt Template",
value=DEFAULT_SYSTEM_PROMPT,
lines=5,
info="Edit the prompt for the AI model. Use {src} for source language and {tgt} for target language."
)
translate_button = gr.Button("5. Start Translation", variant="primary")
with gr.Column(scale=2):
log_output = gr.Textbox(label="Progress & Logs", interactive=False, lines=25, autoscroll=True)
file_output = gr.File(label="Download Translated File")
service_selector.change(fn=update_provider_ui, inputs=service_selector, outputs=[gemini_ui_group, openai_ui_group])
translate_button.click(
fn=process_translation,
inputs=[
service_selector, gemini_api_key_input, gemini_model_selector,
openai_api_key_input, openai_model_selector,
prompt_input, file_input
],
outputs=[log_output, file_output]
)
if __name__ == "__main__":
demo.launch()