| | """Debug logger module for detailed API request/response logging""" |
| | import json |
| | import logging |
| | from datetime import datetime |
| | from pathlib import Path |
| | from typing import Dict, Any, Optional |
| | from .config import config |
| |
|
| | class DebugLogger: |
| | """Debug logger for API requests and responses""" |
| |
|
| | def __init__(self): |
| | self.log_file = Path("logs.txt") |
| | self._setup_logger() |
| |
|
| | def _setup_logger(self): |
| | """Setup file logger""" |
| | |
| | self.logger = logging.getLogger("debug_logger") |
| | self.logger.setLevel(logging.DEBUG) |
| |
|
| | |
| | self.logger.handlers.clear() |
| |
|
| | |
| | file_handler = logging.FileHandler( |
| | self.log_file, |
| | mode='a', |
| | encoding='utf-8' |
| | ) |
| | file_handler.setLevel(logging.DEBUG) |
| |
|
| | |
| | formatter = logging.Formatter( |
| | '%(message)s', |
| | datefmt='%Y-%m-%d %H:%M:%S' |
| | ) |
| | file_handler.setFormatter(formatter) |
| |
|
| | |
| | self.logger.addHandler(file_handler) |
| |
|
| | |
| | self.logger.propagate = False |
| |
|
| | def _mask_token(self, token: str) -> str: |
| | """Mask token for logging (show first 6 and last 6 characters)""" |
| | if not config.debug_mask_token or len(token) <= 12: |
| | return token |
| | return f"{token[:6]}...{token[-6:]}" |
| |
|
| | def _format_timestamp(self) -> str: |
| | """Format current timestamp""" |
| | return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] |
| |
|
| | def _write_separator(self, char: str = "=", length: int = 100): |
| | """Write separator line""" |
| | self.logger.info(char * length) |
| |
|
| | def log_request( |
| | self, |
| | method: str, |
| | url: str, |
| | headers: Dict[str, str], |
| | body: Optional[Any] = None, |
| | files: Optional[Dict] = None, |
| | proxy: Optional[str] = None |
| | ): |
| | """Log API request details to log.txt""" |
| |
|
| | if not config.debug_enabled or not config.debug_log_requests: |
| | return |
| |
|
| | try: |
| | self._write_separator() |
| | self.logger.info(f"π΅ [REQUEST] {self._format_timestamp()}") |
| | self._write_separator("-") |
| |
|
| | |
| | self.logger.info(f"Method: {method}") |
| | self.logger.info(f"URL: {url}") |
| |
|
| | |
| | self.logger.info("\nπ Headers:") |
| | masked_headers = dict(headers) |
| | if "Authorization" in masked_headers or "authorization" in masked_headers: |
| | auth_key = "Authorization" if "Authorization" in masked_headers else "authorization" |
| | auth_value = masked_headers[auth_key] |
| | if auth_value.startswith("Bearer "): |
| | token = auth_value[7:] |
| | masked_headers[auth_key] = f"Bearer {self._mask_token(token)}" |
| |
|
| | |
| | if "Cookie" in masked_headers: |
| | cookie_value = masked_headers["Cookie"] |
| | if "__Secure-next-auth.session-token=" in cookie_value: |
| | parts = cookie_value.split("=", 1) |
| | if len(parts) == 2: |
| | st_token = parts[1].split(";")[0] |
| | masked_headers["Cookie"] = f"__Secure-next-auth.session-token={self._mask_token(st_token)}" |
| |
|
| | for key, value in masked_headers.items(): |
| | self.logger.info(f" {key}: {value}") |
| |
|
| | |
| | if body is not None: |
| | self.logger.info("\nπ¦ Request Body:") |
| | if isinstance(body, (dict, list)): |
| | body_str = json.dumps(body, indent=2, ensure_ascii=False) |
| | self.logger.info(body_str) |
| | else: |
| | self.logger.info(str(body)) |
| |
|
| | |
| | if files: |
| | self.logger.info("\nπ Files:") |
| | try: |
| | if hasattr(files, 'keys') and callable(getattr(files, 'keys', None)): |
| | for key in files.keys(): |
| | self.logger.info(f" {key}: <file data>") |
| | else: |
| | self.logger.info(" <multipart form data>") |
| | except (AttributeError, TypeError): |
| | self.logger.info(" <binary file data>") |
| |
|
| | |
| | if proxy: |
| | self.logger.info(f"\nπ Proxy: {proxy}") |
| |
|
| | self._write_separator() |
| | self.logger.info("") |
| |
|
| | except Exception as e: |
| | self.logger.error(f"Error logging request: {e}") |
| |
|
| | def log_response( |
| | self, |
| | status_code: int, |
| | headers: Dict[str, str], |
| | body: Any, |
| | duration_ms: Optional[float] = None |
| | ): |
| | """Log API response details to log.txt""" |
| |
|
| | if not config.debug_enabled or not config.debug_log_responses: |
| | return |
| |
|
| | try: |
| | self._write_separator() |
| | self.logger.info(f"π’ [RESPONSE] {self._format_timestamp()}") |
| | self._write_separator("-") |
| |
|
| | |
| | status_emoji = "β
" if 200 <= status_code < 300 else "β" |
| | self.logger.info(f"Status: {status_code} {status_emoji}") |
| |
|
| | |
| | if duration_ms is not None: |
| | self.logger.info(f"Duration: {duration_ms:.2f}ms") |
| |
|
| | |
| | self.logger.info("\nπ Response Headers:") |
| | for key, value in headers.items(): |
| | self.logger.info(f" {key}: {value}") |
| |
|
| | |
| | self.logger.info("\nπ¦ Response Body:") |
| | if isinstance(body, (dict, list)): |
| | body_str = json.dumps(body, indent=2, ensure_ascii=False) |
| | self.logger.info(body_str) |
| | elif isinstance(body, str): |
| | |
| | try: |
| | parsed = json.loads(body) |
| | body_str = json.dumps(parsed, indent=2, ensure_ascii=False) |
| | self.logger.info(body_str) |
| | except: |
| | |
| | if len(body) > 2000: |
| | self.logger.info(f"{body[:2000]}... (truncated)") |
| | else: |
| | self.logger.info(body) |
| | else: |
| | self.logger.info(str(body)) |
| |
|
| | self._write_separator() |
| | self.logger.info("") |
| |
|
| | except Exception as e: |
| | self.logger.error(f"Error logging response: {e}") |
| |
|
| | def log_error( |
| | self, |
| | error_message: str, |
| | status_code: Optional[int] = None, |
| | response_text: Optional[str] = None |
| | ): |
| | """Log API error details to log.txt""" |
| |
|
| | if not config.debug_enabled: |
| | return |
| |
|
| | try: |
| | self._write_separator() |
| | self.logger.info(f"π΄ [ERROR] {self._format_timestamp()}") |
| | self._write_separator("-") |
| |
|
| | if status_code: |
| | self.logger.info(f"Status Code: {status_code}") |
| |
|
| | self.logger.info(f"Error Message: {error_message}") |
| |
|
| | if response_text: |
| | self.logger.info("\nπ¦ Error Response:") |
| | |
| | try: |
| | parsed = json.loads(response_text) |
| | body_str = json.dumps(parsed, indent=2, ensure_ascii=False) |
| | self.logger.info(body_str) |
| | except: |
| | |
| | if len(response_text) > 2000: |
| | self.logger.info(f"{response_text[:2000]}... (truncated)") |
| | else: |
| | self.logger.info(response_text) |
| |
|
| | self._write_separator() |
| | self.logger.info("") |
| |
|
| | except Exception as e: |
| | self.logger.error(f"Error logging error: {e}") |
| |
|
| | def log_info(self, message: str): |
| | """Log general info message to log.txt""" |
| | if not config.debug_enabled: |
| | return |
| | try: |
| | self.logger.info(f"βΉοΈ [{self._format_timestamp()}] {message}") |
| | except Exception as e: |
| | self.logger.error(f"Error logging info: {e}") |
| |
|
| | def log_warning(self, message: str): |
| | """Log warning message to log.txt""" |
| | if not config.debug_enabled: |
| | return |
| | try: |
| | self.logger.warning(f"β οΈ [{self._format_timestamp()}] {message}") |
| | except Exception as e: |
| | self.logger.error(f"Error logging warning: {e}") |
| |
|
| | |
| | debug_logger = DebugLogger() |
| |
|