// open-wether/Sources/App/Commands/MigrationCommand.swift
// soiz1's picture
// Migrated from GitHub
// 6ee917b verified
import Foundation
import Vapor
import OmFileFormat
/**
Upgrade legacy om-files to new version. Transposes data to proper 3d context.
*/
struct MigrationCommand: AsyncCommand {
/// Command-line options understood by the migration command.
struct Signature: CommandSignature {
    /// Must be set for converted files to be written and moved into place.
    @Flag(name: "execute", help: "Perform file moves") var execute: Bool
}
/// Human-readable summary of what this command does.
var help: String {
    return "Perform database migration"
}
/// Walk `<dataDirectory>/<domain>/<variable>/*.om` and hand every om-file to `convertToNewFormat`.
///
/// Directories whose name is not a known `DomainRegistry` raw value are skipped with a warning.
/// Hidden entries and entries containing `~` (temporary files) are ignored.
///
/// - Parameters:
///   - context: Command context; only the application logger is used.
///   - signature: Parsed flags; `execute` is forwarded to `convertToNewFormat`.
/// - Throws: Any error thrown by `convertToNewFormat`. The first error aborts the remaining migration.
func run(using context: CommandContext, signature: Signature) async throws {
    let logger = context.application.logger
    let execute = signature.execute

    // Level 1: one directory per domain
    let rootUrl = URL(fileURLWithPath: OpenMeteo.dataDirectory, isDirectory: true)
    guard let domainNames = entryNames(at: rootUrl, directories: true) else {
        logger.warning("No files at \(rootUrl)")
        return
    }
    for name in domainNames {
        guard let domain = DomainRegistry(rawValue: name) else {
            logger.warning("Skipping \(name)")
            continue
        }
        // No grid is passed for the DEM domain, so its files are migrated 1:1
        let grid = domain == .copernicus_dem90 ? nil : domain.getDomain().grid

        // Level 2: one directory per variable
        let domainPath = "\(OpenMeteo.dataDirectory)\(name)"
        guard let variables = entryNames(at: URL(fileURLWithPath: domainPath, isDirectory: true), directories: true) else {
            logger.warning("No files at \(domainPath)")
            continue
        }
        for variable in variables {
            // Level 3: the om-files themselves
            let path = "\(domainPath)/\(variable)"
            guard let files = entryNames(at: URL(fileURLWithPath: path, isDirectory: true), directories: false) else {
                logger.warning("No files at \(path)")
                continue
            }
            for file in files where file.hasSuffix(".om") {
                //logger.info("Processing \(domain)/\(variable)/\(file)")
                try await convertToNewFormat(logger: logger, file: "\(path)/\(file)", grid: grid, execute: execute)
            }
        }
    }
}

/// List the names of the direct children of `directory`.
///
/// The listing is collected up front, so callers may create, rename or replace files inside
/// `directory` while looping over the result without disturbing an active enumeration.
/// Keeping the enumerator in a synchronous helper also avoids iterating a
/// `FileManager.DirectoryEnumerator` directly inside an `async` function.
///
/// - Parameters:
///   - directory: Directory to list (not recursive).
///   - directories: `true` to return only sub-directories, `false` to return only non-directory entries.
/// - Returns: Entry names without hidden entries and without names containing `~`,
///   or `nil` if no enumerator could be created for `directory`.
private func entryNames(at directory: URL, directories: Bool) -> [String]? {
    let resourceKeys = Set<URLResourceKey>([.nameKey, .isDirectoryKey, .contentModificationDateKey, .fileSizeKey])
    guard let enumerator = FileManager.default.enumerator(
        at: directory,
        includingPropertiesForKeys: Array(resourceKeys),
        options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants]
    ) else {
        return nil
    }
    var names = [String]()
    for case let entryUrl as URL in enumerator {
        // Entries whose resource values cannot be read are skipped silently
        guard let values = try? entryUrl.resourceValues(forKeys: resourceKeys),
              let isDirectory = values.isDirectory,
              let entryName = values.name,
              !entryName.contains("~"),
              isDirectory == directories
        else {
            continue
        }
        names.append(entryName)
    }
    return names
}
/// Thrown when a freshly written temporary file cannot be read back or differs from the source data
private struct MigrationVerificationError: Error, CustomStringConvertible {
    let description: String
}

