open-wether / Sources /App /Helper /Writer /ForecastapiResult.swift
soiz1's picture
Migrated from GitHub
6ee917b verified
import Foundation
import Vapor
import FlatBuffers
import OpenMeteoSdk
/// A string-representable variable that can describe itself as FlatBuffers metadata.
protocol FlatBuffersVariable: RawRepresentableString {
    /// Returns the metadata record used to identify this variable in FlatBuffers output.
    func getFlatBuffersMeta() -> FlatBufferVariableMeta
}
/// A model identifier that declares the variable types it exposes and how it maps to the FlatBuffers SDK model enum.
protocol ModelFlatbufferSerialisable: RawRepresentableString {
    /// Variable type used for surface-level hourly data
    associatedtype HourlyVariable: FlatBuffersVariable

    /// Variable type used for data on pressure levels
    associatedtype HourlyPressureType: FlatBuffersVariable, RawRepresentable, Equatable

    /// Variable type used for data on height levels
    associatedtype HourlyHeightType: FlatBuffersVariable, RawRepresentable, Equatable

    /// Variable type used for daily data
    associatedtype DailyVariable: FlatBuffersVariable

    /// 0=all members start at control, 1=Members start at `member01` (Used in CFSv2)
    static var memberOffset: Int { get }

    /// The matching model value in the OpenMeteo SDK FlatBuffers schema
    var flatBufferModel: openmeteo_sdk_Model { get }
}
extension ModelFlatbufferSerialisable {
    /// Default: member numbering starts at 0, i.e. no offset is applied.
    static var memberOffset: Int { 0 }
}
/// Pairs a model with the closure that lazily produces one of its output sections.
fileprivate struct ModelAndSection<Model: ModelFlatbufferSerialisable, Variable: RawRepresentableString> {
    let model: Model
    let section: () throws -> ApiSection<Variable>

    /// Evaluates every section closure in order and merges the results into a single string-keyed section.
    ///
    /// Column names are derived from the variable raw value. A `_memberNN` suffix is appended when the
    /// member number (array index plus `Model.memberOffset`) is above zero, and a `_<model>` suffix is
    /// appended when more than one section is merged.
    ///
    /// - Throws: `ForecastapiError.noDataAvilableForThisLocation` if `sections` is empty,
    ///   `ForecastapiError.cannotReturnModelsWithDiffernetTimeIntervals` if the sections do not share the
    ///   same `time.dtSeconds`, or any error thrown by a section closure.
    static func run(sections: [Self]) throws -> ApiSectionString {
        let appendModelName = sections.count > 1
        var merged = [ApiSectionString]()

        for entry in sections {
            let evaluated = try entry.section()
            var columns = [ApiColumnString]()
            for column in evaluated.columns {
                // One output column per ensemble member
                for (index, data) in column.variables.enumerated() {
                    let member = index + Model.memberOffset
                    var name = column.variable.rawValue
                    if member > 0 {
                        name = "\(name)_member\(member.zeroPadded(len: 2))"
                    }
                    if appendModelName {
                        name = "\(name)_\(entry.model.rawValue)"
                    }
                    columns.append(ApiColumnString(variable: name, unit: column.unit, data: data))
                }
            }
            merged.append(ApiSectionString(name: evaluated.name, time: evaluated.time, columns: columns))
        }

        guard let first = merged.first else {
            throw ForecastapiError.noDataAvilableForThisLocation
        }
        // All merged sections share one time axis, so their time steps must match
        if merged.contains(where: { $0.time.dtSeconds != first.time.dtSeconds }) {
            throw ForecastapiError.cannotReturnModelsWithDiffernetTimeIntervals
        }
        return ApiSectionString(name: first.name, time: first.time, columns: merged.flatMap { $0.columns })
    }
}
/// Stores the API output for multiple locations
struct ForecastapiResult<Model: ModelFlatbufferSerialisable> {
let timeformat: Timeformat
/// per location, per model
let results: [PerLocation]
/// Results of all requested models for a single location
struct PerLocation {
    let timezone: TimezoneWithOffset
    let time: TimerangeLocal
    let locationId: Int
    let results: [PerModel]

