Spaces:
Sleeping
Sleeping
| import Foundation | |
| import Vapor | |
| extension BodyStreamWriter { | |
| /// Execute async code and capture any errors. In case of error, print the error to the output stream | |
| func submit(_ task: @escaping () async throws -> Void) { | |
| _ = eventLoop.makeFutureWithTask { try await task() } | |
| .flatMapError({ error in | |
| write(.buffer(.init(string: "Unexpected error while streaming data: \(error)"))) | |
| .flatMap({write(.error(error))}) | |
| }) | |
| } | |
| } | |
| extension ForecastapiResult { | |
| /** | |
| Stream a potentially very large resultset to the client. The JSON file could easily be 20 MB. | |
| Instead of generating a massive string in memory, we only allocate 18kb and flush every time the buffer exceeds 16kb. | |
| Memory footprint is therefore much smaller and fits better into L2/L3 caches. | |
| Additionally code is fully async, to not block the a thread for almost a second to generate a JSON response... | |
| */ | |
| func toJsonResponse(fixedGenerationTime: Double?) throws -> Response { | |
| // First excution outside stream, to capture potential errors better | |
| //var first = try self.first?() | |
| let response = Response(body: .init(stream: { writer in | |
| writer.submit { | |
| var b = BufferAndWriter(writer: writer) | |
| /// For multiple locations, create an array of results | |
| let isMultiPoint = results.count > 1 | |
| if isMultiPoint { | |
| b.buffer.writeString("[") | |
| } | |
| /*if let first { | |
| try await first.streamJsonResponse(to: &b) | |
| } | |
| first = nil*/ | |
| for (i,location) in results.enumerated() { | |
| if i != 0 { | |
| b.buffer.writeString(",") | |
| } | |
| try await location.streamJsonResponse(to: &b, timeformat: timeformat, fixedGenerationTime: fixedGenerationTime) | |
| } | |
| if isMultiPoint { | |
| b.buffer.writeString("]") | |
| } | |
| try await b.flush() | |
| try await b.end() | |
| } | |
| })) | |
| response.headers.replaceOrAdd(name: .contentType, value: "application/json; charset=utf-8") | |
| return response | |
| } | |
| } | |
| extension ForecastapiResult.PerLocation { | |
| fileprivate func streamJsonResponse(to b: inout BufferAndWriter, timeformat: Timeformat, fixedGenerationTime: Double?) async throws { | |
| let generationTimeStart = Date() | |
| guard let first = results.first else { | |
| throw ForecastapiError.noDataAvilableForThisLocation | |
| } | |
| let sections = try runAllSections() | |
| let current = try first.current?() | |
| let generationTimeMs = fixedGenerationTime ?? (Date().timeIntervalSince(generationTimeStart) * 1000) | |
| b.buffer.writeString(""" | |
| {"latitude":\(first.latitude),"longitude":\(first.longitude),"generationtime_ms":\(generationTimeMs),"utc_offset_seconds":\(utc_offset_seconds),"timezone":"\(timezone.identifier)","timezone_abbreviation":"\(timezone.abbreviation)" | |
| """) | |
| if let elevation = first.elevation, elevation.isFinite { | |
| b.buffer.writeString(",\"elevation\":\(elevation)") | |
| } | |
| if locationId != 0 { | |
| b.buffer.writeString(",\"location_id\":\(locationId)") | |
| } | |
| if let current { | |
| b.buffer.writeString(",\"\(current.name)_units\":") | |
| b.buffer.writeString("{") | |
| switch timeformat { | |
| case .iso8601: | |
| b.buffer.writeString("\"time\":\"\(SiUnit.iso8601.abbreviation)\"") | |
| case .unixtime: | |
| b.buffer.writeString("\"time\":\"\(SiUnit.unixTime.abbreviation)\"") | |
| } | |
| b.buffer.writeString(",\"interval\":\"seconds\"") | |
| for e in current.columns { | |
| b.buffer.writeString(",\"\(e.variable.rawValue)\":\"\(e.unit.abbreviation)\"") | |
| } | |
| b.buffer.writeString("}") | |
| b.buffer.writeString(",\"\(current.name)\":") | |
| b.buffer.writeString("{") | |
| b.buffer.writeString("\"time\":") | |
| b.buffer.writeString(current.time.formated(format: timeformat, utc_offset_seconds: utc_offset_seconds, quotedString: true)) | |
| b.buffer.writeString(",\"interval\":\(current.dtSeconds)") | |
| /// Write data | |
| for e in current.columns { | |
| let format = "%.\(e.unit.significantDigits)f" | |
| b.buffer.writeString(",") | |
| b.buffer.writeString("\"\(e.variable.rawValue)\":\(e.value.isFinite ? String(format: format, e.value) : "null")") | |
| } | |
| b.buffer.writeString("}") | |
| try await b.flushIfRequired() | |
| } | |
| /// process sections like hourly or daily | |
| for section in sections { | |
| b.buffer.writeString(",\"\(section.name)_units\":") | |
| b.buffer.writeString("{") | |
| switch timeformat { | |
| case .iso8601: | |
| b.buffer.writeString("\"time\":\"\(SiUnit.iso8601.abbreviation)\"") | |
| case .unixtime: | |
| b.buffer.writeString("\"time\":\"\(SiUnit.unixTime.abbreviation)\"") | |
| } | |
| for e in section.columns { | |
| b.buffer.writeString(",\"\(e.variable)\":\"\(e.unit.abbreviation)\"") | |
| try await b.flushIfRequired() | |
| } | |
| b.buffer.writeString("}") | |
| b.buffer.writeString(",\"\(section.name)\":") | |
| b.buffer.writeString("{") | |
| b.buffer.writeString("\"time\":[") | |
| // Write time axis | |
| var firstValue = true | |
| for time in section.time.itterate(format: timeformat, utc_offset_seconds: utc_offset_seconds, quotedString: true, onlyDate: section.time.dtSeconds == 86400) { | |
| if firstValue { | |
| firstValue = false | |
| } else { | |
| b.buffer.writeString(",") | |
| } | |
| b.buffer.writeString(time) | |
| try await b.flushIfRequired() | |
| } | |
| b.buffer.writeString("]") | |
| /// Write data | |
| for e in section.columns { | |
| b.buffer.writeString(",") | |
| b.buffer.writeString("\"\(e.variable)\":") | |
| b.buffer.writeString("[") | |
| var firstValue = true | |
| switch e.data { | |
| case .float(let floats): | |
| let format = "%.\(e.unit.significantDigits)f" | |
| for v in floats { | |
| if firstValue { | |
| firstValue = false | |
| } else { | |
| b.buffer.writeString(",") | |
| } | |
| if v.isFinite { | |
| b.buffer.writeString(String(format: format, v)) | |
| } else { | |
| b.buffer.writeString("null") | |
| } | |
| try await b.flushIfRequired() | |
| } | |
| case .timestamp(let timestamps): | |
| for time in timestamps.itterate(format: timeformat, utc_offset_seconds: utc_offset_seconds, quotedString: true, onlyDate: false) { | |
| if firstValue { | |
| firstValue = false | |
| } else { | |
| b.buffer.writeString(",") | |
| } | |
| b.buffer.writeString(time) | |
| try await b.flushIfRequired() | |
| } | |
| } | |
| b.buffer.writeString("]") | |
| try await b.flushIfRequired() | |
| } | |
| b.buffer.writeString("}") | |
| } | |
| b.buffer.writeString("}") | |
| } | |
| } | |