import Foundation
import Vapor

/**
 Download the open-meteo weather database from an S3 server.

 Arguments:
   models     Weather model domains separated by comma. E.g. 'cmc_gem_gdps,dwd_icon'
   variables  Weather variables separated by comma. E.g. 'temperature_2m,relative_humidity_2m'

 Options:
   apikey           Sync API key for accessing Open-Meteo servers directly. Not required for AWS open-data.
   server           Server base URL. Default 'https://openmeteo.s3.amazonaws.com/'
   rate             Transfer rate in megabytes per second. Not applicable for AWS open-data.
   past-days        Maximum age of synchronised files. Default 7 days.
   repeat-interval  If set, check for new files every specified amount of minutes.

 Example to download from a local endpoint:
   DATA_DIRECTORY=/Volumes/2TB_1GBs/data/ API_SYNC_APIKEYS=123 openmeteo-api
   DATA_DIRECTORY=/Volumes/2TB_1GBs/data2/ openmeteo-api sync cmc_gem_gdps,dwd_icon_d2,dwd_icon temperature_2m --server http://127.0.0.1:8080/ --apikey 123 --past-days 30 --repeat-interval 5
 */
struct SyncCommand: AsyncCommand {
    // One-line description shown in the CLI help listing.
    var help: String { return "Download the open-meteo weather database from a S3 server." }

    /// Command-line arguments and options parsed by Vapor's console framework.
    struct Signature: CommandSignature {
        @Argument(name: "models", help: "Weather model domains separated by comma. E.g. 'cmc_gem_gdps,dwd_icon'")
        var models: String

        @Argument(name: "variables", help: "Weather variables separated by comma. E.g. 'temperature_2m,relative_humidity_2m'")
        var variables: String

        @Option(name: "apikey", help: "Sync API key for accessing Open-Meteo servers directly. Not required for AWS open-data.")
        var apikey: String?

        @Option(name: "server", help: "Server base URL. Default 'https://openmeteo.s3.amazonaws.com/'")
        var server: String?

        // NOTE(review): `rate` is declared but not visibly used in this file — presumably consumed elsewhere; verify.
        @Option(name: "rate", help: "Transfer rate in megabytes per second. Not applicable for AWS open-data.")
        var rate: Int?

        @Option(name: "past-days", help: "Maximum age of synchronised files. Default 7 days.")
        var pastDays: Int?
@Option(name: "repeat-interval", help: "If set, check for new files every specified amount of minutes.")
        var repeatInterval: Int?

        @Option(name: "concurrent", short: "c", help: "Number of concurrent file download. Default 4")
        var concurrent: Int?

        // Fixed typo in user-facing help text: "speicfied" -> "specified".
        @Option(name: "data-directory-max-size-gb", help: "Trim data directory to the specified target size in gigabyte GB")
        var dataDirectoryMaxSize: Int?

        // Fixed typo in user-facing help text: "speicfied" -> "specified".
        @Option(name: "cache-directory-max-size-gb", help: "Trim cache directory to the specified target size in gigabyte GB")
        var cacheDirectoryMaxSize: Int?

        // Fixed typo in user-facing help text: "perfom" -> "perform".
        // Without this flag, directory cleanup runs in dry-run mode (log only).
        @Flag(name: "execute", help: "Actually perform file delete on cleanup")
        var execute: Bool

        @Option(name: "year", help: "Download one year or a range of years (e.g. 2000-2005)")
        var year: String?
    }

    /// All weather variables that may be available for `previous days API`
    static var previousDayVariables = ["temperature_2m", "dew_point_2m", "relative_humidity_2m", "precipitation", "snowfall_water_equivalent", "snowfall", "frozen_precipitation_percent", "pressure_msl", "cloud_cover", "wind_u_component_10m", "wind_v_component_10m", "showers", "shortwave_radiation", "direct_radiation", "diffuse_radiation", "wind_gusts_10m", "wind_speed_10m", "wind_direction_10m", "weather_code", "cape", "lifted_index", "wind_speed_80m", "wind_direction_80m", "wind_speed_120m", "wind_direction_120m", "wind_speed_180m", "wind_direction_180m", "wind_u_component_70m", "wind_v_component_70m", "wind_u_component_80m", "wind_v_component_80m", "wind_u_component_100m", "wind_v_component_100m", "wind_u_component_120m", "wind_v_component_120m", "wind_u_component_180m", "wind_v_component_180m"]

