# app.py # Whisper Transcriber — Gradio 3.x compatible complete file with UI improvements: # - small buttons, advanced toggle, download selected extracted files, # - auto-merge per-file transcripts, auto cleanup of temp files after N minutes # Requirements: gradio (3.x), pydub, pyzipper, python-docx, ffmpeg, whisper or faster-whisper import os import sys import json import shutil import tempfile import subprocess import traceback import threading import re import zipfile from difflib import get_close_matches from uuid import uuid4 from pathlib import Path from concurrent.futures import ProcessPoolExecutor, as_completed import multiprocessing import time # Force unbuffered prints os.environ["PYTHONUNBUFFERED"] = "1" try: import gradio as gr except Exception as e: print("FATAL: gradio import failed:", e) raise # try faster-whisper first for CPU speedups USE_FASTER_WHISPER = False try: from faster_whisper import WhisperModel as FasterWhisperModel USE_FASTER_WHISPER = True print("INFO: faster-whisper detected.") except Exception: try: import whisper except Exception: print("FATAL: Neither faster-whisper nor whisper available. Install whisper or faster-whisper.") raise from pydub import AudioSegment import pyzipper from docx import Document # ---------- Config ---------- MEMORY_FILE = "memory.json" MEMORY_LOCK = threading.Lock() MIN_WAV_SIZE = 1024 FFMPEG_CANDIDATES = [ ("s16le", 16000, 1), ("s16le", 44100, 2), ("pcm_s16le", 16000, 1), ("pcm_s16le", 44100, 2), ("mulaw", 8000, 1), ] MODEL_CACHE = {} EXTRACT_MAP = {} # friendly_name -> path LAST_EXTRACT_DIR = None # path to last extraction folder (for download) LAST_EXTRACT_LIST = [] # friendly names for last extraction (for select all) DEFAULT_ZIP_PASS = "dietcoke1" # NEW: last batch transcripts (set by batch generator). 
# Each item: (friendly_name, txt_path, srt_path)
LAST_BATCH_TRANSCRIPTS = []

CPU_COUNT = max(1, multiprocessing.cpu_count())
MAX_WORKERS = min(4, CPU_COUNT)  # tune for your environment

# Auto-cleanup configuration (minutes); can be changed in settings UI
AUTO_CLEANUP_MINUTES = 30

# Temp registry for cleanup: entries are tuples (path, created_timestamp)
_TEMP_REGISTRY_LOCK = threading.Lock()
_TEMP_REGISTRY = []


def register_temp_path(p):
    """Register a temp file or directory for later background cleanup."""
    try:
        with _TEMP_REGISTRY_LOCK:
            _TEMP_REGISTRY.append((str(p), time.time()))
    except Exception:
        # best-effort: a failed registration only delays cleanup
        pass


def cleanup_temp_worker(interval_seconds=60):
    """Background thread that deletes temp paths older than AUTO_CLEANUP_MINUTES.

    AUTO_CLEANUP_MINUTES is re-read on every pass so the settings UI can
    change it at runtime.
    """
    while True:
        try:
            cutoff = time.time() - (AUTO_CLEANUP_MINUTES * 60)
            to_delete = []
            with _TEMP_REGISTRY_LOCK:
                remaining = []
                for p, ts in _TEMP_REGISTRY:
                    if ts < cutoff:
                        to_delete.append(p)
                    else:
                        remaining.append((p, ts))
                _TEMP_REGISTRY[:] = remaining
            # delete outside the lock so slow filesystem ops don't block registration
            for p in to_delete:
                try:
                    if os.path.isdir(p):
                        shutil.rmtree(p)
                    elif os.path.exists(p):
                        os.unlink(p)
                except Exception:
                    # ignore deletion errors
                    pass
        except Exception:
            pass
        time.sleep(interval_seconds)


# Start cleanup thread as daemon
_cleanup_thread = threading.Thread(target=cleanup_temp_worker, daemon=True)
_cleanup_thread.start()


# ---------- Memory & postprocessing ----------
def load_memory():
    """Load the word/phrase memory from MEMORY_FILE.

    On any error (missing file, bad JSON, non-dict root) a fresh empty store
    is created, best-effort written to disk, and returned.
    """
    try:
        if os.path.exists(MEMORY_FILE):
            with open(MEMORY_FILE, "r", encoding="utf-8") as fh:
                data = json.load(fh)
            if not isinstance(data, dict):
                raise ValueError("memory.json root not dict")
            data.setdefault("words", {})
            data.setdefault("phrases", {})
            return data
    except Exception:
        pass
    mem = {"words": {}, "phrases": {}}
    try:
        with open(MEMORY_FILE, "w", encoding="utf-8") as fh:
            json.dump(mem, fh, ensure_ascii=False, indent=2)
    except Exception:
        pass
    return mem


def save_memory(mem):
    """Persist the memory dict to MEMORY_FILE under MEMORY_LOCK (best-effort)."""
    with MEMORY_LOCK:
        try:
            with open(MEMORY_FILE, "w", encoding="utf-8") as fh:
                json.dump(mem, fh, ensure_ascii=False, indent=2)
        except Exception:
            traceback.print_exc()


memory = load_memory()

