# priority_indexer.py
"""Priority indexing for fact-check claims.

Reads a sentiment CSV, derives a set of binary priority flags from sentiment
distribution, engagement totals and keyword hits, converts the flags into a
0-10 score plus a priority level, and writes the result as a JSON report.
"""
import json
import os
import re
from datetime import datetime

import pandas as pd

# Built-in keyword lists (Malay), grouped by the category names that `run`
# looks up: "government", "economic", "law" and "danger".
DEFAULT_AGENCY_KEYWORDS = {
    # Government-related keywords
    "government": [
        "kerajaan", "menteri", "perdana menteri", "kementerian", "jabatan",
        "agensi", "dasar", "parlimen", "dewan rakyat", "dewan negara", "dun",
        "pejabat", "keselamatan negara", "atm", "polis", "kdn", "hasil",
        "sop", "ancaman", "pentadbiran", "kabinet", "politik",
        "ahli parlimen", "wakil rakyat", "adun", "pemimpin", "ketua menteri",
        "menteri besar", "exco", "majlis", "pihak berkuasa", "pbt",
        "majlis perbandaran", "majlis bandaraya", "dewan bandaraya",
    ],
    # Economic keywords
    "economic": [
        "ekonomi", "kewangan", "bank", "cukai", "subsidi", "harga", "kos",
        "perbelanjaan", "pendapatan", "gaji", "dividen", "saham", "pasaran",
        "inflasi", "deflasi", "krisis", "kemelesetan", "pertumbuhan", "gdp",
        "kdnk", "pelaburan", "pelabur", "perniagaan", "syarikat", "industri",
        "sektor", "perdagangan", "import", "eksport", "mata wang", "ringgit",
        "dolar", "hutang", "pinjaman", "faedah", "untung", "rugi", "bayaran",
        "fi", "yuran", "perbelanjaan", "pendapatan", "bonus", "elaun",
        "insentif", "bantuan", "sumbangan", "derma", "zakat", "duti", "levi",
        "caj", "jualan", "belian", "pembelian", "perolehan", "tender",
        "kontrak", "projek", "pembangunan", "infrastruktur", "pembinaan",
        "hartanah", "rumah", "kediaman", "komersial", "tanah", "saiz",
        "keluasan", "murah", "mahal", "berpatutan", "mampu", "tidak mampu",
        "bekalan", "stok", "inventori", "simpanan", "rizab", "aset",
        "liabiliti", "kredit", "debit", "ansuran", "keuntungan", "kerugian",
        "defisit", "surplus", "lebihan", "kekurangan", "kenaikan",
        "penurunan", "peningkatan", "pengurangan", "pemulihan", "pembaikan",
    ],
    # Law-related keywords
    "law": [
        "undang-undang", "perundangan", "akta", "enakmen", "ordinan",
        "peraturan", "perlembagaan", "mahkamah", "hakim", "peguam",
        "pendakwa", "pendakwaan", "pertuduhan", "dakwaan", "saman", "waran",
        "tangkap", "tahan", "reman", "jamin", "ikat jamin", "denda",
        "hukuman", "penjara", "polis", "balai", "laporan", "aduan",
        "siasatan", "siasat", "jenayah", "sivil", "kes", "fail", "bicara",
        "perbicaraan", "prosiding", "rayuan", "petisyen", "pindaan", "bon",
        "jaminan", "saksi", "keterangan", "bukti", "forensik", "peguambela",
        "peguamcara", "pendakwa raya", "majistret", "ketua hakim",
        "ketua hakim negara", "hakim besar", "mahkamah tinggi",
        "mahkamah rayuan", "mahkamah persekutuan", "mahkamah rendah",
        "mahkamah majistret", "mahkamah sesyen", "mahkamah syariah", "pdrm",
        "ibu pejabat polis", "ketua polis", "pegawai polis", "anggota polis",
        "konstabel", "koperal", "sarjan", "inspektor", "superintendan",
        "komisioner", "sprm", "suruhanjaya pencegahan rasuah", "rasuah",
        "korupsi", "salah guna kuasa", "penyelewengan", "pecah amanah",
        "pengubahan wang haram",
    ],
    # Danger-related keywords
    "danger": [
        "bahaya", "merbahaya", "risiko", "ancaman", "bencana", "malapetaka",
        "tragedi", "musibah", "kemalangan", "nahas", "kecelakaan",
        "kecederaan", "kematian", "korban", "mangsa", "kemusnahan",
        "kerosakan", "kerugian", "kehilangan", "kecurian", "rompakan",
        "samun", "ragut", "pecah", "pecah rumah", "pecah masuk", "curi",
        "culik", "bunuh", "bunuh diri", "mati", "cedera", "parah",
        "kritikal", "koma", "luka", "patah", "retak", "lebam", "bengkak",
        "darah", "pendarahan", "kecemasan", "ambulans", "hospital", "klinik",
        "doktor", "ubat", "dadah", "narkotik", "ganja", "heroin", "kokain",
        "syabu", "pil kuda", "ekstasi", "ketamin", "morfin", "ketagihan",
        "penagih", "pengedar", "sindiket", "kartel", "mafia", "gangster",
        "kongsi gelap", "geng", "kumpulan jenayah", "penjenayah", "penjahat",
        "pesalah", "banduan", "tahanan", "suspek", "tertuduh", "terdakwa",
        "senjata", "pistol", "revolver", "senapang", "rifle", "shotgun",
        "bom", "granat", "peluru", "kelongsong", "senjata api",
        "senjata tajam", "pisau", "parang", "kapak", "keris", "pedang",
        "racun", "toksin", "kimia", "biologi", "nuklear", "radiasi",
        "sinaran", "letupan", "ledakan", "kebakaran", "api", "nyalaan",
        "bara", "asap", "hangus", "terbakar", "banjir", "bah", "limpahan",
        "hujan", "ribut", "taufan", "siklon", "hurikan", "tornado",
        "puting beliung", "angin kencang", "kilat", "petir", "guruh",
        "guntur", "halilintar", "tanah runtuh", "gelinciran tanah",
        "runtuhan", "runtuh", "jatuh", "roboh", "rebah", "tumbang", "gempa",
        "gempa bumi", "tsunami", "ombak besar", "gelombang tinggi",
        "kemarau", "kekeringan", "perang", "pertempuran", "pergaduhan",
        "perkelahian", "rusuhan", "kekacauan", "huru-hara", "keganasan",
        "kekerasan", "keselamatan", "keselamatan negara", "keselamatan awam",
        "kanser", "barah", "tumor", "penyakit", "wabak", "epidemik",
        "pandemik", "jangkitan", "virus", "bakteria", "nyawa", "terancam",
        "maut",
    ],
}

