# sbc-validator / app.py
# Phildram1's picture
# Rename app (8).py to app.py
# 89f0f71 verified
import gradio as gr
import re
import os
from datetime import datetime, timedelta
import json
# Usage tracking - persists across sessions via file
# Path of the JSON file that load_usage_stats() reads and save_usage_stats()
# writes; being relative, it resolves against the process working directory.
USAGE_FILE = "usage_stats.json"
def load_usage_stats():
    """Load usage statistics from ``USAGE_FILE``.

    The persisted values are layered over a fresh set of defaults so that
    every expected key is always present, even when the file is missing,
    unreadable, corrupt, or holds only some of the fields.

    Returns:
        dict: Counters (``total_ai_requests``, ``total_config_validations``,
        ``total_visitors``), ISO timestamps or ``None`` (``first_request``,
        ``last_request``), and the ``requests_by_hour`` mapping.
    """
    stats = {
        "total_ai_requests": 0,
        "total_config_validations": 0,
        "total_visitors": 0,
        "first_request": None,
        "last_request": None,
        "requests_by_hour": {},
    }
    try:
        with open(USAGE_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file (OSError) or malformed JSON / bad encoding
        # (both ValueError subclasses): fall back to the defaults.
        return stats

    # Only a JSON object can carry the stats; anything else is ignored.
    if isinstance(loaded, dict):
        stats.update(loaded)
    # Callers index into this mapping, so never hand back a non-dict here.
    if not isinstance(stats["requests_by_hour"], dict):
        stats["requests_by_hour"] = {}
    return stats
def save_usage_stats(stats):
    """Persist usage statistics to ``USAGE_FILE`` on a best-effort basis.

    The data is written to a sibling ``.tmp`` file and then renamed over the
    target, so a failure part-way through serialization or writing does not
    leave a truncated stats file behind.

    Args:
        stats: JSON-serializable dict of usage statistics.

    Returns:
        None. I/O and serialization failures are deliberately swallowed:
        losing a stats update must not break the request being served.
    """
    tmp_path = f"{USAGE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2)
        os.replace(tmp_path, USAGE_FILE)
    except (OSError, TypeError, ValueError):
        # Drop the partial temp file, if one was created; the previous
        # contents of USAGE_FILE stay untouched.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
# Module-wide stats dict, loaded once at import time; record_request()
# mutates it in place and writes it back through save_usage_stats().
usage_stats = load_usage_stats()
# Rate limiting (in-memory, resets on restart)
# Maps a client IP string to the list of datetimes of its recorded requests.
request_tracker = {}
# Cap compared against the combined number of tracked requests of all users.
MAX_REQUESTS_PER_HOUR = 20
# Cap compared against the number of tracked requests of a single user IP.
MAX_REQUESTS_PER_USER = 5
def check_rate_limit(user_ip):
    """Check whether ``user_ip`` may make another request right now.

    Timestamps older than one hour are expired for *every* tracked user
    before counting. Pruning only the caller's list would let stale entries
    from users who never return keep counting toward the global cap and
    eventually lock everyone out.

    Args:
        user_ip: Key identifying the caller in ``request_tracker``.

    Returns:
        tuple: ``(True, None)`` when the request is allowed, otherwise
        ``(False, message)`` with a user-facing explanation.

    Side effects:
        Rewrites the lists in ``request_tracker`` (dropping expired entries)
        and ensures ``user_ip`` has an entry, possibly empty.
    """
    cutoff_time = datetime.now() - timedelta(hours=1)

    # Clean old entries for all users; keys are kept, only lists shrink.
    request_tracker.setdefault(user_ip, [])
    for ip in list(request_tracker):
        request_tracker[ip] = [t for t in request_tracker[ip] if t > cutoff_time]

    # Check user limit
    if len(request_tracker[user_ip]) >= MAX_REQUESTS_PER_USER:
        return False, f"Rate limit reached. Please try again in an hour. (Limit: {MAX_REQUESTS_PER_USER} requests/hour per user)"

    # Check total limit
    total_requests = sum(len(times) for times in request_tracker.values())
    if total_requests >= MAX_REQUESTS_PER_HOUR:
        return False, "System rate limit reached. Please try again later. (Demo usage limit)"

    return True, None
def record_request(user_ip, request_type="ai"):
    """Record a successful request for rate limiting and usage statistics.

    Args:
        user_ip: Key identifying the caller in ``request_tracker``.
        request_type: ``"ai"`` or ``"config"``; selects which total is
            incremented. Any other value still updates the timestamps and the
            hourly bucket but neither total.

    Side effects:
        Appends a timestamp to ``request_tracker[user_ip]``, updates the
        module-level ``usage_stats`` dict and persists it with
        ``save_usage_stats``.
    """
    # One clock reading for everything, so the tracker entry, the
    # "last_request" stamp and the hourly bucket cannot disagree when the
    # call straddles an hour boundary.
    now = datetime.now()
    request_tracker.setdefault(user_ip, []).append(now)

    # Update global stats
    timestamp = now.isoformat()
    hour_key = now.strftime("%Y-%m-%d-%H")
    if request_type == "ai":
        usage_stats["total_ai_requests"] = usage_stats.get("total_ai_requests", 0) + 1
    elif request_type == "config":
        usage_stats["total_config_validations"] = usage_stats.get("total_config_validations", 0) + 1

    # request_tracker lives in memory and is empty after a restart, so its
    # size alone would push the persisted visitor count back down. Keep the
    # larger of the stored value and the current in-memory count instead.
    usage_stats["total_visitors"] = max(
        usage_stats.get("total_visitors") or 0,
        len(request_tracker),
    )

    usage_stats["last_request"] = timestamp
    if not usage_stats.get("first_request"):
        usage_stats["first_request"] = timestamp

    by_hour = usage_stats.setdefault("requests_by_hour", {})
    by_hour[hour_key] = by_hour.get(hour_key, 0) + 1

    save_usage_stats(usage_stats)
# SBC validation functions
def check_codec_issues(config):
    """Flag configs in which Opus is the only recognised codec name present.

    Args:
        config: Raw configuration text; matching is a case-insensitive
            substring search over the whole text.

    Returns:
        list[dict]: A single high-severity issue when ``opus`` is the only
        one of opus/pcmu/pcma/g729/g711 found, otherwise an empty list.
    """
    text = config.lower()
    known_codecs = ('opus', 'pcmu', 'pcma', 'g729', 'g711')
    present = [name for name in known_codecs if name in text]

    # Exactly one known codec, and that codec is Opus.
    if present != ['opus']:
        return []

    return [{
        "finding": "Codec Mismatch - Only Opus configured, PCMU/PCMA missing",
        "impact": "Leads to one-way audio or call setup failure for carriers requiring PCMU/PCMA. Media negotiation will fail when remote endpoint doesn't support Opus.",
        "fix": "Add PCMU and PCMA codecs: codecs=pcmu,pcma,opus,g729",
        "confidence": 0.92,
        "root_cause": "Codec list not aligned between SIP trunk and carrier requirements",
        "severity": "high",
    }]
def check_security_issues(config):
    """Detect media-encryption and TLS-version weaknesses in an SBC config.

    SRTP is reported when the config never mentions ``srtp`` or sets it to a
    non-enforcing value (``optional``, ``disabled``, ``off``, ...). TLS is
    reported when ``tls_version`` or ``tls_min_version`` is 1.0 or 1.1.
    Matching is case-insensitive and tolerates spaces around ``=``.

    Args:
        config: Raw configuration text (``key=value`` settings).

    Returns:
        list[dict]: Zero to two issue dicts (keys: finding, impact, fix,
        confidence, root_cause, severity), SRTP first, then TLS.
    """
    config_lower = config.lower()
    issues = []

    # Values that leave media encryption switched off or merely opportunistic.
    non_enforcing = {'optional', 'disabled', 'disable', 'off', 'no', 'false', 'none', '0'}
    srtp_values = re.findall(r'srtp\s*=\s*([\w-]+)', config_lower)
    srtp_missing = 'srtp' not in config_lower
    srtp_weak = (
        'srtp=optional' in config_lower
        or any(value in non_enforcing for value in srtp_values)
    )
    if srtp_missing or srtp_weak:
        issues.append({
            "finding": "SRTP Not Enforced (Security Risk)",
            "impact": "Media encryption not enforced. Voice traffic exposed to eavesdropping. Fails PCI-DSS and HIPAA compliance requirements.",
            "fix": "Enforce SRTP encryption: srtp=required, crypto_suites=AES_CM_128_HMAC_SHA1_80",
            "confidence": 0.97,
            "root_cause": "Default configuration prioritizes compatibility over security",
            "severity": "critical"
        })

    # Covers both tls_version and tls_min_version; one finding at most.
    tls_values = re.findall(r'tls(?:_min)?_version\s*=\s*(\d+(?:\.\d+)?)', config_lower)
    if any(value in ('1.0', '1.1') for value in tls_values):
        issues.append({
            "finding": "Deprecated TLS Version (1.0 or 1.1)",
            "impact": "Security vulnerability to downgrade attacks. Modern carriers reject TLS 1.0/1.1 connections, causing trunk registration failure.",
            "fix": "Upgrade to TLS 1.2 minimum: tls_version=1.2, tls_min_version=1.2",
            "confidence": 0.94,
            "root_cause": "Legacy TLS version not updated after security advisories (RFC 8996)",
            "severity": "critical"
        })
    return issues
def check_nat_issues(config):
    """Report NAT traversal that is explicitly disabled or not configured.

    "Not configured" means the text contains no standalone ``nat`` token,
    i.e. ``nat`` not embedded in a longer run of letters. A plain substring
    test would treat words such as "destination" or "alternate" as NAT
    settings and hide the problem.

    Args:
        config: Raw configuration text; matching is case-insensitive and
            tolerates spaces around ``=``.

    Returns:
        list[dict]: One high-severity issue dict, or an empty list.
    """
    config_lower = config.lower()
    issues = []

    # Matches nat_traversal, sip_nat, "nat", ... but not desti-nat-ion.
    has_nat_setting = re.search(r'(?<![a-z])nat(?![a-z])', config_lower) is not None
    explicitly_disabled = re.search(r'nat_traversal\s*=\s*disabled', config_lower) is not None

    if explicitly_disabled or not has_nat_setting:
        issues.append({
            "finding": "NAT Traversal Disabled",
            "impact": "One-way audio or no audio on calls traversing NAT boundaries. RTP packets cannot find return path. Critical for cloud-based SBC deployments.",
            "fix": "Enable NAT traversal: nat_traversal=enabled, ice_support=enabled, stun_server=stun.l.google.com:19302",
            "confidence": 0.93,
            "root_cause": "NAT handling disabled or STUN/ICE not configured for cloud deployment",
            "severity": "high"
        })
    return issues
def check_port_issues(config):
    """Check the SIP listening port and the size of the RTP port range.

    Only a bare ``port`` or ``sip_port`` key is read as the SIP port. Keys
    that merely end in "port" (``rtp_port``, ``stun_port``, ...) are ignored
    so they cannot be misreported as a non-standard SIP port.

    Args:
        config: Raw configuration text; matching is case-insensitive and
            tolerates spaces around ``=`` and around the range dash.

    Returns:
        list[dict]: Up to two medium-severity issue dicts: a SIP port other
        than 5060/5061, and an RTP range whose ``end - start`` is below 1000.
    """
    config_lower = config.lower()
    issues = []

    # The lookbehind rejects "port" preceded by a letter, digit or "_".
    port_match = re.search(r'(?<![\w])(?:sip_)?port\s*=\s*(\d+)', config_lower)
    if port_match:
        port = int(port_match.group(1))
        if port not in (5060, 5061):
            issues.append({
                "finding": f"Non-Standard SIP Port ({port})",
                "impact": "Most carriers expect SIP on port 5060 (UDP) or 5061 (TLS). Non-standard ports may be blocked by firewalls or rejected by carriers.",
                "fix": "Use standard ports: port=5060 for UDP/TCP, port=5061 for TLS",
                "confidence": 0.88,
                "root_cause": "Custom port configuration during migration or testing",
                "severity": "medium"
            })

    range_match = re.search(r'rtp_port_range\s*=\s*(\d+)\s*-\s*(\d+)', config_lower)
    if range_match:
        start, end = int(range_match.group(1)), int(range_match.group(2))
        range_size = end - start
        if range_size < 1000:
            issues.append({
                "finding": f"Insufficient RTP Port Range ({range_size} ports)",
                "impact": "Port exhaustion during high call volume (>50 concurrent calls). New calls fail with 'No RTP resources available'.",
                "fix": "Expand RTP port range to at least 1000 ports: rtp_port_range=10000-11000",
                "confidence": 0.91,
                "root_cause": "Default narrow port range not scaled for production load",
                "severity": "medium"
            })
    return issues
def check_session_timer(config):
    """Flag a ``session_expires`` value shorter than 900 seconds.

    Args:
        config: Raw configuration text; matching is case-insensitive and
            tolerates spaces around ``=``.

    Returns:
        list[dict]: One medium-severity issue dict when the first
        ``session_expires`` value found is below 900, otherwise an empty list.
    """
    config_lower = config.lower()
    issues = []
    timer_match = re.search(r'session_expires\s*=\s*(\d+)', config_lower)
    if timer_match:
        timer = int(timer_match.group(1))
        if timer < 900:
            issues.append({
                "finding": f"Session Timer Too Short ({timer} seconds)",
                # Must be an f-string: without the prefix the placeholder
                # text "{timer//60}" is shown to the user verbatim.
                "impact": f"Long-duration calls terminated prematurely during brief network issues. Customer complaints about dropped calls after {timer//60} minutes.",
                "fix": "Increase session timer: session_expires=1800, min_se=900",
                "confidence": 0.86,
                "root_cause": "Conservative timer settings from legacy PBX migration",
                "severity": "medium"
            })
    return issues
def check_dtmf_issues(config):
    """Report in-band DTMF relay, or a config that never mentions DTMF.

    Args:
        config: Raw configuration text; matched case-insensitively by
            substring.

    Returns:
        list[dict]: One medium-severity issue dict, or an empty list when the
        text mentions ``dtmf`` without ``dtmf_relay=inband``.
    """
    text = config.lower()
    uses_inband = 'dtmf_relay=inband' in text
    mentions_dtmf = 'dtmf' in text

    # Nothing to report when DTMF is configured and it is not in-band.
    if mentions_dtmf and not uses_inband:
        return []

    return [{
        "finding": "DTMF Method Not Optimal",
        "impact": "In-band DTMF unreliable with compressed codecs. IVR systems may not receive digits correctly, causing customer frustration in phone menus.",
        "fix": "Use RFC2833 for DTMF: dtmf_relay=rfc2833 (preferred by most carriers)",
        "confidence": 0.89,
        "root_cause": "Default in-band DTMF not suitable for VoIP environments",
        "severity": "medium",
    }]
def check_dns_issues(config):
    """Report DNS SRV lookup that has been explicitly disabled.

    Args:
        config: Raw configuration text; matched case-insensitively by
            substring.

    Returns:
        list[dict]: One low-severity issue dict when the text contains
        ``dns_srv_lookup=disabled`` or ``dns_srv=disabled``, else an empty
        list.
    """
    text = config.lower()
    disabled_markers = ('dns_srv_lookup=disabled', 'dns_srv=disabled')
    if not any(marker in text for marker in disabled_markers):
        return []

    return [{
        "finding": "DNS SRV Lookup Disabled",
        "impact": "SBC cannot discover carrier SIP servers via DNS SRV records. Loses automatic failover and load balancing capabilities.",
        "fix": "Enable DNS SRV resolution: dns_srv_lookup=enabled",
        "confidence": 0.85,
        "root_cause": "DNS SRV disabled to work around temporary DNS issues",
        "severity": "low",
    }]
def analyze_configuration(config_text, request: gr.Request):
    """Run every rule-based check on a pasted SBC config and render Markdown.

    Args:
        config_text: Raw configuration text from the UI textbox.
        request: Gradio request of the calling session; its client host is
            the key used for usage tracking ("unknown" when unavailable).

    Returns:
        str: A prompt when the input is empty, a "PASSED" summary when no
        check reports a finding, otherwise a report grouped by severity
        followed by recommended actions and a testing checklist.
    """
    if not config_text or not config_text.strip():
        return "Please enter a configuration to analyze."

    # Track usage. Both the request and its client may be missing.
    client = getattr(request, "client", None) if request else None
    user_ip = client.host if client else "unknown"
    record_request(user_ip, "config")

    # Run all checks
    checks = (
        check_codec_issues,
        check_security_issues,
        check_nat_issues,
        check_port_issues,
        check_session_timer,
        check_dtmf_issues,
        check_dns_issues,
    )
    all_issues = [issue for check in checks for issue in check(config_text)]

    # Build output
    if not all_issues:
        return """# ✅ Configuration Validation: PASSED
**Status:** No critical issues detected
**Summary:** Your SBC configuration follows enterprise VoIP best practices. The configuration includes:
- ✅ Proper codec negotiation (PCMU/PCMA/Opus)
- ✅ SRTP encryption enforced
- ✅ TLS 1.2+ for signaling security
- ✅ NAT traversal enabled with ICE support
- ✅ Standard SIP ports (5060/5061)
- ✅ Adequate RTP port range for scaling
- ✅ Appropriate session timers
- ✅ RFC2833 DTMF for IVR compatibility
**Next Steps:**
1. Test with a trial call to verify audio quality
2. Monitor call metrics for 24-48 hours
3. Review integration with your UC platform
**Need Help?** Contact your carrier's technical team or Bandwidth Support for deployment assistance.
"""

    # Issues found - categorize by severity
    critical = [i for i in all_issues if i['severity'] == 'critical']
    high = [i for i in all_issues if i['severity'] == 'high']
    medium = [i for i in all_issues if i['severity'] == 'medium']
    low = [i for i in all_issues if i['severity'] == 'low']

    output = []

    # Customer impact header
    if critical or high:
        output.append("# ⚠️ Configuration Validation: ISSUES DETECTED\n")
        output.append("**Customer Impact:** This configuration will likely cause audio quality issues, call failures, or security vulnerabilities in production deployments.\n")
    else:
        output.append("# ⚠️ Configuration Validation: WARNINGS\n")
        output.append("**Status:** Configuration will work but has optimization opportunities.\n")
    output.append(f"\n**Issues Found:** {len(all_issues)} ({len(critical)} critical, {len(high)} high, {len(medium)} medium, {len(low)} low)\n")
    output.append("\n" + "="*80 + "\n\n")

    # Show issues by severity; numbering runs across severity groups.
    issue_num = 1
    for severity_name, severity_list in [("🔴 CRITICAL", critical), ("🟠 HIGH", high), ("🟡 MEDIUM", medium), ("⚪ LOW", low)]:
        if not severity_list:
            continue
        output.append(f"## {severity_name} PRIORITY\n\n")
        for issue in severity_list:
            output.append(f"### Issue #{issue_num}: {issue['finding']}\n\n")
            output.append(f"**📊 Confidence:** {issue['confidence']:.0%} | ")
            output.append(f"**🎯 Root Cause:** {issue['root_cause']}\n\n")
            output.append(f"**💥 Customer Impact:**\n{issue['impact']}\n\n")
            output.append(f"**✅ How to Fix:**\n```\n{issue['fix']}\n```\n\n")
            output.append("-"*80 + "\n\n")
            issue_num += 1

    # Action items. Numbers are assigned at render time so the list always
    # starts at 1 even when no critical (or high) issues are present.
    actions = []
    if critical:
        actions.append("**URGENT:** Address all critical issues before deploying to production")
    if high:
        actions.append("Fix high-priority issues to prevent customer-impacting audio problems")
    if medium:
        actions.append("Optimize medium-priority items for better call quality and reliability")

    output.append("## 📋 Recommended Actions\n\n")
    for number, action in enumerate(actions, start=1):
        output.append(f"{number}. {action}\n")

    output.append("\n**Testing Checklist:**\n")
    output.append("- [ ] Make test call and verify two-way audio\n")
    output.append("- [ ] Test DTMF input in IVR menu\n")
    output.append("- [ ] Verify TLS handshake with carrier\n")
    output.append("- [ ] Confirm NAT traversal in cloud environment\n")
    output.append("- [ ] Test emergency calling if applicable\n")
    return "".join(output)
def ai_troubleshoot(symptom_description, request: gr.Request):
    """Diagnose a plain-English symptom with the Anthropic API.

    The call is rate limited per client and globally via
    ``check_rate_limit``; only successful API calls are recorded with
    ``record_request``. When no ``ANTHROPIC_API_KEY`` is set, or the API call
    fails for any reason, the static ``fallback_troubleshoot`` text is
    returned instead.

    Args:
        symptom_description: Free-text description of the problem.
        request: Gradio request of the calling session; its client host is
            the rate-limit key ("unknown" when unavailable).

    Returns:
        str: Markdown for the diagnosis panel.
    """
    if not symptom_description or not symptom_description.strip():
        return "Please describe the issue you're experiencing with your SBC or calls."

    # Rate limiting. Both the request and its client may be missing.
    client_info = getattr(request, "client", None) if request else None
    user_ip = client_info.host if client_info else "unknown"
    allowed, error_msg = check_rate_limit(user_ip)
    if not allowed:
        return f"## ⚠️ Rate Limit Reached\n\n{error_msg}\n\nThis is a demo with usage limits to control costs."

    # Check for API key
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        return fallback_troubleshoot(symptom_description)

    try:
        # Imported lazily so the app still starts without the package.
        import anthropic
        client = anthropic.Anthropic(api_key=api_key)
        prompt = f"""You are an expert SBC (Session Border Controller) troubleshooting assistant for enterprise VoIP deployments.
A customer reports the following issue:
"{symptom_description}"
Provide a diagnostic analysis with:
1. **Likely Root Causes** - What SBC misconfigurations could cause this in an enterprise VoIP environment
2. **Configuration Parameters to Check** - Specific config settings to verify
3. **Recommended Fixes** - Exact configuration commands to try
4. **Confidence Level** - How confident you are in this diagnosis (as a percentage)
Focus on common enterprise VoIP issues:
- NAT traversal for audio problems in cloud deployments
- DTMF configuration for IVR issues
- TLS/security for connection failures
- Codec negotiation for audio quality (PCMU/PCMA/Opus compatibility)
- Session timers for dropped calls
- Port configuration for firewall issues
- Emergency calling routing
Format your response in clear sections with markdown. Be specific and actionable."""
        message = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": prompt
            }]
        )
        # Read the reply before recording, so a malformed or empty response
        # falls through to the fallback without being counted as a success.
        diagnosis = message.content[0].text

        # Record successful request
        record_request(user_ip, "ai")

        # Format response
        parts = [
            "# 🤖 AI-Powered Troubleshooting Analysis\n\n",
            f"**Your Issue:** {symptom_description}\n\n",
            "**Powered by Claude 3.5 Haiku** | **Optimized for Bandwidth Duet®**\n\n",
            "---\n\n",
            diagnosis,
            "\n\n---\n\n",
            "## 📋 Next Steps\n\n",
            "1. Review the configuration parameters listed above in your SBC\n",
            "2. Switch to the **Configuration Validator** tab to paste your full config for detailed analysis\n",
            "3. Apply the recommended fixes\n",
            "4. Test with a trial call\n",
            "5. Verify emergency calling if applicable\n",
        ]
        return "".join(parts)
    except Exception:
        # Top-level boundary for a UI handler: the user gets the fallback
        # text, but the failure is logged instead of vanishing silently.
        import logging
        logging.getLogger(__name__).exception("AI troubleshooting request failed")
        return fallback_troubleshoot(symptom_description)
