| import joblib |
| import pickle |
| import pandas as pd |
| import os |
| |
| import os |
|
|
| |
# Point the Hugging Face cache locations at directories under /tmp/hf_cache.
# NOTE(review): this runs ahead of the sentence_transformers import further
# down, presumably so the library sees these paths when it is first loaded —
# confirm before reordering.
os.environ.update(
    {
        "HF_HOME": "/tmp/hf_cache",
        "TRANSFORMERS_CACHE": "/tmp/hf_cache/transformers",
        "HF_DATASETS_CACHE": "/tmp/hf_cache/datasets",
    }
)
|
|
|
|
| from sentence_transformers import SentenceTransformer |
| from fastapi.middleware.cors import CORSMiddleware |
| import re |
| import torch |
| import numpy as np |
| from fastapi import FastAPI, HTTPException |
| from fastapi.concurrency import run_in_threadpool |
| from pydantic import BaseModel |
| from typing import List, Dict, Union, Optional |
| import faiss |
|
|
| from dotenv import load_dotenv |
| from supabase import create_client, Client |
|
|
# Load variables from a .env file (if present) and read the Supabase
# credentials from the process environment.
load_dotenv()
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")

# Fail fast at import time: the rest of this module uses the `supabase` client.
if not SUPABASE_URL or not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables or .env file.")

try:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Successfully connected to Supabase.")
except Exception as e:
    # Chain the original exception so the root cause stays in the traceback.
    raise RuntimeError(f"Could not connect to Supabase: {e}") from e
|
|
# Load every model and data artifact once at import time. Any failure is
# re-raised as RuntimeError (chained to the original error) so the service
# refuses to start with a partially initialised state.
try:
    # Sentence encoder, moved to the GPU when one is available.
    model_sbert = SentenceTransformer('naufalihsan/indonesian-sbert-large')
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model_sbert.to(device)

    # NOTE(security): joblib.load and pickle.load execute arbitrary code
    # embedded in the file they read. These artifacts must come from a
    # trusted source only.
    model_lr = joblib.load('models/lr_98_24.pkl')

    loaded_index_faiss = faiss.read_index('embeddings/embeddings_model.faissindex')
    print(f"Indeks FAISS dimuat. Jumlah vektor: {loaded_index_faiss.ntotal}")

    with open('embeddings/metadata_model.pkl', "rb") as f:
        loaded_metadata_list = pickle.load(f)
    print(f"Metadata dimuat. Jumlah item metadata: {len(loaded_metadata_list)}")

    dataset = pd.read_csv('Datasets/final_dataset.csv')
    # Normalise the label column to int so later `== 1` comparisons behave.
    if 'hoax' in dataset.columns:
        dataset['hoax'] = dataset['hoax'].astype(int)
except FileNotFoundError as e:
    raise RuntimeError(f"Error loading model or data files: {e}") from e
except Exception as e:
    raise RuntimeError(f"An unexpected error occurred during model/data loading: {e}") from e
|
|
def check_database_schema():
    """Print the tables and columns returned by the Supabase schema RPC.

    Calls the ``get_schema_details`` RPC through the module-level
    ``supabase`` client. Each returned row is read for its ``table_name``,
    ``column_name`` and ``data_type`` keys; a table header is printed
    whenever ``table_name`` differs from the previous row's, followed by one
    line per column.

    Nothing is returned. Any exception is caught and printed.
    """
    print("Mencoba mengambil skema database dari Supabase...")
    try:
        response = supabase.rpc('get_schema_details').execute()
        schema_rows = response.data

        # Empty or missing payload: report it, plus the error detail if any.
        if not schema_rows:
            print("Tidak ada data skema yang ditemukan atau terjadi error.")
            rpc_error = getattr(response, 'error', None)
            if rpc_error:
                print(f"Detail Error: {rpc_error}")
            return

        print("Skema Database Berhasil Diambil:")
        last_table_seen = None
        for entry in schema_rows:
            table_name, column_name, data_type = (
                entry['table_name'],
                entry['column_name'],
                entry['data_type'],
            )

            # Start a new section each time the table name changes.
            if table_name != last_table_seen:
                print(f"\n[Tabel: {table_name}]")
                last_table_seen = table_name

            print(f" - Kolom: {column_name} (Tipe: {data_type})")
    except Exception as e:
        print(f"Terjadi kesalahan saat mengambil skema: {e}")


