import hmac
import os
import shutil
import tempfile
import time

import gradio as gr
from fastapi import FastAPI, File, UploadFile
from openai import OpenAI
from pydub import AudioSegment
|
|
| |
| |
| |
# Shared secret that gates the web UI, read from the APP_PASSWORD environment
# variable.
# NOTE(review): falls back to a well-known default when the variable is unset;
# make sure APP_PASSWORD is set in any real deployment.
PASSWORD = os.environ.get("APP_PASSWORD", "defaultpass")

# Size threshold in bytes (25 MiB); larger audio files are split into chunks
# before being sent for transcription.
MAX_SIZE = 25 * 1024 ** 2

# OpenAI client shared by every transcription and chat request in this module.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
|
|
| |
| |
| |
# --- Brute-force protection settings -------------------------------------
# Failed password attempts tolerated per session inside the rolling window.
MAX_FAILED_IN_WINDOW = 10
# Length of the rolling window, in seconds (24 hours).
WINDOW_SECONDS = 24 * 60 * 60
# How long a session stays locked once the window limit is hit (24 hours).
LOCK_DURATION_SECONDS = 24 * 60 * 60
# Failed attempts within SHORT_BURST_SECONDS that trigger a short lock.
SHORT_BURST_LIMIT = 5
SHORT_BURST_SECONDS = 60

# --- In-memory state (lost on restart) -----------------------------------
# session id -> list of Unix timestamps of failed attempts
attempts = dict()
# session id -> Unix timestamp at which the lock expires
locked = dict()
|
|
| def _now(): return int(time.time()) |
|
|
def prune_old_attempts(sid):
    """Forget failed attempts of *sid* that fell out of the rolling window.

    Timestamps older than ``WINDOW_SECONDS`` are discarded.  When nothing is
    left, the session's entry is removed from ``attempts`` altogether.

    Args:
        sid: Session identifier used as the key in ``attempts``.
    """
    if sid not in attempts:
        return

    oldest_allowed = _now() - WINDOW_SECONDS
    still_relevant = [stamp for stamp in attempts[sid] if stamp >= oldest_allowed]
    if still_relevant:
        attempts[sid] = still_relevant
    else:
        del attempts[sid]
