Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import time | |
| import asyncio | |
| from datetime import datetime | |
| import numpy as np | |
| from datasets import load_dataset, Dataset as HFDataset | |
| from huggingface_hub import HfApi, upload_file, create_repo | |
| import torch | |
| from transformers import AutoTokenizer, AutoModel | |
| from openai import OpenAI | |
| from docx import Document | |
| import io | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import threading | |
| from tqdm import tqdm | |
| import traceback | |
| import queue | |
| # ========================================== | |
| # 環境變數設定 | |
| # ========================================== | |
# Credentials are read from the process environment; empty string when unset.
HF_TOKEN = os.getenv("HF_TOKEN", "")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
# Password gate for the app.
# NOTE(review): the fallback is a literal password kept in source; consider
# requiring the environment variable instead.
ACCESS_PASSWORD = os.getenv("ACCESS_PASSWORD", "netzero2025")

# Dataset configuration
DATASET_NAME = "s880453/interview-transcripts-vectorized"
OUTPUT_DATASET = "s880453/interview-outputs"  # dataset repo that receives generated files
EMBEDDING_MODEL = "intfloat/multilingual-e5-large"

# Interviewer names; their turns are excluded from speaker lists and search.
INTERVIEWERS = ["徐美苓", "許弘諺", "郭禹彤"]

# Parallel-processing settings
MAX_WORKERS = 5  # thread-pool size
MAX_RETRIES = 3  # attempts per GPT call
RELEVANCE_THRESHOLD = 0.6  # minimum score for a result to be kept
| # ========================================== | |
| # 結構化數據模型 | |
| # ========================================== | |
@dataclass
class SearchResult:
    """One retrieved transcript segment together with its relevance scores.

    Declared as a dataclass so that it can be constructed with keyword
    arguments (``SearchResult(text=..., speaker=..., ...)``), which is how the
    search pipeline in this file creates instances.
    """

    # Transcript text of the segment.
    text: str
    # Name of the person who spoke the segment.
    speaker: str
    # Turn index stored with the segment in the dataset.
    turn_index: int
    # Identifier of the source file stored with the segment.
    file_id: str
    # Dot product between the query embedding and the segment embedding.
    vector_score: float = 0.0
    # Relevance score assigned by the LLM reranker.
    llm_score: float = 0.0
    # Final score used for ranking and threshold filtering.
    weighted_score: float = 0.0
    # Optional free-text explanation of the relevance judgement.
    relevance_reasoning: str = ""
@dataclass
class ProcessingStatus:
    """Mutable progress counters shared by the parallel workers.

    Declared as a dataclass so that ``field(default_factory=list)`` actually
    produces a fresh list per instance; without the decorator ``errors`` would
    be a raw ``dataclasses.Field`` object and ``.append`` / ``.clear`` would fail.
    """

    # Number of items (questions or files) in the current run.
    total_items: int = 0
    # Items that finished successfully.
    completed_items: int = 0
    # Items that finished with a failure.
    failed_items: int = 0
    # Human-readable label of the item being processed.
    current_item: str = ""
    # time.time() value recorded when the run started.
    start_time: float = 0.0
    # Estimated remaining time in seconds.
    estimated_time: float = 0.0
    # Error messages collected during the run.
    errors: List[str] = field(default_factory=list)
| # ========================================== | |
| # 全域變數 | |
| # ========================================== | |
# Resources assigned by initialize_system(); they stay None until it runs.
dataset = embeddings = tokenizer = model = None
openai_client = hf_api = None
# Interviewee names (interviewers excluded), filled in by initialize_system().
all_speakers = []
# True only after initialize_system() completed without error.
init_success = False
# Lock guarding updates to the shared progress tracker below.
status_lock = threading.Lock()
processing_status = ProcessingStatus()
| # ========================================== | |
| # 初始化函數 | |
| # ========================================== | |
def initialize_system():
    """Load every external resource the app uses and publish it in module globals.

    In order: create the OpenAI client (mandatory), create the Hugging Face API
    client, make sure the output dataset repo exists (best effort), load the
    transcript dataset and its embeddings, load the embedding model, and build
    the sorted list of interviewee names.

    Returns:
        A ``(ok, message)`` tuple. ``init_success`` is set to True on success
        and to False when an exception is caught.
    """
    global dataset, embeddings, tokenizer, model, openai_client, all_speakers, init_success, hf_api
    try:
        print("🔄 正在初始化系統...")

        # The OpenAI key is required; bail out early without it.
        if not OPENAI_API_KEY:
            print("⚠️ 未設定 OpenAI API Key")
            return False, "請設定 OPENAI_API_KEY"
        openai_client = OpenAI(api_key=OPENAI_API_KEY)
        print("✅ OpenAI 客戶端初始化成功")

        hf_api = HfApi(token=HF_TOKEN)

        # Creating the output repo is best effort: a failure is only logged.
        try:
            create_repo(
                repo_id=OUTPUT_DATASET,
                repo_type="dataset",
                private=True,
                exist_ok=True,
                token=HF_TOKEN,
            )
            print(f"✅ 輸出資料集 {OUTPUT_DATASET} 已準備")
        except Exception as repo_error:
            print(f"⚠️ 創建輸出資料集時出現問題: {repo_error}")

        print(f"📊 正在載入資料集: {DATASET_NAME}")
        dataset = load_dataset(DATASET_NAME, split="train", token=HF_TOKEN)
        print(f"✅ 資料集載入成功,共 {len(dataset)} 筆資料")

        # Stack the per-row embeddings into one array.
        embeddings = np.array([row['embedding'] for row in dataset])
        print(f"✅ 嵌入向量提取成功,維度: {embeddings.shape}")

        print(f"🤖 正在載入模型: {EMBEDDING_MODEL}")
        tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL)
        model = AutoModel.from_pretrained(EMBEDDING_MODEL)
        model.eval()
        print("✅ 嵌入模型載入成功")

        # Unique speaker names, interviewers left out, in sorted order.
        all_speakers = sorted(
            {row['speaker'] for row in dataset if row['speaker'] not in INTERVIEWERS}
        )
        print(f"✅ 發言人列表提取成功,共 {len(all_speakers)} 位受訪者")

        init_success = True
        return True, "系統初始化成功!"
    except Exception as e:
        error_msg = f"系統初始化失敗: {str(e)}"
        print(f"❌ {error_msg}")
        init_success = False
        return False, error_msg