# Dump the schema once when the module is imported.
check_database_schema()
|
|
def normalize_text_for_counting(text: str) -> str:
    """Return a canonical form of *text* used as a counting key.

    The text is lower-cased, every character other than ``a-z``, ``0-9`` and
    whitespace is removed, runs of whitespace are collapsed to one space and
    the result is trimmed. Non-string input yields an empty string.
    """
    if isinstance(text, str):
        alnum_only = re.sub(r'[^a-z0-9\s]', '', text.lower())
        return re.sub(r'\s+', ' ', alnum_only).strip()
    return ""
|
|
def embeddings(text: str) -> np.ndarray:
    """Encode *text* with the module-level SBERT model.

    The text is lower-cased and stripped of every character outside ASCII
    letters, digits, whitespace, brackets and common punctuation before it
    is encoded on ``device``.

    Returns:
        The embedding as a NumPy array on the CPU.
        NOTE(review): presumably 1-D for a single string — confirm against
        the model's output.
    """
    cleaned = re.sub(
        r'[^A-Za-z0-9\s\[\]\(\)\{\}\<\>\.,!?;:\'\"-]',
        '',
        text.lower(),
    )
    encoded = model_sbert.encode(
        cleaned,
        show_progress_bar=False,
        device=device,
        convert_to_tensor=True,
    )
    # Bring the tensor back to host memory before converting to NumPy.
    return encoded.cpu().numpy()
|
|
def pipeline_hoax(text: str) -> Dict[str, float]:
    """Classify *text* and return its class probabilities.

    The text is embedded with ``embeddings`` and passed as a single-row
    matrix to ``model_lr.predict_proba``.

    Returns:
        ``{"hoax_prob": p1, "valid_prob": p0}`` where ``p1`` and ``p0`` are
        the probabilities at class positions 1 and 0 respectively.
    """
    single_row = embeddings(text).reshape(1, -1)
    class_probabilities = model_lr.predict_proba(single_row)[0]
    hoax_prob = float(class_probabilities[1])
    valid_prob = float(class_probabilities[0])
    return {"hoax_prob": hoax_prob, "valid_prob": valid_prob}
|
|
def retrieval(embeddings_val: np.ndarray, top_k: int = 5) -> tuple[np.ndarray, np.ndarray]:
    """Search the FAISS index for the nearest neighbours of an embedding.

    Args:
        embeddings_val: Query embedding. A 1-D vector is treated as a single
            query; for a 2-D matrix only the first row's results are returned.
        top_k: Maximum number of neighbours to return (default 5, the value
            that used to be hard-coded).

    Returns:
        ``(indices, distances)`` for the first query, both 1-D arrays of
        equal length. The length can be smaller than ``top_k`` when the index
        holds fewer vectors.

    Raises:
        ValueError: If ``top_k`` is smaller than 1.
    """
    if top_k < 1:
        raise ValueError("top_k must be a positive integer")

    # FAISS expects a C-contiguous float32 matrix of shape (n_queries, dim).
    query = np.ascontiguousarray(embeddings_val, dtype=np.float32)
    if query.ndim == 1:
        query = query.reshape(1, -1)

    distances_2d, indices_2d = loaded_index_faiss.search(query, top_k)
    indices, distances = indices_2d[0], distances_2d[0]

    # FAISS pads the result with label -1 when fewer than top_k neighbours
    # exist. Drop those slots so callers never use -1 as a positional index
    # (which would silently select the last dataset row).
    found = indices >= 0
    return indices[found], distances[found]
|
|
|
|
def retrieval_pipeline(text: str) -> pd.DataFrame:
    """Return the dataset rows nearest to *text* in the FAISS index.

    The text is embedded, the index is searched via ``retrieval`` and the
    hits are looked up positionally in the module-level ``dataset``.

    Returns:
        A copy of the matching rows with an added ``score`` column holding
        the values returned by the search.
    """
    neighbour_positions, neighbour_scores = retrieval(embeddings(text))
    # Work on a copy so the shared dataset frame is never modified.
    matches = dataset.iloc[neighbour_positions].copy()
    matches['score'] = neighbour_scores
    return matches
|
|
|
|
|
|
# FastAPI application object; the route decorators below register on it.
app = FastAPI(
    title="TruthCheck API",
    description="API for Hoax Identification and Article Similarity Retrieval for Indonesian News."
)

# Open CORS policy: any origin, method and header may call the API.
# Credentials are deliberately NOT allowed. The FastAPI CORS documentation
# states that allow_origins / allow_methods / allow_headers cannot be ["*"]
# when allow_credentials is True, and combining them lets any site issue
# credentialed cross-origin requests. To support cookies or auth headers,
# list the trusted origins, methods and headers explicitly first.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
class CheckClaimRequest(BaseModel):
    """Request body of ``POST /check-claim``."""

    # Free-text claim or statement to analyse.
    claim_text: str
