# Spaces: Runtime error
# File size: 1,937 Bytes
# Revision: 1521ef5
"""
BigQuery utility functions.
Provides common functionality for working with BigQuery.
"""
import datetime
from google.cloud import bigquery
def _iso_literal(value, name):
    """
    Validates a date bound and returns its canonical text for a SQL literal.
    Args:
        value: A date/datetime object, or an ISO 8601 date or datetime string
        name: Parameter name, used only in the error message
    Returns:
        'YYYY-MM-DD' for dates, or the space-separated ISO form for datetimes
    Raises:
        ValueError: If the value does not parse as an ISO 8601 date or datetime
    """
    text = str(value)
    try:
        return datetime.date.fromisoformat(text).isoformat()
    except ValueError:
        pass
    try:
        # Re-emit from the parsed value so no raw input reaches the query text.
        return datetime.datetime.fromisoformat(text).isoformat(sep=" ")
    except ValueError:
        raise ValueError(
            f"{name} must be an ISO 8601 date or datetime, got {value!r}"
        ) from None


def handle_partition_filter(table, start_date=None, end_date=None):
    """
    Creates a partition filter clause for BigQuery queries based on table partitioning.
    Args:
        table: An object exposing a `time_partitioning` attribute (e.g. a BigQuery table)
        start_date: ISO formatted date string for the start date of the partition filter
        end_date: ISO formatted date string for the end date of the partition filter
    Returns:
        A string containing the WHERE clause for the partition filter, or an empty
        string if the table has no time partitioning
    Raises:
        ValueError: If a supplied bound is not an ISO 8601 date or datetime
    """
    if not table.time_partitioning:
        return ""

    # Use the partition field if specified; otherwise default to ingestion time.
    partition_field = table.time_partitioning.field or "_PARTITIONTIME"
    # If ingestion-time partitioning is in use, switch to the DATE pseudo column.
    if partition_field == "_PARTITIONTIME":
        partition_field = "_PARTITIONDATE"

    # Bounds are validated before interpolation because the clause is built as
    # plain text; a falsy bound (None or "") is treated as "not provided".
    if start_date and end_date:
        lower = _iso_literal(start_date, "start_date")
        upper = _iso_literal(end_date, "end_date")
        return f"WHERE `{partition_field}` BETWEEN '{lower}' AND '{upper}'"
    if start_date:
        lower = _iso_literal(start_date, "start_date")
        return f"WHERE `{partition_field}` >= '{lower}'"
    if end_date:
        upper = _iso_literal(end_date, "end_date")
        return f"WHERE `{partition_field}` <= '{upper}'"

    # If no dates are provided, default to today's date.
    # NOTE(review): date.today() is the local calendar date; confirm whether a
    # UTC date is wanted for ingestion-time partitions.
    today = datetime.date.today().isoformat()
    return f"WHERE `{partition_field}` = '{today}'"
def flatten_column_dict(columns):
    """
    Builds a new dictionary from the top-level entries of a column dictionary.
    Args:
        columns: A dictionary of column information
    Returns:
        A new dictionary with the same keys and the same value objects
    """
    # NOTE(review): despite the name, nested values are not expanded here;
    # each value is carried over as-is, so this is a shallow copy.
    return dict(columns.items())