| | import asyncio |
| | from concurrent.futures import ThreadPoolExecutor |
| | from typing import TypedDict |
| | import duckdb |
| | import pandas as pd |
| |
|
| | async def execute_sql_query(sql_query: str) -> pd.DataFrame: |
| | """Executes a SQL query on the DRIAS database and returns the results. |
| | |
| | This function connects to the DuckDB database containing DRIAS climate data |
| | and executes the provided SQL query. It handles the database connection and |
| | returns the results as a pandas DataFrame. |
| | |
| | Args: |
| | sql_query (str): The SQL query to execute |
| | |
| | Returns: |
| | pd.DataFrame: A DataFrame containing the query results |
| | |
| | Raises: |
| | duckdb.Error: If there is an error executing the SQL query |
| | """ |
| | def _execute_query(): |
| | |
| | con = duckdb.connect() |
| | results = con.sql(sql_query).fetchdf() |
| | |
| | return results |
| |
|
| | |
| | loop = asyncio.get_event_loop() |
| | with ThreadPoolExecutor() as executor: |
| | return await loop.run_in_executor(executor, _execute_query) |
| |
|
| |
|
| | class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False): |
| | """Parameters for querying an indicator's values over time at a location. |
| | |
| | This class defines the parameters needed to query climate indicator data |
| | for a specific location over multiple years. |
| | |
| | Attributes: |
| | indicator_column (str): The column name for the climate indicator |
| | latitude (str): The latitude coordinate of the location |
| | longitude (str): The longitude coordinate of the location |
| | model (str): The climate model to use (optional) |
| | """ |
| | indicator_column: str |
| | latitude: str |
| | longitude: str |
| | model: str |
| |
|
| |
|
| | def indicator_per_year_at_location_query( |
| | table: str, params: IndicatorPerYearAtLocationQueryParams |
| | ) -> str: |
| | """SQL Query to get the evolution of an indicator per year at a certain location |
| | |
| | Args: |
| | table (str): sql table of the indicator |
| | params (IndicatorPerYearAtLocationQueryParams) : dictionary with the required params for the query |
| | |
| | Returns: |
| | str: the sql query |
| | """ |
| | indicator_column = params.get("indicator_column") |
| | latitude = params.get("latitude") |
| | longitude = params.get("longitude") |
| | |
| | if indicator_column is None or latitude is None or longitude is None: |
| | return "" |
| | |
| | table = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'" |
| |
|
| | sql_query = f"SELECT year, {indicator_column}, model\nFROM {table}\nWHERE latitude = {latitude} \nAnd longitude = {longitude} \nOrder by Year" |
| |
|
| | return sql_query |
| |
|
| | class IndicatorForGivenYearQueryParams(TypedDict, total=False): |
| | """Parameters for querying an indicator's values across locations for a year. |
| | |
| | This class defines the parameters needed to query climate indicator data |
| | across different locations for a specific year. |
| | |
| | Attributes: |
| | indicator_column (str): The column name for the climate indicator |
| | year (str): The year to query |
| | model (str): The climate model to use (optional) |
| | """ |
| | indicator_column: str |
| | year: str |
| | model: str |
| |
|
| | def indicator_for_given_year_query( |
| | table:str, params: IndicatorForGivenYearQueryParams |
| | ) -> str: |
| | """SQL Query to get the values of an indicator with their latitudes, longitudes and models for a given year |
| | |
| | Args: |
| | table (str): sql table of the indicator |
| | params (IndicatorForGivenYearQueryParams): dictionarry with the required params for the query |
| | |
| | Returns: |
| | str: the sql query |
| | """ |
| | indicator_column = params.get("indicator_column") |
| | year = params.get('year') |
| | if year is None or indicator_column is None: |
| | return "" |
| | |
| | table = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'" |
| |
|
| | sql_query = f"Select {indicator_column}, latitude, longitude, model\nFrom {table}\nWhere year = {year}" |
| | return sql_query |