open-wether / Sources /App /Helper /Writer /GenericVariableHandle.swift
soiz1's picture
Migrated from GitHub
6ee917b verified
import OmFileFormat
import SwiftNetCDF
import Foundation
import Logging
/// Downloaders return FileHandles to keep files open while downloading
/// If another download starts and would overlap, this still keeps the old file open
struct GenericVariableHandle {
let variable: GenericVariable
let time: Timestamp
let member: Int
private let fn: FileHandle
public init(variable: GenericVariable, time: Timestamp, member: Int, fn: FileHandle) {
self.variable = variable
self.time = time
self.member = member
self.fn = fn
}
public func makeReader() throws -> OmFileReaderArray<MmapFile, Float> {
try OmFileReader(fn: try MmapFile(fn: fn)).asArray(of: Float.self)!
}
/// Process concurrently
static func convert(logger: Logger, domain: GenericDomain, createNetcdf: Bool, run: Timestamp?, handles: [Self], concurrent: Int, writeUpdateJson: Bool, uploadS3Bucket: String?, uploadS3OnlyProbabilities: Bool, compression: CompressionType = .pfor_delta2d_int16) async throws {
let startTime = DispatchTime.now()
try await convertConcurrent(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: handles, onlyGeneratePreviousDays: false, concurrent: concurrent, compression: compression)
logger.info("Convert completed in \(startTime.timeElapsedPretty())")
/// Write new model meta data, but only of it contains temperature_2m, precipitation, 10m wind or pressure. Ignores e.g. upper level runs
if writeUpdateJson, let run, handles.contains(where: {["temperature_2m", "precipitation", "precipitation_probability", "wind_u_component_10m", "pressure_msl", "river_discharge", "ocean_u_current", "wave_height", "pm10", "methane", "shortwave_radiation"].contains($0.variable.omFileName.file)}) {
let end = handles.max(by: {$0.time < $1.time})?.time.add(domain.dtSeconds) ?? Timestamp(0)
//let writer = OmFileWriter(dim0: 1, dim1: 1, chunk0: 1, chunk1: 1)
// generate model update timeseries
//let range = TimerangeDt(start: run, to: end, dtSeconds: domain.dtSeconds)
let current = Timestamp.now()
/*let initTimes = try range.flatMap {
// TODO timestamps need 64 bit integration
return [
GenericVariableHandle(
variable: ModelTimeVariable.initialisation_time,
time: $0,
member: 0,
fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: 1, all: [Float($0.timeIntervalSince1970)])
),
GenericVariableHandle(
variable: ModelTimeVariable.modification_time,
time: $0,
member: 0,
fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: 1, all: [Float(current.timeIntervalSince1970)])
)
]
}
let storePreviousForecast = handles.first(where: {$0.variable.storePreviousForecast}) != nil
try convert(logger: logger, domain: domain, createNetcdf: false, run: run, handles: initTimes, storePreviousForecastOverwrite: storePreviousForecast)*/
try ModelUpdateMetaJson.update(domain: domain, run: run, end: end, now: current)
}
if let uploadS3Bucket = uploadS3Bucket {
logger.info("AWS upload to bucket \(uploadS3Bucket)")
let startTimeAws = DispatchTime.now()
let variables = handles.map { $0.variable }.uniqued(on: { $0.omFileName.file })
do {
try domain.domainRegistry.syncToS3(
bucket: uploadS3Bucket,
variables: uploadS3OnlyProbabilities ? [ProbabilityVariable.precipitation_probability] : variables
)
} catch {
logger.error("Sync to AWS failed: \(error)")
}
logger.info("AWS upload completed in \(startTimeAws.timeElapsedPretty())")
}
if let run {
// if run is nil, do not attempt to generate previous days files
logger.info("Convert previous day database if required")
let startTimePreviousDays = DispatchTime.now()
try await convertConcurrent(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: handles, onlyGeneratePreviousDays: true, concurrent: concurrent, compression: compression)
logger.info("Previous day convert in \(startTimePreviousDays.timeElapsedPretty())")
}
}
static private func convertConcurrent(logger: Logger, domain: GenericDomain, createNetcdf: Bool, run: Timestamp?, handles: [Self], onlyGeneratePreviousDays: Bool, concurrent: Int, compression: CompressionType) async throws {
if concurrent > 1 {
try await handles
.filter({ onlyGeneratePreviousDays == false || $0.variable.storePreviousForecast })
.groupedPreservedOrder(by: {"\($0.variable.omFileName.file)"})
.evenlyChunked(in: concurrent)
.foreachConcurrent(nConcurrent: concurrent, body: {
try convertSerial3D(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: $0.flatMap{$0.values}, onlyGeneratePreviousDays: onlyGeneratePreviousDays, compression: compression)
})
} else {
try convertSerial3D(logger: logger, domain: domain, createNetcdf: createNetcdf, run: run, handles: handles, onlyGeneratePreviousDays: onlyGeneratePreviousDays, compression: compression)
}
}
/// Process each variable and update time-series optimised files
static private func convertSerial3D(logger: Logger, domain: GenericDomain, createNetcdf: Bool, run: Timestamp?, handles: [Self], onlyGeneratePreviousDays: Bool, compression: CompressionType) throws {
let grid = domain.grid
let nx = grid.nx
let ny = grid.ny
let nLocations = grid.count
let dtSeconds = domain.dtSeconds
for (_, handles) in handles.groupedPreservedOrder(by: {"\($0.variable.omFileName.file)"}) {
let readers: [(time: TimerangeDt, reader: OmFileReaderArray<MmapFile, Float>, member: Int)] = try handles.grouped(by: {$0.time}).flatMap { (time, h) in
return try h.map {
let reader = try $0.makeReader()
let dimensions = reader.getDimensions()
let nt = dimensions.count == 3 ? Int(dimensions[2]) : 1
guard dimensions[0] == ny && dimensions[1] == nx else {
fatalError("Dimensions do not match \(dimensions). Ny \(ny), Nx \(nx)")
}
let time = TimerangeDt(start: time, nTime: nt, dtSeconds: dtSeconds)
return(time, reader, $0.member)
}
}
guard let timeMin = readers.min(by: {$0.time.range.lowerBound < $1.time.range.lowerBound})?.time.range.lowerBound else {
logger.warning("No data to convert")
return
}
guard let timeMax = readers.max(by: {$0.time.range.upperBound < $1.time.range.upperBound})?.time.range.upperBound else {
logger.warning("No data to convert")
return
}
guard let maxTimeStepsPerFile = readers.max(by: {$0.time.count < $1.time.count})?.time.count else {
logger.warning("No data to convert")
return
}
/// `timeMinMax.min.time` has issues with `skip`
/// Start time (timeMinMax.min) might be before run time in case of MF wave which contains hindcast data
let startTime = min(run ?? timeMin, timeMin)
let time = TimerangeDt(range: startTime..<timeMax, dtSeconds: dtSeconds)
let variable = handles[0].variable
let nMembers = (handles.max(by: {$0.member < $1.member})?.member ?? 0) + 1
let nMembersStr = nMembers > 1 ? " (\(nMembers) nMembers)" : ""
let storePreviousForecast = variable.storePreviousForecast && nMembers <= 1
if onlyGeneratePreviousDays && !storePreviousForecast {
// No need to generate previous day forecast
continue
}
/// If only one value is set, this could be the model initialisation or modifcation time
/// TODO: check if single value mode is still required
//let isSingleValueVariable = readers.first?.reader.first?.fn.count == 1
let om = OmFileSplitter(domain,
//nLocations: isSingleValueVariable ? 1 : nil,
nMembers: nMembers/*,
chunknLocations: nMembers > 1 ? nMembers : nil*/
)
//let nLocationsPerChunk = om.nLocationsPerChunk
let spatialChunks = OmFileSplitter.calculateSpatialXYChunk(domain: domain, nMembers: nMembers, nTime: 1)
var data3d = Array3DFastTime(nLocations: spatialChunks.x * spatialChunks.y, nLevel: nMembers, nTime: time.count)
var readTemp = [Float](repeating: .nan, count: spatialChunks.x * spatialChunks.y * maxTimeStepsPerFile)
// Create netcdf file for debugging
if createNetcdf && !onlyGeneratePreviousDays {
logger.info("Generating NetCDF file for \(variable)")
try FileManager.default.createDirectory(atPath: domain.downloadDirectory, withIntermediateDirectories: true)
let ncFile = try NetCDF.create(path: "\(domain.downloadDirectory)\(variable.omFileName.file).nc", overwriteExisting: true)
try ncFile.setAttribute("TITLE", "\(domain) \(variable)")
var ncVariable = try ncFile.createVariable(name: "data", type: Float.self, dimensions: [
try ncFile.createDimension(name: "time", length: time.count),
try ncFile.createDimension(name: "member", length: nMembers),
try ncFile.createDimension(name: "LAT", length: grid.ny),
try ncFile.createDimension(name: "LON", length: grid.nx)
])
for reader in readers {
let data = try reader.reader.read()
let nt = reader.time.count
let timeArrayIndex = time.index(of: reader.time.range.lowerBound)!
if nt > 1 {
let fastSpace = Array2DFastTime(data: data, nLocations: grid.count, nTime: nt).transpose().data
try ncVariable.write(fastSpace, offset: [timeArrayIndex, reader.member, 0, 0], count: [nt, 1, grid.ny, grid.nx])
} else {
try ncVariable.write(data, offset: [timeArrayIndex, reader.member, 0, 0], count: [1, 1, grid.ny, grid.nx])
}
}
}
let progress = TransferAmountTracker(logger: logger, totalSize: nx*ny*time.count*nMembers*MemoryLayout<Float>.size, name: "Convert \(variable.rawValue)\(nMembersStr) \(time.prettyString())")
try om.updateFromTimeOrientedStreaming3D(variable: variable.omFileName.file, time: time, scalefactor: variable.scalefactor, compression: compression, onlyGeneratePreviousDays: onlyGeneratePreviousDays) { (yRange, xRange, memberRange) in
let nLoc = yRange.count * xRange.count
data3d.data.fillWithNaNs()
for reader in readers {
let dimensions = reader.reader.getDimensions()
let timeArrayIndex = time.index(of: reader.time.range.lowerBound)!
if dimensions.count == 3 {
/// Number of time steps in this file
let nt = dimensions[2]
try reader.reader.read(into: &readTemp, range: [yRange, xRange, 0..<nt])
data3d[0..<nLoc, reader.member, timeArrayIndex ..< timeArrayIndex+Int(nt)] = readTemp[0..<nLoc*Int(nt)]
} else {
// Single time step
try reader.reader.read(into: &readTemp, range: [yRange, xRange])
data3d[0..<nLoc, reader.member, timeArrayIndex] = readTemp[0..<nLoc]
}
}
// Interpolate all missing values
data3d.interpolateInplace(
type: variable.interpolation,
time: time,
grid: domain.grid,
locationRange: RegularGridSlice(grid: domain.grid, yRange: Int(yRange.lowerBound) ..< Int(yRange.upperBound), xRange: Int(xRange.lowerBound) ..< Int(xRange.upperBound))
)
progress.add(nLoc * memberRange.count * time.count * MemoryLayout<Float>.size)
return data3d.data[0..<nLoc * memberRange.count * time.count]
}
progress.finish()
}
}
}
actor GenericVariableHandleStorage {
var handles = [GenericVariableHandle]()
func append(_ element: GenericVariableHandle) {
handles.append(element)
}
func append(_ element: GenericVariableHandle?) {
guard let element else {
return
}
handles.append(element)
}
func append(contentsOf elements: [GenericVariableHandle]) {
handles.append(contentsOf: elements)
}
}
/// Thread safe storage for downloading grib messages. Can be used to post process data.
actor VariablePerMemberStorage<V: Hashable> {
struct VariableAndMember: Hashable {
let variable: V
let timestamp: Timestamp
let member: Int
func with(variable: V, timestamp: Timestamp? = nil) -> VariableAndMember {
.init(variable: variable, timestamp: timestamp ?? self.timestamp, member: self.member)
}
var timestampAndMember: TimestampAndMember {
return .init(timestamp: timestamp, member: member)
}
}
struct TimestampAndMember: Equatable {
let timestamp: Timestamp
let member: Int
}
var data = [VariableAndMember: Array2D]()
init(data: [VariableAndMember : Array2D] = [VariableAndMember: Array2D]()) {
self.data = data
}
func set(variable: V, timestamp: Timestamp, member: Int, data: Array2D) {
self.data[.init(variable: variable, timestamp: timestamp, member: member)] = data
}
func get(variable: V, timestamp: Timestamp, member: Int) -> Array2D? {
return data[.init(variable: variable, timestamp: timestamp, member: member)]
}
func get(_ variable: VariableAndMember) -> Array2D? {
return data[variable]
}
}
extension VariablePerMemberStorage {
/// Calculate wind speed and direction from U/V components for all available members an timesteps.
/// if `trueNorth` is given, correct wind direction due to rotated grid projections. E.g. DMI HARMONIE AROME using LambertCC
func calculateWindSpeed(u: V, v: V, outSpeedVariable: GenericVariable, outDirectionVariable: GenericVariable?, writer: OmFileWriterHelper, trueNorth: [Float]? = nil) throws -> [GenericVariableHandle] {
return try self.data
.groupedPreservedOrder(by: {$0.key.timestampAndMember})
.flatMap({ (t, handles) -> [GenericVariableHandle] in
guard let u = handles.first(where: {$0.key.variable == u}), let v = handles.first(where: {$0.key.variable == v}) else {
return []
}
let speed = zip(u.value.data, v.value.data).map(Meteorology.windspeed)
let speedHandle = GenericVariableHandle(
variable: outSpeedVariable,
time: t.timestamp,
member: t.member,
fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: outSpeedVariable.scalefactor, all: speed)
)
if let outDirectionVariable {
var direction = Meteorology.windirectionFast(u: u.value.data, v: v.value.data)
if let trueNorth {
direction = zip(direction, trueNorth).map({($0-$1+360).truncatingRemainder(dividingBy: 360)})
}
let directionHandle = GenericVariableHandle(
variable: outDirectionVariable,
time: t.timestamp,
member: t.member,
fn: try writer.writeTemporary(compressionType: .pfor_delta2d_int16, scalefactor: outDirectionVariable.scalefactor, all: direction)
)
return [speedHandle, directionHandle]
}
return [speedHandle]
}
)
}
/// Generate elevation file
/// - `elevation`: in metres
/// - `landMask` 0 = sea, 1 = land. Fractions below 0.5 are considered sea.
func generateElevationFile(elevation: V, landmask: V, domain: GenericDomain) throws {
let elevationFile = domain.surfaceElevationFileOm
if FileManager.default.fileExists(atPath: elevationFile.getFilePath()) {
return
}
guard var elevation = self.data.first(where: {$0.key.variable == elevation})?.value.data,
let landMask = self.data.first(where: {$0.key.variable == landmask})?.value.data else {
return
}
try elevationFile.createDirectory()
for i in elevation.indices {
if elevation[i] >= 9000 {
fatalError("Elevation greater 90000")
}
if landMask[i] < 0.5 {
// mask sea
elevation[i] = -999
}
}
#if Xcode
try Array2D(data: elevation, nx: domain.grid.nx, ny: domain.grid.ny).writeNetcdf(filename: domain.surfaceElevationFileOm.getFilePath().replacingOccurrences(of: ".om", with: ".nc"))
#endif
try elevation.writeOmFile2D(file: elevationFile.getFilePath(), grid: domain.grid, createNetCdf: false)
}
}
/// Keep values from previous timestep. Actori isolated, because of concurrent data conversion
actor GribDeaverager {
var data: [String: (step: Int, data: [Float])]
/// Set new value and get previous value out
func set(variable: GenericVariable, member: Int, step: Int, data d: [Float]) -> (step: Int, data: [Float])? {
let key = "\(variable)_member\(member)"
let previous = data[key]
data[key] = (step, d)
return previous
}
/// Make a deep copy
func copy() -> GribDeaverager {
return .init(data: data)
}
public init(data: [String : (step: Int, data: [Float])] = [String: (step: Int, data: [Float])]()) {
self.data = data
}
/// Returns false if step should be skipped
func deaccumulateIfRequired(variable: GenericVariable, member: Int, stepType: String, stepRange: String, grib2d: inout GribArray2D) async -> Bool {
// Deaccumulate precipitation
if stepType == "accum" {
guard let (startStep, currentStep) = stepRange.splitTo2Integer(), startStep != currentStep else {
return false
}
// Store data for next timestep
let previous = set(variable: variable, member: member, step: currentStep, data: grib2d.array.data)
// For the overall first timestep or the first step of each repeating section, deaveraging is not required
if let previous, previous.step != startStep, currentStep > previous.step {
for l in previous.data.indices {
grib2d.array.data[l] -= previous.data[l]
}
}
}
// Deaverage data
if stepType == "avg" {
guard let (startStep, currentStep) = stepRange.splitTo2Integer(), startStep != currentStep else {
return false
}
// Store data for next timestep
let previous = set(variable: variable, member: member, step: currentStep, data: grib2d.array.data)
// For the overall first timestep or the first step of each repeating section, deaveraging is not required
if let previous, previous.step != startStep, currentStep > previous.step {
let deltaHours = Float(currentStep - startStep)
let deltaHoursPrevious = Float(previous.step - startStep)
for l in previous.data.indices {
grib2d.array.data[l] = (grib2d.array.data[l] * deltaHours - previous.data[l] * deltaHoursPrevious) / (deltaHours - deltaHoursPrevious)
}
}
}
return true
}
}