|
|
class SupportingArticle(BaseModel):
    """One retrieved dataset article returned alongside a claim analysis."""

    # Article headline; empty string when the dataset cell is missing.
    title: str
    # Article body, cut to 300 characters plus "..." when it is longer.
    full_text_snippet: str
    # Link to the article; empty string when missing.
    url: str
    # Publisher/source label; empty string when missing.
    source: str
    # Year taken from the article timestamp when its layout is recognised,
    # otherwise the raw timestamp text; None when there is no timestamp.
    published_year: Optional[str] = None
    # "HOAX" when the dataset label equals 1, otherwise "VALID".
    hoax_status: str
    # Score reported by the index search for this article.
    # NOTE(review): whether larger or smaller means "closer" depends on the
    # index metric — confirm.
    similarity_score: float
|
|
class CheckClaimResponse(BaseModel):
    """Response body of ``POST /check-claim``."""

    # The claim text as submitted by the client.
    claim_text: str
    # Classifier probability that the claim is a hoax.
    hoax_probability: float
    # Classifier probability that the claim is valid.
    valid_probability: float
    # valid_probability expressed as a percentage (multiplied by 100).
    accuracy_percentage: float
    # Nearest dataset articles found for the claim.
    supporting_articles: List[SupportingArticle]
|
|
class PopularCheckItem(BaseModel):
    """One entry of the ``GET /popular-checks`` response."""

    # Short display title: the claim cut to 70 characters plus "..." if longer.
    example_claim_title: str
    # Full original claim text as stored for the popular claim.
    claim_text: str
    # Classifier probability that the claim is a hoax.
    hoax_probability: float
    # Classifier probability that the claim is valid.
    valid_probability: float
    # valid_probability expressed as a percentage (multiplied by 100).
    accuracy_percentage: float
    # Nearest dataset articles found for the claim.
    supporting_articles: List[SupportingArticle]
|
|
|
|
# Recognised timestamp layouts; group 1 of each pattern is the 4-digit year.
_TIMESTAMP_YEAR_PATTERNS = (
    re.compile(r'^(\d{4})-\d{2}-\d{2}'),                   # 2021-03-15 ...
    re.compile(r'^[A-Za-z]+ \d{1,2}, (\d{4})'),            # March 15, 2021
    re.compile(r'^[A-Za-z]+, \d{1,2} [A-Za-z]+ (\d{4})'),  # Senin, 15 Maret 2021 10:00 WIB
)


def _text_or_empty(value) -> str:
    """Return ``str(value)``, or an empty string for a missing (None/NaN) cell."""
    return '' if pd.isna(value) else str(value)


def _extract_published_year(raw_timestamp) -> Optional[str]:
    """Return the year found in *raw_timestamp*.

    Returns None for a missing value, the four-digit year when the text
    matches a recognised layout, and the unchanged text otherwise.
    """
    if pd.isna(raw_timestamp):
        return None
    timestamp_text = str(raw_timestamp)
    for pattern in _TIMESTAMP_YEAR_PATTERNS:
        found = pattern.match(timestamp_text)
        if found:
            return found.group(1)
    return timestamp_text


def format_supporting_articles(retrieval_results_df: pd.DataFrame) -> List[SupportingArticle]:
    """Convert retrieval result rows into ``SupportingArticle`` objects.

    Args:
        retrieval_results_df: Rows with a ``score`` column and, optionally,
            ``Title``, ``FullText``, ``Url``, ``source``, ``Timestamp`` and
            ``hoax`` columns. Missing or NaN text cells become empty strings.

    Returns:
        One ``SupportingArticle`` per row, in row order.

    The publication year is captured directly from the timestamp text
    instead of being parsed with ``pd.to_datetime``. Parsing day and month
    names (``%A``/``%B``) is locale-dependent, so non-English names failed
    to parse and the whole raw timestamp ended up in ``published_year``.
    """
    supporting_articles_list = []
    for _, row in retrieval_results_df.iterrows():
        full_text = _text_or_empty(row.get('FullText'))
        # Keep responses small: long bodies are cut to 300 characters.
        snippet = full_text[:300] + "..." if len(full_text) > 300 else full_text

        supporting_articles_list.append(
            SupportingArticle(
                title=_text_or_empty(row.get('Title')),
                full_text_snippet=snippet,
                url=_text_or_empty(row.get('Url')),
                source=_text_or_empty(row.get('source')),
                published_year=_extract_published_year(row.get('Timestamp')),
                # Anything other than an explicit label of 1 is reported as VALID.
                hoax_status="HOAX" if row.get('hoax') == 1 else "VALID",
                similarity_score=float(row['score']),
            )
        )
    return supporting_articles_list
