
Aggregation Pipeline

The aggregation pipeline computes pre-aggregated statistics (sum, avg, min, max, count) from raw data across four cascading time levels: hourly → daily → monthly → yearly.

Pipeline Architecture

Raw Data (Flush) → Hourly (1h) → Daily (1d) → Monthly (1M) → Yearly (1y)

Each level aggregates from the previous level:

Level     Source               Storage Path   Worker Concurrency
Hourly    Raw data             agg/agg_1h/    100
Daily     Hourly aggregates    agg/agg_1d/    50
Monthly   Daily aggregates     agg/agg_1M/    20
Yearly    Monthly aggregates   agg/agg_1y/    10
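The table above can be expressed as a slice of level configurations. This is an illustrative sketch (the struct and variable names are assumptions, not the real API); note how each level's source is the previous level's storage path.

```go
package main

import "fmt"

// LevelConfig describes one aggregation level (illustrative, not the real API).
type LevelConfig struct {
	Name        string // e.g. "hourly"
	Interval    string // bucket width: 1h, 1d, 1M, 1y
	Source      string // where this level reads from
	StoragePath string // where this level writes results
	Concurrency int    // semaphore size for the level's worker pool
}

// Levels mirrors the table: each level reads the previous level's output.
var Levels = []LevelConfig{
	{"hourly", "1h", "raw data", "agg/agg_1h/", 100},
	{"daily", "1d", "agg/agg_1h/", "agg/agg_1d/", 50},
	{"monthly", "1M", "agg/agg_1d/", "agg/agg_1M/", 20},
	{"yearly", "1y", "agg/agg_1M/", "agg/agg_1y/", 10},
}

func main() {
	for _, l := range Levels {
		fmt.Printf("%-8s %-4s source=%-12s → %-12s (concurrency %d)\n",
			l.Name, l.Interval, l.Source, l.StoragePath, l.Concurrency)
	}
}
```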

Trigger Flow

The pipeline is triggered by flush events and cascades automatically:

Storage Flush → FlushCompleteEvent
        │
        ▼
Pipeline.OnFlushComplete()
        │
        ▼
Hourly WorkerPool (aggregate raw → hourly)
        │ └─ AggregateCompleteEvent
        ▼
Daily WorkerPool (aggregate hourly → daily)
        │ └─ AggregateCompleteEvent
        ▼
Monthly WorkerPool (aggregate daily → monthly)
        │ └─ AggregateCompleteEvent
        ▼
Yearly WorkerPool (aggregate monthly → yearly)
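The cascade can be sketched as a chain of pools, each forwarding its completion event to the next. `Pipeline.OnFlushComplete` and `AggregateCompleteEvent` come from the flow above; everything else (the `Pool` struct, `OnEvent`, the `trace` variable) is a hypothetical simplification for illustration.

```go
package main

import "fmt"

// trace records the order in which levels run (demo only).
var trace []string

// Pool is a simplified stand-in for one level's worker pool.
type Pool struct {
	Level    string
	nextPool *Pool // link to the next level, driving the cascade
}

// OnEvent aggregates this level, then hands the equivalent of an
// AggregateCompleteEvent to the next level down the chain.
func (p *Pool) OnEvent(source string) {
	trace = append(trace, p.Level)
	fmt.Printf("%s WorkerPool: aggregate %s → %s\n", p.Level, source, p.Level)
	if p.nextPool != nil {
		p.nextPool.OnEvent(p.Level)
	}
}

// Pipeline reacts to storage flushes by kicking off the hourly level;
// the remaining levels trigger themselves.
type Pipeline struct{ hourly *Pool }

func (pl *Pipeline) OnFlushComplete() { pl.hourly.OnEvent("raw") }

func main() {
	yearly := &Pool{Level: "yearly"}
	monthly := &Pool{Level: "monthly", nextPool: yearly}
	daily := &Pool{Level: "daily", nextPool: monthly}
	hourly := &Pool{Level: "hourly", nextPool: daily}
	(&Pipeline{hourly: hourly}).OnFlushComplete()
}
```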

Worker Pool Pattern

Each level has a partitioned worker pool:

  • One goroutine per partition key (db:collection:date)
  • Semaphore-based concurrency limiting (100/50/20/10 per level)
  • Worker states: idle → pending → running → waitingForJob
  • Batch delay: Workers wait 2–10s after first notification to batch multiple updates
  • Each level's nextPool links to the next level for cascade

Aggregation Functions

For each field, the following statistics are computed per time bucket:

Function   Description
sum        Total sum of values
avg        Average value
min        Minimum value
max        Maximum value
count      Number of data points

The AggregatedField struct also supports derived statistics: Variance() and StdDev().

Cascading Aggregation

Higher levels aggregate from already-aggregated data using Merge() logic:

  • sum: Sum of child sums
  • min: Min of child mins
  • max: Max of child maxes
  • count: Sum of child counts
  • avg: Recomputed as sum / count

This avoids re-reading raw data for daily/monthly/yearly levels.
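A minimal sketch of that merge logic, assuming a simplified struct (the real `Merge()` presumably operates on full aggregated fields, not this toy `Agg`):

```go
package main

import "fmt"

// Agg holds one bucket's pre-aggregated statistics.
type Agg struct {
	Sum, Min, Max float64
	Count         int64
}

// Merge folds a child bucket into the parent, mirroring the rules above:
// sums and counts add, min/max take extremes, avg is derived on read.
func (a *Agg) Merge(child Agg) {
	if a.Count == 0 {
		*a = child // empty parent adopts the child wholesale
		return
	}
	a.Sum += child.Sum
	a.Count += child.Count
	if child.Min < a.Min {
		a.Min = child.Min
	}
	if child.Max > a.Max {
		a.Max = child.Max
	}
}

// Avg is recomputed as sum/count, never averaged from child averages
// (which would weight unevenly sized buckets incorrectly).
func (a Agg) Avg() float64 { return a.Sum / float64(a.Count) }

func main() {
	// A daily bucket built from two hourly buckets, without touching raw data.
	day := Agg{}
	day.Merge(Agg{Sum: 10, Min: 1, Max: 5, Count: 4})
	day.Merge(Agg{Sum: 20, Min: 0, Max: 9, Count: 6})
	fmt.Printf("sum=%v min=%v max=%v count=%d avg=%v\n",
		day.Sum, day.Min, day.Max, day.Count, day.Avg())
	// sum=30 min=0 max=9 count=10 avg=3
}
```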

Storage Format

Aggregation results use V6 columnar format with device groups:

agg/
├── agg_1h/
│   └── group_0000/mydb/sensors/2026/01/15/
│       ├── _metadata.idx
│       └── dg_0000/
│           ├── _metadata.idx
│           └── part_0000.bin
├── agg_1d/
├── agg_1M/
└── agg_1y/

Each part file stores per-metric columns (sum, avg, min, max, count) for each aggregated field.

Timezone Support

All aggregation levels apply a configurable timezone for time truncation. This ensures hourly buckets align with local time boundaries, not just UTC.