# Copyright: Shayekh Bin Islam. KAIST, South Korea. 2026. MAX_TEXT_CHAR = 1500 model_id = "Qwen/Qwen3.5-9B" # model_id = "Qwen/Qwen3.5-2B" try: import spaces IS_HF = True except ImportError: IS_HF = False if not IS_HF: class spaces: @staticmethod def GPU(*args, **kwargs): def decorator(func): return func if len(args) == 1 and callable(args[0]) and not kwargs: return args[0] return decorator else: import os, sys, subprocess os.environ['SUPERTONIC_CACHE_DIR'] = '/home/user/huggingface' os.environ["HF_HOME"] = "/home/user/huggingface" os.environ['XDG_CACHE_HOME'] = "/home/user/huggingface" os.environ['PLAYWRIGHT_BROWSERS_PATH'] = "/home/user/huggingface/ms-playwright" # os.system("playwright install chromium") result = subprocess.run( ["python", "-m", "playwright", "install", "chromium"], env={**os.environ}, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) import gradio as gr import fitz # PyMuPDF from PIL import Image import io import json import base64 import soundfile as sf import torch import os import tempfile import re as re_module if IS_HF: LOG_DIR = tempfile.gettempdir() else: LOG_DIR = "log" os.makedirs(LOG_DIR, exist_ok=True) from supertonic import TTS from transformers import AutoProcessor, AutoModelForImageTextToText # model = None # processor = None # tts = None # voice_style = None global_stop_thinking = [False] global_kill_threads = [False] def set_stop_thinking(): global_stop_thinking[0] = True print(f"[STOP-THINK] set_stop_thinking CALLED! Flag is now: {global_stop_thinking[0]}") return gr.update(value="⚑ Forcing generation...") def reset_stop_thinking_after_delay(): """Wait 5 seconds then restore the button text so the user can click it again.""" import time time.sleep(5) return gr.update(value="⚑ Stop thinking, Generate now") def set_kill_threads(): global_kill_threads[0] = True print(f"[KILL] set_kill_threads CALLED! Flag is now: {global_kill_threads[0]}") return gr.update(value="πŸ›‘ Stopping...") def reset_generation_flags(): """Reset all generation control flags at the start of a new generation.""" global_stop_thinking[0] = False global_kill_threads[0] = False print("[FLAGS] Reset stop_thinking and kill_threads to False") def extract_pdf_content(pdf_path, max_pages=2): """Extract text and images from up to max_pages of a PDF.""" doc = fitz.open(pdf_path) text = "" images = [] for i in range(min(max_pages, len(doc))): page = doc[i] text += page.get_text() + "\n" pix = page.get_pixmap(dpi=150) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) images.append(img) return text, images def is_youtube_url(url): """Check if a URL is a YouTube link.""" if not url: return False youtube_patterns = [ r'(https?://)?(www\.)?youtube\.com/watch', r'(https?://)?(www\.)?youtube\.com/shorts/', r'(https?://)?youtu\.be/', r'(https?://)?(www\.)?youtube\.com/embed/', r'(https?://)?m\.youtube\.com/', ] for pattern in youtube_patterns: if re_module.search(pattern, url): return True return False def extract_youtube_audio(url, max_duration_sec=300, cookiefile=None): """Extract audio from YouTube video (first max_duration_sec seconds). Returns path to the downloaded audio file. cookiefile: optional path to a Netscape-format cookies.txt to bypass bot filtering. """ import yt_dlp from yt_dlp.utils import download_range_func os.makedirs("log", exist_ok=True) output_path = os.path.join("log", "yt_audio") ydl_opts = { 'format': 'bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav', 'preferredquality': '192', }], 'download_ranges': download_range_func(None, [(0, max_duration_sec)]), 'force_keyframes_at_cuts': True, 'outtmpl': output_path + '.%(ext)s', 'quiet': True, 'no_warnings': True, } if cookiefile: ydl_opts['cookiefile'] = cookiefile with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) title = info.get('title', 'YouTube Video') # The output file will be output_path.wav after postprocessing wav_path = output_path + '.wav' if not os.path.exists(wav_path): # Sometimes the extension might differ, search for it for ext in ['wav', 'mp3', 'opus', 'webm', 'm4a']: candidate = output_path + '.' + ext if os.path.exists(candidate): wav_path = candidate break if not os.path.exists(wav_path): raise FileNotFoundError(f"Could not find downloaded audio file at {output_path}.*") return wav_path, title @spaces.GPU(duration=180) def transcribe_audio_with_asr(audio_path): """Transcribe audio file using Cohere ASR model via transformers.""" global asr_model, asr_processor from transformers.audio_utils import load_audio audio = load_audio(audio_path, sampling_rate=16000) inputs = asr_processor(audio, language="ko", sampling_rate=16000, return_tensors="pt") inputs = inputs.to(asr_model.device, dtype=asr_model.dtype) outputs = asr_model.generate(**inputs, max_new_tokens=2048) texts = asr_processor.decode(outputs, skip_special_tokens=True) # text = texts[0] if isinstance(texts, list) else texts # join texts # Filter the lines in texts which are english only and no korean if isinstance(texts, list): # Filter out lines that are purely English/symbols (no Korean characters) # Korean Unicode range: AC00-D7A3 (Syllables), 1100-11FF (Jamo), 3130-318F (Compatibility Jamo) korean_re = re_module.compile(r'[κ°€-νž£γ„±-γ…Žγ…-γ…£]') texts = [line for line in texts if korean_re.search(line)] text = "\n".join(texts) if isinstance(texts, list) else texts return text.strip() def extract_website_content(url, max_images=2): """Extract text and images from a website URL.""" import requests from bs4 import BeautifulSoup import io headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } html_content = "" try: from playwright.sync_api import sync_playwright with sync_playwright() as p: browser = p.chromium.launch(headless=True) page = browser.new_page(user_agent=headers['User-Agent']) # Wait until there are no network connections for at least 500 ms (so JS can finish) page.goto(url, timeout=30000, wait_until="networkidle") html_content = page.content() browser.close() except Exception as e: print(f"Playwright headless fetch failed: {e}. Falling back to requests...") response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() html_content = response.content soup = BeautifulSoup(html_content, 'html.parser') for script in soup(["script", "style", "nav", "footer", "header", "noscript"]): script.extract() text = soup.get_text(separator='\n') lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = '\n'.join(chunk for chunk in chunks if chunk) images = [] img_tags = soup.find_all('img') for img in img_tags: if len(images) >= max_images: break src = img.get('src') or img.get('data-src') if src: if src.startswith('//'): src = 'https:' + src elif src.startswith('/'): from urllib.parse import urljoin src = urljoin(url, src) try: img_resp = requests.get(src, headers=headers, timeout=5) if img_resp.status_code == 200: pil_img = Image.open(io.BytesIO(img_resp.content)) if pil_img.mode != 'RGB': pil_img = pil_img.convert('RGB') if pil_img.width >= 100 and pil_img.height >= 100: images.append(pil_img) except Exception as e: print(f"Failed to load image {src}: {e}") return text, images def get_base64_image(image): buffered = io.BytesIO() image.save(buffered, format="JPEG") img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") return f"data:image/jpeg;base64,{img_str}" @spaces.GPU(duration=180) def extract_vocabulary(pdf_text, images, translit_lang, translit_format, target_lang, max_text_char=1500, repetition_penalty_val=1.1, partial_assistant_text=None, auto_force_chars=1000, enable_thinking=True): """Use Transformers to extract vocabulary from text and images.""" global model, processor os.makedirs("log", exist_ok=True) if len(pdf_text.strip()) == 0: pdf_text = '''"No Text available, see provided Images only."''' no_img = "" if len(images) == 0: no_img = '''\n"No Images available, see provided Text only."''' non_english = "" if translit_lang.upper() != "ENGLISH": non_english = f" CRITICAL: You MUST use the native alphabet/script of {translit_lang.upper()}, do NOT use English letters unless requested." prompt_text = f"""Text: {pdf_text[:int(max_text_char)]} {no_img} Extract at least 10 key Korean words or phrases from the following text and images. Focus on meaningful vocabulary that is highly helpful for a new language learner (e.g., common nouns, verbs, adjectives, or useful expressions). CRITICAL: Do NOT extract website template words, navigation menus, boilerplate text, UI elements, or titles like 'Home page', 'News', 'Menu'. Return ONLY a valid JSON list of dictionaries, where each dictionary has four keys: - 'korean' (the Korean text) - 'transliteration' (the pronunciation transliterated into {translit_lang.upper()} script/characters, formatted as {translit_format}.{non_english}) - 'translation' (the brief translation into {target_lang.upper()}) - 'explanation' (a brief grammar or context note in {target_lang.upper()}). Just output raw JSON with ```json and ``` markers, as the user will load in python. Example: ```json [ {{ "korean": "날씨", "transliteration": "nal-ssi", "translation": "weather", "explanation": "Common noun used to describe weather conditions." }}, {{ "korean": "λ§›μžˆλ‹€", "transliteration": "ma-sit-da", "translation": "to be delicious", "explanation": "Descriptive verb. Polite form: λ§›μžˆμ–΄μš”. Used to compliment food." }} ] ``` CRITICAL: Do NOT overthink. Do NOT deliberate over conditions, edge cases, or reasoning. Keep your thinking extremely brief (a few words at most). Output the JSON array IMMEDIATELY without lengthy analysis. """ # DEBUG: Log prompt text if not IS_HF: with open(os.path.join(LOG_DIR, "debug_vlm_prompt.txt"), "w", encoding="utf-8") as f: f.write(prompt_text) content = [] pil_images = [] for i, img in enumerate(images): # DEBUG: Log images img.save(os.path.join(LOG_DIR, f"debug_image_{i}.png"), format="PNG") pil_images.append(img) content.append({ "type": "image", }) content += [{"type": "text", "text": prompt_text}] messages = [ { "role": "user", "content": content } ] try: model.to("cuda") text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True, enable_thinking=enable_thinking) if not enable_thinking: # Non-think mode: force JSON output immediately text += "```json\n[\n" elif partial_assistant_text: text += partial_assistant_text + "\nReady to generate.\n\n\n```json\n[\n" inputs = processor( text=[text], images=pil_images if pil_images else None, return_tensors="pt", padding=True ).to("cuda") from transformers import TextIteratorStreamer, StoppingCriteria, StoppingCriteriaList from threading import Thread import queue local_stop = [False] class LocalKillCriteria(StoppingCriteria): def __call__(self, input_ids, scores, **kwargs): return local_stop[0] or global_kill_threads[0] def run_generation(cur_inputs, cur_streamer, cur_local_stop): """Run model.generate in a thread, always calling streamer.end() on exit.""" kill_criteria = StoppingCriteriaList([LocalKillCriteria()]) gen_kwargs = dict( **cur_inputs, streamer=cur_streamer, max_new_tokens=2048*16, do_sample=True, repetition_penalty=repetition_penalty_val, stopping_criteria=kill_criteria ) if len(images) > 0: gen_kwargs.update(dict(temperature=0.6, top_p=0.95, top_k=20, min_p=0.0)) else: gen_kwargs.update(dict(temperature=1.0, top_p=0.95, top_k=20, min_p=0.0)) try: model.generate(**gen_kwargs) except Exception as e: import traceback print(f"\n[THREAD ERROR] model.generate crashed: {e}") traceback.print_exc() finally: try: cur_streamer.end() except Exception: pass output_text = partial_assistant_text + "\n\n\n```json\n[\n" if partial_assistant_text else ("" if enable_thinking else "") streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True) thread = Thread(target=run_generation, args=(inputs, streamer, local_stop)) thread.start() force_triggered = False AUTO_FORCE_CHARS = auto_force_chars for new_text in streamer: output_text += new_text yield output_text, None # Auto-force JSON if thinking exceeds 300 chars without producing JSON should_auto_force = ( enable_thinking and not force_triggered and not partial_assistant_text and len(output_text) > AUTO_FORCE_CHARS and '```json' not in output_text ) # Check if user clicked "Stop thinking" OR auto-force threshold reached if (global_stop_thinking[0] or should_auto_force) and not force_triggered: force_triggered = True reason = f"auto-force (>{AUTO_FORCE_CHARS} chars)" if should_auto_force else "user clicked stop" print(f"[STOP-THINK] Force triggered ({reason})! Killing current generation...") # 1. Kill the current generation thread local_stop[0] = True # Drain queue so the thread can exit while not streamer.text_queue.empty(): try: streamer.text_queue.get_nowait() except queue.Empty: break thread.join(timeout=5) print("[STOP-THINK] Old thread joined. Starting forced JSON generation...") # 2. Reset flags global_stop_thinking[0] = False local_stop[0] = False # 3. Append the think-closing + JSON prefix output_text += "\nReady to generate.\n\n\n```json\n[\n" yield output_text, None # 4. Build new prompt with partial assistant text text2 = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) text2 += output_text inputs2 = processor( text=[text2], images=pil_images if pil_images else None, return_tensors="pt", padding=True ).to("cuda") # 5. Start new generation thread with force-JSON context # This loop also monitors stop_thinking so user can force again if model keeps thinking streamer2 = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True) thread2 = Thread(target=run_generation, args=(inputs2, streamer2, local_stop)) thread2.start() HARD_FORCE_CHARS = 10_000 for new_text2 in streamer2: output_text += new_text2 yield output_text, None # Hard auto-force: if total output exceeds 10K chars, # the model has been rambling too long β€” force again regardless # of JSON state (the model may complete JSON then start over). should_hard_force = len(output_text) > HARD_FORCE_CHARS # Allow user to force again OR hard auto-force kicks in if global_stop_thinking[0] or global_kill_threads[0] or should_hard_force: reason = "hard auto-force (>10K chars)" if should_hard_force and not global_stop_thinking[0] else "user/kill flag" print(f"[STOP-THINK] Flag detected in forced generation loop ({reason})! Killing...") local_stop[0] = True while not streamer2.text_queue.empty(): try: streamer2.text_queue.get_nowait() except queue.Empty: break thread2.join(timeout=5) global_stop_thinking[0] = False local_stop[0] = False # Force JSON prefix again output_text += "\nReady to generate.\n\n\n```json\n[\n" yield output_text, None text3 = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) text3 += output_text inputs3 = processor( text=[text3], images=pil_images if pil_images else None, return_tensors="pt", padding=True ).to("cuda") streamer3 = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True) thread3 = Thread(target=run_generation, args=(inputs3, streamer3, local_stop)) thread3.start() for new_text3 in streamer3: output_text += new_text3 yield output_text, None thread3.join(timeout=10) break else: thread2.join(timeout=10) break # Exit the outer streamer loop if not force_triggered: thread.join() # Reset flag in case it was set but generation finished naturally global_stop_thinking[0] = False # DEBUG: Log raw output text if not IS_HF: with open(os.path.join(LOG_DIR, "debug_vlm_output.txt"), "w", encoding="utf-8") as f: f.write(output_text) except Exception as e: print(f"Error during Transformers inference: {e}") yield f"Error during Transformers inference: {e}", [] return try: import re # Extract JSON from markdown code fences or raw output json_matches = list(re.finditer(r'```(?:json)?\s*([\s\S]*?)```', output_text)) if json_matches: clean_text = json_matches[-1].group(1).strip() else: # Fallback: find last [ ... ] or { ... } block json_matches = list(re.finditer(r'(\[[\s\S]*\]|\{[\s\S]*\})', output_text)) clean_text = json_matches[-1].group(1).strip() if json_matches else output_text.strip() try: data = json.loads(clean_text) except: import jiter # Get bytes from string data = jiter.from_json(clean_text.encode("utf-8"), partial_mode=True) if not isinstance(data, list): data = [data] yield output_text, data except Exception as e: print(f"Error parsing JSON: {e}\nRaw output: {output_text}") yield output_text, [] def translate_vocabulary(korean_words, translit_lang, translit_format, target_lang, repetition_penalty_val=1.1, enable_thinking=True): """Use Transformers text-only inference to translate/transliterate Korean words.""" global model, processor non_english = "" if translit_lang.upper() != "ENGLISH": non_english = f" CRITICAL: You MUST use the native alphabet/script of {translit_lang.upper()}, do NOT use English letters unless requested." words_str = ", ".join(korean_words) prompt_text = f"""Translate and transliterate the following Korean words. Return ONLY a valid JSON list of dictionaries, where each dictionary has four keys: - 'korean' (the original Korean text) - 'transliteration' (the pronunciation transliterated into {translit_lang.upper()} script/characters, formatted as {translit_format}.{non_english}) - 'translation' (the translation into {target_lang.upper()}) - 'explanation' (a brief grammar or context note in {target_lang.upper()}). No markdown formatting, just raw JSON with ```json and ``` markers. CRITICAL: Do NOT overthink. Do NOT deliberate over conditions, edge cases, or reasoning. Keep your thinking extremely brief (5 paragraphs at most). Output the JSON array IMMEDIATELY without lengthy analysis. Korean words: {words_str} """ # DEBUG: Log translation prompt text if not IS_HF: with open(os.path.join(LOG_DIR, "debug_translate_prompt.txt"), "w", encoding="utf-8") as f: f.write(prompt_text) messages = [ { "role": "user", "content": [{"type": "text", "text": prompt_text}] } ] try: model.to("cuda") text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True, enable_thinking=enable_thinking) if not enable_thinking: text += "```json\n[\n" inputs = processor( text=[text], images=None, return_tensors="pt", padding=True ).to("cuda") generated_ids = model.generate( **inputs, # max_new_tokens=2048*16, max_new_tokens=2048*2, # temperature=1.0, # top_p=0.95, temperature=1.0, top_p=0.95, top_k=20, min_p=0.0, # presence_penalty=1.5, repetition_penalty=repetition_penalty_val, do_sample=True ) generated_ids = [ output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, generated_ids) ] output_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0] # DEBUG: Log raw translation output text if not IS_HF: with open(os.path.join(LOG_DIR, "debug_translate_output.txt"), "w", encoding="utf-8") as f: f.write(output_text) except Exception as e: print(f"Error during Transformers text inference: {e}") return [] try: import re json_matches = list(re.finditer(r'```(?:json)?\s*([\s\S]*?)```', output_text)) if json_matches: clean_text = json_matches[-1].group(1).strip() else: json_matches = list(re.finditer(r'(\[[\s\S]*\]|\{[\s\S]*\})', output_text)) clean_text = json_matches[-1].group(1).strip() if json_matches else output_text.strip() try: data = json.loads(clean_text) except: import jiter data = jiter.from_json(clean_text.encode("utf-8"), partial_mode=True) if not isinstance(data, list): data = [data] return data except Exception as e: print(f"Error parsing JSON: {e}\nRaw output: {output_text}") return [] def numpy_to_base64_audio(wav, sample_rate): wav = wav.squeeze() buffer = io.BytesIO() sf.write(buffer, wav, sample_rate, format='WAV') buffer.seek(0) audio_base64 = base64.b64encode(buffer.read()).decode('utf-8') return f"data:audio/wav;base64,{audio_base64}" import hashlib def hash_file(filepath): with open(filepath, 'rb') as f: return hashlib.md5(f.read(1024*1024)).hexdigest() @spaces.GPU(duration=180) def process_pdf(pdf_file, url_input, audio_file_input, yt_url_input, yt_cookies_file, translit_lang, translit_format, target_lang, max_text_char, repetition_penalty_val, auto_force_chars_val, last_source_hash, last_korean_words, active_tab, enable_thinking=True, progress=gr.Progress()): global tts, voice_style # Clean language choices from "Family - Language" to just "Language" if " - " in translit_lang: translit_lang = translit_lang.split(" - ")[-1] if " - " in target_lang: target_lang = target_lang.split(" - ")[-1] os.makedirs(LOG_DIR, exist_ok=True) # Reset flags at start of new generation reset_generation_flags() # Determine input source based on active tab is_url = (active_tab == "Website URL") and bool(url_input and url_input.strip()) is_youtube = (active_tab == "YouTube Link") and bool(yt_url_input and yt_url_input.strip() and is_youtube_url(yt_url_input.strip())) is_audio_upload = (active_tab == "Upload Audio") and (audio_file_input is not None) is_pdf = (active_tab == "Upload PDF") and (pdf_file is not None) if not is_url and not is_youtube and not is_audio_upload and not is_pdf: yield "

Please provide input in the active tab.

", None, None, "", "", [], None return if is_youtube: current_source_hash = hashlib.md5(yt_url_input.strip().encode()).hexdigest() elif is_audio_upload: current_source_hash = hash_file(audio_file_input) elif is_url: current_source_hash = hashlib.md5(url_input.strip().encode()).hexdigest() else: current_source_hash = hash_file(pdf_file.name) vocab_list = [] extracted_audio_path = None try: if is_youtube: progress(0, desc="Downloading YouTube audio (first 5 min)...") cookies_path = yt_cookies_file.name if yt_cookies_file else None audio_path, yt_title = extract_youtube_audio(yt_url_input.strip(), max_duration_sec=300, cookiefile=cookies_path) progress(0.1, desc=f"Transcribing audio with Cohere ASR...") content_text = transcribe_audio_with_asr(audio_path) images = [] extracted_audio_path = audio_path if not content_text.strip(): yield "

Could not transcribe any text from the YouTube video.

", current_source_hash, None, "", "", [], extracted_audio_path return # Log the transcription if not IS_HF: with open(os.path.join(LOG_DIR, "debug_yt_transcription.txt"), "w", encoding="utf-8") as f: f.write(f"Title: {yt_title}\n\n{content_text}") elif is_audio_upload: progress(0, desc="Transcribing uploaded audio with Cohere ASR...") content_text = transcribe_audio_with_asr(audio_file_input) images = [] extracted_audio_path = audio_file_input if not content_text.strip(): yield "