# Columns whose text is scanned for keywords (missing columns are ignored).
TEXT_COLUMNS = ('post_text', 'comment_text', 'title', 'snippet', 'combined_text')

# Maximum number of matched keywords reported per category.
MAX_KEYWORDS_PER_CATEGORY = 5

# Weight of each flag in the priority score; the key order is also the order
# in which flags appear in the report.
FLAG_WEIGHTS = {
    "fact_check_value": 1.0,
    "cause_confusion": 1.5,
    "cause_chaos": 1.8,
    "affects_government": 1.0,
    "economic_impact": 0.8,
    "law_related": 0.8,
    "public_interest": 1.2,
    "lives_in_danger": 1.5,
    "viral": 1.0,
    "urgent": 2.0,
}

# Numeric sentiment codes and the text labels used in the report.
SENTIMENT_LABELS = {0: "neutral", 1: "positive", 2: "negative"}

# Score thresholds (inclusive) for the priority levels.
HIGH_PRIORITY_THRESHOLD = 8.0
MEDIUM_PRIORITY_THRESHOLD = 5.0


def _default_keywords():
    """Return a fresh copy of the defaults so callers cannot mutate them."""
    return {category: list(words) for category, words in DEFAULT_AGENCY_KEYWORDS.items()}


def load_agency_keywords(filepath=None):
    """Load keywords for agency detection, falling back to the defaults.

    Args:
        filepath (str, optional): CSV file with ``keyword`` and ``category``
            columns. When omitted, missing, unreadable or lacking those
            columns, the built-in default keywords are returned.

    Returns:
        dict: Mapping of category name to a list of keyword strings.
    """
    if not filepath:
        return _default_keywords()

    if not os.path.exists(filepath):
        print("[ℹ️] Agency keywords file not found. Using default keywords.")
        return _default_keywords()

    # Best-effort load: any read/parse problem falls back to the defaults.
    try:
        df = pd.read_csv(filepath)
        if 'keyword' not in df.columns or 'category' not in df.columns:
            print(f"[⚠️] Warning: Required columns not found in {filepath}. Using default keywords.")
            return _default_keywords()

        # Rows with an empty category or keyword cannot be used; dropping
        # them avoids NaN dictionary keys and NaN "keywords".
        rows = df[['category', 'keyword']].dropna()
        keywords = {}
        for category, keyword in zip(rows['category'], rows['keyword']):
            keywords.setdefault(str(category), []).append(str(keyword))
        return keywords
    except Exception as e:
        print(f"[⚠️] Error loading agency keywords from {filepath}: {e}")
        return _default_keywords()


