The three-phase skeleton pipeline
This vignette describes how swereg builds and
incrementally rebuilds per-batch skeleton files in production. If you
just want to create a small skeleton and attach some diagnoses by hand,
read vignette("skeleton-create") first. This vignette is
for people running full-registry pipelines where total build time is
measured in days and every iteration matters.
The problem
A production swereg pipeline on Swedish registry data typically has two dominant costs:
- Rawbatch creation: split the raw registry files into per-person-batch subsets, save as .qs2. For national-registry scale this is typically many hours of I/O.
- Skeleton processing: for each batch, build a person-week time grid, apply censoring, add demographics from LISA, classify exposure from LMED, and match ICD-10 / ATC / operation codes across the subset. Typically dominates total pipeline time.
Together these add up to a multi-day first build for national-scale
studies. Before the three-phase migration, editing one ICD-10 code
triggered the full rebuild because the skeleton was saved as a bare
data.table with no provenance to diff against.
The goal of the three-phase pipeline is to make that 62h cost proportional to the size of the change: editing one code pattern should cost a few minutes, not days.
Three phases, three cadences of change
The pipeline splits skeleton construction into three phases, each
with its own invalidation strategy. Per batch,
RegistryStudy$process_skeletons() runs the phases in execution order (framework, then randvars, then the code registry),
but only re-runs the parts that actually changed.
Phase 1 – framework
A single user-supplied function, registered via
$register_framework(fn), with signature
function(batch_data, config). It returns a fresh
data.table containing the base time grid plus structural
censoring (immigration, emigration, death). Nothing downstream can exist
without it.
Invalidation: full rebuild whenever
digest::digest(list(body(fn), formals(fn)), "xxhash64")
changes. Framework edits are expected to be rare (you don’t change the
time grid on a whim), so a conservative “always rebuild on hash change”
is fine. When phase 1 re-runs, phase 2 and phase 3 state are discarded
and re-applied from scratch on the fresh base.
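The hash check can be sketched as follows. This is a minimal illustration, not swereg's internal code: fn_hash() and the two toy framework functions are hypothetical names, and only the digest::digest() call is taken from the text above.

```r
library(digest)

# Hypothetical helper: hash a function's body and formals, as in the text above.
fn_hash <- function(fn) {
  digest::digest(list(body(fn), formals(fn)), algo = "xxhash64")
}

# Toy framework functions (stand-ins, not real swereg frameworks).
framework_v1 <- function(batch_data, config) {
  data.frame(id = batch_data$id, isoyearweek = "2020-01")
}
framework_v2 <- framework_v1
body(framework_v2) <- quote({
  data.frame(id = batch_data$id, isoyearweek = "2021-01")
})

# The hash stored with the skeleton is compared with the hash of the
# currently registered function; any difference means "rebuild phase 1".
stored_hash <- fn_hash(framework_v1)
needs_rebuild_same   <- !identical(stored_hash, fn_hash(framework_v1))
needs_rebuild_edited <- !identical(stored_hash, fn_hash(framework_v2))
```

Here needs_rebuild_same is FALSE and needs_rebuild_edited is TRUE, which is the conservative "always rebuild on hash change" rule in miniature.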
Phase 3 – randvars
An ordered named list of user-supplied functions, each registered via
$register_randvars(name, fn). Each has signature
function(skeleton, batch_data, config) and mutates the
skeleton in place (or, less commonly, returns a new
data.table if it filters rows).
Registration order is execution order. Editing one step should replay that step and everything downstream of it but leave upstream steps untouched.
Invalidation: divergence-point
rewind-and-replay. swereg scans the stored
(name, hash) sequence against the current one, finds the
first position where either the name or the hash differs, drops the
stored added_columns of every step from that point forward,
and then replays current steps from that point. Add, remove, edit, and
reorder are all handled uniformly because any of those operations
changes either the name sequence or the hash sequence.
A typical phase 3 registration block looks like:
study$register_randvars("demographics", rv_demographics)
study$register_randvars("exposure", rv_exposure)
study$register_randvars("baseline_exclusion", rv_baseline_exclusion)
If baseline_exclusion reads a column produced by
exposure (a common pattern: exclusion depends on exposure
classification), then editing exposure triggers the
divergence point at step 2 and both step 2 and step 3 replay. That
dependency is implicit in the registration order – there is no explicit
dep graph.
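The divergence-point scan described above can be sketched in base R. This is an illustration under assumptions, not the package's implementation: find_divergence() is a hypothetical helper, and the hash strings are placeholders.

```r
# Hypothetical helper: index of the first step whose (name, hash) differs
# between the stored and the current sequence; NA if nothing changed.
find_divergence <- function(stored, current) {
  n <- max(nrow(stored), nrow(current))
  for (i in seq_len(n)) {
    # One sequence is shorter: a step was added or removed at position i.
    if (i > nrow(stored) || i > nrow(current)) return(i)
    if (stored$name[i] != current$name[i] ||
        stored$hash[i] != current$hash[i]) return(i)
  }
  NA_integer_
}

stored <- data.frame(
  name = c("demographics", "exposure", "baseline_exclusion"),
  hash = c("h_demo_1", "h_expo_1", "h_excl_1"),
  stringsAsFactors = FALSE
)

# The user edits rv_exposure, so only its hash changes.
current <- stored
current$hash[2] <- "h_expo_2"

k <- find_divergence(stored, current)

# Stored steps from k onward lose their columns; current steps from k replay.
steps_to_drop   <- if (is.na(k)) character(0) else stored$name[seq_len(nrow(stored)) >= k]
steps_to_replay <- if (is.na(k)) character(0) else current$name[seq_len(nrow(current)) >= k]
```

With the edit to exposure, k is 2 and both exposure and baseline_exclusion replay, while demographics is left untouched.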
Phase 2 – code registry
A list of code registrations, each added via
$register_codes() (primary) or
$register_derived_codes() (derived). Each entry contributes
one or more columns to the skeleton; the shape of those columns is
predicted from metadata, not observed from running fn.
Invalidation: per-entry fingerprint diff.
Skeleton$sync_with_registry() computes
to_drop = stored_fingerprints - current_fingerprints and
to_add = current_fingerprints - stored_fingerprints, drops
stale entries’ columns, then re-applies new entries in registration
order. When you change one code in one entry, every other entry’s
fingerprint is unchanged, so only that one entry re-runs.
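The two set differences can be sketched with base R's setdiff(). The entry labels and fingerprint strings below are placeholders chosen for illustration, not values swereg produces.

```r
# Fingerprints stored in the skeleton vs. fingerprints of the current registry.
# Only the "hospital" entry was edited, so only its fingerprint differs.
stored_fingerprints <- c(
  hospital = "fp_a1", dors_underlying = "fp_b1", dors_multiple = "fp_c1"
)
current_fingerprints <- c(
  hospital = "fp_a2", dors_underlying = "fp_b1", dors_multiple = "fp_c1"
)

# Stale entries: present in the skeleton, absent from the current registry.
to_drop <- setdiff(stored_fingerprints, current_fingerprints)
# New entries: present in the current registry, absent from the skeleton.
to_add <- setdiff(current_fingerprints, stored_fingerprints)
# Everything else stays materialized and is not re-run.
unchanged <- intersect(stored_fingerprints, current_fingerprints)
```

One edited entry yields exactly one fingerprint to drop and one to add; the other two entries are left alone.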
Phase 2 runs AFTER phase 3 in the current orchestration. Phase-3 steps CANNOT depend on phase-2 columns. If you find yourself wanting that, push the logic into phase 3 (read raw data yourself) or restructure.
The Skeleton R6 class
Each per-batch file on disk is a serialized Skeleton
object, not a bare data.table. A Skeleton
carries its own provenance:
sk <- study$load_skeleton(batch_number = 1L)
sk
#> <Skeleton batch 1>
#> rows: 8,234,112
#> cols: 287
#> framework_hash: abcd1234efgh
#> randvars steps: 3
#> applied codes: 127
#> pipeline_hash: 4d8f99af7b2c
sk$data # the underlying data.table
sk$framework_fn_hash # phase-1 hash
names(sk$randvars_state) # applied phase-3 step names in order
length(sk$applied_registry) # phase-2 entries currently materialized
sk$pipeline_hash() # rolled-up provenance scalar
Why R6 and not a bare data.table?
Because the provenance IS the interesting state. Without it, you
can’t answer “is this skeleton still valid for the current pipeline?”
without either rebuilding from scratch or trusting a separate sidecar
file. With it, every batch file can answer the question locally via
sk$pipeline_hash() == study$pipeline_hash().
The qs2 over-allocation gotcha
data.table maintains over-allocated pointer slots
(“truelength”) so that in-place := doesn’t have to
reallocate. qs2 serialization drops those slots, so after
qs_read() the table has truelength == 0 and
the next := inside a function triggers a silent
reallocation that breaks the caller’s reference.
RegistryStudy$load_skeleton() fixes this by calling
data.table::setalloccol() on the freshly-loaded table:
obj$data <- data.table::setalloccol(
  obj$data, n = getOption("datatable.alloccol", 4096L)
)
The reassignment is mandatory – setalloccol() returns a
new pointer. data.table::copy() would also work but doubles
memory use, which matters when skeletons are multi-GB per batch.
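The effect can be inspected without qs2. The sketch below assumes that a base R serialize()/unserialize() round trip loses the over-allocation in the same way a file read does; it only reports slot counts and does not reproduce the broken-reference failure itself.

```r
library(data.table)

dt <- data.table(id = 1:3, week = c("2020-01", "2020-02", "2020-03"))
slots_fresh <- truelength(dt)   # over-allocated slots on a fresh table

# A serialize/unserialize round trip stands in for writing and reading a file.
dt_loaded <- unserialize(serialize(dt, NULL))
slots_loaded <- truelength(dt_loaded)

# Restore over-allocation; keep the returned object, as the text above requires.
dt_loaded <- setalloccol(dt_loaded)
slots_restored <- truelength(dt_loaded)

# After the fix, adding a column by reference has spare slots to use.
dt_loaded[, flag := TRUE]
```

Comparing slots_loaded with slots_restored shows what the setalloccol() call in load_skeleton() buys before the first := runs.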
The code registry: primary and derived entries
Primary entries via register_codes()
A primary entry ties a code list to a fn
(e.g. add_diagnoses, add_cods,
add_rx) and a set of raw-data groups to apply it to.
Optional combine_as runs fn once more on the
rbind of all groups to produce a combined column.
study$register_codes(
  codes = list(f20 = c("F20"), vte = c("I26", "I80")),
  fn = swereg::add_diagnoses,
  groups = list(ov = "outpatient", sv = "inpatient"),
  combine_as = "os"
)
For each (group_prefix, code_name) pair, a column is
written: ov_f20, sv_f20, ov_vte,
sv_vte. Because combine_as = "os" is set, two
additional columns are produced: os_f20 = ov_f20 | sv_f20
and os_vte = ov_vte | sv_vte (computed by re-running
add_diagnoses on the rbind of outpatient and inpatient
rawbatch data, not by ORing the columns).
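Because column names follow from the registration metadata alone, they can be predicted without touching any data. A minimal sketch, where predict_columns() is a hypothetical helper and not a swereg function:

```r
# Hypothetical helper: predict column names from registration metadata alone.
# Every group prefix (plus combine_as, if set) is paired with every code name.
predict_columns <- function(codes, groups, combine_as = NULL) {
  prefixes <- c(names(groups), combine_as)
  as.vector(outer(prefixes, names(codes), paste, sep = "_"))
}

cols <- predict_columns(
  codes      = list(f20 = c("F20"), vte = c("I26", "I80")),
  groups     = list(ov = "outpatient", sv = "inpatient"),
  combine_as = "os"
)
```

For the registration above, cols holds the six names ov_f20, sv_f20, os_f20, ov_vte, sv_vte, and os_vte. This covers only the naming; the os_* values themselves come from re-running fn on the combined data, as described above.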
The DORS gotcha and register_derived_codes()
The pre-derived-codes approach tried to be clever: register ICD-10
codes once with groups = list(ov, sv, dors) and
combine_as = "osd", hoping to get one column covering all
three sources. This failed silently because add_diagnoses
searches for
hdia/dia*/ekod*/icd*
columns – none of which exist in the cause-of-death registry. DORS uses
ulorsak (underlying cause) and morsak*
(contributing causes). So dors_f20 was always
FALSE and osd_f20 = ov_f20 | sv_f20 | FALSE
was effectively just hospital data. Deaths never contributed to
outcomes.
The fix needs two different functions – add_diagnoses
for hospital, add_cods for DORS – sharing one logical
combined column. combine_as can’t express that: it reruns
the SAME fn on rbind data. So we need a way to OR together
columns that were produced by different registrations.
Enter register_derived_codes():
# 1. Hospital (OV + SV) -> ov_*, sv_*, os_*
study$register_codes(
  codes = ICD10_CODES,
  fn = swereg::add_diagnoses,
  groups = list(ov = "outpatient", sv = "inpatient"),
  combine_as = "os"
)
# 2. DORS underlying cause -> dorsu_*
study$register_codes(
  codes = ICD10_CODES,
  fn = swereg::add_cods,
  fn_args = list(cod_type = "underlying"),
  groups = list(dorsu = "dors")
)
# 3. DORS contributing causes -> dorsm_*
study$register_codes(
  codes = ICD10_CODES,
  fn = swereg::add_cods,
  fn_args = list(cod_type = "multiple"),
  groups = list(dorsm = "dors")
)
# 4. Derived: osd_<nm> = os_<nm> | dorsu_<nm> | dorsm_<nm>
study$register_derived_codes(
  codes = ICD10_CODES,
  from = c("os", "dorsu", "dorsm"),
  as = "osd"
)
A derived entry doesn’t read rawbatch data at all. It iterates its
own codes list, looks up
get("<from[1]>_<nm>"),
get("<from[2]>_<nm>"), etc. on the skeleton,
ORs them, and writes <as>_<nm>. Missing source
columns raise a loud error. Because it runs in registration order during
phase 2 sync, upstream primary entries are guaranteed to have been
applied already.
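What a derived entry does to the skeleton can be sketched on a plain data.frame. apply_derived() is a hypothetical stand-in for illustration; the real method works on the skeleton's data.table and may differ in detail.

```r
# Hypothetical helper: OR together <from>_<nm> columns into <as>_<nm>.
apply_derived <- function(skeleton, code_names, from, as) {
  for (nm in code_names) {
    src <- paste(from, nm, sep = "_")
    missing_cols <- setdiff(src, names(skeleton))
    if (length(missing_cols) > 0) {
      # Fail loudly rather than silently treating a missing source as FALSE.
      stop("Missing source columns: ", paste(missing_cols, collapse = ", "))
    }
    skeleton[[paste(as, nm, sep = "_")]] <- Reduce(`|`, skeleton[src])
  }
  skeleton
}

# Toy skeleton: one row per person-week, one column per source.
skeleton <- data.frame(
  os_f20    = c(TRUE,  FALSE, FALSE, FALSE),
  dorsu_f20 = c(FALSE, TRUE,  FALSE, FALSE),
  dorsm_f20 = c(FALSE, FALSE, TRUE,  FALSE)
)
skeleton <- apply_derived(
  skeleton, "f20", from = c("os", "dorsu", "dorsm"), as = "osd"
)
```

The new osd_f20 column is TRUE wherever any of the three sources is TRUE, so a death recorded only in DORS now contributes to the outcome.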
Fingerprint cascade for derived entries
Derived entries fold the fingerprints of their upstream primary
entries into their own fingerprint. That means editing ANY upstream
field – fn_args, groups, even a single code
pattern – cascades into derived replay without the user having to touch
the derived entry. If you flip cod_type = "underlying" to
cod_type = "both" on the DORS primary, the next
process_skeletons() run re-applies that primary, then drops the derived entry’s columns
and rewrites them from the new dorsu output, batch by batch,
without rebuilding anything else.
Cascade is implemented in code_registry_fingerprints()
as a two-pass walk:
- Compute every primary entry’s fingerprint (depends only on codes, label, groups, fn_args, combine_as, exactly as in the pre-derived release).
- For each derived entry, walk entries registered before it, collect the primary fingerprints whose groups prefixes or combine_as appear in the derived’s from, and hash (kind="derived", codes, from, as, upstream_fps).
Primary fingerprint stability is preserved (same payload shape as before the derived feature landed) so the release doesn’t accidentally invalidate any existing skeletons.
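The two-pass walk can be sketched on a toy registry of plain lists. This is a simplified illustration, not code_registry_fingerprints() itself: the list layout, the fingerprints() helper, and the label values are assumptions.

```r
library(digest)

# Toy registry: plain lists standing in for swereg's registry entries.
registry <- list(
  list(kind = "primary", label = "hospital", codes = list(f20 = "F20"),
       groups = list(ov = "outpatient", sv = "inpatient"),
       fn_args = list(), combine_as = "os"),
  list(kind = "primary", label = "dors_underlying", codes = list(f20 = "F20"),
       groups = list(dorsu = "dors"),
       fn_args = list(cod_type = "underlying"), combine_as = NULL),
  list(kind = "derived", codes = list(f20 = "F20"),
       from = c("os", "dorsu"), as = "osd")
)

fingerprints <- function(registry) {
  fps <- character(length(registry))
  # Pass 1: primaries depend only on their own fields.
  for (i in seq_along(registry)) {
    e <- registry[[i]]
    if (e$kind == "primary") {
      payload <- e[c("codes", "label", "groups", "fn_args", "combine_as")]
      fps[i] <- digest::digest(payload, algo = "xxhash64")
    }
  }
  # Pass 2: derived entries fold in fingerprints of earlier primaries they read.
  for (i in seq_along(registry)) {
    e <- registry[[i]]
    if (e$kind != "derived") next
    upstream <- character(0)
    for (j in seq_len(i - 1)) {
      p <- registry[[j]]
      produced <- c(names(p$groups), p$combine_as)
      if (p$kind == "primary" && any(produced %in% e$from)) {
        upstream <- c(upstream, fps[j])
      }
    }
    payload <- list(kind = "derived", codes = e$codes, from = e$from,
                    as = e$as, upstream_fps = upstream)
    fps[i] <- digest::digest(payload, algo = "xxhash64")
  }
  fps
}

before <- fingerprints(registry)
registry[[2]]$fn_args$cod_type <- "both"   # edit only the DORS primary
after <- fingerprints(registry)
changed <- before != after
```

Only the edited DORS primary and the derived entry change fingerprint; the hospital primary keeps its fingerprint and is not re-run.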
Registering the pipeline
A complete runner script has the shape:
study <- swereg::RegistryStudy$new(
  data_rawbatch_dir = c("/mnt/shared/rawbatch/", "C:/shared/rawbatch/"),
  data_skeleton_dir = c("/mnt/shared/skeleton/", "C:/shared/skeleton/"),
  data_raw_dir = c("/mnt/shared/raw/", "C:/shared/raw/"),
  group_names = c("lmed", "inpatient", "outpatient",
                  "cancer", "dors", "other")
)
# Phase 1
study$register_framework(my_framework_fn)
# Phase 3 (registration order = execution order)
study$register_randvars("demographics", rv_demographics)
study$register_randvars("exposure", rv_exposure)
study$register_randvars("baseline_exclusion", rv_baseline_exclusion)
# Phase 2: primaries first, then derived
study$register_codes(ICD10_CODES, swereg::add_diagnoses,
  groups = list(ov = "outpatient", sv = "inpatient"), combine_as = "os")
study$register_codes(ICD10_CODES, swereg::add_cods,
  fn_args = list(cod_type = "underlying"),
  groups = list(dorsu = "dors"))
study$register_codes(ICD10_CODES, swereg::add_cods,
  fn_args = list(cod_type = "multiple"),
  groups = list(dorsm = "dors"))
study$register_derived_codes(ICD10_CODES,
  from = c("os", "dorsu", "dorsm"), as = "osd")
# ... rawbatch creation (one-time) ...
study$process_skeletons(n_workers = 4L)
The critical invariant: the framework, randvars, and code registry
must all be registered on study BEFORE
$process_skeletons() is called. There is no callback
argument on $process_skeletons() – the pipeline is declared
on the study itself, not passed in per call.
The reload-clobber anti-pattern
A subtle failure mode when running the pipeline repeatedly:
# WRONG
study <- swereg::RegistryStudy$new(...)
study$register_framework(current_framework_fn) # fresh in-memory
study$register_codes(current_code_list, ...) # fresh in-memory
if (file.exists(study$meta_file)) {
  study <- swereg::qs2_read(study$meta_file) # CLOBBER
}
The reload path replaces the freshly-registered pipeline with the stale on-disk version, silently reverting any edits you just made. The fix is to absorb only the persisted runtime state (IDs, batches, groups_saved) into the fresh study without touching its config:
# CORRECT
if (file.exists(study$meta_file)) {
  study$adopt_runtime_state_from(swereg::qs2_read(study$meta_file))
}
Provenance verification
Each Skeleton can compute its own
pipeline_hash() from its stored phase provenance. The study
exposes the expected value:
sk <- study$load_skeleton(1L)
sk$pipeline_hash() == study$pipeline_hash()
# TRUE iff this batch is fully synced with the current pipeline
Two summary helpers:
# Per-batch table with framework hash, randvars count, code count, saved_at
study$skeleton_pipeline_hashes()
# Errors loudly if any persisted skeleton is out of sync
study$assert_skeletons_consistent()
Downstream TTE consumers call
$assert_skeletons_consistent() automatically near the top
of tteplan_from_spec_and_registrystudy(), so you can’t
accidentally build a plan against a half-upgraded skeleton set.
Cross-host pipeline snapshots
If data_pipeline_snapshot_dir is configured at
construction time, $process_skeletons() writes one TSV row
per host to {snapshot_dir}/{host_label}.tsv after each
successful run. The snapshot file is meant to be git add-ed
and committed, so the history of who ran what version of the pipeline on
which host lives in git log.
Why per-host files and not a single append-only log: the user typically runs on 3+ hosts in parallel. A shared append-only file would conflict on every concurrent run. Per-host files never have the same name, so concurrent runs produce separate commits that merge without conflict.
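The per-host layout can be sketched in base R. Everything here is an assumption made for illustration: write_host_snapshot() is a hypothetical helper, the column names are invented, and the sketch overwrites the host's file on each run, which may not match what swereg actually writes.

```r
# Hypothetical helper: write this host's snapshot row to its own TSV file.
write_host_snapshot <- function(snapshot_dir, host_label, pipeline_hash) {
  dir.create(snapshot_dir, recursive = TRUE, showWarnings = FALSE)
  # One file per host, so two hosts never write to the same path.
  path <- file.path(snapshot_dir, paste0(host_label, ".tsv"))
  row <- data.frame(
    host_label    = host_label,
    pipeline_hash = pipeline_hash,
    saved_at      = format(Sys.time(), "%Y-%m-%d %H:%M:%S"),
    stringsAsFactors = FALSE
  )
  write.table(row, path, sep = "\t", quote = FALSE, row.names = FALSE)
  path
}

# Two hosts finishing at the same time touch two different files.
snapshot_dir <- file.path(tempdir(), "pipeline_snapshots")
path_a <- write_host_snapshot(snapshot_dir, "host_a", "4d8f99af7b2c")
path_b <- write_host_snapshot(snapshot_dir, "host_b", "4d8f99af7b2c")
```

Because path_a and path_b differ, the resulting commits touch disjoint files and merge cleanly.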
A tiny helper shell script like
dev/commit_pipeline_snapshot.sh (in your own project repo)
can wrap git add + git commit into a one-liner
the operator runs after the generator finishes. swereg never runs git
commands itself – that would be a side effect the user hasn’t explicitly
asked for.
Expected re-run costs
After the first full build:
| Edit | Cost |
|---|---|
| Nothing | Per-batch no-op: load Skeleton, see nothing changed, save. Seconds per batch. |
| One code pattern in one primary entry | Drop that entry’s columns on all batches, re-apply it. Roughly (phase-2 total) / (N primaries). |
| One primary’s fn_args (e.g. cod_type) | Same as above, plus the cascade re-applies any derived entries that depend on it. |
| One phase-3 step’s body | Drop the columns of that step and of every later randvars step on all batches, then replay from that step onward. |
| The framework function’s body | Full rebuild from scratch, all phases. Rare. |
For a national-registry pipeline with a multi-day first build, the expected “edit one ICD-10 code” cost drops from the full rebuild down to minutes. That’s the headline win.
Summary
- Per-batch skeleton files are Skeleton R6 objects carrying framework hash, ordered randvars state, and the applied code registry fingerprint map.
- Three phases with independent invalidation: framework (full rebuild on hash change), randvars (divergence-point rewind and replay), codes (per-entry fingerprint diff).
- Primary and derived code entries let you combine outputs from different registration functions into one logical column. Derived entries cascade upstream fingerprint changes automatically.
- Provenance verification via pipeline_hash() lets any consumer check whether a skeleton matches the current pipeline.
- adopt_runtime_state_from() is the correct reload pattern – never clobber the freshly-registered pipeline by reassigning from disk.
For further context:
- ?RegistryStudy – full method reference
- ?Skeleton – per-batch provenance object
- vignette("skeleton-concept") – why the person-week time grid
- vignette("rowdep-rowind-concept") – variable-type conventions