Could not transcribe any text from the uploaded audio.

", current_source_hash, None, "", "", [], extracted_audio_path return if not IS_HF: with open(os.path.join(LOG_DIR, "debug_audio_transcription.txt"), "w", encoding="utf-8") as f: f.write(content_text) elif is_url: progress(0, desc="Fetching Website...") content_text, images = extract_website_content(url_input.strip()) else: progress(0, desc="Reading PDF...") content_text, images = extract_pdf_content(pdf_file.name) if not content_text.strip() and not images: yield "

No content found.

", current_source_hash, None, "", "", [], extracted_audio_path return except Exception as e: import traceback traceback.print_exc() yield f"

Error reading content: {e}

", None, None, "", "", [], None return vocab_list = [] stream_text = "" for attempt in range(1, 4): if global_kill_threads[0]: print("[KILL] Kill flag detected, stopping extraction attempts.") break progress(0.2, desc=f"Extracting vocabulary (Attempt {attempt}/3)...") for stream_t, v_list in extract_vocabulary(content_text, images, translit_lang, translit_format, target_lang, max_text_char, repetition_penalty_val, auto_force_chars=auto_force_chars_val, enable_thinking=enable_thinking): stream_text = stream_t if v_list is not None: vocab_list = v_list yield "", current_source_hash, None, stream_text, content_text, images, extracted_audio_path if vocab_list: break # Reset kill flag after extraction so TTS can proceed global_kill_threads[0] = False # If generation was killed but we don't have vocab yet, try to salvage JSON from stream_text if not vocab_list and stream_text: print("[KILL] Attempting to salvage JSON from partial generation output...") try: import re json_matches = list(re.finditer(r'```(?:json)?\s*([\s\S]*?)```', stream_text)) if json_matches: clean_text = json_matches[-1].group(1).strip() else: json_matches = list(re.finditer(r'(\[[\s\S]*\]|\{[\s\S]*\})', stream_text)) clean_text = json_matches[-1].group(1).strip() if json_matches else "" if clean_text: try: data = json.loads(clean_text) except: import jiter data = jiter.from_json(clean_text.encode("utf-8"), partial_mode=True) if not isinstance(data, list): data = [data] if data and isinstance(data[0], dict) and 'korean' in data[0]: vocab_list = data print(f"[KILL] Salvaged {len(vocab_list)} vocab items from partial output!") except Exception as e: print(f"[KILL] Could not salvage JSON: {e}") if not vocab_list: yield "

Failed to extract or translate vocabulary after 3 attempts.

", current_source_hash, None, stream_text, content_text, images, extracted_audio_path return progress(0.6, desc="Generating TTS audio...") # Pre-generate TTS audio for i, item in enumerate(vocab_list): korean = item.get("korean", "") # Add dot if not korean.endswith("."): korean += "." try: wav, dur = tts.synthesize( korean, voice_style=voice_style, lang="ko", total_steps=12, speed=0.7, ) # DEBUG: Save audio locally if not IS_HF: wav_1d = wav.squeeze() sf.write(os.path.join(LOG_DIR, f"debug_audio_{i}.wav"), wav_1d, tts.sample_rate, format='WAV') audio_data_uri = numpy_to_base64_audio(wav, tts.sample_rate) item['audio_uri'] = audio_data_uri except Exception as e: print(f"TTS error for '{korean}': {e}") item['audio_uri'] = None cards_json = json.dumps(vocab_list).replace("
Loading...

Click card to flip 🎯

