File size: 8,201 Bytes
50a7bf0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""

Response compression middleware.

Implements gzip compression for responses to reduce bandwidth usage.

"""

import gzip
from typing import List, Optional
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.gzip import GZipMiddleware
from starlette.types import ASGIApp

from ..core.config import get_settings
from ..core.logger import get_logger

# Resolved once at import time and shared by everything in this module:
# `settings` is read for its `debug` flag, `logger` for structured log calls.
settings = get_settings()
logger = get_logger(__name__)


class EnhancedCompressionMiddleware(BaseHTTPMiddleware):
    """
    Gzip middleware with configurable size, type, level and path filters.

    A response is compressed only when the request path is not excluded, the
    client's ``Accept-Encoding`` header mentions gzip, and the response passes
    the checks in ``_should_compress``. The whole body is buffered in memory
    before it is compressed.
    """

    def __init__(
        self,
        app: ASGIApp,
        minimum_size: int = 500,
        compressible_types: Optional[List[str]] = None,
        compression_level: int = 6,
        exclude_paths: Optional[List[str]] = None,
    ):
        """
        Store the compression settings.

        Args:
            app: Wrapped ASGI application.
            minimum_size: Bodies smaller than this many bytes are sent
                uncompressed.
            compressible_types: Content types (without parameters) that may
                be compressed. ``None`` selects the built-in defaults; an
                explicit empty list disables compression for every type.
            compression_level: Level passed to ``gzip.compress``.
            exclude_paths: Request paths that are never compressed (exact
                match). ``None`` selects the built-in defaults; an explicit
                empty list excludes nothing.
        """
        super().__init__(app)
        self.minimum_size = minimum_size
        self.compression_level = compression_level

        # Compare against None (not truthiness) so that a caller can pass an
        # empty list to switch a filter off instead of getting the defaults.
        if exclude_paths is None:
            exclude_paths = [
                "/health",
                "/metrics",
                "/favicon.ico",
            ]
        self.exclude_paths = list(exclude_paths)

        # Default compressible content types
        if compressible_types is None:
            compressible_types = [
                "application/json",
                "application/javascript",
                "application/xml",
                "text/html",
                "text/css",
                "text/javascript",
                "text/plain",
                "text/xml",
                "text/csv",
                "image/svg+xml",
            ]
        self.compressible_types = list(compressible_types)

    async def dispatch(self, request: Request, call_next) -> Response:
        """
        Apply compression to eligible responses.

        Args:
            request: Incoming HTTP request
            call_next: Next middleware/endpoint in chain

        Returns:
            Response: Potentially compressed HTTP response
        """
        # Skip compression for excluded paths
        if request.url.path in self.exclude_paths:
            return await call_next(request)

        # Check if client accepts gzip encoding
        accept_encoding = request.headers.get("accept-encoding", "")
        if "gzip" not in accept_encoding.lower():
            return await call_next(request)

        response = await call_next(request)

        if not self._should_compress(response):
            return response

        return await self._compress_response(response)

    def _should_compress(self, response: Response) -> bool:
        """
        Determine if response should be compressed, judging by headers only.

        Args:
            response: HTTP response

        Returns:
            bool: True if response should be compressed
        """
        headers = response.headers

        # Skip if already compressed
        if headers.get("content-encoding"):
            return False

        # Compare only the media type, ignoring parameters such as charset
        content_type = headers.get("content-type", "").split(";")[0].strip()
        if content_type not in self.compressible_types:
            return False

        # A declared length below the threshold rules compression out. A
        # missing or malformed header is not decisive here; the real body
        # size is checked again once the body has been read.
        declared_length = headers.get("content-length")
        if declared_length:
            try:
                if int(declared_length) < self.minimum_size:
                    return False
            except ValueError:
                pass

        # Only 2xx responses are compressed, plus the common 404/410 pages
        status = response.status_code
        if not 200 <= status < 300 and status not in (404, 410):
            return False

        return True

    @staticmethod
    def _merge_vary(existing: str) -> str:
        """
        Return a Vary header value that includes ``Accept-Encoding``.

        Args:
            existing: Current Vary value, or an empty string if there is none

        Returns:
            str: ``existing`` extended with ``Accept-Encoding`` unless it
            already lists it or is the ``*`` wildcard
        """
        tokens = {part.strip().lower() for part in existing.split(",")}
        tokens.discard("")
        if not tokens:
            return "Accept-Encoding"
        if "accept-encoding" in tokens or "*" in tokens:
            return existing
        return f"{existing}, Accept-Encoding"

    @staticmethod
    def _rebuild_response(response: Response, body: bytes) -> Response:
        """
        Build a plain response carrying ``body`` with the original metadata.

        Needed because the original response's body iterator can only be
        consumed once; after reading it, the body must be re-attached.

        Args:
            response: Response whose status, headers and media type are reused
            body: Bytes to send as the response body

        Returns:
            Response: New response with the given body
        """
        return Response(
            content=body,
            status_code=response.status_code,
            headers=response.headers,
            media_type=response.media_type,
        )

    async def _compress_response(self, response: Response) -> Response:
        """
        Compress response body using gzip.

        The body is read fully into memory. If it turns out to be smaller than
        ``minimum_size``, or if gzip fails, the buffered body is sent back
        uncompressed rather than returning the already-drained original.

        Args:
            response: HTTP response to compress

        Returns:
            Response: Compressed HTTP response, or an uncompressed copy when
            the body is too small or compression fails

        Raises:
            Exception: Whatever the body iterator raises while being read;
                there is no complete body to fall back to in that case.
        """
        # Collect chunks and join once; repeated `bytes +=` is quadratic.
        chunks: List[bytes] = []
        async for chunk in response.body_iterator:
            chunks.append(chunk)
        body = b"".join(chunks)
        original_size = len(body)

        # Responses without a Content-Length header reach this point
        # unchecked, so enforce the size threshold on the real body.
        if original_size < self.minimum_size:
            return self._rebuild_response(response, body)

        try:
            compressed_body = gzip.compress(body, compresslevel=self.compression_level)
        except Exception as e:
            logger.warning(
                "Failed to compress response",
                error=str(e),
                content_type=response.headers.get("content-type"),
            )
            # The iterator is exhausted, so hand back the buffered body.
            return self._rebuild_response(response, body)

        compressed_size = len(compressed_body)
        compression_ratio = (1 - compressed_size / original_size) * 100 if original_size > 0 else 0

        # Log compression stats for monitoring
        if settings.debug:
            logger.debug(
                "Response compressed",
                original_size=original_size,
                compressed_size=compressed_size,
                compression_ratio=f"{compression_ratio:.1f}%",
                content_type=response.headers.get("content-type"),
            )

        # Update response headers; merge Vary so values set upstream survive.
        merged_vary = self._merge_vary(", ".join(response.headers.getlist("vary")))
        response.headers["content-encoding"] = "gzip"
        response.headers["content-length"] = str(compressed_size)
        response.headers["vary"] = merged_vary

        # Add compression info header for debugging
        if settings.debug:
            response.headers["X-Compression-Ratio"] = f"{compression_ratio:.1f}%"
            response.headers["X-Original-Size"] = str(original_size)

        return self._rebuild_response(response, compressed_body)


def setup_compression_middleware(app: ASGIApp, **kwargs) -> None:
    """
    Setup response compression middleware.

    Installs either ``EnhancedCompressionMiddleware`` or the built-in
    ``GZipMiddleware`` on ``app`` via ``app.add_middleware``.

    Args:
        app: FastAPI application instance
        **kwargs: Additional compression configuration options.
            ``use_enhanced`` (default: ``settings.debug``) selects the
            enhanced middleware. Other keys override the defaults below;
            keys whose value is ``None`` are ignored so the defaults apply.
            With the built-in middleware only ``minimum_size`` and
            ``compression_level`` are honoured.
    """
    use_enhanced = kwargs.pop("use_enhanced", settings.debug)

    # Treat None as "not provided" so a default is never replaced by None
    # (which would otherwise break e.g. the len() call in the log line).
    overrides = {key: value for key, value in kwargs.items() if value is not None}

    if use_enhanced:
        compression_config = {
            "minimum_size": 500,
            "compression_level": 6,
            "exclude_paths": [
                "/health",
                "/metrics",
                "/favicon.ico",
                "/docs",
                "/redoc",
                "/openapi.json",
            ],
            "compressible_types": [
                "application/json",
                "application/javascript",
                "application/xml",
                "text/html",
                "text/css",
                "text/javascript",
                "text/plain",
                "text/xml",
                "text/csv",
                "image/svg+xml",
            ],
        }

        # Override with any provided kwargs
        compression_config.update(overrides)

        logger.info(
            "Setting up enhanced compression middleware",
            minimum_size=compression_config["minimum_size"],
            compression_level=compression_config["compression_level"],
            compressible_types_count=len(compression_config["compressible_types"]),
        )

        app.add_middleware(EnhancedCompressionMiddleware, **compression_config)
        return

    # Use FastAPI's built-in GZip middleware
    minimum_size = overrides.get("minimum_size", 500)
    gzip_options = {"minimum_size": minimum_size}

    # Forward the level only when the caller asked for one, so the built-in
    # middleware keeps its own default otherwise.
    if "compression_level" in overrides:
        gzip_options["compresslevel"] = overrides["compression_level"]

    # Surface options the built-in middleware cannot honour instead of
    # dropping them silently.
    ignored_options = sorted(set(overrides) - {"minimum_size", "compression_level"})
    if ignored_options:
        logger.warning(
            "Compression options ignored by standard GZip middleware",
            ignored_options=ignored_options,
        )

    logger.info(
        "Setting up standard GZip compression middleware",
        minimum_size=minimum_size,
    )

    app.add_middleware(GZipMiddleware, **gzip_options)