Spaces:
Running
Running
Monitoring & Logging - BDR Agent Factory
Overview
Comprehensive monitoring and logging infrastructure for tracking AI capability performance, detecting issues, and ensuring compliance.
Architecture
+---------------------------------------------------------------+
|                       Monitoring Stack                        |
+---------------------------------------------------------------+
|                                                               |
|  +--------------+   +-----------------+   +--------------+    |
|  |   Metrics    |   |      Logs       |   |    Traces    |    |
|  | (Prometheus) |   | (Elasticsearch) |   |   (Jaeger)   |    |
|  +--------------+   +-----------------+   +--------------+    |
|         |                    |                   |            |
|         +--------------------+-------------------+            |
|                              |                                |
|                    +---------v---------+                      |
|                    |      Grafana      |                      |
|                    |    Dashboards     |                      |
|                    +-------------------+                      |
|                              |                                |
|                    +---------v---------+                      |
|                    |   Alert Manager   |                      |
|                    +-------------------+                      |
+---------------------------------------------------------------+
1. Metrics Collection
Prometheus Configuration
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'bdr-agent-factory'
static_configs:
- targets: ['localhost:8000']
metrics_path: '/metrics'
- job_name: 'capability-services'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
regex: capability-.*
action: keep
Key Metrics
Request Metrics
from prometheus_client import Counter, Histogram, Gauge
# Request counter
request_count = Counter(
'capability_requests_total',
'Total number of capability requests',
['capability_id', 'status', 'decision_type']
)
# Request duration
request_duration = Histogram(
'capability_request_duration_seconds',
'Request duration in seconds',
['capability_id'],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
# Active requests
active_requests = Gauge(
'capability_active_requests',
'Number of active requests',
['capability_id']
)
# Error rate
error_count = Counter(
'capability_errors_total',
'Total number of errors',
['capability_id', 'error_type']
)
Model Performance Metrics
# Model accuracy
model_accuracy = Gauge(
'model_accuracy',
'Model accuracy score',
['capability_id', 'model_version']
)
# Prediction confidence
prediction_confidence = Histogram(
'prediction_confidence',
'Prediction confidence scores',
['capability_id'],
buckets=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1.0]
)
# Model inference time
inference_time = Histogram(
'model_inference_duration_seconds',
'Model inference duration',
['capability_id', 'model_version']
)
Business Metrics
# Claims processed
claims_processed = Counter(
'claims_processed_total',
'Total claims processed',
['system_id', 'decision_type']
)
# Fraud detected
fraud_detected = Counter(
'fraud_cases_detected_total',
'Total fraud cases detected',
['risk_level']
)
# Compliance violations
compliance_violations = Counter(
'compliance_violations_total',
'Total compliance violations',
['framework', 'violation_type']
)
Metrics Instrumentation
from prometheus_client import start_http_server
import time
class CapabilityMetrics:
def __init__(self):
self.request_count = request_count
self.request_duration = request_duration
self.active_requests = active_requests
self.error_count = error_count
def track_request(self, capability_id, func):
"""Decorator to track capability requests"""
def wrapper(*args, **kwargs):
self.active_requests.labels(capability_id=capability_id).inc()
start_time = time.time()
try:
result = func(*args, **kwargs)
status = 'success'
decision_type = result.get('decision_type', 'unknown')
return result
except Exception as e:
status = 'error'
decision_type = 'error'
self.error_count.labels(
capability_id=capability_id,
error_type=type(e).__name__
).inc()
raise
finally:
duration = time.time() - start_time
self.request_duration.labels(
capability_id=capability_id
).observe(duration)
self.request_count.labels(
capability_id=capability_id,
status=status,
decision_type=decision_type
).inc()
self.active_requests.labels(
capability_id=capability_id
).dec()
return wrapper
# Start metrics server
start_http_server(8001)
2. Logging
Structured Logging Configuration
import logging
import json
from datetime import datetime
class StructuredLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON formatter
handler = logging.StreamHandler()
handler.setFormatter(self.JSONFormatter())
self.logger.addHandler(handler)
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
# Add extra fields
if hasattr(record, 'capability_id'):
log_data['capability_id'] = record.capability_id
if hasattr(record, 'request_id'):
log_data['request_id'] = record.request_id
if hasattr(record, 'user_id'):
log_data['user_id'] = record.user_id
if hasattr(record, 'system_id'):
log_data['system_id'] = record.system_id
return json.dumps(log_data)
def info(self, message, **kwargs):
self.logger.info(message, extra=kwargs)
def warning(self, message, **kwargs):
self.logger.warning(message, extra=kwargs)
def error(self, message, **kwargs):
self.logger.error(message, extra=kwargs)
def debug(self, message, **kwargs):
self.logger.debug(message, extra=kwargs)
# Usage
logger = StructuredLogger('bdr_agent_factory')
logger.info(
'Capability invoked',
capability_id='cap_text_classification',
request_id='req_12345',
user_id='user_789'
)
Log Levels
- DEBUG: Detailed diagnostic information
- INFO: General informational messages
- WARNING: Warning messages for potentially harmful situations
- ERROR: Error events that might still allow the application to continue
- CRITICAL: Critical events that may cause the application to abort
Log Categories
Application Logs
logger.info('Application started', version='1.0.0')
logger.info('Capability registered', capability_id='cap_text_classification')
logger.warning('High memory usage detected', memory_usage_mb=8192)
Request Logs
logger.info(
'Request received',
request_id='req_12345',
capability_id='cap_text_classification',
user_id='user_789',
ip_address='192.168.1.1'
)
logger.info(
'Request completed',
request_id='req_12345',
duration_ms=142,
status='success'
)
Error Logs
logger.error(
'Capability invocation failed',
request_id='req_12345',
capability_id='cap_text_classification',
error_type='ValidationError',
error_message='Input text exceeds maximum length',
stack_trace=traceback.format_exc()
)
Audit Logs
logger.info(
'Audit trail created',
audit_id='audit_67890',
request_id='req_12345',
capability_id='cap_text_classification',
user_id='user_789',
decision_type='approve',
compliance_flags={'gdpr': True, 'ifrs17': True}
)
Security Logs
logger.warning(
'Authentication failed',
user_id='user_789',
ip_address='192.168.1.1',
reason='Invalid token'
)
logger.warning(
'Rate limit exceeded',
user_id='user_789',
ip_address='192.168.1.1',
requests_per_minute=150
)
Elasticsearch Configuration
# logstash.conf
input {
file {
path => "/var/log/bdr-agent-factory/*.log"
codec => json
}
}
filter {
# Parse timestamp
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
# Add tags for different log types
if [capability_id] {
mutate {
add_tag => ["capability_log"]
}
}
if [audit_id] {
mutate {
add_tag => ["audit_log"]
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "bdr-agent-factory-%{+YYYY.MM.dd}"
}
}
3. Distributed Tracing
Jaeger Configuration
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Configure tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Configure Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name='localhost',
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# Usage
with tracer.start_as_current_span('capability_invocation') as span:
span.set_attribute('capability_id', 'cap_text_classification')
span.set_attribute('request_id', 'req_12345')
# Perform capability invocation
result = invoke_capability()
span.set_attribute('decision_type', result.decision_type)
span.set_attribute('confidence', result.confidence)
4. Dashboards
Grafana Dashboard Configuration
System Overview Dashboard
{
"dashboard": {
"title": "BDR Agent Factory - System Overview",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "rate(capability_requests_total[5m])"
}
]
},
{
"title": "Error Rate",
"targets": [
{
"expr": "rate(capability_errors_total[5m])"
}
]
},
{
"title": "P95 Latency",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(capability_request_duration_seconds_bucket[5m]))"
}
]
},
{
"title": "Active Requests",
"targets": [
{
"expr": "sum(capability_active_requests)"
}
]
}
]
}
}
Capability Performance Dashboard
- Request Volume by Capability: Bar chart showing requests per capability
- Latency Distribution: Heatmap of latency percentiles
- Error Rate by Capability: Line chart of error rates
- Model Accuracy: Gauge showing current accuracy
- Prediction Confidence: Histogram of confidence scores
Compliance Dashboard
- Compliance Rate: Gauge showing overall compliance percentage
- Violations by Framework: Bar chart of violations per framework
- Audit Trail Coverage: Percentage of requests with audit trails
- Data Retention Status: Status of data retention policies
5. Alerting
Alert Rules
# prometheus-alerts.yml
groups:
- name: capability_alerts
interval: 30s
rules:
# High error rate
- alert: HighErrorRate
expr: |
rate(capability_errors_total[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} errors/sec for {{ $labels.capability_id }}"
# High latency
- alert: HighLatency
expr: |
histogram_quantile(0.95, rate(capability_request_duration_seconds_bucket[5m])) > 1.0
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "P95 latency is {{ $value }}s for {{ $labels.capability_id }}"
# Low model accuracy
- alert: LowModelAccuracy
expr: |
model_accuracy < 0.85
for: 10m
labels:
severity: critical
annotations:
summary: "Model accuracy below threshold"
description: "Model accuracy is {{ $value }} for {{ $labels.capability_id }}"
# Compliance violation
- alert: ComplianceViolation
expr: |
increase(compliance_violations_total[1h]) > 0
labels:
severity: critical
annotations:
summary: "Compliance violation detected"
description: "{{ $value }} violations detected for {{ $labels.framework }}"
# Service down
- alert: ServiceDown
expr: |
up{job="bdr-agent-factory"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service is down"
description: "BDR Agent Factory service is not responding"
Alert Channels
# alertmanager.yml
route:
group_by: ['alertname', 'capability_id']
group_wait: 10s
group_interval: 10s
repeat_interval: 12h
receiver: 'default'
routes:
- match:
severity: critical
receiver: 'pagerduty'
- match:
severity: warning
receiver: 'slack'
receivers:
- name: 'default'
email_configs:
- to: 'ops@bdragentfactory.com'
- name: 'slack'
slack_configs:
- api_url: 'https://hooks.slack.com/services/XXX'
channel: '#alerts'
title: 'BDR Agent Factory Alert'
- name: 'pagerduty'
pagerduty_configs:
- service_key: 'XXX'
6. Log Retention
Retention Policies
| Log Type | Retention Period | Storage |
|---|---|---|
| Application Logs | 30 days | Elasticsearch |
| Request Logs | 90 days | Elasticsearch |
| Audit Logs | 7 years | S3 + Elasticsearch |
| Error Logs | 1 year | Elasticsearch |
| Security Logs | 2 years | S3 + Elasticsearch |
| Metrics | 1 year | Prometheus |
Elasticsearch Index Lifecycle Management
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "1d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
7. Performance Monitoring
SLA Monitoring
class SLAMonitor:
def __init__(self):
self.sla_targets = {
'availability': 0.999, # 99.9% uptime
'p95_latency_ms': 300, # 300ms P95 latency
'error_rate': 0.001, # 0.1% error rate
}
def check_sla_compliance(self, time_window='24h'):
"""Check if SLAs are being met"""
metrics = self.get_metrics(time_window)
compliance = {
'availability': metrics['uptime'] >= self.sla_targets['availability'],
'latency': metrics['p95_latency_ms'] <= self.sla_targets['p95_latency_ms'],
'error_rate': metrics['error_rate'] <= self.sla_targets['error_rate']
}
return all(compliance.values()), compliance
8. Troubleshooting
Common Issues
High Latency
# Check P95 latency by capability
curl -G 'http://localhost:9090/api/v1/query' \
--data-urlencode 'query=histogram_quantile(0.95, rate(capability_request_duration_seconds_bucket[5m]))'
# Check slow queries in logs
curl -X GET "localhost:9200/bdr-agent-factory-*/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"range": {
"duration_ms": { "gte": 1000 }
}
},
"sort": [{ "duration_ms": "desc" }]
}'
High Error Rate
# Check error distribution
curl -G 'http://localhost:9090/api/v1/query' \
--data-urlencode 'query=sum by (error_type) (rate(capability_errors_total[5m]))'
# View recent errors
curl -X GET "localhost:9200/bdr-agent-factory-*/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"match": { "level": "ERROR" }
},
"sort": [{ "timestamp": "desc" }],
"size": 100
}'
9. Best Practices
- Use structured logging - Always log in JSON format
- Include context - Add request_id, user_id, capability_id to all logs
- Monitor SLAs - Track availability, latency, and error rates
- Set up alerts - Configure alerts for critical metrics
- Retain audit logs - Keep audit logs for compliance requirements
- Use distributed tracing - Track requests across services
- Dashboard everything - Create dashboards for all key metrics
- Regular reviews - Review logs and metrics regularly
- Optimize queries - Use efficient Elasticsearch queries
- Archive old data - Move old logs to cold storage
Support
For monitoring support:
- Documentation: https://docs.bdragentfactory.com/monitoring
- Email: ops@bdragentfactory.com