    /// UTC offset of the location's timezone in seconds
    var utc_offset_seconds: Int {
        timezone.utcOffsetSeconds
    }

    /// Evaluates the 15-minutely, hourly, six-hourly and daily sections (in that order) and returns those that are present.
    /// The `current` section is not part of the returned list.
    func runAllSections() throws -> [ApiSectionString] {
        let sections: [ApiSectionString?] = [try minutely15?(), try hourly?(), try sixHourly?(), try daily?()]
        return sections.compactMap { $0 }
    }

    /// Merge the current-conditions sections of all models. Variable names are suffixed with the model name
    /// if more than one model provides a current section. `nil` if no model provides one.
    var current: (() throws -> ApiSectionSingle<String>)? {
        let candidates = results.compactMap { perModel in
            perModel.current.map { (model: perModel.model, section: $0) }
        }
        guard !candidates.isEmpty else {
            return nil
        }
        let appendModelName = candidates.count > 1
        return {
            let sections = try candidates.map { candidate -> ApiSectionSingle<String> in
                let single = try candidate.section()
                let columns = single.columns.map { column -> ApiColumnSingle<String> in
                    let name = appendModelName ? "\(column.variable.rawValue)_\(candidate.model.rawValue)" : column.variable.rawValue
                    return ApiColumnSingle<String>(variable: name, unit: column.unit, value: column.value)
                }
                return ApiSectionSingle<String>(name: single.name, time: single.time, dtSeconds: single.dtSeconds, columns: columns)
            }
            guard let first = sections.first else {
                throw ForecastapiError.noDataAvilableForThisLocation
            }
            // Name, time and time step are taken from the first model
            return ApiSectionSingle<String>(name: first.name, time: first.time, dtSeconds: first.dtSeconds, columns: sections.flatMap { $0.columns })
        }
    }

    /// Merge all hourly sections and prefix with the domain name if required
    var hourly: (() throws -> ApiSectionString)? {
        mergedSection { $0.hourly }
    }

    /// Merge all daily sections, see `hourly`
    var daily: (() throws -> ApiSectionString)? {
        mergedSection { $0.daily }
    }

    /// Merge all six-hourly sections, see `hourly`
    var sixHourly: (() throws -> ApiSectionString)? {
        mergedSection { $0.sixHourly }
    }

    /// Merge all 15-minutely sections, see `hourly`
    var minutely15: (() throws -> ApiSectionString)? {
        mergedSection { $0.minutely15 }
    }

    /// Collects the section closure chosen by `select` from every model and returns a closure that merges them.
    /// Returns `nil` if no model provides that section.
    private func mergedSection<Variable: RawRepresentableString>(_ select: (PerModel) -> (() throws -> ApiSection<Variable>)?) -> (() throws -> ApiSectionString)? {
        let sections = results.compactMap { perModel in
            select(perModel).map { ModelAndSection<Model, Variable>(model: perModel.model, section: $0) }
        }
        guard !sections.isEmpty else {
            return nil
        }
        return {
            try ModelAndSection<Model, Variable>.run(sections: sections)
        }
    }
}
/// Output of a single model at a single location. All sections are evaluated lazily through closures.
struct PerModel {
    let model: Model
    let latitude: Float
    let longitude: Float

    /// Desired elevation from a DEM. Used in statistical downscaling
    let elevation: Float?

    let prefetch: (() throws -> ())
    let current: (() throws -> ApiSectionSingle<SurfacePressureAndHeightVariable>)?
    let hourly: (() throws -> ApiSection<SurfacePressureAndHeightVariable>)?
    let daily: (() throws -> ApiSection<Model.DailyVariable>)?
    let sixHourly: (() throws -> ApiSection<SurfacePressureAndHeightVariable>)?
    let minutely15: (() throws -> ApiSection<SurfacePressureAndHeightVariable>)?

