import Foundation import OmFileFormat /// Read any time from multiple files struct OmFileSplitter { let domain: DomainRegistry let masterTimeRange: Range? let hasYearlyFiles: Bool /// actually also in file var nLocations: Int { nx * ny * nMembers } let ny: Int let nx: Int /// Number of ensemble members or levels let nMembers: Int /// actually also in file let nTimePerFile: Int /// Compression ratio largly depends on chunk size for location. For small timeranges e.g. icon-d2 (121 timesteps per file the nlocation parameter effects the file size: /// 1 = 2.09 GB /// 2 = 1.15 GB /// 6 = 870.7 MB /// 12 = 739.4 MB /// 24 = 683.7 MB /// 33 = 669.2 MB /// 48 = 650 MB /// 96 = 637.1 MB /// Decompress performance is mostly the same, because the chunks are so small, that IO overhead is more dominant than CPU cycles let chunknLocations: Int /// With dynamic nLocation selection based on time, we get chunk locations for each domain. Minimum is set to 6, because spatial correlation does not work well with lower than 6 steps /// icon = 12 /// icon-eu = 16 /// icon-d2 = 25 /// ecmwf = 30 /// NOTE: carefull with reducing nchunkLocaiton, because updates will use the wrong buffer size!!!! static func calcChunknLocations(nTimePerFile: Int) -> Int { max(6, 3072 / nTimePerFile) } init(_ domain: Domain, nMembers: Int? = nil, chunknLocations: Int? = nil) { self.init( domain: domain.domainRegistry, nMembers: max(nMembers ?? 1, 1), nx: domain.grid.nx, ny: domain.grid.ny, nTimePerFile: domain.omFileLength, hasYearlyFiles: domain.hasYearlyFiles, masterTimeRange: domain.masterTimeRange, chunknLocations: chunknLocations ) } init(domain: DomainRegistry, nMembers: Int, nx: Int, ny: Int, nTimePerFile: Int, hasYearlyFiles: Bool, masterTimeRange: Range?, chunknLocations: Int? = nil) { self.domain = domain self.nMembers = nMembers self.nx = nx self.ny = ny self.nTimePerFile = nTimePerFile self.hasYearlyFiles = hasYearlyFiles self.masterTimeRange = masterTimeRange let nLocations = nx * ny * nMembers self.chunknLocations = chunknLocations ?? min(nLocations, Self.calcChunknLocations(nTimePerFile: nTimePerFile)) } // optimise to use 8 MB memory, but aligned to even `chunknLocations` var nLocationsPerChunk: Int { min(nLocations, 8*1024*1024 / MemoryLayout.stride / nTimePerFile / chunknLocations * chunknLocations) } /// Prefetch all required data into memory func willNeed(variable: String, location: Range, level: Int, time: TimerangeDtAndSettings) throws { // TODO: maybe we can keep the file handles better in scope let indexTime = time.time.toIndexTime() let nTime = indexTime.count /// If yearly files are present, the start parameter is moved to read fewer files later var start = indexTime.lowerBound if let masterTimeRange { let fileTime = TimerangeDt(range: masterTimeRange, dtSeconds: time.dtSeconds).toIndexTime() if let offsets = indexTime.intersect(fileTime: fileTime), let omFile = try OmFileManager.get(.domainChunk(domain: domain, variable: variable, type: .master, chunk: 0, ensembleMember: time.ensembleMember, previousDay: time.previousDay)) { try omFile.willNeed3D(ny: ny, nx: nx, nTime: nTime, nMembers: nMembers, location: location, level: level, timeOffsets: offsets) start = fileTime.upperBound } } if start >= indexTime.upperBound { return } if hasYearlyFiles { let startYear = time.range.lowerBound.toComponents().year /// end year is included in itteration range let endYear = time.range.upperBound.add(-1 * time.dtSeconds).toComponents().year for year in startYear ... endYear { let yeartime = TimerangeDt(start: Timestamp(year, 1, 1), to: Timestamp(year+1, 1, 1), dtSeconds: time.dtSeconds) /// as index let fileTime = yeartime.toIndexTime() guard let offsets = indexTime.intersect(fileTime: fileTime) else { continue } guard let omFile = try OmFileManager.get(.domainChunk(domain: domain, variable: variable, type: .year, chunk: year, ensembleMember: time.ensembleMember, previousDay: time.previousDay)) else { continue } try omFile.willNeed3D(ny: ny, nx: nx, nTime: nTime, nMembers: nMembers, location: location, level: level, timeOffsets: offsets) start = fileTime.upperBound } } if start >= indexTime.upperBound { return } let subring = start ..< indexTime.upperBound for timeChunk in subring.divideRoundedUp(divisor: nTimePerFile) { let fileTime = timeChunk * nTimePerFile ..< (timeChunk+1) * nTimePerFile guard let offsets = indexTime.intersect(fileTime: fileTime) else { continue } guard let omFile = try OmFileManager.get(.domainChunk(domain: domain, variable: variable, type: .chunk, chunk: timeChunk, ensembleMember: time.ensembleMember, previousDay: time.previousDay)) else { continue } try omFile.willNeed3D(ny: ny, nx: nx, nTime: nTime, nMembers: nMembers, location: location, level: level, timeOffsets: offsets) } } func read2D(variable: String, location: Range, level: Int, time: TimerangeDtAndSettings) throws -> Array2DFastTime { let data = try read(variable: variable, location: location, level: level, time: time) return Array2DFastTime(data: data, nLocations: location.count, nTime: time.time.count) } func read(variable: String, location: Range, level: Int, time: TimerangeDtAndSettings) throws -> [Float] { let indexTime = time.time.toIndexTime() let nTime = indexTime.count var start = indexTime.lowerBound /// If yearly files are present, the start parameter is moved to read fewer files later var out = [Float](repeating: .nan, count: nTime * location.count) if let masterTimeRange { let fileTime = TimerangeDt(range: masterTimeRange, dtSeconds: time.dtSeconds).toIndexTime() if let offsets = indexTime.intersect(fileTime: fileTime), let omFile = try OmFileManager.get(.domainChunk(domain: domain, variable: variable, type: .master, chunk: 0, ensembleMember: time.ensembleMember, previousDay: time.previousDay)) { try omFile.read3D(into: &out, ny: ny, nx: nx, nTime: nTime, nMembers: nMembers, location: location, level: level, timeOffsets: offsets) start = fileTime.upperBound } } if hasYearlyFiles { let startYear = time.range.lowerBound.toComponents().year /// end year is included in itteration range let endYear = time.range.upperBound.add(-1 * time.dtSeconds).toComponents().year for year in startYear ... endYear { let yeartime = TimerangeDt(start: Timestamp(year, 1, 1), to: Timestamp(year+1, 1, 1), dtSeconds: time.dtSeconds) /// as index let fileTime = yeartime.toIndexTime() guard let offsets = indexTime.intersect(fileTime: fileTime) else { continue } guard let omFile = try OmFileManager.get(.domainChunk(domain: domain, variable: variable, type: .year, chunk: year, ensembleMember: time.ensembleMember, previousDay: time.previousDay)) else { continue } try omFile.read3D(into: &out, ny: ny, nx: nx, nTime: nTime, nMembers: nMembers, location: location, level: level, timeOffsets: offsets) start = fileTime.upperBound } } let delta = start - indexTime.lowerBound if start >= indexTime.upperBound { return out } let subring = start ..< indexTime.upperBound for timeChunk in subring.divideRoundedUp(divisor: nTimePerFile) { let fileTime = timeChunk * nTimePerFile ..< (timeChunk+1) * nTimePerFile guard let offsets = subring.intersect(fileTime: fileTime) else { continue } guard let omFile = try OmFileManager.get(.domainChunk(domain: domain, variable: variable, type: .chunk, chunk: timeChunk, ensembleMember: time.ensembleMember, previousDay: time.previousDay)) else { continue } try omFile.read3D(into: &out, ny: ny, nx: nx, nTime: nTime, nMembers: nMembers, location: location, level: level, timeOffsets: (offsets.file, offsets.array.add(delta))) } return out } /** Write new data to the archived storage and combine it with existint data. Updates are done in chunks to keep memory size low. Otherwise ICON update would take 4+ GB memory for just this function. TODO: should use Array3DFastTime */ func updateFromTimeOriented(variable: String, array2d: Array2DFastTime, time: TimerangeDt, scalefactor: Float, compression: CompressionType = .pfor_delta2d_int16) throws { precondition(array2d.nTime == time.count) precondition(array2d.nLocations == nx * ny) // Process at most 8 MB at once try updateFromTimeOrientedStreaming3D(variable: variable, time: time, scalefactor: scalefactor, compression: compression, onlyGeneratePreviousDays: false) { (yRange, xRange, memberRange) in guard yRange.count == 1 || xRange.count == nx else { fatalError("chunk dimensions need to be either parts of X or a mutliple or X") } let start = Int(yRange.lowerBound) * nx + Int(xRange.lowerBound) let count = Int(yRange.count * xRange.count) let locationRange = start ..< start+count let dataRange = locationRange.multiply(array2d.nTime) return array2d.data[dataRange] } } /** Write new data to archived storage and combine it with existing data. `supplyChunk` should provide data for a couple of thousands locations at once. Upates are done streamlingly to low memory usage */ func updateFromTimeOrientedStreaming3D(variable: String, time: TimerangeDt, scalefactor: Float, compression: CompressionType = .pfor_delta2d_int16, onlyGeneratePreviousDays: Bool, supplyChunk: (_ y: Range, _ x: Range, _ member: Range) throws -> ArraySlice) throws { let indexTime = time.toIndexTime() let indextimeChunked = indexTime.divideRoundedUp(divisor: nTimePerFile) /// Previous days of forecast to keep. Max 7 past days /// `0..<1` if previous days are not generated /// `1.. = onlyGeneratePreviousDays ? (1..? let writeFile: OmFileWriter let write: OmFileWriterArray let writeFn: FileHandle let offsets: (file: CountableRange, array: CountableRange) let fileName: String let skip: Int } // open all files for all timeranges and write a header let writers: [WriterPerStep] = try indextimeChunked.flatMap { timeChunk -> [WriterPerStep] in let fileTime = timeChunk * nTimePerFile ..< (timeChunk+1) * nTimePerFile guard let offsets = indexTime.intersect(fileTime: fileTime) else { return [] } return try previousDaysRange.map { previousDay -> WriterPerStep in let skip = previousDay * 86400 / time.dtSeconds let readFile = OmFileManagerReadable.domainChunk(domain: domain, variable: variable, type: .chunk, chunk: timeChunk, ensembleMember: 0, previousDay: previousDay) try readFile.createDirectory() let tempFile = readFile.getFilePath() + "~" // Another process might be updating this file right now. E.g. Second flush of GFS ensemble FileManager.default.waitIfFileWasRecentlyModified(at: tempFile) try FileManager.default.removeItemIfExists(at: tempFile) let fn = try FileHandle.createNewFile(file: tempFile) let omRead = try? OmFileReader(file: readFile.getFilePath()) let writeFile = OmFileWriter(fn: fn, initialCapacity: 1024*1024) let writer = try writeFile.prepareArray( type: Float.self, dimensions: nMembers <= 1 ? [UInt64(ny), UInt64(nx), UInt64(nTimePerFile)] : [UInt64(ny), UInt64(nx), UInt64(nMembers), UInt64(nTimePerFile)], chunkDimensions: nMembers <= 1 ? [1, UInt64(chunknLocations), UInt64(nTimePerFile)] : [1, UInt64(chunknLocations), 1, UInt64(nTimePerFile)], compression: compression, scale_factor: scalefactor, add_offset: 0 ) return WriterPerStep(read: omRead, writeFile: writeFile, write: writer, writeFn: fn, offsets: offsets, fileName: readFile.getFilePath(), skip: skip) } } let nIndexTime = indexTime.count /// Spatial files use chunks multiple time larger than the final chunk. E.g. [15,526] will be [1,15] in the final time-series file let spatialChunks = OmFileSplitter.calculateSpatialXYChunk(domain: domain.getDomain(), nMembers: nMembers, nTime: 1) var fileData = [Float](repeating: .nan, count: spatialChunks.y * spatialChunks.x * nTimePerFile * nMembers) for yStart in stride(from: 0, to: UInt64(ny), by: UInt64.Stride(spatialChunks.y)) { for xStart in stride(from: 0, to: UInt64(nx), by: UInt64.Stride(spatialChunks.x)) { let yRange = yStart ..< min(yStart + UInt64(spatialChunks.y), UInt64(ny)) let xRange = xStart ..< min(xStart + UInt64(spatialChunks.x), UInt64(nx)) let memberRange = 0 ..< UInt64(nMembers) // Contains the entire time-series to be updated for a chunks of locations let data = try supplyChunk(yRange, xRange, memberRange) // TODO check if chunks need to be reorganised for ensemble files!!! for writer in writers { if let omRead = writer.read?.asArray(of: Float.self) { // Read existing data for a range of locations let dimensions = omRead.getDimensions() switch dimensions.count { case 2: // Old legacy file if dimensions[0] == UInt64(ny * nx) { // Dimensions are ok, read data. Ignores legacy ensemble files let start = yRange.lowerBound * UInt64(nx) + xRange.lowerBound let count = UInt64(yRange.count * xRange.count) try omRead.read( into: &fileData, range: [start ..< start + count, 0.., level: Int, timeOffsets: (file: CountableRange, array: CountableRange)) throws { let dimensions = self.getDimensions() switch dimensions.count { case 2: // Legacy files use 2 dimensions and flatten XY coordinates let dim0 = Int(dimensions[0]) //let dim1 = Int(dimensions[1]) guard dim0 % (nx*ny) == 0 else { return // in case dimensions got change and do not agree anymore, ignore this file } /// Even worse, they also flatten `levels` dimensions which is used for ensemble files let nLevels = dim0 / (nx*ny) if nLevels > 1 && location.count > 1 { fatalError("Multi level and multi location not supported") } guard level < nLevels else { return } let nLocations = UInt64(location.count) let dim0Range = location.lowerBound * nLevels + level ..< location.lowerBound * nLevels + level + location.count try read( into: &into, range: [dim0Range.toUInt64(), timeOffsets.file.toUInt64()], intoCubeOffset: [0, UInt64(timeOffsets.array.lowerBound)], intoCubeDimension: [nLocations, UInt64(nTime)] ) case 3: // File uses dimensions [ny,nx,ntime] guard ny == dimensions[0], nx == dimensions[1] else { return } let x = UInt64(location.lowerBound % nx) ..< UInt64((location.upperBound-1) % nx) + 1 let y = UInt64(location.lowerBound / nx) ..< UInt64(location.lowerBound / nx + 1) let fileTime = UInt64(timeOffsets.file.lowerBound) ..< UInt64(timeOffsets.file.upperBound) let range = [y, x, fileTime] do { try read( into: &into, range: range, intoCubeOffset: [0, 0, UInt64(timeOffsets.array.lowerBound)], intoCubeDimension: [UInt64(y.count), UInt64(x.count), UInt64(nTime)] ) } catch OmFileFormatSwiftError.omDecoder(let error) { print("\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") throw OmFileFormatSwiftError.omDecoder(error: "\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") } case 4: // File uses dimensions [ny,nx,nLevel,ntime] //print("4D \(dimensions.map{Int($0)}) ny\(ny) nx\(nx) nMembers\(nMembers) l\(level)") guard ny == dimensions[0], nx == dimensions[1], level < dimensions[2] else { return } let x = UInt64(location.lowerBound % nx) ..< UInt64((location.upperBound-1) % nx) + 1 let y = UInt64(location.lowerBound / nx) ..< UInt64(location.lowerBound / nx + 1) let l = UInt64(level) ..< UInt64(level+1) let fileTime = UInt64(timeOffsets.file.lowerBound) ..< UInt64(timeOffsets.file.upperBound) let range = [y, x, l, fileTime] do { try read( into: &into, range: range, intoCubeOffset: [0, 0, 0, UInt64(timeOffsets.array.lowerBound)], intoCubeDimension: [UInt64(y.count), UInt64(x.count), 1, UInt64(nTime)] ) } catch OmFileFormatSwiftError.omDecoder(let error) { print("\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") throw OmFileFormatSwiftError.omDecoder(error: "\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") } default: fatalError("ndims not implemented") } } /// Prefetch data for fast access. Switch between old legacy files and new multi dimensional files /// Note: `nTime` is the output array nTime. It is not the file nTime! /// /// TODO: nMembers variable is wrong if called via API controller. Aways 1 func willNeed3D(ny: Int, nx: Int, nTime: Int, nMembers: Int, location: Range, level: Int, timeOffsets: (file: CountableRange, array: CountableRange)) throws { let dimensions = self.getDimensions() switch dimensions.count { case 2: // Legacy files use 2 dimensions and flatten XY coordinates let dim0 = Int(dimensions[0]) //let dim1 = Int(dimensions[1]) guard dim0 % (nx*ny) == 0 else { return // in case dimensions got change and do not agree anymore, ignore this file } /// Even worse, they also flatten `levels` dimensions which is used for ensemble files let nLevels = dim0 / (nx*ny) if nLevels > 1 && location.count > 1 { fatalError("Multi level and multi location not supported") } guard level < nLevels else { return } let dim0Range = location.lowerBound * nLevels + level ..< location.lowerBound * nLevels + level + location.count try willNeed(range: [dim0Range.toUInt64(), timeOffsets.file.toUInt64()]) case 3: // File uses dimensions [ny,nx,ntime] guard ny == dimensions[0], nx == dimensions[1] else { return } let x = UInt64(location.lowerBound % nx) ..< UInt64((location.upperBound-1) % nx) + 1 let y = UInt64(location.lowerBound / nx) ..< UInt64(location.lowerBound / nx + 1) let fileTime = UInt64(timeOffsets.file.lowerBound) ..< UInt64(timeOffsets.file.upperBound) let range = [y, x, fileTime] do { try willNeed(range: range) } catch OmFileFormatSwiftError.omDecoder(let error) { print("\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") throw OmFileFormatSwiftError.omDecoder(error: "\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") } case 4: // File uses dimensions [ny,nx,nLevel,ntime] guard ny == dimensions[0], nx == dimensions[1], level < dimensions[2] else { return } let x = UInt64(location.lowerBound % nx) ..< UInt64((location.upperBound-1) % nx) + 1 let y = UInt64(location.lowerBound / nx) ..< UInt64(location.lowerBound / nx + 1) let l = UInt64(level) ..< UInt64(level+1) let fileTime = UInt64(timeOffsets.file.lowerBound) ..< UInt64(timeOffsets.file.upperBound) let range = [y, x, l, fileTime] do { try willNeed(range: range) } catch OmFileFormatSwiftError.omDecoder(let error) { print("\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") throw OmFileFormatSwiftError.omDecoder(error: "\(error) range=\(range) [ny=\(ny) nx=\(nx) nTime=\(nTime) location=\(location) nMembers=\(nMembers) level=\(level) timeOffsets=\(timeOffsets)]") } default: fatalError("ndims not implemented") } } } extension OmFileSplitter { /// Prepare a write to store individual time-steps as spatial encoded files /// This makes it easier to migrate to the new file format writer /// If `nTime` is set, the spatial file contains TIME SERIES oriented steps as well static func makeSpatialWriter(domain: GenericDomain, nMembers: Int = 1, nTime: Int = 1) -> OmFileWriterHelper { let chunks = calculateSpatialXYChunk(domain: domain, nMembers: nMembers, nTime: nTime) if nTime > 1 { return OmFileWriterHelper(dimensions: [domain.grid.ny, domain.grid.nx, nTime], chunks: [chunks.y, chunks.x, nTime]) } return OmFileWriterHelper(dimensions: [domain.grid.ny, domain.grid.nx], chunks: [chunks.y, chunks.x]) } static func calculateSpatialXYChunk(domain: GenericDomain, nMembers: Int, nTime: Int) -> (y: Int, x: Int) { let splitter = OmFileSplitter(domain, nMembers: nMembers/*, chunknLocations: nMembers > 1 ? nMembers : nil*/) let nx = domain.grid.nx let ny = domain.grid.ny let nTimePerFile = splitter.nTimePerFile // domain.omFileLength let chunknLocations = splitter.chunknLocations // max(6, 3072 / nTimePerFile) let xchunks = max(1, min(nx, 8*1024*1024 / MemoryLayout.stride / nTimePerFile / nTime / nMembers / chunknLocations * chunknLocations)) let ychunks = max(1, min(ny, 8*1024*1024 / MemoryLayout.stride / nTimePerFile / nTime / nMembers / xchunks)) //print("Chunks [\(ychunks),\(xchunks)] nTimePerFile=\(nTimePerFile) chunknLocations=\(chunknLocations)") return (ychunks, xchunks) } } extension Range where Bound == Int { func toUInt64() -> Range { .init(uncheckedBounds: (UInt64(lowerBound), UInt64(upperBound))) } }