| # ========================================== | |
| # HF Dataset 上傳函數 | |
| # ========================================== | |
def upload_to_hf_dataset(file_path, file_type="guide", metadata=None):
    """Upload a local file (and optional JSON metadata) to the output dataset repo.

    Args:
        file_path: Path of the local file to upload.
        file_type: ``"guide"`` -> ``guide_outputs/``, ``"chat"`` ->
            ``chat_exports/``, anything else -> ``others/``.
        metadata: Optional JSON-serialisable dict, uploaded as
            ``metadata/<timestamp>.json`` when truthy.

    Returns:
        ``(True, repo_path)`` on success, ``(False, error_message)`` on failure.
    """
    folders = {"guide": "guide_outputs", "chat": "chat_exports"}
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        folder = folders.get(file_type, "others")
        repo_path = f"{folder}/{os.path.basename(file_path)}"

        upload_file(
            path_or_fileobj=file_path,
            path_in_repo=repo_path,
            repo_id=OUTPUT_DATASET,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        print(f"✅ 檔案已上傳到 HF Dataset: {repo_path}")

        if metadata:
            # Upload the JSON bytes directly instead of going through a
            # timestamp-named temp file in the working directory: parallel
            # uploads within the same second shared that file name (one thread
            # could delete it while another was still uploading), and the file
            # was left behind whenever the upload raised.
            payload = json.dumps(metadata, ensure_ascii=False, indent=2).encode("utf-8")
            upload_file(
                path_or_fileobj=payload,
                path_in_repo=f"metadata/{timestamp}.json",
                repo_id=OUTPUT_DATASET,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        return True, repo_path
    except Exception as e:
        print(f"❌ 上傳到 HF Dataset 失敗: {str(e)}")
        return False, str(e)
| # ========================================== | |
| # 向量搜尋函數 | |
| # ========================================== | |
def average_pool(last_hidden_states, attention_mask):
    """Mean-pool hidden states along dim 1, counting only positions where the mask is non-zero."""
    keep = attention_mask.unsqueeze(-1).bool()
    # Zero out masked positions so they do not contribute to the sum.
    zeroed = last_hidden_states.masked_fill(keep.logical_not(), 0.0)
    token_counts = attention_mask.sum(dim=1).unsqueeze(-1)
    return zeroed.sum(dim=1) / token_counts
def generate_query_embedding(query_text):
    """Embed a query string with the loaded model.

    The text is prefixed with ``"query: "``, tokenised (truncated to 512
    tokens), passed through the model, mean-pooled with ``average_pool`` and
    L2-normalised.

    Returns:
        The embedding as a 1-D NumPy array, or None if any step raises.
    """
    try:
        encoded = tokenizer(
            [f"query: {query_text}"],
            max_length=512,
            padding=True,
            truncation=True,
            return_tensors='pt',
        )
        with torch.no_grad():
            hidden = model(**encoded).last_hidden_state
            pooled = average_pool(hidden, encoded['attention_mask'])
            unit = torch.nn.functional.normalize(pooled, p=2, dim=1)
        return unit.cpu().numpy()[0]
    except Exception as e:
        print(f"生成查詢向量失敗: {str(e)}")
        return None
| # ========================================== | |
| # 冠軍級智慧路由與重排序系統 | |
| # ========================================== | |
def build_reranking_prompt(query: str, search_results: List[Dict]) -> str:
    """Build the prompt that asks the LLM to score each retrieved segment.

    The instruction spells out the exact JSON structure the reranking code
    reads back (a top-level ``results`` list whose items carry ``index`` and
    ``relevance_score``). Without it the model is free to pick its own keys and
    every score falls back to the default.

    Args:
        query: The user query.
        search_results: Candidate dicts with ``speaker``, ``text`` and
            ``turn_index`` keys. Each text is truncated to 500 characters.

    Returns:
        The full prompt string.
    """
    instruction = """你是一個訪談內容檢索排序系統。
你將收到一個查詢和幾個檢索到的訪談片段。你的任務是根據片段與查詢的相關性來評估和評分每個片段。
評分指南:
- 1.0 = 完全相關
- 0.7-0.9 = 高度相關
- 0.5-0.7 = 中等相關
- 0.3-0.5 = 輕微相關
- 0-0.3 = 幾乎無關
請為每個搜尋結果提供JSON格式的評分,並嚴格使用以下結構(index 對應「結果 N」的編號,relevance_score 為 0 到 1 的數字):
{"results": [{"index": 0, "relevance_score": 0.85}]}"""

    parts = [f"查詢:{query}\n\n檢索結果:\n"]
    for i, result in enumerate(search_results):
        # Tolerate a missing/None text rather than failing on the slice.
        snippet = (result['text'] or '')[:500]
        parts.append(
            f"\n結果 {i}:\n"
            f"發言人:{result['speaker']}\n"
            f"內容:{snippet}\n"
            f"Turn Index:{result['turn_index']}\n"
        )
    results_text = "".join(parts)
    return f"{instruction}\n\n{results_text}"
def _coerce_llm_score(value, default=0.5):
    """Return ``value`` as a float clamped to [0, 1], or ``default`` if it is not numeric."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default
    if score != score:  # NaN never compares equal to itself
        return default
    return min(1.0, max(0.0, score))


def _index_llm_scores(rerank_results):
    """Map candidate index -> LLM score from the parsed rerank JSON, skipping malformed entries."""
    scores = {}
    entries = rerank_results.get('results') if isinstance(rerank_results, dict) else None
    if not isinstance(entries, list):
        return scores
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        try:
            idx = int(entry.get('index'))
        except (TypeError, ValueError):
            continue
        # The first entry for a given index wins.
        if idx not in scores:
            scores[idx] = _coerce_llm_score(entry.get('relevance_score', 0.5))
    return scores


def _make_search_result(candidate, llm_score, weighted_score):
    """Build a SearchResult from a candidate dict and its scores."""
    return SearchResult(
        text=candidate['text'],
        speaker=candidate['speaker'],
        turn_index=candidate['turn_index'],
        file_id=candidate['file_id'],
        vector_score=candidate['vector_score'],
        llm_score=llm_score,
        weighted_score=weighted_score,
    )


def intelligent_routing_and_reranking(query: str, selected_speakers: List[str], top_k: int = 30) -> List[SearchResult]:
    """Retrieve transcript segments for ``query`` and rerank the best ones with the LLM.

    Pipeline: embed the query, score every dataset row that is not spoken by an
    interviewer (and, when ``selected_speakers`` is non-empty, is spoken by one
    of them) with a dot product, keep the ``top_k`` best, ask the LLM to score
    the first 10, combine as ``0.3 * vector + 0.7 * llm``, and return the
    results whose weighted score reaches ``RELEVANCE_THRESHOLD``, best first.

    If the LLM step fails, falls back to vector-only results filtered by the
    same threshold. Any other failure yields an empty list.

    Args:
        query: The user query.
        selected_speakers: Speakers to restrict the search to; empty/None means all.
        top_k: Number of vector-search candidates to keep.

    Returns:
        A list of SearchResult objects (possibly empty).
    """
    if not dataset or not init_success:
        return []
    try:
        # Step 1: embed the query.
        query_vector = generate_query_embedding(query)
        if query_vector is None:
            return []

        # Step 2: score and filter. Sets make the membership tests O(1) per row.
        excluded = set(INTERVIEWERS)
        wanted = set(selected_speakers) if selected_speakers else None
        initial_results = []
        for i, item in enumerate(dataset):
            speaker = item['speaker']
            if speaker in excluded:
                continue
            if wanted is not None and speaker not in wanted:
                continue
            vector_score = float(np.dot(query_vector, np.array(item['embedding'])))
            initial_results.append({
                'index': i,
                'text': item.get('text', ''),
                'speaker': item.get('speaker', ''),
                'turn_index': item.get('turn_index', 0),
                'file_id': item.get('file_id', ''),
                'vector_score': vector_score,
            })

        initial_results.sort(key=lambda x: x['vector_score'], reverse=True)
        candidates = initial_results[:top_k]
        if not candidates:
            return []

        # Step 3: LLM rerank of the first 10 candidates only.
        head, tail = candidates[:10], candidates[10:]
        try:
            rerank_prompt = build_reranking_prompt(query, head)
            response = openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是一個精準的訪談內容排序系統。"},
                    {"role": "user", "content": rerank_prompt},
                ],
                temperature=0.1,
                response_format={"type": "json_object"},
            )
            rerank_results = json.loads(response.choices[0].message.content)
            # Validate once up front: a string, out-of-range or malformed score
            # must not abort the whole rerank or push a result past its range.
            llm_scores = _index_llm_scores(rerank_results)

            # Step 4: weighted scoring.
            final_results = []
            for i, candidate in enumerate(head):
                llm_score = llm_scores.get(i, 0.5)
                weighted_score = 0.3 * candidate['vector_score'] + 0.7 * llm_score
                final_results.append(_make_search_result(candidate, llm_score, weighted_score))

            # Candidates that were not reranked keep only the vector share of the score.
            # NOTE(review): 0.3 * vector_score stays below the 0.6 threshold for
            # normalised embeddings, so these are presumably always filtered out — confirm intent.
            for candidate in tail:
                final_results.append(
                    _make_search_result(candidate, 0.0, candidate['vector_score'] * 0.3)
                )

            final_results.sort(key=lambda x: x.weighted_score, reverse=True)
            return [r for r in final_results if r.weighted_score >= RELEVANCE_THRESHOLD]
        except Exception as e:
            print(f"LLM 重排序失敗: {str(e)}")
            # Degraded mode: rank by vector score alone.
            return [
                _make_search_result(c, 0.0, c['vector_score'])
                for c in candidates
                if c['vector_score'] >= RELEVANCE_THRESHOLD
            ]
    except Exception as e:
        print(f"智慧路由失敗: {str(e)}")
        return []
| # ========================================== | |
| # 並行處理與錯誤重試 | |
| # ========================================== | |
def call_gpt_with_retry(prompt, max_retries=MAX_RETRIES):
    """Call the chat model with exponential back-off and always return a string.

    Args:
        prompt: User message sent to the model.
        max_retries: Maximum number of attempts. Values below 1 are treated as
            1, so the function never falls through and returns None.

    Returns:
        The model's reply (empty string if the reply has no content), or a
        ``"處理失敗: ..."`` message once every attempt has failed.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            response = openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是訪談分析專家。基於提供的內容準確回答。"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                timeout=30,  # seconds
            )
            # Callers split the answer on newlines, so never hand back None.
            return response.choices[0].message.content or ""
        except Exception as e:
            if attempt < attempts - 1:
                wait_time = 2 ** attempt  # exponential back-off: 1s, 2s, 4s, ...
                print(f"API 調用失敗,{wait_time}秒後重試... (嘗試 {attempt + 1}/{attempts})")
                time.sleep(wait_time)
            else:
                print(f"API 調用最終失敗: {str(e)}")
                return f"處理失敗: {str(e)}"
def process_single_question(question, selected_speakers, question_index=0, total_questions=1):
    """Answer one guide question from retrieved transcript segments (worker for the thread pool).

    Records the current item in the shared status, retrieves and filters
    segments, builds a context block, and asks GPT for an answer.

    Returns:
        A dict with keys ``question``, ``answer``, ``raw_contexts`` and
        ``success``. ``success`` is False when nothing relevant was found or an
        exception was caught.
    """
    try:
        with status_lock:
            processing_status.current_item = f"問題 {question_index + 1}/{total_questions}"

        hits = intelligent_routing_and_reranking(question, selected_speakers, top_k=30)
        relevant = [hit for hit in hits if hit.weighted_score >= RELEVANCE_THRESHOLD]
        if not relevant:
            return {
                'question': question,
                'answer': "未找到相關內容",
                'raw_contexts': [],
                'success': False
            }

        # Every qualifying segment goes into both the prompt context and the raw list.
        context_blocks = []
        raw_contexts = []
        for position, hit in enumerate(relevant, start=1):
            context_blocks.append(
                f"[片段 {position}]\n"
                f"發言人:{hit.speaker}\n"
                f"內容:{hit.text}\n"
                f"相關性:{hit.weighted_score:.3f}\n\n"
            )
            raw_contexts.append({
                'speaker': hit.speaker,
                'text': hit.text,
                'turn_index': hit.turn_index,
                'score': hit.weighted_score
            })
        context = "".join(context_blocks)

        prompt = f"""基於以下訪談內容回答訪綱問題:
{context}
問題:{question}
請提供:
1. 主要回答
2. 不同受訪者的觀點(如果有)
3. 具體引述"""
        return {
            'question': question,
            'answer': call_gpt_with_retry(prompt),
            'raw_contexts': raw_contexts,
            'success': True
        }
    except Exception as e:
        print(f"處理問題失敗: {str(e)}")
        return {
            'question': question,
            'answer': f"處理失敗: {str(e)}",
            'raw_contexts': [],
            'success': False
        }
def parallel_process_questions(questions, selected_speakers, progress_callback=None):
    """Answer all questions concurrently and return the results in input order.

    Resets the shared progress counters, submits one ``process_single_question``
    task per question to a thread pool of ``MAX_WORKERS`` threads, updates the
    counters and the time estimate as tasks finish, and reports progress
    through ``progress_callback(fraction, message)`` when one is given.

    Returns:
        A list of result dicts, one per question, ordered like ``questions``.
    """
    indexed_results = []
    total = len(questions)

    with status_lock:
        processing_status.total_items = total
        processing_status.completed_items = 0
        processing_status.failed_items = 0
        processing_status.start_time = time.time()

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        # Remember each task's position so the output can be re-ordered later.
        pending = {}
        for position, text in enumerate(questions):
            task = pool.submit(process_single_question, text, selected_speakers, position, total)
            pending[task] = (position, text)

        for task in as_completed(pending):
            position, text = pending[task]
            try:
                outcome = task.result(timeout=60)
                indexed_results.append((position, outcome))
                with status_lock:
                    if outcome['success']:
                        processing_status.completed_items += 1
                    else:
                        processing_status.failed_items += 1
                    # Remaining time = average time per completed item * items not yet completed.
                    elapsed = time.time() - processing_status.start_time
                    if processing_status.completed_items > 0:
                        per_item = elapsed / processing_status.completed_items
                        still_open = total - processing_status.completed_items
                        processing_status.estimated_time = per_item * still_open
                if progress_callback:
                    fraction = (processing_status.completed_items + processing_status.failed_items) / total
                    progress_callback(fraction, f"已處理 {processing_status.completed_items}/{total} 個問題")
            except Exception as e:
                print(f"任務執行失敗: {str(e)}")
                indexed_results.append((position, {
                    'question': text,
                    'answer': f"處理失敗: {str(e)}",
                    'raw_contexts': [],
                    'success': False
                }))
                with status_lock:
                    processing_status.failed_items += 1
                    processing_status.errors.append(str(e))

    # Tasks finish in arbitrary order; restore the original question order.
    indexed_results.sort(key=lambda pair: pair[0])
    return [outcome for _, outcome in indexed_results]
| # ========================================== | |
| # RAG 對話函數 | |
| # ========================================== | |
def rag_chat(question, selected_speakers, history):
    """Answer a chat question from retrieved segments and append the turn to ``history``.

    ``history`` uses the messages format (a list of ``{"role", "content"}``
    dicts). The returned list is ``history`` plus the user message and one
    assistant message, which is either the GPT answer followed by the raw
    sources or a short notice/error text.
    """
    def _extended(assistant_text):
        # history + the new user turn + the assistant reply
        return history + [
            {"role": "user", "content": question},
            {"role": "assistant", "content": assistant_text}
        ]

    if not init_success:
        return _extended("系統尚未初始化,請稍後再試。")

    try:
        hits = intelligent_routing_and_reranking(question, selected_speakers, top_k=30)
        relevant = [hit for hit in hits if hit.weighted_score >= RELEVANCE_THRESHOLD]
        if not relevant:
            return _extended("未找到相關內容,請嘗試其他問題。")

        # Prompt context lists every qualifying segment; raw sources keep only non-empty texts.
        context_parts = ["相關訪談內容:\n\n"]
        raw_contexts = []
        for number, hit in enumerate(relevant, start=1):
            context_parts.append(
                f"[片段 {number}]\n"
                f"發言人:{hit.speaker}\n"
                f"內容:{hit.text}\n"
                f"相關性分數:{hit.weighted_score:.3f}\n\n"
            )
            if hit.text:
                raw_contexts.append(f"[{hit.speaker} - Turn {hit.turn_index}]: {hit.text}")
        context = "".join(context_parts)
        if not raw_contexts:
            raw_contexts = ["未能提取原始內容"]

        prompt = f"""基於以下訪談內容回答問題。請提供準確、完整的回答。
{context}
問題:{question}"""
        answer = call_gpt_with_retry(prompt)

        # Append the raw retrieved sources below the answer.
        pieces = [f"{answer}\n\n---\n📚 **原始 RAG 來源(共 {len(raw_contexts)} 個):**\n"]
        for number, source in enumerate(raw_contexts, start=1):
            if source and source != "未能提取原始內容":
                pieces.append(f"\n**來源 {number}:**\n{source}\n")
            else:
                pieces.append(f"\n**來源 {number}:** 無內容\n")
        return _extended("".join(pieces))
    except Exception as e:
        return _extended(f"處理過程中發生錯誤:{str(e)}")
def export_chat_to_word(chat_history):
    """Export a messages-format chat history to a .docx file and upload it.

    User messages are paired with the assistant message that follows them; each
    pair becomes one section of the document. The file is written to the
    working directory as ``chat_export_<timestamp>.docx`` and then uploaded to
    the output dataset.

    Returns:
        ``(filename, status_message)`` when the document was written (the
        message says whether the upload also succeeded), or
        ``(None, error_message)`` if building or saving the document failed.
    """
    try:
        now = datetime.now()
        doc = Document()
        doc.add_heading('AI 對話記錄', 0)
        doc.add_paragraph(f'匯出時間:{now.strftime("%Y-%m-%d %H:%M:%S")}')
        doc.add_paragraph('')

        # Pair each user message with the next assistant message.
        conversation_pairs = []
        current_question = None
        for msg in chat_history:
            role = msg["role"]
            if role == "user":
                current_question = msg["content"]
            elif role == "assistant" and current_question:
                conversation_pairs.append((current_question, msg["content"]))
                current_question = None

        for i, (question, answer) in enumerate(conversation_pairs, 1):
            doc.add_heading(f'對話 {i}', level=1)
            doc.add_heading('問題:', level=2)
            doc.add_paragraph(question)
            doc.add_heading('回答:', level=2)
            for line in answer.split('\n'):
                if line.strip():
                    doc.add_paragraph(line)
            doc.add_page_break()

        # Save straight to disk; no in-memory round trip is needed.
        output_filename = f"chat_export_{now.strftime('%Y%m%d_%H%M%S')}.docx"
        doc.save(output_filename)

        uploaded, detail = upload_to_hf_dataset(output_filename, file_type="chat", metadata={
            'export_time': now.isoformat(),
            'total_conversations': len(conversation_pairs)
        })
        # Only claim the upload when it actually happened; the local file is
        # still returned so the user can download it either way.
        if uploaded:
            return output_filename, "對話已匯出並上傳到資料集"
        return output_filename, f"對話已匯出(上傳到資料集失敗:{detail})"
    except Exception as e:
        return None, f"匯出失敗:{str(e)}"
| # ========================================== | |
| # 訪綱填答函數 | |
| # ========================================== | |
def parse_word_document(file_path):
    """Extract question-like paragraphs from a Word document.

    A non-empty paragraph counts as a question when it contains a question mark
    (ASCII ``?`` or full-width U+FF1F), starts with a digit, or starts with
    ``Q`` or ``問``.

    Returns:
        The matching paragraph texts in document order, or an empty list if the
        document cannot be read.
    """
    # The full-width mark is written as an escape so it cannot be silently
    # normalised to the ASCII one, which would leave two identical entries.
    question_marks = ("?", "\uff1f")
    question_prefixes = ('Q', '問')
    try:
        doc = Document(file_path)
        questions = []
        for para in doc.paragraphs:
            text = para.text.strip()
            if not text:
                continue
            if (
                any(mark in text for mark in question_marks)
                or text[0].isdigit()
                or text.startswith(question_prefixes)
            ):
                questions.append(text)
        return questions
    except Exception as e:
        print(f"解析文檔失敗: {str(e)}")
        return []
def extract_speaker_from_filename(filename, available_speakers):
    """Find which known speaker a file name refers to.

    The directory part and the extension are ignored. When several speaker
    names occur in the file name, the longest one wins, so a name that is a
    substring of another (e.g. "王明" vs "王明華") cannot shadow it. Empty
    speaker names are skipped because the empty string is contained in every
    file name.

    Args:
        filename: File name or path.
        available_speakers: Iterable of known speaker names.

    Returns:
        A one-element list with the matched speaker, or None if none matches.
    """
    import os

    stem = os.path.splitext(os.path.basename(filename))[0]
    matches = [speaker for speaker in (available_speakers or []) if speaker and speaker in stem]
    if not matches:
        return None
    # max() returns the first of equally long names, preserving list order on ties.
    return [max(matches, key=len)]
def single_interviewee_guide_filling(file_path, selected_speakers, file_name=None, progress_callback=None):
    """Answer every question of one interview guide and write the result to a .docx file.

    If ``file_name`` contains a known speaker name, that speaker replaces
    ``selected_speakers``. Questions are answered in parallel, the answers and
    their raw sources are written to a new document, and the document is
    uploaded to the output dataset.

    Statistics are computed from this call's own results rather than from the
    shared ``processing_status``, which other files processed concurrently in
    batch mode overwrite.

    Args:
        file_path: Path of the guide document to read.
        selected_speakers: Speakers to search; empty/None means all.
        file_name: Original file name, used for speaker detection and reporting.
        progress_callback: Optional ``callback(fraction, message)``.

    Returns:
        ``(output_filename, message)`` on success, ``(None, error_message)`` otherwise.
    """
    if not init_success:
        return None, "系統尚未初始化"
    try:
        started_at = time.time()
        # Normalise None to an empty list so len()/join below cannot fail.
        selected_speakers = list(selected_speakers or [])

        # A speaker named in the file name overrides the selection.
        if file_name:
            detected_speakers = extract_speaker_from_filename(file_name, all_speakers)
            if detected_speakers:
                selected_speakers = detected_speakers
                print(f"從檔名 '{file_name}' 中檢測到受訪者: {detected_speakers[0]}")

        questions = parse_word_document(file_path)
        if not questions:
            return None, "未能從文檔中提取問題"

        if progress_callback:
            progress_callback(0.1, f"開始處理 {len(questions)} 個問題...")

        print(f"開始並行處理 {len(questions)} 個問題")
        results = parallel_process_questions(questions, selected_speakers, progress_callback)
        succeeded = sum(1 for result in results if result['success'])
        failed = len(results) - succeeded

        # Build the output document: header block first, then one section per question.
        output_doc = Document()
        output_doc.add_heading('訪談訪綱 - AI 智慧填答', 0)
        output_doc.add_paragraph(f'處理時間:{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
        output_doc.add_paragraph(f'原始檔案:{file_name if file_name else "未知"}')
        output_doc.add_paragraph(f'選擇的受訪者:{", ".join(selected_speakers) if selected_speakers else "全部"}')
        output_doc.add_paragraph(f'使用技術:並行處理 ({MAX_WORKERS} 組) + 冠軍級 RAG')
        output_doc.add_paragraph(f'處理統計:成功 {succeeded}/{len(results)},失敗 {failed}')
        output_doc.add_paragraph('')

        for i, result in enumerate(results, 1):
            output_doc.add_heading(f'問題 {i}', level=1)
            output_doc.add_paragraph(result['question'])
            if result['success'] and result['raw_contexts']:
                output_doc.add_heading('AI 分析回答:', level=2)
                for line in result['answer'].split('\n'):
                    if line.strip():
                        output_doc.add_paragraph(line)
                # Raw retrieved segments, shown in full.
                output_doc.add_heading(f'原始 RAG 向量檢索內容(共 {len(result["raw_contexts"])} 個):', level=2)
                for j, raw in enumerate(result['raw_contexts']):
                    p = output_doc.add_paragraph()
                    p.add_run(f"{j+1}. [{raw['speaker']} - Turn {raw['turn_index']}] ").bold = True
                    p.add_run(f"(相關性: {raw['score']:.3f})\n")
                    p.add_run(raw['text'])
            else:
                output_doc.add_paragraph(result['answer'])
            output_doc.add_page_break()

        speaker_suffix = f"_{selected_speakers[0]}" if len(selected_speakers) == 1 else "_多位"
        output_filename = f"filled_guide{speaker_suffix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
        output_doc.save(output_filename)

        if progress_callback:
            progress_callback(0.9, "正在上傳到資料集...")
        success, path = upload_to_hf_dataset(output_filename, file_type="guide", metadata={
            'original_file': file_name,
            'speakers': selected_speakers,
            'total_questions': len(questions),
            'successful_answers': succeeded,
            'processing_time': time.time() - started_at
        })
        if success:
            message = f"訪綱填答完成!已上傳到:{path}"
        else:
            message = f"訪綱填答完成!本地檔案:{output_filename}"

        if progress_callback:
            progress_callback(1.0, message)
        return output_filename, message
    except Exception as e:
        return None, f"處理失敗:{str(e)}"
def batch_process_guides(files, default_speakers, progress_callback=None):
    """Process several interview-guide files concurrently.

    Each file is handled by ``single_interviewee_guide_filling``; a speaker
    detected in the file name overrides ``default_speakers`` for that file.

    File-level success/failure counts are kept in local variables. The shared
    ``processing_status`` counters are reset by every inner question run, so
    they cannot be used to count files while the batch is running; the final
    file-level totals are written back to ``processing_status`` once all files
    are done.

    Args:
        files: Uploaded files (objects with a ``name`` attribute) or paths.
        default_speakers: Speakers used when the file name names none.
        progress_callback: Optional ``callback(fraction, message)``.

    Returns:
        ``(processed_output_files, summary_text)``.
    """
    if not init_success:
        return [], "系統尚未初始化"

    files = list(files or [])
    total_files = len(files)
    if total_files == 0:
        # A pool with max_workers=0 would raise; report the empty input directly.
        return [], "批量處理失敗:沒有可處理的檔案"

    results = []
    processed_files = []
    succeeded = 0
    failed = 0
    try:
        print(f"開始批量處理 {total_files} 個檔案")
        with status_lock:
            processing_status.total_items = total_files
            processing_status.completed_items = 0
            processing_status.failed_items = 0
            processing_status.errors.clear()

        def process_file(file_info):
            """Process one file; always returns a result dict, never raises."""
            idx, file = file_info
            file_name = f"檔案 {idx+1}"  # fallback label if reading the name fails
            try:
                file_name = file.name if hasattr(file, 'name') else str(file)
                print(f"\n處理檔案 {idx+1}/{total_files}: {file_name}")

                detected_speakers = extract_speaker_from_filename(file_name, all_speakers)
                if detected_speakers:
                    speakers_to_use = detected_speakers
                    status_msg = f"檔案 {idx+1}: 檢測到受訪者 {detected_speakers[0]}"
                else:
                    speakers_to_use = default_speakers
                    status_msg = f"檔案 {idx+1}: 使用預設受訪者"
                print(status_msg)

                output_file, process_status = single_interviewee_guide_filling(
                    file.name if hasattr(file, 'name') else file,
                    speakers_to_use,
                    file_name,
                    None  # no per-file progress callback in batch mode
                )
                if output_file:
                    return {
                        'success': True,
                        'file_name': file_name,
                        'output_file': output_file,
                        'status': process_status
                    }
                return {
                    'success': False,
                    'file_name': file_name,
                    'error': process_status
                }
            except Exception as e:
                return {
                    'success': False,
                    'file_name': file_name,
                    'error': str(e)
                }

        with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, total_files)) as executor:
            future_to_file = {
                executor.submit(process_file, (i, file)): i
                for i, file in enumerate(files)
            }
            for future in as_completed(future_to_file):
                file_idx = future_to_file[future]
                try:
                    result = future.result()
                except Exception as e:
                    error_msg = f"檔案 {file_idx+1} 處理超時或失敗: {str(e)}"
                    print(error_msg)
                    results.append(error_msg)
                    failed += 1
                    with status_lock:
                        processing_status.errors.append(str(e))
                else:
                    if result['success']:
                        processed_files.append(result['output_file'])
                        results.append(f"✅ {result['file_name']} -> {result['output_file']}")
                        succeeded += 1
                    else:
                        results.append(f"❌ {result['file_name']}: {result['error']}")
                        failed += 1
                        with status_lock:
                            processing_status.errors.append(result['error'])

                if progress_callback:
                    # A broken callback must not be counted as a file failure.
                    try:
                        progress_callback(
                            (succeeded + failed) / total_files,
                            f"已處理 {succeeded + failed}/{total_files} 個檔案"
                        )
                    except Exception as callback_error:
                        print(f"進度回調失敗: {str(callback_error)}")

        # Publish file-level totals now that no inner run can overwrite them.
        with status_lock:
            processing_status.total_items = total_files
            processing_status.completed_items = succeeded
            processing_status.failed_items = failed

        summary = f"\n處理完成!\n成功: {succeeded}/{total_files} 個檔案\n"
        if failed > 0:
            summary += f"失敗: {failed} 個檔案\n"
        summary += "\n詳細結果:\n" + "\n".join(results)
        return processed_files, summary
    except Exception as e:
        return [], f"批量處理失敗:{str(e)}"
