Spaces:
Running
Running
Bader Alabddan
Add core implementation: API, registry, audit, Docker config, and quickstart guide
8036886
| #!/usr/bin/env python3 | |
| """Script to create all core implementation files for BDR Agent Factory.""" | |
| import os | |
| from pathlib import Path | |
# Core configuration file.
# Source text written verbatim to src/bdr_agent_factory/core/config.py.
# The template targets pydantic v2 / pydantic-settings: configuration lives in
# ``model_config`` and every field is read from the environment variable of
# the same name (case-insensitive), so no per-field ``env=`` is needed.
CONFIG_PY = '''"""Configuration management for BDR Agent Factory."""
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings.

    Each field may be overridden by an environment variable (or a ``.env``
    entry) named after the field; matching is case-insensitive, so
    ``API_PORT`` sets ``api_port``.
    """

    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # Application
    app_name: str = "BDR Agent Factory"
    app_version: str = "0.1.0"
    environment: str = "development"
    debug: bool = False

    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    api_prefix: str = "/api/v1"

    # Database
    database_url: str = (
        "postgresql://bdr_user:bdr_pass@localhost:5432/bdr_agent_factory"
    )

    # Redis
    redis_url: str = "redis://localhost:6379/0"
    redis_ttl: int = 3600

    # Security
    # The default key is a placeholder; set SECRET_KEY outside development.
    secret_key: str = "change-in-production"
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # Governance
    require_explainability: bool = True
    require_audit_trail: bool = True
    audit_retention_days: int = 2555  # 7 years


settings = Settings()
'''
# Capability Registry.
# Source text written verbatim to src/bdr_agent_factory/core/registry.py.
REGISTRY_PY = '''"""Capability Registry for managing AI capabilities."""
from typing import Dict, List, Optional

import yaml


class CapabilityRegistry:
    """Central registry for AI capabilities.

    Capabilities are read once, at construction time, from a YAML file
    whose top-level ``capabilities`` key holds a list of mappings; each
    mapping is indexed by its ``id``.
    """

    def __init__(self, capability_dict_path: str = "AI_CAPABILITY_DICTIONARY.yaml"):
        self.capability_dict_path = capability_dict_path
        self.capabilities: Dict[str, dict] = {}
        self.load_capabilities()

    def load_capabilities(self) -> None:
        """Load capabilities from YAML file.

        A missing file only prints a warning and leaves the registry
        empty. An empty document, or one without a ``capabilities`` list,
        also leaves the registry unchanged instead of raising.
        """
        try:
            with open(self.capability_dict_path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except FileNotFoundError:
            print(f"Warning: {self.capability_dict_path} not found")
            return

        # safe_load() yields None for an empty file and may yield a
        # non-mapping for unexpected content; neither has capabilities.
        if not isinstance(data, dict):
            return

        # ``capabilities:`` with no value parses as None, hence the ``or``.
        for cap in data.get("capabilities") or []:
            self.capabilities[cap["id"]] = cap

    def get_capability(self, capability_id: str) -> Optional[dict]:
        """Get capability by ID, or None when the ID is unknown."""
        return self.capabilities.get(capability_id)

    def list_capabilities(self, category: Optional[str] = None) -> List[dict]:
        """List all capabilities, optionally filtered by category."""
        if category:
            return [
                c for c in self.capabilities.values()
                if c.get("category") == category
            ]
        return list(self.capabilities.values())

    def search_capabilities(self, query: str) -> List[dict]:
        """Search capabilities by name or description (case-insensitive)."""
        query_lower = query.lower()
        # ``or ""`` covers entries whose name/description is an explicit
        # null in the YAML, which .get()'s default would not replace.
        return [
            c for c in self.capabilities.values()
            if query_lower in (c.get("name") or "").lower()
            or query_lower in (c.get("description") or "").lower()
        ]
'''
# Audit Logger.
# Source text written verbatim to src/bdr_agent_factory/core/audit.py.
AUDIT_PY = '''"""Audit logging for compliance and governance."""
import hashlib
import json
import secrets
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class AuditLogger:
    """Audit trail logger for AI operations.

    Records are kept in ``audit_records`` in memory; payloads are never
    stored, only their SHA-256 digests.
    """

    def __init__(self, storage_backend: str = "database"):
        self.storage_backend = storage_backend
        self.audit_records: List[Dict] = []

    def log_request(
        self,
        capability_id: str,
        user_id: str,
        input_data: Any,
        metadata: Optional[Dict] = None
    ) -> str:
        """Log an AI capability request.

        Returns the audit ID, which the caller passes to ``log_response``
        to tie the response record to this request.
        """
        audit_id = self._generate_audit_id()
        record = {
            "audit_id": audit_id,
            "timestamp": self._utc_timestamp(),
            "capability_id": capability_id,
            "user_id": user_id,
            "input_hash": self._hash_data(input_data),
            "metadata": metadata or {},
            "event_type": "request"
        }
        self._store_record(record)
        return audit_id

    def log_response(
        self,
        audit_id: str,
        output_data: Any,
        explainability: Optional[Dict] = None,
        decision_type: Optional[str] = None
    ) -> None:
        """Log an AI capability response for a previously logged request."""
        record = {
            "audit_id": audit_id,
            "timestamp": self._utc_timestamp(),
            "output_hash": self._hash_data(output_data),
            "explainability": explainability,
            "decision_type": decision_type,
            "event_type": "response"
        }
        self._store_record(record)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as ISO-8601 with an explicit offset."""
        return datetime.now(timezone.utc).isoformat()

    def _generate_audit_id(self) -> str:
        """Generate unique audit ID (16 hex characters).

        The ID is random rather than derived from the clock, so requests
        logged within the same clock tick cannot share an ID.
        """
        return secrets.token_hex(8)

    def _hash_data(self, data: Any) -> str:
        """Hash data for audit trail.

        Keys are sorted so equal mappings hash equally; values JSON cannot
        encode natively are stringified via ``default=str``.
        """
        data_str = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(data_str.encode()).hexdigest()

    def _store_record(self, record: Dict) -> None:
        """Store audit record."""
        self.audit_records.append(record)
        # In production, this would write to database
'''
# Main API application.
# Source text written verbatim to src/bdr_agent_factory/main.py.
MAIN_PY = '''"""Main FastAPI application."""
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .core.config import settings
from .core.registry import CapabilityRegistry
from .api import capabilities, health

# Initialize registry
registry = CapabilityRegistry()


@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Run startup tasks before serving and shutdown tasks afterwards."""
    print(f"Starting {settings.app_name} v{settings.app_version}")
    print(f"Loaded {len(registry.capabilities)} capabilities")
    yield
    print("Shutting down BDR Agent Factory")


app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="Enterprise AI Governance Framework for Insurance",
    lifespan=lifespan,
)

# CORS
# Wildcard origins must not be combined with credentialed requests: that
# would let any website call the API with the user's cookies. To enable
# credentials, replace "*" with an explicit list of trusted origins first.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(health.router, prefix=settings.api_prefix, tags=["health"])
app.include_router(capabilities.router, prefix=settings.api_prefix, tags=["capabilities"])
'''
# Health check endpoints.
# Source text written verbatim to src/bdr_agent_factory/api/health.py.
HEALTH_PY = '''"""Health check endpoints."""
from datetime import datetime, timezone

from fastapi import APIRouter

router = APIRouter()


@router.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        # Timezone-aware UTC, so the ISO string carries its offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "service": "BDR Agent Factory"
    }


@router.get("/ready")
async def readiness_check():
    """Readiness check endpoint."""
    return {
        "status": "ready",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
'''
# Capabilities API endpoints.
# Source text written verbatim to src/bdr_agent_factory/api/capabilities.py.
CAPABILITIES_API_PY = '''"""Capabilities API endpoints."""
from typing import Optional

from fastapi import APIRouter, HTTPException

from ..core.registry import CapabilityRegistry
from ..core.audit import AuditLogger

router = APIRouter()
registry = CapabilityRegistry()
audit_logger = AuditLogger()


@router.get("/capabilities")
async def list_capabilities(category: Optional[str] = None):
    """List all capabilities, optionally filtered by category."""
    capabilities = registry.list_capabilities(category=category)
    return {
        "count": len(capabilities),
        "capabilities": capabilities
    }


@router.get("/capabilities/{capability_id}")
async def get_capability(capability_id: str):
    """Get capability by ID; responds 404 when the ID is not registered."""
    capability = registry.get_capability(capability_id)
    # The registry signals "unknown ID" with None; test for exactly that
    # so a registered-but-falsy entry is not reported as missing.
    if capability is None:
        raise HTTPException(status_code=404, detail="Capability not found")
    return capability


@router.get("/capabilities/search/{query}")
async def search_capabilities(query: str):
    """Search capabilities by name or description."""
    results = registry.search_capabilities(query)
    return {
        "query": query,
        "count": len(results),
        "results": results
    }
'''
# Create all files.
# Maps each destination path (relative to the current working directory)
# to the source text that is written there.
files_to_create = {
    "src/bdr_agent_factory/core/config.py": CONFIG_PY,
    "src/bdr_agent_factory/core/registry.py": REGISTRY_PY,
    "src/bdr_agent_factory/core/audit.py": AUDIT_PY,
    "src/bdr_agent_factory/main.py": MAIN_PY,
    "src/bdr_agent_factory/api/health.py": HEALTH_PY,
    "src/bdr_agent_factory/api/capabilities.py": CAPABILITIES_API_PY,
}

print("Creating implementation files...")
for filepath, content in files_to_create.items():
    target = Path(filepath)
    # Create missing package directories; existing ones are left alone.
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write with an explicit encoding so the generated sources do not depend
    # on the platform's locale default. Existing files are overwritten.
    target.write_text(content, encoding="utf-8")
    print(f"✓ Created {filepath}")

print("\n✅ All core implementation files created successfully!")