import gradio as gr import json import os import pandas as pd from dotenv import load_dotenv from services import GeminiService from huggingface_hub import HfApi, hf_hub_download # Load Env load_dotenv() SAVE_FILE = os.getenv("SAVE_FILE_NAME", "saved_companies.json") HF_TOKEN = os.getenv("HF_TOKEN") DATASET_REPO_ID = os.getenv("DATASET_REPO_ID") # Init Service try: gemini_service = GeminiService() except Exception as e: print(f"Service Error: {e}") gemini_service = None # --- Helper Functions --- def get_key(c): return f"{c['name']}" def load_data(): data = [] # 1. 嘗試從雲端下載 if HF_TOKEN and DATASET_REPO_ID: try: print(f"正在同步雲端資料: {DATASET_REPO_ID}...") hf_hub_download( repo_id=DATASET_REPO_ID, filename=SAVE_FILE, repo_type="dataset", token=HF_TOKEN, local_dir="." # 覆蓋本地檔案 ) print("雲端同步完成。") except Exception as e: print(f"雲端同步略過 (初次啟動或無權限): {e}") # 2. 讀取檔案 if os.path.exists(SAVE_FILE): try: with open(SAVE_FILE, 'r', encoding='utf-8') as f: data = json.load(f) except: data = [] return data def save_data(data): # 1. 存本地 try: with open(SAVE_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"Save Error: {e}") return # 2. 上傳雲端 if HF_TOKEN and DATASET_REPO_ID: try: api = HfApi(token=HF_TOKEN) api.upload_file( path_or_fileobj=SAVE_FILE, path_in_repo=SAVE_FILE, repo_id=DATASET_REPO_ID, repo_type="dataset", commit_message="Sync company data" ) except Exception as e: print(f"Upload Error: {e}") def format_df(source_list, saved_list): if not source_list: return pd.DataFrame(columns=["狀態", "公司名稱", "產業類別", "標籤"]) if saved_list is None: saved_list = [] saved_map = {get_key(c): c for c in saved_list} data = [] for c in source_list: display_c = saved_map.get(get_key(c), c) status_map = {'good': '✅ 優質', 'risk': '⚠️ 風險', 'pending': '❓ 未定'} status_icon = status_map.get(display_c.get('status'), '') has_detail = "📄" if display_c.get('details') else "" tags = ", ".join(display_c.get('tags', [])) data.append([ f"{status_icon} {has_detail}", display_c['name'], display_c.get('industry', '未知'), tags ]) return pd.DataFrame(data, columns=["狀態", "公司名稱", "產業類別", "標籤"]) def get_tags_text(comp): if not comp or not comp.get('tags'): return "目前標籤: (無)" return "🏷️ " + ", ".join([f"`{t}`" for t in comp['tags']]) def get_tags_choices(comp): if not comp: return [] return comp.get('tags', []) # --- Event Handlers --- def search_companies(query, current_saved): if not query: return gr.update(), current_saved, gr.update() try: results = gemini_service.search_companies(query) return format_df(results, current_saved), results, gr.update(visible=True) except Exception as e: raise gr.Error(f"搜尋失敗: {e}") def load_more(query, current_results, current_saved): if not query: return gr.update(), current_results current_names = [c['name'] for c in current_results] try: new_results = gemini_service.search_companies(query, exclude_names=current_names) existing_keys = set(get_key(c) for c in current_results) for c in new_results: if get_key(c) not in existing_keys: current_results.append(c) return format_df(current_results, current_saved), current_results except Exception as e: raise gr.Error(f"載入失敗: {e}") def select_company(evt: gr.SelectData, search_results, saved_data, view_mode): if not evt: return [gr.update()] * 8 index = evt.index[0] target_list = saved_data if view_mode == "追蹤清單" else search_results if not target_list or index >= len(target_list): return gr.update(), gr.update(), gr.update(), None, None, gr.update(), gr.update(), gr.update() comp = target_list[index] key = get_key(comp) saved_comp = next((c for c in saved_data if get_key(c) == key), None) current_comp = saved_comp if saved_comp else comp details_md = "" # Check Cache if current_comp.get('details') and len(current_comp.get('details')) > 10: details_md = current_comp['details'] if not saved_comp: saved_data.insert(0, current_comp) save_data(saved_data) else: # Call API gr.Info(f"正在調查 {current_comp['name']} (查詢統編、PTT評價)...") try: res = gemini_service.get_company_details(current_comp) current_comp['details'] = res['text'] current_comp['sources'] = res['sources'] details_md = res['text'] if saved_comp: saved_comp.update(current_comp) else: saved_data.insert(0, current_comp) save_data(saved_data) except Exception as e: raise gr.Error(f"調查失敗: {e}") if current_comp.get('sources'): details_md += "\n\n### 📚 資料來源\n" for s in current_comp['sources']: details_md += f"- [{s['title']}]({s['uri']})\n" return ( gr.update(visible=True), details_md, [], current_comp, saved_data, get_tags_text(current_comp), gr.update(choices=get_tags_choices(current_comp), value=None), gr.update(visible=True) ) def add_tag(new_tag, selected_comp, saved_data, view_mode, search_results): if not selected_comp or not new_tag: return gr.update(), gr.update(), gr.update(), saved_data, gr.update() if 'tags' not in selected_comp: selected_comp['tags'] = [] if new_tag not in selected_comp['tags']: selected_comp['tags'].append(new_tag) key = get_key(selected_comp) found = False for i, c in enumerate(saved_data): if get_key(c) == key: saved_data[i] = selected_comp found = True break if not found: saved_data.insert(0, selected_comp) save_data(saved_data) gr.Info(f"已新增標籤: {new_tag}") target_list = saved_data if view_mode == "追蹤清單" else search_results new_df = format_df(target_list, saved_data) return ( gr.update(value=""), get_tags_text(selected_comp), gr.update(choices=selected_comp['tags']), saved_data, new_df ) def remove_tag(tag_to_remove, selected_comp, saved_data, view_mode, search_results): if not selected_comp or not tag_to_remove: return gr.update(), gr.update(), saved_data, gr.update() if 'tags' in selected_comp and tag_to_remove in selected_comp['tags']: selected_comp['tags'].remove(tag_to_remove) key = get_key(selected_comp) for i, c in enumerate(saved_data): if get_key(c) == key: saved_data[i] = selected_comp break save_data(saved_data) gr.Info(f"已移除標籤: {tag_to_remove}") target_list = saved_data if view_mode == "追蹤清單" else search_results new_df = format_df(target_list, saved_data) return ( get_tags_text(selected_comp), gr.update(choices=selected_comp['tags'], value=None), saved_data, new_df ) def chat_response(history, message, selected_comp): if not selected_comp: return history, "" context = selected_comp.get('details', '') if not context: return history, "" service_history = [] for h in history: service_history.append({"role": "user", "content": h[0]}) if h[1]: service_history.append({"role": "model", "content": h[1]}) try: reply = gemini_service.chat_with_ai(service_history, message, context) history.append((message, reply)) except Exception as e: history.append((message, f"Error: {e}")) return history, "" def update_status(status, selected_comp, saved_data, view_mode, search_results): if not selected_comp: return gr.update(), saved_data selected_comp['status'] = status if selected_comp.get('status') != status else None key = get_key(selected_comp) for i, c in enumerate(saved_data): if get_key(c) == key: saved_data[i] = selected_comp break save_data(saved_data) target_list = saved_data if view_mode == "追蹤清單" else search_results return format_df(target_list, saved_data), saved_data def remove_comp(selected_comp, saved_data, view_mode, search_results): if not selected_comp: return gr.update(), gr.update(value=None), saved_data, gr.update(visible=False) key = get_key(selected_comp) new_saved = [c for c in saved_data if get_key(c) != key] save_data(new_saved) target_list = new_saved if view_mode == "追蹤清單" else search_results return ( gr.Info("已移除"), format_df(target_list, new_saved), new_saved, gr.update(visible=False) ) def toggle_view(mode, search_res, saved_data): if mode == "搜尋結果": return format_df(search_res, saved_data), gr.update(visible=True) else: return format_df(saved_data, saved_data), gr.update(visible=False) def init_on_load(): data = load_data() return data, format_df(data, data) # --- UI Layout --- with gr.Blocks(title="Com.404 公司去那兒?", theme=gr.themes.Soft()) as demo: saved_state = gr.State([]) search_res_state = gr.State([]) selected_comp_state = gr.State(None) gr.Markdown("""