import json
import logging
import os
import re
from datetime import datetime, timedelta

import gradio as gr

logger = logging.getLogger(__name__)

# Usage tracking - persists across sessions via file
USAGE_FILE = "usage_stats.json"


def _default_usage_stats():
    """Return a fresh usage-statistics dict containing every expected key."""
    return {
        "total_ai_requests": 0,
        "total_config_validations": 0,
        "total_visitors": 0,
        "first_request": None,
        "last_request": None,
        "requests_by_hour": {},
    }


def load_usage_stats():
    """Load usage statistics from ``USAGE_FILE``.

    Returns:
        dict: The stored statistics merged over the defaults, so every
        expected key is present even if the file is missing, unreadable,
        malformed, or was written without some keys.
    """
    stats = _default_usage_stats()
    try:
        with open(USAGE_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing persisted yet.
        return stats
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and decoding errors.
        logger.warning("Could not read %s: %s", USAGE_FILE, exc)
        return stats

    if isinstance(loaded, dict):
        stats.update(loaded)
    else:
        logger.warning("Ignoring %s: expected a JSON object", USAGE_FILE)
    return stats


def save_usage_stats(stats):
    """Write *stats* to ``USAGE_FILE`` as JSON (best effort).

    Failures are logged rather than raised so that a read-only or full
    filesystem never breaks request handling.
    """
    try:
        with open(USAGE_FILE, "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2)
    except (OSError, TypeError, ValueError) as exc:
        logger.warning("Could not write %s: %s", USAGE_FILE, exc)


usage_stats = load_usage_stats()

# Visitor count persisted by earlier runs. ``request_tracker`` starts empty on
# every restart, so the in-memory IP count alone would reset the stored total.
_visitors_before_start = usage_stats["total_visitors"]

# Rate limiting (in-memory, resets on restart)
request_tracker = {}
MAX_REQUESTS_PER_HOUR = 20
MAX_REQUESTS_PER_USER = 5


def check_rate_limit(user_ip):
    """Check whether *user_ip* may make another request.

    Timestamps older than one hour are discarded for every tracked IP (not
    just the caller) so that the system-wide count reflects the last hour.

    Returns:
        tuple: ``(True, None)`` when allowed, otherwise ``(False, message)``.
    """
    cutoff_time = datetime.now() - timedelta(hours=1)

    # Keys are kept (only the lists are pruned) because the number of keys is
    # used as the visitor count in record_request().
    request_tracker.setdefault(user_ip, [])
    for times in request_tracker.values():
        times[:] = [t for t in times if t > cutoff_time]

    # Check user limit
    if len(request_tracker[user_ip]) >= MAX_REQUESTS_PER_USER:
        return False, (
            "Rate limit reached. Please try again in an hour. "
            f"(Limit: {MAX_REQUESTS_PER_USER} requests/hour per user)"
        )

    # Check total limit
    total_requests = sum(len(times) for times in request_tracker.values())
    if total_requests >= MAX_REQUESTS_PER_HOUR:
        return False, "System rate limit reached. Please try again later. (Demo usage limit)"

    return True, None


def record_request(user_ip, request_type="ai"):
    """Record a successful request and persist the updated statistics.

    Args:
        user_ip: Identifier of the caller (IP address or ``"unknown"``).
        request_type: ``"ai"`` or ``"config"``; selects which counter is
            incremented. Any other value only updates the shared fields.

    NOTE(review): both request types are appended to ``request_tracker``, so
    configuration validations also consume the per-user quota enforced by
    check_rate_limit() - confirm this is intended.
    """
    now = datetime.now()
    request_tracker.setdefault(user_ip, []).append(now)

    # Update global stats
    timestamp = now.isoformat()
    hour_key = now.strftime("%Y-%m-%d-%H")

    if request_type == "ai":
        usage_stats["total_ai_requests"] += 1
    elif request_type == "config":
        usage_stats["total_config_validations"] += 1

    # Approximate: a visitor returning after a restart is counted again.
    usage_stats["total_visitors"] = _visitors_before_start + len(request_tracker)
    usage_stats["last_request"] = timestamp
    if not usage_stats["first_request"]:
        usage_stats["first_request"] = timestamp

    by_hour = usage_stats["requests_by_hour"]
    by_hour[hour_key] = by_hour.get(hour_key, 0) + 1

    save_usage_stats(usage_stats)


def _client_ip(request):
    """Return the client host of a Gradio request, or ``"unknown"``."""
    client = getattr(request, "client", None)
    return getattr(client, "host", None) or "unknown"


# SBC validation functions
#
# Every check takes the raw configuration text and returns a list of issue
# dicts with the keys: finding, impact, fix, confidence, root_cause, severity.
# Matching is case-insensitive substring/regex matching on the whole text.

def check_codec_issues(config):
    """Flag configurations where Opus is the only codec mentioned."""
    config_lower = config.lower()
    issues = []

    codec_keywords = ['opus', 'pcmu', 'pcma', 'g729', 'g711']
    mentioned = [c for c in codec_keywords if c in config_lower]

    if mentioned == ['opus']:
        issues.append({
            "finding": "Codec Mismatch - Only Opus configured, PCMU/PCMA missing",
            "impact": "Leads to one-way audio or call setup failure for carriers requiring PCMU/PCMA. Media negotiation will fail when remote endpoint doesn't support Opus.",
            "fix": "Add PCMU and PCMA codecs: codecs=pcmu,pcma,opus,g729",
            "confidence": 0.92,
            "root_cause": "Codec list not aligned between SIP trunk and carrier requirements",
            "severity": "high",
        })

    return issues


def check_security_issues(config):
    """Flag unenforced SRTP and deprecated TLS versions (1.0 / 1.1)."""
    config_lower = config.lower()
    issues = []

    srtp_not_enforced = (
        'srtp=optional' in config_lower
        or 'srtp=disabled' in config_lower
        or 'srtp' not in config_lower
    )
    if srtp_not_enforced:
        issues.append({
            "finding": "SRTP Not Enforced (Security Risk)",
            "impact": "Media encryption not enforced. Voice traffic exposed to eavesdropping. Fails PCI-DSS and HIPAA compliance requirements.",
            "fix": "Enforce SRTP encryption: srtp=required, crypto_suites=AES_CM_128_HMAC_SHA1_80",
            "confidence": 0.97,
            "root_cause": "Default configuration prioritizes compatibility over security",
            "severity": "critical",
        })

    if 'tls_version=1.0' in config_lower or 'tls_version=1.1' in config_lower:
        issues.append({
            "finding": "Deprecated TLS Version (1.0 or 1.1)",
            "impact": "Security vulnerability to downgrade attacks. Modern carriers reject TLS 1.0/1.1 connections, causing trunk registration failure.",
            "fix": "Upgrade to TLS 1.2 minimum: tls_version=1.2, tls_min_version=1.2",
            "confidence": 0.94,
            "root_cause": "Legacy TLS version not updated after security advisories (RFC 8996)",
            "severity": "critical",
        })

    return issues


def check_nat_issues(config):
    """Flag NAT traversal that is disabled or not mentioned at all."""
    config_lower = config.lower()
    issues = []

    if 'nat_traversal=disabled' in config_lower or 'nat' not in config_lower:
        issues.append({
            "finding": "NAT Traversal Disabled",
            "impact": "One-way audio or no audio on calls traversing NAT boundaries. RTP packets cannot find return path. Critical for cloud-based SBC deployments.",
            "fix": "Enable NAT traversal: nat_traversal=enabled, ice_support=enabled, stun_server=stun.l.google.com:19302",
            "confidence": 0.93,
            "root_cause": "NAT handling disabled or STUN/ICE not configured for cloud deployment",
            "severity": "high",
        })

    return issues


def check_port_issues(config):
    """Flag a non-standard SIP port and an RTP port range under 1000 ports."""
    config_lower = config.lower()
    issues = []

    # The lookbehind keeps keys such as ``rtp_port=`` or ``stun_port=`` from
    # being mistaken for the SIP ``port=`` setting.
    port_match = re.search(r'(?<![a-z0-9_])port=(\d+)', config_lower)
    if port_match:
        port = int(port_match.group(1))
        if port not in (5060, 5061):
            issues.append({
                "finding": f"Non-Standard SIP Port ({port})",
                "impact": "Most carriers expect SIP on port 5060 (UDP) or 5061 (TLS). Non-standard ports may be blocked by firewalls or rejected by carriers.",
                "fix": "Use standard ports: port=5060 for UDP/TCP, port=5061 for TLS",
                "confidence": 0.88,
                "root_cause": "Custom port configuration during migration or testing",
                "severity": "medium",
            })

    range_match = re.search(r'rtp_port_range=(\d+)-(\d+)', config_lower)
    if range_match:
        start, end = int(range_match.group(1)), int(range_match.group(2))
        # Both endpoints are usable ports, so the range is inclusive; an
        # inverted range (end < start) is reported as zero ports.
        range_size = max(end - start + 1, 0)
        if range_size < 1000:
            issues.append({
                "finding": f"Insufficient RTP Port Range ({range_size} ports)",
                "impact": "Port exhaustion during high call volume (>50 concurrent calls). New calls fail with 'No RTP resources available'.",
                "fix": "Expand RTP port range to at least 1000 ports: rtp_port_range=10000-11000",
                "confidence": 0.91,
                "root_cause": "Default narrow port range not scaled for production load",
                "severity": "medium",
            })

    return issues


def check_session_timer(config):
    """Flag a ``session_expires`` value below 900 seconds."""
    config_lower = config.lower()
    issues = []

    timer_match = re.search(r'session_expires=(\d+)', config_lower)
    if timer_match:
        timer = int(timer_match.group(1))
        if timer < 900:
            if timer >= 60:
                duration = f"{timer // 60} minutes"
            else:
                duration = f"{timer} seconds"
            issues.append({
                "finding": f"Session Timer Too Short ({timer} seconds)",
                "impact": (
                    "Long-duration calls terminated prematurely during brief network issues. "
                    f"Customer complaints about dropped calls after {duration}."
                ),
                "fix": "Increase session timer: session_expires=1800, min_se=900",
                "confidence": 0.86,
                "root_cause": "Conservative timer settings from legacy PBX migration",
                "severity": "medium",
            })

    return issues


def check_dtmf_issues(config):
    """Flag in-band DTMF relay, or DTMF not being configured at all."""
    config_lower = config.lower()
    issues = []

    if 'dtmf_relay=inband' in config_lower or 'dtmf' not in config_lower:
        issues.append({
            "finding": "DTMF Method Not Optimal",
            "impact": "In-band DTMF unreliable with compressed codecs. IVR systems may not receive digits correctly, causing customer frustration in phone menus.",
            "fix": "Use RFC2833 for DTMF: dtmf_relay=rfc2833 (preferred by most carriers)",
            "confidence": 0.89,
            "root_cause": "Default in-band DTMF not suitable for VoIP environments",
            "severity": "medium",
        })

    return issues


def check_dns_issues(config):
    """Flag explicitly disabled DNS SRV lookups."""
    config_lower = config.lower()
    issues = []

    if 'dns_srv_lookup=disabled' in config_lower or 'dns_srv=disabled' in config_lower:
        issues.append({
            "finding": "DNS SRV Lookup Disabled",
            "impact": "SBC cannot discover carrier SIP servers via DNS SRV records. Loses automatic failover and load balancing capabilities.",
            "fix": "Enable DNS SRV resolution: dns_srv_lookup=enabled",
            "confidence": 0.85,
            "root_cause": "DNS SRV disabled to work around temporary DNS issues",
            "severity": "low",
        })

    return issues


# Checks run by analyze_configuration(), in reporting order.
_CONFIG_CHECKS = (
    check_codec_issues,
    check_security_issues,
    check_nat_issues,
    check_port_issues,
    check_session_timer,
    check_dtmf_issues,
    check_dns_issues,
)

_VALIDATION_PASSED_MESSAGE = """# ✅ Configuration Validation: PASSED

**Status:** No critical issues detected

**Summary:** Your SBC configuration follows enterprise VoIP best practices.

The configuration includes:
- ✅ Proper codec negotiation (PCMU/PCMA/Opus)
- ✅ SRTP encryption enforced
- ✅ TLS 1.2+ for signaling security
- ✅ NAT traversal enabled with ICE support
- ✅ Standard SIP ports (5060/5061)
- ✅ Adequate RTP port range for scaling
- ✅ Appropriate session timers
- ✅ RFC2833 DTMF for IVR compatibility

**Next Steps:**
1. Test with a trial call to verify audio quality
2. Monitor call metrics for 24-48 hours
3. Review integration with your UC platform

**Need Help?** Contact your carrier's technical team or Bandwidth Support for deployment assistance.
"""


def analyze_configuration(config_text, request: gr.Request):
    """Run every configuration check and render the result as markdown.

    Args:
        config_text: Raw SBC configuration pasted by the user.
        request: Gradio request, used only to identify the caller for
            usage tracking.

    Returns:
        str: A markdown report - a prompt when the input is empty, a
        "PASSED" summary when no check fires, otherwise the issues grouped
        by severity followed by recommended actions and a testing checklist.
    """
    if not config_text or not config_text.strip():
        return "Please enter a configuration to analyze."

    # Track usage
    record_request(_client_ip(request), "config")

    # Run all checks
    all_issues = []
    for check in _CONFIG_CHECKS:
        all_issues.extend(check(config_text))

    # Build output
    if not all_issues:
        return _VALIDATION_PASSED_MESSAGE

    # Issues found - categorize by severity
    critical = [i for i in all_issues if i['severity'] == 'critical']
    high = [i for i in all_issues if i['severity'] == 'high']
    medium = [i for i in all_issues if i['severity'] == 'medium']
    low = [i for i in all_issues if i['severity'] == 'low']

    output = []

    # Customer impact header
    if critical or high:
        output.append("# ⚠️ Configuration Validation: ISSUES DETECTED\n")
        output.append("**Customer Impact:** This configuration will likely cause audio quality issues, call failures, or security vulnerabilities in production deployments.\n")
    else:
        output.append("# ⚠️ Configuration Validation: WARNINGS\n")
        output.append("**Status:** Configuration will work but has optimization opportunities.\n")

    output.append(f"\n**Issues Found:** {len(all_issues)} ({len(critical)} critical, {len(high)} high, {len(medium)} medium, {len(low)} low)\n")
    output.append("\n" + "=" * 80 + "\n\n")

    # Show issues by severity
    issue_num = 1
    sections = [
        ("🔴 CRITICAL", critical),
        ("🟠 HIGH", high),
        ("🟡 MEDIUM", medium),
        ("⚪ LOW", low),
    ]
    for severity_name, severity_list in sections:
        if not severity_list:
            continue
        output.append(f"## {severity_name} PRIORITY\n\n")
        for issue in severity_list:
            output.append(f"### Issue #{issue_num}: {issue['finding']}\n\n")
            output.append(f"**📊 Confidence:** {issue['confidence']:.0%} | ")
            output.append(f"**🎯 Root Cause:** {issue['root_cause']}\n\n")
            output.append(f"**💥 Customer Impact:**\n{issue['impact']}\n\n")
            output.append(f"**✅ How to Fix:**\n```\n{issue['fix']}\n```\n\n")
            output.append("-" * 80 + "\n\n")
            issue_num += 1

    # Action items - numbered from 1 regardless of which severities occurred
    output.append("## 📋 Recommended Actions\n\n")
    actions = []
    if critical:
        actions.append("**URGENT:** Address all critical issues before deploying to production")
    if high:
        actions.append("Fix high-priority issues to prevent customer-impacting audio problems")
    if medium:
        actions.append("Optimize medium-priority items for better call quality and reliability")
    for number, action in enumerate(actions, start=1):
        output.append(f"{number}. {action}\n")

    output.append("\n**Testing Checklist:**\n")
    output.append("- [ ] Make test call and verify two-way audio\n")
    output.append("- [ ] Test DTMF input in IVR menu\n")
    output.append("- [ ] Verify TLS handshake with carrier\n")
    output.append("- [ ] Confirm NAT traversal in cloud environment\n")
    output.append("- [ ] Test emergency calling if applicable\n")

    return "".join(output)


def ai_troubleshoot(symptom_description, request: gr.Request):
    """Diagnose a described symptom with the Anthropic API.

    Args:
        symptom_description: Free-text description of the problem.
        request: Gradio request, used to identify the caller for rate
            limiting and usage tracking.

    Returns:
        str: Markdown. A prompt when the input is empty, a rate-limit notice
        when the caller is over quota, the model's analysis on success, or
        the output of fallback_troubleshoot() when ``ANTHROPIC_API_KEY`` is
        unset or the API call fails for any reason.
    """
    if not symptom_description or not symptom_description.strip():
        return "Please describe the issue you're experiencing with your SBC or calls."

    # Rate limiting
    user_ip = _client_ip(request)
    allowed, error_msg = check_rate_limit(user_ip)
    if not allowed:
        return f"## ⚠️ Rate Limit Reached\n\n{error_msg}\n\nThis is a demo with usage limits to control costs."

    # Check for API key
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        return fallback_troubleshoot(symptom_description)

    prompt = f"""You are an expert SBC (Session Border Controller) troubleshooting assistant for enterprise VoIP deployments.

A customer reports the following issue:
"{symptom_description}"

Provide a diagnostic analysis with:
1. **Likely Root Causes** - What SBC misconfigurations could cause this in an enterprise VoIP environment
2. **Configuration Parameters to Check** - Specific config settings to verify
3. **Recommended Fixes** - Exact configuration commands to try
4. **Confidence Level** - How confident you are in this diagnosis (as a percentage)

Focus on common enterprise VoIP issues:
- NAT traversal for audio problems in cloud deployments
- DTMF configuration for IVR issues
- TLS/security for connection failures
- Codec negotiation for audio quality (PCMU/PCMA/Opus compatibility)
- Session timers for dropped calls
- Port configuration for firewall issues
- Emergency calling routing

Format your response in clear sections with markdown. Be specific and actionable."""

    try:
        # Imported lazily so the app still starts without the SDK installed.
        import anthropic

        client = anthropic.Anthropic(api_key=api_key)
        message = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=800,
            messages=[{"role": "user", "content": prompt}],
        )
        diagnosis = message.content[0].text
    except Exception:
        # Top-level boundary: any SDK/network/response error degrades to the
        # static fallback, but is logged instead of being silently dropped.
        logger.exception("AI troubleshooting failed; using fallback response")
        return fallback_troubleshoot(symptom_description)

    # Record successful request
    record_request(user_ip, "ai")

    # Format response
    return "".join([
        "# 🤖 AI-Powered Troubleshooting Analysis\n\n",
        f"**Your Issue:** {symptom_description}\n\n",
        "**Powered by Claude 3.5 Haiku** | **Optimized for Bandwidth Duet®**\n\n",
        "---\n\n",
        diagnosis,
        "\n\n---\n\n",
        "## 📋 Next Steps\n\n",
        "1. Review the configuration parameters listed above in your SBC\n",
        "2. Switch to the **Configuration Validator** tab to paste your full config for detailed analysis\n",
        "3. Apply the recommended fixes\n",
        "4. Test with a trial call\n",
        "5. Verify emergency calling if applicable\n",
    ])


def fallback_troubleshoot(symptom_description):
    """Return the static message shown when AI analysis is unavailable.

    *symptom_description* is accepted for interface compatibility but is not
    used.
    """
    return "## 🔍 Troubleshooting Analysis\n\nAI analysis temporarily unavailable. Please use the Configuration Validator tab or contact Bandwidth Support."
def get_usage_stats():
    """Return the persisted usage statistics formatted as markdown.

    The statistics are re-read from disk via load_usage_stats() on every
    call. Missing keys fall back to zero / "No requests yet" instead of
    raising, and the hourly list only includes buckets from the last 24
    hours (newest first).

    Returns:
        str: Markdown text for the "Usage Stats" tab.
    """
    stats = load_usage_stats()

    lines = [
        "# 📊 Bandwidth Duet® SBC Validator - Usage Statistics",
        "",
        f"**Total AI Troubleshooting Requests:** {stats.get('total_ai_requests', 0)}",
        "",
        f"**Total Configuration Validations:** {stats.get('total_config_validations', 0)}",
        "",
        f"**Unique Visitors:** {stats.get('total_visitors', 0)}",
        "",
        f"**First Request:** {stats.get('first_request') or 'No requests yet'}",
        "",
        f"**Last Request:** {stats.get('last_request') or 'No requests yet'}",
        "",
        "**Requests by Hour (Last 24 hours):**",
        "",
    ]

    # Show last 24 hours of activity. Hour keys have the form
    # "YYYY-MM-DD-HH", so plain string comparison orders them by time.
    hourly = stats.get("requests_by_hour") or {}
    oldest_key = (datetime.now() - timedelta(hours=24)).strftime("%Y-%m-%d-%H")
    recent = sorted(
        ((hour, count) for hour, count in hourly.items() if hour > oldest_key),
        reverse=True,
    )[:24]

    if recent:
        lines.extend(f"- {hour}: {count} requests" for hour, count in recent)
    elif hourly:
        lines.append("No activity in the last 24 hours")
    else:
        lines.append("No activity recorded yet")

    return "\n".join(lines)


# Example configurations
BROKEN_CONFIG = """transport=udp
port=5060
codecs=opus
srtp=optional
nat_traversal=disabled
tls_version=1.0
session_expires=90
dtmf_relay=inband
rtp_port_range=10000-10100
dns_srv_lookup=disabled"""

VALID_CONFIG = """transport=tcp
port=5061
codecs=pcmu,pcma,opus,g729
srtp=required
crypto_suites=AES_CM_128_HMAC_SHA1_80
nat_traversal=enabled
ice_support=enabled
stun_server=stun.l.google.com:19302
tls_version=1.2
tls_min_version=1.2
cipher_suites=ECDHE-RSA-AES256-GCM-SHA384
session_expires=1800
min_se=900
dtmf_relay=rfc2833
rtp_port_range=10000-20000
dns_srv_lookup=enabled
media_anchoring=enabled"""


def load_broken_config():
    """Return the example configuration containing common mistakes."""
    return BROKEN_CONFIG


def load_valid_config():
    """Return the example best-practice configuration."""
    return VALID_CONFIG


# Create Gradio interface
demo = gr.Blocks(theme=gr.themes.Soft(), title="Bandwidth Duet® - SBC Configuration Validator")

with demo:
    gr.Markdown("""
    # 🔊 Bandwidth Duet® - AI-Powered SBC Configuration Validator
    ### Intelligent Analysis & Troubleshooting for Enterprise VoIP Deployments

    **Powered by Bandwidth® - The Global Enterprise Cloud Communications Company**

    Validate your SBC configuration and get AI-powered troubleshooting recommendations for enterprise voice deployments.
    """)

    with gr.Tabs():
        with gr.Tab("🤖 AI Troubleshoot"):
            gr.Markdown("""
            ### Describe Your Issue in Plain English
            Tell me what's wrong with your calls and I'll diagnose the likely SBC configuration issues.
            """)

            with gr.Row():
                with gr.Column(scale=1):
                    symptom_input = gr.Textbox(
                        label="What issue are you experiencing?",
                        lines=5,
                        placeholder="Examples:\n- 'My calls have one-way audio'\n- 'IVR menu not responding to key presses'\n- 'Calls dropping after 2 minutes'\n- 'TLS handshake failing with carrier'\n- 'Emergency calls not routing correctly'",
                    )

                    with gr.Row():
                        example_btn1 = gr.Button("Example: One-way audio", variant="secondary", size="sm")
                        example_btn2 = gr.Button("Example: IVR not working", variant="secondary", size="sm")

                    troubleshoot_btn = gr.Button("🔍 Diagnose Issue", variant="primary", size="lg")

                with gr.Column(scale=1):
                    gr.Markdown("### AI Diagnosis")
                    troubleshoot_output = gr.Markdown()

            # Example button actions
            example_btn1.click(fn=lambda: "I can hear the other person but they can't hear me", outputs=symptom_input)
            example_btn2.click(fn=lambda: "Customers can't navigate our IVR menu by pressing numbers", outputs=symptom_input)
            troubleshoot_btn.click(fn=ai_troubleshoot, inputs=symptom_input, outputs=troubleshoot_output)

        with gr.Tab("⚙️ Configuration Validator"):
            gr.Markdown("""
            ### Paste Your SBC Configuration
            Get detailed analysis of specific configuration parameters for enterprise VoIP deployments.
            """)

            with gr.Row():
                with gr.Column(scale=1):
                    config_input = gr.Textbox(
                        label="Paste your configuration here",
                        lines=20,
                        placeholder="Example:\ntransport=tcp\nport=5061\ncodecs=pcmu,pcma,opus\nsrtp=required\nnat_traversal=enabled\n...",
                    )

                    with gr.Row():
                        broken_btn = gr.Button("📛 Load Example: Common Mistakes", variant="secondary", size="sm")
                        valid_btn = gr.Button("✅ Load Example: Best Practice Config", variant="secondary", size="sm")

                    analyze_btn = gr.Button("🔍 Validate Configuration", variant="primary", size="lg")

                with gr.Column(scale=1):
                    gr.Markdown("### Validation Results")
                    output = gr.Markdown()

            # Button actions
            broken_btn.click(fn=load_broken_config, outputs=config_input)
            valid_btn.click(fn=load_valid_config, outputs=config_input)
            analyze_btn.click(fn=analyze_configuration, inputs=config_input, outputs=output)

        with gr.Tab("📊 Usage Stats"):
            gr.Markdown("""
            ### Validator Usage Statistics
            Track how many people are using this tool.
            """)

            stats_output = gr.Markdown()
            refresh_btn = gr.Button("🔄 Refresh Stats", variant="primary")

            refresh_btn.click(fn=get_usage_stats, outputs=stats_output)
            demo.load(fn=get_usage_stats, outputs=stats_output)  # Auto-load on page load

    gr.Markdown("""
    ---

    ## About Bandwidth Duet®

    **Bandwidth Duet®** is a comprehensive solution for enterprise voice connectivity, available directly from a carrier who owns and operates its underlying infrastructure.

    **Key Features:**
    - **Direct Routing:** Seamless enterprise calling with existing SIP trunking
    - **Dynamic E911:** Certified emergency calling with accurate location routing
    - **Software-Driven SIP:** Built on Bandwidth's nationwide IP voice network
    - **Simplified Migration:** Accelerate telecom migration with less complexity

    **This Validator Checks For:**
    - Security vulnerabilities (SRTP, TLS, authentication)
    - Codec negotiation issues for enterprise UC platforms
    - NAT traversal problems in cloud environments
    - DTMF signaling for IVR systems
    - Port configuration and firewall compatibility
    - Session timer optimizations
    - Emergency calling routing readiness

    **Powered By:**
    - AI Model: Claude 3.5 Haiku for natural language troubleshooting
    - Detection Engine: Rule-based validation across 7 configuration categories
    - Rate Limiting: 5 requests/user/hour (demo environment)

    ---

    💡 **Pro Tip:** Run this validator before deploying your SBC to production. Most VoIP issues can be prevented with proper configuration.

    Built by Philip Drammeh | Independent Researcher | [LinkedIn](https://www.linkedin.com/in/philipdrammeh)

    *Bandwidth Duet® is a registered trademark of Bandwidth Inc. This tool is an independent project and not officially affiliated with or endorsed by Bandwidth Inc.*
    """)

if __name__ == "__main__":
    demo.launch()