| # ========================================== | |
| # Gradio 介面 | |
| # ========================================== | |
| def create_interface(): | |
| """創建 Gradio 介面""" | |
| with gr.Blocks( | |
| title="訪談轉錄稿 RAG 系統", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .gradio-container { | |
| font-family: 'Microsoft JhengHei', sans-serif; | |
| } | |
| .progress-bar { | |
| background-color: #4CAF50; | |
| } | |
| """ | |
| ) as app: | |
| # 標題 | |
| gr.Markdown(""" | |
| # 🎙️ 訪談轉錄稿智慧分析系統 v2.0 | |
| **技術架構:** Multilingual-E5-Large + GPT-4o-mini + 冠軍級 RAG + 並行處理 | |
| **核心功能:** | |
| - 🔍 智慧語義搜尋(顯示所有 ≥0.6 分結果) | |
| - 💬 AI 對話(含匯出功能) | |
| - 📝 訪綱自動填答(5組並行處理) | |
| - 📊 批量處理(自動上傳 HF Dataset) | |
| - ⚡ 錯誤重試機制 + 進度追蹤 | |
| """) | |
| # 系統狀態 | |
| with gr.Row(): | |
| status_text = gr.Textbox( | |
| label="系統狀態", | |
| value="初始化中...", | |
| interactive=False | |
| ) | |
| init_btn = gr.Button("重新初始化", scale=0) | |
| # 主要功能區 | |
| with gr.Tabs(): | |
| # Tab 1: AI 對話 | |
| with gr.Tab("💬 AI 對話"): | |
| gr.Markdown("### 智慧問答系統(顯示所有相關 RAG)") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 選擇受訪者") | |
| speaker_selector = gr.CheckboxGroup( | |
| choices=[], | |
| label="受訪者列表", | |
| info="不選擇則搜尋全部內容" | |
| ) | |
| with gr.Column(scale=3): | |
| chatbot = gr.Chatbot( | |
| height=500, | |
| label="對話記錄", | |
| show_label=True, | |
| type="messages" # 使用新的 messages 格式 | |
| ) | |
| with gr.Row(): | |
| msg = gr.Textbox( | |
| label="輸入問題", | |
| placeholder="請輸入您想詢問的問題...", | |
| scale=4 | |
| ) | |
| send_btn = gr.Button("發送", variant="primary", scale=1) | |
| with gr.Row(): | |
| clear_btn = gr.Button("清除對話") | |
| export_btn = gr.Button("📥 匯出對話為 Word", variant="secondary") | |
| export_status = gr.Textbox( | |
| label="匯出狀態", | |
| visible=False | |
| ) | |
| export_file = gr.File( | |
| label="下載匯出檔案", | |
| visible=False | |
| ) | |
| # Tab 2: 訪綱填答 | |
| with gr.Tab("📝 訪綱填答"): | |
| gr.Markdown(""" | |
| ### 智慧訪綱填答系統 | |
| **特色功能:** | |
| - 🚀 5組並行處理(速度提升5倍) | |
| - 📊 顯示所有 ≥0.6 分的 RAG 結果 | |
| - 🔄 錯誤自動重試(最多3次) | |
| - 📤 自動上傳到 HF Dataset | |
| - 🏷️ 檔名自動識別受訪者 | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| guide_speakers = gr.CheckboxGroup( | |
| choices=[], | |
| label="預設受訪者(當檔名未指定時使用)", | |
| info="檔名中有受訪者名稱時會自動覆蓋此選擇" | |
| ) | |
| # 單檔上傳 | |
| with gr.Accordion("單檔處理", open=False): | |
| single_file_input = gr.File( | |
| label="上傳單個訪綱 (Word 格式)", | |
| file_types=[".docx"] | |
| ) | |
| single_process_btn = gr.Button("處理單檔", variant="secondary") | |
| # 批量上傳 | |
| batch_file_input = gr.File( | |
| label="批量上傳訪綱(最多15個 Word 檔案)", | |
| file_types=[".docx"], | |
| file_count="multiple" | |
| ) | |
| batch_process_btn = gr.Button("🚀 批量並行處理", variant="primary", size="lg") | |
| with gr.Column(): | |
| # 處理狀態 | |
| process_status = gr.Textbox( | |
| label="處理狀態", | |
| interactive=False, | |
| lines=10 | |
| ) | |
| # 處理統計 | |
| with gr.Row(): | |
| stat_total = gr.Number(label="總計", value=0) | |
| stat_success = gr.Number(label="成功", value=0) | |
| stat_failed = gr.Number(label="失敗", value=0) | |
| stat_time = gr.Number(label="預估時間(秒)", value=0) | |
| # 下載區 | |
| single_download_file = gr.File( | |
| label="下載單檔結果", | |
| visible=False | |
| ) | |
| batch_download_files = gr.File( | |
| label="下載所有結果", | |
| visible=False, | |
| file_count="multiple" | |
| ) | |
| # 技術細節 | |
| with gr.Accordion("🔧 技術細節", open=False): | |
| gr.Markdown(""" | |
| ### 系統特色 | |
| **1. 並行處理架構** | |
| - 5組同時處理不同問題 | |
| - ThreadPoolExecutor 管理 | |
| - 自動負載平衡 | |
| **2. RAG 結果顯示** | |
| - 所有 weighted_score ≥ 0.6 的結果都納入分析 | |
| - 完整顯示原始內容(不截斷) | |
| - 顯示相關性分數 | |
| **3. 錯誤處理** | |
| - API 調用失敗自動重試(最多3次) | |
| - 指數退避策略(2^n 秒) | |
| - 失敗隔離(不影響其他任務) | |
| **4. HF Dataset 自動上傳** | |
| - 位置:s880453/interview-outputs | |
| - 自動分類:guide_outputs / chat_exports | |
| - 包含處理 metadata | |
| **5. 進度追蹤** | |
| - 即時顯示處理進度 | |
| - 預估剩餘時間 | |
| - 詳細錯誤日誌 | |
| """) | |
| # 事件處理 | |
| def send_message(message, speakers, history): | |
| if not message: | |
| return "", history | |
| new_history = rag_chat(message, speakers, history) | |
| return "", new_history | |
| def clear_chat(): | |
| return [] | |
| def export_chat(history): | |
| if not history: | |
| return gr.Textbox(value="沒有對話可匯出", visible=True), gr.File(visible=False) | |
| file_path, status = export_chat_to_word(history) | |
| if file_path: | |
| return ( | |
| gr.Textbox(value=status, visible=True), | |
| gr.File(value=file_path, visible=True) | |
| ) | |
| else: | |
| return gr.Textbox(value=status, visible=True), gr.File(visible=False) | |
def process_single_guide_with_progress(file, speakers):
    """Fill a single interview guide and report the outcome to the UI.

    Args:
        file: Uploaded file object exposing a ``.name`` path; falsy when
            nothing was uploaded.
        speakers: Speaker selection forwarded to
            ``single_interviewee_guide_filling``.

    Returns:
        A 6-tuple ``(status, gr.File, total, completed, failed, time)``.
        The counters are read from the module-level ``processing_status``;
        the time slot is ``processing_status.estimated_time`` on success and
        ``0`` on failure. With no upload, all four numbers are ``0``.
    """
    if not file:
        return ("請上傳文件", gr.File(visible=False), 0, 0, 0, 0)

    def report_progress(progress, message):
        # The callback only echoes the message back to its caller.
        return message

    # The uploaded path is passed twice (first and third argument), exactly as
    # the original call did.
    upload_path = file.name
    result_file, status = single_interviewee_guide_filling(
        upload_path,
        speakers,
        upload_path,
        report_progress,
    )

    # Show the download widget only when a result file was produced.
    if result_file:
        download = gr.File(value=result_file, visible=True)
    else:
        download = gr.File(visible=False)

    return (
        status,
        download,
        processing_status.total_items,
        processing_status.completed_items,
        processing_status.failed_items,
        processing_status.estimated_time if result_file else 0,
    )
def process_batch_guides_with_progress(files, speakers):
    """Fill several interview guides in one run and report the outcome.

    Args:
        files: Uploaded files; falsy when nothing was uploaded. At most 15
            files are accepted.
        speakers: Speaker selection forwarded to ``batch_process_guides``.

    Returns:
        A 6-tuple ``(status, gr.File, total, completed, failed, time)``.
        The counters are read from the module-level ``processing_status``;
        the time slot is ``processing_status.estimated_time`` when files were
        produced and ``0`` otherwise. Rejected requests (no files, or more
        than 15) return all four numbers as ``0``.
    """
    # Validate the upload before doing any work.
    rejection = None
    if not files:
        rejection = "請上傳至少一個檔案"
    elif len(files) > 15:
        rejection = f"檔案數量超過限制(最多15個),您上傳了 {len(files)} 個"
    if rejection is not None:
        return (rejection, gr.File(visible=False), 0, 0, 0, 0)

    def report_progress(progress, message):
        # The callback only echoes the message back to its caller.
        return message

    processed_files, status = batch_process_guides(files, speakers, report_progress)

    # Show the multi-file download widget only when something was produced.
    if processed_files:
        download = gr.File(value=processed_files, visible=True, file_count="multiple")
    else:
        download = gr.File(visible=False)

    return (
        status,
        download,
        processing_status.total_items,
        processing_status.completed_items,
        processing_status.failed_items,
        processing_status.estimated_time if processed_files else 0,
    )
def update_status():
    """Initialise the system and refresh the status text and speaker pickers.

    Returns:
        A ``(message, gr.CheckboxGroup, gr.CheckboxGroup)`` triple. When
        ``initialize_system`` reports success, both checkbox groups are given
        ``all_speakers`` as their choices; otherwise they are returned with
        no arguments.
    """
    succeeded, status_message = initialize_system()
    if not succeeded:
        return status_message, gr.CheckboxGroup(), gr.CheckboxGroup()

    # Two separate component instances, one for each picker.
    chat_picker = gr.CheckboxGroup(choices=all_speakers)
    guide_picker = gr.CheckboxGroup(choices=all_speakers)
    return (status_message, chat_picker, guide_picker)
| # 綁定事件 | |
| send_btn.click( | |
| send_message, | |
| inputs=[msg, speaker_selector, chatbot], | |
| outputs=[msg, chatbot] | |
| ) | |
| msg.submit( | |
| send_message, | |
| inputs=[msg, speaker_selector, chatbot], | |
| outputs=[msg, chatbot] | |
| ) | |
| clear_btn.click(clear_chat, outputs=[chatbot]) | |
| export_btn.click( | |
| export_chat, | |
| inputs=[chatbot], | |
| outputs=[export_status, export_file] | |
| ) | |
| # 單檔處理 | |
| single_process_btn.click( | |
| process_single_guide_with_progress, | |
| inputs=[single_file_input, guide_speakers], | |
| outputs=[ | |
| process_status, | |
| single_download_file, | |
| stat_total, | |
| stat_success, | |
| stat_failed, | |
| stat_time | |
| ] | |
| ) | |
| # 批量處理 | |
| batch_process_btn.click( | |
| process_batch_guides_with_progress, | |
| inputs=[batch_file_input, guide_speakers], | |
| outputs=[ | |
| process_status, | |
| batch_download_files, | |
| stat_total, | |
| stat_success, | |
| stat_failed, | |
| stat_time | |
| ] | |
| ) | |
| init_btn.click( | |
| update_status, | |
| outputs=[status_text, speaker_selector, guide_speakers] | |
| ) | |
| # 初始化系統 | |
| app.load( | |
| update_status, | |
| outputs=[status_text, speaker_selector, guide_speakers] | |
| ) | |
| # 啟用 Queue(Gradio 5.42.0 的新語法) | |
| app.queue(max_size=50) | |
| return app | |
| # ========================================== | |
| # 主程式入口 | |
| # ========================================== | |
def authenticate(username, password):
    """Gradio login callback: accept any username with the shared password.

    The username is deliberately ignored. The previous per-user table mapped
    every account to the same ``ACCESS_PASSWORD``, so it could never grant
    access that the plain password check had not already granted.

    Args:
        username: Login name entered by the visitor (unused).
        password: Password entered by the visitor.

    Returns:
        ``True`` only when ``password`` is a string equal to the configured
        ``ACCESS_PASSWORD`` and that configured password is non-empty.
    """
    # Function-scope import: the file's import header does not provide hmac.
    import hmac

    expected = ACCESS_PASSWORD
    # Fail closed: an empty/unset password must not let an empty login through,
    # and non-string input can never match.
    if not expected or not isinstance(password, str):
        return False

    # Compare as UTF-8 bytes: compare_digest rejects non-ASCII str arguments,
    # and the digest comparison avoids leaking match length through timing.
    return hmac.compare_digest(password.encode("utf-8"), expected.encode("utf-8"))


if __name__ == "__main__":
    # Build the application, then start it behind the password prompt.
    app = create_interface()

    # Launch settings (adapted for Hugging Face Spaces).
    app.launch(
        auth=authenticate,
        auth_message="🔒 訪談轉錄稿 RAG 系統\n請輸入密碼以訪問系統",
        show_api=False,  # hide the API docs
        show_error=False,  # do not show detailed errors
        ssr_mode=False,  # disable SSR mode to avoid errors
    )