|
|
def check_lock(sid):
    """Report whether *sid* is currently locked out of password attempts.

    An expired lock is lifted first.  The session's failure history is then
    pruned to the rolling window and, if it still holds at least
    ``MAX_FAILED_IN_WINDOW`` entries, a new lock lasting
    ``LOCK_DURATION_SECONDS`` is installed.

    Args:
        sid: Session identifier used as the key in ``locked`` and ``attempts``.

    Returns:
        ``(True, message)`` while the session is locked, otherwise
        ``(False, "")``.
    """
    # One clock reading, so the comparison and the countdown cannot disagree.
    now = _now()

    unlock_at = locked.get(sid)
    if unlock_at is not None:
        if now < unlock_at:
            # Round up so the last seconds read "1 minute" rather than "0".
            minutes_left = -(-(unlock_at - now) // 60)
            return True, f"🔒 已被鎖定,請 {minutes_left} 分鐘後再試。"
        # The lock has expired: lift it but keep the failure history.  Wiping
        # the history here would let a client restart the count after every
        # short burst lock, so the MAX_FAILED_IN_WINDOW cap could be dodged
        # indefinitely.  Stale entries age out through prune_old_attempts().
        locked.pop(sid, None)

    prune_old_attempts(sid)
    if len(attempts.get(sid, [])) >= MAX_FAILED_IN_WINDOW:
        locked[sid] = now + LOCK_DURATION_SECONDS
        return True, "🔒 嘗試過多,已鎖定 24 小時。"
    return False, ""
|
|
def record_failed_attempt(sid):
    """Log one failed password attempt for *sid* and apply the burst lock.

    The attempt is timestamped and stored, and the history is pruned to the
    rolling window.  If at least ``SHORT_BURST_LIMIT`` attempts fall within
    the last ``SHORT_BURST_SECONDS`` seconds, the session is locked for 300
    seconds.

    Args:
        sid: Session identifier used as the key in ``attempts`` / ``locked``.

    Returns:
        ``(count, warning)``: the number of attempts currently remembered for
        the session, and a warning message (empty when no burst lock was set).
    """
    stamp = _now()
    attempts.setdefault(sid, []).append(stamp)
    prune_old_attempts(sid)

    history = attempts[sid]
    burst_start = stamp - SHORT_BURST_SECONDS
    burst_size = sum(1 for t in history if t >= burst_start)

    warning = ""
    if burst_size >= SHORT_BURST_LIMIT:
        locked[sid] = stamp + 300
        warning = "⚠️ 多次快速嘗試,暫時鎖定5分鐘。"
    return len(history), warning
|
|
def clear_attempts(sid):
    """Drop both the failure history and any active lock recorded for *sid*."""
    for registry in (attempts, locked):
        registry.pop(sid, None)
|
|
| |
| |
| |
def split_audio_if_needed(path):
    """Return audio file paths, each within ``MAX_SIZE``, covering *path*.

    A file that is already within ``MAX_SIZE`` is returned unchanged as
    ``[path]``.  A larger file is decoded and cut into consecutive WAV chunks
    written to uniquely named temporary files.

    The chunk duration is derived from the PCM byte rate of the decoded audio.
    Sizing chunks from the (possibly compressed) source size instead would
    yield uncompressed WAV files far larger than the limit.

    Args:
        path: Path of the audio file to inspect.

    Returns:
        A list of file paths.  Any path other than *path* itself is a
        temporary file that the caller is expected to delete after use.
    """
    if os.path.getsize(path) <= MAX_SIZE:
        return [path]

    audio = AudioSegment.from_file(path)

    # WAV size = header + duration * byte rate.  Reserve a small margin for
    # the header and convert the remaining byte budget into milliseconds.
    header_margin = 1024
    bytes_per_second = audio.frame_rate * audio.frame_width
    chunk_ms = max(1, (MAX_SIZE - header_margin) * 1000 // bytes_per_second)

    files = []
    try:
        # len(audio) is the duration in milliseconds; slices are in ms too.
        for index, start in enumerate(range(0, len(audio), chunk_ms), start=1):
            # Unique temp names keep concurrent requests from overwriting
            # each other's chunks.
            fd, chunk_path = tempfile.mkstemp(
                prefix=f"chunk_{index}_", suffix=".wav"
            )
            os.close(fd)
            files.append(chunk_path)
            # export() hands back the open output file; close it right away.
            audio[start:start + chunk_ms].export(chunk_path, format="wav").close()
    except Exception:
        # Do not leave partial chunk files behind when decoding/export fails.
        for leftover in files:
            try:
                os.remove(leftover)
            except OSError:
                pass
        raise
    return files
|
|
def transcribe_core(path, model):
    """Transcribe the audio file at *path* and summarise the transcript.

    Oversized files are split with ``split_audio_if_needed`` and each part is
    transcribed separately; the parts are joined with newlines.  The joined
    transcript is then summarised in Traditional Chinese with ``gpt-4o-mini``.

    Args:
        path: Path of the uploaded audio file.
        model: Name of the transcription model passed to the API.

    Returns:
        ``(full_text, summary)`` as two strings.
    """
    scratch = []  # temporary files created for this call; removed below
    try:
        if path and path.lower().endswith(".mp4"):
            # Re-label .mp4 uploads as .m4a by copying them.
            # NOTE(review): presumably LINE voice messages are audio-only MP4
            # files that are accepted under the .m4a extension - confirm.
            # A unique temp file is used so the copy can never overwrite an
            # unrelated file that happens to share the upload's base name.
            try:
                fd, fixed_path = tempfile.mkstemp(suffix=".m4a")
                os.close(fd)
                scratch.append(fixed_path)
                shutil.copy(path, fixed_path)
            except OSError as e:
                print(f"⚠️ mp4→m4a 複製失敗:{e},改用原檔嘗試")
            else:
                path = fixed_path
                print("🔧 已自動修正 mp4 → m4a")

        chunks = split_audio_if_needed(path)
        # Anything the splitter produced besides the input itself is ours to
        # clean up.
        scratch.extend(chunk for chunk in chunks if chunk != path)

        txts = []
        for f in chunks:
            with open(f, "rb") as af:
                res = client.audio.transcriptions.create(
                    model=model,
                    file=af,
                    response_format="text",
                )
            txts.append(res)
    finally:
        for leftover in scratch:
            try:
                os.remove(leftover)
            except OSError:
                pass

    full = "\n".join(txts)
    res = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"請用繁體中文摘要以下內容:\n{full}"}],
        temperature=0.4,
    )
    # The message content may be None; treat that as an empty summary.
    summ = (res.choices[0].message.content or "").strip()
    return full, summ
