Spaces:
Runtime error
Runtime error
| import logging | |
| import os | |
| import io | |
| import json | |
| from google.cloud import bigquery | |
| from google.oauth2 import service_account | |
| from google.api_core.exceptions import GoogleAPIError | |
| job_config = bigquery.LoadJobConfig( | |
| schema=[ | |
| bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"), | |
| bigquery.SchemaField("log_entry", "STRING", mode="REQUIRED"), | |
| ], | |
| write_disposition="WRITE_APPEND", | |
| ) | |
| class BigQueryLoggingHandler(logging.Handler): | |
| def __init__(self): | |
| super().__init__() | |
| try: | |
| project_id = os.getenv("BIGQUERY_PROJECT_ID") | |
| dataset_id = os.getenv("BIGQUERY_DATASET_ID") | |
| table_id = os.getenv("BIGQUERY_TABLE_ID") | |
| print(f"project_id: {project_id}") | |
| print(f"dataset_id: {dataset_id}") | |
| print(f"table_id: {table_id}") | |
| service_account_info = json.loads( | |
| os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON") | |
| .replace('"', "") | |
| .replace("'", '"') | |
| ) | |
| print(f"service_account_info: {service_account_info}") | |
| print(f"service_account_info type: {type(service_account_info)}") | |
| print(f"service_account_info keys: {service_account_info.keys()}") | |
| credentials = service_account.Credentials.from_service_account_info( | |
| service_account_info | |
| ) | |
| self.client = bigquery.Client(credentials=credentials, project=project_id) | |
| self.table_ref = self.client.dataset(dataset_id).table(table_id) | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| self.handleError(e) | |
| def emit(self, record): | |
| try: | |
| recordstr = f"{self.format(record)}" | |
| body = io.BytesIO(recordstr.encode("utf-8")) | |
| job = self.client.load_table_from_file( | |
| body, self.table_ref, job_config=job_config | |
| ) | |
| job.result() | |
| except GoogleAPIError as e: | |
| self.handleError(e) | |
| except Exception as e: | |
| self.handleError(e) | |
| def handleError(self, record): | |
| """ | |
| Handle errors associated with logging. | |
| This method prevents logging-related exceptions from propagating. | |
| Optionally, implement more sophisticated error handling here. | |
| """ | |
| if isinstance(record, logging.LogRecord): | |
| super().handleError(record) | |
| else: | |
| print(f"Logging error: {record}") | |
| logger = logging.getLogger(__name__) | |
| def setup_logger() -> None: | |
| """ | |
| Logger setup. | |
| """ | |
| logger.setLevel(logging.DEBUG) | |
| stream_formatter = logging.Formatter( | |
| "%(asctime)s - %(name)s - %(levelname)s - %(message)s" | |
| ) | |
| stream_handler = logging.StreamHandler() | |
| stream_handler.setFormatter(stream_formatter) | |
| logger.addHandler(stream_handler) | |
| bq_handler = BigQueryLoggingHandler() | |
| bq_handler.setFormatter(stream_formatter) | |
| logger.addHandler(bq_handler) | |