def fallback_troubleshoot(symptom_description):
    """Return the static notice shown when AI analysis cannot be used.

    Args:
        symptom_description: Accepted for interface compatibility; the text
            is not inspected.

    Returns:
        str: A fixed Markdown message pointing to the Configuration
        Validator tab and to support.
    """
    heading = "## 🔍 Troubleshooting Analysis"
    notice = "AI analysis temporarily unavailable. Please use the Configuration Validator tab or contact Bandwidth Support."
    return heading + "\n\n" + notice
def get_usage_stats():
    """Return the usage statistics formatted as Markdown.

    Stats are re-read from disk with ``load_usage_stats`` on every call. The
    hourly list is limited to buckets that fall within the last 24 hours,
    newest first, rather than to the 24 most recent buckets of any age.

    Returns:
        str: Markdown with the totals, first/last request stamps and the
        per-hour request counts.
    """
    stats = load_usage_stats()
    output = f"""# 📊 Bandwidth Duet® SBC Validator - Usage Statistics
**Total AI Troubleshooting Requests:** {stats.get('total_ai_requests', 0)}
**Total Configuration Validations:** {stats.get('total_config_validations', 0)}
**Unique Visitors:** {stats.get('total_visitors', 0)}
**First Request:** {stats.get('first_request') or 'No requests yet'}
**Last Request:** {stats.get('last_request') or 'No requests yet'}
**Requests by Hour (Last 24 hours):**
"""
    by_hour = stats.get('requests_by_hour') or {}
    if not by_hour:
        return output + "\nNo activity recorded yet"

    # Hour keys are zero-padded "%Y-%m-%d-%H" strings, so plain string
    # comparison orders them chronologically. Keys strictly newer than
    # "now - 24h" cover exactly the current hour and the 23 before it.
    cutoff_key = (datetime.now() - timedelta(hours=24)).strftime("%Y-%m-%d-%H")
    recent = sorted(
        ((hour, count) for hour, count in by_hour.items() if hour > cutoff_key),
        reverse=True,
    )
    if not recent:
        return output + "\nNo activity in the last 24 hours"

    return output + "".join(f"\n- {hour}: {count} requests" for hour, count in recent)
