SLICE-1.1 Design Notes — Multi-Tag Telemetry
- Slice: SLICE-1.1
- Implementation status: Completed (criterion 7 amended)
- Audience: anyone modifying the tag pipeline, the noise simulator, or the relationship between high-rate emitters and the canonical store
This doc explains how the 50-tag telemetry pipeline actually works in code — why there are two tick cadences (per-tag emitters and a snapshot publisher), why the snapshot stage exists at all, and why this is the slice where high-rate data still flows through AppState (the inverse of SLICE-1.3's encoder bypass). Read this if you're adding new tags, changing emit cadence, or planning Phase 2.3's tag-stream lift-out.
If you want a scenario-first explanation of when this runs in the app, how to configure it, and what C#/.NET techniques are being used, read the companion article: SLICE-1.1 telemetry deep dive.
1. Quick reference
Key types:
| Type | Project | Role |
|---|---|---|
| TagQuality (enum) | Application | Good / Uncertain / Bad (OPC UA-style) |
| TagSample (record) | Application | Time-stamped value for one named tag |
| TagDefinition (record) | Application | Tag metadata: name, unit, interval, noise model |
| TagSnapshot (record) | Application | Immutable map tag.name → TagSample, captured at one tick |
| NoiseModel (sealed hierarchy) | Application | SineNoise / DriftNoise / RandomWalkNoise / StepNoise |
| NoiseModelEvaluator (static) | Application | Pure-function evaluator; per-tag state owned by caller |
| ITagStream | Application | The bounded read-only contract the consumer holds |
| SimulatedTagSource | Infrastructure | The producer; ITagStream + IDisposable |
| TagStreamPipelineService | Application | The consumer; BackgroundService |
| SimulatorTagsValidator | Infrastructure | Simulator:Tags config validation |
Key files:
src/InspectionPrototype.Application/State/TagQuality.cs
src/InspectionPrototype.Application/State/TagSample.cs
src/InspectionPrototype.Application/State/TagDefinition.cs
src/InspectionPrototype.Application/State/TagSnapshot.cs
src/InspectionPrototype.Application/State/NoiseModel.cs
src/InspectionPrototype.Application/State/NoiseModelEvaluator.cs
src/InspectionPrototype.Application/Abstractions/ITagStream.cs
src/InspectionPrototype.Application/Services/TagStreamPipelineService.cs
src/InspectionPrototype.Infrastructure/Simulator/SimulatedTagSource.cs
src/InspectionPrototype.Infrastructure/Simulator/SimulatorTagOptions.cs
src/InspectionPrototype.Infrastructure/Simulator/SimulatorTagsOptions.cs
src/InspectionPrototype.Infrastructure/Simulator/SimulatorTagsValidator.cs
src/InspectionPrototype.App/appsettings.json // Simulator:Tags, 50-tag seed
Key tests (all in tests/InspectionPrototype.Tests/):
| Test | Asserts |
|---|---|
| NoiseModelEvaluatorTests | Each variant returns expected values; random-walk clamps |
| SimulatorTagsValidatorTests | Duplicate names, empty names, interval range, noise validation |
| TagStreamPipelineServiceTests | Drain → LatestTagValues updated; coalesce → diagnostics entry |
| MainViewModelTelemetryReadoutTests | UI projects LatestTagValues["temperature.celsius"] etc. |
| AppMetricsTagDimensionTests | samples.ingested{tag.name} increments per tag |
| SimulatedTagSourceDisposalTests | Double-dispose (DI registers under two service types) is safe |
Key metrics (on the InspectionPrototype meter):
| Counter / gauge | Dimension | Emitted by |
|---|---|---|
| samples.ingested | tag.name | SimulatedTagSource (per emitter cycle) |
| samples.coalesced | tag.name | SimulatedTagSource (cell overwrite) |
| telemetry.ingested | none | TagStreamPipelineService (per snapshot) |
| telemetry.coalesced | none | TagStreamPipelineService (per drop) |
| tags.active (observable gauge) | none | SimulatedTagSource (definition count) |
samples.* are per-tag (emit-side counters). telemetry.* are per-snapshot (consume-side counters). The two are not the same number — under typical configuration a snapshot represents many samples.ingested events.
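To make the per-tag versus per-snapshot distinction concrete, here is a minimal sketch built on System.Diagnostics.Metrics. Only the instrument names and the tag.name dimension come from this doc; MetricsSketch, MetricsSketchDemo, and the meter name "InspectionPrototype.Sketch" are illustrative assumptions and do not reflect the real AppMetrics class.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical stand-in for AppMetrics; only the instrument names come from this doc.
public sealed class MetricsSketch : IDisposable
{
    private readonly Meter _meter;
    private readonly Counter<long> _samplesIngested;
    private readonly Counter<long> _telemetryIngested;

    public MetricsSketch(string meterName)
    {
        _meter = new Meter(meterName);
        _samplesIngested = _meter.CreateCounter<long>("samples.ingested");
        _telemetryIngested = _meter.CreateCounter<long>("telemetry.ingested");
    }

    // Emit side: one increment per emitter cycle, dimensioned by tag name.
    public void RecordSample(string tagName) =>
        _samplesIngested.Add(1, new KeyValuePair<string, object?>("tag.name", tagName));

    // Consume side: one increment per snapshot, no dimension.
    public void RecordSnapshot() => _telemetryIngested.Add(1);

    public void Dispose() => _meter.Dispose();
}

public static class MetricsSketchDemo
{
    // Returns totals keyed by "instrument" or "instrument{tag value}".
    public static Dictionary<string, long> Run()
    {
        const string meterName = "InspectionPrototype.Sketch"; // assumed name
        var totals = new Dictionary<string, long>();

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName) l.EnableMeasurementEvents(instrument);
        };
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, _) =>
        {
            var key = instrument.Name;
            foreach (var tag in tags) key += "{" + tag.Value + "}";
            totals.TryGetValue(key, out var previous);
            totals[key] = previous + value;
        });
        listener.Start();

        using var metrics = new MetricsSketch(meterName);
        // One 50 ms snapshot window: 25 vibration samples, 1 temperature sample.
        for (var i = 0; i < 25; i++) metrics.RecordSample("vibration.x.mms2");
        metrics.RecordSample("temperature.celsius");
        metrics.RecordSnapshot();
        return totals;
    }
}
```

Calling MetricsSketchDemo.Run() yields 26 samples.ingested events split across two dimension series against a single telemetry.ingested event, which is the ratio the paragraph above describes.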
2. Class shape
┌──────────────────────────────────────┐
│ appsettings.json │
│ "Simulator:Tags": [50 entries] │
│ Name, Unit, IntervalMs, Noise{…} │
└──────────────────┬───────────────────┘
│ binds via
│ IOptionsMonitor<SimulatorTagsOptions>
▼
┌──────────────────────────────────────┐
│ SimulatorTagsValidator │ ── ValidateOnStart
│ - no duplicate names │
│ - no empty names │
│ - IntervalMs in [2, 1000] │
│ - Noise.Kind ∈ {Sine, Drift, │
│ RandomWalk, Step} │
│ - Required fields per kind │
└──────────────────┬───────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────┐
│ SimulatedTagSource Implements: ITagStream + IDisposable │
│ (Infrastructure) │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 50× emitter Tasks (one Task.Run per TagDefinition) │ │
│ │ tick @ TagDefinition.IntervalMs │ │
│ │ value = NoiseModelEvaluator.Evaluate(model, now, │ │
│ │ ref _refState[name], _rng[name]) │ │
│ │ sample = TagSample(name, now, value, Good) │ │
│ │ _latestValues.AddOrUpdate(name, sample) │ │
│ │ ── on overwrite: _perTagCoalescedCounts[name]++ │ │
│ │ ── + samples.coalesced{tag.name}.Add(1) │ │
│ │ samples.ingested{tag.name}.Add(1) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ writes │
│ ▼ │
│ ┌───────────────────────────────────────────────┐ │
│ │ ConcurrentDictionary<string, TagSample> │ │
│ │ _latestValues (per-tag latest cell) │ │
│ └───────────────────────────────────────────────┘ │
│ │ │
│ │ snapshot publisher reads │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 1× snapshot publisher Task │ │
│ │ tick @ profile.TelemetryIntervalMs │ │
│ │ freeze _latestValues into ImmutableDictionary │ │
│ │ snapshot = TagSnapshot(now, frozen) │ │
│ │ if channel full → _snapshotCoalescedCount++ │ │
│ │ channel.Writer.TryWrite(snapshot) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Channel<TagSnapshot> capacity=1, DropOldest, single rw │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────┬──────────────────────────────────┘
│ ITagStream.Reader
▼
┌────────────────────────────────────────┐
│ TagStreamPipelineService │
│ (Application.Services) │
│ : BackgroundService │
│ │
│ ExecuteAsync drain: │
│ if SnapshotCoalescedCount delta > 0 │
│ Update(s with │
│ LatestTagValues = snapshot.Values,
│ PipelineCounters with │
│ TelemetryCoalesced = total) │
│ + Warning diagnostics entry │
│ else │
│ Update(s with │
│ LatestTagValues = snapshot.Values)
│ telemetry.ingested.Add(1) │
└────────────────┬───────────────────────┘
│ writes
▼
┌────────────────────────────────────────┐
│ AppStateStore │
│ AppState.LatestTagValues │
│ ImmutableDictionary<string, │
│ TagSample> │
└────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────┐
│ MainViewModel │
│ reads LatestTagValues["temperature…"] │
│ LatestTagValues["pressure.bar"] │
│ … etc. │
└────────────────────────────────────────┘
3. Lifecycle
SimulatedTagSource is registered as a singleton under both SimulatedTagSource and ITagStream (DI registers the same instance under two service types — that's why double-dispose protection is required, see traps). Constructed eagerly at host startup; emitters and the snapshot publisher start in the constructor, not in a separate IHostedService.StartAsync. This is intentional: there's no host-controlled StartAsync/StopAsync ordering — the source lives for the lifetime of the process from construction to disposal.
host startup host shutdown
│ │
▼ ▼
┌──────────────┐ ┌──────────────────────┐ ┌─────────────────┐
│ Construct │────────▶│ Producing │────────▶│ Disposed │
│ │ │ │ │ │
│ - read │ │ 50× emitter Tasks │ │ Interlocked- │
│ options │ │ tick independently │ │ Exchange │
│ - build │ │ per IntervalMs │ │ guard against │
│ refState[] │ │ │ │ double- │
│ - register │ │ 1× snapshot Task │ │ dispose; │
│ tags.active│ │ ticks at profile. │ │ _cts.Cancel(); │
│ gauge │ │ TelemetryIntervalMs│ │ channel.Writer. │
│ - start 50 │ │ │ │ TryComplete() │
│ emitter │ │ Profile changes │ │ │
│ tasks │ │ adjust the snapshot │ │ Emitter Tasks │
│ - start │ │ rate on next tick │ │ exit on the │
│ snapshot │ │ (no rebuild needed) │ │ next Task.Delay │
│ publisher │ │ │ │ throwing OCE │
└──────────────┘ └──────────────────────┘ └─────────────────┘
Per-tag emit cadence is fixed at construction: it is read once from TagDefinition.IntervalMs per tag and never adjusted at runtime, even if the profile changes. Profile changes affect only the snapshot publisher's rate (profile.TelemetryIntervalMs), not the underlying emitters. This is a deliberate design separation: the producer's per-tag accuracy is tied to its config, not to the operator's profile selection.
The snapshot publisher reads _profileProvider.CurrentProfile.TelemetryIntervalMs at the start of each tick, so a profile change picks up on the next snapshot — no timer rebuild needed because Task.Delay is used instead of PeriodicTimer (each iteration creates a fresh delay).
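The re-read-per-tick pattern described above can be sketched as follows. This is a minimal illustration, not the real publisher: SnapshotLoopSketch is a hypothetical name, getIntervalMs stands in for _profileProvider.CurrentProfile.TelemetryIntervalMs, and publish stands in for the freeze-and-TryWrite step.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SnapshotLoopSketch
{
    // getIntervalMs: assumed stand-in for the profile's TelemetryIntervalMs.
    // publish: assumed stand-in for "freeze cells and write the snapshot".
    public static async Task RunAsync(
        Func<int> getIntervalMs, Action publish, CancellationToken ct)
    {
        try
        {
            while (true)
            {
                // Re-read the interval on every iteration. Each Task.Delay is a
                // fresh one-shot timer, so a profile change applies to the next tick
                // without rebuilding anything.
                await Task.Delay(getIntervalMs(), ct).ConfigureAwait(false);
                publish();
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown: the pending (or next) delay throws once ct is cancelled.
        }
    }
}
```

A PeriodicTimer-based loop would hold one period for the lifetime of the timer instance, so it would need explicit handling when the profile changes; the loop above trades tick alignment for that simplicity.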
4. Runtime flow
The headline flow has two distinct cadences feeding one consumer:
emitter[i] cell-store snapshot channel pipeline store
(50× concurrent) (ConcurrentDict) publisher service
───────────── ──────────────── ───────── ─────── ───────── ─────
│ │
│── Task.Delay(IntervalMs) ──┐ │
│ │ │
│ noise = Eval(model, now, │ │
│ ref _refState[name], │ │
│ _rng[per-emitter]) │ │
│ ▼ │
│ AddOrUpdate(name, sample) │ ── on overwrite (emitter ran twice │
│ │ before snapshot publisher read): │
│ │ _perTagCoalescedCounts[name]++ │
│ │ samples.coalesced{tag.name}.Add(1) │
│ samples.ingested{tag.name}│ │
│ .Add(1) │ │
│ │ │
⋯ 49 more emitters concurrently … │
│ │
│ ┌── (independent) Task.Delay(profile.TelemetryIntervalMs) │
│ │ │
│◀──┤ │
│ │ freeze cells into ImmutableDictionary │
│ │ snapshot = TagSnapshot(now, dict) │
│ │ │
│ │ if channel.Reader.Count >= 1 │
│ │ _snapshotCoalescedCount++ │
│ │ TryWrite(snapshot) ──────▶ │ │
│ │ │── ReadAllAsync ──▶ │
│ │ │ │ │
│ │ │ │ if newCoalesces > 0:
│ │ │ │ Update(s with
│ │ │ │ LatestTagValues = …,
│ │ │ │ PipelineCounters w/
│ │ │ │ TelemetryCoalesced)
│ │ │ │ + Warning diag
│ │ │ │ else:
│ │ │ │ Update(s with
│ │ │ │ LatestTagValues = …)
│ │ │ │
│ │ │ │ telemetry.ingested.Add(1)
│ │ │ │ │
│ │ │ │ ▼
│ │ │ │ [AppStateStore.Update]
The key observation: there are two coalesce paths, with different meanings.
- samples.coalesced{tag.name} increments when an emitter writes its cell faster than the snapshot publisher reads the cells. This means the emitter's individual rate is faster than the snapshot rate. Expected at high tag rates (a 500 Hz vibration.x.mms2 emitter naturally overwrites its cell ~25 times between 50 ms snapshots).
- telemetry.coalesced (snapshot-level) increments when the consumer (TagStreamPipelineService) lags behind the snapshot publisher and the channel drops a stale snapshot. This means the AppState write rate is slower than the snapshot rate. Should be near zero in healthy operation.
The first is a rate-aliasing artifact — by design. The second is back-pressure — a problem if it grows.
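The snapshot-level path (the back-pressure one) can be sketched with a capacity-1 DropOldest channel and a counter that is bumped when an unread snapshot is about to be evicted. CoalescingPublisherSketch is a hypothetical name; the channel options mirror the "capacity=1, DropOldest, single rw" box in the class-shape diagram.

```csharp
using System.Threading;
using System.Threading.Channels;

public sealed class CoalescingPublisherSketch<T>
{
    private readonly Channel<T> _channel = Channel.CreateBounded<T>(
        new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = true,
        });

    private long _coalescedCount;

    public ChannelReader<T> Reader => _channel.Reader;

    public long CoalescedCount => Interlocked.Read(ref _coalescedCount);

    public void Publish(T snapshot)
    {
        // If an unread snapshot is still queued, the write below will evict it.
        if (_channel.Reader.Count >= 1)
        {
            Interlocked.Increment(ref _coalescedCount);
        }

        // With DropOldest the write succeeds even when the channel is full.
        _channel.Writer.TryWrite(snapshot);
    }
}
```

The check and the write are not atomic with respect to the reader, so the count can be off by one around a concurrent read; that is acceptable for a diagnostic counter. On .NET 6 or later, the Channel.CreateBounded overload that accepts an itemDropped callback is an alternative way to count evictions, if the target framework allows it.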
5. Decisions made during implementation
These are choices the spec intentionally abstracted, made concrete by the code.
(a) Two-stage producer (per-tag emitters + one snapshot publisher) instead of one timer that emits all 50 tags. The simpler design — one PeriodicTimer ticking at the slowest required interval, evaluating all 50 tags per tick — would force every tag to share the same effective rate, which is wrong for this domain. Real telemetry tags emit at wildly different rates: vibration at 500 Hz, ambient temperature at 1 Hz. Modelling that as 50 independent emitters is faithful to real machines and lets per-tag accuracy be evaluated independently. The cost is 50 concurrent Tasks — see traps for thread-pool concerns.
(b) Tag stream writes through AppState (inverse of SLICE-1.3's encoder bypass). The decision was deliberate at slice time: tag snapshots are intermittent (20 Hz typical) and the canonical store is the right place for them — UI command guards, run-summary capture, and recipe-validation logic all read tag values, and routing them through AppState keeps "what does the operator see" in one place. The encoder stream's 1 kHz rate would have dominated AppState.Update if routed the same way (50× the snapshot rate), which is why SLICE-1.3 chose the opposite. Phase 2.3 may revisit this if measurement shows the tag stream has become the dominant AppState.Update caller — that's exactly what SLICE-2.0 is being opened to measure.
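As a rough illustration of what "writes through AppState" means for the drain path, the sketch below applies a snapshot to an immutable state record via a reducer. Everything here is assumed for illustration: StateSketch flattens the real AppState and PipelineCounters into one record, StoreSketch is not AppStateStore, and the compare-and-swap update is one plausible implementation rather than the confirmed one.

```csharp
using System;
using System.Collections.Immutable;
using System.Threading;

public sealed record SampleSketch(string Name, double Value);

// Assumed, flattened shape; the real AppState nests counters in PipelineCounters.
public sealed record StateSketch(
    ImmutableDictionary<string, SampleSketch> LatestTagValues,
    long TelemetryCoalesced);

public sealed class StoreSketch
{
    private StateSketch _state =
        new(ImmutableDictionary<string, SampleSketch>.Empty, 0);

    public StateSketch Current => Volatile.Read(ref _state);

    public void Update(Func<StateSketch, StateSketch> reducer)
    {
        // Compare-and-swap loop: retry if another writer replaced the state first.
        while (true)
        {
            var before = Volatile.Read(ref _state);
            var after = reducer(before);
            if (ReferenceEquals(
                    Interlocked.CompareExchange(ref _state, after, before), before))
            {
                return;
            }
        }
    }
}

public static class DrainSketch
{
    // One Update per snapshot; the counter is touched only when new coalesces occurred.
    public static void Apply(
        StoreSketch store,
        ImmutableDictionary<string, SampleSketch> values,
        long coalescedTotal)
    {
        if (coalescedTotal > store.Current.TelemetryCoalesced)
        {
            store.Update(s => s with
            {
                LatestTagValues = values,
                TelemetryCoalesced = coalescedTotal,
            });
        }
        else
        {
            store.Update(s => s with { LatestTagValues = values });
        }
    }
}
```

The point of the sketch is the cost model: every snapshot is exactly one Update call, so at 20 Hz the tag stream contributes about 20 store writes per second, whereas a 1 kHz stream routed the same way would contribute about 1 000.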
(c) Per-emitter Random instances instead of Random.Shared. Each emitter constructs its own Random (via new Random() in the emitter loop). Sharing Random.Shared across 50 concurrent emitters would create contention that's measurable at the tag rates SLICE-1.1 produces. The dedicated Random per emitter is single-threaded by construction; no synchronization needed.
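A minimal sketch of how a caller-owned Random and caller-owned state fit a pure-function evaluator, using a clamped random walk with a Box-Muller Gaussian step. RandomWalkSketch and its parameter names are hypothetical; the real NoiseModelEvaluator signature is not reproduced here.

```csharp
using System;

public static class RandomWalkSketch
{
    // Box-Muller transform: two uniform samples produce one standard-normal sample.
    public static double NextGaussian(Random rng)
    {
        var u1 = 1.0 - rng.NextDouble(); // map [0,1) to (0,1] so Log never sees 0
        var u2 = rng.NextDouble();
        return Math.Sqrt(-2.0 * Math.Log(u1)) * Math.Cos(2.0 * Math.PI * u2);
    }

    // No hidden state: the caller owns both the walk position and the Random,
    // so each emitter can keep its own pair without any synchronization.
    public static double Step(
        ref double state, double stepSigma, double min, double max, Random rng)
    {
        state = Math.Clamp(state + stepSigma * NextGaussian(rng), min, max);
        return state;
    }
}
```

An emitter loop would hold one `double state` and one `Random` as locals and call Step once per tick; because neither is shared, the evaluator itself needs no locks.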
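(d) ConcurrentDictionary<string, TagSample> for the cell-store. The cell-store is read by one Task (the snapshot publisher) and written by 50 Tasks (the emitters). ConcurrentDictionary.AddOrUpdate synchronizes writers with fine-grained internal locking rather than a single global lock. The snapshot publisher's read is a foreach over the dictionary, which is safe to run while writers are active and does not lock the whole structure. Note that this enumeration is not a point-in-time view: each cell reflects its value at the moment the enumerator visits it. That is acceptable here because only the latest value per tag matters. Memory footprint: 50 entries × ~100 bytes per TagSample + dictionary overhead ≈ 8 KB total, which is negligible.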
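The freeze step can be sketched as below. CellStoreSketch is a hypothetical name, and double stands in for TagSample to keep the example short.

```csharp
using System.Collections.Concurrent;
using System.Collections.Immutable;

public static class CellStoreSketch
{
    public static ImmutableDictionary<string, double> Freeze(
        ConcurrentDictionary<string, double> cells)
    {
        // Enumerating a ConcurrentDictionary is safe while writers run, but it is
        // not a moment-in-time view: each entry is as fresh as the instant it is visited.
        var builder = ImmutableDictionary.CreateBuilder<string, double>();
        foreach (var cell in cells)
        {
            builder[cell.Key] = cell.Value;
        }

        // The returned dictionary is detached from later writes to the cells.
        return builder.ToImmutable();
    }
}
```

Once frozen, the snapshot can be handed to the channel and the store without further copying, since nothing can mutate it.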
(e) Snapshot publisher uses Task.Delay not PeriodicTimer. The snapshot rate changes when the operator switches profiles (e.g., Normal 200 ms → MultiTag 50 ms). Using Task.Delay and re-reading profile.TelemetryIntervalMs each iteration picks up profile changes on the next tick without any timer-rebuild logic. The encoder source uses PeriodicTimer because it cares about precise tick alignment at high rates; the tag snapshot publisher doesn't — 50 ms ± a few ms is fine.
(f) _perTagCoalescedCounts is a ConcurrentDictionary even though only the producer writes it. The interface (ITagStream.PerTagCoalescedCount(string)) is read from outside the producer (UI queries, tests). Concurrent dictionary makes that read thread-safe without extra locking.
(g) tags.active is an observable gauge, not a counter. The number of active tags is a property of the configuration (read once at construction and stored in _definitions.Count), not a rate. Observable gauges support exactly this case — Func<long> polled by the meter when read. The producer registers itself as the provider via _metrics.RegisterTagsActiveProvider(() => _definitions.Count).
(h) tags.active returns 0 when no provider is registered. AppMetrics stores the provider as a nullable Func<long>?; the gauge polls it with a ?? 0L fallback. This means a capture taken before SimulatedTagSource is constructed (e.g., during host startup) shows 0 rather than a missing reading. Reviewers debugging tags.active = 0 in a capture should check the producer's startup log first; if SimulatedTagSource was constructed but the gauge still reads 0, the provider registration failed (an early-Pass-3 bug, since fixed by the explicit RegisterTagsActiveProvider call).
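Decisions (g) and (h) together amount to a small pattern: an observable gauge whose callback reads a late-bound, nullable provider. A minimal sketch follows, assuming a hypothetical TagsActiveGaugeSketch class; only the tags.active name, RegisterTagsActiveProvider, and the ?? 0L fallback come from this doc.

```csharp
using System;
using System.Diagnostics.Metrics;

public sealed class TagsActiveGaugeSketch : IDisposable
{
    private readonly Meter _meter;
    private Func<long>? _provider; // null until the producer registers itself

    public TagsActiveGaugeSketch(string meterName)
    {
        _meter = new Meter(meterName);
        // The callback runs only when a listener collects the gauge.
        _meter.CreateObservableGauge<long>("tags.active", () => Observe());
    }

    public void RegisterTagsActiveProvider(Func<long> provider) => _provider = provider;

    // Falls back to 0 while no provider is registered.
    public long Observe() => _provider?.Invoke() ?? 0L;

    public void Dispose() => _meter.Dispose();
}
```

Before registration Observe() returns 0; after RegisterTagsActiveProvider(() => 50) it returns 50. That is why a zero reading alone cannot distinguish "not yet constructed" from "registration failed".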
6. Invariants and traps
50 concurrent Tasks on the .NET thread pool. Each emitter is a Task.Run(() => EmitTagAsync(...)) that loops on Task.Delay. At 500 Hz emit (the fastest-configured tags) each emitter is scheduled ~500 times per second, so the pool sees ~9 000 task-resume events per second across all 50 tags. The default thread pool handles this comfortably on any modern host but is not free — under contention from CPU-heavy work elsewhere, individual emitters can drift their rates by 5-15% (the criterion-7 amendment quantifies this for the 10/50 Hz bands). Don't add more emitters without re-measuring. Phase 2.3 (data-plane lift-out) is the right place to consider switching to a single-timer-many-tags design if the tag count grows substantially.
Per-tag emit cadence is fixed at construction. TagDefinition.IntervalMs is read once per tag at construction; the emitter's Task.Delay uses that fixed interval forever. Switching profiles does NOT change per-tag rates. If you want runtime-adjustable tag rates, you need to either: (a) re-read the definition each tick and rebuild the delay, or (b) restart the producer. Neither is implemented; both are out of scope for SLICE-1.1.
Double-dispose protection is load-bearing. SimulatedTagSource is registered in DI under both SimulatedTagSource (the concrete type) and ITagStream (the interface). The DI container tracks both registrations and calls Dispose() on each at host disposal — yielding two calls on the same instance. The Interlocked.Exchange(ref _disposed, 1) != 0 guard at the top of Dispose() makes the second call a no-op. If you remove the guard, the second call hits an already-disposed _cts and throws ObjectDisposedException — which TASK-1.5.1 documented as a real bug-fix. Don't remove the guard.
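The guard itself is small. Here is a minimal sketch on a hypothetical GuardedSourceSketch class; TeardownCount exists only so the behavior is observable in a test.

```csharp
using System;
using System.Threading;

public sealed class GuardedSourceSketch : IDisposable
{
    private readonly CancellationTokenSource _cts = new();
    private int _disposed; // 0 = live, 1 = disposed

    // Illustration only: counts how many times teardown actually ran.
    public int TeardownCount { get; private set; }

    public void Dispose()
    {
        // The first caller flips 0 -> 1 and proceeds; later callers see 1 and return.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cts.Cancel();
        _cts.Dispose();
        TeardownCount++;
    }
}
```

Without the early return, the second Dispose() would call Cancel() on an already-disposed CancellationTokenSource, which throws ObjectDisposedException.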
Task.Delay ignores the configured interval at sub-tick precision. On Windows the default timer resolution is ~15.6 ms. The 2 ms (vibration.*) and 4 ms (encoder.*.counts, tilt.*, stage.z.um) tags can't actually emit at their configured rates without a winmm.timeBeginPeriod(1) boost — and SLICE-1.1 doesn't acquire that boost (only SLICE-1.3's encoder source does). Result: tags at 100 Hz+ cap near 64 Hz on Windows. The criterion-7 amendment documents this — the per-tag accuracy bands are documented-not-gated. If you ever need sub-15 ms tag accuracy on Windows, follow the encoder source's pattern (WinMmTimePeriod.AcquireOrFallback) — but acquiring a system-wide boost from the tag source has different consequences than from the encoder (long-running boost vs. high-rate-only).
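The caps quoted above fall out of simple quantization arithmetic. The sketch below is a simplified model, not a description of the scheduler: it assumes each delay rounds up to a whole number of 15.625 ms ticks (64 ticks per second) and ignores loop-body time and jitter. TimerQuantizationSketch is a hypothetical name.

```csharp
using System;

public static class TimerQuantizationSketch
{
    // Default Windows timer tick, assumed here to be exactly 1/64 s.
    public const double DefaultTickMs = 15.625;

    // Effective emit rate when every delay rounds up to a whole number of ticks.
    public static double EffectiveHz(int intervalMs, double tickMs = DefaultTickMs)
    {
        var ticks = Math.Max(1.0, Math.Ceiling(intervalMs / tickMs));
        return 1000.0 / (ticks * tickMs);
    }
}
```

Under this model EffectiveHz(2) is 64 Hz, EffectiveHz(20) is 32 Hz, EffectiveHz(100) is about 9.14 Hz, and EffectiveHz(200) is about 4.92 Hz (1.5% under nominal). Those values line up with the ~64 Hz, ~32 Hz, ~9.2 Hz, and ±2% figures in the criterion-7 amended bands.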
samples.coalesced does NOT mean "data was lost." It means an emitter overwrote a cell faster than the snapshot publisher consumed it. The overwrite is by design — only the latest cell value is meaningful for snapshot semantics. The counter is informational (helps interpret per-tag rates) not a fault indicator. Don't alarm on samples.coalesced > 0 — high-rate tags will always show non-zero counts.
telemetry.coalesced IS meaningful. This counter increments when the consumer (TagStreamPipelineService) lags behind the snapshot publisher and the channel drops a stale snapshot. Healthy operation should show near-zero values — under sustained backpressure it grows linearly with the lag. The Soak8h capture (slice-1-4-soak-8h) showed 12 telemetry coalesces over 28 809 s — about 1 per 40 minutes — consistent with TelemetryDropoutChance = 0.01 deferring snapshots, not with structural backpressure.
Reserved tag names. temperature.celsius and pressure.bar are named in the spec as reserved — MainViewModel's legacy readout binds to these specific keys. The seed configuration keeps them present, but the current validator does not enforce those two names by itself. Don't rename them unless you also update the UI binding sites; the binding doesn't fail at startup, it just shows blank readouts at runtime.
7. Test surface
Covered by unit tests:
- NoiseModelEvaluator for each of the four variants: produces expected values, random-walk respects clamps, Box-Muller sample is approximately standard-normal.
- SimulatorTagsValidator rejects duplicates, empty names, out-of-range intervals, missing noise parameters, and unknown noise kinds.
- TagStreamPipelineService drains a FakeTagStream and updates LatestTagValues; coalesce events produce a Warning diagnostics entry; the TelemetryCoalesced counter increments correctly.
- MainViewModel correctly projects LatestTagValues["temperature.celsius"] and LatestTagValues["pressure.bar"] through to the readout properties.
- AppMetrics per-tag dimensions: emitting a sample with tag.name="X" and another with tag.name="Y" produces two distinct dimension series in a MeterListener readback.
- SimulatedTagSource double-dispose guard (SimulatedTagSourceDisposalTests).
Covered by capture (slice-1-1-multi-tag-telemetry row):
- All 50 configured tags appear in samples.ingested (criterion that the seed config and the validator agree).
- Per-tag rates are within the criterion-7 amended bands: ≤ 5 Hz tags within ±2%; 10 Hz hits ~9.2 Hz; 50 Hz hits ~32 Hz; ≥ 100 Hz tags cap at ~64 Hz on default Windows tick.
- Aggregate telemetry.ingested rate closely tracks the profile's TelemetryIntervalMs (e.g., 19.7 Hz against a nominal 20 Hz under 50 ms profiles).
Not covered (intentional gaps):
- Profile-change-mid-run snapshot-rate adjustment. Manual smoke only: switch profiles, observe the dotnet-counters rate change. No automated test because it would require mock-time infrastructure or behavioral assertions on Task.Delay (both fragile).
- High-contention thread-pool starvation. No test runs a CPU-saturating workload alongside the 50 emitters and asserts how rates degrade. Falls under the criterion-7 amendment's "documented not gated": empirically captured rather than mechanically verified.
- Reserved-name binding regression. MainViewModel binds to LatestTagValues["temperature.celsius"] directly; if a future refactor renames the reserved tag, the binding silently shows blank rather than failing. There's no test that the rendered UI displays the right values; visual smoke at app startup is the surface that catches this. Phase 3 UI work should add binding-fidelity tests if it touches these readouts.
Notably absent test: there is no test for "consumer crashes with LatestTagValues corrupted." The pipeline service's exception handling catches OperationCanceledException during normal shutdown but doesn't catch other exceptions — a malformed snapshot would propagate and crash the host. The spec-time decision was that snapshots can't realistically be malformed (the producer always builds them via the same code path); revisit if a real driver replaces the simulator.