    /// Coordinates formatted for use in a file name: latitude and longitude with two decimals and a
    /// hemisphere letter, followed by the elevation in metres if it is set and finite.
    /// e.g. `52.52N13.42E38m`
    var formatedCoordinatesFilename: String {
        let latitudePart: String
        if latitude < 0 {
            latitudePart = String(format: "%.2fS", abs(latitude))
        } else {
            latitudePart = String(format: "%.2fN", latitude)
        }

        let longitudePart: String
        if longitude < 0 {
            longitudePart = String(format: "%.2fW", abs(longitude))
        } else {
            longitudePart = String(format: "%.2fE", longitude)
        }

        // Missing or non-finite elevations are omitted
        var elevationPart = ""
        if let elevation = elevation, elevation.isFinite {
            elevationPart = String(format: "%.0fm", elevation)
        }
        return latitudePart + longitudePart + elevationPart
    }
}
/// A pressure-level variable together with its level. The level is rendered with an `hPa` suffix in variable names.
struct PressureVariableAndLevel {
    let variable: Model.HourlyPressureType
    let level: Int

    init(_ variable: Model.HourlyPressureType, _ level: Int) {
        (self.variable, self.level) = (variable, level)
    }
}
/// A height-level variable together with its level. The level is rendered with an `m` suffix in variable names.
struct HeightVariableAndLevel {
    let variable: Model.HourlyHeightType
    let level: Int

    init(_ variable: Model.HourlyHeightType, _ level: Int) {
        (self.variable, self.level) = (variable, level)
    }
}
/// Enum combining surface, pressure level and height level variables
enum SurfacePressureAndHeightVariable: RawRepresentableString, FlatBuffersVariable {
    case surface(Model.HourlyVariable)
    case pressure(PressureVariableAndLevel)
    case height(HeightVariableAndLevel)

    /// Not supported: creating a value from a raw string always traps.
    init?(rawValue: String) {
        fatalError()
    }

    /// Variable name as used in output. Pressure variables get a `_<level>hPa` suffix, height variables a `_<level>m` suffix.
    var rawValue: String {
        switch self {
        case let .surface(variable):
            return variable.rawValue
        case let .pressure(entry):
            return "\(entry.variable.rawValue)_\(entry.level)hPa"
        case let .height(entry):
            return "\(entry.variable.rawValue)_\(entry.level)m"
        }
    }

    /// FlatBuffers metadata of the wrapped variable. For pressure variables the pressure level is replaced by
    /// the actual level, for height variables the altitude is replaced by the actual level.
    func getFlatBuffersMeta() -> FlatBufferVariableMeta {
        switch self {
        case let .surface(variable):
            return variable.getFlatBuffersMeta()

        case let .pressure(entry):
            let base = entry.variable.getFlatBuffersMeta()
            return FlatBufferVariableMeta(
                variable: base.variable,
                aggregation: base.aggregation,
                altitude: base.altitude,
                pressureLevel: Int16(entry.level),
                depth: base.depth,
                depthTo: base.depthTo
            )

        case let .height(entry):
            let base = entry.variable.getFlatBuffersMeta()
            return FlatBufferVariableMeta(
                variable: base.variable,
                aggregation: base.aggregation,
                altitude: Int16(entry.level),
                depth: base.depth,
                depthTo: base.depthTo
            )
        }
    }
}
/// Output the given result set with a specified format.
/// The work is submitted to one of the controller's event loops; which one depends on the API key prefix.
/// `timestamp` and `fixedGenerationTime` are used to overwrite dynamic fields in unit tests.
///
/// - Throws: `ForecastapiError.generic` if more locations are requested than allowed (or more than 100 for XLSX),
///   and any error thrown while prefetching or encoding.
func response(format: ForecastResultFormat, numberOfLocationsMaximum: (numberOfLocations: Int, apikey: String?), timestamp: Timestamp = .now(), fixedGenerationTime: Double? = nil) async throws -> Response {
    let (maximumLocations, apikey) = numberOfLocationsMaximum
    let useIsolationLoop = apikey?.starts(with: "xHV7AdGfV") ?? false
    let loop = useIsolationLoop ? ForecastapiController.isolationLoop : ForecastapiController.runLoop

    return try await loop.next().submit {
        guard results.count <= maximumLocations else {
            throw ForecastapiError.generic(message: "Only up to \(maximumLocations) locations can be requested at once")
        }
        guard format != .xlsx || results.count <= 100 else {
            throw ForecastapiError.generic(message: "XLSX supports only up to 100 locations")
        }

        // Run every model's prefetch step before any output is generated
        for location in results {
            try location.results.forEach { try $0.prefetch() }
        }

        switch format {
        case .json:
            return try toJsonResponse(fixedGenerationTime: fixedGenerationTime)
        case .xlsx:
            return try toXlsxResponse(timestamp: timestamp)
        case .csv:
            return try toCsvResponse()
        case .flatbuffers:
            return try toFlatbuffersResponse(fixedGenerationTime: fixedGenerationTime)
        }
    }.get()
}
/// Calculate excess weight of an API query. The following factors are considered:
/// - 14 days of data are considered a weight of 1
/// - 10 weather variables are a weight of 1
/// - The number of days and weather variables is scaled linearly afterwards. E.g. 15 weather variables account for 1.5 weight.
/// - Number of locations: weights are summed over all locations and each location counts at least 1
///
/// Per location: `weight = max(1, variables / 10, variables / 10 * days / 14)`, where `days` is the
/// number of whole days in the location's time range.
///
/// See: https://github.com/open-meteo/open-meteo/issues/438#issuecomment-1722945326
func calculateQueryWeight(nVariablesModels: Int) -> Float {
    let referenceDays: Float = 14
    let referenceVariables: Float = 10
    let variablesFraction = Float(nVariablesModels) / referenceVariables

    // Sum up weights for each location. Technically each location can have a different time interval
    var total: Float = 0
    for location in results {
        let nDays = location.time.range.durationSeconds / 86400
        let timeFraction = Float(nDays) / referenceDays
        let weight = max(variablesFraction, timeFraction * variablesFraction)
        total += max(1, weight)
    }
    return total
}
}
/// Data of one output column: either floating point values or timestamps
enum ApiArray {
    case float([Float])
    case timestamp([Timestamp])

