Spaces:
Sleeping
Sleeping
File size: 5,138 Bytes
be7c937 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | #include "wayy_db/ops/aggregations.hpp"
#include <algorithm>
#include <cmath>
#include <limits>
#include <numeric>
#ifdef WAYY_USE_AVX2
#include <immintrin.h>
#endif
namespace wayy_db::ops {
// Scalar implementations
// Adds up every element of `col` and returns the total.
//
// The running total has type T and starts from T{0}, so the arithmetic is
// performed in the element type; an empty column yields T{0}.
template <typename T>
T sum(const ColumnView<T>& col) {
    T total{0};
    for (const auto& element : col) {
        total = total + element;
    }
    return total;
}

// Explicit instantiations for the element types this file provides.
template int64_t sum<int64_t>(const ColumnView<int64_t>&);
template double sum<double>(const ColumnView<double>&);
template<typename T>
T min(const ColumnView<T>& col) {
if (col.empty()) {
throw InvalidOperation("min() on empty column");
}
return *std::min_element(col.begin(), col.end());
}
template int64_t min(const ColumnView<int64_t>&);
template double min(const ColumnView<double>&);
template<typename T>
T max(const ColumnView<T>& col) {
if (col.empty()) {
throw InvalidOperation("max() on empty column");
}
return *std::max_element(col.begin(), col.end());
}
template int64_t max(const ColumnView<int64_t>&);
template double max(const ColumnView<double>&);
template<typename T>
double variance(const ColumnView<T>& col) {
if (col.empty()) {
return std::numeric_limits<double>::quiet_NaN();
}
double mean = avg(col);
double sum_sq = 0.0;
for (const auto& val : col) {
double diff = static_cast<double>(val) - mean;
sum_sq += diff * diff;
}
return sum_sq / static_cast<double>(col.size());
}
template double variance(const ColumnView<int64_t>&);
template double variance(const ColumnView<double>&);
template<typename T>
double std_dev(const ColumnView<T>& col) {
return std::sqrt(variance(col));
}
template double std_dev(const ColumnView<int64_t>&);
template double std_dev(const ColumnView<double>&);
// SIMD implementations
#ifdef WAYY_USE_AVX2
// AVX2 sum of a column of doubles (this definition is compiled only when
// WAYY_USE_AVX2 is defined).
//
// Four partial sums are kept, one per lane of a 256-bit register, and are
// combined at the end; any elements left over after the last full group of
// four are added one by one. Loads are unaligned (_mm256_loadu_pd), so no
// alignment is required of col.data().
double sum_simd(const ColumnView<double>& col) {
    constexpr size_t kLanes = 4;  // doubles held by one __m256d

    const double* const values = col.data();
    const size_t count = col.size();
    const size_t vector_end = count - (count % kLanes);  // last index covered by full vectors

    __m256d lane_sums = _mm256_setzero_pd();
    for (size_t pos = 0; pos < vector_end; pos += kLanes) {
        lane_sums = _mm256_add_pd(lane_sums, _mm256_loadu_pd(values + pos));
    }

    // Horizontal reduction: fold the upper 128-bit half onto the lower half,
    // then add the two lanes that remain.
    const __m128d folded = _mm_add_pd(_mm256_castpd256_pd128(lane_sums),
                                      _mm256_extractf128_pd(lane_sums, 1));
    const __m128d upper_lane = _mm_unpackhi_pd(folded, folded);
    double total = _mm_cvtsd_f64(_mm_add_sd(folded, upper_lane));

    // Scalar tail for the 0-3 elements that did not fill a vector.
    for (size_t pos = vector_end; pos < count; ++pos) {
        total += values[pos];
    }
    return total;
}
// AVX2 sum of a column of 64-bit integers (this definition is compiled only
// when WAYY_USE_AVX2 is defined).
//
// Four partial sums are kept, one per 64-bit lane of a 256-bit register,
// spilled to a small aligned buffer and added together; any elements left
// over after the last full group of four are added one by one. Loads from the
// column are unaligned (_mm256_loadu_si256).
int64_t sum_simd(const ColumnView<int64_t>& col) {
    constexpr size_t kLanes = 4;  // int64 values held by one __m256i

    const int64_t* const values = col.data();
    const size_t count = col.size();
    const size_t vector_end = count - (count % kLanes);  // last index covered by full vectors

    __m256i lane_sums = _mm256_setzero_si256();
    for (size_t pos = 0; pos < vector_end; pos += kLanes) {
        const __m256i chunk =
            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(values + pos));
        lane_sums = _mm256_add_epi64(lane_sums, chunk);
    }

    // Horizontal reduction through memory; the buffer is 32-byte aligned
    // because _mm256_store_si256 is the aligned store.
    alignas(32) int64_t lanes[4];
    _mm256_store_si256(reinterpret_cast<__m256i*>(lanes), lane_sums);
    int64_t total = lanes[0] + lanes[1] + lanes[2] + lanes[3];

    // Scalar tail for the 0-3 elements that did not fill a vector.
    for (size_t pos = vector_end; pos < count; ++pos) {
        total += values[pos];
    }
    return total;
}
#else
// Fallback used when WAYY_USE_AVX2 is not defined: simply forwards to
// sum(col).
double sum_simd(const ColumnView<double>& col) {
    const double total = sum(col);
    return total;
}
// Fallback used when WAYY_USE_AVX2 is not defined: simply forwards to
// sum(col).
int64_t sum_simd(const ColumnView<int64_t>& col) {
    const int64_t total = sum(col);
    return total;
}
#endif
// Type-erased implementations
double sum(const Column& col) {
switch (col.dtype()) {
case DType::Int64:
case DType::Timestamp:
return static_cast<double>(sum_simd(const_cast<Column&>(col).as_int64()));
case DType::Float64:
return sum_simd(const_cast<Column&>(col).as_float64());
default:
throw InvalidOperation("sum() not supported for this type");
}
}
// Arithmetic mean of a type-erased column: sum(col) divided by the number of
// elements. A column of size zero yields quiet NaN instead of dividing by
// zero. Unsupported dtypes surface as whatever sum(col) throws.
double avg(const Column& col) {
    const auto count = col.size();
    if (count == 0) {
        return std::numeric_limits<double>::quiet_NaN();
    }
    const double total = sum(col);
    return total / static_cast<double>(count);
}
double min_val(const Column& col) {
switch (col.dtype()) {
case DType::Int64:
case DType::Timestamp:
return static_cast<double>(min(const_cast<Column&>(col).as_int64()));
case DType::Float64:
return min(const_cast<Column&>(col).as_float64());
default:
throw InvalidOperation("min() not supported for this type");
}
}
double max_val(const Column& col) {
switch (col.dtype()) {
case DType::Int64:
case DType::Timestamp:
return static_cast<double>(max(const_cast<Column&>(col).as_int64()));
case DType::Float64:
return max(const_cast<Column&>(col).as_float64());
default:
throw InvalidOperation("max() not supported for this type");
}
}
double std_dev(const Column& col) {
switch (col.dtype()) {
case DType::Int64:
case DType::Timestamp:
return std_dev(const_cast<Column&>(col).as_int64());
case DType::Float64:
return std_dev(const_cast<Column&>(col).as_float64());
default:
throw InvalidOperation("std_dev() not supported for this type");
}
}
} // namespace wayy_db::ops
|