# Example configurations
# Sample config for the "Common Mistakes" button: Opus-only codecs, optional
# SRTP, NAT traversal disabled, TLS 1.0, a 90-second session timer, in-band
# DTMF, a 100-port RTP range and DNS SRV lookup disabled.
BROKEN_CONFIG = """transport=udp
port=5060
codecs=opus
srtp=optional
nat_traversal=disabled
tls_version=1.0
session_expires=90
dtmf_relay=inband
rtp_port_range=10000-10100
dns_srv_lookup=disabled"""
# Sample config for the "Best Practice Config" button; intended to produce
# no findings from the check_* functions in this file.
VALID_CONFIG = """transport=tcp
port=5061
codecs=pcmu,pcma,opus,g729
srtp=required
crypto_suites=AES_CM_128_HMAC_SHA1_80
nat_traversal=enabled
ice_support=enabled
stun_server=stun.l.google.com:19302
tls_version=1.2
tls_min_version=1.2
cipher_suites=ECDHE-RSA-AES256-GCM-SHA384
session_expires=1800
min_se=900
dtmf_relay=rfc2833
rtp_port_range=10000-20000
dns_srv_lookup=enabled
media_anchoring=enabled"""
def load_broken_config():
    """Return the example configuration stored in ``BROKEN_CONFIG``."""
    example_text = BROKEN_CONFIG
    return example_text