    /// Number of elements in the wrapped array
    var count: Int {
        switch self {
        case let .float(values):
            return values.count
        case let .timestamp(values):
            return values.count
        }
    }
}
/// One output variable with its unit and the data of every ensemble member
struct ApiColumn<Variable> {
    let variable: Variable
    let unit: SiUnit

    // one entry per ensemble member
    let variables: [ApiArray]
}
/// Similar to ApiColumn, but without separation into multiple ensemble members: each column holds a single data array
struct ApiColumnString {
    let variable: String
    let unit: SiUnit
    let data: ApiArray
}
/// Contains a single value for one variable
struct ApiColumnSingle<Variable> {
    let variable: Variable
    let unit: SiUnit
    let value: Float
}
/// Contains a single value for one variable identified by a plain string name
struct ApiColumnSingleString {
    let variable: String
    let unit: SiUnit
    let value: Float
}
/// A named group of columns sharing one time axis, with typed variables
struct ApiSection<Variable> {
    // e.g. hourly or daily
    let name: String

    let time: TimerangeDt
    let columns: [ApiColumn<Variable>]
}
/// A named group of columns sharing one time axis, with variables identified by plain string names
struct ApiSectionString {
    // e.g. hourly or daily
    let name: String

    let time: TimerangeDt
    let columns: [ApiColumnString]
}
/// Stores current weather information, giving only a single value per variable
struct ApiSectionSingle<Variable> {
    let name: String
    let time: Timestamp
    let dtSeconds: Int
    let columns: [ApiColumnSingle<Variable>]
}
/// Output formats supported by the API. The raw value is the lowercase case name.
enum ForecastResultFormat: String, Codable {
    case json, xlsx, csv, flatbuffers
}
/// Simplify flush commands: collects output in a local buffer and hands it to the body stream writer in chunks
struct BufferAndWriter {
    let writer: BodyStreamWriter
    var buffer: ByteBuffer

    /// Creates the helper with an initial buffer capacity of 4 KiB
    @inlinable init(writer: BodyStreamWriter) {
        self.writer = writer
        self.buffer = ByteBufferAllocator().buffer(capacity: 4*1024)
    }

    /// Check if enough data has been written to the buffer (more than 3 KiB) and flush if required
    @inlinable mutating func flushIfRequired() async throws {
        guard buffer.writerIndex > 3*1024 else {
            return
        }
        try await flush()
    }

