Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import joblib | |
| import os | |
| import logging | |
| import sqlite3 | |
| import hashlib | |
| import json | |
| import pandas as pd | |
| from datetime import datetime | |
| from collections import Counter | |
| import re | |
# Route INFO-and-above records through the root handler, then create the
# module-level logger that the rest of this file writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SMSScamDetector:
    """SMS scam detector combining an ML classifier with keyword heuristics.

    On construction the detector creates the SQLite analytics table, loads the
    classifier and TF-IDF vectorizer from joblib files in the working
    directory, and builds the keyword/regex tables used by the heuristic
    pattern analysis.

    Attributes:
        model: Loaded classifier, or ``None`` if loading failed.
        vectorizer: Loaded vectorizer, or ``None`` if loading failed.
        db_path: Path of the SQLite database used for prediction logs.
        scam_patterns: Dict of keyword lists and regex strings, see
            ``load_scam_patterns``.
    """

    # The UI reports the heuristic score as "<score>/10", so the score is
    # capped at this value.
    MAX_SUSPICIOUS_SCORE = 10

    # Schema of the analytics table; one row per distinct message hash.
    _CREATE_TABLE_SQL = '''
        CREATE TABLE IF NOT EXISTS sms_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_hash TEXT UNIQUE,
            prediction TEXT,
            confidence REAL,
            timestamp DATETIME,
            message_length INTEGER,
            suspicious_keywords INTEGER
        )
    '''

    def __init__(self):
        self.model = None
        self.vectorizer = None
        self.db_path = "sms_analytics.db"
        self.init_database()
        self.load_models()
        self.scam_patterns = self.load_scam_patterns()

    def init_database(self):
        """Create the ``sms_logs`` table if it does not exist yet.

        Best-effort: any failure is logged and swallowed so that the app can
        still start without analytics.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            conn.execute(self._CREATE_TABLE_SQL)
            conn.commit()
            logger.info("Database initialized successfully")
        except Exception as e:
            logger.error(f"Database initialization error: {str(e)}")
        finally:
            # Close even when table creation fails, so the handle is not leaked.
            if conn is not None:
                conn.close()

    def load_scam_patterns(self):
        """Return the keyword lists and regex strings used by the heuristics."""
        return {
            'prize_keywords': ['ushindi', 'zawadi', 'hongera', 'umeshinda', 'pesa', 'dola'],
            'urgency_keywords': ['haraka', 'sasa hivi', 'urgent', 'muda mchache'],
            'suspicious_urls': [r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'],
            'phone_patterns': [r'\*\d+#', r'\d{10,}'],
            'money_patterns': [r'tsh?\s*[\d,]+', r'usd?\s*[\d,]+', r'[\d,]+\s*shilling'],
        }

    def analyze_message_patterns(self, text):
        """Score a message against the heuristic scam patterns.

        Args:
            text: Message text to inspect.

        Returns:
            A ``(score, patterns)`` tuple. ``score`` is an int between 0 and
            ``MAX_SUSPICIOUS_SCORE``; ``patterns`` is a list of human-readable
            descriptions of every pattern that matched.
        """
        text_lower = text.lower()
        suspicious_score = 0
        detected_patterns = []

        # Prize/money keywords weigh 2 points each (substring match).
        for keyword in self.scam_patterns['prize_keywords']:
            if keyword in text_lower:
                suspicious_score += 2
                detected_patterns.append(f"Prize keyword: {keyword}")

        # Urgency keywords weigh 1 point each.
        for keyword in self.scam_patterns['urgency_keywords']:
            if keyword in text_lower:
                suspicious_score += 1
                detected_patterns.append(f"Urgency keyword: {keyword}")

        # Any URL adds 3 points.
        if re.search(self.scam_patterns['suspicious_urls'][0], text):
            suspicious_score += 3
            detected_patterns.append("Contains suspicious URL")

        # A USSD code such as *123# adds 2 points.
        if re.search(self.scam_patterns['phone_patterns'][0], text):
            suspicious_score += 2
            detected_patterns.append("Contains USSD code")

        # A money amount adds 1 point, counted once however many patterns match.
        if any(re.search(pattern, text_lower)
               for pattern in self.scam_patterns['money_patterns']):
            suspicious_score += 1
            detected_patterns.append("Contains money amount")

        # The raw sum can exceed the scale shown to users, so clamp it.
        return min(suspicious_score, self.MAX_SUSPICIOUS_SCORE), detected_patterns

    def log_prediction(self, text, prediction, confidence, suspicious_score):
        """Store one prediction in the analytics table.

        Only an MD5 digest of the message is stored, not the message itself;
        the digest is a de-duplication key, so re-analysing the same text
        replaces the earlier row. Failures are logged and swallowed.

        Args:
            text: Original message text.
            prediction: Predicted label; stored as its string form.
            confidence: Sequence of class probabilities; the maximum is stored.
            suspicious_score: Heuristic score from ``analyze_message_patterns``.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            message_hash = hashlib.md5(text.encode()).hexdigest()
            conn.execute(
                '''
                INSERT OR REPLACE INTO sms_logs
                (message_hash, prediction, confidence, timestamp, message_length, suspicious_keywords)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (
                    message_hash,
                    str(prediction),
                    float(max(confidence)),
                    datetime.now().isoformat(),
                    len(text),
                    int(suspicious_score),
                ),
            )
            conn.commit()
        except Exception as e:
            logger.error(f"Logging error: {str(e)}")
        finally:
            if conn is not None:
                conn.close()

    def get_analytics(self):
        """Return a Markdown summary of the 100 most recent logged predictions.

        Returns:
            The Markdown text, a "no data" message when the table is empty, or
            an error message if the query or the formatting fails.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            df = pd.read_sql_query(
                "SELECT * FROM sms_logs ORDER BY timestamp DESC LIMIT 100", conn
            )
            if df.empty:
                return "Hakuna data ya kutosha kwa takwimu"

            # Compare labels case-insensitively, as predict_sms does.
            labels = df['prediction'].astype(str).str.lower()
            total_messages = len(df)
            scam_count = int((labels == 'scam').sum())
            trust_count = int((labels == 'trust').sum())
            avg_confidence = df['confidence'].mean()

            return "\n".join([
                "",
                "## π Takwimu za Mfumo",
                f"**Jumla ya Ujumbe**: {total_messages}",
                f"**Scam**: {scam_count} ({scam_count/total_messages*100:.1f}%)",
                f"**Trust**: {trust_count} ({trust_count/total_messages*100:.1f}%)",
                f"**Wastani wa Uhakika**: {avg_confidence:.2f}",
                "### Takwimu za Wiki Hii",
                f"- Ujumbe mrefu zaidi: {df['message_length'].max()} herufi",
                f"- Ujumbe mfupi zaidi: {df['message_length'].min()} herufi",
                f"- Wastani wa urefu: {df['message_length'].mean():.0f} herufi",
                "",
            ])
        except Exception as e:
            return f"Kosa la takwimu: {str(e)}"
        finally:
            if conn is not None:
                conn.close()

    def export_report(self):
        """Write a JSON summary of the 1000 most recent predictions.

        The file is written to the current working directory as
        ``sms_report_<YYYYmmdd_HHMMSS>.json``.

        Returns:
            A message naming the written file, a "no data" message when the
            table is empty, or an error message if anything fails.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            df = pd.read_sql_query(
                """
                SELECT prediction, confidence, timestamp, message_length, suspicious_keywords
                FROM sms_logs
                ORDER BY timestamp DESC LIMIT 1000
                """,
                conn,
            )
            if df.empty:
                return "Hakuna data ya kuexport"

            labels = df['prediction'].astype(str).str.lower()
            lengths = df['message_length']
            # Convert pandas/numpy scalars to built-in types so they are
            # written as JSON numbers rather than falling through default=str.
            report = {
                'total_analyzed': len(df),
                'scam_percentage': float((labels == 'scam').mean() * 100),
                'average_confidence': float(df['confidence'].mean()),
                'date_range': {
                    'from': str(df['timestamp'].min()),
                    'to': str(df['timestamp'].max()),
                },
                'message_stats': {
                    'avg_length': float(lengths.mean()),
                    'max_length': int(lengths.max()),
                    'min_length': int(lengths.min()),
                },
            }

            report_file = f"sms_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, default=str)
            return f"Ripoti imehifadhiwa: {report_file}"
        except Exception as e:
            return f"Kosa la report: {str(e)}"
        finally:
            if conn is not None:
                conn.close()

    def load_models(self):
        """Load the classifier and vectorizer from joblib files, if present.

        A missing file is logged and leaves the matching attribute ``None``.
        Any load error resets both attributes to ``None`` so that
        ``predict_sms`` reports the system as unavailable.
        """
        # NOTE(review): joblib.load unpickles arbitrary objects; these files
        # must only ever come from a trusted source.
        try:
            if os.path.exists("scam_classifier_model.joblib"):
                self.model = joblib.load("scam_classifier_model.joblib")
                logger.info("Model loaded successfully")
            else:
                logger.error("Model file not found")

            if os.path.exists("tfidf_vectorizer.joblib"):
                self.vectorizer = joblib.load("tfidf_vectorizer.joblib")
                logger.info("Vectorizer loaded successfully")
            else:
                logger.error("Vectorizer file not found")
        except Exception as e:
            logger.error(f"Error loading models: {str(e)}")
            self.model = None
            self.vectorizer = None

    def preprocess_text(self, text):
        """Return ``text`` stripped, with whitespace runs collapsed to one space.

        Returns an empty string for empty or non-string input.
        """
        if not text or not isinstance(text, str):
            return ""
        # str.split() with no argument both trims and splits on any whitespace.
        return ' '.join(text.split())

    def get_confidence_level(self, prediction_proba):
        """Map class probabilities to a ``(label, emoji)`` confidence pair.

        Args:
            prediction_proba: Sequence of class probabilities; only the
                largest value is used.
        """
        max_prob = max(prediction_proba)
        if max_prob >= 0.8:
            return "Imara sana (Very High)", "π΄"
        if max_prob >= 0.65:
            return "Imara (High)", "π "
        if max_prob >= 0.5:
            return "Wastani (Medium)", "π‘"
        return "Haba (Low)", "π’"

    def predict_sms(self, text):
        """Classify one SMS and return a Markdown report for the UI.

        Validates the input, runs the heuristic analysis and the classifier on
        the cleaned text, logs the result, and formats the outcome.

        Args:
            text: Raw message text as entered by the user.

        Returns:
            A Markdown string: the analysis report, or a validation/error
            message. This method does not raise for bad input or model errors.
        """
        # Input validation (non-string input is treated like an empty message).
        if not isinstance(text, str) or not text.strip():
            return "β **Kosa**: Tafadhali ingiza ujumbe wa SMS"
        if len(text.strip()) < 5:
            return "β οΈ **Onyo**: Ujumbe mfupi sana. Ingiza ujumbe kamili."

        if self.model is None or self.vectorizer is None:
            return "β **Kosa la Mfumo**: Mifumo ya AI haijapakiwa vizuri. Tafadhali rudia tena."

        try:
            cleaned_text = self.preprocess_text(text)

            # Use the cleaned text so that multi-word keywords still match
            # when the user's message contains repeated spaces or newlines.
            suspicious_score, detected_patterns = self.analyze_message_patterns(cleaned_text)

            text_vector = self.vectorizer.transform([cleaned_text])
            # str() keeps .lower() working when the labels are not plain str.
            prediction = str(self.model.predict(text_vector)[0])
            prediction_proba = self.model.predict_proba(text_vector)[0]

            confidence, emoji = self.get_confidence_level(prediction_proba)

            self.log_prediction(text, prediction, prediction_proba, suspicious_score)

            if prediction.lower() == 'scam':
                result_text = "**SCAM** π¨"
                advice = "**Onyo**: Ujumbe huu unaweza kuwa wa udanganyifu. Usijibu au kutoa taarifa za kibinafsi."
            else:
                result_text = "**TRUST** β "
                advice = "Ujumbe huu unaonekana kuwa wa kawaida, lakini bado kuwa makini."

            pattern_analysis = ""
            if detected_patterns:
                bullets = "\n".join(f"β’ {pattern}" for pattern in detected_patterns)
                pattern_analysis = (
                    "\n**Dalili Zilizogunduliwa**:\n"
                    + bullets
                    + f"\n**Alama za Utata**: {suspicious_score}/{self.MAX_SUSPICIOUS_SCORE}"
                )

            preview = text[:100] + ('...' if len(text) > 100 else '')
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            return "\n".join([
                "",
                f"## Matokeo ya Uchunguzi {emoji}",
                f'**Ujumbe**: "{preview}"',
                f"**Utabiri**: {result_text}",
                f"**Kiwango cha Uhakika**: {confidence}",
                f"**Maoni**: {advice}",
                pattern_analysis,
                # Blank line: without it Markdown reads '---' as a setext
                # heading underline for the paragraph above, not as a rule.
                "",
                "---",
                f"*Tarehe*: {timestamp}",
                f"*Urefu wa ujumbe*: {len(text)} herufi",
                "",
            ])
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            return f"β **Kosa la Kihesabu**: {str(e)}"
# One shared detector instance serves every UI callback below.
detector = SMSScamDetector()

# Example messages offered as one-click samples in the UI.
sample_messages = [
    "Hongera! Umeshinda Tsh 1,000,000. Piga *123# ili kupokea zawadi yako sasa hivi!",
    "Habari za leo? Tutaonana kesho uwandani kama tulivyopanga.",
    "URGENT: Your account will be closed. Click link to verify: http://fake-bank.com",
    "Mama, nimepoteza simu yangu. Hii ni nambari yangu mpya. Nitakuja nyumbani jioni.",
]
def load_sample(sample_text):
    """Return ``sample_text`` unchanged, for use as a textbox-filling callback."""
    return sample_text
# Build the Gradio interface. Event handlers are registered inside the Blocks
# context, as Gradio requires.
# NOTE(review): the nesting below is reconstructed from statement order; the
# original indentation was not available.
with gr.Blocks(
    theme=gr.themes.Soft(),
    title="Bongo SMS Scam Detector",
    css="""
    .gradio-container {
    max-width: 800px !important;
    margin: auto !important;
    }
    .warning {
    background: linear-gradient(45deg, #ff6b6b, #feca57);
    padding: 15px;
    border-radius: 10px;
    margin: 10px 0;
    }
    """
) as demo:
    gr.Markdown("""
    # π‘οΈ Bongo SMS Scam Detector
    **Kiunga cha Usalama wa SMS** - Chunguza ujumbe wa SMS ili kujua kama ni wa udanganyifu
    β‘ Ingiza ujumbe wa SMS hapo chini na upate matokeo ya haraka
    """)

    with gr.Row():
        with gr.Column(scale=2):
            # Main input
            sms_input = gr.Textbox(
                lines=6,
                placeholder="Nakili na ubandike ujumbe wa SMS hapa...\n\nMfano: 'Hongera! Umeshinda Tsh 500,000. Piga *150# ili kupokea pesa zako!'",
                label="π± Ujumbe wa SMS",
                info="Ingiza ujumbe wowote wa SMS unaodai kushinda zawadi, pesa, au kutaka taarifa za kibinafsi"
            )
            with gr.Row():
                predict_btn = gr.Button("π Chunguza SMS", variant="primary", size="lg")
                clear_btn = gr.Button("ποΈ Futa", variant="secondary")

        with gr.Column(scale=1):
            gr.Markdown("### π Mifano ya SMS")
            # One button per sample message. "outline" is not a documented
            # Button variant, so the standard "secondary" style is used.
            for i, sample in enumerate(sample_messages, 1):
                sample_btn = gr.Button(
                    f"Mfano {i}",
                    variant="secondary",
                    size="sm"
                )
                # Bind the sample as a default argument so each button keeps
                # its own message rather than the loop's final value.
                sample_btn.click(
                    fn=lambda x=sample: x,
                    outputs=sms_input
                )

    # Output section
    output_result = gr.Markdown(
        label="π Matokeo",
        value="Matokeo yataonyeshwa hapa baada ya kuchunguza ujumbe..."
    )

    with gr.Row():
        with gr.Column(scale=1):
            # Analytics section
            gr.Markdown("### π Takwimu za Mfumo")
            analytics_btn = gr.Button("π Ona Takwimu", variant="secondary")
            analytics_output = gr.Markdown("Bonyeza hapo juu kuona takwimu...")
            export_btn = gr.Button("π Export Ripoti", variant="secondary")
            export_output = gr.Markdown("")

    # Information section
    with gr.Accordion("βΉοΈ Maelezo ya Ziada", open=False):
        gr.Markdown("""
        ### Jinsi ya Kutumia:
        1. **Nakili ujumbe** wa SMS kutoka kwa simu yako
        2. **Ubandike hapa** kwenye kisanduku cha maandishi
        3. **Bonyeza kitufe** cha "Chunguza SMS"
        4. **Soma matokeo** na ufuate mapendekezo
        ### Dalili za SMS za Udanganyifu:
        - π Inadai umeshinda zawadi kubwa
        - π° Inahitaji malipo ya haraka
        - π Ina viungo vya kugusia (links)
        - β‘ Inadai ni ya dharura
        - π Inaomba taarifa za kibinafsi
        - π± Ina USSD codes (*123#)
        ### Vipimo Vipya:
        - **Pattern Analysis**: Mfumo unachunguza maneno na michoro ya kawaida
        - **Database Logging**: Kila ujumbe unahifadhiwa kwa takwimu
        - **Confidence Scoring**: Kiwango cha uhakika kinajumuishwa
        - **Analytics Dashboard**: Takwimu za jumla za matumizi
        ### Onyo Muhimu:
        Mfumo huu ni wa kusaidia tu. Daima tumia busara zako na usijibu SMS zisizoeleweka.
        """)

    # Advanced features section
    with gr.Accordion("π§ Vipengele vya Kina", open=False):
        gr.Markdown("""
        ### Uchanganuzi wa Kina:
        - **Keyword Detection**: Inachunguza maneno yenye hatari
        - **URL Analysis**: Inaangalia viungo vya web
        - **USSD Detection**: Inagundua nambari za *123#
        - **Money Pattern**: Inatambua maelezo ya pesa
        - **Urgency Detection**: Inagundua maneno ya dharura
        ### Data Analytics:
        - Takwimu za ujumbe wote uliochunguzwa
        - Asilimia ya scam vs trust
        - Wastani wa uhakika wa mfumo
        - Export ya ripoti za kina
        """)

    # Event handlers
    predict_btn.click(
        fn=detector.predict_sms,
        inputs=sms_input,
        outputs=output_result
    )
    analytics_btn.click(
        fn=detector.get_analytics,
        outputs=analytics_output
    )
    export_btn.click(
        fn=detector.export_report,
        outputs=export_output
    )
    # Reset both the input box and the result panel to their initial state.
    clear_btn.click(
        fn=lambda: ("", "Matokeo yataonyeshwa hapa baada ya kuchunguza ujumbe..."),
        outputs=[sms_input, output_result]
    )
    # Pressing Enter in the textbox triggers the same analysis as the button.
    sms_input.submit(
        fn=detector.predict_sms,
        inputs=sms_input,
        outputs=output_result
    )
# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",  # listen on all network interfaces
        "server_port": 7860,
        "show_error": True,
        "favicon_path": None,
        "inbrowser": True,
    }
    demo.launch(**launch_options)