def load_valid_config():
    """Return the example configuration stored in ``VALID_CONFIG``."""
    example_text = VALID_CONFIG
    return example_text
# Create Gradio interface
# Layout: page header, three tabs (AI troubleshooting, configuration
# validator, usage statistics) and an "about" footer. Event handlers are
# wired inside the tab that owns their components.
with gr.Blocks(
    theme=gr.themes.Soft(),
    title="Bandwidth Duet® - SBC Configuration Validator",
) as demo:
    gr.Markdown("""
# 🔊 Bandwidth Duet® - AI-Powered SBC Configuration Validator
### Intelligent Analysis & Troubleshooting for Enterprise VoIP Deployments
**Powered by Bandwidth® - The Global Enterprise Cloud Communications Company**
Validate your SBC configuration and get AI-powered troubleshooting recommendations for enterprise voice deployments.
""")

    with gr.Tabs():
        # --- Tab 1: free-text symptom -> ai_troubleshoot ---
        with gr.Tab("🤖 AI Troubleshoot"):
            gr.Markdown("""
### Describe Your Issue in Plain English
Tell me what's wrong with your calls and I'll diagnose the likely SBC configuration issues.
""")
            with gr.Row():
                with gr.Column(scale=1):
                    symptom_input = gr.Textbox(
                        label="What issue are you experiencing?",
                        lines=5,
                        placeholder="Examples:\n- 'My calls have one-way audio'\n- 'IVR menu not responding to key presses'\n- 'Calls dropping after 2 minutes'\n- 'TLS handshake failing with carrier'\n- 'Emergency calls not routing correctly'",
                    )
                    with gr.Row():
                        example_btn1 = gr.Button(
                            "Example: One-way audio",
                            variant="secondary",
                            size="sm",
                        )
                        example_btn2 = gr.Button(
                            "Example: IVR not working",
                            variant="secondary",
                            size="sm",
                        )
                    troubleshoot_btn = gr.Button(
                        "🔍 Diagnose Issue",
                        variant="primary",
                        size="lg",
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### AI Diagnosis")
                    troubleshoot_output = gr.Markdown()

            # Example button actions: each fills the textbox with a canned symptom.
            example_btn1.click(
                fn=lambda: "I can hear the other person but they can't hear me",
                outputs=symptom_input,
            )
            example_btn2.click(
                fn=lambda: "Customers can't navigate our IVR menu by pressing numbers",
                outputs=symptom_input,
            )
            troubleshoot_btn.click(
                fn=ai_troubleshoot,
                inputs=symptom_input,
                outputs=troubleshoot_output,
            )

        # --- Tab 2: pasted config -> analyze_configuration ---
        with gr.Tab("⚙️ Configuration Validator"):
            gr.Markdown("""
### Paste Your SBC Configuration
Get detailed analysis of specific configuration parameters for enterprise VoIP deployments.
""")
            with gr.Row():
                with gr.Column(scale=1):
                    config_input = gr.Textbox(
                        label="Paste your configuration here",
                        lines=20,
                        placeholder="Example:\ntransport=tcp\nport=5061\ncodecs=pcmu,pcma,opus\nsrtp=required\nnat_traversal=enabled\n...",
                    )
                    with gr.Row():
                        broken_btn = gr.Button(
                            "📛 Load Example: Common Mistakes",
                            variant="secondary",
                            size="sm",
                        )
                        valid_btn = gr.Button(
                            "✅ Load Example: Best Practice Config",
                            variant="secondary",
                            size="sm",
                        )
                    analyze_btn = gr.Button(
                        "🔍 Validate Configuration",
                        variant="primary",
                        size="lg",
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### Validation Results")
                    output = gr.Markdown()

            # Button actions
            broken_btn.click(fn=load_broken_config, outputs=config_input)
            valid_btn.click(fn=load_valid_config, outputs=config_input)
            analyze_btn.click(
                fn=analyze_configuration,
                inputs=config_input,
                outputs=output,
            )

        # --- Tab 3: persisted usage counters ---
        with gr.Tab("📊 Usage Stats"):
            gr.Markdown("""
### Validator Usage Statistics
Track how many people are using this tool.
""")
            stats_output = gr.Markdown()
            refresh_btn = gr.Button("🔄 Refresh Stats", variant="primary")
            refresh_btn.click(fn=get_usage_stats, outputs=stats_output)
            demo.load(fn=get_usage_stats, outputs=stats_output)  # Auto-load on page load

    gr.Markdown("""
---
## About Bandwidth Duet®
**Bandwidth Duet®** is a comprehensive solution for enterprise voice connectivity, available directly from a carrier who owns and operates its underlying infrastructure.
**Key Features:**
- **Direct Routing:** Seamless enterprise calling with existing SIP trunking
- **Dynamic E911:** Certified emergency calling with accurate location routing
- **Software-Driven SIP:** Built on Bandwidth's nationwide IP voice network
- **Simplified Migration:** Accelerate telecom migration with less complexity
**This Validator Checks For:**
- Security vulnerabilities (SRTP, TLS, authentication)
- Codec negotiation issues for enterprise UC platforms
- NAT traversal problems in cloud environments
- DTMF signaling for IVR systems
- Port configuration and firewall compatibility
- Session timer optimizations
- Emergency calling routing readiness
**Powered By:**
- AI Model: Claude 3.5 Haiku for natural language troubleshooting
- Detection Engine: Rule-based validation across 7 configuration categories
- Rate Limiting: 5 requests/user/hour (demo environment)
---
💡 **Pro Tip:** Run this validator before deploying your SBC to production. Most VoIP issues can be prevented with proper configuration.
Built by Philip Drammeh | Independent Researcher | [LinkedIn](https://www.linkedin.com/in/philipdrammeh)
*Bandwidth Duet® is a registered trademark of Bandwidth Inc. This tool is an independent project and not officially affiliated with or endorsed by Bandwidth Inc.*
""")

# Start the web server only when executed as a script, not on import.
if __name__ == "__main__":
    demo.launch()