""" fc_html = build_flashcard_html(vocab_list) yield fc_html, current_source_hash, vocab_list, stream_text, content_text, images, extracted_audio_path LANGUAGE_DATA = """Indo-European Bengali, English, French, Portuguese, German, Romanian, Swedish, Danish, Bulgarian, Russian, Czech, Greek, Ukrainian, Spanish, Dutch, Slovak, Croatian, Polish, Lithuanian, Norwegian BokmΓ₯l, Norwegian Nynorsk, Persian, Slovenian, Gujarati, Latvian, Italian, Occitan, Nepali, Marathi, Belarusian, Serbian, Luxembourgish, Venetian, Assamese, Welsh, Silesian, Asturian, Chhattisgarhi, Awadhi, Maithili, Bhojpuri, Sindhi, Irish, Faroese, Hindi, Punjabi, Oriya, Tajik, Eastern Yiddish, Lombard, Ligurian, Sicilian, Friulian, Sardinian, Galician, Catalan, Icelandic, Tosk Albanian, Limburgish, Dari, Afrikaans, Macedonian, Sinhala, Urdu, Magahi, Bosnian, Armenian, Latgalian, Scottish Gaelic, Central Kurdish, Northern Kurdish, Southern Pashto, Sanskrit, Dhundari, Marwari, Ahirani, Bagheli, Bagri, Bundeli, Braj, Kumaoni, Kashmiri Sino-Tibetan Chinese (Simplified), Chinese (Traditional), Cantonese, Burmese, Standard Tibetan, Meitei Afro-Asiatic Arabic (Standard), Arabic (Najdi), Arabic (Levantine), Arabic (Egyptian), Arabic (Moroccan), Arabic (Mesopotamian), Arabic (Ta’izzi-Adeni), Arabic (Tunisian), Arabic (Gulf), Arabic (Algerian), Arabic (Sudanese), Arabic (Libyan), Hebrew, Maltese, Amharic, Tigrinya, Kabyle, Somali, West Central Oromo, Hausa Austronesian Indonesian, Malay, Tagalog, Cebuano, Javanese, Sundanese, Minangkabau, Balinese, Banjar, Pangasinan, Iloko, Waray (Philippines), Plateau Malagasy, Malagasy, Buginese, Maori, Samoan, Hawaiian, Fijian Dravidian Tamil, Telugu, Kannada, Malayalam Turkic Turkish, North Azerbaijani, Northern Uzbek, Kazakh, Bashkir, Tatar, Crimean Tatar, Kyrgyz, Turkmen, Uyghur Tai-Kadai Thai, Lao, Shan Uralic Finnish, Estonian, Hungarian, Meadow Mari Austroasiatic Vietnamese, Khmer Niger–Congo Yoruba, Ewe, Kinyarwanda, Lingala, Northern Sotho, Nyanja, Shona, Southern Sotho, Tswana, Xhosa, Zulu, Luganda, Swati, Tsonga, Tumbuka, Venda, Chokwe, Luba-Kasai, Rundi, Umbundu, Kikuyu, Kongo, Nigerian Fulfulde, Wolof, Fon, KabiyΓ¨, Mossi, Akan, Twi, Bambara, Igbo""" # Other Japanese, Korean, Georgian, Basque, Haitian, Papiamento, Kabuverdianu, Tok Pisin, Swahili, Central Aymara, Tulu, Nagamese, Nigerian Pidgin, Mauritian Creole, Sango, Ayacucho Quechua, Halh Mongolian, Southwestern Dinka, Nuer, Guarani LANGUAGE_CHOICES = [] for line in LANGUAGE_DATA.strip().split('\n'): family, langs = line.split('\t') for lang in langs.split(', '): LANGUAGE_CHOICES.append(f"{family} - {lang}") import urllib.request def get_example_pdf(): url = "https://raw.githubusercontent.com/ShayekhBinIslam/file-host/main/cnp_korean_page7.pdf" file_path = "cnp_korean_page7.pdf" if not os.path.exists(file_path): try: urllib.request.urlretrieve(url, file_path) except Exception as e: print(f"Failed to download example PDF: {e}") return file_path if os.path.exists(file_path) else None def get_example_audio(): url = "https://raw.githubusercontent.com/ShayekhBinIslam/file-host/main/new_1min.wav" file_path = "new_1min.wav" if not os.path.exists(file_path): try: urllib.request.urlretrieve(url, file_path) except Exception as e: print(f"Failed to download example audio: {e}") return file_path if os.path.exists(file_path) else None @spaces.GPU(duration=180) def process_pdf_force(partial_text, pdf_file, url_input, translit_lang, translit_format, target_lang, max_text_char, repetition_penalty_val, last_source_state, last_korean_words_state): """Force JSON generation using the current partial stream_box text.""" is_url = bool(url_input and url_input.strip()) current_source_hash = "" if is_url: current_source_hash = "url:" + url_input.strip() elif pdf_file is not None: import hashlib with open(pdf_file.name, "rb") as f: current_source_hash = "pdf:" + hashlib.md5(f.read()).hexdigest() try: if is_url: progress(0, desc="Fetching Website...") content_text, images = extract_website_content(url_input.strip()) else: progress(0, desc="Reading PDF...") content_text, images = extract_pdf_content(pdf_file.name) if not content_text.strip() and not images: yield "

No content found.

", current_source_hash, None, partial_text, "", [], None return except Exception as e: yield f"

Error reading content: {e}

", None, None, partial_text, "", [], None return vocab_list = [] stream_text = partial_text progress(0.2, desc="Extracting vocabulary (Forced JSON)...") for stream_t, v_list in extract_vocabulary(content_text, images, translit_lang, translit_format, target_lang, max_text_char, repetition_penalty_val, partial_assistant_text=partial_text): stream_text = stream_t if v_list is not None: vocab_list = v_list yield "", current_source_hash, None, stream_text, content_text, images, None if not vocab_list: yield "

Failed to parse forced JSON.

", current_source_hash, None, stream_text, content_text, images, None return progress(0.6, desc="Generating TTS audio...") for i, item in enumerate(vocab_list): korean = item.get("korean", "") if korean and tts is not None: progress(0.6 + 0.3 * (i / len(vocab_list)), desc=f"Generating audio {i+1}/{len(vocab_list)}...") try: wav, dur = tts.synthesize( korean, voice_style=voice_style, lang="ko", total_steps=12, speed=0.7, ) import numpy as np import soundfile as sf if not IS_HF: audio_path = os.path.join(LOG_DIR, f"audio_{i}.wav") sf.write(audio_path, wav, 24000) item["audio_uri"] = numpy_to_base64_audio(wav, tts.sample_rate) except Exception as e: print(f"Failed to generate audio for {korean}: {e}") item["audio_uri"] = None progress(1.0, desc="Rendering flashcards...") fc_html = build_flashcard_html(vocab_list) yield fc_html, current_source_hash, vocab_list, stream_text, content_text, images, None def build_flashcard_html(vocab_list): """Build the flashcard SPA with spaced repetition (SM-2 lite via localStorage).""" import html as _html cards_json = json.dumps(vocab_list).replace("
πŸ†• NEW

Click card to flip 🎯

""" safe_srcdoc = _html.escape(iframe_html) return f'' def build_quiz_html(vocab_list): """Build a 5-question multiple-choice quiz SPA.""" import html as _html import random as rnd if not vocab_list or len(vocab_list) < 2: return "

⚠️ Need at least 2 flashcards to start a quiz.
Generate or import a deck first!