def analyze_text_content(df, keywords_dict):
    """Find which keywords occur in the text columns of *df*.

    All columns listed in ``TEXT_COLUMNS`` that exist in *df* are joined into
    one lower-cased string and every keyword is searched for in it.

    NOTE: matching is a case-insensitive *substring* test, so short keywords
    (e.g. "fi", "bah", "api") also match inside longer words.

    Args:
        df (pandas.DataFrame): Data to scan.
        keywords_dict (dict): Mapping of category to an iterable of keywords.

    Returns:
        dict: Mapping of category to at most ``MAX_KEYWORDS_PER_CATEGORY``
        distinct matched keywords, in the order they appear in the keyword
        list (deterministic between runs).
    """
    parts = [
        " ".join(df[col].fillna("").astype(str))
        for col in TEXT_COLUMNS
        if col in df.columns
    ]
    all_text = " ".join(parts).lower()

    found_keywords = {}
    for category, keywords in keywords_dict.items():
        hits = []
        seen = set()
        for keyword in keywords:
            # Skip None / NaN entries (NaN is the only value unequal to itself).
            if keyword is None or keyword != keyword:
                continue
            needle = str(keyword).lower()
            # An empty keyword is a substring of everything; ignore it.
            if not needle.strip() or needle in seen:
                continue
            if needle in all_text:
                seen.add(needle)
                hits.append(str(keyword))
                if len(hits) == MAX_KEYWORDS_PER_CATEGORY:
                    break
        found_keywords[category] = hits

    return found_keywords


def calculate_priority_score(flags):
    """Calculate the priority score from a flags mapping.

    Args:
        flags (dict): Mapping of flag name to 0/1. Names that are not in
            ``FLAG_WEIGHTS`` are ignored.

    Returns:
        float: Weighted sum of the raised flags, normalised to 0-10.
    """
    score = 0
    for flag, value in flags.items():
        if flag in FLAG_WEIGHTS and value == 1:
            score += FLAG_WEIGHTS[flag]

    # Normalise against the score obtained when every flag is raised.
    max_possible_score = sum(FLAG_WEIGHTS.values())
    normalized_score = (score / max_possible_score) * 10

    # Cap at 10
    return min(normalized_score, 10.0)


def get_priority_level(score):
    """Return the priority level label ("TINGGI", "SEDERHANA", "RENDAH")."""
    if score >= HIGH_PRIORITY_THRESHOLD:
        return "TINGGI"
    if score >= MEDIUM_PRIORITY_THRESHOLD:
        return "SEDERHANA"
    return "RENDAH"


def _count_sentiments(df):
    """Count rows per sentiment label, mapping numeric codes to text labels."""
    if 'sentiment' not in df.columns:
        print("[⚠️] Warning: 'sentiment' column not found. Sentiment-based flags will be skipped.")
        return {}

    text_counts = {}
    for raw_label, count in df['sentiment'].value_counts().to_dict().items():
        label = SENTIMENT_LABELS.get(raw_label, raw_label)
        # Add rather than assign so that e.g. 1 and "positive" are merged.
        text_counts[label] = text_counts.get(label, 0) + int(count)
    return text_counts


def _column_total(df, column):
    """Sum a numeric column; missing columns and non-numeric cells count as 0."""
    if column not in df.columns:
        return 0
    return pd.to_numeric(df[column], errors='coerce').sum()


def _default_output_path(sentiment_csv):
    """Derive ``reports/<stem>_priority.json`` from the input file name."""
    name = os.path.basename(sentiment_csv)
    suffix = "_sentiment.csv"
    if name.endswith(suffix):
        stem = name[:-len(suffix)]
    else:
        # Never reuse the input name unchanged: that could overwrite the input.
        stem = os.path.splitext(name)[0]
    return os.path.join("reports", stem + "_priority.json")


