"""Semantic KBLI search demo.

Two-stage Neural IR pipeline over the Indonesian KBLI 2020 taxonomy:
Stage 1 retrieval (BM25 lexical, dense pgvector, or a Supabase-side hybrid
of both), optionally preceded by Gemini query expansion, followed by Stage 2
Cross-Encoder reranking. A Gradio UI exposes each ablation variant plus a
batch-evaluation tab (MRR@10 / Recall@10 / nDCG@10 / latency).

NOTE(review): this file was recovered from a whitespace-mangled source in
which HTML tags inside string literals were stripped. The markup emitted by
``search_kbli`` and the Markdown headers below are a best-effort
reconstruction of the original intent — confirm against the original
rendering.
"""

import gradio as gr
from sentence_transformers import SentenceTransformer, CrossEncoder
from supabase import create_client
import os
from dotenv import load_dotenv
from google import genai
import pandas as pd
import time
import math

load_dotenv()

# === Gemini (optional — the app degrades to a local fallback without it) ===
GOOGLE_API_KEY = os.getenv("GEMINI_API")
if not GOOGLE_API_KEY:
    print("⚠️ Peringatan: GOOGLE_API_KEY tidak ditemukan, Gemini akan dinonaktifkan.")
    gemini_client = None
else:
    gemini_client = genai.Client(api_key=GOOGLE_API_KEY)

# Bi-encoder for dense retrieval; cross-encoder for Stage-2 reranking.
embedder = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

print("--- Daftar Model yang Tersedia ---")
if gemini_client:
    for m in gemini_client.models.list():
        print(f"Model: {m.name} | Name: {m.display_name}")
print("----------------------------------")

# === Supabase ===
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
supabase = create_client(supabase_url, supabase_key)


def expand_query(query: str, num_variations: int = 3) -> str:
    """Expand *query* into paraphrase variations via the Gemini API.

    Returns the original query concatenated with the generated variations
    ("<query>. <var1> <var2> ..."). Falls back to a simple local template
    when Gemini is disabled, returns no parseable variations, or errors.
    """
    if not query.strip():
        return query
    if gemini_client is None:
        return f"Kegiatan usaha yang berkaitan dengan {query}"

    prompt = f"""
Anda adalah ahli dalam sistem pencarian KBLI (Klasifikasi Baku Lapangan Usaha Indonesia) 2020.
Tugas Anda adalah membuat {num_variations} variasi dari kueri berikut untuk meningkatkan hasil pencarian.

Kueri pengguna: "{query}"

Buatkan {num_variations} variasi kueri yang:
1. Menggunakan bahasa formal atau teknis (mis. istilah industri).
2. Menggunakan bahasa sehari-hari.
3. Mengandung kata kunci relevan lain.

Format keluaran HARUS seperti ini:
Variasi 1: ...
Variasi 2: ...
Variasi 3: ...
"""
    try:
        response = gemini_client.models.generate_content(
            model="gemini-3.1-flash-lite",
            contents=prompt,
        )
        text_output = response.text.strip()
        # Parse lines of the form "Variasi N: <text>".
        variations = []
        for line in text_output.splitlines():
            if line.lower().startswith("variasi"):
                parts = line.split(":", 1)
                if len(parts) > 1:
                    variations.append(parts[1].strip())
        if not variations:
            print("[Gemini Warning] Tidak ada variasi ditemukan. Gunakan fallback.")
            return f"Kegiatan usaha yang berkaitan dengan {query}"
        expanded = query + ". " + " ".join(variations)
        print(f"[Gemini Expand] {query} -> {expanded}")
        return expanded
    except Exception as e:
        # Any API failure (quota, network, model name) degrades gracefully.
        print(f"[Gemini Error] {e}. Menggunakan fallback lokal.")
        return f"Kegiatan usaha yang berkaitan dengan {query}"


def get_embedding(text: str):
    """Return a normalized embedding vector (list of floats) for *text*.

    The text is first run through Gemini query expansion. Empty input
    yields an empty list.
    """
    if not text:
        return []
    expanded_text = expand_query(text)
    return embedder.encode(expanded_text, normalize_embeddings=True).tolist()


# ==========================================
# ABLATION STUDY
# ==========================================

def apply_sigmoid(logit):
    """Map a raw cross-encoder logit to a (0, 1) probability.

    Guards against math.exp overflow for extreme negative logits
    (the original raised OverflowError below ~-709).
    """
    if logit < -60:
        return 0.0
    return 1 / (1 + math.exp(-logit))


