sbc_validator / app.py
Phildram1's picture
Update app.py
9e5800d verified
import gradio as gr
import re
import os
from datetime import datetime, timedelta
# Simple in-memory rate limiting (resets when app restarts)
request_tracker = {}
MAX_REQUESTS_PER_HOUR = 20 # Limit to 20 AI requests per hour total
MAX_REQUESTS_PER_USER = 5 # Limit to 5 requests per user per hour
def check_rate_limit(user_ip):
"""Check if request is within rate limits"""
current_time = datetime.now()
# Clean old entries
cutoff_time = current_time - timedelta(hours=1)
request_tracker[user_ip] = [t for t in request_tracker.get(user_ip, []) if t > cutoff_time]
# Check user limit
user_requests = len(request_tracker.get(user_ip, []))
if user_requests >= MAX_REQUESTS_PER_USER:
return False, f"Rate limit reached. Please try again in an hour. (Limit: {MAX_REQUESTS_PER_USER} requests/hour per user)"
# Check total limit
total_requests = sum(len(times) for times in request_tracker.values())
if total_requests >= MAX_REQUESTS_PER_HOUR:
return False, "System rate limit reached. Please try again later. (Demo usage limit)"
return True, None
def record_request(user_ip):
"""Record a successful request"""
if user_ip not in request_tracker:
request_tracker[user_ip] = []
request_tracker[user_ip].append(datetime.now())
# Known SBC Misconfiguration Detection Rules
def check_codec_issues(config):
"""Check for codec-related problems"""
config_lower = config.lower()
issues = []
# Check if only one codec
codec_keywords = ['opus', 'pcmu', 'pcma', 'g729', 'g711']
codec_count = sum(1 for c in codec_keywords if c in config_lower)
if codec_count == 1 and 'opus' in config_lower and 'pcmu' not in config_lower:
issues.append({
"finding": "Codec Mismatch - Only Opus configured, PCMU/PCMA missing",
"impact": "Leads to one-way audio or call setup failure for carriers requiring PCMU/PCMA. Media negotiation will fail when remote endpoint doesn't support Opus.",
"fix": "Add PCMU and PCMA codecs: codecs=pcmu,pcma,opus,g729",
"confidence": 0.92,
"root_cause": "Codec list not aligned between SIP trunk and carrier requirements",
"severity": "high"
})
return issues
def check_security_issues(config):
"""Check for security problems"""
config_lower = config.lower()
issues = []
# Check SRTP
if 'srtp=optional' in config_lower or ('srtp' not in config_lower):
issues.append({
"finding": "SRTP Not Enforced (Security Risk)",
"impact": "Media encryption not enforced. Voice traffic exposed to eavesdropping. Fails PCI-DSS and HIPAA compliance requirements.",
"fix": "Enforce SRTP encryption: srtp=required, crypto_suites=AES_CM_128_HMAC_SHA1_80",
"confidence": 0.97,
"root_cause": "Default configuration prioritizes compatibility over security",
"severity": "critical"
})
# Check TLS version
if 'tls_version=1.0' in config_lower or 'tls_version=1.1' in config_lower:
issues.append({
"finding": "Deprecated TLS Version (1.0 or 1.1)",
"impact": "Security vulnerability to downgrade attacks. Modern carriers reject TLS 1.0/1.1 connections, causing trunk registration failure.",
"fix": "Upgrade to TLS 1.2 minimum: tls_version=1.2, tls_min_version=1.2",
"confidence": 0.94,
"root_cause": "Legacy TLS version not updated after security advisories (RFC 8996)",
"severity": "critical"
})
return issues
def check_nat_issues(config):
"""Check for NAT traversal problems"""
config_lower = config.lower()
issues = []
if 'nat_traversal=disabled' in config_lower or 'nat' not in config_lower:
issues.append({
"finding": "NAT Traversal Disabled",
"impact": "One-way audio or no audio on calls traversing NAT boundaries. RTP packets cannot find return path. Critical for cloud-based SBCs.",
"fix": "Enable NAT traversal: nat_traversal=enabled, ice_support=enabled, stun_server=stun.l.google.com:19302",
"confidence": 0.93,
"root_cause": "NAT handling disabled or STUN/ICE not configured for cloud deployment",
"severity": "high"
})
return issues
def check_port_issues(config):
"""Check for port configuration problems"""
config_lower = config.lower()
issues = []
# Check if port is non-standard
if 'port=' in config_lower:
port_match = re.search(r'port=(\d+)', config_lower)
if port_match:
port = int(port_match.group(1))
if port != 5060 and port != 5061:
issues.append({
"finding": f"Non-Standard SIP Port ({port})",
"impact": "Most carriers expect SIP on port 5060 (UDP) or 5061 (TLS). Non-standard ports may be blocked by firewalls or rejected by carriers.",
"fix": "Use standard ports: port=5060 for UDP/TCP, port=5061 for TLS",
"confidence": 0.88,
"root_cause": "Custom port configuration during migration or testing",
"severity": "medium"
})
# Check RTP port range
if 'rtp_port_range' in config_lower:
range_match = re.search(r'rtp_port_range=(\d+)-(\d+)', config_lower)
if range_match:
start, end = int(range_match.group(1)), int(range_match.group(2))
range_size = end - start
if range_size < 1000:
issues.append({
"finding": f"Insufficient RTP Port Range ({range_size} ports)",
"impact": "Port exhaustion during high call volume (>50 concurrent calls). New calls fail with 'No RTP resources available'.",
"fix": f"Expand RTP port range to at least 1000 ports: rtp_port_range=10000-11000",
"confidence": 0.91,
"root_cause": "Default narrow port range not scaled for production load",
"severity": "medium"
})
return issues
def check_session_timer(config):
"""Check session timer settings"""
config_lower = config.lower()
issues = []
if 'session_expires' in config_lower:
timer_match = re.search(r'session_expires=(\d+)', config_lower)
if timer_match:
timer = int(timer_match.group(1))
if timer < 900:
issues.append({
"finding": f"Session Timer Too Short ({timer} seconds)",
"impact": "Long-duration calls terminated prematurely during brief network issues. Customer complaints about dropped calls after {timer//60} minutes.",
"fix": "Increase session timer: session_expires=1800, min_se=900",
"confidence": 0.86,
"root_cause": "Conservative timer settings from legacy PBX migration",
"severity": "medium"
})
return issues
def check_dtmf_issues(config):
"""Check DTMF configuration"""
config_lower = config.lower()
issues = []
if 'dtmf_relay=inband' in config_lower or 'dtmf' not in config_lower:
issues.append({
"finding": "DTMF Method Not Optimal",
"impact": "In-band DTMF unreliable with compressed codecs. IVR systems may not receive digits correctly, causing customer frustration in phone menus.",
"fix": "Use RFC2833 for DTMF: dtmf_relay=rfc2833 (preferred by most carriers)",
"confidence": 0.89,
"root_cause": "Default in-band DTMF not suitable for VoIP environments",
"severity": "medium"
})
return issues
def check_dns_issues(config):
"""Check DNS configuration"""
config_lower = config.lower()
issues = []
if 'dns_srv_lookup=disabled' in config_lower or 'dns_srv=disabled' in config_lower:
issues.append({
"finding": "DNS SRV Lookup Disabled",
"impact": "SBC cannot discover carrier SIP servers via DNS SRV records. Loses automatic failover and load balancing capabilities.",
"fix": "Enable DNS SRV resolution: dns_srv_lookup=enabled",
"confidence": 0.85,
"root_cause": "DNS SRV disabled to work around temporary DNS issues",
"severity": "low"
})
return issues
def analyze_configuration(config_text):
"""Comprehensive configuration analysis"""
if not config_text.strip():
return "Please enter a configuration to analyze."
# Run all checks
all_issues = []
all_issues.extend(check_codec_issues(config_text))
all_issues.extend(check_security_issues(config_text))
all_issues.extend(check_nat_issues(config_text))
all_issues.extend(check_port_issues(config_text))
all_issues.extend(check_session_timer(config_text))
all_issues.extend(check_dtmf_issues(config_text))
all_issues.extend(check_dns_issues(config_text))
# Build output
if not all_issues:
return """# βœ… Configuration Validation: PASSED
**Status:** No critical issues detected
**Summary:** Your SBC configuration follows the service best practices. The configuration includes:
- βœ… Proper codec negotiation (PCMU/PCMA/Opus)
- βœ… SRTP encryption enforced
- βœ… TLS 1.2+ for signaling security
- βœ… NAT traversal enabled with ICE support
- βœ… Standard SIP ports (5060/5061)
- βœ… Adequate RTP port range for scaling
- βœ… Appropriate session timers
- βœ… RFC2833 DTMF for IVR compatibility
**Next Steps:**
1. Test with a trial call to verify audio quality
2. Monitor call metrics for 24-48 hours
3. Review Service Connect contact flow integration
**Need Help?** Contact Support or your carrier's technical team for deployment assistance.
"""
# Issues found - categorize by severity
critical = [i for i in all_issues if i['severity'] == 'critical']
high = [i for i in all_issues if i['severity'] == 'high']
medium = [i for i in all_issues if i['severity'] == 'medium']
low = [i for i in all_issues if i['severity'] == 'low']
output = []
# Customer impact header
if critical or high:
output.append("# ⚠️ Configuration Validation: ISSUES DETECTED\n")
output.append("**Customer Impact:** This configuration will likely cause audio quality issues, call failures, or security vulnerabilities in production.\n")
else:
output.append("# ⚠️ Configuration Validation: WARNINGS\n")
output.append("**Status:** Configuration will work but has optimization opportunities.\n")
output.append(f"\n**Issues Found:** {len(all_issues)} ({len(critical)} critical, {len(high)} high, {len(medium)} medium, {len(low)} low)\n")
output.append("\n" + "="*80 + "\n\n")
# Show issues by severity
issue_num = 1
for severity_name, severity_list in [("πŸ”΄ CRITICAL", critical), ("🟠 HIGH", high), ("🟑 MEDIUM", medium), ("βšͺ LOW", low)]:
if severity_list:
output.append(f"## {severity_name} PRIORITY\n\n")
for issue in severity_list:
output.append(f"### Issue #{issue_num}: {issue['finding']}\n\n")
output.append(f"**πŸ“Š Confidence:** {issue['confidence']:.0%} | ")
output.append(f"**🎯 Root Cause:** {issue['root_cause']}\n\n")
output.append(f"**πŸ’₯ Customer Impact:**\n{issue['impact']}\n\n")
output.append(f"**βœ… How to Fix:**\n```\n{issue['fix']}\n```\n\n")
output.append("-"*80 + "\n\n")
issue_num += 1
# Action items
output.append("## πŸ“‹ Recommended Actions\n\n")
if critical:
output.append("1. **URGENT:** Address all critical issues before deploying to production\n")
if high:
output.append("2. Fix high-priority issues to prevent customer-impacting audio problems\n")
if medium:
output.append("3. Optimize medium-priority items for better call quality and reliability\n")
output.append("\n**Testing Checklist:**\n")
output.append("- [ ] Make test call and verify two-way audio\n")
output.append("- [ ] Test DTMF input in IVR menu\n")
output.append("- [ ] Verify TLS handshake with carrier\n")
output.append("- [ ] Confirm NAT traversal in cloud environment\n")
return "".join(output)
def ai_troubleshoot(symptom_description, request: gr.Request):
"""AI-powered troubleshooting using Claude API with rate limiting"""
if not symptom_description.strip():
return "Please describe the issue you're experiencing with your SBC or calls."
# Rate limiting
user_ip = request.client.host if request else "unknown"
allowed, error_msg = check_rate_limit(user_ip)
if not allowed:
return f"## ⚠️ Rate Limit Reached\n\n{error_msg}\n\nThis is a demo with usage limits to control costs."
# Check for API key
api_key = os.environ.get("ANTHROPIC_API_KEY")
print(f"DEBUG: API key detected: {bool(api_key)}") # Debug log
if not api_key:
# Fallback to pattern-based analysis if no API key
print("DEBUG: No API key found, using fallback") # Debug log
return fallback_troubleshoot(symptom_description)
try:
# Use Claude API for real AI-powered analysis
import anthropic
print(f"DEBUG: Creating Anthropic client") # Debug
client = anthropic.Anthropic(api_key=api_key)
prompt = f"""You are an expert SBC (Session Border Controller) troubleshooting assistant for BYOC service deployments.
A customer reports the following issue:
"{symptom_description}"
Provide a diagnostic analysis with:
1. **Likely Root Causes** - What SBC misconfigurations could cause this
2. **Configuration Parameters to Check** - Specific config settings to verify
3. **Recommended Fixes** - Exact configuration commands to try
4. **Confidence Level** - How confident you are in this diagnosis (as a percentage)
Focus on common BYOC issues like:
- NAT traversal for audio problems
- DTMF configuration for IVR issues
- TLS/security for connection failures
- Codec negotiation for audio quality
- Session timers for dropped calls
- Port configuration for firewall issues
Format your response in clear sections with markdown. Be specific and actionable."""
print(f"DEBUG: Calling Claude API") # Debug
message = client.messages.create(
model="claude-3-5-haiku-20241022", # Using Haiku for cost efficiency
max_tokens=800, # Limit response length to control costs
messages=[{
"role": "user",
"content": prompt
}]
)
print(f"DEBUG: API call successful") # Debug
# Record successful request
record_request(user_ip)
# Format response
analysis = f"# πŸ€– AI-Powered Troubleshooting Analysis\n\n"
analysis += f"**Your Issue:** {symptom_description}\n\n"
analysis += "**Powered by Claude 3.5 Haiku**\n\n"
analysis += "---\n\n"
analysis += message.content[0].text
analysis += "\n\n---\n\n"
analysis += "## πŸ“‹ Next Steps\n\n"
analysis += "1. Review the configuration parameters listed above in your SBC\n"
analysis += "2. Switch to the **Configuration Validator** tab to paste your full config for detailed analysis\n"
analysis += "3. Apply the recommended fixes\n"
analysis += "4. Test with a trial call\n"
return analysis
except Exception as e:
# Fallback to pattern-based if API fails
print(f"DEBUG: API call failed with error: {e}") # Debug
return fallback_troubleshoot(symptom_description)
def fallback_troubleshoot(symptom_description):
"""Pattern-based fallback troubleshooting when API is unavailable"""
symptom_lower = symptom_description.lower()
# Pattern-based analysis (original logic)
analysis = []
analysis.append("# πŸ” Troubleshooting Analysis\n")
analysis.append(f"**Your Issue:** {symptom_description}\n\n")
analysis.append("**Note:** AI analysis temporarily unavailable. Using pattern-based diagnosis.\n\n")
analysis.append("---\n\n")
recommendations = []
# One-way audio symptoms
if any(keyword in symptom_lower for keyword in ['one-way audio', 'one way audio', 'cant hear', "can't hear", 'no audio', 'audio not working']):
recommendations.append({
'issue': 'One-Way Audio / No Audio',
'likely_causes': [
'NAT traversal disabled',
'Incorrect RTP port configuration',
'Firewall blocking RTP ports',
'Media anchoring disabled'
],
'configs_to_check': [
'nat_traversal=enabled',
'ice_support=enabled',
'rtp_port_range=10000-20000',
'media_anchoring=enabled'
],
'confidence': 0.95
})
# IVR/DTMF issues
if any(keyword in symptom_lower for keyword in ['ivr', 'dtmf', 'keypad', 'menu', 'press 1', 'touch tone', 'dial pad']):
recommendations.append({
'issue': 'IVR / DTMF Not Working',
'likely_causes': [
'DTMF relay method mismatch',
'In-band DTMF with compressed codecs',
'Carrier expects RFC2833 but SBC using SIP INFO'
],
'configs_to_check': [
'dtmf_relay=rfc2833',
'dtmf_mode=rfc2833',
'Verify codec supports DTMF (avoid highly compressed codecs)'
],
'confidence': 0.92
})
# Call setup failures
if any(keyword in symptom_lower for keyword in ['call fails', 'cant connect', "can't connect", 'wont connect', "won't connect", '408', '503', '481', 'timeout']):
recommendations.append({
'issue': 'Call Setup Failures',
'likely_causes': [
'SIP port misconfiguration',
'DNS SRV lookup disabled',
'TLS handshake failure',
'Incorrect Contact header IP'
],
'configs_to_check': [
'port=5060 (UDP) or port=5061 (TLS)',
'dns_srv_lookup=enabled',
'tls_version=1.2',
'contact_ip=<public_IP>'
],
'confidence': 0.88
})
# Dropped calls
if any(keyword in symptom_lower for keyword in ['dropped', 'disconnect', 'call drops', 'hangs up']):
recommendations.append({
'issue': 'Calls Dropping / Premature Disconnection',
'likely_causes': [
'Session timer too short',
'Network instability with aggressive timers',
'Keep-alive mechanism not configured'
],
'configs_to_check': [
'session_expires=1800',
'min_se=900',
'session_refresh=uac'
],
'confidence': 0.86
})
# Generate output
if not recommendations:
analysis.append("## πŸ” General Diagnostic Approach\n\n")
analysis.append("I don't recognize specific symptoms in your description. Here's how to diagnose:\n\n")
analysis.append("1. **Paste your SBC configuration** into the validator tab for automated analysis\n")
analysis.append("2. **Check common issues:**\n")
analysis.append(" - NAT traversal settings for audio issues\n")
analysis.append(" - DTMF configuration for IVR problems\n")
analysis.append(" - TLS/port settings for connection failures\n")
analysis.append("3. **Review carrier documentation** for specific requirements\n")
else:
for rec in recommendations:
analysis.append(f"## 🎯 Diagnosis: {rec['issue']}\n\n")
analysis.append(f"**Confidence:** {rec['confidence']:.0%}\n\n")
analysis.append("**Likely Root Causes:**\n")
for cause in rec['likely_causes']:
analysis.append(f"- {cause}\n")
analysis.append("\n**Configuration Parameters to Check:**\n```\n")
for config in rec['configs_to_check']:
analysis.append(f"{config}\n")
analysis.append("```\n\n")
analysis.append("---\n\n")
return "".join(analysis)
# Example configurations
BROKEN_CONFIG = """transport=udp
port=5060
codecs=opus
srtp=optional
nat_traversal=disabled
tls_version=1.0
session_expires=90
dtmf_relay=inband
rtp_port_range=10000-10100
dns_srv_lookup=disabled"""
VALID_CONFIG = """transport=tcp
port=5061
codecs=pcmu,pcma,opus,g729
srtp=required
crypto_suites=AES_CM_128_HMAC_SHA1_80
nat_traversal=enabled
ice_support=enabled
stun_server=stun.l.google.com:19302
tls_version=1.2
tls_min_version=1.2
cipher_suites=ECDHE-RSA-AES256-GCM-SHA384
session_expires=1800
min_se=900
dtmf_relay=rfc2833
rtp_port_range=10000-20000
dns_srv_lookup=enabled
media_anchoring=enabled"""
def load_broken_config():
return BROKEN_CONFIG
def load_valid_config():
return VALID_CONFIG
# Create Gradio interface
demo = gr.Blocks(theme=gr.themes.Soft(), title="Service BYOC - AI-Powered SBC Validator")
with demo:
gr.Markdown("""
# πŸ”’ Bring Your Own Carrier BYOC - AI-Powered Configuration Validator
### Intelligent SBC Analysis & Troubleshooting
**For Customers Using Bring Your Own Carrier (BYOC)**
Validate your SBC configuration and get AI-powered troubleshooting recommendations before connecting to the service.
""")
with gr.Tabs():
with gr.Tab("πŸ€– AI Troubleshoot"):
gr.Markdown("""
### Describe Your Issue in Plain English
Tell me what's wrong with your calls and I'll diagnose the likely SBC configuration issues.
""")
with gr.Row():
with gr.Column(scale=1):
symptom_input = gr.Textbox(
label="What issue are you experiencing?",
lines=5,
placeholder="Examples:\n- 'My calls have one-way audio'\n- 'IVR menu not responding to key presses'\n- 'Calls dropping after 2 minutes'\n- 'TLS handshake failing with carrier'",
)
with gr.Row():
example_btn1 = gr.Button("Example: One-way audio", variant="secondary", size="sm")
example_btn2 = gr.Button("Example: IVR not working", variant="secondary", size="sm")
troubleshoot_btn = gr.Button("πŸ” Diagnose Issue", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("### AI Diagnosis")
troubleshoot_output = gr.Markdown()
# Example button actions
example_btn1.click(fn=lambda: "I can hear the other person but they can't hear me", outputs=symptom_input)
example_btn2.click(fn=lambda: "Customers can't navigate our IVR menu by pressing numbers", outputs=symptom_input)
troubleshoot_btn.click(fn=ai_troubleshoot, inputs=symptom_input, outputs=troubleshoot_output)
with gr.Tab("βš™οΈ Configuration Validator"):
gr.Markdown("""
### Paste Your SBC Configuration
Get detailed analysis of specific configuration parameters.
""")
with gr.Row():
with gr.Column(scale=1):
config_input = gr.Textbox(
label="Paste your configuration here",
lines=20,
placeholder="Example:\ntransport=tcp\nport=5061\ncodecs=pcmu,pcma,opus\nsrtp=required\nnat_traversal=enabled\n...",
)
with gr.Row():
broken_btn = gr.Button("πŸ“› Load Example: Common Mistakes", variant="secondary", size="sm")
valid_btn = gr.Button("βœ… Load Example: Best Practice Config", variant="secondary", size="sm")
analyze_btn = gr.Button("πŸ” Validate Configuration", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("### Validation Results")
output = gr.Markdown()
# Button actions
broken_btn.click(fn=load_broken_config, outputs=config_input)
valid_btn.click(fn=load_valid_config, outputs=config_input)
analyze_btn.click(fn=analyze_configuration, inputs=config_input, outputs=output)
gr.Markdown("""
---
## About This Validator
This tool checks for 10+ common SBC misconfigurations in BYOC deployments:
**Security Issues:**
- Weak or missing SRTP encryption
- Deprecated TLS versions (1.0/1.1)
- Unsecure cipher suites
**Audio Quality Issues:**
- Codec mismatches causing one-way audio
- NAT traversal problems in cloud environments
- Incorrect DTMF signaling for IVR systems
**Connectivity Issues:**
- Non-standard SIP ports
- Session timer misconfigurations
- DNS SRV lookup disabled
- Insufficient RTP port ranges
**Confidence Scores:** Each finding includes an AI-powered confidence score (85-97%) and specific remediation steps.
**Who Should Use This:**
- Customers setting up Bring Your Own Carrier
- Telecom engineers configuring SBCs for cloud connectivity
- Operations teams troubleshooting audio quality issues
- Compliance teams validating security configurations
---
πŸ’‘ **Pro Tip:** Run this validator before submitting support tickets. Most BYOC issues can be resolved by following these recommendations.
Built by Philip Drammeh [LinkedIn](https://linkedin.com/in/philip-drammeh)
""")
if __name__ == "__main__":
demo.launch()