Running Flows

Learn how to write flows, request GPU compute, parallelize work, monitor in real-time, and schedule production runs.

Your First Flow

Anatomy of a Flow

A flow is a Python class that inherits from FlowSpec. Each method decorated with @step becomes a node in the DAG. You connect steps with self.next().

from metaflow import FlowSpec, step, resources, card

class TrainFlow(FlowSpec):
    """Train a model and evaluate it."""

    @step
    def start(self):
        self.learning_rate = 0.001
        self.epochs = 50
        self.next(self.train)

    @resources(gpu=1, memory=32000, cpu=8)
    @card(type="blank", refresh_interval=5)
    @step
    def train(self):
        # Your training code here
        self.model = train_model(
            lr=self.learning_rate,
            epochs=self.epochs
        )
        self.accuracy = evaluate(self.model)
        self.next(self.end)

    @step
    def end(self):
        print(f"Training complete. Accuracy: {self.accuracy}")

if __name__ == "__main__":
    TrainFlow()
start
train
end

Run locally: python train_flow.py run
Run on Kubernetes: python train_flow.py --with kubernetes run

Requesting GPUs

The @resources Decorator

Request compute resources for any step. Karpenter automatically provisions the right node type and scales back down when the step completes.

# Standard GPU (L4/L40S) — most training workloads
@resources(gpu=1, memory=64000, cpu=16)

# Top-tier GPU (A100/H100) — large-scale training
@resources(gpu=4, memory=256000, cpu=64)

# CPU-only — data processing, preprocessing
@resources(memory=32000, cpu=16, disk=500000)

# With large scratch disk for simulation data
@resources(gpu=1, memory=64000, cpu=16, disk=1000000)
Memory is in MB, disk is in MB. So memory=64000 is 64 GB and disk=1000000 is ~1 TB. The platform automatically selects the right compute tier based on whether you request gpu.
Parallel Execution

Fan-out with foreach

Use foreach to parallelize work across multiple parameter combinations. Each iteration runs as a separate Kubernetes pod.

@step
def start(self):
    self.learning_rates = [0.001, 0.01, 0.1]
    self.next(self.train, foreach="learning_rates")

@resources(gpu=1, memory=32000)
@step
def train(self):
    self.lr = self.input  # 0.001, 0.01, or 0.1
    self.model = train_model(lr=self.lr)
    self.score = evaluate(self.model)
    self.next(self.pick_best)

@step
def pick_best(self, inputs):
    self.best = max(inputs, key=lambda x: x.score)
    print(f"Best LR: {self.best.lr}, Score: {self.best.score}")
    self.next(self.end)
start
train (lr=0.001)
train (lr=0.01)
train (lr=0.1)
pick_best
end

Each train step runs simultaneously on its own GPU. The pick_best join step receives all results and selects the winner.

Live Monitoring

Real-time Training Cards

Attach live-updating visualizations to any step with the @card decorator. Cards are viewable in the Metaflow UI during and after execution.

from metaflow.cards import Markdown, Table, VegaChart

@card(type="blank", refresh_interval=5)
@resources(gpu=1)
@step
def train(self):
    from metaflow import current

    for epoch in range(self.epochs):
        loss = train_one_epoch()

        # Update the card every 5 seconds
        current.card.clear()
        current.card.append(
            Markdown(f"## Epoch {epoch}/{self.epochs}")
        )
        current.card.append(
            Table([["Loss", f"{loss:.4f}"]])
        )
        current.card.refresh()

    self.next(self.end)
View cards in the UI. Navigate to your run at https://{stage}.metaflow.simulation.amazon.dev, click on the running step, and watch the card update in real-time. No additional setup required.
Failure Recovery

Retries and Checkpoints

GPU training is expensive. Don't lose hours of work to a transient failure.

Automatic Retries

@retry(times=3, minutes_between_retries=5)
@resources(gpu=1)
@step
def train(self):
    # If this step fails, it retries up to 3 times
    # with 5 minutes between each attempt
    ...

Training Checkpoints

@checkpoint
@resources(gpu=1)
@step
def train(self):
    # Checkpoint periodically saves your training state to S3.
    # On failure + retry, it resumes from the last checkpoint
    # instead of starting from scratch.
    for epoch in range(self.epochs):
        train_one_epoch()
        save_checkpoint()  # Metaflow persists this to S3
Combine them. Use @retry + @checkpoint together. If training fails at epoch 45/50, the retry picks up from the checkpoint at epoch 45 instead of restarting from epoch 0.
Production Scheduling

Cron and Event Triggers

Move from ad-hoc runs to production schedules.

Cron Schedule

from metaflow import schedule

@schedule(cron="0 6 * * 1")  # Every Monday at 6 AM UTC
class WeeklyTrainFlow(FlowSpec):
    ...

Event-driven Triggers

from metaflow import trigger, trigger_on_finish

# Trigger when a named event fires
@trigger(event="new_training_data")
class RetrainFlow(FlowSpec):
    ...

# Trigger when another flow completes
@trigger_on_finish(flow="DataPrepFlow")
class TrainAfterPrepFlow(FlowSpec):
    ...

Schedules and triggers are managed by Argo Events with a NATS JetStream event bus for reliable delivery.

Decorator Reference

Common Decorators

Decorator Purpose Example
@step Mark a method as a DAG step @step
@resources Request CPU, memory, GPU, disk @resources(gpu=1, memory=64000)
@card Attach live visualization @card(type="blank", refresh_interval=5)
@retry Auto-retry on failure @retry(times=3, minutes_between_retries=5)
@checkpoint Save/resume training state @checkpoint
@schedule Cron-based scheduling @schedule(cron="0 6 * * 1")
@trigger Event-driven execution @trigger(event="new_data")
@trigger_on_finish Run after another flow completes @trigger_on_finish(flow="PrepFlow")
Stack decorators freely. Decorators compose: @retry @checkpoint @resources(gpu=1) @card @step gives you a GPU step with live monitoring, automatic retries, and checkpoint recovery. The order doesn't matter (except @step should be closest to the method).