def bm25_only(query: str, match_count: int = 50):
    """Lexical / full-text search only (no vectors, Gemini, or reranker).

    Requires the 'search_kbli_lexical' RPC to exist in Supabase.
    """
    response = supabase.rpc(
        "search_kbli_lexical",
        {"query_text": query, "match_count": match_count},
    ).execute()
    candidates = response.data or []
    return {"results": candidates[:10]}


def dense_only(query: str, match_count: int = 50):
    """Semantic vector search only (no BM25, Gemini, or reranker)."""
    # Use the raw query — deliberately no Gemini expansion here.
    embedding_q = embedder.encode(query, normalize_embeddings=True).tolist()
    response = supabase.rpc(
        "search_kbli",
        {"query_embedding": embedding_q, "match_count": match_count},
    ).execute()
    candidates = response.data or []
    return {"results": candidates[:10]}


def semantic_no_gemini(query: str, match_count: int = 50):
    """Semantic vector search + reranker, WITHOUT Gemini query expansion."""
    expanded = query  # bypass Gemini
    embedding_q = embedder.encode(expanded, normalize_embeddings=True).tolist()
    response = supabase.rpc(
        "search_kbli",
        {"query_embedding": embedding_q, "match_count": match_count},
    ).execute()
    candidates = response.data or []
    if not candidates:
        return {"results": []}

    pairs = [(expanded, c["judul"] + " " + c["deskripsi"]) for c in candidates]
    try:
        scores = reranker.predict(pairs)
    except Exception as e:
        # Reranker failure: fall back to pure vector similarity ordering.
        print("Reranker error:", e)
        return {"results": sorted(candidates, key=lambda x: x.get("similarity", 0), reverse=True)[:10]}

    # Min-max normalize rerank logits, then blend with vector similarity.
    rerank_vals = [float(s) for s in scores]
    rmin, rmax = min(rerank_vals), max(rerank_vals)
    for c, s in zip(candidates, rerank_vals):
        c["rerank_score"] = s
        if rmax - rmin > 1e-9:
            c["rerank_norm"] = (s - rmin) / (rmax - rmin)
        else:
            c["rerank_norm"] = 0.0
        sim = c.get("similarity", 0.0)
        c["hybrid_score"] = 0.6 * sim + 0.4 * c["rerank_norm"]

    candidates = sorted(candidates, key=lambda x: x["hybrid_score"], reverse=True)
    return {"results": candidates[:10]}


def hybrid_search_no_gemini(query: str, match_count: int = 50):
    """Hybrid (BM25 + dense) retrieval + reranker, WITHOUT Gemini expansion."""
    expanded = query
    embedding_q = embedder.encode(expanded, normalize_embeddings=True).tolist()
    response = supabase.rpc(
        "search_kbli_hybrid",
        {
            "query_text": query,
            "query_embedding": embedding_q,
            "match_count": match_count,
            "lexical_weight": 0.4,  # ignored by the SQL function but required
            "dense_weight": 0.6,    # ignored by the SQL function but required
        },
    ).execute()
    candidates = response.data or []
    if not candidates:
        return {"results": []}

    pairs = [(query, c["judul"] + " " + c["deskripsi"]) for c in candidates]
    try:
        scores = reranker.predict(pairs)
    except Exception as e:
        print("Reranker error:", e)
        return {"results": sorted(candidates, key=lambda x: x.get("similarity", 0), reverse=True)[:10]}

    # Reranker as the sole judge: sigmoid makes the logit a probability.
    for c, s in zip(candidates, scores):
        c["rerank_score"] = float(s)
        c["final_score"] = apply_sigmoid(float(s))

    candidates = sorted(candidates, key=lambda x: x["final_score"], reverse=True)
    return {"results": candidates[:10]}


# ==========================================
# CORE APPS
# ==========================================

