import OmFileFormat
import SwiftNetCDF
import Foundation
import Logging
/// Downloaders return FileHandles to keep files open while downloading
/// If another download starts and would overlap, this still keeps the old file open
struct GenericVariableHandle {
    /// The variable contained in the temporary file
    let variable: GenericVariable
    /// Valid time of the first (or only) timestep in the file
    let time: Timestamp
    /// Ensemble member number, 0 for deterministic models
    let member: Int
    /// Open handle to the temporary om-file. Keeping it open protects against overlapping downloads deleting the file.
    private let fn: FileHandle

    public init(variable: GenericVariable, time: Timestamp, member: Int, fn: FileHandle) {
        self.variable = variable
        self.time = time
        self.member = member
        self.fn = fn
    }

    /// Memory-map the temporary file and open it as a float array reader
    public func makeReader() throws -> OmFileReaderArray<MmapFile, Float> {
        try OmFileReader(fn: try MmapFile(fn: fn)).asArray(of: Float.self)!
    }

    /// Process concurrently
    ///
    /// Converts all downloaded temporary files into the time-series optimised database,
    /// optionally writes the model update meta JSON, uploads to S3 and generates the
    /// "previous day" database for variables flagged with `storePreviousForecast`.
    static func convert(logger: Logger, domain: GenericDomain, createNetcdf: Bool, run: Timestamp?, handles: [Self], concurrent: Int, writeUpdateJson: Bool, uploadS3Bucket: String?, uploadS3OnlyProbabilities: Bool, compression: CompressionType = .pfor_delta2d_int16) async throws {
        let startTime = DispatchTime.now()
        try await convertConcurrent(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: handles, onlyGeneratePreviousDays: false, concurrent: concurrent, compression: compression)
        logger.info("Convert completed in \(startTime.timeElapsedPretty())")

        /// Write new model meta data, but only if it contains temperature_2m, precipitation, 10m wind or pressure. Ignores e.g. upper level runs
        if writeUpdateJson, let run, handles.contains(where: {["temperature_2m", "precipitation", "precipitation_probability", "wind_u_component_10m", "pressure_msl", "river_discharge", "ocean_u_current", "wave_height", "pm10", "methane", "shortwave_radiation"].contains($0.variable.omFileName.file)}) {
            // End of the forecast: latest handle time plus one time step
            let end = handles.max(by: {$0.time < $1.time})?.time.add(domain.dtSeconds) ?? Timestamp(0)
            //let writer = OmFileWriter(dim0: 1, dim1: 1, chunk0: 1, chunk1: 1)
            // generate model update timeseries
            //let range = TimerangeDt(start: run, to: end, dtSeconds: domain.dtSeconds)
            let current = Timestamp.now()
            /*let initTimes = try range.flatMap {
                // TODO timestamps need 64 bit integration
                return [
                    GenericVariableHandle(
                        variable: ModelTimeVariable.initialisation_time,
                        time: $0,
                        member: 0,
                        fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: 1, all: [Float($0.timeIntervalSince1970)])
                    ),
                    GenericVariableHandle(
                        variable: ModelTimeVariable.modification_time,
                        time: $0,
                        member: 0,
                        fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: 1, all: [Float(current.timeIntervalSince1970)])
                    )
                ]
            }
            let storePreviousForecast = handles.first(where: {$0.variable.storePreviousForecast}) != nil
            try convert(logger: logger, domain: domain, createNetcdf: false, run: run, handles: initTimes, storePreviousForecastOverwrite: storePreviousForecast)*/
            try ModelUpdateMetaJson.update(domain: domain, run: run, end: end, now: current)
        }

        if let uploadS3Bucket = uploadS3Bucket {
            logger.info("AWS upload to bucket \(uploadS3Bucket)")
            let startTimeAws = DispatchTime.now()
            // Only upload each converted file once, even if multiple timesteps contributed
            let variables = handles.map { $0.variable }.uniqued(on: { $0.omFileName.file })
            do {
                try domain.domainRegistry.syncToS3(
                    bucket: uploadS3Bucket,
                    variables: uploadS3OnlyProbabilities ? [ProbabilityVariable.precipitation_probability] : variables
                )
            } catch {
                // A failed upload should not abort the whole conversion run
                logger.error("Sync to AWS failed: \(error)")
            }
            logger.info("AWS upload completed in \(startTimeAws.timeElapsedPretty())")
        }

        if let run {
            // if run is nil, do not attempt to generate previous days files
            logger.info("Convert previous day database if required")
            let startTimePreviousDays = DispatchTime.now()
            try await convertConcurrent(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: handles, onlyGeneratePreviousDays: true, concurrent: concurrent, compression: compression)
            logger.info("Previous day convert in \(startTimePreviousDays.timeElapsedPretty())")
        }
    }

