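import os
import re
from datetime import datetime, timedelta

import gradio as gr

# Simple in-memory rate limiting (resets when app restarts)
request_tracker = {}
MAX_REQUESTS_PER_HOUR = 20  # Limit to 20 AI requests per hour total
MAX_REQUESTS_PER_USER = 5  # Limit to 5 requests per user per hour
MAX_SYMPTOM_CHARS = 2000  # Upper bound on free text forwarded to the paid API


def check_rate_limit(user_ip):
    """Decide whether another AI request from *user_ip* is allowed.

    Timestamps older than one hour are dropped for every tracked client
    before counting. Pruning only the caller's own list would leave other
    clients' expired timestamps in the global total, so the system-wide
    limit would stay tripped until each of those clients came back.

    Args:
        user_ip: Key identifying the caller (the peer address, or "unknown").

    Returns:
        ``(True, None)`` when the request may proceed, otherwise
        ``(False, message)`` with a user-facing explanation.
    """
    cutoff_time = datetime.now() - timedelta(hours=1)

    # Expire old entries for all clients; forget clients with nothing recent
    # so the tracker does not grow by one key per address ever seen.
    for ip in list(request_tracker):
        recent = [t for t in request_tracker.get(ip, []) if t > cutoff_time]
        if recent:
            request_tracker[ip] = recent
        else:
            request_tracker.pop(ip, None)

    # Check user limit
    user_requests = len(request_tracker.get(user_ip, []))
    if user_requests >= MAX_REQUESTS_PER_USER:
        return False, f"Rate limit reached. Please try again in an hour. (Limit: {MAX_REQUESTS_PER_USER} requests/hour per user)"

    # Check total limit
    total_requests = sum(len(times) for times in request_tracker.values())
    if total_requests >= MAX_REQUESTS_PER_HOUR:
        return False, "System rate limit reached. Please try again later. (Demo usage limit)"

    return True, None


def record_request(user_ip):
    """Append the current time to *user_ip*'s request history."""
    request_tracker.setdefault(user_ip, []).append(datetime.now())


# Known SBC Misconfiguration Detection Rules

def check_codec_issues(config):
    """Flag a configuration that offers Opus as its only recognised codec.

    Matching is by case-insensitive substring over the whole text.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    # Check if only one codec
    codec_keywords = ['opus', 'pcmu', 'pcma', 'g729', 'g711']
    codec_count = sum(1 for c in codec_keywords if c in config_lower)

    if codec_count == 1 and 'opus' in config_lower and 'pcmu' not in config_lower:
        issues.append({
            "finding": "Codec Mismatch - Only Opus configured, PCMU/PCMA missing",
            "impact": "Leads to one-way audio or call setup failure for carriers requiring PCMU/PCMA. Media negotiation will fail when remote endpoint doesn't support Opus.",
            "fix": "Add PCMU and PCMA codecs: codecs=pcmu,pcma,opus,g729",
            "confidence": 0.92,
            "root_cause": "Codec list not aligned between SIP trunk and carrier requirements",
            "severity": "high"
        })

    return issues


def check_security_issues(config):
    """Flag media-encryption and signalling-TLS weaknesses.

    SRTP is reported unless it is enforced: the setting is absent, or any
    ``srtp=<value>`` carries a value other than ``required``. The earlier
    test only recognised ``srtp=optional``, so a value such as
    ``srtp=disabled`` passed as secure. Text that mentions ``srtp`` without
    an ``srtp=<value>`` pair is still not flagged, as before.

    TLS is reported when ``tls_version`` or ``tls_min_version`` is 1.0 or
    1.1; a permissive minimum allows the same downgrade as a pinned old
    version.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    # Check SRTP: fail closed on any explicit value that is not "required".
    srtp_values = re.findall(r'srtp\s*=\s*([\w-]+)', config_lower)
    srtp_missing = 'srtp' not in config_lower
    srtp_not_enforced = any(value != 'required' for value in srtp_values)
    if srtp_missing or srtp_not_enforced:
        issues.append({
            "finding": "SRTP Not Enforced (Security Risk)",
            "impact": "Media encryption not enforced. Voice traffic exposed to eavesdropping. Fails PCI-DSS and HIPAA compliance requirements.",
            "fix": "Enforce SRTP encryption: srtp=required, crypto_suites=AES_CM_128_HMAC_SHA1_80",
            "confidence": 0.97,
            "root_cause": "Default configuration prioritizes compatibility over security",
            "severity": "critical"
        })

    # Check TLS version (pinned version or permitted minimum)
    if re.search(r'tls_(?:min_)?version\s*=\s*1\.[01]', config_lower):
        issues.append({
            "finding": "Deprecated TLS Version (1.0 or 1.1)",
            "impact": "Security vulnerability to downgrade attacks. Modern carriers reject TLS 1.0/1.1 connections, causing trunk registration failure.",
            "fix": "Upgrade to TLS 1.2 minimum: tls_version=1.2, tls_min_version=1.2",
            "confidence": 0.94,
            "root_cause": "Legacy TLS version not updated after security advisories (RFC 8996)",
            "severity": "critical"
        })

    return issues


def check_nat_issues(config):
    """Flag disabled or unmentioned NAT traversal.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    # NOTE(review): 'nat' is a bare substring test, so unrelated words that
    # contain it (e.g. "alternate") also count as NAT being configured.
    if 'nat_traversal=disabled' in config_lower or 'nat' not in config_lower:
        issues.append({
            "finding": "NAT Traversal Disabled",
            "impact": "One-way audio or no audio on calls traversing NAT boundaries. RTP packets cannot find return path. Critical for cloud-based SBCs.",
            "fix": "Enable NAT traversal: nat_traversal=enabled, ice_support=enabled, stun_server=stun.l.google.com:19302",
            "confidence": 0.93,
            "root_cause": "NAT handling disabled or STUN/ICE not configured for cloud deployment",
            "severity": "high"
        })

    return issues


def check_port_issues(config):
    """Flag a non-standard SIP port and an RTP range under 1000 ports.

    Only the first ``port=<n>`` and the first ``rtp_port_range=<a>-<b>``
    found in the text are examined.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    # Check if port is non-standard
    port_match = re.search(r'port=(\d+)', config_lower)
    if port_match:
        port = int(port_match.group(1))
        if port not in (5060, 5061):
            issues.append({
                "finding": f"Non-Standard SIP Port ({port})",
                "impact": "Most carriers expect SIP on port 5060 (UDP) or 5061 (TLS). Non-standard ports may be blocked by firewalls or rejected by carriers.",
                "fix": "Use standard ports: port=5060 for UDP/TCP, port=5061 for TLS",
                "confidence": 0.88,
                "root_cause": "Custom port configuration during migration or testing",
                "severity": "medium"
            })

    # Check RTP port range
    range_match = re.search(r'rtp_port_range=(\d+)-(\d+)', config_lower)
    if range_match:
        start, end = int(range_match.group(1)), int(range_match.group(2))
        range_size = end - start
        if range_size < 1000:
            issues.append({
                "finding": f"Insufficient RTP Port Range ({range_size} ports)",
                "impact": "Port exhaustion during high call volume (>50 concurrent calls). New calls fail with 'No RTP resources available'.",
                "fix": "Expand RTP port range to at least 1000 ports: rtp_port_range=10000-11000",
                "confidence": 0.91,
                "root_cause": "Default narrow port range not scaled for production load",
                "severity": "medium"
            })

    return issues


def check_session_timer(config):
    """Flag a ``session_expires`` value below 900 seconds.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    timer_match = re.search(r'session_expires=(\d+)', config_lower)
    if timer_match:
        timer = int(timer_match.group(1))
        if timer < 900:
            issues.append({
                "finding": f"Session Timer Too Short ({timer} seconds)",
                # f-string so the minute count is interpolated; as a plain
                # string the report showed the literal "{timer//60}".
                "impact": f"Long-duration calls terminated prematurely during brief network issues. Customer complaints about dropped calls after {timer//60} minutes.",
                "fix": "Increase session timer: session_expires=1800, min_se=900",
                "confidence": 0.86,
                "root_cause": "Conservative timer settings from legacy PBX migration",
                "severity": "medium"
            })

    return issues


def check_dtmf_issues(config):
    """Flag in-band DTMF relay or a configuration that never mentions DTMF.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    if 'dtmf_relay=inband' in config_lower or 'dtmf' not in config_lower:
        issues.append({
            "finding": "DTMF Method Not Optimal",
            "impact": "In-band DTMF unreliable with compressed codecs. IVR systems may not receive digits correctly, causing customer frustration in phone menus.",
            "fix": "Use RFC2833 for DTMF: dtmf_relay=rfc2833 (preferred by most carriers)",
            "confidence": 0.89,
            "root_cause": "Default in-band DTMF not suitable for VoIP environments",
            "severity": "medium"
        })

    return issues


def check_dns_issues(config):
    """Flag an explicitly disabled DNS SRV lookup.

    Returns:
        A list of issue dicts (possibly empty).
    """
    config_lower = config.lower()
    issues = []

    if 'dns_srv_lookup=disabled' in config_lower or 'dns_srv=disabled' in config_lower:
        issues.append({
            "finding": "DNS SRV Lookup Disabled",
            "impact": "SBC cannot discover carrier SIP servers via DNS SRV records. Loses automatic failover and load balancing capabilities.",
            "fix": "Enable DNS SRV resolution: dns_srv_lookup=enabled",
            "confidence": 0.85,
            "root_cause": "DNS SRV disabled to work around temporary DNS issues",
            "severity": "low"
        })

    return issues


def analyze_configuration(config_text):
    """Run every rule over *config_text* and render a Markdown report.

    Args:
        config_text: Raw configuration pasted by the user.

    Returns:
        A Markdown string: a prompt when the input is empty, a PASSED
        summary when no rule fires, otherwise the findings grouped by
        severity followed by recommended actions.
    """
    if not config_text or not config_text.strip():
        return "Please enter a configuration to analyze."

    # Run all checks
    checks = (
        check_codec_issues,
        check_security_issues,
        check_nat_issues,
        check_port_issues,
        check_session_timer,
        check_dtmf_issues,
        check_dns_issues,
    )
    all_issues = [issue for check in checks for issue in check(config_text)]

    # Build output
    if not all_issues:
        # NOTE(review): "PASSED" only means no rule above fired. Several
        # bullets below (TLS 1.2+, ICE, standard ports, RTP range, session
        # timers) are asserted even when the pasted text never sets those
        # options, so this summary is not evidence that they are configured.
        return (
            "# ✅ Configuration Validation: PASSED\n"
            "\n"
            "**Status:** No critical issues detected\n"
            "\n"
            "**Summary:** Your SBC configuration follows the service best practices. The configuration includes:\n"
            "- ✅ Proper codec negotiation (PCMU/PCMA/Opus)\n"
            "- ✅ SRTP encryption enforced\n"
            "- ✅ TLS 1.2+ for signaling security\n"
            "- ✅ NAT traversal enabled with ICE support\n"
            "- ✅ Standard SIP ports (5060/5061)\n"
            "- ✅ Adequate RTP port range for scaling\n"
            "- ✅ Appropriate session timers\n"
            "- ✅ RFC2833 DTMF for IVR compatibility\n"
            "\n"
            "**Next Steps:**\n"
            "1. Test with a trial call to verify audio quality\n"
            "2. Monitor call metrics for 24-48 hours\n"
            "3. Review Service Connect contact flow integration\n"
            "\n"
            "**Need Help?** Contact Support or your carrier's technical team for deployment assistance.\n"
        )

    # Issues found - categorize by severity
    by_severity = {
        level: [i for i in all_issues if i['severity'] == level]
        for level in ('critical', 'high', 'medium', 'low')
    }
    critical = by_severity['critical']
    high = by_severity['high']
    medium = by_severity['medium']
    low = by_severity['low']

    output = []

    # Customer impact header
    if critical or high:
        output.append("# ⚠️ Configuration Validation: ISSUES DETECTED\n")
        output.append("**Customer Impact:** This configuration will likely cause audio quality issues, call failures, or security vulnerabilities in production.\n")
    else:
        output.append("# ⚠️ Configuration Validation: WARNINGS\n")
        output.append("**Status:** Configuration will work but has optimization opportunities.\n")

    output.append(f"\n**Issues Found:** {len(all_issues)} ({len(critical)} critical, {len(high)} high, {len(medium)} medium, {len(low)} low)\n")
    output.append("\n" + "="*80 + "\n\n")

    # Show issues by severity
    issue_num = 1
    sections = [("🔴 CRITICAL", critical), ("🟠 HIGH", high), ("🟡 MEDIUM", medium), ("⚪ LOW", low)]
    for severity_name, severity_list in sections:
        if not severity_list:
            continue
        output.append(f"## {severity_name} PRIORITY\n\n")
        for issue in severity_list:
            output.append(f"### Issue #{issue_num}: {issue['finding']}\n\n")
            output.append(f"**📊 Confidence:** {issue['confidence']:.0%} | ")
            output.append(f"**🎯 Root Cause:** {issue['root_cause']}\n\n")
            output.append(f"**💥 Customer Impact:**\n{issue['impact']}\n\n")
            output.append(f"**✅ How to Fix:**\n```\n{issue['fix']}\n```\n\n")
            output.append("-"*80 + "\n\n")
            issue_num += 1

    # Action items
    output.append("## 📋 Recommended Actions\n\n")
    if critical:
        output.append("1. **URGENT:** Address all critical issues before deploying to production\n")
    if high:
        output.append("2. Fix high-priority issues to prevent customer-impacting audio problems\n")
    if medium:
        output.append("3. Optimize medium-priority items for better call quality and reliability\n")

    output.append("\n**Testing Checklist:**\n")
    output.append("- [ ] Make test call and verify two-way audio\n")
    output.append("- [ ] Test DTMF input in IVR menu\n")
    output.append("- [ ] Verify TLS handshake with carrier\n")
    output.append("- [ ] Confirm NAT traversal in cloud environment\n")

    return "".join(output)


def ai_troubleshoot(symptom_description, request: gr.Request):
    """Diagnose a free-text symptom with the Claude API, under rate limits.

    Falls back to ``fallback_troubleshoot`` when no ``ANTHROPIC_API_KEY`` is
    set or when the API call raises. A request is recorded against the rate
    limit only after the API call returns.

    Args:
        symptom_description: The user's description of the problem.
        request: Gradio request object, used to key the rate limiter.

    Returns:
        A Markdown string for display.
    """
    if not symptom_description or not symptom_description.strip():
        return "Please describe the issue you're experiencing with your SBC or calls."

    # max_tokens below bounds only the response; bound the input as well so
    # a single request cannot push an arbitrarily large prompt to the API.
    if len(symptom_description) > MAX_SYMPTOM_CHARS:
        return f"Please shorten your description to {MAX_SYMPTOM_CHARS} characters or fewer."

    # Rate limiting
    # NOTE(review): this is the address of the directly connected peer. If
    # the app sits behind a reverse proxy or hosting front end, all visitors
    # may present the same address and share one per-user bucket (as does
    # every caller mapped to "unknown") - confirm against the deployment.
    has_peer = bool(request) and request.client is not None
    user_ip = request.client.host if has_peer else "unknown"
    allowed, error_msg = check_rate_limit(user_ip)
    if not allowed:
        return f"## ⚠️ Rate Limit Reached\n\n{error_msg}\n\nThis is a demo with usage limits to control costs."

    # Check for API key (only its presence is logged, never the value)
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    print(f"DEBUG: API key detected: {bool(api_key)}")  # Debug log

    if not api_key:
        # Fallback to pattern-based analysis if no API key
        print("DEBUG: No API key found, using fallback")  # Debug log
        return fallback_troubleshoot(symptom_description)

    try:
        # Use Claude API for real AI-powered analysis
        import anthropic

        print("DEBUG: Creating Anthropic client")  # Debug
        client = anthropic.Anthropic(api_key=api_key)

        # The user's text is embedded verbatim, so it can carry instructions
        # of its own. Treat the model's reply as untrusted text: it is only
        # rendered back to the same user and must not drive any action.
        prompt = f"""You are an expert SBC (Session Border Controller) troubleshooting assistant for BYOC service deployments.

A customer reports the following issue: "{symptom_description}"

Provide a diagnostic analysis with:
1. **Likely Root Causes** - What SBC misconfigurations could cause this
2. **Configuration Parameters to Check** - Specific config settings to verify
3. **Recommended Fixes** - Exact configuration commands to try
4. **Confidence Level** - How confident you are in this diagnosis (as a percentage)

Focus on common BYOC issues like:
- NAT traversal for audio problems
- DTMF configuration for IVR issues
- TLS/security for connection failures
- Codec negotiation for audio quality
- Session timers for dropped calls
- Port configuration for firewall issues

Format your response in clear sections with markdown. Be specific and actionable."""

        print("DEBUG: Calling Claude API")  # Debug
        message = client.messages.create(
            model="claude-3-5-haiku-20241022",  # Using Haiku for cost efficiency
            max_tokens=800,  # Limit response length to control costs
            messages=[{
                "role": "user",
                "content": prompt
            }]
        )

        print("DEBUG: API call successful")  # Debug

        # Record successful request
        record_request(user_ip)

        # Format response. Both the echoed input and the model text end up
        # in a Markdown component, so the UI's HTML sanitisation must stay on.
        analysis = "# 🤖 AI-Powered Troubleshooting Analysis\n\n"
        analysis += f"**Your Issue:** {symptom_description}\n\n"
        analysis += "**Powered by Claude 3.5 Haiku**\n\n"
        analysis += "---\n\n"
        analysis += message.content[0].text
        analysis += "\n\n---\n\n"
        analysis += "## 📋 Next Steps\n\n"
        analysis += "1. Review the configuration parameters listed above in your SBC\n"
        analysis += "2. Switch to the **Configuration Validator** tab to paste your full config for detailed analysis\n"
        analysis += "3. Apply the recommended fixes\n"
        analysis += "4. Test with a trial call\n"

        return analysis

    except Exception as e:
        # Fallback to pattern-based if API fails
        print(f"DEBUG: API call failed with error: {e}")  # Debug
        return fallback_troubleshoot(symptom_description)


def fallback_troubleshoot(symptom_description):
    """Diagnose a symptom by keyword matching when the API is unavailable.

    Args:
        symptom_description: The user's description of the problem.

    Returns:
        A Markdown string listing each matched diagnosis, or a general
        diagnostic approach when no keyword group matches.
    """
    symptom_lower = symptom_description.lower()

    # Pattern-based analysis (original logic)
    analysis = []
    analysis.append("# 🔍 Troubleshooting Analysis\n")
    analysis.append(f"**Your Issue:** {symptom_description}\n\n")
    analysis.append("**Note:** AI analysis temporarily unavailable. Using pattern-based diagnosis.\n\n")
    analysis.append("---\n\n")

    recommendations = []

    # One-way audio symptoms
    if any(keyword in symptom_lower for keyword in ['one-way audio', 'one way audio', 'cant hear', "can't hear", 'no audio', 'audio not working']):
        recommendations.append({
            'issue': 'One-Way Audio / No Audio',
            'likely_causes': [
                'NAT traversal disabled',
                'Incorrect RTP port configuration',
                'Firewall blocking RTP ports',
                'Media anchoring disabled'
            ],
            'configs_to_check': [
                'nat_traversal=enabled',
                'ice_support=enabled',
                'rtp_port_range=10000-20000',
                'media_anchoring=enabled'
            ],
            'confidence': 0.95
        })

    # IVR/DTMF issues
    if any(keyword in symptom_lower for keyword in ['ivr', 'dtmf', 'keypad', 'menu', 'press 1', 'touch tone', 'dial pad']):
        recommendations.append({
            'issue': 'IVR / DTMF Not Working',
            'likely_causes': [
                'DTMF relay method mismatch',
                'In-band DTMF with compressed codecs',
                'Carrier expects RFC2833 but SBC using SIP INFO'
            ],
            'configs_to_check': [
                'dtmf_relay=rfc2833',
                'dtmf_mode=rfc2833',
                'Verify codec supports DTMF (avoid highly compressed codecs)'
            ],
            'confidence': 0.92
        })

    # Call setup failures
    if any(keyword in symptom_lower for keyword in ['call fails', 'cant connect', "can't connect", 'wont connect', "won't connect", '408', '503', '481', 'timeout']):
        recommendations.append({
            'issue': 'Call Setup Failures',
            'likely_causes': [
                'SIP port misconfiguration',
                'DNS SRV lookup disabled',
                'TLS handshake failure',
                'Incorrect Contact header IP'
            ],
            'configs_to_check': [
                'port=5060 (UDP) or port=5061 (TLS)',
                'dns_srv_lookup=enabled',
                'tls_version=1.2',
                'contact_ip='
            ],
            'confidence': 0.88
        })

    # Dropped calls
    if any(keyword in symptom_lower for keyword in ['dropped', 'disconnect', 'call drops', 'hangs up']):
        recommendations.append({
            'issue': 'Calls Dropping / Premature Disconnection',
            'likely_causes': [
                'Session timer too short',
                'Network instability with aggressive timers',
                'Keep-alive mechanism not configured'
            ],
            'configs_to_check': [
                'session_expires=1800',
                'min_se=900',
                'session_refresh=uac'
            ],
            'confidence': 0.86
        })

    # Generate output
    if not recommendations:
        analysis.append("## 🔍 General Diagnostic Approach\n\n")
        analysis.append("I don't recognize specific symptoms in your description. Here's how to diagnose:\n\n")
        analysis.append("1. **Paste your SBC configuration** into the validator tab for automated analysis\n")
        analysis.append("2. **Check common issues:**\n")
        analysis.append(" - NAT traversal settings for audio issues\n")
        analysis.append(" - DTMF configuration for IVR problems\n")
        analysis.append(" - TLS/port settings for connection failures\n")
        analysis.append("3. **Review carrier documentation** for specific requirements\n")
    else:
        for rec in recommendations:
            analysis.append(f"## 🎯 Diagnosis: {rec['issue']}\n\n")
            analysis.append(f"**Confidence:** {rec['confidence']:.0%}\n\n")

            analysis.append("**Likely Root Causes:**\n")
            for cause in rec['likely_causes']:
                analysis.append(f"- {cause}\n")

            analysis.append("\n**Configuration Parameters to Check:**\n```\n")
            for config in rec['configs_to_check']:
                analysis.append(f"{config}\n")
            analysis.append("```\n\n")

            analysis.append("---\n\n")

    return "".join(analysis)


# Example configurations
BROKEN_CONFIG = """transport=udp
port=5060
codecs=opus
srtp=optional
nat_traversal=disabled
tls_version=1.0
session_expires=90
dtmf_relay=inband
rtp_port_range=10000-10100
dns_srv_lookup=disabled"""

VALID_CONFIG = """transport=tcp
port=5061
codecs=pcmu,pcma,opus,g729
srtp=required
crypto_suites=AES_CM_128_HMAC_SHA1_80
nat_traversal=enabled
ice_support=enabled
stun_server=stun.l.google.com:19302
tls_version=1.2
tls_min_version=1.2
cipher_suites=ECDHE-RSA-AES256-GCM-SHA384
session_expires=1800
min_se=900
dtmf_relay=rfc2833
rtp_port_range=10000-20000
dns_srv_lookup=enabled
media_anchoring=enabled"""


def load_broken_config():
    """Return the sample configuration that trips the validator's rules."""
    return BROKEN_CONFIG


def load_valid_config():
    """Return the sample configuration that passes the validator's rules."""
    return VALID_CONFIG


# Create Gradio interface
demo = gr.Blocks(theme=gr.themes.Soft(), title="Service BYOC - AI-Powered SBC Validator")

with demo:
    gr.Markdown("""
    # 🔒 Bring Your Own Carrier BYOC - AI-Powered Configuration Validator
    ### Intelligent SBC Analysis & Troubleshooting

    **For Customers Using Bring Your Own Carrier (BYOC)**

    Validate your SBC configuration and get AI-powered troubleshooting recommendations before connecting to the service.
    """)

    with gr.Tabs():
        with gr.Tab("🤖 AI Troubleshoot"):
            gr.Markdown("""
            ### Describe Your Issue in Plain English
            Tell me what's wrong with your calls and I'll diagnose the likely SBC configuration issues.
            """)

            with gr.Row():
                with gr.Column(scale=1):
                    symptom_input = gr.Textbox(
                        label="What issue are you experiencing?",
                        lines=5,
                        placeholder="Examples:\n- 'My calls have one-way audio'\n- 'IVR menu not responding to key presses'\n- 'Calls dropping after 2 minutes'\n- 'TLS handshake failing with carrier'",
                    )

                    with gr.Row():
                        example_btn1 = gr.Button("Example: One-way audio", variant="secondary", size="sm")
                        example_btn2 = gr.Button("Example: IVR not working", variant="secondary", size="sm")

                    troubleshoot_btn = gr.Button("🔍 Diagnose Issue", variant="primary", size="lg")

                with gr.Column(scale=1):
                    gr.Markdown("### AI Diagnosis")
                    troubleshoot_output = gr.Markdown()

            # Example button actions
            example_btn1.click(fn=lambda: "I can hear the other person but they can't hear me", outputs=symptom_input)
            example_btn2.click(fn=lambda: "Customers can't navigate our IVR menu by pressing numbers", outputs=symptom_input)
            troubleshoot_btn.click(fn=ai_troubleshoot, inputs=symptom_input, outputs=troubleshoot_output)

        with gr.Tab("⚙️ Configuration Validator"):
            gr.Markdown("""
            ### Paste Your SBC Configuration
            Get detailed analysis of specific configuration parameters.
            """)

            with gr.Row():
                with gr.Column(scale=1):
                    config_input = gr.Textbox(
                        label="Paste your configuration here",
                        lines=20,
                        placeholder="Example:\ntransport=tcp\nport=5061\ncodecs=pcmu,pcma,opus\nsrtp=required\nnat_traversal=enabled\n...",
                    )

                    with gr.Row():
                        broken_btn = gr.Button("📛 Load Example: Common Mistakes", variant="secondary", size="sm")
                        valid_btn = gr.Button("✅ Load Example: Best Practice Config", variant="secondary", size="sm")

                    analyze_btn = gr.Button("🔍 Validate Configuration", variant="primary", size="lg")

                with gr.Column(scale=1):
                    gr.Markdown("### Validation Results")
                    output = gr.Markdown()

            # Button actions
            broken_btn.click(fn=load_broken_config, outputs=config_input)
            valid_btn.click(fn=load_valid_config, outputs=config_input)
            analyze_btn.click(fn=analyze_configuration, inputs=config_input, outputs=output)

    # NOTE(review): the text below advertises a check for insecure cipher
    # suites and "AI-powered" confidence scores. No rule in this file
    # inspects cipher_suites, and the validator's confidence values are
    # constants written into each rule.
    gr.Markdown("""
    ---
    ## About This Validator

    This tool checks for 10+ common SBC misconfigurations in BYOC deployments:

    **Security Issues:**
    - Weak or missing SRTP encryption
    - Deprecated TLS versions (1.0/1.1)
    - Unsecure cipher suites

    **Audio Quality Issues:**
    - Codec mismatches causing one-way audio
    - NAT traversal problems in cloud environments
    - Incorrect DTMF signaling for IVR systems

    **Connectivity Issues:**
    - Non-standard SIP ports
    - Session timer misconfigurations
    - DNS SRV lookup disabled
    - Insufficient RTP port ranges

    **Confidence Scores:** Each finding includes an AI-powered confidence score (85-97%) and specific remediation steps.

    **Who Should Use This:**
    - Customers setting up Bring Your Own Carrier
    - Telecom engineers configuring SBCs for cloud connectivity
    - Operations teams troubleshooting audio quality issues
    - Compliance teams validating security configurations

    ---

    💡 **Pro Tip:** Run this validator before submitting support tickets. Most BYOC issues can be resolved by following these recommendations.

    Built by Philip Drammeh [LinkedIn](https://linkedin.com/in/philip-drammeh)
    """)

if __name__ == "__main__":
    demo.launch()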