Spaces:
Sleeping
Sleeping
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import random | |
| import re | |
| import sqlite3 | |
| import sys | |
| import threading | |
| import time | |
| import traceback | |
| from dotenv import load_dotenv | |
| from google import genai | |
| from google.genai import types | |
| import gradio as gr | |
| import yaml | |
| load_dotenv() | |
| print("BOOT_STAGE: dotenv_loaded") | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| client = genai.Client(api_key=api_key) | |
| def load_app_config(): | |
| config_path = os.environ.get("LANG_REWINDER_CONFIG_PATH", "config.yaml") | |
| with open(config_path, "r", encoding="utf-8") as f: | |
| return yaml.safe_load(f) | |
| APP_CONFIG = load_app_config() | |
| MODEL_NAME = APP_CONFIG["models"]["main"] | |
| FALLBACK_MODEL_NAME = APP_CONFIG["models"]["fallback"] | |
| DECADE_START_YEAR = APP_CONFIG["decades"]["start_year"] | |
| DECADE_END_YEAR = APP_CONFIG["decades"]["end_year"] | |
| DECADE_INTERVAL = APP_CONFIG["decades"]["interval"] | |
| def pick_storage_root(): | |
| if os.environ.get("LTM_STORAGE_DIR"): | |
| return os.environ["LTM_STORAGE_DIR"] | |
| if os.environ.get("SPACE_ID"): | |
| # HF persistent storage (paid) is mounted at /data. | |
| if os.path.isdir("/data") and os.access("/data", os.W_OK): | |
| return "/data/lang_rewinder" | |
| return "/tmp/lang_rewinder" | |
| return "." | |
| storage_root = pick_storage_root() | |
| cache_db_path = os.environ.get( | |
| "LLM_CACHE_DB_PATH", | |
| os.path.join(storage_root, ".cache", "lang_rewinder_cache.sqlite3"), | |
| ) | |
| log_path = os.environ.get( | |
| "LLM_LOG_PATH", | |
| os.path.join(storage_root, ".logs", "lang_rewinder_requests.log"), | |
| ) | |
| cache_dir = os.path.dirname(cache_db_path) | |
| if cache_dir: | |
| os.makedirs(cache_dir, exist_ok=True) | |
| log_dir = os.path.dirname(log_path) | |
| if log_dir: | |
| os.makedirs(log_dir, exist_ok=True) | |
| cache_conn = sqlite3.connect(cache_db_path, check_same_thread=False) | |
| cache_lock = threading.Lock() | |
| with cache_lock: | |
| cache_conn.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS llm_cache ( | |
| cache_key TEXT PRIMARY KEY, | |
| output_text TEXT NOT NULL, | |
| expires_at INTEGER | |
| ) | |
| """ | |
| ) | |
| try: | |
| cache_conn.execute("ALTER TABLE llm_cache ADD COLUMN expires_at INTEGER") | |
| except sqlite3.OperationalError: | |
| pass | |
| cache_conn.commit() | |
| logger = logging.getLogger("lang_rewinder") | |
| logger.setLevel(logging.INFO) | |
| if not logger.handlers: | |
| file_handler = logging.FileHandler(log_path) | |
| file_handler.setFormatter(logging.Formatter("%(message)s")) | |
| logger.addHandler(file_handler) | |
| # TODO switch to structured output | |
| SYSTEM_PROMPT = """ | |
| You are a Historical Linguist and Translator. Your goal is to rewrite modern text into the vocabulary, | |
| syntax, and slang of a specific year or decade. | |
| RULES: | |
| 1. ANCHRONISM FILTER: Strictly avoid words, concepts, or technologies that did not exist in the target year. | |
| 2. CULTURAL VIBE: Adopt the social tone of the era (e.g., the earnest restraint of 1940s Europe, the laid-back groove of 1970s America). | |
| 3. EXPLANATION: After the translation, provide a short 'Etymology Note' explaining why you replaced certain modern words. | |
| 4. LANGUAGE: If the input is not in English, translate it into the target year's equivalent within that same language. Do not translate between languages (e.g., French stays French). | |
| OUTPUT FORMAT (MANDATORY): | |
| - Return plain Markdown only. | |
| - First line must be exactly: **<TARGET_YEAR> <LANGUAGE_NAME_IN_ENGLISH>:** | |
| - Then write only the translated text. | |
| - Then write exactly this header on its own line: **Etymology Note:** | |
| - Then 2-4 short bullet points. | |
| """ | |
| ALL_DECADES_SYSTEM_PROMPT = """ | |
| You are a Historical Linguist and Translator. Rewrite modern text into the style of multiple target decades. | |
| RULES: | |
| 1. Keep the same language as the input text. | |
| 2. Return short translations only, no explanations and no etymology notes. | |
| 3. Follow each decade label exactly as provided. | |
| OUTPUT FORMAT (MANDATORY): | |
| - Return a Markdown table only. | |
| - Header must be exactly: | |
| | Decade | Translation | | |
| |---|---| | |
| - Then one row per requested decade. | |
| """ | |
| def log_request(event_data): | |
| payload = { | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "model": MODEL_NAME, | |
| **event_data, | |
| } | |
| logger.info(json.dumps(payload, ensure_ascii=False)) | |
| def normalize_output_text(raw_text): | |
| lines = raw_text.strip().splitlines() | |
| normalized_lines = [] | |
| for line in lines: | |
| stripped = line.strip() | |
| if re.fullmatch(r"[-*_]{3,}", stripped): | |
| continue | |
| heading_candidate = re.sub(r"^#{1,6}\s*", "", stripped) | |
| heading_candidate = heading_candidate.strip() | |
| heading_candidate = re.sub(r"^\*{1,2}\s*(.*?)\s*\*{1,2}$", r"\1", heading_candidate) | |
| heading_candidate = heading_candidate.strip() | |
| if re.fullmatch(r"etymology note:?", heading_candidate, flags=re.IGNORECASE): | |
| normalized_lines.append("**Etymology Note:**") | |
| continue | |
| normalized_lines.append(line) | |
| cleaned_text = "\n".join(normalized_lines) | |
| cleaned_text = re.sub(r"(?m)^\s*\*{1,2}\s*$", "", cleaned_text) | |
| cleaned_text = re.sub( | |
| r"(?im)^\s*\*{0,2}\s*etymology note:\s*\*{0,2}\s*$", | |
| "**Etymology Note:**", | |
| cleaned_text, | |
| ) | |
| cleaned_text = re.sub(r"\n{3,}", "\n\n", cleaned_text) | |
| cleaned_text = re.sub( | |
| r"\n*\*\*Etymology Note:\*\*\n*", | |
| "\n\n**Etymology Note:**\n", | |
| cleaned_text, | |
| flags=re.IGNORECASE, | |
| ) | |
| cleaned_text = cleaned_text.strip() | |
| return cleaned_text | |
| def extract_usage_and_cost(response): | |
| usage = getattr(response, "usage_metadata", None) | |
| input_tokens = getattr(usage, "prompt_token_count", None) if usage else None | |
| output_tokens = getattr(usage, "candidates_token_count", None) if usage else None | |
| return input_tokens, output_tokens, None, None, None | |
| def get_cached_output(cache_key): | |
| now_ts = int(time.time()) | |
| with cache_lock: | |
| row = cache_conn.execute( | |
| """ | |
| SELECT output_text | |
| FROM llm_cache | |
| WHERE cache_key = ? | |
| AND (expires_at IS NULL OR expires_at > ?) | |
| """, | |
| (cache_key, now_ts), | |
| ).fetchone() | |
| return row[0] if row else None | |
| def set_cached_output(cache_key, cached_text, ttl_seconds=None): | |
| expires_at = None | |
| if ttl_seconds is not None: | |
| expires_at = int(time.time()) + int(ttl_seconds) | |
| with cache_lock: | |
| cache_conn.execute( | |
| "INSERT OR REPLACE INTO llm_cache (cache_key, output_text, expires_at) VALUES (?, ?, ?)", | |
| (cache_key, cached_text, expires_at), | |
| ) | |
| cache_conn.commit() | |
| def is_503_error(error_text): | |
| upper_error = error_text.upper() | |
| return "503" in upper_error and "UNAVAILABLE" in upper_error | |
| def get_show_all_years(): | |
| years = [] | |
| step_years = DECADE_INTERVAL * 10 | |
| current = DECADE_START_YEAR | |
| while current <= DECADE_END_YEAR: | |
| years.append(current) | |
| current += step_years | |
| return years | |
| def generate_with_retry(model_name, system_prompt, contents, retry_count, initial_backoff_seconds): | |
| attempt = 0 | |
| while True: | |
| try: | |
| response = client.models.generate_content( | |
| model=model_name, | |
| config=types.GenerateContentConfig(system_instruction=system_prompt), | |
| contents=contents, | |
| ) | |
| return response, None | |
| except Exception as e: # pylint: disable=broad-exception-caught | |
| error_text = str(e) | |
| if not is_503_error(error_text) or attempt >= retry_count: | |
| return None, e | |
| base_wait_seconds = initial_backoff_seconds * (2**attempt) | |
| jitter_seconds = random.uniform(0.2, base_wait_seconds) | |
| time.sleep(jitter_seconds) | |
| attempt += 1 | |
| def translate_text(user_input, target_year, show_all=False): # pylint: disable=too-many-locals,too-many-return-statements,too-many-statements | |
| if not api_key: | |
| output_text = "Error: API key not found. Please set GEMINI_API_KEY." | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": output_text, | |
| "error_text": "Missing GEMINI_API_KEY", | |
| "cache_hit": False, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": None, | |
| "output_cost_usd": None, | |
| "total_cost_usd": None, | |
| } | |
| ) | |
| return output_text | |
| if not user_input.strip(): | |
| return "" | |
| active_prompt = SYSTEM_PROMPT | |
| active_target = str(target_year) | |
| response_format_mode = "single" | |
| if show_all: | |
| years = get_show_all_years() | |
| active_target = ",".join(str(year) for year in years) | |
| active_prompt = ALL_DECADES_SYSTEM_PROMPT | |
| response_format_mode = "all_decades" | |
| cache_input = f"{MODEL_NAME}|{FALLBACK_MODEL_NAME}|{active_target}|{response_format_mode}|{active_prompt}|{user_input}" | |
| cache_key = hashlib.sha256(cache_input.encode("utf-8")).hexdigest() | |
| cached_output = get_cached_output(cache_key) | |
| if cached_output is not None: | |
| normalized_cached_output = cached_output if show_all else normalize_output_text(cached_output) | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": "cache", | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": normalized_cached_output, | |
| "error_text": None, | |
| "cache_hit": True, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": 0.0, | |
| "output_cost_usd": 0.0, | |
| "total_cost_usd": 0.0, | |
| } | |
| ) | |
| return normalized_cached_output | |
| if show_all: | |
| years = get_show_all_years() | |
| contents = ( | |
| f"Target Decades: {', '.join(str(year) for year in years)}\n" | |
| "Return only the Markdown table with one row per requested decade.\n" | |
| f"Text: {user_input}" | |
| ) | |
| else: | |
| contents = ( | |
| f"Target Year: {target_year}\n" | |
| "Follow the exact output format from the system prompt. " | |
| "Do not add any extra label before the translation body.\n" | |
| f"Text: {user_input}" | |
| ) | |
| primary_response, primary_error = generate_with_retry( | |
| model_name=MODEL_NAME, | |
| system_prompt=active_prompt, | |
| contents=contents, | |
| retry_count=0, | |
| initial_backoff_seconds=1, | |
| ) | |
| responded_model = MODEL_NAME | |
| response = primary_response | |
| error = primary_error | |
| if response is None and error is not None and is_503_error(str(error)): | |
| fallback_response, fallback_error = generate_with_retry( | |
| model_name=FALLBACK_MODEL_NAME, | |
| system_prompt=active_prompt, | |
| contents=contents, | |
| retry_count=3, | |
| initial_backoff_seconds=1, | |
| ) | |
| response = fallback_response | |
| error = fallback_error | |
| responded_model = FALLBACK_MODEL_NAME if response is not None else MODEL_NAME | |
| if response is not None: | |
| cleaned_text = response.text if show_all else normalize_output_text(response.text) | |
| input_tokens, output_tokens, input_cost_usd, output_cost_usd, total_cost_usd = extract_usage_and_cost(response) | |
| set_cached_output(cache_key, cleaned_text) | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": responded_model, | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": cleaned_text, | |
| "error_text": None, | |
| "cache_hit": False, | |
| "input_tokens": input_tokens, | |
| "output_tokens": output_tokens, | |
| "input_cost_usd": input_cost_usd, | |
| "output_cost_usd": output_cost_usd, | |
| "total_cost_usd": total_cost_usd, | |
| } | |
| ) | |
| return cleaned_text | |
| error_text = str(error) if error is not None else "Unknown generation error" | |
| is_unavailable_error = is_503_error(error_text) | |
| if is_unavailable_error: | |
| output_text = "🚦 The models are under high demand right now. Please try again in a minute." | |
| set_cached_output(cache_key, output_text, ttl_seconds=30) | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": "none", | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": output_text, | |
| "error_text": error_text, | |
| "cache_hit": False, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": None, | |
| "output_cost_usd": None, | |
| "total_cost_usd": None, | |
| } | |
| ) | |
| return output_text | |
| is_quota_error = "RESOURCE_EXHAUSTED" in error_text or "429" in error_text | |
| if is_quota_error: | |
| is_daily_quota = "PerDay" in error_text or "free_tier_requests" in error_text | |
| if is_daily_quota: | |
| output_text = ( | |
| "🚫 Daily free-tier quota reached for this model. Waiting a few seconds won't help. " | |
| "Please try again after the daily reset, switch model, or use a paid quota." | |
| ) | |
| set_cached_output(cache_key, output_text, ttl_seconds=300) | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": "none", | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": output_text, | |
| "error_text": error_text, | |
| "cache_hit": False, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": None, | |
| "output_cost_usd": None, | |
| "total_cost_usd": None, | |
| } | |
| ) | |
| return output_text | |
| retry_match = re.search(r"retry in ([\d.]+)s", error_text, re.IGNORECASE) | |
| if retry_match: | |
| wait_seconds = max(1, int(float(retry_match.group(1)) + 0.999)) | |
| output_text = f"⏳ Too many requests right now. Please wait ~{wait_seconds} seconds and try again." | |
| set_cached_output(cache_key, output_text, ttl_seconds=wait_seconds) | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": "none", | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": output_text, | |
| "error_text": error_text, | |
| "cache_hit": False, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": None, | |
| "output_cost_usd": None, | |
| "total_cost_usd": None, | |
| } | |
| ) | |
| return output_text | |
| output_text = "⏳ Too many requests right now. Please wait a bit and try again." | |
| set_cached_output(cache_key, output_text, ttl_seconds=20) | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": "none", | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": output_text, | |
| "error_text": error_text, | |
| "cache_hit": False, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": None, | |
| "output_cost_usd": None, | |
| "total_cost_usd": None, | |
| } | |
| ) | |
| return output_text | |
| output_text = f"Error: {error_text}" | |
| log_request( | |
| { | |
| "model": MODEL_NAME, | |
| "responded_model": "none", | |
| "target_year": target_year, | |
| "show_all": show_all, | |
| "input_text": user_input, | |
| "output_text": output_text, | |
| "error_text": error_text, | |
| "cache_hit": False, | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "input_cost_usd": None, | |
| "output_cost_usd": None, | |
| "total_cost_usd": None, | |
| } | |
| ) | |
| return output_text | |
| theme = gr.themes.Soft( | |
| font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"], | |
| primary_hue="indigo", | |
| ) | |
| CSS = """ | |
| #white-box { | |
| background-color: var(--input-background-fill); | |
| border: var(--input-border-width) solid var(--input-border-color); | |
| border-radius: var(--input-radius); | |
| padding: var(--input-padding); | |
| /* NEW: Setup explicit heights and add a scrollbar for long outputs */ | |
| min-height: 250px; | |
| max-height: 350px; | |
| overflow-y: auto; | |
| } | |
| """ | |
| with gr.Blocks(title="Language Rewinder", theme=theme, css=CSS) as demo: | |
| gr.Markdown("# ⏪ Language Rewinder") | |
| gr.Markdown("Adapt your writing for historical accuracy and translate modern slang into the language of the past.") | |
| # NEW: Removed equal_height=True so the columns operate independently | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| input_text = gr.Textbox( | |
| label="Modern Phrase", | |
| placeholder="e.g., No cap, her rizz is actually insane.", | |
| lines=4, | |
| max_length=500, | |
| ) | |
| year_slider = gr.Slider( | |
| minimum=1900, | |
| maximum=2025, | |
| value=1930, | |
| step=5, | |
| label="Target Era" | |
| ) | |
| show_all_checkbox = gr.Checkbox(label="Show all", value=False) | |
| submit_btn = gr.Button("Adapt to the Past", variant="primary", size="md") | |
| with gr.Column(scale=1): | |
| gr.HTML("<div style='display: inline-block; color: var(--block-label-text-color); font-size: var(--block-label-text-size); font-weight: var(--block-label-text-weight); margin-bottom: -10px; margin-left: 0px; padding: 6px 10px; border-radius: 8px; background-color: rgba(99, 202, 241, 0.18); '>Historical Translation</div>") | |
| output_markdown = gr.Markdown(elem_id="white-box") | |
| show_all_checkbox.change( # pylint: disable=no-member | |
| fn=lambda checked: gr.update(interactive=not checked), | |
| inputs=show_all_checkbox, | |
| outputs=year_slider, | |
| api_name=False, | |
| ) | |
| submit_btn.click( # pylint: disable=no-member | |
| fn=translate_text, | |
| inputs=[input_text, year_slider, show_all_checkbox], | |
| outputs=output_markdown, | |
| api_name=False, | |
| ) | |
| input_text.submit( # pylint: disable=no-member | |
| fn=translate_text, | |
| inputs=[input_text, year_slider, show_all_checkbox], | |
| outputs=output_markdown, | |
| api_name=False, | |
| ) | |
| gr.Examples( | |
| examples=[ | |
| ["What's up dude? you chillin'?", 1940], | |
| ["This startup is looking for a deep dive into our synergy.", 1920], | |
| ["J'ai trop le seum, le mec m'a ghosté de ouf.", 1990], | |
| ], | |
| inputs=[input_text, year_slider], | |
| label="Try these modern examples" | |
| ) | |
| print("BOOT_STAGE: gradio_launch_start") | |
| try: | |
| demo.queue(default_concurrency_limit=5, max_size=10, api_open=False).launch( | |
| server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT", 7860)), | |
| ssr_mode=False, | |
| ) | |
| except Exception: | |
| print("BOOT_STAGE: fatal_startup_exception") | |
| traceback.print_exc(file=sys.stdout) | |
| raise |