BDR-Agent-Factory / create_implementation.py
Bader Alabddan
Add core implementation: API, registry, audit, Docker config, and quickstart guide
8036886
#!/usr/bin/env python3
"""Script to create all core implementation files for BDR Agent Factory."""
import os
from pathlib import Path
# Core configuration file
CONFIG_PY = '''"""Configuration management for BDR Agent Factory."""
from typing import Optional
from pydantic_settings import BaseSettings
from pydantic import Field
class Settings(BaseSettings):
"""Application settings."""
# Application
app_name: str = "BDR Agent Factory"
app_version: str = "0.1.0"
environment: str = Field(default="development", env="ENVIRONMENT")
debug: bool = Field(default=False, env="DEBUG")
# API
api_host: str = Field(default="0.0.0.0", env="API_HOST")
api_port: int = Field(default=8000, env="API_PORT")
api_prefix: str = "/api/v1"
# Database
database_url: str = Field(
default="postgresql://bdr_user:bdr_pass@localhost:5432/bdr_agent_factory",
env="DATABASE_URL"
)
# Redis
redis_url: str = Field(default="redis://localhost:6379/0", env="REDIS_URL")
redis_ttl: int = Field(default=3600, env="REDIS_TTL")
# Security
secret_key: str = Field(default="change-in-production", env="SECRET_KEY")
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# Governance
require_explainability: bool = True
require_audit_trail: bool = True
audit_retention_days: int = 2555 # 7 years
class Config:
env_file = ".env"
case_sensitive = False
settings = Settings()
'''
# Capability Registry
REGISTRY_PY = '''"""Capability Registry for managing AI capabilities."""
from typing import Dict, List, Optional
from datetime import datetime
import yaml
class CapabilityRegistry:
"""Central registry for AI capabilities."""
def __init__(self, capability_dict_path: str = "AI_CAPABILITY_DICTIONARY.yaml"):
self.capability_dict_path = capability_dict_path
self.capabilities: Dict[str, dict] = {}
self.load_capabilities()
def load_capabilities(self):
"""Load capabilities from YAML file."""
try:
with open(self.capability_dict_path, 'r') as f:
data = yaml.safe_load(f)
if 'capabilities' in data:
for cap in data['capabilities']:
self.capabilities[cap['id']] = cap
except FileNotFoundError:
print(f"Warning: {self.capability_dict_path} not found")
def get_capability(self, capability_id: str) -> Optional[dict]:
"""Get capability by ID."""
return self.capabilities.get(capability_id)
def list_capabilities(self, category: Optional[str] = None) -> List[dict]:
"""List all capabilities, optionally filtered by category."""
if category:
return [c for c in self.capabilities.values() if c.get('category') == category]
return list(self.capabilities.values())
def search_capabilities(self, query: str) -> List[dict]:
"""Search capabilities by name or description."""
query_lower = query.lower()
return [
c for c in self.capabilities.values()
if query_lower in c.get('name', '').lower()
or query_lower in c.get('description', '').lower()
]
'''
# Audit Logger
AUDIT_PY = '''"""Audit logging for compliance and governance."""
from typing import Any, Dict, Optional
from datetime import datetime
import json
import hashlib
class AuditLogger:
"""Audit trail logger for AI operations."""
def __init__(self, storage_backend: str = "database"):
self.storage_backend = storage_backend
self.audit_records = []
def log_request(
self,
capability_id: str,
user_id: str,
input_data: Any,
metadata: Optional[Dict] = None
) -> str:
"""Log an AI capability request."""
audit_id = self._generate_audit_id()
record = {
"audit_id": audit_id,
"timestamp": datetime.utcnow().isoformat(),
"capability_id": capability_id,
"user_id": user_id,
"input_hash": self._hash_data(input_data),
"metadata": metadata or {},
"event_type": "request"
}
self._store_record(record)
return audit_id
def log_response(
self,
audit_id: str,
output_data: Any,
explainability: Optional[Dict] = None,
decision_type: Optional[str] = None
):
"""Log an AI capability response."""
record = {
"audit_id": audit_id,
"timestamp": datetime.utcnow().isoformat(),
"output_hash": self._hash_data(output_data),
"explainability": explainability,
"decision_type": decision_type,
"event_type": "response"
}
self._store_record(record)
def _generate_audit_id(self) -> str:
"""Generate unique audit ID."""
timestamp = datetime.utcnow().isoformat()
return hashlib.sha256(timestamp.encode()).hexdigest()[:16]
def _hash_data(self, data: Any) -> str:
"""Hash data for audit trail."""
data_str = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(data_str.encode()).hexdigest()
def _store_record(self, record: Dict):
"""Store audit record."""
self.audit_records.append(record)
# In production, this would write to database
'''
# Main API application
MAIN_PY = '''"""Main FastAPI application."""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .core.config import settings
from .core.registry import CapabilityRegistry
from .api import capabilities, health
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
description="Enterprise AI Governance Framework for Insurance"
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize registry
registry = CapabilityRegistry()
# Include routers
app.include_router(health.router, prefix=settings.api_prefix, tags=["health"])
app.include_router(capabilities.router, prefix=settings.api_prefix, tags=["capabilities"])
@app.on_event("startup")
async def startup_event():
"""Startup tasks."""
print(f"Starting {settings.app_name} v{settings.app_version}")
print(f"Loaded {len(registry.capabilities)} capabilities")
@app.on_event("shutdown")
async def shutdown_event():
"""Shutdown tasks."""
print("Shutting down BDR Agent Factory")
'''
# Health check endpoints
HEALTH_PY = '''"""Health check endpoints."""
from fastapi import APIRouter
from datetime import datetime
router = APIRouter()
@router.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"service": "BDR Agent Factory"
}
@router.get("/ready")
async def readiness_check():
"""Readiness check endpoint."""
return {
"status": "ready",
"timestamp": datetime.utcnow().isoformat()
}
'''
# Capabilities API endpoints
CAPABILITIES_API_PY = '''"""Capabilities API endpoints."""
from fastapi import APIRouter, HTTPException
from typing import Optional, List
from ..core.registry import CapabilityRegistry
from ..core.audit import AuditLogger
router = APIRouter()
registry = CapabilityRegistry()
audit_logger = AuditLogger()
@router.get("/capabilities")
async def list_capabilities(category: Optional[str] = None):
"""List all capabilities."""
capabilities = registry.list_capabilities(category=category)
return {
"count": len(capabilities),
"capabilities": capabilities
}
@router.get("/capabilities/{capability_id}")
async def get_capability(capability_id: str):
"""Get capability by ID."""
capability = registry.get_capability(capability_id)
if not capability:
raise HTTPException(status_code=404, detail="Capability not found")
return capability
@router.get("/capabilities/search/{query}")
async def search_capabilities(query: str):
"""Search capabilities."""
results = registry.search_capabilities(query)
return {
"query": query,
"count": len(results),
"results": results
}
'''
# Create all files
files_to_create = {
"src/bdr_agent_factory/core/config.py": CONFIG_PY,
"src/bdr_agent_factory/core/registry.py": REGISTRY_PY,
"src/bdr_agent_factory/core/audit.py": AUDIT_PY,
"src/bdr_agent_factory/main.py": MAIN_PY,
"src/bdr_agent_factory/api/health.py": HEALTH_PY,
"src/bdr_agent_factory/api/capabilities.py": CAPABILITIES_API_PY,
}
print("Creating implementation files...")
for filepath, content in files_to_create.items():
Path(filepath).parent.mkdir(parents=True, exist_ok=True)
with open(filepath, 'w') as f:
f.write(content)
print(f"✓ Created {filepath}")
print("\n✅ All core implementation files created successfully!")