"""
Main FastAPI application for Anthropic Topic Segmentation Microservice.

This microservice processes interview transcripts and extracts actionable business insights
using Anthropic's Claude models for topic segmentation and summarization.
"""

import time
import logging
import uuid
import json
from contextlib import asynccontextmanager
from typing import Dict, Any
from datetime import datetime

from fastapi import FastAPI, HTTPException, status, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import ValidationError

from config.settings import get_settings, get_api_config
from config.logging import setup_logging, get_logger
from core.model_manager import get_model_manager, close_model_manager
from models.input import TranscriptRequest, HealthCheckRequest, ModelSwitchRequest, PromptConfiguration
from models.output import SegmentationResult, ErrorDetail, HealthCheckResponse, ModelStatusResponse

# Setup logging early so that everything below (including import-time work) is logged.
setup_logging()
logger = get_logger(__name__)

# Global, process-local application state shared by the endpoints below.
app_state = {
    "startup_time": time.time(),           # epoch seconds at module import
    "anthropic_client_initialized": False,  # set by lifespan() during startup
    "request_count": 0,                     # incremented by /segment
    "last_health_check": None,              # epoch seconds of the last /health call
    "initialization_error": None,           # str(exception) if startup failed
}


class UnicodeJSONResponse(JSONResponse):
    """JSON response that encodes content with ``jsonable_encoder`` first.

    Non-ASCII characters are emitted as-is (``ensure_ascii=False``) and the
    payload is serialized compactly. NaN/Infinity are rejected
    (``allow_nan=False``), so ``json.dumps`` raises ``ValueError`` for them.
    """

    def render(self, content: Any) -> bytes:
        """Serialize *content* to compact UTF-8 encoded JSON bytes."""
        return json.dumps(
            jsonable_encoder(content),
            ensure_ascii=False,
            allow_nan=False,
            indent=None,
            separators=(",", ":"),
        ).encode("utf-8")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager for startup and shutdown events.

    Startup is best-effort: any failure is logged and recorded in
    ``app_state`` instead of aborting, so the service still starts and
    ``/health`` can report the problem. Shutdown always closes the model
    manager.
    """
    # Startup
    logger.info("Starting Anthropic Topic Segmentation Microservice")

    try:
        settings = get_settings()
        logger.info(f"Application: {settings.app_name} v{settings.app_version}")
        logger.info(f"Environment: {'HuggingFace Spaces' if settings.is_huggingface_spaces else 'Development'}")
        logger.info(f"Anthropic Model: {settings.anthropic_model.value}")

        # Initialize model manager with graceful degradation
        model_manager = get_model_manager()

        # Try to initialize with minimal validation
        try:
            # Only validate the primary model, not all models
            primary_client = model_manager.get_client(settings.anthropic_model)
            if primary_client:
                app_state["anthropic_client_initialized"] = True
                logger.info(f"Anthropic integration initialized with primary model: {settings.anthropic_model.value}")
            else:
                logger.warning("Primary model not available, but continuing startup")
                app_state["anthropic_client_initialized"] = False
        except Exception as model_error:
            # Deliberate best-effort: a model problem must not prevent startup.
            logger.warning(f"Model validation failed during startup: {model_error}")
            logger.info("Continuing startup without full model validation")
            app_state["anthropic_client_initialized"] = False

        logger.info("Application startup completed successfully")

    except Exception as e:
        logger.error(f"Failed to initialize application: {e}")
        app_state["initialization_error"] = str(e)
        app_state["anthropic_client_initialized"] = False

    try:
        yield
    finally:
        # Shutdown. The ``finally`` guarantees cleanup even when an exception
        # or cancellation is thrown into the generator at the ``yield``.
        logger.info("Shutting down Anthropic Topic Segmentation Microservice")
        await close_model_manager()


# Initialize FastAPI application
settings = get_settings()
api_config = get_api_config()

app = FastAPI(
    title="🎯 Anthropic Topic Segmentation Microservice",
    description="Extract actionable business insights from interview transcripts using Anthropic's Claude models. Perfect for product managers, business analysts, and workflow automation.",
    version=settings.app_version,
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,
    default_response_class=UnicodeJSONResponse,
    openapi_tags=[
        {"name": "Health", "description": "Service health and status monitoring"},
        {"name": "Configuration", "description": "Application and model configuration"},
        {"name": "Segmentation", "description": "Topic extraction and analysis endpoints"},
    ]
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=api_config["cors_origins"],
    allow_credentials=not settings.is_huggingface_spaces,  # Disable credentials for HF Spaces
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)


@app.get("/", tags=["Health"])
async def root():
    """Root endpoint with service information."""
    uptime = time.time() - app_state["startup_time"]

    return {
        "service": "🎯 Anthropic Topic Segmentation Microservice",
        "version": settings.app_version,
        "status": "running",
        "uptime_seconds": round(uptime, 2),
        "anthropic_ready": app_state["anthropic_client_initialized"],
        "requests_processed": app_state["request_count"],
        "documentation": {
            "interactive_docs": "/docs",
            "api_reference": "/redoc",
            "health_check": "/health"
        },
        "features": {
            "anthropic_integration": True,
            "dynamic_prompts": True,
            "multi_model_support": True,
            "n8n_compatible": True,
            "business_intelligence": True
        }
    }


@app.get("/health", tags=["Health"])
async def health_check():
    """
    Comprehensive health check endpoint.

    Returns detailed service status including:
    - Application health
    - Anthropic client status
    - Performance metrics
    - Configuration validation
    """
    current_time = time.time()
    uptime = current_time - app_state["startup_time"]
    app_state["last_health_check"] = current_time

    # Healthy only if the primary client came up AND startup recorded no error.
    is_healthy = (
        app_state["anthropic_client_initialized"] and
        app_state["initialization_error"] is None
    )

    health_data = {
        "status": "healthy" if is_healthy else "unhealthy",
        "timestamp": current_time,
        "uptime_seconds": round(uptime, 2),
        "service_info": {
            "name": settings.app_name,
            "version": settings.app_version,
            "environment": "spaces" if settings.is_huggingface_spaces else "development"
        },
        "anthropic_status": {
            "client_initialized": app_state["anthropic_client_initialized"],
            "model": settings.anthropic_model.value,
            "initialization_error": app_state["initialization_error"]
        },
        "performance": {
            "requests_processed": app_state["request_count"],
            # Guard against division by zero right after startup.
            "avg_requests_per_minute": round((app_state["request_count"] / uptime) * 60, 2) if uptime > 0 else 0,
            "max_sentences_limit": settings.max_sentences,
            "request_timeout": settings.request_timeout
        },
        "configuration": {
            "debug_mode": settings.debug,
            "log_level": settings.log_level.value,
            "cors_enabled": len(api_config["cors_origins"]) > 0
        }
    }

    # 200 when healthy, 503 otherwise, so probes can rely on the status code alone.
    status_code = status.HTTP_200_OK if is_healthy else status.HTTP_503_SERVICE_UNAVAILABLE

    return UnicodeJSONResponse(content=health_data, status_code=status_code)


@app.get("/config", tags=["Configuration"])
async def get_configuration():
    """
    Get current application configuration.

    Returns non-sensitive configuration information including:
    - Model settings
    - API limits
    - Feature flags
    """
    return {
        "application": {
            "name": settings.app_name,
            "version": settings.app_version,
            "debug": settings.debug,
            "log_level": settings.log_level.value
        },
        "anthropic": {
            "model": settings.anthropic_model.value,
            # Only whether a key is configured is exposed, never the key itself.
            "api_key_configured": bool(settings.anthropic_api_key),
            "max_retries": settings.max_retries,
            "retry_delay": settings.retry_delay
        },
        "api_limits": {
            "max_sentences": settings.max_sentences,
            "request_timeout": settings.request_timeout,
            "rate_limit_requests": settings.rate_limit_requests,
            "rate_limit_window": settings.rate_limit_window
        },
        "environment": {
            "is_huggingface_spaces": settings.is_huggingface_spaces,
            "is_development": settings.is_development,
            "cors_origins": api_config["cors_origins"]
        }
    }


@app.get("/models", tags=["Configuration"])
async def get_model_status():
    """
    Get detailed model status and performance metrics.

    Returns:
    - Current active model
    - Health status for all models
    - Performance statistics
    - Model switching capabilities
    """
    try:
        model_manager = get_model_manager()

        # Get health status for all models
        health_status = await model_manager.health_check()

        # Get performance statistics
        performance_stats = model_manager.get_performance_stats()

        # Get best performing model
        best_model = model_manager.get_best_performing_model()

        return {
            "current_model": model_manager.current_model.value,
            "best_performing_model": best_model.value,
            "model_health": health_status,
            "performance_stats": performance_stats,
            # Iterate the enum class of the configured model to list every member.
            "available_models": [model.value for model in type(settings.anthropic_model)],
            "fallback_enabled": True,
            "last_updated": time.time()
        }

    except Exception as e:
        logger.error(f"Error getting model status: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get model status: {e}"
        ) from e


@app.post("/segment", tags=["Segmentation"], response_model=SegmentationResult)
async def segment_transcript(request: TranscriptRequest):
    """
    Extract topics from transcript using Anthropic models.

    This endpoint processes interview transcripts and extracts actionable business insights
    using advanced topic segmentation and categorization.

    **Features:**
    - Supports up to 1500 sentences
    - Dynamic prompt injection
    - Multi-model support with fallback
    - Business-focused categorization
    - Speaker analysis and insights
    - Confidence scoring

    **Returns:**
    - Extracted topics with business categorization
    - Speaker insights and analysis
    - Processing metadata and statistics
    - Executive summary and key takeaways
    """
    request_id = str(uuid.uuid4())

    try:
        # Update request counter
        app_state["request_count"] += 1

        sentences = request.sentences
        # Computed once; used for both the log line and the extraction context.
        unique_speakers = list(set(s.speaker for s in sentences))

        logger.info(f"Processing transcript segmentation request {request_id}")
        logger.info(f"Transcript: {len(sentences)} sentences, "
                    f"{len(unique_speakers)} speakers")

        # An empty transcript cannot be segmented, and the duration computation
        # below indexes the first and last sentence. Reject it as a client error
        # instead of letting an IndexError surface as an opaque 500.
        # NOTE(review): TranscriptRequest may already enforce a minimum length;
        # if so this guard is simply never hit.
        if not sentences:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Transcript must contain at least one sentence"
            )

        # Validate transcript size
        if len(sentences) > settings.max_sentences:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail=f"Transcript too large. Maximum {settings.max_sentences} sentences allowed, "
                       f"got {len(sentences)}"
            )

        # Get model manager
        model_manager = get_model_manager()

        # Initialize topic extractor (imported here, as in the original, rather
        # than at module import time).
        from core.topic_extractor import TopicExtractor, ExtractionContext
        from models.input import LanguageCode

        topic_extractor = TopicExtractor(model_manager)

        # Create extraction context; fall back to defaults when the request
        # carries no prompt configuration.
        prompt_config = request.prompt_config
        context = ExtractionContext(
            request_id=request_id,
            transcript_id=request.transcript_id,
            language=prompt_config.language if prompt_config else LanguageCode.AUTO_DETECT,
            business_domain=prompt_config.business_domain if prompt_config else None,
            total_sentences=len(sentences),
            # Span from the start of the first sentence to the end of the last.
            total_duration=sentences[-1].end_time - sentences[0].start_time,
            unique_speakers=unique_speakers,
            prompt_config=prompt_config or PromptConfiguration()
        )

        # Extract topics using Anthropic models
        result = await topic_extractor.extract_topics(request, context)

        logger.info(f"Completed transcript segmentation in {result.metadata.processing_time:.2f}s")
        logger.info(f"Extracted {len(result.topics)} topics with average confidence {result.metadata.average_confidence:.2f}")

        return result

    except HTTPException:
        # Already a well-formed HTTP error (e.g. 413/422 above); pass it through.
        raise
    except ValidationError as e:
        logger.error(f"Validation error in transcript request: {e}")
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Validation error: {e}"
        ) from e
    except Exception as e:
        # Full traceback goes to the log; the client only gets a generic message.
        logger.error(f"Error processing transcript: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during transcript processing"
        ) from e


@app.post("/models/switch", tags=["Configuration"])
async def switch_model(request: ModelSwitchRequest):
    """
    Switch the active Anthropic model.

    Allows dynamic switching between available Anthropic models
    for performance optimization or testing purposes.
    """
    try:
        model_manager = get_model_manager()
        # Capture the previous model before switching so it can be reported.
        old_model = model_manager.current_model.value

        model_manager.switch_model(request.model)

        logger.info(f"Model switched from {old_model} to {request.model.value}")
        if request.reason:
            logger.info(f"Switch reason: {request.reason}")

        return {
            "status": "success",
            "message": f"Model switched from {old_model} to {request.model.value}",
            "old_model": old_model,
            "new_model": request.model.value,
            "reason": request.reason,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        logger.error(f"Error switching model: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to switch model: {e}"
        ) from e


@app.get("/prompts/templates", tags=["Configuration"])
async def get_prompt_templates():
    """
    Get available prompt templates.

    Returns a list of available prompt templates with descriptions
    and supported languages for dynamic prompt selection.
    """
    try:
        from core.prompt_manager import get_prompt_manager

        prompt_manager = get_prompt_manager()
        templates = prompt_manager.get_available_templates()

        return {
            "templates": templates,
            "total_count": len(templates),
            "supported_languages": ["en", "cs", "sk"],
            "template_variables": prompt_manager.get_template_variables()
        }

    except Exception as e:
        logger.error(f"Error getting prompt templates: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get prompt templates: {e}"
        ) from e


@app.post("/prompts/validate", tags=["Configuration"])
async def validate_prompt(request: Dict[str, Any]):
    """
    Validate a custom prompt for safety and compliance.

    Performs comprehensive validation including safety checks,
    format compliance, and business context validation.
    """
    try:
        from utils.validation import get_prompt_validator

        # The body is a free-form JSON object; only the "prompt" key is mandatory.
        if "prompt" not in request:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Missing 'prompt' field in request"
            )

        prompt_text = request["prompt"]
        context = request.get("context", {})

        validator = get_prompt_validator()
        validation_result = validator.validate_prompt(prompt_text, context)

        # Get safety recommendations
        recommendations = validator.get_safety_recommendations(validation_result)

        return {
            "is_valid": validation_result.is_valid,
            "risk_score": validation_result.risk_score,
            "has_errors": validation_result.has_errors,
            "has_warnings": validation_result.has_warnings,
            "issues": [
                {
                    "severity": issue.severity.value,
                    "code": issue.code,
                    "message": issue.message,
                    "location": issue.location,
                    "suggestion": issue.suggestion
                }
                for issue in validation_result.issues
            ],
            "recommendations": recommendations,
            "sanitized_content": validation_result.sanitized_content
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error validating prompt: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to validate prompt: {e}"
        ) from e


@app.post("/prompts/preview", tags=["Configuration"])
async def preview_template(request: Dict[str, Any]):
    """
    Preview a prompt template with optional variable substitution.

    Allows users to see how a template will look with specific
    variables before using it in actual requests.
    """
    try:
        from core.prompt_manager import get_prompt_manager
        from models.input import PromptTemplate, LanguageCode

        if "template" not in request:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Missing 'template' field in request"
            )

        template_name = request["template"]
        language = request.get("language", "en")
        business_domain = request.get("business_domain")
        variables = request.get("variables", {})

        # Validate template: the enum constructor raises ValueError for unknown values.
        try:
            template_enum = PromptTemplate(template_name)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Invalid template: {template_name}"
            ) from None

        # Validate language
        try:
            language_enum = LanguageCode(language)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Invalid language: {language}"
            ) from None

        prompt_manager = get_prompt_manager()
        preview = prompt_manager.preview_template(
            template_enum, language_enum, business_domain, variables
        )

        return {
            "template": template_name,
            "language": language,
            "business_domain": business_domain,
            "variables_applied": variables,
            "preview": preview,
            "estimated_tokens": len(preview.split()) * 1.3  # Rough estimate
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error previewing template: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to preview template: {e}"
        ) from e


@app.get("/prompts/stats", tags=["Configuration"])
async def get_prompt_stats():
    """
    Get prompt processing statistics.

    Returns statistics about prompt usage, validation results,
    and template popularity for monitoring and optimization.
    """
    try:
        from core.prompt_manager import get_prompt_manager

        prompt_manager = get_prompt_manager()
        stats = prompt_manager.get_processing_stats()

        return {
            "processing_stats": stats,
            "timestamp": datetime.now().isoformat(),
            "service_uptime": time.time() - app_state["startup_time"]
        }

    except Exception as e:
        logger.error(f"Error getting prompt stats: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get prompt stats: {e}"
        ) from e


# NOTE(review): this handler is registered for pydantic's ValidationError.
# FastAPI reports request-body validation failures through its own
# RequestValidationError, so this presumably only fires for ValidationErrors
# raised while a handler is running -- confirm against the FastAPI version in use.
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    """Handle Pydantic validation errors with detailed field information."""
    logger.warning(f"Validation error on {request.url}: {exc}")

    error_detail = ErrorDetail(
        error_code="VALIDATION_ERROR",
        error_message="Request validation failed",
        error_type="validation",
        field_errors={},
        suggestions=[]
    )

    # Extract field-specific errors
    for error in exc.errors():
        field_path = " -> ".join(str(loc) for loc in error["loc"])
        error_msg = error["msg"]

        error_detail.field_errors.setdefault(field_path, []).append(error_msg)

        # Add suggestions based on error type
        lowered_msg = error_msg.lower()
        if "required" in lowered_msg:
            error_detail.suggestions.append(f"Provide required field: {field_path}")
        elif "invalid" in lowered_msg:
            error_detail.suggestions.append(f"Check format of field: {field_path}")

    # Use the app-wide response class: it runs jsonable_encoder first, so any
    # non-JSON-native value in the dumped model cannot break rendering.
    return UnicodeJSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content=error_detail.model_dump()
    )


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Global exception handler for unhandled errors."""
    logger.error(f"Unhandled exception: {exc}", exc_info=True)

    return UnicodeJSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred. Please try again later.",
            # None unless something upstream stored a request_id on request.state.
            "request_id": getattr(request.state, "request_id", None)
        }
    )


if __name__ == "__main__":
    import uvicorn

    # Run the application
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,  # HuggingFace Spaces default port
        reload=settings.is_development,
        log_level=settings.log_level.value.lower()
    )