reCheck / main.py
Xinqi04
baru7
bc5fd97
import joblib
import pickle
import pandas as pd
import os
# Work around permission problems by using a local cache directory.
import os  # make sure this import is present
# Point the Hugging Face cache directories at /tmp, a location that is
# writable on Hugging Face Spaces.
# NOTE(review): these are set before the sentence_transformers import that
# follows, presumably so the library sees them when it is imported -- confirm
# before reordering.
os.environ["HF_HOME"] = "/tmp/hf_cache"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf_cache/transformers"
os.environ["HF_DATASETS_CACHE"] = "/tmp/hf_cache/datasets"
from sentence_transformers import SentenceTransformer
from fastapi.middleware.cors import CORSMiddleware
import re
import torch
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel
from typing import List, Dict, Union, Optional
import faiss
from dotenv import load_dotenv
from supabase import create_client, Client
# Read variables from a .env file (if any) into the process environment
# before the Supabase settings are looked up.
load_dotenv()

SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
# Fail fast at import time: every endpoint below relies on the client.
if not SUPABASE_URL or not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables or .env file.")

# Module-level Supabase client shared by the schema check and the endpoints.
try:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Successfully connected to Supabase.")
except Exception as e:
    # Chain the original error so the root cause stays in the traceback.
    raise RuntimeError(f"Could not connect to Supabase: {e}") from e
# Load every model and data artifact once at import time. Any failure is
# re-raised as RuntimeError (chained to the original error) and aborts startup.
try:
    # Sentence encoder, moved to the GPU when one is available.
    model_sbert = SentenceTransformer('naufalihsan/indonesian-sbert-large')
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model_sbert.to(device)

    # NOTE(security): joblib.load and pickle.load can execute arbitrary code
    # embedded in the file; these artifacts must come from a trusted source.
    model_lr = joblib.load('models/lr_98_24.pkl')

    loaded_index_faiss = faiss.read_index('embeddings/embeddings_model.faissindex')
    print(f"Indeks FAISS dimuat. Jumlah vektor: {loaded_index_faiss.ntotal}")

    with open('embeddings/metadata_model.pkl', "rb") as f:
        loaded_metadata_list = pickle.load(f)
    print(f"Metadata dimuat. Jumlah item metadata: {len(loaded_metadata_list)}")

    # NOTE(review): retrieval_pipeline selects rows of `dataset` by the
    # positions FAISS returns, so the CSV row order presumably has to match
    # the order in which the index was built -- confirm.
    dataset = pd.read_csv('Datasets/final_dataset.csv')
    if 'hoax' in dataset.columns:
        # Integer labels so the later `== 1` comparison behaves predictably.
        dataset['hoax'] = dataset['hoax'].astype(int)
except FileNotFoundError as e:
    raise RuntimeError(f"Error loading model or data files: {e}") from e
except Exception as e:
    raise RuntimeError(f"An unexpected error occurred during model/data loading: {e}") from e
def check_database_schema():
    """Print the tables and columns reported by the Supabase schema RPC.

    Calls the ``get_schema_details`` RPC through the module-level ``supabase``
    client and prints one header per table followed by its columns. This is a
    best-effort diagnostic: any exception is caught and printed, and nothing
    is returned.
    """
    print("Mencoba mengambil skema database dari Supabase...")
    try:
        # The 'get_schema_details' function must already exist in Supabase.
        response = supabase.rpc('get_schema_details').execute()

        if not response.data:
            print("Tidak ada data skema yang ditemukan atau terjadi error.")
            if hasattr(response, 'error') and response.error:
                print(f"Detail Error: {response.error}")
            return

        print("Skema Database Berhasil Diambil:")
        current_table = None
        for row in response.data:
            table_name, column_name, data_type = (
                row['table_name'],
                row['column_name'],
                row['data_type'],
            )
            # Print the table header only when the table changes.
            if table_name != current_table:
                print(f"\n[Tabel: {table_name}]")
                current_table = table_name
            print(f" - Kolom: {column_name} (Tipe: {data_type})")
    except Exception as e:
        print(f"Terjadi kesalahan saat mengambil skema: {e}")
