| |
| import os |
| import re |
| import pandas as pd |
| from pypdf import PdfReader |
| from odf import text, teletype |
| from odf.opendocument import load |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| import google.generativeai as genai |
| from supabase import create_client |
| import config |
|
|
| |
# Authenticate the Gemini SDK with the key held in the local config module.
genai.configure(api_key=config.GEMINI_KEY)

# Module-level Supabase client, built from the URL and key in config.
supabase = create_client(
    config.SUPABASE_URL,
    config.SUPABASE_KEY,
)
|
|
| |
# Candidate split points handed to the splitter, coarsest first: blank line,
# newline, CJK full stop, "!" / "?", space, and finally the empty string.
# NOTE(review): "!" and "?" are the ASCII forms -- confirm whether the
# full-width variants should be listed as well for Chinese text.
_CHUNK_SEPARATORS = ["\n\n", "\n", "。", "!", "?", " ", ""]

# Shared splitter used to cut extracted document text into chunks.
text_splitter = RecursiveCharacterTextSplitter(
    separators=_CHUNK_SEPARATORS,
    chunk_overlap=100,
    chunk_size=600,
)
|
|
def get_embedding(text):
    """Embed ``text`` with the Google ``text-embedding-004`` model.

    The request is sent with the ``retrieval_document`` task type. The
    original note on this function describes the vector as 768-dimensional.

    Args:
        text: Content to embed.

    Returns:
        The ``'embedding'`` entry of the API response, or ``None`` if the
        call (or reading the response) raises. The failure is printed
        instead of being propagated.
    """
    request = {
        "model": "models/text-embedding-004",
        "content": text,
        "task_type": "retrieval_document",
    }
    try:
        response = genai.embed_content(**request)
        embedding = response['embedding']
    except Exception as err:
        # Best-effort: report the problem and signal it with None.
        print(f"❌ 向量化失敗: {err}")
        return None
    return embedding
|
|
def parse_file_info(filename):
    """Derive the language tag and category from a knowledge-base filename.

    Expected layout: ``[<code>_<label>]_<LANG>_<title>.<ext>``, for example
    ``[01_Dict]_TRK_太魯閣語辭典.odt`` -> ``("TRK", "Dict")``.

    Args:
        filename: Base name of the file (no directory part).

    Returns:
        A ``(lang_tag, category)`` tuple. ``category`` is looked up from the
        numeric code and is ``"Other"`` for codes that are not mapped.
        Names that do not follow the layout, and non-string input, yield
        ``("Unknown", "Other")``.
    """
    category_map = {
        "01": "Dict",
        "02": "Gram",
        "03": "Cult",
        "04": "Corp",
    }

    # Non-string input used to be absorbed by a bare ``except``; keep the same
    # fallback result but make the condition explicit.
    if not isinstance(filename, str):
        return "Unknown", "Other"

    # Group 1: numeric code, group 2: bracket label, group 3: language tag.
    # The tag is matched lazily so it ends at the first underscore that
    # follows it, even when the title itself contains underscores.
    match = re.match(r"\[(\d+)_(\w+)\]_(\w+?)_", filename)
    if match is None:
        return "Unknown", "Other"

    code, _label, lang_tag = match.groups()
    return lang_tag, category_map.get(code, "Other")
|
|
def _read_chunks(file_path):
    """Return the text chunks of a supported file, or ``None`` if unsupported."""
    lowered = file_path.lower()

    if lowered.endswith(".pdf"):
        reader = PdfReader(file_path)
        # A page that yields no text (None or "") contributes only its newline.
        full_text = "".join(
            (page.extract_text() or "") + "\n" for page in reader.pages
        )
        return text_splitter.split_text(full_text)

    if lowered.endswith(".odt"):
        odt_doc = load(file_path)
        paragraphs = odt_doc.getElementsByType(text.P)
        full_text = "\n".join(teletype.extractText(p) for p in paragraphs)
        return text_splitter.split_text(full_text)

    if lowered.endswith(".csv"):
        df = pd.read_csv(file_path)
        # One chunk per row: every cell is stringified and joined with " | ".
        return df.apply(
            lambda row: " | ".join(row.astype(str)), axis=1
        ).tolist()

    return None


def process_file(file_path):
    """Read one file, embed its chunks and store them in ``lang_knowledge``.

    The reader is chosen from the file extension (``.pdf``, ``.odt`` or
    ``.csv``, case-insensitive). Files with any other extension are skipped
    with a warning instead of being reported as an upload of zero chunks.

    For every non-blank chunk an embedding is requested; chunks without an
    embedding are skipped. A failed insert is printed and does not stop the
    remaining chunks. Errors raised while reading the file are not caught
    here.

    Args:
        file_path: Path of the file to import.

    Returns:
        None. Progress and the number of stored chunks are printed.
    """
    filename = os.path.basename(file_path)
    tribe, category = parse_file_info(filename)

    print(f"🚀 處理中: {filename} (族語: {tribe}, 類別: {category})")

    content_list = _read_chunks(file_path)
    if content_list is None:
        print(f"⚠️ 不支援的檔案格式,已略過: {filename}")
        return

    success_count = 0
    for chunk in content_list:
        if not chunk.strip():
            continue

        vector = get_embedding(chunk)
        if not vector:
            # Embedding failed (already reported) or came back empty.
            continue

        data = {
            "tribe": tribe,
            "category": category,
            "content": chunk,
            "embedding": vector,
            "metadata": {"source": filename},
        }
        try:
            supabase.table("lang_knowledge").insert(data).execute()
        except Exception as e:
            print(f"⚠️ 資料庫寫入失敗: {e}")
        else:
            success_count += 1

    print(f"✅ {filename} 上傳完成,共 {success_count} 個區塊。")
|
|
def start_ingestion(data_folder="data"):
    """Import every visible regular file found directly in ``data_folder``.

    Entries whose name starts with ``.`` and anything that is not a regular
    file (such as sub-directories) are ignored. Files are handled in sorted
    name order so repeated runs behave the same way.

    Args:
        data_folder: Directory to scan. If it is missing, or is not a
            directory, a message is printed and nothing is imported.

    Returns:
        None.
    """
    # isdir rather than exists: a path to a regular file would pass an
    # existence check and then make os.listdir raise.
    if not os.path.isdir(data_folder):
        print(f"❌ 找不到資料夾: {data_folder}")
        return

    files = sorted(
        name
        for name in os.listdir(data_folder)
        if not name.startswith(".")
        and os.path.isfile(os.path.join(data_folder, name))
    )
    print(f"📦 準備導入 {len(files)} 個檔案...")

    for name in files:
        process_file(os.path.join(data_folder, name))

    print("\n🎉 所有知識庫資料導入完畢!")
|
|
if __name__ == "__main__":
    # Script entry point: import everything found in the local "data" folder.
    start_ingestion(data_folder="data")