
The three-phase skeleton pipeline

This vignette describes how swereg builds and incrementally rebuilds per-batch skeleton files in production. If you just want to create a small skeleton and attach some diagnoses by hand, read vignette("skeleton-create") first. This vignette is for people running full-registry pipelines where total build time is measured in days and every iteration matters.

The problem

A production swereg pipeline on Swedish registry data typically has two dominant costs:

  1. Rawbatch creation: split the raw registry files into per-person-batch subsets, save as .qs2. For national-registry scale this is typically many hours of I/O.
  2. Skeleton processing: for each batch, build a person-week time grid, apply censoring, add demographics from LISA, classify exposure from LMED, and match ICD-10 / ATC / operation codes across the subset. Typically dominates total pipeline time.

Together these add up to a multi-day first build for national-scale studies. Before the three-phase migration, editing one ICD-10 code triggered the full rebuild because the skeleton was saved as a bare data.table with no provenance to diff against.

The goal of the three-phase pipeline is to make that multi-day (roughly 62h) rebuild cost proportional to the size of the change: editing one code pattern should cost a few minutes, not days.

Three phases, three cadences of change

The pipeline splits skeleton construction into three phases, each with its own invalidation strategy. For each batch, RegistryStudy$process_skeletons() runs the phases in execution order (phase 1, then phase 3, then phase 2, which is why they are described in that order below) but re-runs only the parts that actually changed.

Phase 1 – framework

A single user-supplied function, registered via $register_framework(fn), with signature function(batch_data, config). It returns a fresh data.table containing the base time grid plus structural censoring (immigration, emigration, death). Nothing downstream can exist without it.

Invalidation: full rebuild whenever digest::digest(list(body(fn), formals(fn)), "xxhash64") changes. Framework edits are expected to be rare (you don’t change the time grid on a whim), so a conservative “always rebuild on hash change” policy is fine. When phase 1 re-runs, all phase-2 and phase-3 state is discarded and re-applied from scratch on the fresh base.
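The hash-and-compare rule can be sketched in base R. This is a minimal sketch, not swereg's code: it swaps digest's xxhash64 for an MD5 of the deparsed function (computed with tools::md5sum() on a temp file), and md5_of(), framework_hash() and needs_full_rebuild() are hypothetical helper names.

```r
# Phase-1 invalidation sketch (base R only, hypothetical helpers).
# swereg hashes list(body(fn), formals(fn)) with digest::digest(..., "xxhash64");
# here an MD5 of the deparsed function text stands in for that hash (assumption).

md5_of <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, connection = NULL), tmp)
  unname(tools::md5sum(tmp))
}

# deparse(fn) renders both the formals and the body as text (not the environment)
framework_hash <- function(fn) md5_of(deparse(fn))

# A missing or different stored hash means a full rebuild of all phases
needs_full_rebuild <- function(stored_hash, fn) {
  is.null(stored_hash) || !identical(stored_hash, framework_hash(fn))
}

fw_v1 <- function(batch_data, config) data.frame(id = batch_data$id, week = 1L)
fw_v2 <- function(batch_data, config) data.frame(id = batch_data$id, week = 2L)

stored <- framework_hash(fw_v1)
needs_full_rebuild(stored, fw_v1)  # FALSE: formals and body unchanged
needs_full_rebuild(stored, fw_v2)  # TRUE: body edited
```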

Phase 3 – randvars

An ordered named list of user-supplied functions, each registered via $register_randvars(name, fn). Each has signature function(skeleton, batch_data, config) and mutates the skeleton in place (or, less commonly, returns a new data.table if it filters rows).

Registration order is execution order. Editing one step should replay that step and everything downstream of it but leave upstream steps untouched.

Invalidation: divergence-point rewind-and-replay. swereg scans the stored (name, hash) sequence against the current one, finds the first position where either the name or the hash differs, drops the stored added_columns of every step from that point forward, and then replays current steps from that point. Add, remove, edit, and reorder are all handled uniformly because any of those operations changes either the name sequence or the hash sequence.
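The divergence scan can be sketched as follows. This is a minimal sketch assuming stored and current steps are given as data.frames with name and hash columns in registration order; find_divergence() and plan_rewind() are hypothetical helpers, not swereg functions.

```r
# Divergence-point sketch for phase-3 randvars (illustrative, not swereg's code).
# Each step is identified by a (name, hash) pair, in registration order.

find_divergence <- function(stored_names, stored_hashes, cur_names, cur_hashes) {
  n_common <- min(length(stored_names), length(cur_names))
  for (i in seq_len(n_common)) {
    # First position where either the name or the hash differs
    if (stored_names[i] != cur_names[i] || stored_hashes[i] != cur_hashes[i]) {
      return(i)
    }
  }
  # Shared prefix matches: if one sequence is longer, diverge right after it
  if (length(stored_names) != length(cur_names)) return(n_common + 1L)
  NA_integer_  # fully in sync, nothing to replay
}

plan_rewind <- function(stored, current) {
  d <- find_divergence(stored$name, stored$hash, current$name, current$hash)
  tail_from <- function(x) if (!is.na(d) && d <= length(x)) x[d:length(x)] else character(0)
  list(
    drop   = tail_from(stored$name),   # steps whose added columns are removed
    replay = tail_from(current$name)   # steps re-run, in current order
  )
}

steps   <- c("demographics", "exposure", "baseline_exclusion")
stored  <- data.frame(name = steps, hash = c("h1", "h2", "h3"),        stringsAsFactors = FALSE)
current <- data.frame(name = steps, hash = c("h1", "h2_edited", "h3"), stringsAsFactors = FALSE)
plan_rewind(stored, current)
# $drop and $replay are both c("exposure", "baseline_exclusion")
```

Adding, removing or reordering a step shows up the same way: either a name mismatch inside the shared prefix, or a length difference right after it.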

A typical phase 3 registration block looks like:

study$register_randvars("demographics",        rv_demographics)
study$register_randvars("exposure",            rv_exposure)
study$register_randvars("baseline_exclusion",  rv_baseline_exclusion)

If baseline_exclusion reads a column produced by exposure (a common pattern: exclusion depends on exposure classification), then editing exposure puts the divergence point at step 2, and both step 2 and step 3 replay. That dependency is implicit in the registration order: there is no explicit dependency graph.

Phase 2 – code registry

A list of code registrations, each added via $register_codes() (primary) or $register_derived_codes() (derived). Each entry contributes one or more columns to the skeleton; the shape of those columns is predicted from metadata, not observed from running fn.

Invalidation: per-entry fingerprint diff. Skeleton$sync_with_registry() computes to_drop = stored_fingerprints - current_fingerprints and to_add = current_fingerprints - stored_fingerprints, drops stale entries’ columns, then re-applies new entries in registration order. When you change one code in one entry, every other entry’s fingerprint is unchanged, so only that one entry re-runs.
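In base R the diff amounts to two setdiff() calls. A minimal sketch, assuming fingerprints are plain strings; sync_plan() and the fingerprint values are made up for illustration.

```r
# Phase-2 sync sketch: set difference over entry fingerprints (hypothetical helper).
sync_plan <- function(stored_fps, current_fps) {
  list(
    to_drop = setdiff(stored_fps, current_fps),  # stale entries: their columns are removed
    to_add  = setdiff(current_fps, stored_fps)   # new or edited entries: re-applied
  )
}

stored  <- c("fp_icd10_hospital",    "fp_dors_underlying", "fp_dors_multiple")
current <- c("fp_icd10_hospital_v2", "fp_dors_underlying", "fp_dors_multiple")
sync_plan(stored, current)
# $to_drop is "fp_icd10_hospital", $to_add is "fp_icd10_hospital_v2"
```

setdiff() keeps the order of its first argument, so to_add comes out in registration order, which is the order phase 2 re-applies entries in.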

Phase 2 runs AFTER phase 3 in the current orchestration, so phase-3 steps CANNOT depend on phase-2 columns. If you need that, push the logic into phase 3 (read the raw data yourself) or restructure the pipeline.

The Skeleton R6 class

Each per-batch file on disk is a serialized Skeleton object, not a bare data.table. A Skeleton carries its own provenance:

sk <- study$load_skeleton(batch_number = 1L)
sk
#> <Skeleton batch 1>
#>   rows:             8,234,112
#>   cols:             287
#>   framework_hash:   abcd1234efgh
#>   randvars steps:   3
#>   applied codes:    127
#>   pipeline_hash:    4d8f99af7b2c

sk$data                          # the underlying data.table
sk$framework_fn_hash             # phase-1 hash
names(sk$randvars_state)         # applied phase-3 step names in order
length(sk$applied_registry)      # phase-2 entries currently materialized
sk$pipeline_hash()               # rolled-up provenance scalar

Why R6 and not a bare data.table?

Because the provenance IS the interesting state. Without it, you can’t answer “is this skeleton still valid for the current pipeline?” without either rebuilding from scratch or trusting a separate sidecar file. With it, every batch file can answer the question locally via sk$pipeline_hash() == study$pipeline_hash().

The qs2 over-allocation gotcha

data.table maintains over-allocated pointer slots (“truelength”) so that in-place := doesn’t have to reallocate. qs2 serialization drops those slots, so after qs_read() the table has truelength == 0 and the next := inside a function triggers a silent reallocation that breaks the caller’s reference.

RegistryStudy$load_skeleton() fixes this by calling data.table::setalloccol() on the freshly-loaded table:

obj$data <- data.table::setalloccol(
  obj$data, n = getOption("datatable.alloccol", 4096L)
)

The reassignment is mandatory – setalloccol() returns a new pointer. data.table::copy() would also work but doubles memory use, which matters when skeletons are multi-GB per batch.

The code registry: primary and derived entries

Primary entries via register_codes()

A primary entry ties a code list to a fn (e.g. add_diagnoses, add_cods, add_rx) and to a set of raw-data groups to apply it to. The optional combine_as argument runs fn once more on the rbind of all groups to produce one combined column per code.

study$register_codes(
  codes      = list(f20 = c("F20"), vte = c("I26", "I80")),
  fn         = swereg::add_diagnoses,
  groups     = list(ov = "outpatient", sv = "inpatient"),
  combine_as = "os"
)

For each (group_prefix, code_name) pair, a column is written: ov_f20, sv_f20, ov_vte, sv_vte. Because combine_as = "os" is set, two more columns are produced, os_f20 and os_vte. Logically os_f20 = ov_f20 | sv_f20 and os_vte = ov_vte | sv_vte, but they are computed by re-running add_diagnoses on the rbind of the outpatient and inpatient rawbatch data, not by ORing the existing columns.
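Because column names depend only on metadata, they can be predicted without running fn. The sketch below mirrors the naming rule just described; predicted_columns() is a hypothetical helper, not a swereg export.

```r
# Predict a primary entry's output columns from metadata alone (hypothetical helper).
predicted_columns <- function(codes, groups, combine_as = NULL) {
  prefixes <- c(names(groups), combine_as)  # e.g. "ov", "sv", then "os"
  # One column per (prefix, code name) pair, grouped by code name
  unlist(lapply(names(codes), function(nm) paste(prefixes, nm, sep = "_")))
}

predicted_columns(
  codes      = list(f20 = c("F20"), vte = c("I26", "I80")),
  groups     = list(ov = "outpatient", sv = "inpatient"),
  combine_as = "os"
)
# "ov_f20" "sv_f20" "os_f20" "ov_vte" "sv_vte" "os_vte"
```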

The DORS gotcha and register_derived_codes()

The pre-derived-codes approach tried to be clever: register ICD-10 codes once with groups = list(ov, sv, dors) and combine_as = "osd", hoping to get one column covering all three sources. This failed silently because add_diagnoses searches for hdia/dia*/ekod*/icd* columns – none of which exist in the cause-of-death registry. DORS uses ulorsak (underlying cause) and morsak* (contributing causes). So dors_f20 was always FALSE and osd_f20 = ov_f20 | sv_f20 | FALSE was effectively just hospital data. Deaths never contributed to outcomes.

The fix needs two different functions – add_diagnoses for hospital, add_cods for DORS – sharing one logical combined column. combine_as can’t express that: it reruns the SAME fn on rbind data. So we need a way to OR together columns that were produced by different registrations.

Enter register_derived_codes():

# 1. Hospital (OV + SV) -> ov_*, sv_*, os_*
study$register_codes(
  codes      = ICD10_CODES,
  fn         = swereg::add_diagnoses,
  groups     = list(ov = "outpatient", sv = "inpatient"),
  combine_as = "os"
)

# 2. DORS underlying cause -> dorsu_*
study$register_codes(
  codes   = ICD10_CODES,
  fn      = swereg::add_cods,
  fn_args = list(cod_type = "underlying"),
  groups  = list(dorsu = "dors")
)

# 3. DORS contributing causes -> dorsm_*
study$register_codes(
  codes   = ICD10_CODES,
  fn      = swereg::add_cods,
  fn_args = list(cod_type = "multiple"),
  groups  = list(dorsm = "dors")
)

# 4. Derived: osd_<nm> = os_<nm> | dorsu_<nm> | dorsm_<nm>
study$register_derived_codes(
  codes = ICD10_CODES,
  from  = c("os", "dorsu", "dorsm"),
  as    = "osd"
)

A derived entry doesn’t read rawbatch data at all. It iterates its own codes list, looks up get("<from[1]>_<nm>"), get("<from[2]>_<nm>"), etc. on the skeleton, ORs them, and writes <as>_<nm>. Missing source columns raise a loud error. Because it runs in registration order during phase 2 sync, upstream primary entries are guaranteed to have been applied already.
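The OR step can be sketched on a plain data.frame. This is a minimal sketch under stated assumptions: swereg operates on the skeleton data.table by reference, whereas apply_derived() here is a hypothetical helper that returns a modified data.frame.

```r
# Derived-entry sketch: <as>_<nm> = OR of every <from>_<nm> column (hypothetical helper).
apply_derived <- function(skeleton, codes, from, as) {
  for (nm in names(codes)) {
    src <- paste(from, nm, sep = "_")           # e.g. os_f20, dorsu_f20, dorsm_f20
    absent <- setdiff(src, names(skeleton))
    if (length(absent) > 0) {
      # Fail loudly instead of silently treating a missing source as FALSE
      stop("derived entry '", as, "': missing source column(s): ",
           paste(absent, collapse = ", "), call. = FALSE)
    }
    # Element-wise OR across every source column
    skeleton[[paste(as, nm, sep = "_")]] <- Reduce(`|`, skeleton[src])
  }
  skeleton
}

sk <- data.frame(
  os_f20    = c(TRUE,  FALSE, FALSE),
  dorsu_f20 = c(FALSE, TRUE,  FALSE),
  dorsm_f20 = c(FALSE, FALSE, FALSE)
)
sk <- apply_derived(sk, codes = list(f20 = "F20"),
                    from = c("os", "dorsu", "dorsm"), as = "osd")
sk$osd_f20  # TRUE TRUE FALSE
```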

Fingerprint cascade for derived entries

Derived entries fold the fingerprints of their upstream primary entries into their own fingerprint. That means editing ANY upstream field (fn_args, groups, even a single code pattern) cascades into a derived replay without the user having to touch the derived entry. If you flip cod_type = "underlying" to cod_type = "both" on the DORS primary, the next process_skeletons() run drops the derived entry’s columns and rewrites them from the new dorsu output. This happens on every batch, in registration order, without rebuilding anything else.

Cascade is implemented in code_registry_fingerprints() as a two-pass walk:

  1. Compute every primary entry’s fingerprint (depends only on codes, label, groups, fn_args, combine_as, exactly as in the pre-derived release).
  2. For each derived entry, walk entries registered before it, collect the primary fingerprints whose groups prefixes or combine_as appear in the derived’s from, and hash (kind="derived", codes, from, as, upstream_fps).

Primary fingerprint stability is preserved (same payload shape as before the derived feature landed) so the release doesn’t accidentally invalidate any existing skeletons.
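The two-pass walk can be sketched with registry entries as plain lists. This is a minimal sketch, not code_registry_fingerprints() itself: it assumes the field layout shown (kind, codes, label, groups, fn_args, combine_as for primaries; kind, codes, from, as for derived entries), uses MD5 instead of swereg's hash, and registry_fingerprints() and md5_of() are hypothetical names.

```r
md5_of <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, connection = NULL), tmp)
  unname(tools::md5sum(tmp))
}

registry_fingerprints <- function(entries) {
  fps <- character(length(entries))
  # Pass 1: a primary fingerprint depends only on the entry's own payload
  for (i in seq_along(entries)) {
    e <- entries[[i]]
    if (e$kind == "primary") {
      fps[i] <- md5_of(list(e$codes, e$label, e$groups, e$fn_args, e$combine_as))
    }
  }
  # Pass 2: a derived entry folds in earlier primaries whose prefixes it reads
  for (i in seq_along(entries)) {
    e <- entries[[i]]
    if (e$kind != "derived") next
    upstream <- character(0)
    for (j in seq_len(i - 1L)) {
      u <- entries[[j]]
      if (u$kind == "primary" && any(c(names(u$groups), u$combine_as) %in% e$from)) {
        upstream <- c(upstream, fps[j])
      }
    }
    fps[i] <- md5_of(list(kind = "derived", e$codes, e$from, e$as, upstream))
  }
  fps
}

codes <- list(f20 = "F20")
make_registry <- function(cod_type) list(
  list(kind = "primary", codes = codes,
       groups = list(ov = "outpatient", sv = "inpatient"), combine_as = "os"),
  list(kind = "primary", codes = codes,
       groups = list(dorsu = "dors"), fn_args = list(cod_type = cod_type)),
  list(kind = "derived", codes = codes, from = c("os", "dorsu"), as = "osd")
)
before <- registry_fingerprints(make_registry("underlying"))
after  <- registry_fingerprints(make_registry("both"))
before == after  # TRUE FALSE FALSE: hospital entry unchanged, DORS and derived change
```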

Registering the pipeline

A complete runner script has the shape:

study <- swereg::RegistryStudy$new(
  data_rawbatch_dir = c("/mnt/shared/rawbatch/", "C:/shared/rawbatch/"),
  data_skeleton_dir = c("/mnt/shared/skeleton/", "C:/shared/skeleton/"),
  data_raw_dir      = c("/mnt/shared/raw/",      "C:/shared/raw/"),
  group_names       = c("lmed", "inpatient", "outpatient",
                        "cancer", "dors", "other")
)

# Phase 1
study$register_framework(my_framework_fn)

# Phase 3 (registration order = execution order)
study$register_randvars("demographics",        rv_demographics)
study$register_randvars("exposure",            rv_exposure)
study$register_randvars("baseline_exclusion",  rv_baseline_exclusion)

# Phase 2: primaries first, then derived
study$register_codes(ICD10_CODES, swereg::add_diagnoses,
  groups = list(ov = "outpatient", sv = "inpatient"), combine_as = "os")
study$register_codes(ICD10_CODES, swereg::add_cods,
  fn_args = list(cod_type = "underlying"),
  groups = list(dorsu = "dors"))
study$register_codes(ICD10_CODES, swereg::add_cods,
  fn_args = list(cod_type = "multiple"),
  groups = list(dorsm = "dors"))
study$register_derived_codes(ICD10_CODES,
  from = c("os", "dorsu", "dorsm"), as = "osd")

# ... rawbatch creation (one-time) ...

study$process_skeletons(n_workers = 4L)

The critical invariant: the framework, randvars, and code registry must all be registered on study BEFORE $process_skeletons() is called. There is no callback argument on $process_skeletons() – the pipeline is declared on the study itself, not passed in per call.

The reload-clobber anti-pattern

A subtle failure mode when running the pipeline repeatedly:

# WRONG
study <- swereg::RegistryStudy$new(...)
study$register_framework(current_framework_fn)     # fresh in-memory
study$register_codes(current_code_list, ...)       # fresh in-memory

if (file.exists(study$meta_file)) {
  study <- swereg::qs2_read(study$meta_file)       # CLOBBER
}

The reload path replaces the freshly-registered pipeline with the stale on-disk version, silently reverting any edits you just made. The fix is to absorb only the persisted runtime state (IDs, batches, groups_saved) into the fresh study without touching its config:

# CORRECT
if (file.exists(study$meta_file)) {
  study$adopt_runtime_state_from(swereg::qs2_read(study$meta_file))
}
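The difference between the two patterns comes down to which fields get copied. A minimal sketch with plain lists; the field names (ids, batches, groups_saved for runtime state; framework and codes for config) and adopt_runtime_state() are illustrative assumptions, since the real RegistryStudy is an R6 object.

```r
# Copy only persisted runtime state; leave the freshly registered config alone.
runtime_fields <- c("ids", "batches", "groups_saved")  # hypothetical field names

adopt_runtime_state <- function(fresh, stale) {
  for (f in runtime_fields) fresh[[f]] <- stale[[f]]
  fresh  # framework, code registry, etc. are untouched
}

fresh <- list(framework = "fw_v2", codes = c("F20", "I26"))
stale <- list(framework = "fw_v1", codes = "F20",
              ids = 1:10, batches = list(1:5, 6:10), groups_saved = c("lmed", "dors"))
study <- adopt_runtime_state(fresh, stale)
study$framework        # "fw_v2": freshly registered config survives
length(study$batches)  # 2: runtime state adopted from disk
```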

Provenance verification

Each Skeleton can compute its own pipeline_hash() from its stored phase provenance. The study exposes the expected value:

sk <- study$load_skeleton(1L)
sk$pipeline_hash() == study$pipeline_hash()
# TRUE iff this batch is fully synced with the current pipeline
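One way a rolled-up provenance scalar could be built is sketched below. This is an assumption about the shape of pipeline_hash(), not swereg's implementation: it hashes the framework hash, the ordered randvars (name, hash) pairs, and the code fingerprints sorted in C-locale order so that their storage order does not matter.

```r
md5_of <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, connection = NULL), tmp)
  unname(tools::md5sum(tmp))
}

# Hypothetical roll-up of the three phases' provenance into one scalar
pipeline_hash <- function(framework_hash, randvars_state, code_fps) {
  md5_of(list(
    framework = framework_hash,
    randvars  = randvars_state,                  # named: step name -> step hash, in order
    codes     = sort(code_fps, method = "radix") # radix sort uses C-locale ordering
  ))
}

rv <- c(demographics = "h1", exposure = "h2", baseline_exclusion = "h3")
sk_hash    <- pipeline_hash("fw1", rv, c("fpB", "fpA"))
study_hash <- pipeline_hash("fw1", rv, c("fpA", "fpB"))
sk_hash == study_hash  # TRUE: same provenance, different storage order
```

Randvars order is deliberately kept, since it is execution order: reordering steps changes the scalar.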

Two summary helpers:

# Per-batch table with framework hash, randvars count, code count, saved_at
study$skeleton_pipeline_hashes()

# Errors loudly if any persisted skeleton is out of sync
study$assert_skeletons_consistent()

Downstream TTE consumers call $assert_skeletons_consistent() automatically near the top of tteplan_from_spec_and_registrystudy(), so you can’t accidentally build a plan against a half-upgraded skeleton set.

Cross-host pipeline snapshots

If data_pipeline_snapshot_dir is configured at construction time, $process_skeletons() writes one TSV row per host to {snapshot_dir}/{host_label}.tsv after each successful run. The snapshot file is meant to be git add-ed and committed, so the history of who ran what version of the pipeline on which host lives in git log.

Why per-host files instead of a single append-only log? Users typically run on three or more hosts in parallel, and a shared append-only file would conflict on every concurrent run. Per-host files never share a name, so concurrent runs produce separate commits that merge without conflict.
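A per-host writer can be sketched in a few lines. A minimal sketch: write_pipeline_snapshot() and its columns (host, pipeline_hash, saved_at) are hypothetical, and only the {snapshot_dir}/{host_label}.tsv layout comes from the description above. The sketch also assumes the file is overwritten on each run, leaving the history to git log.

```r
# Hypothetical per-host snapshot writer: one file per host, one row per file.
write_pipeline_snapshot <- function(snapshot_dir, pipeline_hash,
                                    host_label = Sys.info()[["nodename"]]) {
  dir.create(snapshot_dir, showWarnings = FALSE, recursive = TRUE)
  path <- file.path(snapshot_dir, paste0(host_label, ".tsv"))
  row <- data.frame(
    host          = host_label,
    pipeline_hash = pipeline_hash,
    saved_at      = format(Sys.time(), "%Y-%m-%dT%H:%M:%S%z")
  )
  # Each host owns its own path, so concurrent hosts never touch the same file
  utils::write.table(row, path, sep = "\t", quote = FALSE, row.names = FALSE)
  invisible(path)
}

d  <- file.path(tempdir(), "pipeline_snapshots")
p1 <- write_pipeline_snapshot(d, "4d8f99af7b2c", host_label = "host-a")
p2 <- write_pipeline_snapshot(d, "4d8f99af7b2c", host_label = "host-b")
basename(c(p1, p2))  # "host-a.tsv" "host-b.tsv"
```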

A tiny helper shell script like dev/commit_pipeline_snapshot.sh (in your own project repo) can wrap git add + git commit into a one-liner the operator runs after the generator finishes. swereg never runs git commands itself – that would be a side effect the user hasn’t explicitly asked for.

Expected re-run costs

After the first full build:

| Edit | Cost |
|---|---|
| Nothing | Per-batch no-op: load the Skeleton, see that nothing changed, save. Seconds per batch. |
| One code pattern in one primary entry | Drop that entry’s columns on all batches and re-apply it. Roughly (phase-2 total) / (N primaries). |
| One primary’s fn_args (e.g. cod_type) | Same as above, plus the cascade re-applies any derived entries that depend on it. |
| One phase-3 step’s body | Drop the columns of that step and every later step on all batches, then replay it and everything downstream of it in the randvars sequence. |
| The framework function’s body | Full rebuild from scratch, all phases. Rare. |

For a national-registry pipeline with a multi-day first build, the expected “edit one ICD-10 code” cost drops from the full rebuild down to minutes. That’s the headline win.

Summary

  • Per-batch skeleton files are Skeleton R6 objects carrying framework hash, ordered randvars state, and the applied code registry fingerprint map.
  • Three phases with independent invalidation: framework (full rebuild on hash change), randvars (divergence-point rewind and replay), codes (per-entry fingerprint diff).
  • Primary and derived code entries let you combine outputs from different registration functions into one logical column. Derived entries cascade upstream fingerprint changes automatically.
  • Provenance verification via pipeline_hash() lets any consumer check whether a skeleton matches the current pipeline.
  • adopt_runtime_state_from() is the correct reload pattern – never clobber the freshly-registered pipeline by reassigning from disk.

For further context: