# Running Flows
Learn how to write flows, request GPU compute, parallelize work, monitor in real-time, and schedule production runs.
## Anatomy of a Flow

A flow is a Python class that inherits from `FlowSpec`. Each method decorated with
`@step` becomes a node in the DAG. You connect steps with `self.next()`.
```python
from metaflow import FlowSpec, step, resources, card

class TrainFlow(FlowSpec):
    """Train a model and evaluate it."""

    @step
    def start(self):
        self.learning_rate = 0.001
        self.epochs = 50
        self.next(self.train)

    @resources(gpu=1, memory=32000, cpu=8)
    @card(type="blank", refresh_interval=5)
    @step
    def train(self):
        # Your training code here
        self.model = train_model(
            lr=self.learning_rate,
            epochs=self.epochs,
        )
        self.accuracy = evaluate(self.model)
        self.next(self.end)

    @step
    def end(self):
        print(f"Training complete. Accuracy: {self.accuracy}")

if __name__ == "__main__":
    TrainFlow()
```
Run locally: `python train_flow.py run`

Run on Kubernetes: `python train_flow.py --with kubernetes run`
## The @resources Decorator

Request compute resources for any step. Karpenter automatically provisions the right node type and scales back down when the step completes.
```python
# Standard GPU (L4/L40S) — most training workloads
@resources(gpu=1, memory=64000, cpu=16)

# Top-tier GPU (A100/H100) — large-scale training
@resources(gpu=4, memory=256000, cpu=64)

# CPU-only — data processing, preprocessing
@resources(memory=32000, cpu=16, disk=500000)

# With large scratch disk for simulation data
@resources(gpu=1, memory=64000, cpu=16, disk=1000000)
```
Values are in megabytes: `memory=64000` is 64 GB and `disk=1000000` is ~1 TB. The platform automatically selects the right compute tier based on whether you request `gpu`.
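The decorator takes raw megabyte values, which is easy to misread. As a quick sanity check, this small helper (`resource_to_human` is our own illustration, not part of Metaflow) renders those numbers in familiar units:

```python
def resource_to_human(value_mb: int) -> str:
    """Render a Metaflow resource value (specified in MB) as a
    human-readable size, to make the unit convention concrete."""
    if value_mb >= 1_000_000:
        return f"{value_mb / 1_000_000:.1f} TB"
    if value_mb >= 1_000:
        return f"{value_mb / 1_000:.1f} GB"
    return f"{value_mb} MB"

print(resource_to_human(64000))    # memory=64000  -> 64.0 GB
print(resource_to_human(1000000))  # disk=1000000  -> 1.0 TB
```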
## Fan-out with foreach

Use `foreach` to parallelize work across multiple parameter combinations. Each iteration runs as a separate Kubernetes pod.
```python
@step
def start(self):
    self.learning_rates = [0.001, 0.01, 0.1]
    self.next(self.train, foreach="learning_rates")

@resources(gpu=1, memory=32000)
@step
def train(self):
    self.lr = self.input  # 0.001, 0.01, or 0.1
    self.model = train_model(lr=self.lr)
    self.score = evaluate(self.model)
    self.next(self.pick_best)

@step
def pick_best(self, inputs):
    self.best = max(inputs, key=lambda x: x.score)
    print(f"Best LR: {self.best.lr}, Score: {self.best.score}")
    self.next(self.end)
```
Each `train` step runs simultaneously on its own GPU. The `pick_best` join step receives all results and selects the winner.
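Conceptually, the fan-out/join pattern is a map followed by a reduce. A pure-Python sketch of the same logic, with a made-up `evaluate` stub standing in for real training:

```python
def evaluate(lr: float) -> float:
    # Hypothetical stand-in for train-then-score; peaks at lr=0.01.
    return 1.0 - abs(lr - 0.01)

learning_rates = [0.001, 0.01, 0.1]

# foreach: in Metaflow, each value would run in its own pod
results = [{"lr": lr, "score": evaluate(lr)} for lr in learning_rates]

# join: pick_best sees every branch's artifacts at once
best = max(results, key=lambda r: r["score"])
print(best["lr"])  # 0.01
```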
## Real-time Training Cards

Attach live-updating visualizations to any step with the `@card` decorator. Cards are viewable in the Metaflow UI during and after execution.
```python
from metaflow import current
from metaflow.cards import Markdown, Table

@card(type="blank", refresh_interval=5)
@resources(gpu=1)
@step
def train(self):
    for epoch in range(self.epochs):
        loss = train_one_epoch()
        # Rebuild the card contents; the UI picks up changes every 5 seconds
        current.card.clear()
        current.card.append(Markdown(f"## Epoch {epoch}/{self.epochs}"))
        current.card.append(Table([["Loss", f"{loss:.4f}"]]))
        current.card.refresh()
    self.next(self.end)
```
Open the Metaflow UI at https://{stage}.metaflow.simulation.amazon.dev, click on the running step, and watch the card update in real time. No additional setup is required.
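`refresh_interval` matters because the UI only picks up changes roughly that often; pushing updates faster is harmless but they get coalesced. A toy illustration of that coalescing behavior (our own sketch, not Metaflow's implementation):

```python
import time

class ThrottledRefresher:
    """Drops refresh calls that arrive within `interval` seconds of the
    last pushed update, mimicking refresh_interval coalescing."""

    def __init__(self, interval):
        self.interval = interval
        self._last = float("-inf")
        self.pushed = 0

    def refresh(self, now=None):
        now = time.monotonic() if now is None else now
        if now - self._last >= self.interval:
            self._last = now
            self.pushed += 1
            return True   # update actually sent
        return False      # coalesced with the previous update

r = ThrottledRefresher(interval=5)
print(r.refresh(now=0.0))  # True: first update goes out
print(r.refresh(now=2.0))  # False: too soon, coalesced
print(r.refresh(now=5.0))  # True: interval elapsed
```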
## Retries and Checkpoints
GPU training is expensive. Don't lose hours of work to a transient failure.
### Automatic Retries
```python
@retry(times=3, minutes_between_retries=5)
@resources(gpu=1)
@step
def train(self):
    # If this step fails, it retries up to 3 times
    # with 5 minutes between each attempt
    ...
```
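The retry semantics are easy to hold in your head as a loop: one initial attempt plus up to `times` retries, with a fixed wait after each failure. A standalone sketch (`run_with_retries` is our illustration, not Metaflow internals):

```python
import time

def run_with_retries(step_fn, times=3, minutes_between_retries=5,
                     _sleep=time.sleep):
    """Sketch of @retry semantics: one initial attempt plus up to
    `times` retries, waiting between failures."""
    for attempt in range(times + 1):
        try:
            return step_fn()
        except Exception:
            if attempt == times:
                raise  # retries exhausted, surface the failure
            _sleep(minutes_between_retries * 60)

# Usage: a step that fails twice, then succeeds on the third attempt
attempts = []
def flaky_step():
    attempts.append(1)
    if len(attempts) < 3:
        raise ValueError("transient failure")
    return "ok"

print(run_with_retries(flaky_step, _sleep=lambda s: None))  # ok
```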
### Training Checkpoints
```python
@checkpoint
@resources(gpu=1)
@step
def train(self):
    # @checkpoint periodically saves your training state to S3.
    # On failure + retry, training resumes from the last checkpoint
    # instead of starting from scratch.
    for epoch in range(self.epochs):
        train_one_epoch()
        save_checkpoint()  # Metaflow persists this to S3
```
Combine `@retry` and `@checkpoint` for the best of both. If training fails at epoch 45/50, the retry picks up from the checkpoint at epoch 45 instead of restarting from epoch 0.
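The interaction is easy to see in miniature. Here a plain dict stands in for the S3-backed checkpoint store, and `train_with_checkpoints` is a hypothetical sketch, not Metaflow's API:

```python
def train_with_checkpoints(epochs, store, fail_at=None):
    """Train from the last checkpointed epoch; `store` stands in
    for the S3-backed checkpoint location."""
    start = store.get("epoch", 0)
    for epoch in range(start, epochs):
        if fail_at is not None and epoch == fail_at:
            raise RuntimeError(f"transient failure at epoch {epoch}")
        # ... one epoch of training ...
        store["epoch"] = epoch + 1  # checkpoint persisted after each epoch
    return store["epoch"]

store = {}
try:
    train_with_checkpoints(50, store, fail_at=45)  # first attempt dies at 45
except RuntimeError:
    pass
print(store["epoch"])                     # 45: progress survived the crash
print(train_with_checkpoints(50, store))  # 50: retry resumed at epoch 45
```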
## Cron and Event Triggers
Move from ad-hoc runs to production schedules.
### Cron Schedule
```python
from metaflow import FlowSpec, schedule

@schedule(cron="0 6 * * 1")  # Every Monday at 6 AM UTC
class WeeklyTrainFlow(FlowSpec):
    ...
```
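Cron fields read left to right as minute, hour, day-of-month, month, day-of-week (Sunday is 0). To make `"0 6 * * 1"` concrete, here is a minimal matcher we wrote for illustration; the real scheduling is done by Argo:

```python
from datetime import datetime, timezone

def cron_matches(expr, dt):
    """Check a datetime against a cron expression supporting only
    plain numbers and '*' (no ranges, lists, or steps)."""
    minute, hour, dom, month, dow = expr.split()
    fields = [
        (minute, dt.minute),
        (hour, dt.hour),
        (dom, dt.day),
        (month, dt.month),
        (dow, dt.isoweekday() % 7),  # cron weekday: 0=Sunday .. 6=Saturday
    ]
    return all(f == "*" or int(f) == value for f, value in fields)

monday_6am = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)  # a Monday
print(cron_matches("0 6 * * 1", monday_6am))  # True
```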
### Event-driven Triggers
```python
from metaflow import FlowSpec, trigger, trigger_on_finish

# Trigger when a named event fires
@trigger(event="new_training_data")
class RetrainFlow(FlowSpec):
    ...

# Trigger when another flow completes
@trigger_on_finish(flow="DataPrepFlow")
class TrainAfterPrepFlow(FlowSpec):
    ...
```
Schedules and triggers are managed by Argo Events with a NATS JetStream event bus for reliable delivery.
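Both trigger styles reduce to subscribing a flow to an event name; a flow finishing is just another event. A toy event bus makes the semantics concrete (`EventBus` is our sketch; Argo Events and NATS JetStream play this role for real):

```python
class EventBus:
    """Minimal pub/sub: callbacks register per event name and
    are invoked when that event is emitted."""

    def __init__(self):
        self._subs = {}

    def on(self, event, callback):
        self._subs.setdefault(event, []).append(callback)

    def emit(self, event, payload=None):
        for callback in self._subs.get(event, []):
            callback(payload)

bus = EventBus()
started = []

# like @trigger(event="new_training_data")
bus.on("new_training_data", lambda _: started.append("RetrainFlow"))
# like @trigger_on_finish(flow="DataPrepFlow"): completion is an event too
bus.on("finished:DataPrepFlow", lambda _: started.append("TrainAfterPrepFlow"))

bus.emit("new_training_data")
bus.emit("finished:DataPrepFlow")
print(started)  # ['RetrainFlow', 'TrainAfterPrepFlow']
```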
## Common Decorators

| Decorator | Purpose | Example |
|---|---|---|
| `@step` | Mark a method as a DAG step | `@step` |
| `@resources` | Request CPU, memory, GPU, disk | `@resources(gpu=1, memory=64000)` |
| `@card` | Attach live visualization | `@card(type="blank", refresh_interval=5)` |
| `@retry` | Auto-retry on failure | `@retry(times=3, minutes_between_retries=5)` |
| `@checkpoint` | Save/resume training state | `@checkpoint` |
| `@schedule` | Cron-based scheduling | `@schedule(cron="0 6 * * 1")` |
| `@trigger` | Event-driven execution | `@trigger(event="new_data")` |
| `@trigger_on_finish` | Run after another flow completes | `@trigger_on_finish(flow="PrepFlow")` |
Stacking `@retry`, `@checkpoint`, `@resources(gpu=1)`, `@card`, and `@step` gives you a GPU step with live monitoring, automatic retries, and checkpoint recovery. Decorator order doesn't matter, except that `@step` must be closest to the method.