# abbreviation -> expansion applied token-by-token to transcripts
MEDICAL_ABBREVIATIONS = {
    "pt": "patient",
    "dx": "diagnosis",
    "hx": "history",
    "sx": "symptoms",
    "c/o": "complains of",
    "bp": "blood pressure",
    "hr": "heart rate",
    "o2": "oxygen",
    "r/o": "rule out",
    "adm": "admit",
    "disch": "discharge",
}

# lowercase drug name -> canonical capitalization
DRUG_NORMALIZATION = {
    "metformin": "Metformin",
    "aspirin": "Aspirin",
    "amoxicillin": "Amoxicillin",
}


def expand_abbreviations(text):
    """Expand known medical abbreviations while preserving whitespace and trailing punctuation."""
    tokens = re.split(r"(\s+)", text)  # keep the whitespace separators
    out = []
    for t in tokens:
        key = t.lower().strip(".,;:")
        if key in MEDICAL_ABBREVIATIONS:
            trailing = ""
            m = re.match(r"([A-Za-z0-9/]+)([.,;:]*)", t)
            if m:
                trailing = m.group(2) or ""
            out.append(MEDICAL_ABBREVIATIONS[key] + trailing)
        else:
            out.append(t)
    return "".join(out)


def normalize_drugs(text):
    """Rewrite known drug names to their canonical capitalization (case-insensitive, whole words)."""
    for k, v in DRUG_NORMALIZATION.items():
        text = re.sub(rf"\b{k}\b", v, text, flags=re.IGNORECASE)
    return text


def punctuation_and_capitalization(text):
    """Ensure terminal punctuation and capitalize the first letter of each sentence.

    FIX: previously each sentence went through str.capitalize(), which
    lowercases everything after the first character and thereby undid
    normalize_drugs() ("Metformin" -> "metformin") and any acronyms.
    Now only the first character is uppercased; the rest is untouched.
    """
    text = text.strip()
    if not text:
        return text
    if not re.search(r"[.?!]\s*$", text):
        text = text.rstrip() + "."
    # split keeps the ".?! + whitespace" separators as their own list items
    parts = re.split(r"([.?!]\s+)", text)
    out = []
    for p in parts:
        if p and not re.match(r"[.?!]\s+", p):
            out.append(p[0].upper() + p[1:])
        else:
            out.append(p)
    return "".join(out)
def postprocess_transcript(text):
    """Normalize whitespace, expand abbreviations, canonicalize drug names, fix punctuation."""
    if not text:
        return text
    cleaned = re.sub(r"\s+", " ", text).strip()
    for transform in (expand_abbreviations, normalize_drugs, punctuation_and_capitalization):
        cleaned = transform(cleaned)
    return cleaned


def extract_words_and_phrases(text):
    """Return (words, sentences) extracted from *text* for memory updates."""
    words = [w for w in re.findall(r"[A-Za-z0-9\-']+", text) if w.strip()]
    sentences = [chunk.strip() for chunk in re.split(r"(?<=[.?!])\s+", text) if chunk.strip()]
    return words, sentences


def update_memory_with_transcript(transcript):
    """Bump word and phrase frequency counts in the global memory, persisting if anything changed."""
    global memory
    words, sentences = extract_words_and_phrases(transcript)
    changed = False
    with MEMORY_LOCK:
        for token in words:
            key = token.lower()
            memory["words"][key] = memory["words"].get(key, 0) + 1
            changed = True
        for sentence in sentences:
            memory["phrases"][sentence] = memory["phrases"].get(sentence, 0) + 1
            changed = True
    # save_memory takes MEMORY_LOCK itself, so it must run after the lock is released
    if changed:
        save_memory(memory)


def memory_correct_text(text, min_ratio=0.85):
    """Correct near-miss words/phrases against the learned memory store.

    Unknown words are replaced by their closest memory match above
    *min_ratio*; remembered phrases are re-cased to their stored form.
    """
    if not text or (not memory.get("words") and not memory.get("phrases")):
        return text

    def best_replacement(word):
        lowered = word.lower()
        if lowered in memory["words"]:
            return word
        matches = get_close_matches(lowered, memory["words"].keys(), n=1, cutoff=min_ratio)
        if not matches:
            return word
        replacement = matches[0]
        # preserve an initial capital from the original token
        return replacement.capitalize() if word and word[0].isupper() else replacement

    pieces = []
    for piece in re.split(r"(\W+)", text):
        if re.match(r"^[A-Za-z0-9\-']+$", piece):
            pieces.append(best_replacement(piece))
        else:
            pieces.append(piece)
    corrected = "".join(pieces)

    # longest phrases first so overlapping shorter phrases don't clobber them
    for phrase in sorted(memory.get("phrases", {}).keys(), key=len, reverse=True):
        low_phrase = phrase.lower()
        if len(low_phrase) < 8:
            continue
        if low_phrase in corrected.lower():
            corrected = re.sub(re.escape(phrase), phrase, corrected, flags=re.IGNORECASE)
    return corrected


# ---------- Utilities ----------
def save_as_word(text, filename=None):
    """Write *text* into a .docx (temp-named when *filename* is None) and return its path."""
    if filename is None:
        filename = os.path.join(
            tempfile.gettempdir(),
            f"merged_transcripts_{uuid4().hex[:8]}.docx",
        )
    document = Document()
    document.add_paragraph(text)
    document.save(filename)
    register_temp_path(filename)
    return filename