# Print the database schema once at import time. Errors raised inside the
# function's try block are caught and printed there, not propagated.
check_database_schema()
def normalize_text_for_counting(text: str) -> str:
    """Return a canonical form of *text* used as the popularity-count key.

    The text is lower-cased, every character other than ``a-z``, ``0-9`` and
    whitespace is removed, and whitespace runs are collapsed to single spaces
    with the ends trimmed. Non-string input yields an empty string.
    """
    if isinstance(text, str):
        lowered = text.lower()
        alnum_and_space = re.sub(r'[^a-z0-9\s]', '', lowered)
        return re.sub(r'\s+', ' ', alnum_and_space).strip()
    return ""
def embeddings(text: str) -> np.ndarray:
    """Encode *text* with the module-level SBERT model as a NumPy array.

    Before encoding, the text is lower-cased and stripped of every character
    outside ASCII letters, digits, whitespace and a fixed set of punctuation
    and bracket characters.
    """
    cleaned = re.sub(
        r'[^A-Za-z0-9\s\[\]\(\)\{\}\<\>\.,!?;:\'\"-]',
        '',
        text.lower(),
    )
    encoded = model_sbert.encode(
        cleaned,
        show_progress_bar=False,
        device=device,
        convert_to_tensor=True,
    )
    # The tensor is moved to the CPU before conversion to NumPy.
    return encoded.cpu().numpy()
def pipeline_hoax(text: str) -> Dict[str, float]:
    """Classify *text* and return its hoax and valid probabilities.

    The text is embedded, reshaped into a single-row matrix and passed to
    ``model_lr.predict_proba``. Index 1 of the resulting row is reported as
    ``hoax_prob`` and index 0 as ``valid_prob``.
    """
    features = embeddings(text).reshape(1, -1)
    class_probs = model_lr.predict_proba(features)[0]
    hoax_prob = float(class_probs[1])
    valid_prob = float(class_probs[0])
    return {"hoax_prob": hoax_prob, "valid_prob": valid_prob}
