"""Gradio web app that classifies SMS messages as scam or trustworthy.

The app combines a pre-trained classifier (loaded from joblib files) with a
small rule-based pattern analysis, logs every prediction to a local SQLite
database and exposes simple analytics / report export from the UI.
"""

import hashlib
import json
import logging
import os
import re
import sqlite3
from collections import Counter
from contextlib import closing
from datetime import datetime

import gradio as gr
import joblib
import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SMSScamDetector:
    """Enhanced SMS Scam Detection System with Analytics and Reporting."""

    def __init__(self):
        """Create the analytics database, load the models and the rule set."""
        self.model = None
        self.vectorizer = None
        self.db_path = "sms_analytics.db"
        self.init_database()
        self.load_models()
        self.scam_patterns = self.load_scam_patterns()

    def init_database(self):
        """Initialize the SQLite database used for analytics.

        Errors are logged, not raised, so a broken database never prevents
        the application from starting.
        """
        try:
            # sqlite3's own context manager only commits/rolls back; closing()
            # guarantees the connection is released even if execute() fails.
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS sms_logs (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        message_hash TEXT UNIQUE,
                        prediction TEXT,
                        confidence REAL,
                        timestamp DATETIME,
                        message_length INTEGER,
                        suspicious_keywords INTEGER
                    )
                ''')
                conn.commit()
            logger.info("Database initialized successfully")
        except Exception as e:
            logger.error("Database initialization error: %s", e)

    def load_scam_patterns(self):
        """Return the keyword lists and regex patterns used by the rule engine."""
        return {
            'prize_keywords': ['ushindi', 'zawadi', 'hongera', 'umeshinda', 'pesa', 'dola'],
            'urgency_keywords': ['haraka', 'sasa hivi', 'urgent', 'muda mchache'],
            'suspicious_urls': [r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'],
            'phone_patterns': [r'\*\d+#', r'\d{10,}'],
            'money_patterns': [r'tsh?\s*[\d,]+', r'usd?\s*[\d,]+', r'[\d,]+\s*shilling'],
        }

    def analyze_message_patterns(self, text):
        """Score a message against the rule set.

        Args:
            text: Raw SMS text.

        Returns:
            A ``(suspicious_score, detected_patterns)`` tuple: the accumulated
            integer score and a list of human-readable descriptions of every
            rule that fired.
        """
        text_lower = text.lower()
        suspicious_score = 0
        detected_patterns = []

        # Prize/money keywords weigh 2 points each.
        for keyword in self.scam_patterns['prize_keywords']:
            if keyword in text_lower:
                suspicious_score += 2
                detected_patterns.append(f"Prize keyword: {keyword}")

        # Urgency keywords weigh 1 point each.
        for keyword in self.scam_patterns['urgency_keywords']:
            if keyword in text_lower:
                suspicious_score += 1
                detected_patterns.append(f"Urgency keyword: {keyword}")

        # Any URL adds 3 points.
        if re.search(self.scam_patterns['suspicious_urls'][0], text):
            suspicious_score += 3
            detected_patterns.append("Contains suspicious URL")

        # USSD codes such as *123# add 2 points.
        if re.search(self.scam_patterns['phone_patterns'][0], text):
            suspicious_score += 2
            detected_patterns.append("Contains USSD code")

        # Money mentions add 1 point in total, however many patterns match.
        if any(re.search(pattern, text_lower)
               for pattern in self.scam_patterns['money_patterns']):
            suspicious_score += 1
            detected_patterns.append("Contains money amount")

        return suspicious_score, detected_patterns

    def log_prediction(self, text, prediction, confidence, suspicious_score):
        """Store one prediction in the analytics database.

        Args:
            text: The analysed message; only its MD5 hash and length are stored.
            prediction: Predicted label.
            confidence: Iterable of class probabilities; the maximum is stored.
            suspicious_score: Score returned by ``analyze_message_patterns``.

        Errors are logged and swallowed: logging is best-effort and must not
        break the prediction flow.
        """
        try:
            # MD5 is used only as a de-duplication key (message_hash is UNIQUE,
            # so re-analysing the same text replaces the earlier row).
            message_hash = hashlib.md5(text.encode()).hexdigest()
            row = (
                message_hash,
                str(prediction),
                float(max(confidence)),
                datetime.now().isoformat(),
                len(text),
                suspicious_score,
            )
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO sms_logs
                    (message_hash, prediction, confidence, timestamp, message_length, suspicious_keywords)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', row)
                conn.commit()
        except Exception as e:
            logger.error("Logging error: %s", e)

    def get_analytics(self):
        """Return a Markdown summary of the 100 most recent logged predictions.

        Returns:
            Markdown text, a "no data" message when the log is empty, or an
            error message if the database could not be read.
        """
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                df = pd.read_sql_query(
                    "SELECT * FROM sms_logs ORDER BY timestamp DESC LIMIT 100", conn
                )

            if df.empty:
                return "Hakuna data ya kutosha kwa takwimu"

            # Compare case-insensitively, the same way predict_sms decides
            # whether a label means "scam".
            labels = df['prediction'].astype(str).str.lower()
            total_messages = len(df)
            scam_count = int((labels == 'scam').sum())
            trust_count = int((labels == 'trust').sum())
            avg_confidence = df['confidence'].mean()

            return "\n".join([
                "## 📊 Takwimu za Mfumo",
                "",
                f"**Jumla ya Ujumbe**: {total_messages}",
                "",
                f"**Scam**: {scam_count} ({scam_count/total_messages*100:.1f}%)",
                "",
                f"**Trust**: {trust_count} ({trust_count/total_messages*100:.1f}%)",
                "",
                f"**Wastani wa Uhakika**: {avg_confidence:.2f}",
                "",
                "### Takwimu za Wiki Hii",
                f"- Ujumbe mrefu zaidi: {df['message_length'].max()} herufi",
                f"- Ujumbe mfupi zaidi: {df['message_length'].min()} herufi",
                f"- Wastani wa urefu: {df['message_length'].mean():.0f} herufi",
            ])
        except Exception as e:
            return f"Kosa la takwimu: {e}"

    def export_report(self):
        """Write a JSON summary of the 1000 most recent predictions to disk.

        Returns:
            A status message containing the report file name, a "no data"
            message when the log is empty, or an error message on failure.
        """
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                df = pd.read_sql_query("""
                    SELECT prediction, confidence, timestamp, message_length, suspicious_keywords
                    FROM sms_logs
                    ORDER BY timestamp DESC
                    LIMIT 1000
                """, conn)

            if df.empty:
                return "Hakuna data ya kuexport"

            # Convert NumPy scalars to native numbers; otherwise json's
            # default=str fallback would write the integers as strings.
            lengths = df['message_length']
            report = {
                'total_analyzed': len(df),
                'scam_percentage': float((df['prediction'] == 'scam').mean() * 100),
                'average_confidence': float(df['confidence'].mean()),
                'date_range': {
                    'from': df['timestamp'].min(),
                    'to': df['timestamp'].max(),
                },
                'message_stats': {
                    'avg_length': float(lengths.mean()),
                    'max_length': int(lengths.max()),
                    'min_length': int(lengths.min()),
                },
            }

            # Save to JSON
            report_file = f"sms_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, default=str)

            return f"Ripoti imehifadhiwa: {report_file}"
        except Exception as e:
            return f"Kosa la report: {e}"

    def load_models(self):
        """Load the classifier and vectorizer from their joblib files.

        A missing file is logged and leaves the corresponding attribute as
        ``None``; any loading error resets both attributes to ``None``.
        """
        # NOTE(review): joblib.load unpickles the file; only load model files
        # from a trusted source.
        try:
            if os.path.exists("scam_classifier_model.joblib"):
                self.model = joblib.load("scam_classifier_model.joblib")
                logger.info("Model loaded successfully")
            else:
                logger.error("Model file not found")

            if os.path.exists("tfidf_vectorizer.joblib"):
                self.vectorizer = joblib.load("tfidf_vectorizer.joblib")
                logger.info("Vectorizer loaded successfully")
            else:
                logger.error("Vectorizer file not found")
        except Exception as e:
            logger.error("Error loading models: %s", e)
            self.model = None
            self.vectorizer = None

    def preprocess_text(self, text):
        """Return *text* stripped and with whitespace runs collapsed to one space.

        Falsy or non-string input yields an empty string.
        """
        if not text or not isinstance(text, str):
            return ""
        # split() without arguments also drops leading/trailing whitespace.
        return ' '.join(text.split())

    def get_confidence_level(self, prediction_proba):
        """Map class probabilities to a ``(label, emoji)`` confidence pair.

        Args:
            prediction_proba: Iterable of class probabilities; only the
                largest value is considered.
        """
        max_prob = max(prediction_proba)
        if max_prob >= 0.8:
            return "Imara sana (Very High)", "🔴"
        if max_prob >= 0.65:
            return "Imara (High)", "🟠"
        if max_prob >= 0.5:
            return "Wastani (Medium)", "🟡"
        return "Haba (Low)", "🟢"

    def predict_sms(self, text):
        """Classify an SMS and return a Markdown report for the UI.

        Args:
            text: The SMS text entered by the user.

        Returns:
            Markdown with the prediction, confidence, advice and detected
            patterns, or a Markdown error/warning message when the input is
            invalid, the models are missing or prediction fails.
        """
        # Input validation
        if not text or not text.strip():
            return "❌ **Kosa**: Tafadhali ingiza ujumbe wa SMS"

        if len(text.strip()) < 5:
            return "⚠️ **Onyo**: Ujumbe mfupi sana. Ingiza ujumbe kamili."

        # Check if models are loaded
        if self.model is None or self.vectorizer is None:
            return "❌ **Kosa la Mfumo**: Mifumo ya AI haijapakiwa vizuri. Tafadhali rudia tena."

        try:
            cleaned_text = self.preprocess_text(text)
            suspicious_score, detected_patterns = self.analyze_message_patterns(text)

            text_vector = self.vectorizer.transform([cleaned_text])
            # str() so that a non-string label type cannot break .lower()
            # below or the database insert.
            prediction = str(self.model.predict(text_vector)[0])
            prediction_proba = self.model.predict_proba(text_vector)[0]

            confidence, emoji = self.get_confidence_level(prediction_proba)

            self.log_prediction(text, prediction, prediction_proba, suspicious_score)

            if prediction.lower() == 'scam':
                result_text = "**SCAM** 🚨"
                advice = "**Onyo**: Ujumbe huu unaweza kuwa wa udanganyifu. Usijibu au kutoa taarifa za kibinafsi."
            else:
                result_text = "**TRUST** ✅"
                advice = "Ujumbe huu unaonekana kuwa wa kawaida, lakini bado kuwa makini."

            # Add pattern analysis to output
            pattern_analysis = ""
            if detected_patterns:
                pattern_analysis = "\n**Dalili Zilizogunduliwa**:\n" + "\n".join(
                    f"• {pattern}" for pattern in detected_patterns
                )
                # NOTE(review): the score is not normalised; several matching
                # rules can push it above the "/10" shown here.
                pattern_analysis += f"\n**Alama za Utata**: {suspicious_score}/10"

            preview = text[:100] + ('...' if len(text) > 100 else '')
            return "\n".join([
                f"## Matokeo ya Uchunguzi {emoji}",
                "",
                f'**Ujumbe**: "{preview}"',
                "",
                f"**Utabiri**: {result_text}",
                "",
                f"**Kiwango cha Uhakika**: {confidence}",
                "",
                f"**Maoni**: {advice}",
                pattern_analysis,
                "",
                "---",
                f"*Tarehe*: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "",
                f"*Urefu wa ujumbe*: {len(text)} herufi",
            ])
        except Exception as e:
            logger.error("Prediction error: %s", e)
            return f"❌ **Kosa la Kihesabu**: {e}"


# Initialize detector
detector = SMSScamDetector()

# Sample SMS messages for testing
sample_messages = [
    "Hongera! Umeshinda Tsh 1,000,000. Piga *123# ili kupokea zawadi yako sasa hivi!",
    "Habari za leo? Tutaonana kesho uwandani kama tulivyopanga.",
    "URGENT: Your account will be closed. Click link to verify: http://fake-bank.com",
    "Mama, nimepoteza simu yangu. Hii ni nambari yangu mpya. Nitakuja nyumbani jioni.",
]

# Placeholder shown in the result area before/after an analysis.
EMPTY_RESULT_TEXT = "Matokeo yataonyeshwa hapa baada ya kuchunguza ujumbe..."

APP_CSS = """
.gradio-container {
    max-width: 800px !important;
    margin: auto !important;
}
.warning {
    background: linear-gradient(45deg, #ff6b6b, #feca57);
    padding: 15px;
    border-radius: 10px;
    margin: 10px 0;
}
"""

HEADER_MD = """
# 🛡️ Bongo SMS Scam Detector

**Kiunga cha Usalama wa SMS** - Chunguza ujumbe wa SMS ili kujua kama ni wa udanganyifu

⚡ Ingiza ujumbe wa SMS hapo chini na upate matokeo ya haraka
"""

INFO_MD = """
### Jinsi ya Kutumia:
1. **Nakili ujumbe** wa SMS kutoka kwa simu yako
2. **Ubandike hapa** kwenye kisanduku cha maandishi
3. **Bonyeza kitufe** cha "Chunguza SMS"
4. **Soma matokeo** na ufuate mapendekezo

### Dalili za SMS za Udanganyifu:
- 🎁 Inadai umeshinda zawadi kubwa
- 💰 Inahitaji malipo ya haraka
- 🔗 Ina viungo vya kugusia (links)
- ⚡ Inadai ni ya dharura
- 📞 Inaomba taarifa za kibinafsi
- 📱 Ina USSD codes (*123#)

### Vipimo Vipya:
- **Pattern Analysis**: Mfumo unachunguza maneno na michoro ya kawaida
- **Database Logging**: Kila ujumbe unahifadhiwa kwa takwimu
- **Confidence Scoring**: Kiwango cha uhakika kinajumuishwa
- **Analytics Dashboard**: Takwimu za jumla za matumizi

### Onyo Muhimu:
Mfumo huu ni wa kusaidia tu. Daima tumia busara zako na usijibu SMS zisizoeleweka.
"""

ADVANCED_MD = """
### Uchanganuzi wa Kina:
- **Keyword Detection**: Inachunguza maneno yenye hatari
- **URL Analysis**: Inaangalia viungo vya web
- **USSD Detection**: Inagundua nambari za *123#
- **Money Pattern**: Inatambua maelezo ya pesa
- **Urgency Detection**: Inagundua maneno ya dharura

### Data Analytics:
- Takwimu za ujumbe wote uliochunguzwa
- Asilimia ya scam vs trust
- Wastani wa uhakika wa mfumo
- Export ya ripoti za kina
"""


def load_sample(sample_text):
    """Load sample message into the textbox"""
    return sample_text


# Create enhanced Gradio interface
with gr.Blocks(
    theme=gr.themes.Soft(),
    title="Bongo SMS Scam Detector",
    css=APP_CSS,
) as demo:

    gr.Markdown(HEADER_MD)

    with gr.Row():
        with gr.Column(scale=2):
            # Main input
            sms_input = gr.Textbox(
                lines=6,
                placeholder="Nakili na ubandike ujumbe wa SMS hapa...\n\nMfano: 'Hongera! Umeshinda Tsh 500,000. Piga *150# ili kupokea pesa zako!'",
                label="📱 Ujumbe wa SMS",
                info="Ingiza ujumbe wowote wa SMS unaodai kushinda zawadi, pesa, au kutaka taarifa za kibinafsi",
            )

            with gr.Row():
                predict_btn = gr.Button("🔍 Chunguza SMS", variant="primary", size="lg")
                clear_btn = gr.Button("🗑️ Futa", variant="secondary")

        with gr.Column(scale=1):
            gr.Markdown("### 📋 Mifano ya SMS")

            # Sample buttons
            for i, sample in enumerate(sample_messages, 1):
                sample_btn = gr.Button(
                    f"Mfano {i}",
                    variant="outline",
                    size="sm",
                )
                # The default argument freezes the current sample; a plain
                # closure would make every button return the last one.
                sample_btn.click(
                    fn=lambda x=sample: x,
                    outputs=sms_input,
                )

    # Output section
    output_result = gr.Markdown(
        label="📊 Matokeo",
        value=EMPTY_RESULT_TEXT,
    )

    with gr.Row():
        with gr.Column(scale=1):
            # Analytics Section
            gr.Markdown("### 📊 Takwimu za Mfumo")
            analytics_btn = gr.Button("📈 Ona Takwimu", variant="outline")
            analytics_output = gr.Markdown("Bonyeza hapo juu kuona takwimu...")

            export_btn = gr.Button("📄 Export Ripoti", variant="outline")
            export_output = gr.Markdown("")

    # Information section
    with gr.Accordion("ℹ️ Maelezo ya Ziada", open=False):
        gr.Markdown(INFO_MD)

    # Advanced Features Section
    with gr.Accordion("🔧 Vipengele vya Kina", open=False):
        gr.Markdown(ADVANCED_MD)

    # Event handlers
    predict_btn.click(
        fn=detector.predict_sms,
        inputs=sms_input,
        outputs=output_result,
    )

    analytics_btn.click(
        fn=detector.get_analytics,
        outputs=analytics_output,
    )

    export_btn.click(
        fn=detector.export_report,
        outputs=export_output,
    )

    clear_btn.click(
        fn=lambda: ("", EMPTY_RESULT_TEXT),
        outputs=[sms_input, output_result],
    )

    # Pressing Enter in the textbox runs the same analysis as the button.
    sms_input.submit(
        fn=detector.predict_sms,
        inputs=sms_input,
        outputs=output_result,
    )

# Launch configuration
if __name__ == "__main__":
    demo.launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        favicon_path=None,
        inbrowser=True,
    )