def _ffmpeg_convert(input_path, out_path, fmt, sr, ch):
    """Run one ffmpeg conversion attempt to WAV.

    For the raw formats ("s16le", "pcm_s16le", "mulaw") the format/rate/channel
    flags are passed as *input* options (before -i), i.e. the source is treated
    as headerless raw audio; otherwise they are applied to the output.
    Returns (success_bool, combined_stdout_stderr_or_error_text).
    """
    try:
        cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"]
        if fmt in ("s16le", "pcm_s16le", "mulaw"):
            cmd += ["-f", fmt, "-ar", str(sr), "-ac", str(ch), "-i", input_path, out_path]
        else:
            cmd += ["-i", input_path, "-ar", str(sr), "-ac", str(ch), out_path]
        proc = subprocess.run(cmd, capture_output=True, timeout=60, text=True)
        stdout_stderr = (proc.stdout or "") + (proc.stderr or "")
        # success requires exit code 0 AND an output bigger than MIN_WAV_SIZE
        # (tiny files are header-only / effectively failed conversions)
        if proc.returncode == 0 and os.path.exists(out_path) and os.path.getsize(out_path) > MIN_WAV_SIZE:
            return True, stdout_stderr
        else:
            try:
                if os.path.exists(out_path):
                    os.unlink(out_path)
            except Exception:
                pass
            return False, stdout_stderr
    except Exception as e:
        # covers subprocess timeout and a missing ffmpeg binary
        try:
            if os.path.exists(out_path):
                os.unlink(out_path)
        except Exception:
            pass
        return False, str(e)


def convert_to_wav_if_needed(input_path):
    """Convert *input_path* to a temporary WAV file unless it already is one.

    Attempt order:
      1. paths ending in ".wav" are returned unchanged;
      2. pydub/ffmpeg container auto-detection;
      3. each raw-format candidate in FFMPEG_CANDIDATES, in order.
    Diagnostics for the attempts are written to a temp file; on total failure
    an Exception naming that diagnostics file is raised.
    """
    input_path = str(input_path)
    lower = input_path.lower()
    if lower.endswith(".wav"):
        return input_path
    auto_err = ""
    tmp = None
    # attempt 2: let pydub (backed by ffmpeg) sniff the container format
    try:
        tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        tmp.close()
        AudioSegment.from_file(input_path).export(tmp.name, format="wav")
        if os.path.exists(tmp.name) and os.path.getsize(tmp.name) > MIN_WAV_SIZE:
            register_temp_path(tmp.name)
            return tmp.name
        else:
            try:
                os.unlink(tmp.name)
            except Exception:
                pass
    except Exception:
        auto_err = traceback.format_exc()
        try:
            if tmp and os.path.exists(tmp.name):
                os.unlink(tmp.name)
        except Exception:
            pass
    # attempt 3: brute-force the raw-format candidates, collecting diagnostics
    diag_dir = tempfile.mkdtemp(prefix="dct_diag_")
    register_temp_path(diag_dir)
    diag_log = os.path.join(diag_dir, "conversion_diagnostics.txt")
    diagnostics = []
    for fmt, sr, ch in FFMPEG_CANDIDATES:
        out_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        out_wav.close()
        register_temp_path(out_wav.name)
        success, debug = _ffmpeg_convert(input_path, out_wav.name, fmt, sr, ch)
        diagnostics.append(f"TRY fmt={fmt} sr={sr} ch={ch} success={success}\n{debug}\n")
        if success:
            # record which candidate finally decoded the file, then return the WAV
            try:
                with open(diag_log, "w", encoding="utf-8") as fh:
                    fh.write("pydub auto error:\n")
                    fh.write(auto_err + "\n\n")
                    fh.write("Successful ffmpeg candidate:\n")
                    fh.write(f"fmt={fmt} sr={sr} ch={ch}\n\n")
                    fh.write("Diagnostics:\n")
                    fh.write("\n".join(diagnostics))
            except Exception:
                pass
            return out_wav.name
        else:
            try:
                if os.path.exists(out_wav.name):
                    os.unlink(out_wav.name)
            except Exception:
                pass
    # every attempt failed: add ffprobe output and a hex preview to the log
    try:
        fp = subprocess.run(
            ["ffprobe", "-v", "error", "-show_format", "-show_streams", input_path],
            capture_output=True,
            text=True,
            timeout=10,
        )
        diagnostics.append("FFPROBE:\n" + (fp.stdout.strip() or fp.stderr.strip()))
    except Exception as e:
        diagnostics.append("ffprobe failed: " + str(e))
    try:
        with open(input_path, "rb") as fh:
            head = fh.read(512)
        diagnostics.append("HEX PREVIEW:\n" + head.hex())
    except Exception as e:
        diagnostics.append("could not read head: " + str(e))
    try:
        with open(diag_log, "w", encoding="utf-8") as fh:
            fh.write("pydub auto error:\n")
            fh.write(auto_err + "\n\n")
            fh.write("Full diagnostics:\n\n")
            fh.write("\n\n".join(diagnostics))
    except Exception as e:
        raise Exception(f"Conversion failed; diagnostics write error: {e}")
    raise Exception(f"Could not convert file to WAV. Diagnostics saved to: {diag_log}")
# ---------- Model helper ----------
def whisper_available_models():
    """Return the set of model names the active backend can load.

    Falls back to a hard-coded default set on any error (including the
    backend not exposing available_models()).
    """
    try:
        if USE_FASTER_WHISPER:
            return set(["tiny", "base", "small", "medium", "large", "large-v3"])
        else:
            models = whisper.available_models()
            if isinstance(models, (list, tuple, set)):
                return set(models)
    except Exception:
        pass
    return set(["tiny", "base", "small", "medium", "large", "large-v3"])


AVAILABLE_MODEL_SET = whisper_available_models()


def safe_model_choices(prefer_default="small"):
    """Return (ordered model choices, default) restricted to AVAILABLE_MODEL_SET."""
    base_choices = ["small", "medium", "large", "large-v3", "base", "tiny"]
    choices = [m for m in base_choices if m in AVAILABLE_MODEL_SET]
    if not choices:
        choices = base_choices
    default = prefer_default if prefer_default in choices else choices[0]
    return choices, default


# ---------- worker used by ProcessPoolExecutor ----------
def _fmt_time(t):
    """Format seconds as an SRT timestamp HH:MM:SS,mmm."""
    h = int(t // 3600)
    m = int((t % 3600) // 60)
    s = int(t % 60)
    ms = int((t - int(t)) * 1000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def _segments_to_srt(segments):
    """Render a list of {start, end, text} dicts (openai-whisper style) as SRT text."""
    lines = []
    for i, seg in enumerate(segments, start=1):
        start = seg.get("start", 0)
        end = seg.get("end", 0)
        text = seg.get("text", "").strip()
        lines.append(str(i))
        lines.append(f"{_fmt_time(start)} --> {_fmt_time(end)}")
        lines.append(text)
        lines.append("")
    return "\n".join(lines)


def _worker_transcribe(args):
    """Transcribe one file in a worker process.

    *args* is the tuple (file_path, model_name, device_name, enable_memory,
    generate_srt, use_two_pass, fast_model, refine_threshold).
    Returns a dict {"file", "text_path", "srt_path", "log"}; every failure mode
    is reported through that dict rather than by raising.
    """
    file_path = None  # FIX: pre-bind so the outer except can't hit NameError
    try:
        (file_path, model_name, device_name, enable_memory, generate_srt,
         use_two_pass, fast_model, refine_threshold) = args
        base = os.path.basename(file_path)
        log_lines = []
        device = None if device_name == "auto" else device_name
        model = None
        use_fw = False
        try:
            if USE_FASTER_WHISPER:
                model = FasterWhisperModel(model_name, device=device if device else "cpu")
                use_fw = True
                log_lines.append(f"Worker: faster-whisper loaded {model_name}")
            else:
                import whisper as _wh
                model = _wh.load_model(model_name)
                use_fw = False
                log_lines.append(f"Worker: whisper loaded {model_name}")
        except Exception as e:
            # requested model failed: fall back to "small" on the same backend
            log_lines.append(f"Worker model load failed: {e}")
            try:
                if USE_FASTER_WHISPER:
                    model = FasterWhisperModel("small", device=device if device else "cpu")
                    use_fw = True
                    log_lines.append("Worker: fallback to faster-whisper small")
                else:
                    model = whisper.load_model("small")
                    use_fw = False
                    log_lines.append("Worker: fallback whisper small")
            except Exception as e2:
                return {"file": base, "text_path": None, "srt_path": None,
                        "log": "Model load failed: " + str(e2)}
        try:
            wav = convert_to_wav_if_needed(file_path)
            log_lines.append(f"Converted to WAV: {os.path.basename(wav)}")
        except Exception as e:
            return {"file": base, "text_path": None, "srt_path": None,
                    "log": "Conversion failed: " + str(e)}
        try:
            if use_fw:
                segments, info = model.transcribe(wav, beam_size=5)
                # FIX: faster-whisper yields segments lazily; the text join used
                # to exhaust the generator, leaving the SRT loop with nothing.
                # Materialize once so both passes see every segment.
                segments = list(segments)
                text = "".join([getattr(seg, "text", "") for seg in segments]).strip()
                srt_out = None
                if generate_srt:
                    srt_lines = []
                    for idx, seg in enumerate(segments, start=1):
                        start = getattr(seg, "start", 0)
                        end = getattr(seg, "end", 0)
                        txt = getattr(seg, "text", "").strip()
                        srt_lines.append(str(idx))
                        srt_lines.append(f"{_fmt_time(start)} --> {_fmt_time(end)}")
                        srt_lines.append(txt)
                        srt_lines.append("")
                    srt_out = "\n".join(srt_lines)
            else:
                result = model.transcribe(wav)
                text = result.get("text", "").strip()
                srt_out = _segments_to_srt(result.get("segments")) if generate_srt and result.get("segments") else None
        except Exception as e:
            return {"file": base, "text_path": None, "srt_path": None,
                    "log": "Transcription failed: " + str(e)}
        # optional learned-memory correction, then standard postprocessing
        if enable_memory and text:
            text = memory_correct_text(text)
        text = postprocess_transcript(text)
        txt_tmp = tempfile.NamedTemporaryFile(suffix=".txt", delete=False)
        txt_tmp.close()
        register_temp_path(txt_tmp.name)
        with open(txt_tmp.name, "w", encoding="utf-8") as fh:
            fh.write(text)
        srt_path = None
        if generate_srt and srt_out:
            srt_tmp = tempfile.NamedTemporaryFile(suffix=".srt", delete=False)
            srt_tmp.close()
            register_temp_path(srt_tmp.name)
            with open(srt_tmp.name, "w", encoding="utf-8") as fh:
                fh.write(srt_out)
            srt_path = srt_tmp.name
        # remove the intermediate WAV unless the input already was a WAV
        try:
            if wav and os.path.exists(wav) and not file_path.lower().endswith(".wav"):
                os.unlink(wav)
        except Exception:
            pass
        return {"file": base, "text_path": txt_tmp.name, "srt_path": srt_path,
                "log": "\n".join(log_lines)}
    except Exception as e:
        tb = traceback.format_exc()
        return {"file": os.path.basename(file_path) if file_path else "unknown",
                "text_path": None, "srt_path": None,
                "log": f"Worker exception: {e}\n{tb}"}
# ---------- ZIP extraction & mapping ----------
def extract_zip_and_map(zip_path, zip_password=None):
    """
    Extract ZIP into a per-run temp dir, populate EXTRACT_MAP (friendly name -> file path),
    and set LAST_EXTRACT_DIR to the extraction folder for download.
    Returns (friendly_list, logs_str)
    """
    global EXTRACT_MAP, LAST_EXTRACT_DIR, LAST_EXTRACT_LIST
    # a new run always resets the previous extraction state
    EXTRACT_MAP = {}
    LAST_EXTRACT_DIR = None
    LAST_EXTRACT_LIST = []
    run_id = uuid4().hex
    temp_extract_dir = os.path.join(tempfile.gettempdir(), f"extracted_audio_{run_id}")
    logs = []
    try:
        os.makedirs(temp_extract_dir, exist_ok=True)
        with pyzipper.ZipFile(zip_path, "r") as zf:
            if zip_password:
                try:
                    zf.setpassword(zip_password.encode())
                except Exception:
                    logs.append("Warning: failed to set zip password (continuing).")
            count = {}  # basename -> how many times it has been seen (for dedup keys)
            supported = [".mp3", ".wav", ".aac", ".flac", ".ogg", ".m4a", ".dat", ".dct"]
            for info in zf.infolist():
                if info.is_dir():
                    continue
                _, ext = os.path.splitext(info.filename)
                if ext.lower() not in supported:
                    continue
                try:
                    # NOTE(review): relies on ZipFile.extract()'s member-name
                    # sanitization against path traversal — confirm pyzipper
                    # mirrors the stdlib behavior here.
                    zf.extract(info, path=temp_extract_dir)
                except RuntimeError as e:
                    # pyzipper raises RuntimeError for missing/wrong passwords
                    logs.append(f"Password required or incorrect for {info.filename}: {e}")
                    continue
                except Exception as e:
                    logs.append(f"Error extracting {info.filename}: {e}")
                    continue
                fullp = os.path.normpath(os.path.join(temp_extract_dir, info.filename))
                if not os.path.exists(fullp):
                    continue
                base = os.path.basename(info.filename)
                key = base
                if key in EXTRACT_MAP:
                    # duplicate basename: disambiguate as "name (2).ext", "name (3).ext", ...
                    idx = count.get(base, 1) + 1
                    count[base] = idx
                    name_only, extn = os.path.splitext(base)
                    key = f"{name_only} ({idx}){extn}"
                else:
                    count[base] = 1
                EXTRACT_MAP[key] = fullp
                logs.append(f"Extracted: {info.filename} -> {key}")
        if not EXTRACT_MAP:
            logs.append("No supported audio files found in ZIP.")
            # cleanup temp dir if empty
            try:
                if os.path.exists(temp_extract_dir) and not os.listdir(temp_extract_dir):
                    shutil.rmtree(temp_extract_dir)
            except Exception:
                pass
            return [], "\n".join(logs)
        friendly = sorted(EXTRACT_MAP.keys())
        LAST_EXTRACT_DIR = temp_extract_dir
        LAST_EXTRACT_LIST = friendly[:]
        # register the whole folder so the cleanup thread removes it later
        register_temp_path(temp_extract_dir)
        return friendly, "\n".join(logs)
    except Exception as e:
        traceback.print_exc()
        try:
            if os.path.exists(temp_extract_dir):
                shutil.rmtree(temp_extract_dir)
        except Exception:
            pass
        return [], f"Extraction failed: {e}"
def download_extracted_folder():
    """
    Zip LAST_EXTRACT_DIR and return zip path for download (or None + message if missing).
    """
    global LAST_EXTRACT_DIR
    if not LAST_EXTRACT_DIR or not os.path.exists(LAST_EXTRACT_DIR):
        return None, "No extracted folder available for download."
    try:
        bundle = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
        bundle.close()
        register_temp_path(bundle.name)
        with zipfile.ZipFile(bundle.name, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            # add every file beneath the extraction dir, preserving relative paths
            for folder, _subdirs, filenames in os.walk(LAST_EXTRACT_DIR):
                for filename in filenames:
                    absolute = os.path.join(folder, filename)
                    relative = os.path.relpath(absolute, LAST_EXTRACT_DIR)
                    archive.write(absolute, arcname=relative)
        return bundle.name, "OK"
    except Exception as e:
        return None, f"Failed to create ZIP: {e}"


def download_selected_extracted_files(selected_keys):
    """
    Create a ZIP containing only the selected extracted files.
    Returns the zip path or None.
    """
    if not selected_keys:
        return None, "No files selected."
    # resolve friendly keys to on-disk paths, dropping anything stale
    entries = []
    for key in selected_keys:
        path = EXTRACT_MAP.get(key)
        if path and os.path.exists(path):
            entries.append((key, path))
    if not entries:
        return None, "No valid selected files found."
    archive_tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    archive_tmp.close()
    register_temp_path(archive_tmp.name)
    try:
        with zipfile.ZipFile(archive_tmp.name, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for key, path in entries:
                try:
                    # prefer the friendly (deduplicated) name as the member name
                    archive.write(path, arcname=key)
                except Exception:
                    archive.write(path, arcname=os.path.basename(path))
        return archive_tmp.name, "OK"
    except Exception as e:
        return None, f"Failed to create selected ZIP: {e}"
# ---------- Merge uploaded text files into single Word file ----------
def merge_text_files_to_docx(uploaded_text_files):
    """
    Accepts a list of uploaded text file paths (or single path), merges them
    in order into one .docx and returns path.
    """
    if not uploaded_text_files:
        return None, "No files provided."
    # normalize the various shapes gradio may hand us into a list of path strings
    if isinstance(uploaded_text_files, (str, os.PathLike)):
        uploaded_text_files = [str(uploaded_text_files)]
    elif isinstance(uploaded_text_files, dict) and uploaded_text_files.get("name"):
        uploaded_text_files = [uploaded_text_files["name"]]
    elif isinstance(uploaded_text_files, (list, tuple)):
        normalized = []
        for item in uploaded_text_files:
            if isinstance(item, (str, os.PathLike)):
                normalized.append(str(item))
            elif isinstance(item, dict) and item.get("name"):
                normalized.append(item["name"])
            elif hasattr(item, "name"):
                normalized.append(item.name)
        uploaded_text_files = normalized
    sections = []
    for path in uploaded_text_files:
        if not os.path.exists(path):
            continue
        try:
            with open(path, "r", encoding="utf-8") as fh:
                body = fh.read()
        except Exception:
            # non-UTF-8 content: fall back to a lossy latin-1 read
            with open(path, "r", encoding="latin-1", errors="replace") as fh:
                body = fh.read()
        header = f"\n\n--- {os.path.basename(path)} ---\n\n"
        sections.append(header + body)
    if not sections:
        return None, "No readable text files."
    merged = save_as_word("\n".join(sections))
    return merged, "Merged"


# ---------- NEW: merge last batch transcripts ----------
def merge_last_batch_transcripts():
    """
    Merge txt transcripts created by the last batch run (LAST_BATCH_TRANSCRIPTS) into a
    single .docx. Returns (path_or_None, message)
    """
    global LAST_BATCH_TRANSCRIPTS
    if not LAST_BATCH_TRANSCRIPTS:
        return None, "No last-batch transcripts available."
    sections = []
    for friendly, txt_path, _srt_path in LAST_BATCH_TRANSCRIPTS:
        if not txt_path or not os.path.exists(txt_path):
            continue
        body = ""
        # try utf-8 first, then latin-1; an unreadable file contributes an empty body
        for encoding in ("utf-8", "latin-1"):
            try:
                with open(txt_path, "r", encoding=encoding, errors="replace") as fh:
                    body = fh.read()
                break
            except Exception:
                body = ""
        sections.append(f"\n\n--- {friendly} ---\n\n" + body)
    if not sections:
        return None, "No readable last-batch transcript files found."
    merged = save_as_word("\n".join(sections))
    return merged, f"Merged {len(sections)} files."
# ---------- Batch transcription generator (streaming) ----------
def batch_transcribe_parallel_generator(
    friendly_selected,
    uploaded_files,
    model_name,
    device_name,
    merge_flag,
    enable_mem,
    generate_srt,
    use_two_pass=False,
    fast_model="small",
    refine_threshold=-1.0,
    zip_password=None,
    auto_merge_per_file=True,
):
    """Stream batch transcription progress to the UI.

    Yields tuples (logs_text, transcripts_text, downloadable_path_or_None, percent)
    after each completed file and once at the end.  Work is fanned out to
    _worker_transcribe via a ProcessPoolExecutor (at most MAX_WORKERS processes).
    Side effect: sets the global LAST_BATCH_TRANSCRIPTS to the per-file
    (name, txt_path, srt_path) list for later merge/download actions.
    """
    global LAST_BATCH_TRANSCRIPTS
    LAST_BATCH_TRANSCRIPTS = []  # reset at start
    logs = []
    transcripts = []
    per_file_paths = []
    try:
        paths = []
        # gather selected extracted paths
        if friendly_selected:
            for key in friendly_selected:
                p = EXTRACT_MAP.get(key)
                if p:
                    paths.append(p)
                else:
                    logs.append(f"Warning: selected not found in extract map: {key}")
        # uploaded files
        if uploaded_files:
            if isinstance(uploaded_files, (list, tuple)):
                for f in uploaded_files:
                    paths.append(str(f))
            else:
                paths.append(str(uploaded_files))
        if not paths:
            logs.append("No files selected or uploaded.")
            yield "\n\n".join(logs), "", None, 100
            return
        total = len(paths)
        logs.append(f"Starting batch of {total} files with up to {MAX_WORKERS} workers.")
        yield "\n\n".join(logs), "", None, 2
        # one task tuple per file, matching _worker_transcribe's expected layout
        tasks = []
        for p in paths:
            tasks.append((p, model_name, device_name, enable_mem, generate_srt,
                          use_two_pass, fast_model, refine_threshold))
        completed = 0
        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, total)) as exe:
            futs = {exe.submit(_worker_transcribe, t): t for t in tasks}
            for fut in as_completed(futs):
                res = fut.result()
                completed += 1
                fname = res.get("file")
                res_log = res.get("log", "")
                logs.append(f"[{completed}/{total}] {fname}: {res_log}")
                txtp = res.get("text_path")
                srtp = res.get("srt_path")
                if txtp:
                    try:
                        with open(txtp, "r", encoding="utf-8") as fh:
                            txt_content = fh.read()
                    except Exception:
                        with open(txtp, "r", encoding="latin-1", errors="replace") as fh:
                            txt_content = fh.read()
                    transcripts.append(f"FILE: {fname}\n{txt_content}\n")
                    per_file_paths.append((fname, txtp, srtp))
                # progress runs 5..95 while files complete; 100 is the final yield
                pct = int(5 + (completed / total) * 90)
                yield "\n\n".join(logs), "\n\n".join(transcripts), None, pct
        # Save per-file transcript list into global for later merging/downloading
        LAST_BATCH_TRANSCRIPTS = per_file_paths[:]
        combined = "\n\n".join(transcripts)
        out_doc = None
        if merge_flag or auto_merge_per_file:
            try:
                out_doc = save_as_word(combined)
                logs.append(f"Merged saved: {out_doc}")
            except Exception as e:
                logs.append(f"Merge failed: {e}")
        # Create ZIP of per-file transcripts (not original audio)
        if per_file_paths:
            zip_tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
            zip_tmp.close()
            register_temp_path(zip_tmp.name)
            with zipfile.ZipFile(zip_tmp.name, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                for fname, txtp, srtp in per_file_paths:
                    arc_txt = f"{fname}.txt"
                    try:
                        zf.write(txtp, arcname=arc_txt)
                    except Exception:
                        zf.write(txtp, arcname=os.path.basename(txtp))
                    if srtp and os.path.exists(srtp):
                        arc_srt = f"{fname}.srt"
                        try:
                            zf.write(srtp, arcname=arc_srt)
                        except Exception:
                            zf.write(srtp, arcname=os.path.basename(srtp))
            logs.append(f"Per-file transcripts ZIP: {zip_tmp.name}")
            # the ZIP (not the merged .docx) is offered for download when files succeeded
            yield "\n\n".join(logs), combined, zip_tmp.name, 100
        else:
            yield "\n\n".join(logs), combined, out_doc, 100
    except Exception as e:
        tb = traceback.format_exc()
        logs.append(f"Batch error: {e}\n{tb}")
        yield "\n\n".join(logs), "\n\n".join(transcripts), None, 100
# ---------- Memory import helpers ----------
def _read_file_text_try_encodings(path):
    """Read *path* as text, trying utf-8, utf-16, then latin-1 in order.

    Returns (text, encoding_label), or (None, None) if the file is unreadable.
    """
    for candidate in ["utf-8", "utf-16", "latin-1"]:
        try:
            with open(path, "r", encoding=candidate) as fh:
                return fh.read(), candidate
        except UnicodeDecodeError:
            continue
        except Exception:
            # non-decoding failure (e.g. missing file): go straight to the byte fallback
            break
    try:
        with open(path, "rb") as fh:
            raw = fh.read()
        try:
            return raw.decode("utf-8"), "utf-8(guessed)"
        except Exception:
            return raw.decode("latin-1", errors="replace"), "latin-1(replaced)"
    except Exception:
        return None, None


def _process_single_memory_text(text):
    """Merge one uploaded memory document (JSON export or plain text) into memory.

    Returns the number of entries added/updated.
    """
    added = 0

    def as_count(value):
        try:
            return int(value)
        except Exception:
            return 1

    # first try the native export shape: {"words": {...}, "phrases": {...}}
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            words = parsed.get("words", {})
            phrases = parsed.get("phrases", {})
            with MEMORY_LOCK:
                for word, value in words.items():
                    memory["words"][word.lower()] = memory["words"].get(word.lower(), 0) + as_count(value)
                    added += 1
                for phrase, value in phrases.items():
                    memory["phrases"][phrase] = memory["phrases"].get(phrase, 0) + as_count(value)
                    added += 1
            return added
    except Exception:
        pass
    # plain-text fallback: "key,count" lines, short lines as words, long lines as phrases
    entries = [line.strip() for line in text.splitlines() if line.strip()]
    with MEMORY_LOCK:
        for entry in entries:
            if "," in entry:
                head, tail = [part.strip() for part in entry.split(",", 1)]
                memory["words"][head.lower()] = memory["words"].get(head.lower(), 0) + as_count(tail)
                added += 1
            elif len(entry.split()) <= 3:
                memory["words"][entry.lower()] = memory["words"].get(entry.lower(), 0) + 1
                added += 1
            else:
                memory["phrases"][entry] = memory["phrases"].get(entry, 0) + 1
                added += 1
    return added


def preview_zip_members_for_memory(zip_path):
    """List non-directory member names of a ZIP for the import-selection UI."""
    members = []
    logs = []
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            members = [info.filename for info in zf.infolist() if not info.is_dir()]
        if members:
            logs.append(f"Found {len(members)} members.")
        else:
            logs.append("No members found in ZIP.")
    except Exception as e:
        logs.append(f"ZIP preview failed: {e}")
    return members, "\n".join(logs)
def import_memory_files_multiple(uploaded_files, zip_members_to_import=None):
    """Import one or more uploaded memory files (text or ZIPs of text) into memory.

    *zip_members_to_import*, when given, restricts which ZIP members are read.
    Returns a human-readable multi-line summary of what was imported/skipped.
    """
    if not uploaded_files:
        return "No files provided."
    # normalize the various gradio upload shapes into a list of path strings
    if isinstance(uploaded_files, (str, os.PathLike)):
        uploaded_files = [str(uploaded_files)]
    elif isinstance(uploaded_files, dict) and uploaded_files.get("name"):
        uploaded_files = [uploaded_files["name"]]
    elif isinstance(uploaded_files, (list, tuple)):
        normalized = []
        for f in uploaded_files:
            if isinstance(f, (str, os.PathLike)):
                normalized.append(str(f))
            elif isinstance(f, dict) and f.get("name"):
                normalized.append(f["name"])
            elif hasattr(f, "name"):
                normalized.append(f.name)
        uploaded_files = normalized
    total_added = 0
    messages = []
    skipped = []
    for fp in uploaded_files:
        try:
            if not os.path.exists(fp):
                messages.append(f"Missing: {fp}")
                continue
            if fp.lower().endswith(".zip"):
                try:
                    with zipfile.ZipFile(fp, "r") as zf:
                        for info in zf.infolist():
                            if info.is_dir():
                                continue
                            name = info.filename
                            if zip_members_to_import and name not in zip_members_to_import:
                                continue
                            try:
                                with zf.open(info) as member:
                                    raw = member.read()
                                # try each encoding; fall back to lossy latin-1
                                text = None
                                for enc in ("utf-8", "utf-16", "latin-1"):
                                    try:
                                        text = raw.decode(enc)
                                        break
                                    except Exception:
                                        text = None
                                if text is None:
                                    text = raw.decode("latin-1", errors="replace")
                                added = _process_single_memory_text(text)
                                total_added += added
                                messages.append(f"Imported {added} from {name} in {os.path.basename(fp)}")
                            except Exception as e:
                                skipped.append(f"{name}: {e}")
                                continue
                except zipfile.BadZipFile:
                    skipped.append(f"Bad zip: {fp}")
                # NOTE(review): this continue skips the plain-text reader below for
                # .zip inputs (processed or bad) — confirm against the original
                # intent; without it a processed ZIP would also be re-read as text.
                continue
            text, used_enc = _read_file_text_try_encodings(fp)
            if text is None:
                skipped.append(fp)
                continue
            added = _process_single_memory_text(text)
            total_added += added
            messages.append(f"Imported {added} from {os.path.basename(fp)} (enc={used_enc})")
        except Exception as e:
            skipped.append(f"{fp}: {e}")
    # persist whatever was merged, even if some files failed
    save_memory(memory)
    summary = [f"Total entries added: {total_added}"]
    if messages:
        summary.append("Details:")
        summary.extend(messages)
    if skipped:
        summary.append("Skipped/failed:")
        summary.extend(skipped)
    return "\n".join(summary)
summary.append("Skipped/failed:") summary.extend(skipped) return "\n".join(summary) # ---------- Build Gradio UI ---------- print("DEBUG: building Gradio UI", flush=True) available_choices, default_choice = safe_model_choices(prefer_default="small") # CSS tweaks: small buttons and nicer layout CSS = """ :root{ --accent:#4f46e5; --muted:#6b7280; --card:#ffffff; --bg:#f7f8fb; --text:#0f172a; --transcript-bg:#0f172a; --transcript-color:#e6eef8; } [data-theme="dark"] { --accent: #7c3aed; --muted: #9ca3af; --card: #0b1220; --bg: #071022; --text: #e6eef8; --transcript-bg: #071026; --transcript-color: #e6eef8; } body { background: var(--bg); color: var(--text); font-family: Inter, system-ui, -apple-system, "Segoe UI", Roboto, "Helvetica Neue", Arial; } .header { padding: 14px; border-radius: 10px; background: linear-gradient(90deg, rgba(79,70,229,0.08), rgba(99,102,241,0.02)); margin-bottom: 12px; display:flex;align-items:center;gap:12px; } .app-icon { width:50px;height:50px;border-radius:10px;background:linear-gradient(135deg,var(--accent),#06b6d4);display:flex;align-items:center;justify-content:center;color:white;font-weight:700;font-size:20px; } .card { background:var(--card); border-radius:10px; padding:12px; box-shadow: 0 6px 20px rgba(16,24,40,0.04); } .transcript-area { white-space:pre-wrap; font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, "Roboto Mono", monospace; background: var(--transcript-bg); color: var(--transcript-color); padding:12px; border-radius:8px; min-height:200px; } .small-note { color:var(--muted); font-size:12px;} .btn-row { display:flex; gap:8px; margin-top:8px; } .gr-button.small { padding:6px 8px !important; font-size:12px !important; } """ with gr.Blocks(title="Whisper Transcriber — Parallel + Memory", css=CSS) as demo: # set dark theme by default via injected JS gr.HTML(""" """) gr.Markdown("