def retrieval(embeddings_val: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Search the loaded FAISS index for the 5 nearest neighbours.

    The query is cast to float32 when needed, and a 1-D vector is reshaped
    into a single-row matrix. Returns ``(indices, distances)`` for the first
    query row only.
    """
    query = embeddings_val
    if query.dtype != np.float32:
        query = query.astype(np.float32)
    if query.ndim == 1:
        query = query.reshape(1, -1)

    distances, indices = loaded_index_faiss.search(query, 5)
    return indices[0], distances[0]
def retrieval_pipeline(text: str) -> pd.DataFrame:
    """Return the dataset rows most similar to *text*, with a ``score`` column.

    The text is embedded, the FAISS index is searched, and the matching rows
    of the module-level ``dataset`` are selected by position. ``score`` holds
    the value FAISS returned for each hit.

    NOTE(review): whether a larger or smaller ``score`` means "more similar"
    depends on the metric the index was built with -- confirm.
    """
    embedding_val = embeddings(text)
    idx, score = retrieval(embedding_val)

    # FAISS pads the result with label -1 when it finds fewer neighbours than
    # requested. Passing -1 to iloc would silently return the LAST dataset
    # row, so drop those placeholder hits first.
    found = idx >= 0
    idx, score = idx[found], score[found]

    result = dataset.iloc[idx].copy()
    result['score'] = score
    return result
# FastAPI application object that the endpoint decorators in this file
# register routes on.
app = FastAPI(
    title="TruthCheck API",
    description="API for Hoax Identification and Article Similarity Retrieval for Indonesian News."
)
# CORS is fully open: every origin, method and header is allowed.
# NOTE(review): combining allow_origins=["*"] with allow_credentials=True is
# discouraged by the FastAPI CORS documentation -- confirm whether credentials
# are needed at all, and list explicit origins if they are.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request body of POST /check-claim.
class CheckClaimRequest(BaseModel):
    # Raw claim text; the endpoint rejects empty or whitespace-only values.
    claim_text: str
# One retrieved article, as built by format_supporting_articles from a row of
# the retrieval result.
class SupportingArticle(BaseModel):
    # 'Title' column ('' when missing).
    title: str
    # 'FullText' column, cut to 300 characters plus "..." when longer.
    full_text_snippet: str
    # 'Url' column ('' when missing).
    url: str
    # 'source' column ('' when missing).
    source: str
    # Derived from the 'Timestamp' column: a year string, or the raw timestamp
    # text when no year could be extracted; None when the timestamp is missing.
    published_year: Optional[str] = None
    # "HOAX" when the row's 'hoax' value equals 1, otherwise "VALID".
    hoax_status: str
    # The row's 'score' value converted to float.
    # NOTE(review): this is the raw FAISS search value; whether higher means
    # more similar depends on the index metric -- confirm.
    similarity_score: float
# Response body of POST /check-claim.
class CheckClaimResponse(BaseModel):
    # The claim text exactly as submitted.
    claim_text: str
    # "hoax_prob" from pipeline_hoax.
    hoax_probability: float
    # "valid_prob" from pipeline_hoax.
    valid_probability: float
    # Populated by the endpoint as valid_probability * 100.
    accuracy_percentage: float
    # Articles retrieved for the claim.
    supporting_articles: List[SupportingArticle]
# One entry of the GET /popular-checks response.
class PopularCheckItem(BaseModel):
    # Claim text cut to 70 characters plus "..." when longer.
    example_claim_title: str
    # Full stored claim text.
    claim_text: str
    # "hoax_prob" from pipeline_hoax.
    hoax_probability: float
    # "valid_prob" from pipeline_hoax.
    valid_probability: float
    # Populated by the endpoint as valid_probability * 100.
    accuracy_percentage: float
    # Articles retrieved for the claim.
    supporting_articles: List[SupportingArticle]
# Timestamp layouts recognised by this module; each captures the 4-digit year.
_TIMESTAMP_YEAR_PATTERNS = (
    re.compile(r'^(\d{4})-\d{2}-\d{2}'),                   # 2023-01-31 ...
    re.compile(r'^[A-Za-z]+ \d{1,2}, (\d{4})'),            # January 31, 2023
    re.compile(r'^[A-Za-z]+, \d{1,2} [A-Za-z]+ (\d{4})'),  # Selasa, 31 Januari 2023 10:00 WIB
)


def _extract_published_year(raw_timestamp) -> Optional[str]:
    """Return the year of *raw_timestamp*, or None when the value is missing.

    The year is read straight from the matched layout instead of going through
    date parsing. Parsing failed for day/month names the parser does not know
    (e.g. Indonesian "Senin, 12 Januari 2020 10:30 WIB"), which leaked the
    whole raw timestamp into ``published_year``. Text in an unrecognised
    layout is still returned unchanged.
    """
    if pd.isna(raw_timestamp):
        return None
    timestamp_text = str(raw_timestamp)
    for pattern in _TIMESTAMP_YEAR_PATTERNS:
        found = pattern.match(timestamp_text)
        if found:
            return found.group(1)
    return timestamp_text


def _cell_text(row: pd.Series, column: str) -> str:
    """Return ``row[column]`` as text, or '' when it is missing or NaN."""
    value = row.get(column)
    return '' if pd.isna(value) else str(value)


def format_supporting_articles(retrieval_results_df: pd.DataFrame) -> List[SupportingArticle]:
    """Convert retrieval result rows into ``SupportingArticle`` objects.

    Args:
        retrieval_results_df: Rows returned by the retrieval pipeline. The
            columns read are 'Title', 'FullText', 'Url', 'source',
            'Timestamp', 'hoax' and 'score'; all but 'score' may be absent.

    Returns:
        One ``SupportingArticle`` per row, in row order.

    Raises:
        KeyError: If a row has no 'score' value.
    """
    supporting_articles_list = []
    for _, row in retrieval_results_df.iterrows():
        full_text = _cell_text(row, 'FullText')
        # Keep the payload small: at most 300 characters plus an ellipsis.
        snippet = full_text[:300] + "..." if len(full_text) > 300 else full_text
        supporting_articles_list.append(
            SupportingArticle(
                title=_cell_text(row, 'Title'),
                full_text_snippet=snippet,
                url=_cell_text(row, 'Url'),
                source=_cell_text(row, 'source'),
                published_year=_extract_published_year(row.get('Timestamp')),
                hoax_status="HOAX" if row.get('hoax') == 1 else "VALID",
                similarity_score=float(row['score']),
            )
        )
    return supporting_articles_list
# --- API Endpoints ---
def _upsert_popular_claim(norm_text: str, orig_text: str):
    """Call the ``upsert_popular_claim`` Supabase RPC; log instead of raising."""
    try:
        response = supabase.rpc(
            'upsert_popular_claim',
            {'p_normalized_text': norm_text, 'p_original_text': orig_text},
        ).execute()
        rpc_failed = hasattr(response, 'error') and response.error
        if not rpc_failed:
            print(f"Supabase RPC call successful for: {norm_text[:50]}")
        else:
            print(f"Supabase RPC returned an error: {response.error}")
    except Exception as e:
        print(f"Exception during Supabase RPC call: {e}")


@app.post("/check-claim", response_model=CheckClaimResponse, summary="Periksa Klaim/Pernyataan")
async def check_claim(request: CheckClaimRequest):
    """Classify a claim and return similar articles.

    Responds with HTTP 400 when the claim text is empty or whitespace-only.
    When the normalized text is non-empty, the claim is first recorded through
    the ``upsert_popular_claim`` RPC; failures there are only printed. The
    classifier and the retrieval pipeline then run in the thread pool.
    """
    text = request.claim_text
    if not (text and text.strip()):
        raise HTTPException(status_code=400, detail="Claim text cannot be empty.")

    normalized_text = normalize_text_for_counting(text)
    print(normalized_text)

    if normalized_text:
        try:
            await run_in_threadpool(_upsert_popular_claim, normalized_text, text)
        except Exception as e:
            print(f"Error running Supabase task in threadpool: {e}")

    # Run the pipeline as usual.
    hoax_prediction = await run_in_threadpool(pipeline_hoax, text)
    retrieval_results_df = await run_in_threadpool(retrieval_pipeline, text)
    articles = format_supporting_articles(retrieval_results_df)

    hoax_prob = hoax_prediction["hoax_prob"]
    valid_prob = hoax_prediction["valid_prob"]
    return CheckClaimResponse(
        claim_text=text,
        hoax_probability=hoax_prob,
        valid_probability=valid_prob,
        accuracy_percentage=valid_prob * 100,
        supporting_articles=articles,
    )
@app.get("/popular-checks", response_model=List[PopularCheckItem], summary="Pengecekan Terpopuler")
async def get_popular_checks(limit: int = 5):
    """Return the most-searched claims, each re-evaluated by the pipeline.

    Reads up to *limit* rows from the ``popular_claims`` table ordered by
    ``search_count`` descending, then runs the classifier and the retrieval
    pipeline on every stored claim text.

    Best-effort endpoint: any exception is printed and whatever results were
    collected so far (possibly an empty list) are returned.
    """
    popular_checks_results = []
    try:
        response = await run_in_threadpool(
            lambda: supabase.table("popular_claims")
            .select("original_claim_text, search_count")
            .order("search_count", desc=True)
            .limit(limit)
            .execute()
        )
        if response.data:
            for item_data in response.data:
                original_text = item_data.get("original_claim_text")
                # A null or blank text used to raise inside the loop and cut
                # off every remaining row; skip it, matching /check-claim,
                # which rejects blank claims.
                if not isinstance(original_text, str) or not original_text.strip():
                    print("Skipping popular claim row without usable claim text.")
                    continue

                example_title = original_text[:70] + "..." if len(original_text) > 70 else original_text
                hoax_prediction = await run_in_threadpool(pipeline_hoax, original_text)
                retrieval_results_df = await run_in_threadpool(retrieval_pipeline, original_text)
                supporting_articles_list = format_supporting_articles(retrieval_results_df)
                popular_checks_results.append(
                    PopularCheckItem(
                        example_claim_title=example_title,
                        claim_text=original_text,
                        hoax_probability=hoax_prediction["hoax_prob"],
                        valid_probability=hoax_prediction["valid_prob"],
                        accuracy_percentage=hoax_prediction["valid_prob"] * 100,
                        supporting_articles=supporting_articles_list,
                    )
                )
        else:
            print("No popular claims found in Supabase or error in fetching.")
            if hasattr(response, 'error') and response.error:
                print(f"Supabase fetch error: {response.error}")
    except Exception as e:
        print(f"Error fetching popular checks from Supabase: {e}")
    return popular_checks_results