Manages the full skeleton pipeline lifecycle: portable batch directories, batch splitting, raw registry loading, the declarative code registry, and the three-phase orchestrated per-batch processing (framework -> randvars -> codes) that produces one [Skeleton] file per batch with incremental invalidation.
Portable Directory Resolution
Directories are stored as candidate path vectors and resolved lazily via [CandidatePath] active bindings. The first existing directory wins and is cached. If the cached path becomes invalid (e.g. after moving to a different machine), the binding automatically re-resolves from the candidate list.
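The resolution rule can be sketched in base R as a closure that caches the first existing candidate and re-resolves when the cached directory disappears. `make_candidate_resolver()` is a hypothetical illustration, not the [CandidatePath] implementation. The auto-create branch follows the documented behavior for a single non-existing `data_rawbatch_dir`; treating several missing candidates as an error is an assumption.

```r
# Hypothetical stand-in for [CandidatePath]: returns a function that resolves
# a vector of candidate directories lazily and caches the winner.
make_candidate_resolver <- function(candidates) {
  cached <- NULL
  function() {
    # Reuse the cached path only while it still exists on this host.
    if (!is.null(cached) && dir.exists(cached)) return(cached)
    hits <- candidates[dir.exists(candidates)]
    if (length(hits) > 0) {
      cached <<- hits[[1]]  # first existing candidate wins
    } else if (length(candidates) == 1) {
      # Single missing candidate: create it (as documented for data_rawbatch_dir).
      dir.create(candidates, recursive = TRUE)
      cached <<- candidates
    } else {
      # Assumption: several candidates with none existing is an error.
      stop("No candidate directory exists: ", paste(candidates, collapse = ", "))
    }
    cached
  }
}
```

Moving to another machine only invalidates the cache; the next access falls through to the candidate list again.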
Three-phase pipeline
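`$process_skeletons()` runs three phases per batch, in the execution order framework, randvars, codes (the phase numbers are labels and do not reflect execution order). Each phase has its own incremental invalidation, so editing one step re-runs only what it affects: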
- Phase 1 – framework
A single user function registered via `$register_framework(fn)`, with signature `(batch_data, config)`, that returns a fresh base `data.table` (time grid + structural censoring). Any change to the hash of `body(fn)` or `formals(fn)` triggers a full rebuild.
- Phase 3 – randvars
An ordered named list of user functions registered via `$register_randvars(name, fn)`, each with signature `(skeleton, batch_data, config)`. Invalidation uses divergence-point rewind-and-replay: the first step whose name or hash differs from the stored sequence is the divergence point; the columns of that stored step and of every stored step after it are dropped, and the current sequence is replayed from that point on. Adding, removing, editing, and reordering steps are all handled the same way.
- Phase 2 – codes
The declarative code registry, built via `$register_codes()` (primary) and `$register_derived_codes()` (derived). Per-entry fingerprint diff: entries no longer present are dropped, new or modified entries are freshly applied. Derived entry fingerprints fold in their upstream primary fingerprints so upstream behavior edits cascade correctly.
Phase 2 runs AFTER phase 3, so phase-3 steps cannot read phase-2 columns. See the [Skeleton] class for the on-disk provenance format.
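A minimal sketch of how the phase-3 divergence point could be computed from the stored and current step sequences. `plan_randvars_sync()` is hypothetical and only plans the work; it assumes fingerprints are plain strings and does not touch any skeleton.

```r
# Hypothetical sketch of divergence-point rewind-and-replay planning.
# `stored` and `current` are named character vectors: step name -> fingerprint.
plan_randvars_sync <- function(stored, current) {
  n <- min(length(stored), length(current))
  idx <- seq_len(n)
  same <- names(stored)[idx] == names(current)[idx] &
    unname(stored)[idx] == unname(current)[idx]
  # Divergence point: first position where name or fingerprint differs,
  # or just past the shared prefix when every compared position matches.
  k <- if (all(same)) n + 1L else which(!same)[1]
  list(
    # Columns written by these stored steps are dropped (rewind) ...
    drop = as.character(names(stored)[seq_along(stored) >= k]),
    # ... and these current steps are re-run in order (replay).
    replay = as.character(names(current)[seq_along(current) >= k])
  )
}
```

Appending a step leaves `drop` empty, while reordering moves the divergence point to the first changed position, which is why none of the four kinds of edit needs a special case.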
Code Registry
Primary entries are registered via `$register_codes()`, which declares the codes, the function that applies them (e.g. `add_diagnoses`, `add_cods`), which rawbatch groups to use, and optional prefixing/combining. Derived entries are registered via `$register_derived_codes()` and OR together already-existing skeleton columns from upstream primary entries. This is useful when the combined column must draw from registrations that use DIFFERENT `fn`s, which `combine_as` can't express because it re-runs the same `fn` on the row-bound data of all groups.
See also
[Skeleton] for the per-batch on-disk format and provenance fields; [CandidatePath] for the multi-host directory resolution mechanism; [add_diagnoses], [add_cods], [add_rx] for common `fn` choices in `$register_codes()`.
Other skeleton_pipeline: [Skeleton]
Public fields
`group_names`: Character vector. Names of rawbatch groups.
`batch_size`: Integer. Number of IDs per batch.
`seed`: Integer. Shuffle seed for reproducibility.
`id_col`: Character. Person ID column name.
`n_ids`: Integer. Total number of IDs across all batches.
`n_batches`: Integer. Number of batches.
`batch_id_list`: List of ID vectors, one per batch.
`groups_saved`: Character vector of rawbatch groups saved to disk.
`code_registry`: List of code registration entries, appended to by `$register_codes()` and `$register_derived_codes()`. Primary entries (from `$register_codes()`) are plain lists with the fields `codes, fn, fn_args, groups, combine_as, label`. Derived entries (from `$register_derived_codes()`) are tagged with `kind = "derived"` and hold `codes, from, as, label` instead: no `fn`, no `groups`, no raw data access. The dispatcher `.apply_code_entry_impl()` branches on each entry's `kind` field, treating a missing `kind` as `"primary"`.
`created_at`: POSIXct. Timestamp when this study was created.
`data_rawbatch_cp`: [CandidatePath] for the rawbatch directory.
`data_skeleton_cp`: [CandidatePath] for the skeleton directory.
`data_meta_cp`: [CandidatePath] for the metadata directory (holds `registrystudy.qs2`). Defaults to the rawbatch directory for backward compatibility.
`data_raw_cp`: [CandidatePath] for the raw-registry directory, or NULL if not configured.
`data_pipeline_snapshot_cp`: [CandidatePath] for the pipeline-snapshot directory (one TSV file per host, git-tracked), or NULL if the feature is not configured. When NULL, `$write_pipeline_snapshot()` is a silent no-op.
`data_summaries_cp`: [CandidatePath] for the audit-track summaries directory (one git-tracked TSV per full run), or NULL if the feature is not configured. When NULL, `$compute_summary()` still writes the local `summary.qs2` and `status.txt` but skips the TSV.
`framework_fn`: Function of signature `(batch_data, config)` returning a fresh base skeleton `data.table` (phase 1). Set via `$register_framework()`. `$process_skeletons()` re-runs this function per batch when its body/formals hash changes.
`randvars_fns`: Named ordered list of phase-3 functions, each with signature `(skeleton, batch_data, config)`. Populated via `$register_randvars(name, fn)`. Registration order is execution order. `$process_skeletons()` uses `Skeleton$sync_randvars()`'s divergence-point rewind-and-replay to apply changes incrementally.
`host_label`: Optional character scalar. Overrides `Sys.info()[["nodename"]]` when naming the per-host pipeline snapshot file. Useful when hostnames are ambiguous or overly dynamic.
`population_by_specs`: List of character vectors. Each element declares one `by` aggregation that is pre-computed during `$process_skeletons()` and stored in each batch's meta sidecar, so that `$population(by = ...)` is a fast meta-only walk. Read back with `$population(by = <one of the declared specs>)`.
Active bindings
`data_rawbatch_dir`: Character (read-only). Resolved rawbatch directory for the current host. Lazily resolved from `self$data_rawbatch_cp`.
`data_skeleton_dir`: Character (read-only). Resolved skeleton directory for the current host.
`data_meta_dir`: Character (read-only). Resolved metadata directory for the current host (where `registrystudy.qs2` lives).
`data_raw_dir`: Character or NULL (read-only). Resolved raw-registry directory, or NULL if not configured.
`data_pipeline_snapshot_dir`: Character or NULL (read-only). Resolved pipeline-snapshot directory for the current host, or NULL if not configured (snapshot feature disabled).
`data_summaries_dir`: Character or NULL (read-only). Resolved audit-track summaries directory for the current host, or NULL if not configured.
`skeleton_files`: Character vector (read-only). Skeleton output file paths detected on disk. Scans `data_skeleton_dir` on each access.
`expected_skeleton_file_count`: Integer (read-only). Expected number of skeleton files (one per batch).
`meta_file`: Character. Path to the on-disk metadata file (`registrystudy.qs2`) inside `data_meta_dir`.
`summary`: List or NULL (read-only). The `summary.qs2` payload written by `$process_skeletons()` (per-column counts, registry-wide totals, build metadata). NULL, with a one-line message, if the file is missing.
Methods
RegistryStudy$new()
Create a new RegistryStudy object.
Usage
RegistryStudy$new(
data_rawbatch_dir,
group_names = c("lmed", "inpatient", "outpatient", "cancer", "dors", "other"),
data_skeleton_dir = data_rawbatch_dir,
data_meta_dir = data_rawbatch_dir,
data_raw_dir = NULL,
data_pipeline_snapshot_dir = NULL,
data_summaries_dir = NULL,
batch_size = 1000L,
seed = 4L,
id_col = "lopnr",
population_by_specs = list()
)
Arguments
`data_rawbatch_dir`: Character vector of candidate paths for rawbatch files. The first existing path is used; a single non-existing path is created automatically.
`group_names`: Character vector of rawbatch group names.
`data_skeleton_dir`: Character vector of candidate paths for skeleton output. Defaults to the same candidates as `data_rawbatch_dir`.
`data_meta_dir`: Character vector of candidate paths for the metadata directory holding `registrystudy.qs2`. Defaults to the same candidates as `data_rawbatch_dir` (backward compatible). Pass an explicit value (e.g. the parent of the rawbatch directory) to keep the singleton control file out of the per-batch data directory.
`data_raw_dir`: Character vector of candidate paths for raw registry files (optional). NULL if raw data paths are managed externally.
`data_pipeline_snapshot_dir`: Optional character vector of candidate paths for a git-tracked pipeline-snapshot directory (one TSV per host). When NULL (default), the snapshot feature is disabled and `$write_pipeline_snapshot()` is a no-op.
`data_summaries_dir`: Optional character vector of candidate paths for the audit-track summaries directory (typically inside the project git repo, e.g. `dev/summaries/`). When NULL (default), `$compute_summary()` still writes `summary.qs2` and `status.txt` to the skeleton directory but skips the git-tracked TSV.
`batch_size`: Integer. Number of IDs per batch. Default: `1000L`.
`seed`: Integer. Shuffle seed. Default: `4L`.
`id_col`: Character. Person ID column name. Default: `"lopnr"`.
`population_by_specs`: Optional list of character vectors. Each element declares one `by` aggregation pre-computed during `$process_skeletons()` and stored in each batch's meta sidecar for fast `$population(by)` access. Example: `list(c("rd_age_continuous"), c("rd_age_continuous", "ri_is_amab"))`. Default: empty list.
RegistryStudy$check_version()
Check if this object's schema version matches the current class version. Errors if the object was saved with an older schema.
RegistryStudy$register_framework()
Register the framework function (phase 1). The function has signature `function(batch_data, config)` and must return a fresh `data.table` containing the base time grid plus censoring; everything downstream builds on this output. `$process_skeletons()` calls it once for each batch whose base skeleton has to be (re)built, passing the study itself as `config`. A change to the function body or formals triggers a full rebuild of every batch on the next `$process_skeletons()` run.
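A sketch of a body/formals fingerprint for the framework function. `fn_fingerprint()` is hypothetical and uses the deparsed source as a stand-in for a real digest; the package's actual hashing of functions is not shown in this section.

```r
# Hypothetical: fingerprint a function by its formals and body only, so the
# variable name it is bound to (and its environment) do not matter.
fn_fingerprint <- function(fn) {
  paste(c(deparse(formals(fn)), deparse(body(fn))), collapse = "\n")
}
```

Because only `body()` and `formals()` enter the fingerprint, binding the same function to a different variable name does not trigger a rebuild. By the same token, editing a helper that the framework function calls is not detected.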
RegistryStudy$register_randvars()
Register one phase-3 "random variables" step. Phase 3 is an ordered sequence of user-supplied functions; each call to `$register_randvars()` appends one step to the end of the sequence. Registration order is execution order at `$process_skeletons()` time.
Signature of `fn`: `function(skeleton, batch_data, config)`. It mutates `skeleton` in place and must ONLY ADD columns (never modifying or deleting existing ones – the drop-and-replay tracking depends on this invariant).
Editing `fn`'s body (keeping the same `name`) changes the hash and triggers a re-run of this step and everything downstream of it in the sequence.
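A sketch of a phase-3 step that respects the add-only contract, together with a check that could catch violations. Both functions are hypothetical. The sketch uses a base `data.frame` and returns the modified copy, whereas the documented contract mutates a `data.table` in place; the `age` column is an assumed input.

```r
# Hypothetical well-behaved phase-3 step: derives one new column, touches nothing else.
add_age_group <- function(skeleton, batch_data, config) {
  skeleton$age_group <- cut(skeleton$age, c(-Inf, 17, 64, Inf),
                            labels = c("child", "adult", "senior"))
  skeleton
}

# Hypothetical guard: TRUE iff `after` keeps every column of `before` unchanged.
only_adds_columns <- function(before, after) {
  all(names(before) %in% names(after)) &&
    all(vapply(names(before),
               function(col) identical(before[[col]], after[[col]]),
               logical(1)))
}
```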
RegistryStudy$code_registry_fingerprints()
Return the xxhash64 fingerprint of every entry in `self$code_registry`, in registry order.
Primary entries: fingerprint depends on `(codes, label, groups, fn_args, combine_as)` – two primary entries with identical config produce the same fingerprint and are therefore treated as "the same entry" across runs.
Derived entries: fingerprint depends on `(codes, from, as)` PLUS the fingerprints of every upstream primary entry whose output prefix is referenced in `from`. This cascades invalidation when an upstream primary's `fn_args` / `groups` / `codes` change, without requiring the user to touch the derived entry. Computed in a two-pass walk: primary fingerprints first, then derived fingerprints using the already-computed upstream fingerprints.
Used by `Skeleton$sync_with_registry()` for incremental per-entry add/drop.
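The two-pass walk can be sketched as follows. `registry_fingerprints()`, `entry_prefixes()` and `fp()` are hypothetical. `fp()` uses a deparsed string as a stand-in for xxhash64, and a primary entry's output prefixes are assumed to be its named `groups` plus `combine_as`.

```r
# Stand-in for xxhash64 (hypothetical): a canonical deparsed string.
fp <- function(x) paste(deparse(x), collapse = "")

# Hypothetical output prefixes of a primary entry: named groups plus combine_as.
entry_prefixes <- function(e) {
  p <- c(names(e$groups), e$combine_as)
  p[nzchar(p)]
}

registry_fingerprints <- function(registry) {
  # Entries without a `kind` field are primary.
  kind <- vapply(registry, function(e) if (is.null(e$kind)) "primary" else e$kind,
                 character(1))
  out <- character(length(registry))
  # Pass 1: primary entries, from their own config only.
  for (i in which(kind == "primary")) {
    e <- registry[[i]]
    out[i] <- fp(list(e$codes, e$label, e$groups, e$fn_args, e$combine_as))
  }
  # Pass 2: derived entries fold in the fingerprints of referenced primaries.
  for (i in which(kind == "derived")) {
    e <- registry[[i]]
    is_upstream <- vapply(registry,
                          function(p) any(entry_prefixes(p) %in% e$from),
                          logical(1))
    upstream <- out[kind == "primary" & is_upstream]
    out[i] <- fp(list(e$codes, e$from, e$as, upstream))
  }
  out
}
```

Editing `fn_args` on a primary whose prefix appears in `from` changes the derived fingerprint, while editing an unreferenced primary leaves it untouched.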
RegistryStudy$pipeline_hash()
Compute this study's current total pipeline hash from the registered framework, randvars sequence, and code registry. This answers the question "what pipeline hash would a freshly built skeleton carry?"
Invariant: `sk$pipeline_hash() == study$pipeline_hash()` iff the skeleton is fully synced with the study's current registered framework + randvars + codes.
RegistryStudy$adopt_runtime_state_from()
Copy runtime state (IDs, batch list, saved groups) from another `RegistryStudy` into this one, WITHOUT touching config fields (group_names, code_registry, directory candidates, framework/randvars registration, schema version, etc.).
Use case: in `run_generic_create_datasets_v2.R`, the generator script constructs a fresh study every run with the current in-memory config, then on re-runs calls `$adopt_runtime_state_from(qs2_read(self$meta_file))` to pick up batch ids and saved-group state without silently adopting a stale code registry or group name list.
RegistryStudy$register_codes()
Register code definitions for the code registry.
Each call declares codes, the function to apply them, which batch data groups to use, and optional prefixing/combining. Appends to `self$code_registry`.
Usage
RegistryStudy$register_codes(
codes,
fn,
groups,
fn_args = list(),
combine_as = NULL,
label = NULL
)
Arguments
`codes`: Named list of code vectors (e.g. ICD-10, ATC, operation codes).
`fn`: Function to call (e.g. `add_diagnoses`, `add_rx`).
`groups`: Named list mapping prefixes to group names. Unnamed elements get no prefix. Each element is a character vector of group names that are combined with `rbindlist` before `fn` is called.
`fn_args`: Named list of extra arguments passed to `fn` (e.g. `list(source = "atc")`).
`combine_as`: Character or NULL. If non-NULL, `fn` is also run on all groups combined, using this value as the prefix.
`label`: Character. Human-readable label for `$describe_codes()` output. Defaults to `deparse(substitute(fn))`.
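To make the prefix rules concrete, here is a hypothetical helper that lists the columns one primary entry would write. It assumes the `<prefix>_<code name>` naming seen in the derived-code examples (e.g. `os_f20`) and that an unnamed group yields the bare code name; it is not the internal `.entry_columns()` helper.

```r
# Hypothetical: output column names for one primary registration.
primary_entry_columns <- function(codes, groups, combine_as = NULL) {
  prefixes <- names(groups)
  if (is.null(prefixes)) prefixes <- rep("", length(groups))  # all unnamed
  prefixes <- c(prefixes, combine_as)  # combine_as adds one more prefix
  cols <- unlist(lapply(prefixes, function(p) {
    # Assumed naming: "<prefix>_<code name>", or the bare name without a prefix.
    if (nzchar(p)) paste0(p, "_", names(codes)) else names(codes)
  }))
  unique(cols)
}
```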
RegistryStudy$register_derived_codes()
Register a derived code entry: one that doesn't read rawbatch data, but instead ORs together already-existing skeleton columns from earlier primary entries.
For each name `<nm>` in `codes`, a new column `<as>_<nm>` is written as `Reduce("|", list(get("<from[1]>_<nm>"), ...))`. The `codes` list pattern values are ignored at apply time but DO participate in the fingerprint, so editing the code list triggers replay. The fingerprint also folds in the fingerprints of every upstream primary entry whose output prefix appears in `from`, so upstream behavior edits (e.g. `cod_type` on an `add_cods` primary) cascade into derived replay automatically.
The derived entry runs in registration order during phase-2 sync, so any primary registrations whose output columns it references MUST be registered BEFORE this call.
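A base-R sketch of the derived OR step, including a guard for the registration-order requirement. `apply_derived()` is hypothetical and works on a `data.frame`; only the names of `codes` are used, mirroring the statement that pattern values are ignored at apply time.

```r
# Hypothetical sketch of applying one derived entry to a data.frame skeleton.
apply_derived <- function(skeleton, codes, from, as) {
  for (nm in names(codes)) {
    src <- paste0(from, "_", nm)
    absent <- setdiff(src, names(skeleton))
    if (length(absent) > 0) {
      stop("Derived entry '", as, "' needs columns not yet built: ",
           paste(absent, collapse = ", "),
           " (register the upstream primary entries first)")
    }
    # OR the upstream indicator columns together element-wise.
    skeleton[[paste0(as, "_", nm)]] <- Reduce(`|`, lapply(src, function(s) skeleton[[s]]))
  }
  skeleton
}
```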
RegistryStudy$apply_codes_to_skeleton()
Apply all registered codes to a skeleton data.table. Thin loop over `self$code_registry` that delegates per-entry work to the file-level `.apply_code_entry_impl()` helper. Kept for backwards-compatible "apply everything at once" callers; the incremental code-registry sync inside the Skeleton R6 class calls `.apply_code_entry_impl()` directly on one entry at a time.
RegistryStudy$load_skeleton()
Load a skeleton file for `batch_number` as a [Skeleton] R6 object. Returns `NULL` if the file is missing (caller rebuilds from scratch). Errors if the file on disk is not a `Skeleton` R6 object (e.g. corrupted or from an incompatible version of swereg).
RegistryStudy$save_skeleton()
Save a [Skeleton] to this study's skeleton directory, plus a small per-batch meta sidecar (`meta_…`) and the per-batch code-check accumulator snapshot. Subsequent `$process_skeletons()` runs read the meta first and skip loading the heavy skeleton entirely when every hash still matches.
Skeleton is written first, then meta. A crash between the two leaves a stale meta on disk; the next run reads it, finds the hashes don't match the current pipeline, falls through to the slow path, and rewrites both.
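The write order and the fast-path check can be sketched with `saveRDS()`/`readRDS()` standing in for the package's qs2 files. The file names (`skeleton_001.rds`, `meta_001.rds`) and all three functions are hypothetical; the real file names are not shown in this section.

```r
# Hypothetical layout; saveRDS/readRDS stand in for qs2.
save_batch <- function(dir, i, skeleton, pipeline_hash) {
  # 1. Heavy skeleton file first ...
  saveRDS(skeleton, file.path(dir, sprintf("skeleton_%03d.rds", i)))
  # 2. ... then the small meta sidecar carrying the hash.
  saveRDS(list(pipeline_hash = pipeline_hash),
          file.path(dir, sprintf("meta_%03d.rds", i)))
}

load_meta <- function(dir, i) {
  path <- file.path(dir, sprintf("meta_%03d.rds", i))
  if (!file.exists(path)) return(NULL)
  tryCatch(readRDS(path), error = function(e) NULL)  # unreadable = cache miss
}

# Fast path: skip loading the skeleton only when the meta exists and matches.
can_skip_batch <- function(dir, i, current_hash) {
  meta <- load_meta(dir, i)
  !is.null(meta) && identical(meta$pipeline_hash, current_hash)
}
```

A crash after the first `saveRDS()` leaves the old meta with the old hash, so `can_skip_batch()` returns `FALSE` for the new pipeline and the batch goes through the slow path again.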
RegistryStudy$write_skeleton_meta()
Write only the per-batch meta sidecar (`meta_…`), without writing the skeleton file. Used by the meta-only refresh path in `.process_one_batch()` when the skeleton on disk is still valid but its meta is missing a newly-registered `population_by_specs` entry.
RegistryStudy$load_skeleton_meta()
Read the per-batch meta sidecar (`meta_…`). Returns `NULL` if missing or unreadable (treated as a cache miss by the fast path in `.process_one_batch()`).
RegistryStudy$skeleton_pipeline_hashes()
Summary of per-batch pipeline hashes across all currently-persisted skeleton files in `self$data_skeleton_dir`. Use this to spot batches out of sync with each other or with `self$pipeline_hash()`.
Files that are not valid `Skeleton` R6 objects (e.g. unreadable or corrupted) surface as rows with `NA` `pipeline_hash` and `NA` `framework_fn_hash`.
RegistryStudy$assert_skeletons_consistent()
Assert that every persisted skeleton file has the same pipeline hash AND that it matches this study's current pipeline hash. Errors loudly with an actionable message if not.
Intended as a pre-flight check at the top of downstream consumers like `tteplan_from_spec_and_registrystudy()`, so partial-rebuild stragglers or config drift never silently flow into a TTE plan.
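A sketch of the consistency assertion over per-batch hashes, with `NA` standing for unreadable files as in `$skeleton_pipeline_hashes()`. `assert_consistent()` is hypothetical. Comparing every batch against the study hash covers both documented conditions at once: batches that all equal the study hash necessarily equal each other.

```r
# Hypothetical sketch: per-batch hashes (NA = unreadable file) vs. the study hash.
assert_consistent <- function(batch_hashes, study_hash) {
  # NA != x is NA, so is.na() is OR-ed in to flag unreadable batches too.
  bad <- is.na(batch_hashes) | batch_hashes != study_hash
  if (any(bad)) {
    stop(sum(bad), " of ", length(batch_hashes),
         " skeleton batches are unreadable or out of sync with the study ",
         "(batches: ", paste(which(bad), collapse = ", "), "). ",
         "Re-run $process_skeletons() before building downstream plans.")
  }
  invisible(TRUE)
}
```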
RegistryStudy$write_pipeline_snapshot()
Write a one-row TSV snapshot of this host's current pipeline state to `<data_pipeline_snapshot_dir>/<host_label>.tsv` (one file per host). Because each host writes only its own file, concurrent runs from different hosts never conflict in git. The file is OVERWRITTEN (not appended) on each call, so the chronological audit trail lives in git history, e.g. `git log -p dev/pipeline_snapshots/your_host.tsv`.
Silently skipped when `self$data_pipeline_snapshot_cp` is NULL (feature not configured) or when the candidate directory does not exist on the current host (e.g. hosts without the git repo mounted).
The `host_label` defaults to `Sys.info()[["nodename"]]` but can be overridden by setting `self$host_label` when hostnames are ambiguous.
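A sketch of the per-host, overwrite-only snapshot using `utils::write.table()`. `write_host_snapshot()` and its columns (`host`, `pipeline_hash`, `n_batches`) are hypothetical; the real snapshot schema is not listed in this section.

```r
# Hypothetical sketch: one overwritten TSV per host.
write_host_snapshot <- function(snapshot_dir, pipeline_hash, n_batches,
                                host_label = NULL) {
  # Silent no-op when the feature is off or the directory is absent on this host.
  if (is.null(snapshot_dir) || !dir.exists(snapshot_dir)) return(invisible(NULL))
  host <- if (is.null(host_label)) Sys.info()[["nodename"]] else host_label
  row <- data.frame(host = host, pipeline_hash = pipeline_hash,
                    n_batches = n_batches)
  path <- file.path(snapshot_dir, paste0(host, ".tsv"))
  # append = FALSE (the default): each call replaces the previous snapshot.
  utils::write.table(row, path, sep = "\t", quote = FALSE, row.names = FALSE)
  invisible(path)
}
```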
RegistryStudy$process_skeletons()
Orchestrate the three-phase skeleton pipeline per batch.
Reads `self$framework_fn` (phase 1), `self$randvars_fns` (phase 3), and `self$code_registry` (phase 2) from the study and applies them via the incremental logic on [Skeleton]. Exact per-batch work:
1. Load the existing skeleton via `self$load_skeleton(i)`. If it is missing OR its `framework_fn_hash` doesn't match the current framework's hash, rebuild the base skeleton from scratch by calling `self$framework_fn(batch_data, self)` and wrapping the result in a fresh [Skeleton]. (Phase 1.)
2. Call `sk$sync_randvars()` with the current ordered `self$randvars_fns` and their body/formals hashes. Divergence-point rewind-and-replay semantics drop and re-run only the affected phase-3 steps. (Phase 3.)
3. Call `sk$sync_with_registry()` with `self$code_registry_fingerprints()`. Entries present on disk but not in the current registry are dropped (via `.entry_columns()` on the stored descriptor); entries present in the current registry but not on disk are applied fresh. (Phase 2.)
4. Save via `self$save_skeleton(sk)`.
`batch_data` is loaded lazily – exactly once per batch, by whichever phase needs it first. If no phase needs it (everything already in sync), the rawbatch read is skipped entirely and the per-batch work is just load → save.
At the end of the full batch loop, `self$write_pipeline_snapshot()` is called (silently no-ops when `data_pipeline_snapshot_cp` is NULL).
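The load-at-most-once behavior for `batch_data` can be sketched as a memoizing closure. `make_lazy_batch_data()` is hypothetical; in the orchestration each phase would call the returned function only when it actually needs raw data.

```r
# Hypothetical helper: wraps an expensive loader so it runs at most once.
make_lazy_batch_data <- function(load_fn) {
  value <- NULL
  loaded <- FALSE
  function() {
    if (!loaded) {
      value <<- load_fn()  # first caller pays for the rawbatch read
      loaded <<- TRUE
    }
    value                  # later callers reuse the cached data
  }
}
```

If every phase is already in sync, the returned function is never called and the rawbatch read never happens.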
Arguments
`batches`: Integer vector of batch indices to process, or `NULL` (default) for all batches in `self$batch_id_list`.
`n_workers`: Integer. Number of parallel workers (1 = sequential). When `> 1`, each batch runs in a fresh callr subprocess.
`...`: Additional arguments (unused; reserved for future use).
RegistryStudy$population()
Read a pre-computed population table for one of the `by` specs declared at construction time via `population_by_specs`.
Population tables are computed automatically at the end of `$process_skeletons()` from the per-batch aggregations stored in each meta sidecar, then written as `population_<spec>.qs2` in the skeleton directory. This getter just reads that file.
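If each meta sidecar stores per-batch counts in a column `n` for every declared `by` spec (an assumption; the stored layout is not documented here), combining batches reduces to a grouped sum. `combine_population()` is hypothetical.

```r
# Hypothetical: sum per-batch counts over the `by` columns of one spec.
combine_population <- function(per_batch, by) {
  all_rows <- do.call(rbind, per_batch)  # stack the per-batch tables
  stats::aggregate(all_rows["n"], by = all_rows[by], FUN = sum)
}
```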
RegistryStudy$delete_skeletons()
Delete all skeleton output files (and their meta sidecars, plus any cached `population_*.qs2` and `summary.qs2` artefacts) from disk.
RegistryStudy$save_meta()
Save this study object as metadata. Captures the destination path first, then clears host-specific [CandidatePath] caches before writing, so the on-disk file never carries a resolved path from the saving host.
Examples
if (FALSE) { # \dontrun{
study <- RegistryStudy$new(
data_rawbatch_dir = c("/linux/.../rawbatch/", "C:/win/.../rawbatch/"),
data_skeleton_dir = c("/linux/.../skeleton/", "C:/win/.../skeleton/"),
data_raw_dir = c("/linux/.../raw/", "C:/win/.../raw/"),
group_names = c("lmed", "inpatient", "outpatient", "cancer", "dors")
)
# Phase 1: framework (structural time grid + censoring)
study$register_framework(my_framework_fn)
# Phase 3: randvars (ordered user steps; order = execution order)
study$register_randvars("demographics", my_demographics_fn)
study$register_randvars("exposure", my_exposure_fn)
# Phase 2: codes. Primary entries first, derived entries after.
# Define the codes list once and reuse it, so an edit here cascades to all four entries.
codes <- list(f20 = c("F20"), vte = c("I26", "I80"))
study$register_codes(
codes = codes,
fn = swereg::add_diagnoses,
groups = list(ov = "outpatient", sv = "inpatient"),
combine_as = "os"
)
study$register_codes(
codes = codes,
fn = swereg::add_cods,
fn_args = list(cod_type = "underlying"),
groups = list(dorsu = "dors")
)
study$register_codes(
codes = codes,
fn = swereg::add_cods,
fn_args = list(cod_type = "multiple"),
groups = list(dorsm = "dors")
)
# Build osd_f20 = os_f20 | dorsu_f20 | dorsm_f20 (and likewise osd_vte)
study$register_derived_codes(
codes = codes,
from = c("os", "dorsu", "dorsm"),
as = "osd"
)
study$set_ids(ids)
study$save_rawbatch("lmed", lmed_data)
study$describe_codes()
study$process_skeletons(n_workers = 4L)
# Per-batch provenance and cross-batch consistency check
sk <- study$load_skeleton(1L)
sk$pipeline_hash() == study$pipeline_hash() # TRUE iff in sync
study$assert_skeletons_consistent() # errors on mixed state
} # }