" nq = min(5, len(vocab_list)) q_cards = rnd.sample(vocab_list, nq) quiz_data = [] for qc in q_cards: correct = qc.get('translation', '') or qc.get('english', '') wrong_pool = [c for c in vocab_list if c is not qc and (c.get('translation', '') or c.get('english', '')) != correct] wrongs = rnd.sample(wrong_pool, min(3, len(wrong_pool))) choices = [correct] + [w.get('translation', '') or w.get('english', '') for w in wrongs] rnd.shuffle(choices) quiz_data.append({ 'korean': qc.get('korean', ''), 'transliteration': qc.get('transliteration', ''), 'choices': choices, 'correct': choices.index(correct), }) quiz_json = json.dumps(quiz_data).replace("
🧠 Vocabulary Quiz
Question 1 of {nq}
What does this word mean?
""" safe_srcdoc = _html.escape(iframe_html) return f'' def export_json_file_fn(vocab_list): """Export current vocab list to a JSON file for download.""" if not vocab_list: gr.Warning("No flashcards to export. Generate or import a deck first!") return gr.update(visible=False) export_data = [{k: v for k, v in item.items() if k != 'audio_uri'} for item in vocab_list] # Create a unique temporary file temp_fd, temp_path = tempfile.mkstemp(suffix=".json", prefix="flashcards_export_") with os.fdopen(temp_fd, "w", encoding="utf-8") as f: json.dump(export_data, f, ensure_ascii=False, indent=2) return gr.update(value=temp_path, visible=True) def export_anki_file_fn(vocab_list): """Export current vocab list to an Anki .apkg file for download.""" if not vocab_list: gr.Warning("No flashcards to export. Generate or import a deck first!") return gr.update(visible=False) try: import genanki import random as rnd except ImportError: gr.Warning("genanki not installed. Run: pip install genanki") return gr.update(visible=False) model = genanki.Model( rnd.randrange(1 << 30, 1 << 31), 'LocalDuo Korean Vocab', fields=[{'name': 'Korean'}, {'name': 'Translation'}, {'name': 'Transliteration'}, {'name': 'Explanation'}], templates=[{ 'name': 'Card 1', 'qfmt': '
{{Korean}}
', 'afmt': '{{FrontSide}}
{{Translation}}
{{Transliteration}}
{{Explanation}}
', }] ) deck = genanki.Deck(rnd.randrange(1 << 30, 1 << 31), 'LocalDuo - Korean Vocabulary') for item in vocab_list: deck.add_note(genanki.Note(model=model, fields=[ item.get('korean', ''), item.get('translation', '') or item.get('english', ''), item.get('transliteration', ''), item.get('explanation', ''), ])) # Create a unique temporary file temp_fd, temp_path = tempfile.mkstemp(suffix=".apkg", prefix="flashcards_export_") os.close(temp_fd) # Close it so genanki can write to it genanki.Package(deck).write_to_file(temp_path) return gr.update(value=temp_path, visible=True) def import_deck_fn(json_file, anki_file): """Load a flashcard deck from a JSON or Anki .apkg file.""" if json_file is not None: try: with open(json_file, "r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, list): data = [data] for item in data: if 'audio_uri' not in item: item['audio_uri'] = None return build_flashcard_html(data), data except Exception as e: return f"

❌ Error loading JSON: {e}

", None elif anki_file is not None: try: import zipfile, sqlite3, tempfile with tempfile.TemporaryDirectory() as tmpdir: with zipfile.ZipFile(anki_file, 'r') as z: z.extractall(tmpdir) db_path = os.path.join(tmpdir, 'collection.anki2') if not os.path.exists(db_path): db_path = os.path.join(tmpdir, 'collection.anki21') conn = sqlite3.connect(db_path) rows = conn.execute("SELECT flds FROM notes").fetchall() conn.close() vocab_list = [] for row in rows: fields = row[0].split('\x1f') vocab_list.append({ 'korean': fields[0] if len(fields) > 0 else '', 'translation': fields[1] if len(fields) > 1 else '', 'transliteration': fields[2] if len(fields) > 2 else '', 'explanation': fields[3] if len(fields) > 3 else '', 'audio_uri': None, }) if not vocab_list: return "

❌ No notes found in Anki deck.

", None return build_flashcard_html(vocab_list), vocab_list except Exception as e: return f"

❌ Error loading Anki deck: {e}

", None return "

⚠️ Please upload a JSON or Anki (.apkg) file above.

", None # ─── Bootstrap demo content (shown before any generation) ─── BOOTSTRAP_VOCAB = [ {"korean": "μ•ˆλ…•ν•˜μ„Έμš”", "translation": "Hello", "transliteration": "an-nyeong-ha-se-yo", "explanation": "The most common formal greeting in Korean, used when meeting someone.", "audio_uri": None}, {"korean": "κ°μ‚¬ν•©λ‹ˆλ‹€", "translation": "Thank you", "transliteration": "gam-sa-ham-ni-da", "explanation": "The standard polite way to express gratitude.", "audio_uri": None}, {"korean": "μ‚¬λž‘", "translation": "Love", "transliteration": "sa-rang", "explanation": "A fundamental word for love or affection, used in many K-pop songs.", "audio_uri": None}, {"korean": "학ꡐ", "translation": "School", "transliteration": "hak-gyo", "explanation": "Refers to a school or educational institution. ν•™ means 'study', ꡐ means 'teach'.", "audio_uri": None}, {"korean": "μŒμ‹", "translation": "Food", "transliteration": "eum-sik", "explanation": "General word for food or cuisine. Korean μŒμ‹ (food) is world-famous!", "audio_uri": None}, {"korean": "친ꡬ", "translation": "Friend", "transliteration": "chin-gu", "explanation": "Means a friend or buddy. In Korean culture, 친ꡬ specifically refers to someone the same age.", "audio_uri": None}, {"korean": "λ¬Ό", "translation": "Water", "transliteration": "mul", "explanation": "Essential vocabulary β€” λ¬Ό μ£Όμ„Έμš” (mul ju-se-yo) means 'Water, please'.", "audio_uri": None}, {"korean": "μ‹œκ°„", "translation": "Time", "transliteration": "si-gan", "explanation": "Means time or hour. μ‹œ (si) = hour, κ°„ (gan) = interval.", "audio_uri": None}, {"korean": "행볡", "translation": "Happiness", "transliteration": "haeng-bok", "explanation": "Means happiness or bliss. ν–‰λ³΅ν•˜λ‹€ (haeng-bok-ha-da) = to be happy.", "audio_uri": None}, {"korean": "μ—¬ν–‰", "translation": "Travel", "transliteration": "yeo-haeng", "explanation": "Means travel or trip. μ—¬ν–‰ν•˜λ‹€ (yeo-haeng-ha-da) = to travel.", "audio_uri": None}, ] DEMO_EXTRACTED_TEXT = """[Demo Content β€” Common Korean Words] μ•ˆλ…•ν•˜μ„Έμš”! μ˜€λŠ˜μ€ ν•œκ΅­μ–΄λ₯Ό λ°°μ›Œλ΄…μ‹œλ‹€. (Hello! Let's learn Korean today.) ν•œκ΅­ μŒμ‹μ€ 정말 λ§›μžˆμŠ΅λ‹ˆλ‹€. μΉœκ΅¬μ™€ ν•¨κ»˜ 학ꡐ 근처 μ‹λ‹Ήμ—μ„œ 점심을 λ¨Ήμ—ˆμŠ΅λ‹ˆλ‹€. (Korean food is really delicious. I had lunch at a restaurant near the school with a friend.) 여행을 κ°€λ©΄ 항상 ν–‰λ³΅ν•©λ‹ˆλ‹€. μƒˆλ‘œμš΄ κ³³μ—μ„œ μƒˆλ‘œμš΄ μ‚¬λžŒλ“€μ„ λ§Œλ‚˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€. (I'm always happy when I travel. I enjoy meeting new people in new places.) λ¬Ό μ£Όμ„Έμš”! κ°μ‚¬ν•©λ‹ˆλ‹€. (Water, please! Thank you.) μ‹œκ°„μ΄ 빨리 κ°‘λ‹ˆλ‹€. μ‚¬λž‘ν•˜λŠ” μ‚¬λžŒκ³Ό ν•¨κ»˜ν•˜λ©΄ 더 빨리 κ°‘λ‹ˆλ‹€. (Time flies. It goes even faster when you're with someone you love.) """ def create_demo(): example_pdf = get_example_pdf() example_audio = get_example_audio() custom_theme = gr.themes.Soft( primary_hue="amber", secondary_hue="stone", neutral_hue="stone", font=[gr.themes.GoogleFont("Outfit"), gr.themes.GoogleFont("Noto Serif KR"), "ui-sans-serif", "sans-serif"] ) css = """ /* ── ν•œκ΅­ (Hanguk) Inspired Theme ── */ @import url('https://fonts.googleapis.com/css2?family=Noto+Serif+KR:wght@400;700;900&family=Outfit:wght@400;600;700;800&display=swap'); /* ── Animated Background β€” ink wash μˆ˜λ¬΅ν™” ── */ @keyframes gradientBG { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } body, .gradio-container { background: linear-gradient(-45deg, #0a0a0f, #1a1008, #0d1a12, #12080a, #0f0f1a) !important; background-size: 400% 400% !important; animation: gradientBG 20s ease infinite !important; background-attachment: fixed !important; color: #e8e0d4 !important; font-family: 'Outfit', 'Noto Serif KR', sans-serif !important; /* Force internal Gradio variables */ --block-label-background-fill: transparent !important; --block-title-background-fill: transparent !important; --background-fill-primary: transparent !important; --background-fill-secondary: transparent !important; --block-background-fill: transparent !important; --input-background-fill: rgba(0, 0, 0, 0.25) !important; --input-background-fill-focus: rgba(0, 0, 0, 0.35) !important; --panel-background-fill: transparent !important; --checkbox-background-color: rgba(0, 0, 0, 0.2) !important; --table-even-background-fill: transparent !important; --table-odd-background-fill: rgba(0, 0, 0, 0.1) !important; } /* ── AGGRESSIVE BACKGROUND OVERRIDES FOR FIREFOX ── */ .gradio-container .tabitem, .gradio-container .wrap, .gradio-container .form, .gradio-container .panel, .gradio-container .box, .gradio-container input, .gradio-container textarea, .gradio-container select, .gradio-container .dropdown-container, .gradio-container .block, .gradio-container .secondary-wrap, .gradio-container .contain { background: rgba(15, 12, 8, 0.5) !important; background-color: rgba(15, 12, 8, 0.5) !important; } /* ── FILE UPLOAD / PREVIEW β€” fix white backgrounds ── */ .gradio-container .file-preview, .gradio-container .file-preview *, .gradio-container .file-preview table, .gradio-container .file-preview tr, .gradio-container .file-preview td, .gradio-container .file-preview th, .gradio-container .file-preview tbody, .gradio-container .file-preview thead, .gradio-container .upload-button, .gradio-container .file-upload, .gradio-container [data-testid="file"], .gradio-container [data-testid="file"] *, .gradio-container .file, .gradio-container .file *, .gradio-container .upload-text, .gradio-container .icon-wrap, .gradio-container .waveform-container, .gradio-container .empty, .gradio-container .empty *, .gradio-container .wrap.default { background: rgba(10, 8, 5, 0.4) !important; background-color: rgba(10, 8, 5, 0.4) !important; color: #e8e0d4 !important; border-color: rgba(196, 164, 105, 0.15) !important; } .gradio-container .file-preview a, .gradio-container .file a { color: #c4a469 !important; } /* ── Glassmorphism Panels β€” hanji paper texture feel ── */ .gradio-container .form, .gradio-container .panel, .gradio-container .box { background: rgba(15, 12, 8, 0.45) !important; backdrop-filter: blur(20px) !important; -webkit-backdrop-filter: blur(20px) !important; border-radius: 16px !important; border: 1px solid rgba(196, 164, 105, 0.12) !important; box-shadow: 0 12px 30px rgba(0, 0, 0, 0.3), inset 0 0 0 1px rgba(196, 164, 105, 0.05) !important; } /* ── Selection β€” warm gold ── */ ::selection { background: rgba(196, 164, 105, 0.45) !important; color: #ffffff !important; } ::-moz-selection { background: rgba(196, 164, 105, 0.45) !important; color: #ffffff !important; } /* ── GLOBAL BACKGROUND OVERRIDES ── */ .gradio-container label, .gradio-container .label-wrap, .gradio-container .block label, .gradio-container .block-info, .gradio-container .block-title, .gradio-container .form > .block > .label-wrap, .gradio-container .form .label-wrap { background: transparent !important; background-color: transparent !important; border: none !important; box-shadow: none !important; } /* ── GLOBAL TEXT β€” warm parchment tones ── */ .gradio-container, .gradio-container label, .gradio-container .label-wrap, .gradio-container .label-wrap span, .gradio-container span, .gradio-container p, .gradio-container h2, .gradio-container h3, .gradio-container h4, .gradio-container h5, .gradio-container h6, .gradio-container .prose, .gradio-container .prose *, .gradio-container .block label span, .gradio-container .block .label-wrap span, .gradio-container button, .gradio-container button span, .gradio-container input, .gradio-container select, .gradio-container textarea { color: #e8e0d4 !important; } /* ── Heading β€” μ„œμ˜ˆ calligraphy style ── */ h1 { text-align: center; background: linear-gradient(135deg, #c4a469, #e8c97a, #a0825a); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 900 !important; font-size: 2.4em !important; margin-bottom: 0.3em !important; letter-spacing: 1px; line-height: 1.2 !important; font-family: 'Noto Serif KR', 'Outfit', serif !important; } /* ── TAB LABELS β€” 단청 style ── */ .gradio-container .tabs .tab-nav button, .gradio-container .tabs .tab-nav button span { color: #8a7a65 !important; font-weight: 600 !important; font-size: 14px !important; padding: 10px 16px !important; background: transparent !important; border: none !important; border-bottom: 2px solid transparent !important; transition: all 0.3s ease !important; } .gradio-container .tabs .tab-nav button.selected, .gradio-container .tabs .tab-nav button.selected span, .gradio-container .tabs .tab-nav button[aria-selected="true"], .gradio-container .tabs .tab-nav button[aria-selected="true"] span { color: #c4a469 !important; border-bottom-color: #c4a469 !important; background: rgba(196, 164, 105, 0.08) !important; } .gradio-container .tabs .tab-nav button:hover, .gradio-container .tabs .tab-nav button:hover span { color: #e8e0d4 !important; background: rgba(196, 164, 105, 0.05) !important; } .gradio-container .tabs .tab-nav { background: transparent !important; border-bottom: 1px solid rgba(196, 164, 105, 0.15) !important; } /* ── SLIDER / RANGE LABELS ── */ .gradio-container input[type="range"] + .rangeSlider, .gradio-container .range-slider, .gradio-container input[type="number"], .gradio-container input[type="number"]::-moz-placeholder { color: #e8e0d4 !important; } .gradio-container .wrap.default span, .gradio-container .head span, .gradio-container .range_slider span { color: #8a7a65 !important; } /* ── ACCORDION HEADERS ── */ .gradio-container .accordion > button, .gradio-container .accordion > .label-wrap, .gradio-container details > summary, .gradio-container details > summary span { color: #e8e0d4 !important; font-weight: 600 !important; } /* ── MARKDOWN / PROSE ── */ .gradio-container .md, .gradio-container .md p, .gradio-container .md li, .gradio-container .md strong, .gradio-container .md em, .gradio-container .md h3, .gradio-container .md h2 { color: #d4cbbe !important; } .gradio-container .md strong { color: #f0e8da !important; } .gradio-container .md a { color: #c4a469 !important; } .gradio-container .prose h3, .gradio-container h3 { color: #c4a469 !important; font-weight: 700 !important; font-size: 1.05em !important; } /* ── HINT TEXT ── */ .hint-text, .hint-text p, .hint-text * { color: #6b5e4f !important; font-size: 13px !important; } /* ── FILE UPLOAD ── */ .gradio-container .file-upload, .gradio-container .upload-button { background: rgba(0, 0, 0, 0.25) !important; border: 1px dashed rgba(196, 164, 105, 0.2) !important; color: #8a7a65 !important; border-radius: 12px !important; } /* ── DROPDOWN / SELECT ── */ .gradio-container .dropdown-container, .gradio-container .secondary-wrap, .gradio-container ul[role="listbox"] { background: rgba(15, 12, 8, 0.95) !important; border: 1px solid rgba(196, 164, 105, 0.15) !important; color: #e8e0d4 !important; } .gradio-container ul[role="listbox"] li { color: #e8e0d4 !important; } .gradio-container ul[role="listbox"] li:hover { background: rgba(196, 164, 105, 0.15) !important; } /* ── BUTTONS β€” ν•œλ³΅ inspired ── */ button.primary { background: linear-gradient(135deg, #b8860b, #c4a469, #8b6914) !important; border: none !important; box-shadow: 0 0 20px rgba(196, 164, 105, 0.35) !important; transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; font-weight: 700 !important; letter-spacing: 1px !important; text-transform: uppercase; border-radius: 12px !important; color: #0a0a0f !important; } button.primary:hover { transform: translateY(-3px) scale(1.02) !important; box-shadow: 0 0 30px rgba(196, 164, 105, 0.6) !important; color: #0a0a0f !important; } button.secondary { background: rgba(100, 90, 70, 0.15) !important; border: 1px solid rgba(196, 164, 105, 0.2) !important; color: #e8e0d4 !important; border-radius: 12px !important; font-weight: 600 !important; transition: all 0.25s ease !important; } button.secondary:hover { background: rgba(196, 164, 105, 0.12) !important; border-color: rgba(196, 164, 105, 0.35) !important; color: #f0e8da !important; transform: translateY(-2px) !important; } button.stop { background: linear-gradient(135deg, #8b2020, #c0392b) !important; border: none !important; box-shadow: 0 0 20px rgba(192, 57, 43, 0.4) !important; transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; font-weight: 700 !important; text-transform: uppercase; letter-spacing: 1px !important; border-radius: 12px !important; color: #f0e8da !important; } button.stop:hover { transform: translateY(-3px) scale(1.02) !important; box-shadow: 0 0 30px rgba(192, 57, 43, 0.6) !important; } /* ── INPUTS ── */ textarea, input[type="text"], input[type="number"], select { background: rgba(0, 0, 0, 0.3) !important; border: 1px solid rgba(196, 164, 105, 0.12) !important; border-radius: 10px !important; color: #f0e8da !important; transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; } textarea:focus, input[type="text"]:focus { border-color: #c4a469 !important; box-shadow: 0 0 15px rgba(196, 164, 105, 0.25), inset 0 0 0 1px rgba(196, 164, 105, 0.25) !important; } .gradio-container .textbox label span, .gradio-container .input-label { color: #c4a469 !important; font-weight: 600 !important; } /* ── EXPORT / DOWNLOAD FILE COMPONENT ── */ .gradio-container .download, .gradio-container .file a { color: #c4a469 !important; } /* ── GALLERY ── */ .gradio-container .gallery-item { background: rgba(0, 0, 0, 0.3) !important; border: 1px solid rgba(196, 164, 105, 0.1) !important; border-radius: 10px !important; } /* ── INFO / TOOLTIP ── */ .gradio-container .info, .gradio-container .wrap .info { color: #6b5e4f !important; } /* ── BLOCK BACKGROUNDS ── */ .gradio-container .block { background: transparent !important; } .gradio-container .contain { background: transparent !important; } .gradio-container .tabitem { background: transparent !important; } /* ── AUDIO PLAYER ── */ .gradio-container audio { filter: invert(0.85) hue-rotate(180deg) sepia(0.15); border-radius: 8px; } /* ── Hide Stream Box Duplicate Progress ── */ #stream_box .progress-text, #stream_box .progress-level, #stream_box .progress, #stream_box .progress-container { display: none !important; } /* ── MAIN PROGRESS BAR ── */ .gradio-container .progress-container, .gradio-container .progress-text { color: #ffffff !important; font-weight: 700 !important; text-shadow: 0px 1px 4px rgba(0,0,0,0.9) !important; } .gradio-container .progress-level { background: linear-gradient(90deg, #b8860b, #c4a469) !important; } /* ── SCROLLBAR β€” ink brush style ── */ * { scrollbar-width: thin; scrollbar-color: rgba(196, 164, 105, 0.3) transparent; } ::-webkit-scrollbar { width: 5px; height: 5px; } ::-webkit-scrollbar-track { background: transparent; } ::-webkit-scrollbar-thumb { background: rgba(196, 164, 105, 0.3); border-radius: 3px; } ::-webkit-scrollbar-thumb:hover { background: rgba(196, 164, 105, 0.5); } /* ── CHECKBOX ── */ .gradio-container input[type="checkbox"] { appearance: none !important; -webkit-appearance: none !important; width: 20px !important; height: 20px !important; border: 2px solid rgba(196, 164, 105, 0.4) !important; border-radius: 5px !important; background: rgba(0, 0, 0, 0.3) !important; cursor: pointer !important; position: relative !important; transition: all 0.2s ease !important; } .gradio-container input[type="checkbox"]:checked { background: linear-gradient(135deg, #b8860b, #c4a469) !important; border-color: #c4a469 !important; } .gradio-container input[type="checkbox"]:checked::after { content: "βœ“" !important; position: absolute !important; top: 50% !important; left: 50% !important; transform: translate(-50%, -50%) !important; color: #0a0a0f !important; font-size: 14px !important; font-weight: 900 !important; } .gradio-container input[type="checkbox"]:hover { border-color: #c4a469 !important; box-shadow: 0 0 8px rgba(196, 164, 105, 0.3) !important; } /* ── Korean decorative border on main content ── */ .gradio-container > .main { border-top: 3px solid transparent !important; border-image: linear-gradient(90deg, transparent, rgba(196, 164, 105, 0.3), rgba(192, 57, 43, 0.2), rgba(196, 164, 105, 0.3), transparent) 1 !important; } """ # Force dark mode via JavaScript so Gradio's internal CSS variables default to dark dark_mode_js = """ function() { document.body.classList.add('dark'); document.documentElement.classList.add('dark'); } """ with gr.Blocks(title="LocalDuo", theme=custom_theme, css=css, js=dark_mode_js) as demo: gr.Markdown("# LocalDuo β€” Learn Korean from PDFs, Websites & YouTube") gr.Markdown("πŸ‡°πŸ‡·βœ¨ Enter a website URL 🌐, upload a PDF πŸ“„, upload an audio file 🎡, or paste a YouTube link 🎬. The app uses a **Vision-Language Model (VLM)** 🧠, **ASR** 🎀, and **TTS** πŸ—£οΈ to generate vocabulary flashcards.") active_tab = gr.State("Website URL") with gr.Row(): with gr.Column(scale=1): with gr.Tabs() as input_tabs: with gr.Tab("Website URL", id="tab_url") as tab_url: url_input = gr.Textbox(label="Enter a Website URL 🌐", placeholder=r"e.g. https://www.bbc.com/korean/articles/cn0p7rkvxdgo", value=r"https://www.bbc.com/korean/articles/cn0p7rkvxdgo") with gr.Tab("Upload PDF", id="tab_pdf") as tab_pdf: pdf_input = gr.File(label="Upload Book PDF πŸ“š", file_types=[".pdf"], value=example_pdf) with gr.Tab("Upload Audio", id="tab_audio") as tab_audio: audio_file_input = gr.File(label="Upload Audio File 🎡", file_types=[".wav", ".mp3", ".m4a", ".ogg", ".flac", ".opus", ".webm"], value=example_audio) gr.Markdown("*Upload a Korean audio file. It will be transcribed using Cohere ASR and vocabulary will be extracted from the transcript.*", elem_classes=["hint-text"]) with gr.Tab("YouTube Link", id="tab_yt") as tab_yt: yt_url_input = gr.Textbox(label="Enter a YouTube Link 🎬", placeholder=r"e.g. https://www.youtube.com/watch?v=...", value="https://www.youtube.com/watch?v=9Nj7l73PBWE", info="Audio from the first 5 minutes will be transcribed using Cohere ASR") yt_cookies_input = gr.File(label="YouTube Cookies (cookies.txt)", file_types=[".txt"], value=None, type="filepath") gr.Markdown("*Optional. Helps bypass YouTube bot detection. Install the [cookies.txt](https://addons.mozilla.org/firefox/addon/cookies-txt/) extension, go to youtube.com while logged in, click the extension β†’ 'Current Site' to export.*", elem_classes=["hint-text"]) with gr.Tab("πŸ“‚ Import Deck", id="tab_import") as tab_import: gr.Markdown("### Load a saved deck into the app") gr.Markdown("Upload a previously exported **JSON file** or an **Anki .apkg deck** to reload flashcards without regenerating.") import_json_file_in = gr.File(label="πŸ“„ JSON Deck (.json)", file_types=[".json"]) import_anki_file_in = gr.File(label="πŸ“¦ Anki Deck (.apkg)", file_types=[".apkg"]) import_load_btn = gr.Button("πŸ“‚ Load Deck", variant="primary") # Track active tab tab_url.select(fn=lambda: "Website URL", inputs=None, outputs=active_tab) tab_pdf.select(fn=lambda: "Upload PDF", inputs=None, outputs=active_tab) tab_audio.select(fn=lambda: "Upload Audio", inputs=None, outputs=active_tab) tab_yt.select(fn=lambda: "YouTube Link", inputs=None, outputs=active_tab) tab_import.select(fn=lambda: "Import Deck", inputs=None, outputs=active_tab) gr.Markdown("### βš™οΈ Customization Settings") max_text_char_input = gr.Slider(minimum=1000, maximum=30000, step=1000, value=1500, label="Max Input Text Length (Characters)") repetition_penalty_input = gr.Slider(minimum=0.1, maximum=2.0, step=0.1, value=1.2, label="Repetition Penalty") auto_force_chars_input = gr.Slider(minimum=1_000, maximum=10_000, step=100, value=4_000, label="Auto-force JSON after (chars of thinking)") enable_thinking_checkbox = gr.Checkbox(label="🧠 Enable Thinking (longer but more accurate)", value=True) with gr.Accordion("πŸ”§ Advanced", open=False): translit_lang = gr.Dropdown( label="Word Transliteration Language", choices=LANGUAGE_CHOICES, value="Indo-European - English" ) translit_format = gr.Dropdown(label="Transliteration Format", choices=["dashed syllable", "regular word with space"], value="dashed syllable") target_lang = gr.Dropdown( label="Target Language (Full App)", choices=LANGUAGE_CHOICES, value="Indo-European - English" ) submit_btn = gr.Button("✨ Generate Flashcards ✨", variant="primary") with gr.Accordion("πŸ›‘ Generation Controls", open=False): with gr.Row(): stop_thinking_btn = gr.Button("⚑ Stop thinking, Generate now", variant="secondary") stop_btn = gr.Button("πŸ›‘ Stop Generation", variant="stop") with gr.Column(scale=2): with gr.Tabs() as output_tabs: with gr.Tab("πŸ“– Flashcards"): bootstrap_html = build_flashcard_html(BOOTSTRAP_VOCAB) output_html = gr.HTML(label="Flashcards will appear here", value=bootstrap_html) gr.Markdown("**Export current deck:**") with gr.Row(): export_json_btn = gr.Button("πŸ“₯ Export JSON", variant="secondary", size="sm") export_anki_btn = gr.Button("πŸ“¦ Export Anki (.apkg)", variant="secondary", size="sm") export_json_out = gr.File(label="⬇️ JSON Download", visible=False, interactive=False) export_anki_out = gr.File(label="⬇️ Anki Deck Download", visible=False, interactive=False) with gr.Tab("❓ Quiz"): gr.Markdown("**Test your knowledge** with a randomized 5-question multiple-choice quiz from the current deck.") start_quiz_btn = gr.Button("πŸ§ͺ Start 5-Question Quiz", variant="primary") bootstrap_quiz_html = build_quiz_html(BOOTSTRAP_VOCAB) quiz_output_html = gr.HTML(label="Quiz", value=bootstrap_quiz_html) stream_box = gr.Textbox(label="Live Model Generation 🧠", lines=10, max_lines=20, interactive=False, autoscroll=True, elem_id="stream_box") with gr.Accordion("πŸ“„ Extracted Source Content", open=True): extracted_text_box = gr.Textbox(label="Extracted Text", lines=10, max_lines=15, interactive=False, value=DEMO_EXTRACTED_TEXT) extracted_images_gallery = gr.Gallery(label="Extracted Images", columns=4, height="auto", object_fit="contain") extracted_audio_player = gr.Audio(label="Extracted Audio (YouTube / Uploaded)", type="filepath", interactive=False) last_source_state = gr.State(None) last_korean_words_state = gr.State(BOOTSTRAP_VOCAB) def reset_btn_text(): return gr.update(value="⚑ Stop thinking, Generate now"), gr.update(value="πŸ›‘ Stop Generation") submit_btn.click(fn=reset_btn_text, inputs=None, outputs=[stop_thinking_btn, stop_btn], queue=False) generate_event = submit_btn.click( fn=process_pdf, inputs=[pdf_input, url_input, audio_file_input, yt_url_input, yt_cookies_input, translit_lang, translit_format, target_lang, max_text_char_input, repetition_penalty_input, auto_force_chars_input, last_source_state, last_korean_words_state, active_tab, enable_thinking_checkbox], outputs=[output_html, last_source_state, last_korean_words_state, stream_box, extracted_text_box, extracted_images_gallery, extracted_audio_player] ) stop_thinking_btn.click(fn=set_stop_thinking, inputs=None, outputs=stop_thinking_btn, queue=False).then( fn=reset_stop_thinking_after_delay, inputs=None, outputs=stop_thinking_btn ) stop_btn.click(fn=set_kill_threads, inputs=None, outputs=stop_btn, queue=False) # Export events export_json_btn.click(fn=export_json_file_fn, inputs=[last_korean_words_state], outputs=[export_json_out]) export_anki_btn.click(fn=export_anki_file_fn, inputs=[last_korean_words_state], outputs=[export_anki_out]) # Import event import_load_btn.click( fn=import_deck_fn, inputs=[import_json_file_in, import_anki_file_in], outputs=[output_html, last_korean_words_state] ) # Quiz event start_quiz_btn.click(fn=build_quiz_html, inputs=[last_korean_words_state], outputs=[quiz_output_html]) # Force autoscroll using Custom JS stream_box.change( fn=None, js=""" function() { const ta = document.querySelector('#stream_box textarea'); if (ta) { ta.scrollTop = ta.scrollHeight; } } """ ) return demo if __name__ == "__main__": global model, processor, tts, voice_style, asr_model, asr_processor model_id = "Qwen/Qwen3.5-9B" # model_id = "Qwen/Qwen3.5-2B" print(f"Loading {model_id} model via Transformers...") processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True) # try: # with open("chat_template.jinja", "r", encoding="utf-8") as f: # processor.chat_template = f.read() # except Exception as e: # print("Could not load custom chat template:", e) processor.chat_template = """ {%- set image_count = namespace(value=0) %} {%- set video_count = namespace(value=0) %} {%- macro render_content(content, do_vision_count, is_system_content=false) %} {%- if content is string %} {{- content }} {%- elif content is iterable and content is not mapping %} {%- for item in content %} {%- if 'image' in item or 'image_url' in item or item.type == 'image' %} {%- if is_system_content %} {{- raise_exception('System message cannot contain images.') }} {%- endif %} {%- if do_vision_count %} {%- set image_count.value = image_count.value + 1 %} {%- endif %} {%- if add_vision_id %} {{- 'Picture ' ~ image_count.value ~ ': ' }} {%- endif %} {{- '<|vision_start|><|image_pad|><|vision_end|>' }} {%- elif 'video' in item or item.type == 'video' %} {%- if is_system_content %} {{- raise_exception('System message cannot contain videos.') }} {%- endif %} {%- if do_vision_count %} {%- set video_count.value = video_count.value + 1 %} {%- endif %} {%- if add_vision_id %} {{- 'Video ' ~ video_count.value ~ ': ' }} {%- endif %} {{- '<|vision_start|><|video_pad|><|vision_end|>' }} {%- elif 'text' in item %} {{- item.text }} {%- else %} {{- raise_exception('Unexpected item type in content.') }} {%- endif %} {%- endfor %} {%- elif content is none or content is undefined %} {{- '' }} {%- else %} {{- raise_exception('Unexpected content type.') }} {%- endif %} {%- endmacro %} {%- if not messages %} {{- raise_exception('No messages provided.') }} {%- endif %} {%- if tools and tools is iterable and tools is not mapping %} {{- '<|im_start|>system\n' }} {{- "# Tools\n\nYou have access to the following functions:\n\n" }} {%- for tool in tools %} {{- "\n" }} {{- tool | tojson }} {%- endfor %} {{- "\n" }} {{- '\n\nIf you choose to call a function ONLY reply in the following format with NO suffix:\n\n\n\n\nvalue_1\n\n\nThis is the value for the second parameter\nthat can span\nmultiple lines\n\n\n\n\n\nReminder:\n- Function calls MUST follow the specified format: an inner block must be nested within XML tags\n- Required parameters MUST be specified\n- You may provide optional reasoning for your function call in natural language BEFORE the function call, but NOT after\n- If there is no function call available, answer the question like normal with your current knowledge and do not tell the user about function calls\n' }} {%- if messages[0].role == 'system' %} {%- set content = render_content(messages[0].content, false, true)|trim %} {%- if content %} {{- '\n\n' + content }} {%- endif %} {%- endif %} {{- '<|im_end|>\n' }} {%- else %} {%- if messages[0].role == 'system' %} {%- set content = render_content(messages[0].content, false, true)|trim %} {{- '<|im_start|>system\n' + content + '<|im_end|>\n' }} {%- endif %} {%- endif %} {%- set ns = namespace(multi_step_tool=true, last_query_index=messages|length - 1) %} {%- for message in messages[::-1] %} {%- set index = (messages|length - 1) - loop.index0 %} {%- if ns.multi_step_tool and message.role == "user" %} {%- set content = render_content(message.content, false)|trim %} {%- if not(content.startswith('') and content.endswith('')) %} {%- set ns.multi_step_tool = false %} {%- set ns.last_query_index = index %} {%- endif %} {%- endif %} {%- endfor %} {%- if ns.multi_step_tool %} {{- raise_exception('No user query found in messages.') }} {%- endif %} {%- for message in messages %} {%- set content = render_content(message.content, true)|trim %} {%- if message.role == "system" %} {%- if not loop.first %} {{- raise_exception('System message must be at the beginning.') }} {%- endif %} {%- elif message.role == "user" %} {{- '<|im_start|>' + message.role + '\n' + content + '<|im_end|>' + '\n' }} {%- elif message.role == "assistant" %} {%- set reasoning_content = '' %} {%- if message.reasoning_content is string %} {%- set reasoning_content = message.reasoning_content %} {%- else %} {%- if '' in content %} {%- set reasoning_content = content.split('')[0].rstrip('\n').split('')[-1].lstrip('\n') %} {%- set content = content.split('')[-1].lstrip('\n') %} {%- endif %} {%- endif %} {%- set reasoning_content = reasoning_content|trim %} {%- if loop.index0 > ns.last_query_index %} {{- '<|im_start|>' + message.role + '\n\n' + reasoning_content + '\n\n\n' + content }} {%- else %} {{- '<|im_start|>' + message.role + '\n' + content }} {%- endif %} {%- if message.tool_calls and message.tool_calls is iterable and message.tool_calls is not mapping %} {%- for tool_call in message.tool_calls %} {%- if tool_call.function is defined %} {%- set tool_call = tool_call.function %} {%- endif %} {%- if loop.first %} {%- if content|trim %} {{- '\n\n\n\n' }} {%- else %} {{- '\n\n' }} {%- endif %} {%- else %} {{- '\n\n\n' }} {%- endif %} {%- if tool_call.arguments is defined %} {%- for args_name, args_value in tool_call.arguments|items %} {{- '\n' }} {%- set args_value = args_value | tojson | safe if args_value is mapping or (args_value is sequence and args_value is not string) else args_value | string %} {{- args_value }} {{- '\n\n' }} {%- endfor %} {%- endif %} {{- '\n' }} {%- endfor %} {%- endif %} {{- '<|im_end|>\n' }} {%- elif message.role == "tool" %} {%- if loop.previtem and loop.previtem.role != "tool" %} {{- '<|im_start|>user' }} {%- endif %} {{- '\n\n' }} {{- content }} {{- '\n' }} {%- if not loop.last and loop.nextitem.role != "tool" %} {{- '<|im_end|>\n' }} {%- elif loop.last %} {{- '<|im_end|>\n' }} {%- endif %} {%- else %} {{- raise_exception('Unexpected message role.') }} {%- endif %} {%- endfor %} {%- if add_generation_prompt %} {{- '<|im_start|>assistant\n' }} {%- if enable_thinking is defined and enable_thinking is false %} {{- '\n\n\n\n' }} {%- else %} {{- '\n' }} {%- endif %} {%- endif %} """.strip() model = AutoModelForImageTextToText.from_pretrained( model_id, torch_dtype=torch.bfloat16, device_map="cpu", trust_remote_code=True ) print("Loading Cohere ASR model...") from transformers import CohereAsrForConditionalGeneration asr_processor = AutoProcessor.from_pretrained("CohereLabs/cohere-transcribe-03-2026") asr_model = CohereAsrForConditionalGeneration.from_pretrained( "CohereLabs/cohere-transcribe-03-2026", device_map="cpu", ) print("Loading Supertonic TTS...") tts = TTS(model="supertonic-3") try: voice_style = tts.get_voice_style("F1") except Exception: voice_style = tts.get_voice_style(tts.voice_style_names[0]) print("Generating audio for BOOTSTRAP_VOCAB...") for item in BOOTSTRAP_VOCAB: korean_word = item.get("korean", "") if korean_word and tts is not None: try: wav, dur = tts.synthesize( text=korean_word, voice_style=voice_style, lang="ko", speed=0.7, total_steps=12, ) item["audio_uri"] = numpy_to_base64_audio(wav, tts.sample_rate) except Exception as e: print(f"Failed to generate audio for {korean_word}: {e}") demo = create_demo() if IS_HF: demo.launch(allowed_paths=[LOG_DIR]) else: demo.launch(server_name="0.0.0.0", server_port=7865, allowed_paths=[LOG_DIR])