def fn_semantic(query: str, match_count: int = 50):
    """Gemini-expanded semantic search + cross-encoder reranking.

    Blends vector similarity (0.6) with min-max-normalized rerank score
    (0.4); returns the top-10 candidates.
    """
    expanded = expand_query(query)
    embedding_q = embedder.encode(expanded, normalize_embeddings=True).tolist()
    response = supabase.rpc(
        "search_kbli",
        {"query_embedding": embedding_q, "match_count": match_count},
    ).execute()
    candidates = response.data or []
    if not candidates:
        return {"results": []}

    print("=== Candidates BEFORE rerank (top 10) ===")
    for c in candidates[:10]:
        print(c.get("kode"), c.get("judul")[:80], "sim=", c.get("similarity"))

    pairs = [(expanded, c["judul"] + " " + c["deskripsi"]) for c in candidates]
    try:
        scores = reranker.predict(pairs)
    except Exception as e:
        print("Reranker error:", e)
        return {"results": sorted(candidates, key=lambda x: x.get("similarity", 0), reverse=True)[:10]}

    for c, s in zip(candidates, scores):
        c["rerank_score"] = float(s)

    print("=== Candidates AFTER rerank (top 10) ===")
    for c in candidates[:10]:
        print(c.get("kode"), c.get("judul")[:80], "sim=", c.get("similarity"), "rerank=", c.get("rerank_score"))

    # Min-max normalize rerank scores across this candidate set.
    rerank_vals = [c["rerank_score"] for c in candidates]
    rmin, rmax = min(rerank_vals), max(rerank_vals)
    for c in candidates:
        if rmax - rmin > 1e-9:
            c["rerank_norm"] = (c["rerank_score"] - rmin) / (rmax - rmin)
        else:
            c["rerank_norm"] = 0.0

    for c in candidates:
        sim = c.get("similarity", 0.0)
        c["hybrid_score"] = 0.6 * sim + 0.4 * c["rerank_norm"]

    candidates = sorted(candidates, key=lambda x: x["hybrid_score"], reverse=True)
    return {"results": candidates[:10]}


def hybrid_search(query: str, match_count: int = 50):
    """Full pipeline: Gemini expansion → hybrid DB retrieval → reranking.

    The ORIGINAL query feeds the lexical (tsvector) side to limit keyword
    noise; the EXPANDED query feeds the dense (pgvector) side and the
    cross-encoder. Final score = 0.5 * DB hybrid similarity
    + 0.5 * normalized rerank score.
    """
    # 1. Query Expansion
    expanded = expand_query(query)

    # 2. Embedding of the expanded query for dense retrieval
    embedding_q = embedder.encode(expanded, normalize_embeddings=True).tolist()

    # 3. Hybrid retrieval in Supabase (lexical + dense fused server-side)
    response = supabase.rpc(
        "search_kbli_hybrid",
        {
            "query_text": query,            # lexical match (tsvector)
            "query_embedding": embedding_q,  # dense match (pgvector)
            "match_count": match_count,
            "lexical_weight": 0.2,  # lexical weight (tunable for ablation)
            "dense_weight": 0.8,    # dense weight
        },
    ).execute()
    candidates = response.data or []
    if not candidates:
        return {"results": []}

    print("=== Candidates dari Hybrid DB BEFORE rerank (top 10) ===")
    for c in candidates[:10]:
        # 'similarity' here is already the fused lexical+dense DB score.
        print(c.get("kode"), c.get("judul")[:80], "hybrid_db_sim=", c.get("similarity"))

    # 4. Cross-encoder reranking of (expanded query, document) pairs
    pairs = [(expanded, c["judul"] + " " + c["deskripsi"]) for c in candidates]
    try:
        scores = reranker.predict(pairs)
    except Exception as e:
        print("Reranker error:", e)
        return {"results": sorted(candidates, key=lambda x: x.get("similarity", 0), reverse=True)[:10]}

    # 5. Normalize rerank scores and compute the final blended score
    rerank_vals = [float(s) for s in scores]
    rmin, rmax = min(rerank_vals), max(rerank_vals)
    for c, s in zip(candidates, rerank_vals):
        c["rerank_score"] = s
        if rmax - rmin > 1e-9:
            c["rerank_norm"] = (s - rmin) / (rmax - rmin)
        else:
            c["rerank_norm"] = 0.0
        db_hybrid_sim = c.get("similarity", 0.0)
        c["final_score"] = (0.5 * db_hybrid_sim) + (0.5 * c["rerank_norm"])

    print("=== Candidates AFTER Cross-Encoder rerank (top 10) ===")
    candidates = sorted(candidates, key=lambda x: x["final_score"], reverse=True)
    for c in candidates[:10]:
        print(c.get("kode"), c.get("judul")[:80], "final_score=", c.get("final_score"), "rerank=", c.get("rerank_score"))

    return {"results": candidates[:10]}


