import joblib import pickle import pandas as pd import os # Atasi masalah permission dengan cache lokal import os # ⬅️ PASTIKAN baris ini ada # Set direktori cache ke tempat yang diizinkan Hugging Face Spaces os.environ["HF_HOME"] = "/tmp/hf_cache" os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf_cache/transformers" os.environ["HF_DATASETS_CACHE"] = "/tmp/hf_cache/datasets" from sentence_transformers import SentenceTransformer from fastapi.middleware.cors import CORSMiddleware import re import torch import numpy as np from fastapi import FastAPI, HTTPException from fastapi.concurrency import run_in_threadpool from pydantic import BaseModel from typing import List, Dict, Union, Optional import faiss from dotenv import load_dotenv from supabase import create_client, Client load_dotenv() SUPABASE_URL = os.environ.get("SUPABASE_URL") SUPABASE_KEY = os.environ.get("SUPABASE_KEY") if not SUPABASE_URL or not SUPABASE_KEY: raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables or .env file.") try: supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) print("Successfully connected to Supabase.") except Exception as e: raise RuntimeError(f"Could not connect to Supabase: {e}") try: model_sbert = SentenceTransformer('naufalihsan/indonesian-sbert-large') device = 'cuda' if torch.cuda.is_available() else 'cpu' model_sbert.to(device) model_lr = joblib.load('models/lr_98_24.pkl') loaded_index_faiss = faiss.read_index('embeddings/embeddings_model.faissindex') print(f"Indeks FAISS dimuat. Jumlah vektor: {loaded_index_faiss.ntotal}") with open('embeddings/metadata_model.pkl', "rb") as f: loaded_metadata_list = pickle.load(f) print(f"Metadata dimuat. Jumlah item metadata: {len(loaded_metadata_list)}") dataset = pd.read_csv('Datasets/final_dataset.csv') if 'hoax' in dataset.columns: dataset['hoax'] = dataset['hoax'].astype(int) except FileNotFoundError as e: raise RuntimeError(f"Error loading model or data files: {e}") except Exception as e: raise RuntimeError(f"An unexpected error occurred during model/data loading: {e}") def check_database_schema(): """ Memanggil fungsi RPC di Supabase untuk mendapatkan detail tabel dan kolom. """ print("Mencoba mengambil skema database dari Supabase...") try: # Panggil fungsi 'get_schema_details' yang harus sudah Anda buat di Supabase response = supabase.rpc('get_schema_details').execute() if response.data: print("Skema Database Berhasil Diambil:") current_table = None for row in response.data: table_name = row['table_name'] column_name = row['column_name'] data_type = row['data_type'] # Tampilkan nama tabel sekali saja if table_name != current_table: print(f"\n[Tabel: {table_name}]") current_table = table_name print(f" - Kolom: {column_name} (Tipe: {data_type})") else: print("Tidak ada data skema yang ditemukan atau terjadi error.") if hasattr(response, 'error') and response.error: print(f"Detail Error: {response.error}") except Exception as e: print(f"Terjadi kesalahan saat mengambil skema: {e}") check_database_schema() def normalize_text_for_counting(text: str) -> str: """Normalizes text for consistent counting.""" if not isinstance(text, str): return "" text_val = text.lower() text_val = re.sub(r'[^a-z0-9\s]', '', text_val) text_val = re.sub(r'\s+', ' ', text_val).strip() return text_val def embeddings(text: str) -> np.ndarray: text_normalized = text.lower() text_normalized = re.sub(r'[^A-Za-z0-9\s\[\]\(\)\{\}\<\>\.,!?;:\'\"-]', '', text_normalized) embedding_tensor = model_sbert.encode(text_normalized, show_progress_bar=False, device=device, convert_to_tensor=True) return embedding_tensor.cpu().numpy() def pipeline_hoax(text: str) -> Dict[str, float]: embedding_val = embeddings(text) pred_proba = model_lr.predict_proba(embedding_val.reshape(1, -1))[0] return { "hoax_prob": float(pred_proba[1]), "valid_prob": float(pred_proba[0]) } def retrieval(embeddings_val: np.ndarray) -> tuple[np.ndarray, np.ndarray]: # Tipe hint diperjelas if embeddings_val.dtype != np.float32: embeddings_val = embeddings_val.astype(np.float32) embeddings_2d = embeddings_val.reshape(1, -1) if embeddings_val.ndim == 1 else embeddings_val distances_2d, indices_2d = loaded_index_faiss.search(embeddings_2d, 5) return indices_2d[0], distances_2d[0] def retrieval_pipeline(text: str) -> pd.DataFrame: embedding_val = embeddings(text) idx, score = retrieval(embedding_val) result = dataset.iloc[idx].copy() result['score'] = score return result app = FastAPI( title="TruthCheck API", description="API for Hoax Identification and Article Similarity Retrieval for Indonesian News." ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class CheckClaimRequest(BaseModel): claim_text: str class SupportingArticle(BaseModel): title: str full_text_snippet: str url: str source: str published_year: Optional[str] = None hoax_status: str similarity_score: float class CheckClaimResponse(BaseModel): claim_text: str hoax_probability: float valid_probability: float accuracy_percentage: float supporting_articles: List[SupportingArticle] class PopularCheckItem(BaseModel): example_claim_title: str claim_text: str hoax_probability: float valid_probability: float accuracy_percentage: float supporting_articles: List[SupportingArticle] def format_supporting_articles(retrieval_results_df: pd.DataFrame) -> List[SupportingArticle]: supporting_articles_list = [] for _, row in retrieval_results_df.iterrows(): published_year = str(row.get('Timestamp', '')) try: if pd.isna(row.get('Timestamp')): published_year = None else: original_ts_str = str(row['Timestamp']) if re.match(r'^\d{4}-\d{2}-\d{2}', original_ts_str): published_year = pd.to_datetime(original_ts_str).strftime('%Y') elif re.match(r'^[A-Za-z]+ \d{1,2}, \d{4}', original_ts_str): published_year = pd.to_datetime(original_ts_str).strftime('%Y') elif re.match(r'^[A-Za-z]+, \d{1,2} [A-Za-z]+ \d{4}', original_ts_str): try: published_year = pd.to_datetime(original_ts_str, format='%A, %d %B %Y %H:%M WIB').strftime('%Y') except ValueError: published_year = pd.to_datetime(original_ts_str.split('WIB')[0].strip()).strftime('%Y') if 'WIB' in original_ts_str else original_ts_str else: published_year = original_ts_str except Exception: published_year = str(row.get('Timestamp', '')) full_text = str(row.get('FullText', '')) if not pd.isna(row.get('FullText')) else '' supporting_articles_list.append( SupportingArticle( title=str(row.get('Title', '')) if not pd.isna(row.get('Title')) else '', full_text_snippet=full_text[:300] + "..." if len(full_text) > 300 else full_text, url=str(row.get('Url', '')) if not pd.isna(row.get('Url')) else '', source=str(row.get('source', '')) if not pd.isna(row.get('source')) else '', published_year=published_year, hoax_status="HOAX" if row.get('hoax') == 1 else "VALID", similarity_score=float(row['score']) ) ) return supporting_articles_list # --- API Endpoints --- @app.post("/check-claim", response_model=CheckClaimResponse, summary="Periksa Klaim/Pernyataan") async def check_claim(request: CheckClaimRequest): text = request.claim_text if not text or not text.strip(): raise HTTPException(status_code=400, detail="Claim text cannot be empty.") normalized_text = normalize_text_for_counting(text) print(normalized_text) if normalized_text: def _sync_upsert_claim(norm_text: str, orig_text: str): try: response = supabase.rpc( 'upsert_popular_claim', {'p_normalized_text': norm_text, 'p_original_text': orig_text} ).execute() if hasattr(response, 'error') and response.error: print(f"Supabase RPC returned an error: {response.error}") else: print(f"Supabase RPC call successful for: {norm_text[:50]}") except Exception as e: print(f"Exception during Supabase RPC call: {e}") try: await run_in_threadpool(_sync_upsert_claim, normalized_text, text) except Exception as e: print(f"Error running Supabase task in threadpool: {e}") # Jalankan pipeline seperti biasa hoax_prediction = await run_in_threadpool(pipeline_hoax, text) retrieval_results_df = await run_in_threadpool(retrieval_pipeline, text) supporting_articles_list = format_supporting_articles(retrieval_results_df) return CheckClaimResponse( claim_text=text, hoax_probability=hoax_prediction["hoax_prob"], valid_probability=hoax_prediction["valid_prob"], accuracy_percentage=hoax_prediction["valid_prob"] * 100, supporting_articles=supporting_articles_list ) @app.get("/popular-checks", response_model=List[PopularCheckItem], summary="Pengecekan Terpopuler") async def get_popular_checks(limit: int = 5): popular_checks_results = [] try: response = await run_in_threadpool( lambda: supabase.table("popular_claims") .select("original_claim_text, search_count") .order("search_count", desc=True) .limit(limit) .execute() ) if response.data: for item_data in response.data: original_text = item_data["original_claim_text"] example_title = original_text[:70] + "..." if len(original_text) > 70 else original_text hoax_prediction = await run_in_threadpool(pipeline_hoax, original_text) retrieval_results_df = await run_in_threadpool(retrieval_pipeline, original_text) supporting_articles_list = format_supporting_articles(retrieval_results_df) popular_checks_results.append( PopularCheckItem( example_claim_title=example_title, claim_text=original_text, hoax_probability=hoax_prediction["hoax_prob"], valid_probability=hoax_prediction["valid_prob"], accuracy_percentage=hoax_prediction["valid_prob"] * 100, supporting_articles=supporting_articles_list ) ) else: print("No popular claims found in Supabase or error in fetching.") if hasattr(response, 'error') and response.error: print(f"Supabase fetch error: {response.error}") except Exception as e: print(f"Error fetching popular checks from Supabase: {e}") return popular_checks_results