    /// Writes the buffered bytes on the writer's event loop, waits for completion and resets the buffer.
    /// Does nothing if the buffer is empty.
    @inlinable mutating func flush() async throws {
        guard buffer.writerIndex > 0 else {
            return
        }
        // Copy into locals so the escaping closure does not capture the mutable `self`
        let pending = buffer
        let target = writer
        try await target.eventLoop.flatSubmit { target.write(.buffer(pending)) }.get()
        buffer.moveWriterIndex(to: 0)
    }

    /// Signals the end of the stream on the writer's event loop. Buffered data is not flushed by this call.
    @inlinable mutating func end() async throws {
        let target = writer
        try await target.eventLoop.flatSubmit { target.write(.end) }.get()
    }
}
extension Timestamp {
    /// Formats the timestamp either as ISO8601 `YYYY-MM-ddTHH:mm` (shifted by `utc_offset_seconds` and
    /// optionally wrapped in double quotes) or as unix time in seconds (never shifted, never quoted).
    func formated(format: Timeformat, utc_offset_seconds: Int, quotedString: Bool) -> String {
        switch format {
        case .unixtime:
            return "\(timeIntervalSince1970)"
        case .iso8601:
            let iso = add(utc_offset_seconds).iso8601_YYYY_MM_dd_HH_mm
            return quotedString ? "\"\(iso)\"" : iso
        }
    }
}
extension Sequence where Element == Timestamp {
    /// Lazily formats all timestamps as ISO8601 strings after shifting them by `utc_offset_seconds`.
    /// With `onlyDate` the output is `YYYY-MM-dd`, otherwise `YYYY-MM-ddTHH:mm`.
    /// includes quotes characters if `quotedString` is true
    func itterateIso8601(utc_offset_seconds: Int, quotedString: Bool, onlyDate: Bool) -> AnySequence<String> {
        return AnySequence<String> { () -> AnyIterator<String> in
            var iterator = self.makeIterator()

            if onlyDate {
                return AnyIterator<String> {
                    guard let element = iterator.next()?.add(utc_offset_seconds) else {
                        return nil
                    }
                    let date = element.iso8601_YYYY_MM_dd
                    return quotedString ? "\"\(date)\"" : date
                }
            }

            // Calendar fields are cached and only recalculated once the day changes
            var calendar = tm()
            var dateCalculated = Int.min

            return AnyIterator<String> {
                guard let element = iterator.next()?.add(utc_offset_seconds) else {
                    return nil
                }
                var time = element.timeIntervalSince1970
                let secondsOfDay = time.moduloPositive(86400)
                let startOfDay = time - secondsOfDay
                if startOfDay != dateCalculated {
                    dateCalculated = startOfDay
                    gmtime_r(&time, &calendar)
                }

                let year = Int(calendar.tm_year+1900)
                let month = Int(calendar.tm_mon+1)
                let day = Int(calendar.tm_mday)
                // Hour and minute are derived arithmetically instead of calling gmtime_r for every element
                let hour = secondsOfDay / 3600
                let minute = time.moduloPositive(3600) / 60

                let iso = "\(year)-\(month.zeroPadded(len: 2))-\(day.zeroPadded(len: 2))T\(hour.zeroPadded(len: 2)):\(minute.zeroPadded(len: 2))"
                return quotedString ? "\"\(iso)\"" : iso
            }
        }
    }

    /// Optimised time iteration function. ISO8601 output honours `utc_offset_seconds`, `quotedString` and
    /// `onlyDate`; unix time output is the plain number of seconds and ignores those arguments.
    func itterate(format: Timeformat, utc_offset_seconds: Int, quotedString: Bool, onlyDate: Bool) -> AnySequence<String> {
        switch format {
        case .unixtime:
            return AnySequence<String> { () -> AnyIterator<String> in
                var iterator = self.makeIterator()
                return AnyIterator<String> {
                    iterator.next().map { "\($0.timeIntervalSince1970)" }
                }
            }
        case .iso8601:
            return itterateIso8601(utc_offset_seconds: utc_offset_seconds, quotedString: quotedString, onlyDate: onlyDate)
        }
    }
}