    /// Distribute conversion work over `concurrent` tasks. Handles are grouped by
    /// output file name first, so one file is always written by exactly one task.
    static private func convertConcurrent(logger: Logger, domain: GenericDomain, createNetcdf: Bool, run: Timestamp?, handles: [Self], onlyGeneratePreviousDays: Bool, concurrent: Int, compression: CompressionType) async throws {
        if concurrent > 1 {
            try await handles
                .filter({ onlyGeneratePreviousDays == false || $0.variable.storePreviousForecast })
                .groupedPreservedOrder(by: {"\($0.variable.omFileName.file)"})
                .evenlyChunked(in: concurrent)
                .foreachConcurrent(nConcurrent: concurrent, body: {
                    try convertSerial3D(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: $0.flatMap{$0.values}, onlyGeneratePreviousDays: onlyGeneratePreviousDays, compression: compression)
                })
        } else {
            try convertSerial3D(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: handles, onlyGeneratePreviousDays: onlyGeneratePreviousDays, compression: compression)
        }
    }

    /// Process each variable and update time-series optimised files
    static private func convertSerial3D(logger: Logger, domain: GenericDomain, createNetcdf: Bool, run: Timestamp?, handles: [Self], onlyGeneratePreviousDays: Bool, compression: CompressionType) throws {
        let grid = domain.grid
        let nx = grid.nx
        let ny = grid.ny
        let nLocations = grid.count
        let dtSeconds = domain.dtSeconds

        // One iteration per output file (= per variable)
        for (_, handles) in handles.groupedPreservedOrder(by: {"\($0.variable.omFileName.file)"}) {
            // Open a reader per temporary file. A file may contain 1 timestep (2D) or multiple (3D, third dimension = time)
            let readers: [(time: TimerangeDt, reader: OmFileReaderArray<MmapFile, Float>, member: Int)] = try handles.grouped(by: {$0.time}).flatMap { (time, h) in
                return try h.map {
                    let reader = try $0.makeReader()
                    let dimensions = reader.getDimensions()
                    let nt = dimensions.count == 3 ? Int(dimensions[2]) : 1
                    guard dimensions[0] == ny && dimensions[1] == nx else {
                        fatalError("Dimensions do not match \(dimensions). Ny \(ny), Nx \(nx)")
                    }
                    let time = TimerangeDt(start: time, nTime: nt, dtSeconds: dtSeconds)
                    return(time, reader, $0.member)
                }
            }
            guard let timeMin = readers.min(by: {$0.time.range.lowerBound < $1.time.range.lowerBound})?.time.range.lowerBound else {
                logger.warning("No data to convert")
                return
            }
            guard let timeMax = readers.max(by: {$0.time.range.upperBound < $1.time.range.upperBound})?.time.range.upperBound else {
                logger.warning("No data to convert")
                return
            }
            // Largest number of timesteps stored in a single temporary file; sizes the read buffer
            guard let maxTimeStepsPerFile = readers.max(by: {$0.time.count < $1.time.count})?.time.count else {
                logger.warning("No data to convert")
                return
            }
            /// `timeMinMax.min.time` has issues with `skip`
            /// Start time (timeMinMax.min) might be before run time in case of MF wave which contains hindcast data
            let startTime = min(run ?? timeMin, timeMin)
            let time = TimerangeDt(range: startTime..<timeMax, dtSeconds: dtSeconds)
            let variable = handles[0].variable
            let nMembers = (handles.max(by: {$0.member < $1.member})?.member ?? 0) + 1
            let nMembersStr = nMembers > 1 ? " (\(nMembers) nMembers)" : ""
            // Previous-forecast files are only kept for deterministic (single member) variables
            let storePreviousForecast = variable.storePreviousForecast && nMembers <= 1
            if onlyGeneratePreviousDays && !storePreviousForecast {
                // No need to generate previous day forecast
                continue
            }
            /// If only one value is set, this could be the model initialisation or modification time
            /// TODO: check if single value mode is still required
            //let isSingleValueVariable = readers.first?.reader.first?.fn.count == 1
            let om = OmFileSplitter(domain,
                                    //nLocations: isSingleValueVariable ? 1 : nil,
                                    nMembers: nMembers/*,
                chunknLocations: nMembers > 1 ? nMembers : nil*/
            )
            //let nLocationsPerChunk = om.nLocationsPerChunk
            let spatialChunks = OmFileSplitter.calculateSpatialXYChunk(domain: domain, nMembers: nMembers, nTime: 1)
            // Transposition buffer: fast-time layout covering one spatial chunk, all members and the full time range
            var data3d = Array3DFastTime(nLocations: spatialChunks.x * spatialChunks.y, nLevel: nMembers, nTime: time.count)
            var readTemp = [Float](repeating: .nan, count: spatialChunks.x * spatialChunks.y * maxTimeStepsPerFile)

            // Create netcdf file for debugging
            if createNetcdf && !onlyGeneratePreviousDays {
                logger.info("Generating NetCDF file for \(variable)")
                try FileManager.default.createDirectory(atPath: domain.downloadDirectory, withIntermediateDirectories: true)
                let ncFile = try NetCDF.create(path: "\(domain.downloadDirectory)\(variable.omFileName.file).nc", overwriteExisting: true)
                try ncFile.setAttribute("TITLE", "\(domain) \(variable)")
                var ncVariable = try ncFile.createVariable(name: "data", type: Float.self, dimensions: [
                    try ncFile.createDimension(name: "time", length: time.count),
                    try ncFile.createDimension(name: "member", length: nMembers),
                    try ncFile.createDimension(name: "LAT", length: grid.ny),
                    try ncFile.createDimension(name: "LON", length: grid.nx)
                ])
                for reader in readers {
                    let data = try reader.reader.read()
                    let nt = reader.time.count
                    let timeArrayIndex = time.index(of: reader.time.range.lowerBound)!
                    if nt > 1 {
                        // Multi-step files are stored fast-time; NetCDF expects fast-space, so transpose first
                        let fastSpace = Array2DFastTime(data: data, nLocations: grid.count, nTime: nt).transpose().data
                        try ncVariable.write(fastSpace, offset: [timeArrayIndex, reader.member, 0, 0], count: [nt, 1, grid.ny, grid.nx])
                    } else {
                        try ncVariable.write(data, offset: [timeArrayIndex, reader.member, 0, 0], count: [1, 1, grid.ny, grid.nx])
                    }
                }
            }

            let progress = TransferAmountTracker(logger: logger, totalSize: nx*ny*time.count*nMembers*MemoryLayout<Float>.size, name: "Convert \(variable.rawValue)\(nMembersStr) \(time.prettyString())")
            // Streaming update: the callback is called once per spatial chunk and must return
            // the data for all members and timesteps of that chunk in fast-time order
            try om.updateFromTimeOrientedStreaming3D(variable: variable.omFileName.file, time: time, scalefactor: variable.scalefactor, compression: compression, onlyGeneratePreviousDays: onlyGeneratePreviousDays) { (yRange, xRange, memberRange) in
                let nLoc = yRange.count * xRange.count
                // NaN = missing; gaps are filled by interpolation below
                data3d.data.fillWithNaNs()
                for reader in readers {
                    let dimensions = reader.reader.getDimensions()
                    let timeArrayIndex = time.index(of: reader.time.range.lowerBound)!
                    if dimensions.count == 3 {
                        /// Number of time steps in this file
                        let nt = dimensions[2]
                        try reader.reader.read(into: &readTemp, range: [yRange, xRange, 0..<nt])
                        data3d[0..<nLoc, reader.member, timeArrayIndex ..< timeArrayIndex+Int(nt)] = readTemp[0..<nLoc*Int(nt)]
                    } else {
                        // Single time step
                        try reader.reader.read(into: &readTemp, range: [yRange, xRange])
                        data3d[0..<nLoc, reader.member, timeArrayIndex] = readTemp[0..<nLoc]
                    }
                }

                // Interpolate all missing values
                data3d.interpolateInplace(
                    type: variable.interpolation,
                    time: time,
                    grid: domain.grid,
                    locationRange: RegularGridSlice(grid: domain.grid, yRange: Int(yRange.lowerBound) ..< Int(yRange.upperBound), xRange: Int(xRange.lowerBound) ..< Int(xRange.upperBound))
                )

                progress.add(nLoc * memberRange.count * time.count * MemoryLayout<Float>.size)
                return data3d.data[0..<nLoc * memberRange.count * time.count]
            }
            progress.finish()
        }
    }
}
/// Actor-backed accumulator for `GenericVariableHandle` values so that multiple
/// concurrent download tasks can collect their results without data races.
actor GenericVariableHandleStorage {
    var handles = [GenericVariableHandle]()

    /// Append a single handle
    func append(_ element: GenericVariableHandle) {
        handles.append(element)
    }

    /// Append a handle if present; `nil` is silently ignored
    func append(_ element: GenericVariableHandle?) {
        if let element {
            handles.append(element)
        }
    }

    /// Append a batch of handles at once
    func append(contentsOf elements: [GenericVariableHandle]) {
        handles += elements
    }
}
/// Thread safe storage for downloading grib messages. Can be used to post process data.
/// Fields are keyed by variable, timestamp and ensemble member.
actor VariablePerMemberStorage<V: Hashable> {
    /// Composite dictionary key: one variable of one ensemble member at one timestep
    struct VariableAndMember: Hashable {
        let variable: V
        let timestamp: Timestamp
        let member: Int

        /// Derive a new key for another variable (and optionally another timestamp), keeping the member
        func with(variable: V, timestamp: Timestamp? = nil) -> VariableAndMember {
            return VariableAndMember(variable: variable, timestamp: timestamp ?? self.timestamp, member: self.member)
        }

        /// Key reduced to timestamp and member, e.g. to group all variables of one timestep
        var timestampAndMember: TimestampAndMember {
            TimestampAndMember(timestamp: timestamp, member: member)
        }
    }

    /// Pair of timestamp and ensemble member, used for grouping
    struct TimestampAndMember: Equatable {
        let timestamp: Timestamp
        let member: Int
    }

    var data = [VariableAndMember: Array2D]()

    init(data: [VariableAndMember : Array2D] = [VariableAndMember: Array2D]()) {
        self.data = data
    }

    /// Store a 2D field for the given variable, timestamp and member
    func set(variable: V, timestamp: Timestamp, member: Int, data: Array2D) {
        let key = VariableAndMember(variable: variable, timestamp: timestamp, member: member)
        self.data[key] = data
    }

    /// Retrieve a previously stored field, if any
    func get(variable: V, timestamp: Timestamp, member: Int) -> Array2D? {
        let key = VariableAndMember(variable: variable, timestamp: timestamp, member: member)
        return data[key]
    }

    /// Retrieve a previously stored field by its composite key, if any
    func get(_ variable: VariableAndMember) -> Array2D? {
        data[variable]
    }
}
extension VariablePerMemberStorage {
    /// Calculate wind speed and direction from U/V components for all available members and timesteps.
    /// If `trueNorth` is given, correct wind direction due to rotated grid projections. E.g. DMI HARMONIE AROME using LambertCC
    /// - Parameters:
    ///   - u: Key of the eastward wind component variable
    ///   - v: Key of the northward wind component variable
    ///   - outSpeedVariable: Variable under which the calculated speed is written
    ///   - outDirectionVariable: Variable under which the calculated direction is written. If nil, only speed is generated.
    ///   - writer: Writer used to create temporary om-files for the results
    ///   - trueNorth: Optional per-gridpoint correction angle in degrees
    /// - Returns: One handle per timestep/member for speed, plus one for direction if requested
    func calculateWindSpeed(u: V, v: V, outSpeedVariable: GenericVariable, outDirectionVariable: GenericVariable?, writer: OmFileWriterHelper, trueNorth: [Float]? = nil) throws -> [GenericVariableHandle] {
        return try self.data
            .groupedPreservedOrder(by: {$0.key.timestampAndMember})
            .flatMap({ (t, handles) -> [GenericVariableHandle] in
                // Both components must be present for this timestep/member, otherwise skip it
                guard let u = handles.first(where: {$0.key.variable == u}), let v = handles.first(where: {$0.key.variable == v}) else {
                    return []
                }
                let speed = zip(u.value.data, v.value.data).map(Meteorology.windspeed)
                let speedHandle = GenericVariableHandle(
                    variable: outSpeedVariable,
                    time: t.timestamp,
                    member: t.member,
                    fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: outSpeedVariable.scalefactor, all: speed)
                )
                if let outDirectionVariable {
                    var direction = Meteorology.windirectionFast(u: u.value.data, v: v.value.data)
                    if let trueNorth {
                        // Rotate from grid-relative to true-north-relative direction, normalised to 0..<360
                        direction = zip(direction, trueNorth).map({($0-$1+360).truncatingRemainder(dividingBy: 360)})
                    }
                    let directionHandle = GenericVariableHandle(
                        variable: outDirectionVariable,
                        time: t.timestamp,
                        member: t.member,
                        fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: outDirectionVariable.scalefactor, all: direction)
                    )
                    return [speedHandle, directionHandle]
                }
                return [speedHandle]
            }
        )
    }

    /// Generate elevation file
    /// - Parameters:
    ///   - elevation: Key of the elevation variable, in metres
    ///   - landmask: Key of the land mask variable. 0 = sea, 1 = land. Fractions below 0.5 are considered sea.
    ///   - domain: Domain for which the elevation file is generated
    func generateElevationFile(elevation: V, landmask: V, domain: GenericDomain) throws {
        let elevationFile = domain.surfaceElevationFileOm
        // Only generate the file once
        if FileManager.default.fileExists(atPath: elevationFile.getFilePath()) {
            return
        }
        // Both fields must already be stored, otherwise do nothing
        guard var elevation = self.data.first(where: {$0.key.variable == elevation})?.value.data,
              let landMask = self.data.first(where: {$0.key.variable == landmask})?.value.data else {
            return
        }
        try elevationFile.createDirectory()
        for i in elevation.indices {
            if elevation[i] >= 9000 {
                // Highest real elevation on Earth is below 9000 m; larger values indicate broken input data.
                // Note: message previously claimed "90000" while the check uses 9000.
                fatalError("Elevation greater or equal 9000")
            }
            if landMask[i] < 0.5 {
                // mask sea points with the magic value -999
                elevation[i] = -999
            }
        }
        #if Xcode
        try Array2D(data: elevation, nx: domain.grid.nx, ny: domain.grid.ny).writeNetcdf(filename: domain.surfaceElevationFileOm.getFilePath().replacingOccurrences(of: ".om", with: ".nc"))
        #endif
        try elevation.writeOmFile2D(file: elevationFile.getFilePath(), grid: domain.grid, createNetCdf: false)
    }
}
/// Keep values from previous timestep to deaccumulate or deaverage them. Actor isolated, because of concurrent data conversion
actor GribDeaverager {
    /// Last seen field per "<variable>_member<N>" key, together with the forecast step it belongs to
    var data: [String: (step: Int, data: [Float])]

    public init(data: [String : (step: Int, data: [Float])] = [String: (step: Int, data: [Float])]()) {
        self.data = data
    }

    /// Set new value and get previous value out
    func set(variable: GenericVariable, member: Int, step: Int, data d: [Float]) -> (step: Int, data: [Float])? {
        let key = "\(variable)_member\(member)"
        let previous = data[key]
        data[key] = (step, d)
        return previous
    }

    /// Make a deep copy
    func copy() -> GribDeaverager {
        return .init(data: data)
    }

    /// Parse `stepRange` into start and current step, store the current field for the next call
    /// and return the previously stored field.
    /// Returns nil if the step range is degenerate (start == current) and the message should be skipped.
    private func storeAndGetPrevious(variable: GenericVariable, member: Int, stepRange: String, current: [Float]) -> (startStep: Int, currentStep: Int, previous: (step: Int, data: [Float])?)? {
        guard let (startStep, currentStep) = stepRange.splitTo2Integer(), startStep != currentStep else {
            return nil
        }
        // Store data for next timestep
        let previous = set(variable: variable, member: member, step: currentStep, data: current)
        return (startStep, currentStep, previous)
    }

    /// Deaccumulate or deaverage `grib2d` in place, depending on `stepType` ("accum" or "avg").
    /// Returns false if step should be skipped
    func deaccumulateIfRequired(variable: GenericVariable, member: Int, stepType: String, stepRange: String, grib2d: inout GribArray2D) async -> Bool {
        // Deaccumulate precipitation
        if stepType == "accum" {
            guard let (startStep, currentStep, previous) = storeAndGetPrevious(variable: variable, member: member, stepRange: stepRange, current: grib2d.array.data) else {
                return false
            }
            // For the overall first timestep or the first step of each repeating section, deaccumulation is not required
            if let previous, previous.step != startStep, currentStep > previous.step {
                for l in previous.data.indices {
                    grib2d.array.data[l] -= previous.data[l]
                }
            }
        }

        // Deaverage data
        if stepType == "avg" {
            guard let (startStep, currentStep, previous) = storeAndGetPrevious(variable: variable, member: member, stepRange: stepRange, current: grib2d.array.data) else {
                return false
            }
            // For the overall first timestep or the first step of each repeating section, deaveraging is not required
            if let previous, previous.step != startStep, currentStep > previous.step {
                // Weight both fields by their averaging interval length to recover the average of the most recent interval
                let deltaHours = Float(currentStep - startStep)
                let deltaHoursPrevious = Float(previous.step - startStep)
                for l in previous.data.indices {
                    grib2d.array.data[l] = (grib2d.array.data[l] * deltaHours - previous.data[l] * deltaHoursPrevious) / (deltaHours - deltaHoursPrevious)
                }
            }
        }
        return true
    }
}