import requests
from bs4 import BeautifulSoup
import os
from urllib.request import urlretrieve
import tarfile
import glob
import pandas as pd
from pandas.errors import EmptyDataError, ParserError
import sys
import concurrent.futures
# URL of the archive directory
BASE_URL = 'https://www.ncei.noaa.gov/data/ghcnm/v4/precipitation/archive/'
# Global definitions for workers
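# (Kept at module level so they are available to process_file inside
# ProcessPoolExecutor worker processes.)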
colspecs = [
(0, 11), # 1. GHCN identifier (columns 1-11)
(12, 52), # 2. Station name (columns 13-52)
(53, 62), # 3. Latitude (columns 54-62)
(63, 73), # 4. Longitude (columns 64-73)
(74, 82), # 5. Elevation (meters) (columns 75-82)
(83, 89), # 6. Year and month (columns 84-89)
(90, 96), # 7. Precipitation value (columns 91-96)
(97, 98), # 8. Measurement flag (column 98)
(99, 100), # 9. Quality control flag (column 100)
(101, 102), # 10. Source flag (column 102)
(103, 109), # 11. Source index (columns 104-109)
]
column_names = [
"ghcn_id",
"station_name",
"latitude",
"longitude",
"elevation_m",
"year_month",
"precip_tenths_mm",
"measurement_flag",
"quality_flag",
"source_flag",
"source_index"
]
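# Numeric-looking fields are read as strings first so malformed or sentinel
# values can be coerced to NaN explicitly during cleaning (see process_file).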
dtypes = {
"ghcn_id": "string",
"station_name": "string",
"latitude": "float32",
"longitude": "float32",
"elevation_m": "float32",
"year_month": "string",
"precip_tenths_mm": "string",
"measurement_flag": "string",
"quality_flag": "string",
"source_flag": "string",
"source_index": "string"
}
def process_file(file):
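    """Parse one GHCN-M v4 precipitation station file into a clean DataFrame.

    Returns None when the file cannot be parsed, has fewer than 120 monthly
    records, or contains an extreme (> 2000 mm) precipitation value.
    """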
try:
# print(f"Processing {file}") # Too noisy for multiprocessing
df = pd.read_fwf(
file,
colspecs=colspecs,
names=column_names,
dtype=dtypes, # Apply defined data types
header=None # Important: Ensure pandas doesn't look for a header row
)
# 1. Parse 'year_month' into 'year' and 'month' columns
df['year'] = pd.to_numeric(df['year_month'].str[:4], errors='coerce').astype('Int16')
df['month'] = pd.to_numeric(df['year_month'].str[4:], errors='coerce').astype('Int8')
df.drop('year_month', axis=1, inplace=True) # Drop the original column
# Convert precip_tenths_mm to numeric, handling potential errors
df['precip_tenths_mm'] = pd.to_numeric(df['precip_tenths_mm'], errors='coerce')
df = df.dropna(subset=['precip_tenths_mm']) # Drop rows that failed conversion
df['precip_tenths_mm'] = df['precip_tenths_mm'].astype('Int32')
# 2. Handle Trace Precipitation (-1 becomes 0)
df['precip_tenths_mm'] = df['precip_tenths_mm'].replace(-1, 0)
# 3. Create precipitation column in millimeters (float)
df['precip_mm'] = (df['precip_tenths_mm'].astype('Float32') / 10.0).round(1)
        # Drop rows flagged by quality control. isin() treats <NA> as
        # "not in the list" (False), so ~mask keeps rows whose flag is
        # missing; fillna(False) guards against nullable boolean output.
bad_flags = ['O', 'R', 'T', 'S', 'K']
mask = df['quality_flag'].isin(bad_flags).fillna(False)
df = df[~mask]
        # Skip files with fewer than 120 monthly records (10 years of data)
if len(df) < 120:
return None
        # 4. Skip this file if any 'precip_mm' value exceeds 2000 mm
if (df['precip_mm'] > 2000).any():
# print(f"Skipping {file} due to extreme precipitation value (> 2000mm)")
return None
return df
except Exception as e:
print(f"Error processing {file}: {e}")
return None
def main():
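    """Download and extract the latest GHCN-M v4 precipitation archive,
    process every station file in parallel, and write the combined data
    to Parquet along with a per-station availability summary."""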
# Create a session
session = requests.Session()
# Get the HTML content of the archive page
try:
response = session.get(BASE_URL)
response.raise_for_status() # Raise an error for bad status
# Parse HTML with BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
# Find all .tar.gz files
tar_links = [a['href'] for a in soup.find_all('a', href=True) if a['href'].endswith('.tar.gz')]
if tar_links:
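            # Download only the first listed archive, assumed to be a
            # complete snapshot of the dataset.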
            file_url = BASE_URL + tar_links[0]
            os.makedirs('misc/data', exist_ok=True)  # Ensure the download directory exists
            filename = os.path.join('misc/data', tar_links[0])
if os.path.exists(filename):
print(f"File {filename} already exists. Skipping download.")
else:
print(f"Downloading {filename}...")
urlretrieve(file_url, filename)
if not os.path.exists('misc/data/pp') or not os.listdir('misc/data/pp'):
print("Extracting files...")
with tarfile.open(filename, 'r:gz') as tar:
tar.extractall(path='misc/data/pp')
else:
print("Files already extracted. Skipping extraction.")
except Exception as e:
print(f"Warning: Could not check/download updates: {e}")
# Path to CSV files
csv_path = 'misc/data/pp/*.csv'
# Get list of all CSV files
csv_files = glob.glob(csv_path)
print(f"Found {len(csv_files)} files. Starting processing with multiprocessing...")
# Process files in parallel
dataframes = []
    # Use ~75% of available CPUs to keep the system responsive;
    # os.cpu_count() can return None, hence the fallback.
    max_workers = max(1, int((os.cpu_count() or 1) * 0.75))
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
# Map returns results in order
for i, df in enumerate(executor.map(process_file, csv_files)):
if i % 1000 == 0:
print(f"Processed {i}/{len(csv_files)} files...")
if df is not None:
dataframes.append(df)
if not dataframes:
print("No valid dataframes found.")
return
print("Concatenating dataframes...")
combined_df = pd.concat(dataframes, ignore_index=True)
# Rename columns to match TAVG schema
combined_df = combined_df.rename(columns={
'ghcn_id': 'ID',
'year': 'YEAR',
'month': 'MONTH',
'precip_mm': 'VALUE'
})
    # Only ID, YEAR, MONTH, and VALUE are consumed downstream (Server.R);
    # the remaining columns are kept for reference.
# Ensure VALUE is float
combined_df['VALUE'] = combined_df['VALUE'].astype('float32')
print("Saving to parquet...")
# Save the DataFrame as a Parquet file
    os.makedirs('www/data/tabs', exist_ok=True)  # Ensure the output directory exists
    combined_df.to_parquet('www/data/tabs/prec_long.parquet', engine='pyarrow', index=False)
print(combined_df.describe())
# Calculate availability
print("Calculating availability...")
station_summary = combined_df.groupby('ID')['YEAR'].agg(first_year='min', last_year='max').reset_index()
    # The grouping key is already named 'ID', matching the output schema.
station_summary.to_csv("www/data/tabs/prec_availability.csv", index=False)
print("Done!")
if __name__ == '__main__':
main()