# Aggregation Pipeline
The aggregation pipeline computes pre-aggregated statistics (sum, avg, min, max, count) from raw data across 4 cascading time levels: hourly → daily → monthly → yearly.
## Pipeline Architecture
```
Raw Data (Flush) → Hourly → Daily → Monthly → Yearly
                    (1h)     (1d)     (1M)     (1y)
```
Each level aggregates from the previous level:
| Level | Source | Storage Path | Worker Concurrency |
|---|---|---|---|
| Hourly | Raw data | agg/agg_1h/ | 100 |
| Daily | Hourly aggregates | agg/agg_1d/ | 50 |
| Monthly | Daily aggregates | agg/agg_1M/ | 20 |
| Yearly | Monthly aggregates | agg/agg_1y/ | 10 |
## Trigger Flow
The pipeline is triggered by flush events and cascades automatically:
```
Storage Flush → FlushCompleteEvent
        │
        ▼
Pipeline.OnFlushComplete()
        │
        ▼
Hourly WorkerPool (aggregate raw → hourly)
        │ └─ AggregateCompleteEvent
        ▼
Daily WorkerPool (aggregate hourly → daily)
        │ └─ AggregateCompleteEvent
        ▼
Monthly WorkerPool (aggregate daily → monthly)
        │ └─ AggregateCompleteEvent
        ▼
Yearly WorkerPool (aggregate monthly → yearly)
```
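The cascade above can be sketched as a chain of linked pools. This is a minimal illustration, not the actual implementation: the `Pool` struct, its fields, and the synchronous call standing in for `AggregateCompleteEvent` are assumptions; only the names `OnFlushComplete` and `nextPool` come from the text.

```go
package main

import "fmt"

// Pool is a minimal stand-in for one aggregation level's worker pool.
type Pool struct {
	name     string
	nextPool *Pool // next level to notify when this level finishes
}

// Aggregate processes one partition, then cascades to the next level,
// mirroring the AggregateCompleteEvent chain in the diagram above.
func (p *Pool) Aggregate(partition string) {
	fmt.Printf("%s: aggregating %s\n", p.name, partition)
	if p.nextPool != nil {
		p.nextPool.Aggregate(partition) // stands in for the complete event
	}
}

// OnFlushComplete is the pipeline entry point: a storage flush kicks off
// the hourly level, which cascades through daily, monthly, and yearly.
func OnFlushComplete(hourly *Pool, partition string) {
	hourly.Aggregate(partition)
}

func main() {
	yearly := &Pool{name: "yearly"}
	monthly := &Pool{name: "monthly", nextPool: yearly}
	daily := &Pool{name: "daily", nextPool: monthly}
	hourly := &Pool{name: "hourly", nextPool: daily}
	OnFlushComplete(hourly, "mydb:sensors:2026-01-15")
}
```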
## Worker Pool Pattern
Each level has a partitioned worker pool:

- One goroutine per partition key (`db:collection:date`)
- Semaphore-based concurrency limiting (100/50/20/10 per level)
- Worker states: `idle → pending → running → waitingForJob`
- Batch delay: workers wait 2–10s after the first notification to batch multiple updates
- Each level's `nextPool` links to the next level for the cascade
## Aggregation Functions
For each field, the following statistics are computed per time bucket:
| Function | Description |
|---|---|
| sum | Total sum of values |
| avg | Average value |
| min | Minimum value |
| max | Maximum value |
| count | Number of data points |
The `AggregatedField` struct also supports derived statistics: `Variance()` and `StdDev()`.
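A minimal sketch of such a struct follows. The internal representation is an assumption: keeping a running sum of squares alongside the five statistics in the table is one common way to make `Variance()` and `StdDev()` derivable without revisiting raw points.

```go
package main

import (
	"fmt"
	"math"
)

// AggregatedField accumulates the per-bucket statistics from the table
// above. SumSq (sum of squares) is an assumed extra column that makes
// the derived statistics computable.
type AggregatedField struct {
	Sum, Min, Max, SumSq float64
	Count                int64
}

// Add folds one raw value into the running statistics.
func (f *AggregatedField) Add(v float64) {
	if f.Count == 0 || v < f.Min {
		f.Min = v
	}
	if f.Count == 0 || v > f.Max {
		f.Max = v
	}
	f.Sum += v
	f.SumSq += v * v
	f.Count++
}

func (f *AggregatedField) Avg() float64 { return f.Sum / float64(f.Count) }

// Variance uses E[x²] − E[x]²; adequate for a sketch, though a real
// implementation might prefer a more numerically stable formulation.
func (f *AggregatedField) Variance() float64 {
	mean := f.Avg()
	return f.SumSq/float64(f.Count) - mean*mean
}

func (f *AggregatedField) StdDev() float64 { return math.Sqrt(f.Variance()) }

func main() {
	var f AggregatedField
	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		f.Add(v)
	}
	fmt.Println(f.Avg(), f.Variance(), f.StdDev()) // 5 4 2
}
```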
## Cascading Aggregation
Higher levels aggregate from already-aggregated data using Merge() logic:
- sum: Sum of child sums
- min: Min of child mins
- max: Max of child maxes
- count: Sum of child counts
- avg: Recomputed as `sum / count`
This avoids re-reading raw data for daily/monthly/yearly levels.
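The merge rules above can be sketched as follows. The `Agg` struct and the standalone `Merge` function are illustrative stand-ins; the text only tells us the struct is called `AggregatedField` and has `Merge()` logic.

```go
package main

import "fmt"

// Agg is a minimal stand-in for one child aggregate (field names assumed).
type Agg struct {
	Sum, Min, Max float64
	Count         int64
}

// Merge folds child aggregates (e.g. 24 hourly buckets) into one parent
// bucket (e.g. a day) following the rules above. Avg is not stored; it
// is recomputed as sum/count on read.
func Merge(children []Agg) Agg {
	p := children[0]
	for _, c := range children[1:] {
		p.Sum += c.Sum     // sum: sum of child sums
		p.Count += c.Count // count: sum of child counts
		if c.Min < p.Min { // min: min of child mins
			p.Min = c.Min
		}
		if c.Max > p.Max { // max: max of child maxes
			p.Max = c.Max
		}
	}
	return p
}

func main() {
	hours := []Agg{
		{Sum: 10, Min: 1, Max: 4, Count: 4},
		{Sum: 30, Min: 2, Max: 9, Count: 6},
	}
	day := Merge(hours)
	fmt.Println(day.Sum / float64(day.Count)) // avg recomputed: 40/10 = 4
}
```

Note that averaging child averages would be wrong when children hold different counts; recomputing from the merged sum and count is what keeps the cascade exact.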
## Storage Format
Aggregation results use V6 columnar format with device groups:
```
agg/
├── agg_1h/
│   └── group_0000/mydb/sensors/2026/01/15/
│       ├── _metadata.idx
│       └── dg_0000/
│           ├── _metadata.idx
│           └── part_0000.bin
├── agg_1d/
├── agg_1M/
└── agg_1y/
```
Each part file stores per-metric columns (sum, avg, min, max, count) for each aggregated field.
## Timezone Support
All aggregation levels apply a configurable timezone for time truncation. This ensures hourly buckets align with local time boundaries, not just UTC.