import Foundation import Vapor import SwiftNetCDF import OmFileFormat /** Download wave model form the german weather service https://www.dwd.de/DE/leistungen/opendata/help/modelle/legend_ICON_wave_EN_pdf.pdf?__blob=publicationFile&v=3 All equations: https://library.wmo.int/doc_num.php?explnum_id=10979 */ struct DownloadIconWaveCommand: AsyncCommand { struct Signature: CommandSignature { @Argument(name: "domain") var domain: String @Option(name: "run") var run: String? @Option(name: "only-variables") var onlyVariables: String? @Option(name: "upload-s3-bucket", help: "Upload open-meteo database to an S3 bucket after processing") var uploadS3Bucket: String? @Flag(name: "create-netcdf") var createNetcdf: Bool @Option(name: "concurrent", short: "c", help: "Number of concurrent download/conversion jobs") var concurrent: Int? @Option(name: "max-forecast-hour", help: "Only download data until this forecast hour") var maxForecastHour: Int? } var help: String { "Download a specified wave model run" } func run(using context: CommandContext, signature: Signature) async throws { let domain = try IconWaveDomain.load(rawValue: signature.domain) let runHH = signature.run.map { guard let run = Int($0) else { fatalError("Invalid run '\($0)'") } return run } ?? domain.lastRun let onlyVariables = try IconWaveVariable.load(commaSeparatedOptional: signature.onlyVariables) let logger = context.application.logger let run = Timestamp.now().with(hour: runHH) logger.info("Downloading domain '\(domain.rawValue)' run '\(run.iso8601_YYYY_MM_dd_HH_mm)'") let variables = onlyVariables ?? IconWaveVariable.allCases let handles = try await download(application: context.application, domain: domain, run: run, variables: variables, maxForecastHour: signature.maxForecastHour) let nConcurrent = signature.concurrent ?? 1 try await GenericVariableHandle.convert(logger: logger, domain: domain, createNetcdf: signature.createNetcdf, run: run, handles: handles, concurrent: nConcurrent, writeUpdateJson: true, uploadS3Bucket: signature.uploadS3Bucket, uploadS3OnlyProbabilities: false) } /// Download all timesteps and preliminarily covnert it to compressed files func download(application: Application, domain: IconWaveDomain, run: Timestamp, variables: [IconWaveVariable], maxForecastHour: Int?) async throws -> [GenericVariableHandle] { // https://opendata.dwd.de/weather/maritime/wave_models/gwam/grib/00/mdww/GWAM_MDWW_2022072800_000.grib2.bz2 // https://opendata.dwd.de/weather/maritime/wave_models/ewam/grib/00/mdww/EWAM_MDWW_2022072800_000.grib2.bz2 let baseUrl = "http://opendata.dwd.de/weather/maritime/wave_models/\(domain.rawValue)/grib/\(run.hour.zeroPadded(len: 2))/" let logger = application.logger try FileManager.default.createDirectory(atPath: domain.downloadDirectory, withIntermediateDirectories: true) let curl = Curl(logger: logger, client: application.dedicatedHttpClient) let nx = domain.grid.nx let ny = domain.grid.ny let writer = OmFileSplitter.makeSpatialWriter(domain: domain) var grib2d = GribArray2D(nx: nx, ny: ny) let handles = try await (0..<(maxForecastHour ?? domain.countForecastHours)).asyncFlatMap { forecastStep in /// E.g. 0,3,6...174 for gwam let forecastHour = forecastStep * domain.dtHours logger.info("Downloading hour \(forecastHour)") return try await variables.asyncMap { variable in let url = "\(baseUrl)\(variable.dwdName)/\(domain.rawValue.uppercased())_\(variable.dwdName.uppercased())_\(run.format_YYYYMMddHH)_\(forecastHour.zeroPadded(len: 3)).grib2.bz2" let message = try await curl.downloadGrib(url: url, bzip2Decode: true)[0] try grib2d.load(message: message) if domain == .gwam { grib2d.array.shift180LongitudeAndFlipLatitude() } else { grib2d.array.flipLatitude() } /// Create elevation file for sea mask if !FileManager.default.fileExists(atPath: domain.surfaceElevationFileOm.getFilePath()) { var elevation = grib2d.array.data for i in elevation.indices { /// `NaN` out of domain, `-999` sea grid point elevation[i] = elevation[i].isNaN ? .nan : -999 } try domain.surfaceElevationFileOm.createDirectory() try elevation.writeOmFile2D(file: domain.surfaceElevationFileOm.getFilePath(), grid: domain.grid, createNetCdf: false) } let fn = try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: variable.scalefactor, all: grib2d.array.data) return GenericVariableHandle( variable: variable, time: run.add(hours: forecastHour), member: 0, fn: fn ) } } await curl.printStatistics() return handles } } extension IconWaveDomain { /// Based on the current time , guess the current run that should be available soon on the open-data server fileprivate var lastRun: Int { let t = Timestamp.now() switch self { case .ewam: fallthrough case .gwam: // Wave models have a delay of 3-4 hours after initialisation return ((t.hour - 3 + 24) % 24) / 12 * 12 } } }