    /// Entry point: parse options, then sync the requested domains/variables from each server.
    func run(using context: CommandContext, signature: Signature) async throws {
        let logger = context.application.logger
        // Prevent the host from idle-sleeping during long downloads.
        disableIdleSleep()
        // Multiple servers may be given separated by ';', each paired with its own models set.
        let serverSet = (signature.server ??
"https://openmeteo.s3.amazonaws.com/").split(separator: ";").map(String.init)
        // Server URLs must end with '/' so that S3 key prefixes can be appended directly.
        for server in serverSet {
            guard server.last == "/" else {
                fatalError("Server name must include http and end with a trailing slash.")
            }
        }
        let modelsSet = try signature.models.split(separator: ";").map({ try DomainRegistry.load(commaSeparated: String($0)) })
        guard Curl.cacheDirectory == nil else {
            fatalError("HTTP_CACHE enabled")
        }
        guard serverSet.count == modelsSet.count else {
            fatalError("Number of servers and models sets must be the same")
        }
        // "static" is always appended so domain static files are synchronised as well.
        let variablesSetA = signature.variables.split(separator: ";").map { $0.split(separator: ",").map(String.init) + ["static"] }
        if variablesSetA.count > 1 && variablesSetA.count != serverSet.count {
            fatalError("Number of servers and variables sets must be the same")
        }
        // If a single variables list was given, reuse it for every server.
        let variablesSet = variablesSetA.count != serverSet.count ? [[String]](repeating: variablesSetA[0], count: serverSet.count) : variablesSetA
        let pastDays = signature.pastDays ?? 7
        let concurrent = signature.concurrent ?? 4
        /// Select all files that contain data within a range of years
        let yearRange = signature.year.map { yearStr in
            if yearStr.contains("-") {
                let split = yearStr.split(separator: "-")
                guard split.count == 2, split[0].count == 4, split[1].count == 4, let start = Int(split[0]), let end = Int(split[1]), start >= 1800, start <= 2200, end >= 1800, end <= 2200 else {
                    fatalError("year invalid")
                }
                return Timestamp(start, 1, 1) ..< Timestamp(end+1, 1, 1)
            }
            guard yearStr.count == 4, let year = Int(yearStr), year >= 1800, year <= 2200 else {
                fatalError("year invalid")
            }
            return Timestamp(year, 1, 1) ..< Timestamp(year+1, 1, 1)
        }
        /// Download from each server concurrently
        await zip(serverSet, zip(modelsSet, variablesSet)).foreachConcurrent(nConcurrent: serverSet.count) { (server, arg1) in
            let (models, variablesSig) = arg1
            /// Undocumented switch to download all weather variables. This can generate immense traffic!
            let downloadAllVariables = variablesSig.contains("really_download_all_variables")
            let downloadAllButPressureOncePerDay = variablesSig.contains("really_download_all_but_pressure_once_per_day")
            let downloadAllPreviousDay = variablesSig.contains("really_download_all_previous_day")
            let downloadAllPressureLevel = variablesSig.contains("really_download_all_pressure_levels")
            let downloadAllSurface = variablesSig.contains("really_download_all_surface_levels")
            let variables = downloadAllPreviousDay ? Self.previousDayVariables : variablesSig
            let curl = Curl(logger: logger, client: context.application.dedicatedHttpClient, retryError4xx: false)
            // Initialised one day in the past so the first iteration downloads pressure data.
            var lastPressureDownloadDate = Timestamp.now().with(hour: 0).add(days: -1)
            // NOTE(review): loop exit (likely tied to `repeat-interval` not being set) is in the
            // corrupted section below — verify against version control.
            while true {
                /// Used for `really_download_all_but_pressure_once_per_day` to download pressure data only once per day
                let downloadPressureNow = lastPressureDownloadDate != Timestamp.now().with(hour: 0)
                do {
                    if let yearRange {
                        let start = yearRange.lowerBound.toComponents().year
                        let end = yearRange.upperBound.toComponents().year-1
                        logger.info("Checking for files within year \(start)-\(end)")
                    } else {
                        logger.info("Checking for files to with more than \(pastDays) past days data")
                    }
                    let timeRange = yearRange ?? Timestamp.now().add(-24 * 3600 * pastDays) ..< Timestamp(2200, 1, 1)
                    /// Get a list of all variables from all models
                    let remotes: [(DomainRegistry, String)] = try await models.mapConcurrent(nConcurrent: concurrent) { model -> [(DomainRegistry, String)] in
                        let remoteDirectories = try await curl.s3list(server: server, prefix: "data/\(model.rawValue)/", apikey: signature.apikey, deadLineHours: 0.1).directories
                        return remoteDirectories.map { return (model, $0) }
                    }.flatMap({$0})
                    /// Filter variables to download
                    let toDownload: [S3DataController.S3ListV2File] = try await remotes.mapConcurrent(nConcurrent: concurrent) { (model, remoteDirectory) -> [S3DataController.S3ListV2File] in
                        guard let variablePos = remoteDirectory.dropLast().lastIndex(of: "/") else {
                            fatalError("could not get variable from string")
                        }
                        // NOTE(review): the source text appears corrupted from here — everything between
                        // this subscript's `..<` range bound and the cleanup call below (variable filtering,
                        // the actual file download/sync logic and the `repeatInterval` binding) seems to have
                        // been stripped. Restore from version control before building; flagged, not repaired.
                        let variable = remoteDirectory[remoteDirectory.index(after: variablePos).. 0 {
                        try cacheDirectoryCleanup(logger: logger, cacheDirectory: OpenMeteo.dataDirectory, maxSize: dataDirectoryMaxSize * 1<<30, execute: signature.execute)
                    }
                    /*if let cacheDirectoryMaxSize = signature.cacheDirectoryMaxSize, cacheDirectoryMaxSize > 0, let cacheDirectory = OpenMeteo.cacheDirectory {
                        try cacheDirectoryCleanup(logger: logger, cacheDirectory: cacheDirectory, maxSize: cacheDirectoryMaxSize * 1<<30, execute: signature.execute)
                    }*/
                    logger.info("Repeat in \(repeatInterval) minutes")
                    try await Task.sleep(nanoseconds: UInt64(repeatInterval * 60_000_000_000))
                } catch {
                    // Any sync error is fatal; the process is expected to be restarted by a supervisor.
                    logger.critical("Error during sync \(error)")
                    fatalError()
                }
                if downloadPressureNow {
                    lastPressureDownloadDate = Timestamp.now().with(hour: 0)
                }
            }
            await curl.printStatistics()
        }
    }

    /**
     Delete old files to trim directory size.

     Walks `cacheDirectory` recursively, sums allocated sizes, and — if the total exceeds
     `maxSize` (bytes) — deletes the oldest eligible `.om` files (by modification date) until
     the total drops below the target. With `execute == false` this is a dry run (log only).
     */
    func cacheDirectoryCleanup(logger: Logger, cacheDirectory: String, maxSize: Int, execute: Bool) throws {
        logger.info("Checking directory size of '\(cacheDirectory)'. Target size \(maxSize.bytesHumanReadable)")
        if cacheDirectory.isEmpty, maxSize <= 0 {
            fatalError()
        }
        let resourceKeys : [URLResourceKey] = [.isRegularFileKey, .contentModificationDateKey]
        guard let enumerator = FileManager.default.enumerator(
            at: URL(fileURLWithPath: cacheDirectory),
            includingPropertiesForKeys: resourceKeys,
            options: [.skipsHiddenFiles, .skipsPackageDescendants]
        ) else {
            fatalError("Could not get enumerator")
        }
        var files = [(file: URL, modifiedAt: Date, size: Int)]()
        var totalSize: Int = 0
        for case let fileURL as URL in enumerator {
            do {
                // Skip temporary files (trailing '~').
                if fileURL.absoluteString.last == "~" {
                    continue
                }
                let fileAttributes = try fileURL.resourceValues(forKeys: Set(resourceKeys))
                guard fileAttributes.isRegularFile == true, let modificationDate = fileAttributes.contentModificationDate else {
                    continue
                }
                let size = fileURL.getAllocatedSize()
                // Every regular file counts towards the total size ...
                totalSize += size
                // ... but only '.om' files outside '/static/' are candidates for deletion.
                if !fileURL.absoluteString.contains(".om") || fileURL.absoluteString.contains("/static/") {
                    continue
                }
                files.append((fileURL, modificationDate, size))
            } catch {
                print(error, fileURL)
            }
        }
        if totalSize < maxSize {
            logger.info("OK, Total Size: \(totalSize.bytesHumanReadable)")
            return
        }
        logger.info("Cleanup, current size \(totalSize.bytesHumanReadable), deleting \((totalSize-maxSize).bytesHumanReadable)")
        // Sort by modification date
        files.sort(by: {$0.modifiedAt < $1.modifiedAt})
        for file in files {
            guard totalSize > maxSize else {
                break
            }
            if execute {
                logger.info("Remove file \(file.file), modified at \(file.modifiedAt), size \(file.size.bytesHumanReadable)")
                do {
                    try FileManager.default.removeItem(at: file.file)
                } catch {
                    print(error, file.file)
                }
            } else {
                logger.info("[DRY RUN] Would remove file \(file.file), modified at \(file.modifiedAt), size \(file.size.bytesHumanReadable)")
            }
            // Note: in dry-run mode the counter is still decremented to report the projected size.
            totalSize -= file.size
        }
        logger.info("New size \(totalSize.bytesHumanReadable)")
    }
}

fileprivate extension URL {
    /// Get the allocated size on disk.
// Swift implementation reports incorrect sizes on Linux https://github.com/apple/swift-corelibs-foundation/issues/4885
    func getAllocatedSize() -> Int {
        return withUnsafeFileSystemRepresentation({ path in
            guard let path else {
                return 0
            }
            var stats = stat()
            guard lstat(path, &stats) != -1 else {
                let error = String(cString: strerror(errno))
                fatalError("lstat failed. Error \(errno) \(error)")
            }
            // must be 512 https://unix.stackexchange.com/questions/521151/why-is-st-blocks-always-reported-in-512-byte-blocks
            return Int(stats.st_blocks) * 512
        })
    }
}

fileprivate extension Array where Element == S3DataController.S3ListV2File {
    /// Only include files with data newer than a given timestamp. This is based on evaluating the time-chunk in the filename and is not based on the modification time
    // NOTE(review): `Range` below presumably was `Range<Timestamp>`; the generic parameter appears
    // to have been stripped from the source text — verify against version control.
    func includeFiles(timeRange: Range, domain: DomainRegistry) -> [Element] {
        let omFileLength = domain.getDomain().omFileLength
        let dtSeconds = domain.getDomain().dtSeconds
        return self.filter({ file in
            // Static files (elevation etc.) are always kept.
            if file.name.contains("/static/") {
                return true
            }
            let last = file.name.lastIndex(of: "/") ?? file.name.startIndex
            // NOTE(review): the source text appears corrupted from here — the remainder of this
            // filter (time-chunk parsing against `omFileLength`/`dtSeconds`) and the signature of
            // the local-directory comparison function were stripped. Restore from version control.
            let name = file.name[file.name.index(after: last).. [Element] {
            let resourceKeys = Set([.contentModificationDateKey, .fileSizeKey])
            // Keep a remote file when the local copy differs in size or is older (with 1s slack).
            return self.filter({ remoteFile in
                // Drop the leading "data/" prefix from the remote key to build the local path.
                // NOTE(review): corrupted fragment follows (range bound stripped); verify.
                let pathNoData = remoteFile.name[remoteFile.name.index(remoteFile.name.startIndex, offsetBy: 5).. modificationTime.addingTimeInterval(1) }
                // Add one seconds delay due to inaccuracy in timestamps
                return remoteFile.fileSize != size || remoteFile.modificationTime > modificationTime.addingTimeInterval(1)
            })
    }
}

extension StringProtocol {
    /// Interpret the given string as XML and iterate over a list of keys
    // NOTE(review): `AnySequence`/`AnyIterator` below presumably were `AnySequence<SubSequence>`/
    // `AnyIterator<SubSequence>`; generic parameters appear stripped — verify against version control.
    func xmlSection(_ section: String) -> AnySequence {
        return AnySequence { () -> AnyIterator in
            var pos = startIndex
            return AnyIterator {
                // NOTE(review): corrupted fragment — the `..<endIndex` range bounds, the closing-tag
                // search, iterator state update and the start of `xmlFirst` were stripped.
                guard let start = range(of: "<\(section)>", range: pos..", range: start.upperBound.. SubSequence?
{
        // NOTE(review): corrupted fragment — the `..<endIndex` range bounds, the closing-tag search,
        // the sliced return value, the end of `extension StringProtocol` and the `Curl.s3list`
        // signature (up to its return tuple) were stripped from the source text. Restore from
        // version control; flagged, not repaired.
        guard let start = range(of: "<\(section)>", range: startIndex..", range: start.upperBound.. (files: [S3DataController.S3ListV2File], directories: [String]) {
        // Issue an S3 ListObjectsV2 request ('/'-delimited) and parse the XML response into
        // files (Contents) and common prefixes (directories).
        var request = ClientRequest(method: .GET, url: URI("\(server)"))
        let params = S3DataController.S3ListV2(list_type: 2, delimiter: "/", prefix: prefix, apikey: apikey)
        try request.query.encode(params)
        var response = try await downloadInMemoryAsync(url: request.url.string, minSize: nil, deadLineHours: deadLineHours)
        guard let body = response.readString(length: response.readableBytes) else {
            return ([],[])
        }
        // Each <Contents> entry must carry Key, LastModified and Size; anything else is a protocol error.
        let files = body.xmlSection("Contents").map {
            guard let name = $0.xmlFirst("Key"),
                  let modificationTimeString = $0.xmlFirst("LastModified"),
                  let modificationTime = DateFormatter.awsS3DateTime.date(from: String(modificationTimeString)),
                  let fileSizeString = $0.xmlFirst("Size"),
                  let fileSize = Int(fileSizeString) else {
                fatalError()
            }
            return S3DataController.S3ListV2File(name: String(name), modificationTime: modificationTime, fileSize: fileSize)
        }
        // <CommonPrefixes> entries are the "subdirectories" under the requested prefix.
        let directories = body.xmlSection("CommonPrefixes").map {
            guard let prefix = $0.xmlFirst("Prefix") else {
                fatalError()
            }
            return String(prefix)
        }
        return (files, directories)
    }
}