|
|
| |
|
|
# Upper bound for /popular-checks. Every returned item costs one SBERT
# encode, so an unbounded `limit` would let one request monopolise the model.
_MAX_POPULAR_CHECKS = 50


def _analyze_claim(text: str) -> dict:
    """Classify *text* and retrieve its nearest articles (blocking).

    Does the same work as ``pipeline_hoax`` followed by
    ``retrieval_pipeline`` but encodes the text only once, since the
    embedding is the expensive step and both stages need the same vector.

    Returns:
        Keyword arguments shared by ``CheckClaimResponse`` and
        ``PopularCheckItem``.
    """
    embedding_val = embeddings(text)
    pred_proba = model_lr.predict_proba(embedding_val.reshape(1, -1))[0]

    idx, score = retrieval(embedding_val)
    similar_rows = dataset.iloc[idx].copy()
    similar_rows['score'] = score

    valid_prob = float(pred_proba[0])
    return {
        "claim_text": text,
        "hoax_probability": float(pred_proba[1]),
        "valid_probability": valid_prob,
        "accuracy_percentage": valid_prob * 100,
        "supporting_articles": format_supporting_articles(similar_rows),
    }


def _record_claim_search(norm_text: str, orig_text: str) -> None:
    """Best-effort call to the ``upsert_popular_claim`` Supabase RPC (blocking).

    Failures are printed and never raised: a bookkeeping problem must not
    fail the user's request.
    """
    try:
        response = supabase.rpc(
            'upsert_popular_claim',
            {'p_normalized_text': norm_text, 'p_original_text': orig_text}
        ).execute()

        if hasattr(response, 'error') and response.error:
            print(f"Supabase RPC returned an error: {response.error}")
        else:
            print(f"Supabase RPC call successful for: {norm_text[:50]}")
    except Exception as e:
        print(f"Exception during Supabase RPC call: {e}")


@app.post("/check-claim", response_model=CheckClaimResponse, summary="Periksa Klaim/Pernyataan")
async def check_claim(request: CheckClaimRequest):
    """Analyse a claim: hoax/valid probabilities plus similar articles.

    The claim is also recorded through the popular-claims RPC when its
    normalised form is non-empty.

    Raises:
        HTTPException: 400 when the claim text is empty or whitespace only.
    """
    text = request.claim_text
    if not text or not text.strip():
        raise HTTPException(status_code=400, detail="Claim text cannot be empty.")

    normalized_text = normalize_text_for_counting(text)
    if normalized_text:
        try:
            await run_in_threadpool(_record_claim_search, normalized_text, text)
        except Exception as e:
            print(f"Error running Supabase task in threadpool: {e}")

    # Model inference is blocking; keep it off the event loop.
    analysis = await run_in_threadpool(_analyze_claim, text)
    return CheckClaimResponse(**analysis)


@app.get("/popular-checks", response_model=List[PopularCheckItem], summary="Pengecekan Terpopuler")
async def get_popular_checks(limit: int = 5):
    """Return an analysis for each of the most frequently searched claims.

    Args:
        limit: Maximum number of claims to return. Non-positive values yield
            an empty list; values above ``_MAX_POPULAR_CHECKS`` are capped.

    Returns:
        A list of ``PopularCheckItem`` ordered by descending search count.
        On any error the items built so far are returned (possibly none).
    """
    popular_checks_results = []
    if limit <= 0:
        return popular_checks_results
    limit = min(limit, _MAX_POPULAR_CHECKS)

    try:
        response = await run_in_threadpool(
            lambda: supabase.table("popular_claims")
            .select("original_claim_text, search_count")
            .order("search_count", desc=True)
            .limit(limit)
            .execute()
        )

        if response.data:
            for item_data in response.data:
                original_text = item_data.get("original_claim_text")
                # Skip unusable rows instead of letting one bad record abort
                # the whole listing.
                if not isinstance(original_text, str) or not original_text.strip():
                    continue

                example_title = original_text[:70] + "..." if len(original_text) > 70 else original_text

                analysis = await run_in_threadpool(_analyze_claim, original_text)
                popular_checks_results.append(
                    PopularCheckItem(example_claim_title=example_title, **analysis)
                )
        else:
            print("No popular claims found in Supabase or error in fetching.")
            if hasattr(response, 'error') and response.error:
                print(f"Supabase fetch error: {response.error}")
    except Exception as e:
        print(f"Error fetching popular checks from Supabase: {e}")

    return popular_checks_results