def run(sentiment_csv, agencies_csv=None, output_path=None, claim=None, claim_id=None, keywords=None):
    """Run priority indexing on sentiment data and write a JSON report.

    Args:
        sentiment_csv (str): Path to sentiment CSV file.
        agencies_csv (str, optional): Path to agencies (keywords) CSV file.
        output_path (str, optional): Path to output JSON file. Defaults to
            ``reports/<input stem>_priority.json``.
        claim (str, optional): The claim text.
        claim_id (str, optional): Unique identifier for the claim.
        keywords (list, optional): List of keywords, stored in the report
            as given.

    Returns:
        dict: Priority report data, or None when the sentiment CSV cannot
        be read.
    """
    print(f"[🔍] Loading sentiment data from: {sentiment_csv}")

    # Boundary of the pipeline: report any read failure and bail out.
    try:
        df = pd.read_csv(sentiment_csv)
    except Exception as e:
        print(f"[❌] Error reading sentiment data: {e}")
        return None

    # Load agency keywords
    agency_keywords = load_agency_keywords(agencies_csv)

    # Initialize flags (all lowered)
    flags = dict.fromkeys(FLAG_WEIGHTS, 0)

    text_counts = _count_sentiments(df)
    total_records = len(df)

    # Calculate engagement metrics
    total_likes = _column_total(df, 'likes')
    total_shares = _column_total(df, 'shares')
    total_comments = _column_total(df, 'comments')
    total_views = _column_total(df, 'views')
    total_engagement = total_likes + total_shares + total_comments + total_views

    # Rule: High engagement indicates need for fact checking
    if total_engagement > 10000:
        flags["fact_check_value"] = 1
        print(f"[📊] Flag: fact_check_value triggered (Total engagement: {total_engagement})")

    # Check sentiment-based flags
    pos = text_counts.get("positive", 0)
    neg = text_counts.get("negative", 0)
    neu = text_counts.get("neutral", 0)
    total_sentiment = pos + neg + neu

    if total_sentiment > 0:
        pos_ratio = pos / total_sentiment
        neg_ratio = neg / total_sentiment
        neu_ratio = neu / total_sentiment

        # Rule: cause_confusion if positive ~= negative OR neutral is high
        balanced = abs(pos_ratio - neg_ratio) < 0.2 and pos_ratio > 0.2 and neg_ratio > 0.2
        if balanced or neu_ratio > 0.7:
            flags["cause_confusion"] = 1
            print(f"[📊] Flag: cause_confusion triggered (Pos: {pos_ratio:.2f}, Neg: {neg_ratio:.2f}, Neu: {neu_ratio:.2f})")

        # Rule: cause_chaos if negative sentiment is high
        if neg_ratio > 0.4:
            flags["cause_chaos"] = 1
            print(f"[📊] Flag: cause_chaos triggered (Negative: {neg_ratio:.2f})")

    # Analyze text content for keywords
    found_keywords = analyze_text_content(df, agency_keywords)

    # A custom keyword file may not define every category, hence .get().
    government_terms = found_keywords.get("government", [])
    economic_terms = found_keywords.get("economic", [])
    law_terms = found_keywords.get("law", [])
    danger_terms = found_keywords.get("danger", [])

    # Rule: Contains government-related keywords
    if government_terms:
        flags["affects_government"] = 1
        print(f"[📊] Flag: affects_government triggered (Gov terms: {', '.join(government_terms)})")

    # Rule: Contains economic-related keywords
    if economic_terms:
        flags["economic_impact"] = 1
        print(f"[📊] Flag: economic_impact triggered (Economic terms: {', '.join(economic_terms)})")

    # Rule: Contains law-related keywords
    if law_terms:
        flags["law_related"] = 1
        print(f"[📊] Flag: law_related triggered (Law terms: {', '.join(law_terms)})")

    # Rule: High comments and shares indicate public interest
    if (total_comments + total_shares) > 1000:
        flags["public_interest"] = 1
        print(f"[📊] Flag: public_interest triggered (Comments + Shares: {total_comments + total_shares})")

    # Rule: Contains danger-related keywords
    if danger_terms:
        flags["lives_in_danger"] = 1
        print(f"[📊] Flag: lives_in_danger triggered (Danger terms: {', '.join(danger_terms)})")

    # Rule: High shares indicate virality
    if total_shares > 1000:
        flags["viral"] = 1
        print(f"[📊] Flag: viral triggered (Total shares: {total_shares})")

    # Rule: If 5 or more of the other flags are triggered, it's urgent
    flags_triggered = sum(flags.values())
    if flags_triggered >= 5:
        flags["urgent"] = 1
        print(f"[📊] Flag: urgent triggered ({flags_triggered} flags triggered)")

    # Calculate priority score
    priority_score = calculate_priority_score(flags)
    priority_level = get_priority_level(priority_score)

    # Prepare report data
    report_data = {
        "priority_flags": flags,
        "priority_score": priority_score,
        "priority_level": priority_level,
        "sentiment_counts": text_counts,
        "total_records": total_records,
        "engagement": {
            "likes": int(total_likes),
            "shares": int(total_shares),
            "comments": int(total_comments),
            "views": int(total_views),
            "total": int(total_engagement),
        },
        "found_keywords": found_keywords,
        "claim": claim,
        "claim_id": claim_id,
        "keywords": keywords,
        "timestamp": datetime.now().isoformat(),
    }

    if not output_path:
        output_path = _default_output_path(sentiment_csv)

    # A bare file name has no directory part, and os.makedirs("") raises.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report_data, f, indent=4)

    print(f"[📊] Priority index saved to {output_path}")
    print(f"[📊] Priority score: {priority_score:.2f}/10 ({priority_level})")

    return report_data