/// Read om file and write it as version 3 and reshape data to proper 3d files
/// If no grid is given, assume that files are converted 1:1. This is the case for the DEM model
///
/// The new file is written to `<file>~`, read back and compared against the source data, and only
/// then moved over the original. If writing or verification fails, the file handle is closed and the
/// temporary file is removed before the error propagates.
///
/// - Parameters:
///   - logger: Receives progress, skip and warning messages.
///   - file: Path of the om-file to migrate.
///   - grid: Grid supplying `ny`/`nx` for reshaping `[ny * nx, nt]` into `[ny, nx, nt]`; `nil` keeps the file's own two dimensions.
///   - execute: `false` performs a dry run that only logs what would be migrated and does not touch the file system.
/// - Throws: I/O and om-file errors from reading/writing, or `MigrationVerificationError` if the read-back check fails.
func convertToNewFormat(logger: Logger, file: String, grid: Gridable?, execute: Bool) async throws {
    let temporary = "\(file)~"
    if execute {
        // Clear a stale temporary file from an earlier run. Skipped for dry runs, which must stay read-only.
        FileManager.default.waitIfFileWasRecentlyModified(at: temporary)
        try FileManager.default.removeItemIfExists(at: temporary)
    }

    // Read data from the input OM file
    guard let readfile = try? OmFileReader(file: file) else {
        logger.warning("Failed to open file: \(file)")
        return
    }
    guard let reader = readfile.asArray(of: Float.self), readfile.isLegacyFormat() else {
        logger.info("File already in new format \(file)")
        return
    }
    let dimensions = Array(reader.getDimensions())
    let chunks = Array(reader.getChunkDimensions())
    guard dimensions.count == 2 else {
        logger.warning("Invalid dimension count \(file)")
        return
    }
    let ny = UInt64(grid?.ny ?? Int(dimensions[0]))
    let nx = UInt64(grid?.nx ?? Int(dimensions[1]))
    let nt = dimensions[1]

    // Simple file like surface elevation
    // No need to re-chunk data. Just 1:1 migration
    let isOneToOne = ny == dimensions[0] && nx == dimensions[1]
    if !isOneToOne {
        guard nx * ny == dimensions[0] else {
            logger.warning("Dimensions do not agree \(file). E.g. no support for ensemble files")
            return
        }
    }
    let dimensionsOut = isOneToOne ? dimensions : [ny, nx, nt]
    let chunksOut = isOneToOne ? chunks : [1, chunks[0], chunks[1]]
    if isOneToOne {
        logger.info("Migrate simple \(file) dimensions=\(dimensions) chunks=\(chunks)")
    } else {
        logger.info("Migrate \(file) new dimensions=\(dimensionsOut) chunks=\(chunksOut)")
    }
    guard execute else {
        return
    }

    let writeFn = try FileHandle.createNewFile(file: temporary)
    var isHandleOpen = true
    var keepTemporary = false
    defer {
        // Runs on every exit: release the handle and drop a temporary file that was never verified
        if isHandleOpen {
            try? writeFn.close()
        }
        if !keepTemporary {
            try? FileManager.default.removeItem(atPath: temporary)
        }
    }
    let fileWriter = OmFileWriter(fn: writeFn, initialCapacity: 1024 * 1024 * 10)
    let writer = try fileWriter.prepareArray(
        type: Float.self,
        dimensions: dimensionsOut,
        chunkDimensions: chunksOut,
        compression: reader.compression,
        scale_factor: reader.scaleFactor,
        add_offset: reader.addOffset
    )

    /// Visit every output chunk of the reshaped `[ny, nx, nt]` array in y, x, t order.
    /// Only used on the reshape path, where `chunksOut` has three entries.
    func forEachChunk(_ body: (Range<UInt64>, Range<UInt64>, Range<UInt64>) throws -> Void) throws {
        for yStart in stride(from: 0, to: ny, by: UInt64.Stride(chunksOut[0])) {
            let yRange = yStart ..< min(yStart + chunksOut[0], ny)
            for xStart in stride(from: 0, to: nx, by: UInt64.Stride(chunksOut[1])) {
                let xRange = xStart ..< min(xStart + chunksOut[1], nx)
                for tStart in stride(from: 0, to: nt, by: UInt64.Stride(chunksOut[2])) {
                    let tRange = tStart ..< min(tStart + chunksOut[2], nt)
                    try body(yRange, xRange, tRange)
                }
            }
        }
    }

    /// Read one 3D chunk from the flattened legacy `[ny * nx, nt]` layout.
    /// Each y-row is a contiguous run of `xRange.count` locations starting at `y * nx + xRange.lowerBound`.
    func readLegacyChunk(_ yRange: Range<UInt64>, _ xRange: Range<UInt64>, _ tRange: Range<UInt64>) throws -> [Float] {
        var chunk = [Float](repeating: .nan, count: yRange.count * xRange.count * tRange.count)
        for (row, y) in yRange.enumerated() {
            try reader.read(
                into: &chunk,
                range: [y * nx + xRange.lowerBound ..< y * nx + xRange.upperBound, tRange],
                intoCubeOffset: [UInt64(row * xRange.count), 0],
                intoCubeDimension: [UInt64(yRange.count * xRange.count), UInt64(tRange.count)]
            )
        }
        return chunk
    }

    // Write phase. For 1:1 migrations the data is kept in memory for the later comparison
    let oneToOneData: [Float]?
    if isOneToOne {
        let data = try reader.read()
        try writer.writeData(array: data)
        oneToOneData = data
    } else {
        oneToOneData = nil
        // Sizes are tracked in bytes: 4 bytes per Float
        let progress = TransferAmountTracker(logger: logger, totalSize: 4 * Int(dimensions.reduce(1, *)), name: "Convert")
        /// Reshape data from flattened 2D to 3D context
        try forEachChunk { yRange, xRange, tRange in
            let chunk = try readLegacyChunk(yRange, xRange, tRange)
            try writer.writeData(
                array: chunk,
                arrayDimensions: [UInt64(yRange.count), UInt64(xRange.count), UInt64(tRange.count)],
                arrayOffset: nil,
                arrayCount: nil
            )
            progress.add(chunk.count * 4)
        }
        progress.finish()
    }

    let variable = try fileWriter.write(
        array: try writer.finalise(),
        name: "",
        children: []
    )
    try fileWriter.writeTrailer(rootVariable: variable)
    try writeFn.close()
    isHandleOpen = false

    /// Read data again to ensure the written data matches exactly
    guard let verify = try OmFileReader(file: temporary).asArray(of: Float.self) else {
        throw MigrationVerificationError(description: "Could not read temporary file")
    }
    if let data = oneToOneData {
        let written = try verify.read()
        guard data.isSimilar(written) else {
            throw MigrationVerificationError(description: "Data does not match")
        }
    } else {
        let progressVerify = TransferAmountTracker(logger: logger, totalSize: 4 * Int(dimensions.reduce(1, *)), name: "Verify")
        try forEachChunk { yRange, xRange, tRange in
            let expected = try readLegacyChunk(yRange, xRange, tRange)
            let written = try verify.read(range: [yRange, xRange, tRange])
            guard expected.isSimilar(written) else {
                throw MigrationVerificationError(description: "Data does not match \(yRange) \(xRange) \(tRange)")
            }
            progressVerify.add(expected.count * 4)
        }
        progressVerify.finish()
    }

    // The temporary file is verified from here on; never delete it, even if the move itself fails
    keepTemporary = true
    try FileManager.default.moveFileOverwrite(from: temporary, to: file)
}
}
public extension Array where Element == Float {
    /// Compare two float arrays element by element using an absolute tolerance.
    ///
    /// - Parameters:
    ///   - b: Array to compare against.
    ///   - accuracy: Largest absolute difference that is still treated as equal.
    /// - Returns: `false` if the lengths differ, if exactly one value of a pair is NaN, or if a pair
    ///   differs by more than `accuracy`; `true` otherwise. Two NaN values count as equal.
    func isSimilar(_ b: [Float], accuracy: Float = 0.0001) -> Bool {
        if count != b.count {
            return false
        }
        return zip(self, b).allSatisfy { pair in
            let (lhs, rhs) = pair
            switch (lhs.isNaN, rhs.isNaN) {
            case (true, true):
                return true
            case (true, false), (false, true):
                return false
            case (false, false):
                // Negated `>` on purpose: a NaN difference (e.g. inf - inf) must not count as a mismatch
                return !(abs(lhs - rhs) > accuracy)
            }
        }
    }
}