Architecture Overview

Soltix is a distributed time-series database designed for IoT and sensor data. It uses a two-service architecture (Router + Storage) with device-based sharding, V6 columnar storage, adaptive compression, and automatic replication.

System Overview

            ┌───────────────────────────┐
            │          Clients          │
            │ (HTTP REST / SSE / gRPC)  │
            └─────────────┬─────────────┘
                          │
                          ▼
            ┌───────────────────────────┐
            │      Router Service       │
            │ ┌──────────────────────┐  │
            │ │ HTTP API (Fiber)     │  │
            │ │ ShardRouter          │  │
            │ │ QueryCoordinator     │  │
            │ │ GroupAutoAssigner    │  │
            │ │ Post-Processing      │  │
            │ │  (Downsampling,      │  │
            │ │   Anomaly Detection, │  │
            │ │   Timezone)          │  │
            │ └──────────────────────┘  │
            └───────┬──────────┬────────┘
                    │          │
        Write       │          │ Query
  (via Queue)       │          │ (via gRPC)
                    ▼          ▼
┌──────────────────────┐     ┌───────────────────────────┐
│    Message Queue     │     │     Storage Nodes (N)     │
│  (NATS/Redis/Kafka)  │     │  ┌─────────────────────┐  │
└──────────┬───────────┘     │  │ Subscriber          │  │
           │                 │  │ WriteWorkerPool     │  │
           │                 │  │ PartitionedWAL      │  │
           └────────────────►│  │ MemoryStore         │  │
                             │  │ FlushWorkerPool     │  │
                             │  │ TieredStorage       │  │
                             │  │ CompactionWorker    │  │
                             │  │ AggregationPipeline │  │
                             │  │ gRPC Server         │  │
                             │  │ SyncManager         │  │
                             │  └─────────────────────┘  │
                             └────────────┬──────────────┘
                                          │
                                          ▼
                                   ┌──────────────┐
                                   │     etcd     │
                                   │  (metadata,  │
                                   │   registry,  │
                                   │   groups)    │
                                   └──────────────┘

Services

Router Service

The Router is the user-facing API gateway. All client requests flow through the Router, which validates input, routes writes to the correct storage nodes via a message queue, and coordinates scatter-gather queries via gRPC.

Key Components:

| Component | Description |
| --- | --- |
| HTTP API (Fiber) | REST endpoints for writes, queries, streaming (SSE), database/collection CRUD, admin, forecasting, and CSV download |
| ShardRouter | Routes each device_id to a logical group via GroupManager (FNV-32a hash) and resolves the group to primary + replica nodes |
| QueryCoordinator | Fan-out/scatter-gather query engine: groups device queries by node, executes parallel gRPC calls, merges and deduplicates results |
| GroupAutoAssigner | Background goroutine that polls etcd for node changes and auto-distributes groups evenly across storage nodes |
| Post-Processing Pipeline | Applied after query results are merged: anomaly detection (Z-Score, IQR, Moving Average), downsampling (LTTB, MinMax, Average, M4), timezone conversion |
| API Key Auth | Middleware supporting X-API-Key, Authorization: Bearer, and Authorization: ApiKey headers |
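
The post-processing stage is easiest to see with a small example. Below is a minimal sketch of the Z-Score detector listed above, written against an illustrative `Point` type and a caller-chosen threshold (commonly 3.0); the Router's actual types and defaults may differ.

```go
package postprocess

import "math"

// Point is a simplified stand-in for one merged query result value.
type Point struct {
	Timestamp int64
	Value     float64
	Anomaly   bool
}

// FlagZScoreAnomalies marks points whose value deviates from the series mean
// by more than threshold standard deviations.
func FlagZScoreAnomalies(points []Point, threshold float64) {
	if len(points) < 2 {
		return
	}
	var sum, sumSq float64
	for _, p := range points {
		sum += p.Value
		sumSq += p.Value * p.Value
	}
	n := float64(len(points))
	mean := sum / n
	variance := sumSq/n - mean*mean
	if variance <= 0 {
		return // constant series, nothing to flag
	}
	std := math.Sqrt(variance)
	for i := range points {
		if math.Abs(points[i].Value-mean)/std > threshold {
			points[i].Anomaly = true
		}
	}
}
```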

Storage Service

The Storage service handles data persistence, retrieval, aggregation, and replication. It subscribes to the message queue for writes and exposes a gRPC server for queries.

Key Components:

| Component | Description |
| --- | --- |
| Subscriber | Listens to soltix.write.node.<nodeID> on the message queue (NATS/Redis/Kafka) |
| WriteWorkerPool | One worker goroutine per partition key (db:collection:date), processing writes in parallel |
| PartitionedWAL | Write-Ahead Log partitioned by db/collection/date. Protobuf-encoded entries with CRC32 checksums. Batch writer with 10ms flush interval |
| MemoryStore | 64-shard FNV-hash partitioned in-memory store for hot data (recent 2 hours). Supports concurrent writes without contention |
| FlushWorkerPool | Event-driven (triggered on WAL segment boundaries). Reads WAL segments → flushes to TieredStorage → removes processed segments |
| TieredStorage | 3-tier group-aware columnar engine: Group → Device Group → Partition. Append-only writes with atomic batch rename for crash safety |
| CompactionWorker | Background process (every 30s) that merges small part files within device groups |
| AggregationPipeline | Cascading 4-level pipeline: 1h → 1d → 1M → 1y. Triggered after flush; each level feeds the next |
| SyncManager | Startup sync + anti-entropy (hourly SHA-256 checksum comparison) for group-scoped replication |
| gRPC Server | Serves QueryShard requests from the Router: queries both MemoryStore (hot) and TieredStorage (cold) concurrently, merges results |
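
To illustrate the MemoryStore row, here is one plausible shape for the 64-way FNV sharding; the `shard` struct, series-key format, and payload type are assumptions made for the sketch, not the service's actual code.

```go
package memstore

import (
	"hash/fnv"
	"sync"
)

const numShards = 64

// shard keeps hot points for a subset of series behind its own lock,
// so concurrent writers rarely contend on the same mutex.
type shard struct {
	mu     sync.RWMutex
	series map[string][]float64 // illustrative payload
}

type MemoryStore struct {
	shards [numShards]*shard
}

func NewMemoryStore() *MemoryStore {
	ms := &MemoryStore{}
	for i := range ms.shards {
		ms.shards[i] = &shard{series: make(map[string][]float64)}
	}
	return ms
}

// shardFor hashes the series key (e.g. "db:collection:device_id")
// with FNV-1a and maps it onto one of the 64 shards.
func (ms *MemoryStore) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return ms.shards[h.Sum32()%numShards]
}

func (ms *MemoryStore) Write(key string, value float64) {
	s := ms.shardFor(key)
	s.mu.Lock()
	s.series[key] = append(s.series[key], value)
	s.mu.Unlock()
}
```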

Data Flow

Write Path

Client HTTP POST
   │
   ▼
Router: Validate → ShardRouter.RouteWrite(db, collection, device_id)
   │      ├── GroupID = FNV32a(db:collection:device_id) % TotalGroups
   │      └── Lookup GroupAssignment (cache → etcd → create lazily)
   ▼
Queue: Publish to "soltix.write.node.<nodeID>" for each node (primary + replicas)
   │
   ▼
Storage: Subscriber receives message → WriteWorkerPool.Submit()
   │      ├── 1. PartitionedWAL.Write()     ← always (durability)
   │      ├── 2. MemoryStore.Write()        ← if data age ≤ maxAge (hot data)
   │      └── 3. FlushWorkerPool.Notify()   ← only on new WAL segments
   ▼
FlushWorker: Read WAL segments → TieredStorage.WriteBatch()
   │      ├── Group by GroupID → per-group Storage engine
   │      └── Append-only V6 part files (.tmp → atomic rename)
   ▼
AggregationPipeline.OnFlushComplete()
          └── Hourly → Daily → Monthly → Yearly (cascading)
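
The queue step above can be sketched as follows. Only the `soltix.write.node.<nodeID>` subject comes from the flow above; the `Publisher` interface and `GroupAssignment` struct are illustrative assumptions for the example.

```go
package router

import "fmt"

// Publisher abstracts the message queue backend (NATS/Redis/Kafka).
type Publisher interface {
	Publish(subject string, payload []byte) error
}

// GroupAssignment mirrors the primary + replica layout stored in etcd.
type GroupAssignment struct {
	GroupID  uint32
	Primary  string   // node ID
	Replicas []string // node IDs
}

// publishWrite sends the encoded write to every node that owns the group,
// so the primary and all replicas receive the same message.
func publishWrite(pub Publisher, asg GroupAssignment, payload []byte) error {
	nodes := append([]string{asg.Primary}, asg.Replicas...)
	for _, nodeID := range nodes {
		subject := fmt.Sprintf("soltix.write.node.%s", nodeID)
		if err := pub.Publish(subject, payload); err != nil {
			return fmt.Errorf("publish to %s: %w", subject, err)
		}
	}
	return nil
}
```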

Query Path

Client HTTP GET/POST
   │
   ▼
Router: Parse query params → QueryCoordinator.Query()
   │      ├── Route each device_id → group → primary node
   │      ├── Group queries by node (minimize RPCs)
   │      └── Parallel gRPC QueryShard calls via ConnectionPool
   ▼
Storage (per node): gRPC QueryShard handler
   │      ├── MemoryStore.Query()    ← hot data (recent ~2h, binary search)
   │      ├── TieredStorage.Query()  ← cold data (V6 columnar, footer-based seeks)
   │      │     ├── Time-range pruning via metadata min/max timestamps
   │      │     ├── Device → DeviceGroup routing via DeviceGroupMap
   │      │     ├── Column projection (only requested fields)
   │      │     └── Bloom filter for fast device existence check
   │      └── Merge + deduplicate (keep latest by InsertedAt)
   ▼
Router: Merge results from all nodes → Post-processing pipeline
   │      ├── 1. Anomaly detection (Z-Score / IQR / Moving Average)
   │      ├── 2. Downsampling (LTTB / MinMax / Average / M4)
   │      └── 3. Timezone conversion
   ▼
Response (columnar format: device_id, timestamps[], field → values[])
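
A rough sketch of the scatter-gather step, assuming a per-node `ShardClient` and a flat `Point` result type (both illustrative): it fans out one gRPC call per node with `errgroup` and concatenates the partial results for deduplication and post-processing.

```go
package coordinator

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// ShardClient stands in for the per-node gRPC client used by the Router.
type ShardClient interface {
	QueryShard(ctx context.Context, deviceIDs []string) ([]Point, error)
}

// Point is a simplified result row.
type Point struct {
	DeviceID   string
	Timestamp  int64
	InsertedAt int64
	Value      float64
}

// scatterGather issues one QueryShard call per node (devices already grouped
// by owning node) and concatenates the partial results.
func scatterGather(ctx context.Context, byNode map[string][]string,
	clients map[string]ShardClient) ([]Point, error) {

	g, ctx := errgroup.WithContext(ctx)
	var (
		mu     sync.Mutex
		merged []Point
	)
	for nodeID, devices := range byNode {
		nodeID, devices := nodeID, devices // capture range vars (pre-Go 1.22)
		g.Go(func() error {
			pts, err := clients[nodeID].QueryShard(ctx, devices)
			if err != nil {
				return err
			}
			mu.Lock()
			merged = append(merged, pts...)
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	// Deduplication (keep latest InsertedAt) and post-processing happen next.
	return merged, nil
}
```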

Device-Based Sharding

Data is distributed across logical groups (default: 256) based on device_id:

GroupID = FNV32a(database + ":" + collection + ":" + device_id) % TotalGroups

Each group is assigned to a primary node and replica nodes. The assignment is managed by the GroupAutoAssigner in the Router and persisted in etcd.
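
A minimal sketch of the routing formula using Go's standard `hash/fnv`; the function name and signature are illustrative.

```go
package sharding

import (
	"fmt"
	"hash/fnv"
)

// GroupID applies the routing formula above: FNV-32a over
// "database:collection:device_id", modulo the total group count.
func GroupID(database, collection, deviceID string, totalGroups uint32) uint32 {
	h := fnv.New32a()
	fmt.Fprintf(h, "%s:%s:%s", database, collection, deviceID)
	return h.Sum32() % totalGroups
}
```

For example, `GroupID("metrics", "temperature", "sensor-42", 256)` always yields the same group for that device, which is what makes the routing deterministic.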

Key properties:

  • Deterministic: Same device always maps to the same group
  • Lazy creation: Groups are created on first write, no pre-provisioning needed
  • Adaptive hashing: Small clusters (< 20 nodes) use Rendezvous hashing for better distribution; large clusters switch to Consistent hashing with virtual nodes for O(log N) lookups (see the sketch after this list)
  • Auto-rebalancing: When nodes join or leave, the GroupAutoAssigner redistributes groups evenly
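
To make the adaptive-hashing property concrete, here is a sketch of the Rendezvous (highest-random-weight) side of it; the scoring hash and node-ID format are assumptions, and the consistent-hashing path used for large clusters is not shown.

```go
package sharding

import (
	"fmt"
	"hash/fnv"
)

// pickNodeRendezvous assigns a group to the node with the highest
// hash(groupID, nodeID) score. Every caller computes the same winner,
// and removing a node only moves the groups that node owned.
func pickNodeRendezvous(groupID uint32, nodeIDs []string) string {
	var best string
	var bestScore uint64
	for _, node := range nodeIDs {
		h := fnv.New64a()
		fmt.Fprintf(h, "%d:%s", groupID, node)
		if score := h.Sum64(); score >= bestScore {
			bestScore, best = score, node
		}
	}
	return best
}
```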

V6 Columnar Storage Format

On-disk data uses a single-file columnar part format with a 3-tier organization (Group → Device Group → Partition):

data/
  group_{gid}/
    {database}/
      {collection}/
        {year}/{month}/{date}/
          _metadata.idx        ← Global metadata (fields, DG manifests, device→DG map)
          dg_0000/
            _metadata.idx      ← DG metadata (parts, device→part map)
            part_0000.bin      ← V6 columnar file
            part_0001.bin
          dg_0001/
            ...

V6 Part File Structure:

┌──────────────────────────────────┐
│ Header (64 bytes)                │
├──────────────────────────────────┤
│ Column Chunk: device0._time      │ ← Delta encoded + Snappy
│ Column Chunk: device0.field1     │ ← Gorilla/Delta/Dict/Bool + Snappy
│ Column Chunk: device0.field2     │
│ Column Chunk: device1._time      │
│ ...                              │
├──────────────────────────────────┤
│ Footer                           │
│  ├── ColumnIndex[]               │ ← (deviceIdx, fieldIdx, offset, size, rowCount, type)
│  ├── FieldDictionary             │
│  └── DeviceIndex                 │
├──────────────────────────────────┤
│ FooterSize (4 bytes)             │
│ FooterOffset (8 bytes)           │ ← Last 8 bytes of file
└──────────────────────────────────┘
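
Because the footer location sits in a fixed-size trailer, a reader can find it with a single positional read. The sketch below assumes a 4-byte size followed by an 8-byte offset, as in the diagram, and little-endian encoding, which is an assumption of this example rather than a documented detail.

```go
package v6

import (
	"encoding/binary"
	"fmt"
	"os"
)

// readFooterLocation reads the fixed trailer of a part file: the last 12
// bytes hold FooterSize (4 bytes) then FooterOffset (8 bytes).
func readFooterLocation(f *os.File) (offset uint64, size uint32, err error) {
	info, err := f.Stat()
	if err != nil {
		return 0, 0, err
	}
	if info.Size() < 12 {
		return 0, 0, fmt.Errorf("file too small for V6 trailer")
	}
	trailer := make([]byte, 12)
	if _, err := f.ReadAt(trailer, info.Size()-12); err != nil {
		return 0, 0, err
	}
	size = binary.LittleEndian.Uint32(trailer[0:4])
	offset = binary.LittleEndian.Uint64(trailer[4:12])
	return offset, size, nil
}
```

With the footer located, a reader can decode the ColumnIndex and seek directly to the chunks for the requested device and fields, which is what makes footer-based seeks cheap.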

Adaptive Compression

Each column uses a type-specific encoder before being wrapped with Snappy block compression:

| Data Type | Encoder | Algorithm |
| --- | --- | --- |
| float64 | GorillaEncoder | Facebook Gorilla XOR bit-packing (~1.37 bytes/value) |
| int64 / timestamps | DeltaEncoder | Delta + ZigZag + Varint encoding |
| string | DictionaryEncoder | Unique-string dictionary with varint indices |
| bool | BoolEncoder | Bitmap (1 bit per value) |

Compression pipeline: Raw values → Column Encoder → Snappy Compress → Disk
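
The DeltaEncoder row maps onto a few lines of Go. The sketch below shows the delta → ZigZag → varint stage for int64 values using only the standard library; in the real pipeline the resulting bytes would then be Snappy-compressed before hitting disk. The function name is illustrative.

```go
package encoders

import "encoding/binary"

// EncodeDeltaZigZagVarint sketches the timestamp/int64 path: store each value
// as the delta from its predecessor, zig-zag map the (possibly negative) delta
// to an unsigned integer, then varint-encode it so small deltas take 1-2 bytes.
func EncodeDeltaZigZagVarint(values []int64) []byte {
	buf := make([]byte, 0, len(values)*binary.MaxVarintLen64)
	tmp := make([]byte, binary.MaxVarintLen64)
	prev := int64(0)
	for _, v := range values {
		delta := v - prev
		prev = v
		zz := uint64((delta << 1) ^ (delta >> 63)) // zig-zag: small |delta| → small uint
		n := binary.PutUvarint(tmp, zz)
		buf = append(buf, tmp[:n]...)
	}
	return buf
}
```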

Multi-Level Aggregation

A cascading pipeline computes pre-aggregated data at 4 levels:

Raw Data → Flush Complete Event
   │
   ▼
Hourly  (1h) → aggregate raw points    → notify Daily
   │
   ▼
Daily   (1d) → aggregate hourly points → notify Monthly
   │
   ▼
Monthly (1M) → aggregate daily points  → notify Yearly
   │
   ▼
Yearly  (1y) → aggregate monthly points

Each level computes sum, avg, min, max, count per field, stored in V6 columnar format under data/agg/agg_{level}/. The pipeline uses partitioned worker pools with semaphore-based concurrency limiting (100 hourly, 50 daily, 20 monthly, 10 yearly).
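
The semaphore-based limiting can be sketched with a buffered channel, the idiomatic Go counting semaphore; the helper below is illustrative, not the pipeline's actual worker pool.

```go
package aggregation

import "sync"

// runWithLimit runs one job per partition while capping concurrency with a
// buffered channel used as a counting semaphore (e.g. 100 for the hourly level).
func runWithLimit(limit int, jobs []func()) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, job := range jobs {
		job := job // capture range var (pre-Go 1.22)
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks while `limit` workers are running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			job()
		}()
	}
	wg.Wait()
}
```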

Replication & Sync

Data replication operates at the group scope — only replicas within the same group sync with each other.

| Mode | Description |
| --- | --- |
| Write replication | Router publishes writes to all nodes in a group's assignment (primary + replicas) |
| Startup sync | On node restart, recovers missed data from replicas via gRPC streaming |
| Anti-entropy | Hourly background process: SHA-256 checksum comparison over a 24h window, auto-repairs on mismatch |
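
As a sketch of the anti-entropy comparison, the helper below computes a deterministic SHA-256 digest over a window of points; the `Point` type, sort order, and field encoding are assumptions. What matters is that both replicas derive the digest the same way, so differing digests pinpoint a window to repair.

```go
package antientropy

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
)

// Point is an illustrative stand-in for one stored sample.
type Point struct {
	DeviceID  string
	Timestamp int64
	Value     float64
}

// windowChecksum computes a deterministic SHA-256 over a group's points for
// one time window of the 24h comparison range.
func windowChecksum(points []Point) [32]byte {
	// Sort first so the digest does not depend on arrival or flush order.
	sort.Slice(points, func(i, j int) bool {
		if points[i].DeviceID != points[j].DeviceID {
			return points[i].DeviceID < points[j].DeviceID
		}
		return points[i].Timestamp < points[j].Timestamp
	})
	h := sha256.New()
	for _, p := range points {
		fmt.Fprint(h, p.DeviceID)
		binary.Write(h, binary.LittleEndian, p.Timestamp)
		binary.Write(h, binary.LittleEndian, p.Value)
	}
	var digest [32]byte
	copy(digest[:], h.Sum(nil))
	return digest
}
```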

Coordination Layer (etcd)

etcd serves as the single source of truth for all cluster state:

| Key Prefix | Data |
| --- | --- |
| /soltix/nodes/ | Node registration with lease-based heartbeats (10s TTL) |
| /soltix/groups/ | Group assignments (primary, replicas, state, epoch) |
| /soltix/databases/ | Database metadata |
| /soltix/collections/ | Collection metadata, field schemas, device tracking |

Router and Storage nodes hold no cluster state locally; it lives entirely in etcd, which enables zero-downtime restarts and easy horizontal scaling.
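
The lease-based registration under /soltix/nodes/ could look roughly like this with the etcd v3 client; the key/value layout and error handling are simplified assumptions for the sketch.

```go
package registry

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// registerNode writes this node under /soltix/nodes/ with a 10-second lease
// and keeps the lease alive; if the process dies, the key expires and the
// GroupAutoAssigner sees the node disappear.
func registerNode(ctx context.Context, endpoints []string, nodeID, addr string) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}

	lease, err := cli.Grant(ctx, 10) // 10s TTL, matching the heartbeat above
	if err != nil {
		return err
	}
	key := "/soltix/nodes/" + nodeID
	if _, err := cli.Put(ctx, key, addr, clientv3.WithLease(lease.ID)); err != nil {
		return err
	}

	// KeepAlive refreshes the lease in the background for the life of ctx.
	ch, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		return err
	}
	go func() {
		for range ch { // drain keep-alive responses
		}
	}()
	return nil
}
```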

Message Queue

Writes are decoupled through a pluggable message queue:

| Backend | Implementation | Notes |
| --- | --- | --- |
| NATS (default) | JetStream | Async publish, durable consumers, file storage |
| Redis | Redis Streams | XADD/XREADGROUP with consumer groups |
| Kafka | segmentio/kafka-go | Standard Kafka producer/consumer |
| Memory | In-process channels | For testing only |

All backends implement the same Publisher/Subscriber interface, making them interchangeable via configuration.
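
The shared interface could look roughly like the sketch below; the exact method set and `Message` shape are assumptions. The point is that Router and Storage code depend only on these interfaces, while the concrete backend is chosen by configuration.

```go
package queue

import "context"

// Message is the unit handed to subscribers; Ack confirms processing so the
// backend (JetStream, Redis Streams, Kafka) can advance its cursor.
type Message struct {
	Subject string
	Data    []byte
	Ack     func() error
}

// Publisher is implemented by every backend (NATS, Redis, Kafka, in-memory).
type Publisher interface {
	Publish(ctx context.Context, subject string, data []byte) error
	Close() error
}

// Subscriber delivers messages for a subject, e.g. "soltix.write.node.<nodeID>",
// to a handler until ctx is cancelled.
type Subscriber interface {
	Subscribe(ctx context.Context, subject string, handler func(Message) error) error
	Close() error
}
```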