def search_kbli(text: str):
    """Embedding-only search rendered as an HTML result list.

    Top 10 results are shown directly; the remainder is collapsed behind
    an expandable "Lihat hasil lainnya" section.
    NOTE(review): the HTML markup here is reconstructed — the original
    tags were stripped from the recovered source.
    """
    if not text:
        return {"embedding": [], "results": []}

    embedding = get_embedding(text)
    response = supabase.rpc(
        "search_kbli",
        {"query_embedding": embedding, "match_count": 25},
    ).execute()
    results = response.data if response.data else []
    if not results:
        return "<p>Tidak ditemukan hasil.</p>"

    def _card(r):
        # One result card: code + title in bold, description below.
        return (
            "<div style='margin-bottom:12px'>"
            f"<b>{r['kode']} – {r['judul']}</b>"
            f"<p>{r['deskripsi']}</p>"
            "</div>"
        )

    html = "<div>"
    for r in results[:10]:
        html += _card(r)

    others = results[10:]
    if others:
        html += "<details><summary>Lihat hasil lainnya</summary>"
        for r in others:
            html += _card(r)
        html += "</details>"

    html += "</div>"
    return html


def calculate_mrr(retrieved_kodes, relevant_kodes_set):
    """Reciprocal rank of the first relevant code (0.0 if none found)."""
    for i, kode in enumerate(retrieved_kodes):
        if kode in relevant_kodes_set:
            return 1.0 / (i + 1)
    return 0.0


def calculate_recall(retrieved_kodes, relevant_kodes_set, k=10):
    """Recall@k: fraction of relevant codes present in the top-k results."""
    retrieved_k_set = set(retrieved_kodes[:k])
    if not relevant_kodes_set:
        return 0.0
    return len(retrieved_k_set & relevant_kodes_set) / len(relevant_kodes_set)


def calculate_ndcg(retrieved_kodes, relevance_dict, k=10):
    """nDCG@k with graded relevance taken from *relevance_dict*."""
    dcg = 0
    for i, kode in enumerate(retrieved_kodes[:k]):
        rel = relevance_dict.get(kode, 0)
        dcg += rel / math.log2(i + 2)
    # Ideal DCG: the k highest relevance grades in descending order.
    ideal_rels = sorted(relevance_dict.values(), reverse=True)[:k]
    idcg = 0
    for i, rel in enumerate(ideal_rels):
        idcg += rel / math.log2(i + 2)
    return dcg / idcg if idcg > 0 else 0.0


def run_evaluation(file_obj, scenario):
    """Batch-evaluate one retrieval *scenario* against a ground-truth CSV.

    The CSV must contain columns: query_id, query, kode_kbli, relevance.
    Returns (summary dict, per-query DataFrame, Excel report path,
    retrieval-results CSV path) — matching the 4 Gradio outputs.
    """
    if file_obj is None:
        # BUGFIX: the Gradio event binds 4 outputs; the original early
        # return yielded only 3 values and crashed the callback.
        return "Peringatan: Silakan unggah file ground_truth.csv terlebih dahulu.", None, None, None

    df = pd.read_csv(file_obj.name)
    queries = df.groupby('query_id').first()['query'].to_dict()
    ground_truth = {}
    for q_id, group in df.groupby('query_id'):
        ground_truth[q_id] = dict(zip(group['kode_kbli'].astype(str), group['relevance']))

    # Dispatch table instead of a long if/elif chain.
    scenario_fns = {
        "BM25 Only (Lexical)": bm25_only,
        "Dense Only (Semantic)": dense_only,
        "Semantic + Reranker (No Gemini)": semantic_no_gemini,
        "Semantic + Reranker (With Gemini)": fn_semantic,
        "Hybrid + Reranker (No Gemini)": hybrid_search_no_gemini,
        "Hybrid + Reranker (With Gemini)": hybrid_search,
    }

    results_list = []
    retrieval_rows = []
    for q_id, query_text in queries.items():
        start_time = time.perf_counter()  # per-query latency timer
        fn = scenario_fns.get(scenario)
        response = fn(query_text, match_count=50) if fn else {"results": []}
        latency = time.perf_counter() - start_time

        candidates = response.get("results", [])
        retrieved_kodes = [str(r.get('kode')) for r in candidates]
        for rank, kode in enumerate(retrieved_kodes, start=1):
            retrieval_rows.append({
                "query_id": q_id,
                "query": query_text,
                "scenario": scenario,
                "rank": rank,
                "kode_kbli": kode,
            })

        rel_dict = ground_truth.get(q_id, {})
        relevant_kodes_set = {k for k, r in rel_dict.items() if r > 0}
        mrr = calculate_mrr(retrieved_kodes, relevant_kodes_set)
        recall = calculate_recall(retrieved_kodes, relevant_kodes_set, k=10)
        ndcg = calculate_ndcg(retrieved_kodes, rel_dict, k=10)
        results_list.append({
            "Query ID": q_id,
            "Query Text": query_text,
            "MRR@10": round(mrr, 4),
            "Recall@10": round(recall, 4),
            "nDCG@10": round(ndcg, 4),
            "Latency (sec)": round(latency, 4),
        })

        if "With Gemini" in scenario:
            time.sleep(1)  # avoid Gemini API rate limits

    results_df = pd.DataFrame(results_list)
    if results_df.empty:
        return "Peringatan: File ground_truth.csv tidak berisi kueri.", None, None, None

    summary = {
        "Skenario": scenario,
        "Total Query": len(queries),
        "Avg MRR@10": round(results_df["MRR@10"].mean(), 4),
        "Avg Recall@10": round(results_df["Recall@10"].mean(), 4),
        "Avg nDCG@10": round(results_df["nDCG@10"].mean(), 4),
        "Avg Latency (sec)": round(results_df["Latency (sec)"].mean(), 4),
    }

    # Export reports (Excel for metrics, CSV for raw retrieval ranks).
    safe_scenario_name = scenario.replace(" ", "_").replace("(", "").replace(")", "").replace("+", "plus")
    output_filename = f"Evaluasi_{safe_scenario_name}.xlsx"
    results_df.to_excel(output_filename, index=False)

    retrieval_df = pd.DataFrame(retrieval_rows)
    retrieval_filename = f"retrieval_results_{safe_scenario_name}.csv"
    retrieval_df.to_csv(retrieval_filename, index=False)

    return summary, results_df, output_filename, retrieval_filename


with gr.Blocks(css="""
.title {font-size: 22px; font-weight: 700; color: #111827; margin-bottom: 4px;}
.desc {font-size: 14px; color: #6b7280; margin-bottom: 16px;}
button.gr-button { border-radius: 6px; }
button.gr-button-primary, button.gr-button-secondary { border-radius: 6px; }
.btn-row {display: flex; gap: 8px;}
.btn-row > * {flex: 1;}
.btn-row-search {display: flex; gap: 8px;}
.btn-row-search > * {flex: none;}
@media (max-width: 640px) {
  .btn-row, .btn-row-search {flex-direction: column-reverse;}
  .btn-row > button, .btn-row-search > button { width: 100% !important; flex: none; }
}
""") as demo:
    # NOTE(review): markup reconstructed; original tags were stripped.
    gr.Markdown("<div class='title'>Semantic KBLI Search</div>")
    gr.Markdown("<div class='desc'>Cari kode KBLI dengan semantic search (Embedding + Matching)</div>")

    with gr.Tab("Embedding Only"):
        with gr.Row():
            with gr.Column(scale=1):
                inp1 = gr.Textbox(label="Masukkan teks")
                with gr.Row(elem_classes="btn-row"):
                    btn_clear1 = gr.Button("Clear", variant="secondary")
                    btn_submit1 = gr.Button("Submit", variant="primary")
            with gr.Column(scale=1):
                out1 = gr.JSON(label="Embedding Vector")
        # NOTE(review): submit and click share one api_name; some Gradio
        # versions warn/dedupe on this — confirm on the deployed version.
        inp1.submit(get_embedding, inp1, out1, api_name="get_embedding")
        btn_clear1.click(lambda: ("", None), None, [inp1, out1])
        btn_submit1.click(get_embedding, inp1, out1, api_name="get_embedding")

    with gr.Tab("Embedding Fine-tuned"):
        with gr.Row():
            with gr.Column(scale=1):
                inp2 = gr.Textbox(label="Masukkan teks")
                with gr.Row(elem_classes="btn-row"):
                    btn_clear2 = gr.Button("Clear", variant="secondary")
                    btn_submit2 = gr.Button("Submit", variant="primary")
            with gr.Column(scale=1):
                out2 = gr.JSON(label="Embedding Vector")
        inp2.submit(fn_semantic, inp2, out2, api_name="fn_semantic")
        btn_clear2.click(lambda: ("", None), None, [inp2, out2])
        btn_submit2.click(fn_semantic, inp2, out2, api_name="fn_semantic")

    with gr.Tab("Search KBLI"):
        inp3 = gr.Textbox(label="Masukkan teks")
        with gr.Row(elem_classes="btn-row-search"):
            btn_clear3 = gr.Button("Clear", variant="secondary")
            btn_submit3 = gr.Button("Submit", variant="primary")
        out3 = gr.HTML(label="Hasil Pencarian Semantic")
        inp3.submit(search_kbli, inp3, out3, api_name="search_kbli")
        btn_clear3.click(lambda: ("", None), None, [inp3, out3])
        btn_submit3.click(search_kbli, inp3, out3, api_name="search_kbli")

    with gr.Tab("Hybrid Search (Final)"):
        with gr.Row():
            with gr.Column(scale=1):
                inp4 = gr.Textbox(label="Masukkan teks")
                with gr.Row(elem_classes="btn-row"):
                    btn_clear4 = gr.Button("Clear", variant="secondary")
                    btn_submit4 = gr.Button("Submit", variant="primary")
            with gr.Column(scale=1):
                out4 = gr.JSON(label="Hasil Hybrid Search")
        inp4.submit(hybrid_search, inp4, out4, api_name="hybrid_search")
        btn_clear4.click(lambda: ("", None), None, [inp4, out4])
        btn_submit4.click(hybrid_search, inp4, out4, api_name="hybrid_search")

    with gr.Tab("Ablation Endpoints (API)"):
        gr.Markdown("### Individual Model")
        with gr.Row():
            with gr.Column(scale=1):
                inp5 = gr.Textbox(label="Masukkan kueri teks")
                with gr.Row(elem_classes="btn-row"):
                    btn_bm25 = gr.Button("BM25 Only", variant="primary")
                    btn_dense = gr.Button("Dense Only", variant="primary")
                with gr.Row(elem_classes="btn-row"):
                    btn_sem_no_gem = gr.Button("Semantic (No Gemini)", variant="primary")
                    btn_hyb_no_gem = gr.Button("Hybrid (No Gemini)", variant="primary")
                with gr.Row():
                    btn_clear5 = gr.Button("Clear", variant="secondary")
            with gr.Column(scale=1):
                out5 = gr.JSON(label="Hasil Pencarian Ablation")

        btn_clear5.click(lambda: ("", None), None, [inp5, out5])

        # Register one named API endpoint per ablation variant.
        btn_bm25.click(bm25_only, inputs=[inp5], outputs=[out5], api_name="bm25_only")
        btn_dense.click(dense_only, inputs=[inp5], outputs=[out5], api_name="dense_only")
        btn_sem_no_gem.click(semantic_no_gemini, inputs=[inp5], outputs=[out5], api_name="semantic_no_gemini")
        btn_hyb_no_gem.click(hybrid_search_no_gemini, inputs=[inp5], outputs=[out5], api_name="hybrid_search_no_gemini")
        inp5.submit(dense_only, inputs=[inp5], outputs=[out5])

    with gr.Tab("Ablation Study"):
        gr.Markdown("### Metrics & Latency")
        gr.Markdown("Unggah file `ground_truth.csv` Anda untuk menjalankan *batch testing* dan membandingkan skenario.")
        with gr.Row():
            with gr.Column(scale=1):
                eval_file = gr.File(label="Upload ground_truth.csv", file_types=[".csv"])
                eval_scenario = gr.Dropdown(
                    choices=[
                        "BM25 Only (Lexical)",
                        "Dense Only (Semantic)",
                        "Semantic + Reranker (No Gemini)",
                        "Semantic + Reranker (With Gemini)",
                        "Hybrid + Reranker (No Gemini)",
                        "Hybrid + Reranker (With Gemini)",
                    ],
                    label="Pilih Skenario Evaluasi",
                )
                btn_run_eval = gr.Button("Jalankan Evaluasi Otomatis", variant="primary")
            with gr.Column(scale=1):
                eval_summary = gr.JSON(label="Ringkasan Skor Rata-rata & Latensi")
                eval_download = gr.File(label="Download Laporan (Excel)")
                eval_retrieval_download = gr.File(label="Download Retrieval Results (CSV)")
        eval_table = gr.Dataframe(label="Detail Per-Kueri")
        btn_run_eval.click(
            run_evaluation,
            inputs=[eval_file, eval_scenario],
            outputs=[eval_summary, eval_table, eval_download, eval_retrieval_download],
        )

if __name__ == "__main__":
    demo.queue().launch(show_error=True)