|
|
| |
| |
| |
| def _normalize_upload_path(file_input): |
| if not file_input: |
| return None |
| if isinstance(file_input, str): |
| return file_input |
| if isinstance(file_input, list) and file_input: |
| return _normalize_upload_path(file_input[0]) |
| path = getattr(file_input, "name", None) |
| if not path and isinstance(file_input, dict): |
| path = file_input.get("name") or file_input.get("path") |
| return path |
|
|
def transcribe_with_password(session_id, password, file_input, model_choice):
    """Handle the transcribe button: check the password, then transcribe.

    Args:
        session_id: Identifier used to track failed attempts and locks.
        password: Password typed by the user; ``None`` is treated as empty.
        file_input: Value of the upload component (see
            ``_normalize_upload_path`` for the accepted shapes).
        model_choice: Name of the transcription model to use.

    Returns:
        ``(status, transcript, summary)``.  The transcript and summary are
        empty strings whenever the request is rejected or fails.
    """
    # Strip surrounding whitespace, inner spaces and zero-width spaces.
    password = (password or "").strip().replace(" ", "").replace("\u200b", "")

    locked_flag, msg = check_lock(session_id)
    if locked_flag:
        return msg, "", ""

    # Constant-time comparison so response timing does not reveal how much of
    # the password matched.  Bytes are compared because compare_digest()
    # rejects non-ASCII str arguments.
    supplied = password.encode("utf-8", "surrogatepass")
    expected = PASSWORD.encode("utf-8", "surrogatepass")
    if not hmac.compare_digest(supplied, expected):
        cnt, msg2 = record_failed_attempt(session_id)
        return msg2 or f"密碼錯誤(第 {cnt} 次)", "", ""

    path = _normalize_upload_path(file_input)
    if not path or not os.path.exists(path):
        return "找不到上傳檔案,請重新選擇。", "", ""

    clear_attempts(session_id)
    try:
        full, summ = transcribe_core(path, model_choice)
    except Exception as exc:
        # UI boundary: report the failure in the status box instead of letting
        # the handler crash.  Details stay in the server log only.
        print(f"transcription failed: {exc!r}")
        return "❌ 轉錄失敗,請稍後再試。", "", ""
    return "✅ 轉錄完成", full, summ
|
|
def ask_about_transcript(full_text, q):
    """Answer the question *q* about *full_text* using the chat model.

    Args:
        full_text: Transcript to ask about.
        q: The user's question.

    Returns:
        The model's reply with surrounding whitespace removed, or a short
        notice when the transcript or the question is blank.
    """
    if full_text.strip() == "":
        return "⚠️ 尚未有轉錄內容"
    if q.strip() == "":
        return "請輸入問題"

    messages = [
        {
            "role": "user",
            "content": f"以下是轉錄內容:\n{full_text}\n\n問題:{q}\n請用繁體中文回答。",
        }
    ]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.6,
    )
    answer = completion.choices[0].message.content
    return answer.strip()
|
|
| |
| |
| |
api = FastAPI()


