File size: 1,937 Bytes
1521ef5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""
BigQuery utility functions.
Provides common functionality for working with BigQuery.
"""

import datetime
from google.cloud import bigquery

def _escape_sql_string(value):
    """Escape ``value`` for embedding in a single-quoted BigQuery string literal."""
    # Escape backslashes first so the backslashes added for quotes are not doubled.
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def handle_partition_filter(table, start_date=None, end_date=None):
    """
    Creates a partition filter clause for BigQuery queries based on table partitioning.

    Only time partitioning is considered (``table.time_partitioning``). When the
    partitioning has no explicit field, the table is treated as ingestion-time
    partitioned and the ``_PARTITIONDATE`` pseudo column is filtered on.

    Date values are escaped before being placed inside the quoted SQL literal, so
    a stray quote or backslash cannot terminate the literal early. Well-formed
    dates are emitted unchanged.

    Args:
        table: A BigQuery table object (anything exposing ``time_partitioning``
            with a ``field`` attribute)
        start_date: ISO formatted date string for the start date of the partition filter
        end_date: ISO formatted date string for the end date of the partition filter

    Returns:
        A string containing the WHERE clause for the partition filter, or an empty
        string if the table is not time partitioned. With both dates the range is
        inclusive (BETWEEN); with one date the filter is open-ended; with no dates
        the filter selects today's date.
    """
    time_partitioning = table.time_partitioning
    if not time_partitioning:
        return ""

    # Use the partition field if specified; otherwise default to ingestion time.
    partition_field = time_partitioning.field or "_PARTITIONTIME"
    # If ingestion-time partitioning is in use, switch to the DATE pseudo column
    # so that the plain date literals below compare against a DATE.
    if partition_field == "_PARTITIONTIME":
        partition_field = "_PARTITIONDATE"

    if start_date and end_date:
        lower = _escape_sql_string(start_date)
        upper = _escape_sql_string(end_date)
        return f"WHERE `{partition_field}` BETWEEN '{lower}' AND '{upper}'"
    if start_date:
        return f"WHERE `{partition_field}` >= '{_escape_sql_string(start_date)}'"
    if end_date:
        return f"WHERE `{partition_field}` <= '{_escape_sql_string(end_date)}'"

    # If no dates are provided, default to today's date.
    # NOTE(review): date.today() is the local date of the running process;
    # ingestion-time partitions are presumably UTC-based - confirm whether a
    # UTC date is wanted here.
    today = datetime.date.today().isoformat()
    return f"WHERE `{partition_field}` = '{today}'"

def flatten_column_dict(columns):
    """
    Returns a new dictionary holding the same column entries as ``columns``.

    The copy is shallow: each value is carried over as the same object and
    nested structures are NOT expanded, so despite the function's name no
    recursive flattening takes place.

    Args:
        columns: A dictionary of column information (column name -> info)

    Returns:
        A new dictionary with the same keys and values, in the same order
    """
    # Go through .items() so any object exposing that method keeps working.
    return dict(columns.items())