
ML preprocessing as data-pipeline nodes

This integration bridges alphaswarm.ml.processors into the data engine (alphaswarm/data/engine), so that an alphaswarm.data.engine.PipelineManifest can chain source -> ml_preprocessing -> sink like any other transform.

Why

Previously, the only way to apply an ML preprocessing recipe was to load a Dataset and call Processor.fit_process, which works only for offline Experiment runs. Promoting processors to first-class data-engine nodes lets you:

  • Materialise preprocessed features into Iceberg via sink.ml_feature_snapshot and reload them deterministically in later training runs.
  • Reuse the same recipe in batch ingestion AND online inference.
  • Drop a saved PipelineRecipe row directly onto the manifest builder canvas via POST /ml/pipelines/{id}/as-node.

Two layers

Umbrella node — transform.ml_preprocessing

Accepts a saved recipe_id, an inline processors list, or both (the inline list acts as an overlay on the saved recipe). It reuses apply_processor_specs, so a manifest run applies the same transformation as the offline ML training loop.

```yaml
- name: transform.ml_preprocessing
  kwargs:
    recipe_id: 1c5b...   # optional: saved /ml/pipelines recipe
    processors:          # optional inline overlay
      - class: WinsorizeByQuantile
        module_path: alphaswarm.ml.processors
        kwargs: {lower_q: 0.01, upper_q: 0.99}
    fit: true
```
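The spec shape above (class, module_path, kwargs) suggests that each processor is resolved by its import path. The following is a minimal sketch of that resolution step. build_processor and validate_node_kwargs are hypothetical helpers, not the actual apply_processor_specs code. The rule that at least one of recipe_id or processors must be set is also an assumption.

```python
import importlib
from typing import Any, Mapping


def build_processor(spec: Mapping[str, Any]) -> Any:
    """Instantiate one processor from a {class, module_path, kwargs} spec.

    Hypothetical helper that mirrors the YAML spec shape; not the real
    apply_processor_specs implementation.
    """
    missing = [key for key in ("class", "module_path") if key not in spec]
    if missing:
        raise ValueError(f"processor spec is missing keys: {missing}")
    module = importlib.import_module(spec["module_path"])  # e.g. "alphaswarm.ml.processors"
    cls = getattr(module, spec["class"])                    # e.g. "WinsorizeByQuantile"
    return cls(**spec.get("kwargs", {}))                    # e.g. lower_q=0.01, upper_q=0.99


def validate_node_kwargs(kwargs: Mapping[str, Any]) -> None:
    """Assumed rule: the umbrella node needs a recipe_id, inline processors, or both."""
    if not kwargs.get("recipe_id") and not kwargs.get("processors"):
        raise ValueError("transform.ml_preprocessing needs recipe_id and/or processors")


if __name__ == "__main__":
    # Any importable class works, so a stdlib class stands in for a real processor here.
    spec = {"class": "timedelta", "module_path": "datetime", "kwargs": {"days": 2}}
    print(build_processor(spec))  # 2 days, 0:00:00
```

Resolving by import path keeps the YAML declarative: adding a processor only requires it to be importable, with no registration step. The trade-off is that typos in class or module_path surface only at run time.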

Specialized convenience nodes

Each maps onto a single processor and shows up in the Manifest Builder palette as its own tile:

| Node name | Processor |
|---|---|
| transform.ml_scale | SklearnTransformerProcessor (Standard / Robust / MinMax) |
| transform.ml_winsorize | WinsorizeByQuantile |
| transform.ml_lag_features | LagFeatureGenerator |
| transform.ml_rolling_features | RollingFeatureGenerator |
| transform.ml_seasonal_decompose | SeasonalDecomposeFeatures |
| transform.ml_pyod_outliers | PyODOutlierFilter |
| transform.ml_imputation | Fillna |
| transform.ml_target_encode | TargetEncode |
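To make the fit flag concrete, here is a minimal winsorize-by-quantile sketch in the spirit of transform.ml_winsorize. fit learns clip bounds at the lower_q/upper_q quantiles of the training data, and apply clips new values to those bounds, so the same fitted bounds can be reused at inference time. WinsorizeSketch is a hypothetical plain-Python stand-in, not the WinsorizeByQuantile implementation, which may differ (for example in its quantile rule or per-column handling). Reading fit: false as "reuse saved state" is also an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Sequence, Tuple


def quantile(sorted_vals: Sequence[float], q: float) -> float:
    """Linear-interpolation quantile of an already-sorted, non-empty sequence."""
    pos = q * (len(sorted_vals) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(sorted_vals) - 1)
    return sorted_vals[lo] + (sorted_vals[hi] - sorted_vals[lo]) * (pos - lo)


@dataclass
class WinsorizeSketch:
    """Hypothetical stand-in for WinsorizeByQuantile on a single numeric column."""

    lower_q: float = 0.01
    upper_q: float = 0.99
    bounds: Optional[Tuple[float, float]] = field(default=None, init=False)

    def fit(self, values: Sequence[float]) -> "WinsorizeSketch":
        # fit: true -> learn clip bounds from the training data
        if not values:
            raise ValueError("cannot fit on an empty column")
        ordered = sorted(values)
        self.bounds = (quantile(ordered, self.lower_q), quantile(ordered, self.upper_q))
        return self

    def apply(self, values: Sequence[float]) -> List[float]:
        # Assumed meaning of fit: false -> reuse previously learned bounds
        if self.bounds is None:
            raise RuntimeError("no fitted bounds: fit first or load saved state")
        lo, hi = self.bounds
        return [min(max(v, lo), hi) for v in values]


if __name__ == "__main__":
    w = WinsorizeSketch(lower_q=0.25, upper_q=0.75).fit([1.0, 2.0, 3.0, 4.0, 100.0])
    print(w.bounds)                    # (2.0, 4.0)
    print(w.apply([0.0, 3.0, 250.0]))  # [2.0, 3.0, 4.0]
```

Separating fit from apply is what allows one recipe to serve both batch ingestion and online inference. Bounds are learned once, and then only applied.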

Sink — sink.ml_feature_snapshot

Iceberg writer that stamps the resulting table with pipeline_recipe_id, dataset_version_id, and a stable feature_snapshot_id so downstream training runs can reload exactly the same preprocessed features:

```yaml
- name: sink.ml_feature_snapshot
  kwargs:
    namespace: ml.features
    table: alpha_panel_v1
    pipeline_recipe_id: 1c5b...
    dataset_version_id: 9f8a...
    mode: append
```

The sink's result includes a feature_snapshot_id UUID; persist it in the dataset registry so future DatasetH instances can lazily reload from the snapshot table.
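One way to make feature_snapshot_id stable is to derive it deterministically from the values the sink stamps on the table. Re-running the same recipe on the same dataset version then yields the same id. The sketch below assumes that design (a UUIDv5 over namespace, table, recipe id, and dataset version id) and uses a toy in-memory registry. The real sink may mint the UUID differently. derive_snapshot_id, SnapshotRecord, and InMemoryDatasetRegistry are all hypothetical.

```python
import uuid
from dataclasses import dataclass
from typing import Dict

# Hypothetical fixed namespace for deriving snapshot ids.
SNAPSHOT_NS = uuid.uuid5(uuid.NAMESPACE_URL, "alphaswarm:ml_feature_snapshot")


def derive_snapshot_id(namespace: str, table: str,
                       pipeline_recipe_id: str, dataset_version_id: str) -> uuid.UUID:
    """Same (namespace, table, recipe, dataset version) -> same UUID on every run."""
    key = "|".join((namespace, table, pipeline_recipe_id, dataset_version_id))
    return uuid.uuid5(SNAPSHOT_NS, key)


@dataclass(frozen=True)
class SnapshotRecord:
    feature_snapshot_id: uuid.UUID
    namespace: str
    table: str
    pipeline_recipe_id: str
    dataset_version_id: str

    @property
    def table_ref(self) -> str:
        return f"{self.namespace}.{self.table}"


class InMemoryDatasetRegistry:
    """Toy registry: maps a feature_snapshot_id to where its features live."""

    def __init__(self) -> None:
        self._records: Dict[uuid.UUID, SnapshotRecord] = {}

    def register(self, record: SnapshotRecord) -> None:
        self._records[record.feature_snapshot_id] = record

    def lookup(self, snapshot_id: uuid.UUID) -> SnapshotRecord:
        return self._records[snapshot_id]  # KeyError if never persisted


if __name__ == "__main__":
    args = ("ml.features", "alpha_panel_v1", "recipe-A", "dsv-1")  # placeholder ids
    sid = derive_snapshot_id(*args)
    registry = InMemoryDatasetRegistry()
    registry.register(SnapshotRecord(sid, *args))
    print(registry.lookup(sid).table_ref)  # ml.features.alpha_panel_v1
```

With mode: append, several snapshots can share one table, so a reload from the registry record would still need to select only the rows stamped with that feature_snapshot_id.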

End-to-end flow

REST

```bash
# Materialise a saved recipe into a manifest fragment for the
# Pipeline Builder UI.
curl -X POST http://localhost:8000/ml/pipelines/<recipe_id>/as-node \
  -H 'content-type: application/json' \
  -d '{"fit": false}'
```

Returns:

```json
{
  "name": "transform.ml_preprocessing",
  "label": "my-recipe",
  "enabled": true,
  "kwargs": {"recipe_id": "<recipe_id>", "fit": false}
}
```
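The same call can be made from Python using only the standard library. The sketch below builds the request separately from sending it, then splices the returned fragment in front of the first sink.* node. It assumes a manifest is a plain list of node dicts shaped like the response above. as_node_request, fetch_node, and insert_before_sink are hypothetical helpers, and source.example is a placeholder node name.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, List


def as_node_request(base_url: str, recipe_id: str, fit: bool = False) -> urllib.request.Request:
    """Build (without sending) POST /ml/pipelines/{recipe_id}/as-node."""
    url = f"{base_url.rstrip('/')}/ml/pipelines/{urllib.parse.quote(recipe_id, safe='')}/as-node"
    return urllib.request.Request(
        url,
        data=json.dumps({"fit": fit}).encode("utf-8"),
        headers={"content-type": "application/json"},
        method="POST",
    )


def fetch_node(base_url: str, recipe_id: str, fit: bool = False) -> Dict[str, Any]:
    """Send the request and decode the JSON node fragment."""
    with urllib.request.urlopen(as_node_request(base_url, recipe_id, fit)) as resp:
        return json.load(resp)


def insert_before_sink(nodes: List[Dict[str, Any]], fragment: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return a new node list with fragment placed just before the first sink.* node."""
    for i, node in enumerate(nodes):
        if node["name"].startswith("sink."):
            return nodes[:i] + [fragment] + nodes[i:]
    return nodes + [fragment]  # no sink yet: append at the end


if __name__ == "__main__":
    req = as_node_request("http://localhost:8000", "my-recipe-id")
    print(req.get_method(), req.full_url)
    manifest = [{"name": "source.example"}, {"name": "sink.ml_feature_snapshot"}]
    fragment = {"name": "transform.ml_preprocessing",
                "kwargs": {"recipe_id": "my-recipe-id", "fit": False}}
    print([n["name"] for n in insert_before_sink(manifest, fragment)])
```

Keeping request construction separate from sending makes the URL, method, and body easy to check without a running server.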