@api.post("/api/transcribe")
async def api_transcribe(file: UploadFile = File(...)):
    """Receive an audio upload (e.g. from an iPhone Shortcut) and transcribe it.

    The upload is stored in a uniquely named temporary file, transcribed with
    the ``whisper-1`` model, and the temporary file is removed afterwards even
    when transcription fails.

    Args:
        file: The uploaded audio file.

    Returns:
        A dict with the keys ``"text"`` (transcript) and ``"summary"``.
    """
    # NOTE(review): this endpoint performs no authentication, unlike the web
    # UI - confirm that this is intended.

    # The client-supplied filename is untrusted.  Using it as the write path
    # would allow "../" traversal and overwriting arbitrary files, so only a
    # sanitised extension is kept (it is still needed for format detection).
    extension = os.path.splitext(os.path.basename(file.filename or ""))[1]
    cleaned = "".join(ch for ch in extension[1:] if ch.isalnum())[:10]
    suffix = f".{cleaned}" if cleaned else ""

    fd, temp_path = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(await file.read())
        text, summary = transcribe_core(temp_path, "whisper-1")
    finally:
        try:
            os.remove(temp_path)
        except OSError:
            pass
    return {"text": text, "summary": summary}
|
|
| |
| |
| |
# Gradio front end: password-protected upload form, transcript/summary panes,
# a follow-up question panel and clipboard buttons.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 🎧 語音轉錄與摘要工具(私人API勿轉傳|支援 iPhone LINE .mp4)")

    # Per-page-load identifier; filled in by init_session when the page loads.
    session_state = gr.State(value=None)

    with gr.Row():
        password_input = gr.Textbox(
            label="輸入密碼",
            placeholder="請輸入英文與數字(請切換成英文輸入法)",
            type="password",
            max_lines=1,
        )
        model_choice = gr.Dropdown(
            choices=["whisper-1", "gpt-4o-mini-transcribe"],
            value="whisper-1",
            label="選擇模型",
        )

    file_input = gr.File(
        label="上傳音訊 / LINE 語音檔(支援 .m4a, .aac, .wav, .mp4)",
        file_count="single",
        file_types=["audio", ".mp4", ".m4a", ".aac", ".wav"],
    )

    transcribe_btn = gr.Button("開始轉錄與摘要 🚀")
    status_box = gr.Textbox(label="狀態", interactive=False)
    transcript_box = gr.Textbox(label="完整轉錄文字", lines=10)
    copy_transcript = gr.Button("📋 複製轉錄文字")
    summary_box = gr.Textbox(label="摘要結果", lines=10)
    copy_summary = gr.Button("📋 複製摘要結果")

    with gr.Accordion("💬 進一步問 AI", open=False):
        user_q = gr.Textbox(label="輸入問題", lines=2)
        ask_btn = gr.Button("詢問 AI 🤔")
        ai_reply = gr.Textbox(label="AI 回覆", lines=6)
        copy_reply = gr.Button("📋 複製 AI 回覆")

    def init_session():
        """Return a fresh random UUID string to identify this page load."""
        import uuid
        return str(uuid.uuid4())

    demo.load(fn=init_session, inputs=None, outputs=session_state)

    transcribe_btn.click(
        fn=transcribe_with_password,
        inputs=[session_state, password_input, file_input, model_choice],
        outputs=[status_box, transcript_box, summary_box],
    )
    ask_btn.click(
        fn=ask_about_transcript,
        inputs=[transcript_box, user_q],
        outputs=[ai_reply],
    )

    # Browser-side snippet: copy the bound textbox value to the clipboard.
    copy_js = """
    async (text) => {
        try {
            await navigator.clipboard.writeText(text);
            alert("✅ 已複製到剪貼簿!");
        } catch (e) {
            alert("❌ 複製失敗:" + e);
        }
    }
    """
    # Each copy button runs purely in the browser (fn=None) on its textbox.
    for _button, _source in (
        (copy_transcript, transcript_box),
        (copy_summary, summary_box),
        (copy_reply, ai_reply),
    ):
        _button.click(fn=None, inputs=_source, outputs=None, js=copy_js)
|
|
| |
import threading
import uvicorn


def run_api():
    """Serve the FastAPI app on all interfaces, port 7861 (blocking call)."""
    uvicorn.run(api, host="0.0.0.0", port=7861)


# The REST API runs in a daemon thread so it stops together with the main
# process, which is occupied by the Gradio UI on port 7860.
api_thread = threading.Thread(target=run_api, daemon=True)
api_thread.start()

demo.launch(server_name="0.0.0.0", server_port=7860)
|
|