GCP ML Pipelines
ML pipelines automate the end-to-end workflow from data ingestion to model deployment. On GCP, you build pipelines using the Kubeflow Pipelines (KFP) SDK and run them on Vertex AI Pipelines — a fully managed service that eliminates the need to manage Kubernetes infrastructure.
Why Pipelines?
A typical ML workflow has many steps: data validation, preprocessing, feature engineering, training, evaluation, and deployment. Running these steps by hand is error-prone and makes results hard to reproduce. Pipelines solve this by:
- Encoding the workflow as code (version-controlled, reviewable)
- Automating execution in the right order (with dependencies)
- Caching completed steps (skip unchanged stages on re-runs)
- Tracking all artifacts and metadata automatically
- Scheduling regular runs (daily retraining, etc.)
Kubeflow Pipelines on Vertex AI
Vertex AI Pipelines runs KFP v2 pipelines natively. You define your pipeline in Python, compile it to a JSON or YAML artifact, and submit it to Vertex AI.
Pipeline Components and DAGs
A component is a self-contained step in the pipeline — it takes inputs, runs code, and produces outputs. A DAG (Directed Acyclic Graph) defines the execution order and dependencies between components.
┌─────────────┐     ┌──────────────────┐     ┌─────────────┐
│ Ingest Data │────▶│ Preprocess Data  │────▶│ Train Model │
└─────────────┘     └──────────────────┘     └──────┬──────┘
                                                    │
                                                    ▼
                        ┌──────────────┐     ┌─────────────┐
                        │ Deploy Model │◀────│ Eval Model  │
                        └──────────────┘     └─────────────┘
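KFP infers these DAG edges whenever one task consumes another task's output (explicit ordering without a data dependency can be added with `task.after(...)`). As a toy model of the scheduling this implies — plain Python with the standard library, not Vertex AI code — a topological sort over the diagram's dependencies yields the execution order:

```python
from graphlib import TopologicalSorter

# Edges from the diagram above: each task maps to the tasks it depends on.
dag = {
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"preprocess", "train"},
    "deploy": {"evaluate"},
}

# static_order() emits tasks only after all of their dependencies.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['ingest', 'preprocess', 'train', 'evaluate', 'deploy']
```

In a real run the orchestrator additionally executes independent branches in parallel; the sort only fixes the partial order that must be respected.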
Building a Pipeline with Python SDK
Step 1: Define Components
You can create components from Python functions using the @component decorator:
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics
@component(
base_image="python:3.11",
packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
input_data_uri: str,
output_train: Output[Dataset],
output_test: Output[Dataset],
test_size: float = 0.2,
random_state: int = 42,
):
import pandas as pd
from sklearn.model_selection import train_test_split
# Read input data
df = pd.read_csv(input_data_uri)
df = df.dropna()
# Split features and target
X = df.drop(columns=["target"])
y = df["target"]
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
# Save outputs
train_df = pd.concat([X_train, y_train], axis=1)
test_df = pd.concat([X_test, y_test], axis=1)
train_df.to_csv(output_train.path, index=False)
test_df.to_csv(output_test.path, index=False)
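Before wrapping logic in a @component, it is worth sanity-checking the body locally, since component code only fails at pipeline runtime otherwise. A minimal sketch, using sklearn's bundled iris data as a stand-in for the GCS CSV (its frame already carries a "target" column matching the component's expectations):

```python
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Stand-in for pd.read_csv(input_data_uri): same shape, label column "target".
df = load_iris(as_frame=True).frame.dropna()

X = df.drop(columns=["target"])
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

train_df = pd.concat([X_train, y_train], axis=1)
test_df = pd.concat([X_test, y_test], axis=1)
print(len(train_df), len(test_df))  # 120 30
```

Once this behaves as expected, the identical body runs inside the component, with `output_train.path` / `output_test.path` replacing local file paths.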
@component(
base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def train_model(
train_data: Input[Dataset],
model_output: Output[Model],
n_estimators: int = 100,
max_depth: int = 5,
):
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
df = pd.read_csv(train_data.path)
X = df.drop(columns=["target"])
y = df["target"]
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42,
)
model.fit(X, y)
joblib.dump(model, model_output.path)
model_output.metadata["framework"] = "sklearn"
model_output.metadata["n_estimators"] = n_estimators
@component(
base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def evaluate_model(
test_data: Input[Dataset],
model: Input[Model],
metrics_output: Output[Metrics],
):
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, f1_score
df = pd.read_csv(test_data.path)
X = df.drop(columns=["target"])
y = df["target"]
clf = joblib.load(model.path)
y_pred = clf.predict(X)
accuracy = accuracy_score(y, y_pred)
f1 = f1_score(y, y_pred, average="weighted")
metrics_output.log_metric("accuracy", accuracy)
metrics_output.log_metric("f1_score", f1)
Step 2: Compose the Pipeline
from kfp import dsl
@dsl.pipeline(
name="iris-classification-pipeline",
description="End-to-end Iris classification pipeline on Vertex AI",
)
def iris_pipeline(
input_data_uri: str = "gs://your-bucket/data/iris.csv",
test_size: float = 0.2,
n_estimators: int = 100,
max_depth: int = 5,
):
# Step 1: Preprocess
preprocess_task = preprocess_data(
input_data_uri=input_data_uri,
test_size=test_size,
)
# Step 2: Train (depends on preprocess)
train_task = train_model(
train_data=preprocess_task.outputs["output_train"],
n_estimators=n_estimators,
max_depth=max_depth,
)
# Step 3: Evaluate (depends on both preprocess and train)
evaluate_task = evaluate_model(
test_data=preprocess_task.outputs["output_test"],
model=train_task.outputs["model_output"],
)
Step 3: Compile and Run
# Compile the pipeline to a JSON file
from kfp.compiler import Compiler
Compiler().compile(iris_pipeline, "iris_pipeline.json")
from google.cloud import aiplatform
aiplatform.init(project="your-project", location="us-central1")
# Submit the compiled pipeline
job = aiplatform.PipelineJob(
display_name="iris-pipeline-run",
template_path="iris_pipeline.json",
parameter_values={
"input_data_uri": "gs://your-bucket/data/iris.csv",
"n_estimators": 200,
"max_depth": 8,
},
enable_caching=True,
)
job.run()
Using Pre-built Google Cloud Pipeline Components
Google provides pre-built components for common GCP operations:
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
@dsl.pipeline(name="automl-tabular-pipeline")
def automl_pipeline(
bq_source: str = "bq://project.dataset.table",
target_column: str = "label",
):
# Create a dataset from BigQuery
dataset_op = TabularDatasetCreateOp(
display_name="my-dataset",
        bq_source=bq_source,
project="your-project",
location="us-central1",
)
    # AutoML training would go here, e.g. training_op = AutoMLTabularTrainingJobRunOp(...)
# Create an endpoint
endpoint_op = EndpointCreateOp(
display_name="my-endpoint",
project="your-project",
location="us-central1",
)
    # Deploy the trained model (training_op refers to the elided AutoML step)
    deploy_op = ModelDeployOp(
        model=training_op.outputs["model"],
endpoint=endpoint_op.outputs["endpoint"],
deployed_model_display_name="my-deployed-model",
machine_type="n1-standard-2",
)
Scheduled Pipeline Runs
For production ML systems, you often need to retrain models on a schedule (daily, weekly, etc.).
from google.cloud import aiplatform
aiplatform.init(project="your-project", location="us-central1")
# Define the pipeline job to schedule
pipeline_job = aiplatform.PipelineJob(
    display_name="weekly-retraining",
    template_path="iris_pipeline.json",
    parameter_values={
        "input_data_uri": "gs://your-bucket/data/latest_iris.csv",
        "n_estimators": 200,
    },
)
# Attach the job to a schedule
schedule = aiplatform.PipelineJobSchedule(
    pipeline_job=pipeline_job,
    display_name="weekly-retraining",
)
# Schedule to run every Monday at 2 AM UTC
schedule.create(
cron="0 2 * * 1", # Cron expression: minute hour day month weekday
max_concurrent_run_count=1,
max_run_count=52, # Run for 1 year (52 weeks)
)
Cron Expression Reference
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, 0=Sunday)
│ │ │ │ │
* * * * *
| Schedule | Cron |
|---|---|
| Every hour | 0 * * * * |
| Daily at midnight | 0 0 * * * |
| Weekly on Monday | 0 0 * * 1 |
| Monthly on the 1st | 0 0 1 * * |
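A malformed cron string surfaces only when the schedule is created, so a quick local check can save a round trip. Below is a toy validator covering just the field-count and numeric ranges from the reference above (it deliberately ignores ranges, lists, and step values, and is not the service's parser); note also that Vertex AI schedules interpret cron in UTC unless the expression carries an IANA time-zone prefix such as `TZ=America/New_York 0 2 * * 1`:

```python
def validate_cron(expr: str) -> bool:
    """Rudimentary check: exactly five fields, each '*' or an in-range integer."""
    ranges = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]
    fields = expr.split()
    if len(fields) != 5:
        return False
    for field, (lo, hi) in zip(fields, ranges):
        if field == "*":
            continue
        if not (field.isdigit() and lo <= int(field) <= hi):
            return False
    return True

print(validate_cron("0 2 * * 1"))  # True  (Mondays at 02:00)
print(validate_cron("0 2 * * 9"))  # False (no weekday 9)
```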
Vertex AI Pipelines caches completed steps: if a step's component and inputs are unchanged, the pipeline skips it on re-runs, saving both time and compute cost. Caching is enabled by default; you can toggle it per run via the enable_caching argument to PipelineJob.
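Conceptually, step caching maps a fingerprint of the component and its inputs to a stored result, and skips execution on a fingerprint match. A toy illustration of the idea in plain Python (not Vertex AI's actual mechanism):

```python
import hashlib
import json

cache = {}       # fingerprint -> stored result
executions = []  # records which calls actually ran

def run_step(name, code_version, inputs, fn):
    """Execute fn only if this (step, code, inputs) combination is new."""
    key = hashlib.sha256(
        json.dumps([name, code_version, inputs], sort_keys=True).encode()
    ).hexdigest()
    if key in cache:
        return cache[key]   # cache hit: reuse stored result, skip execution
    result = fn(inputs)     # cache miss: execute and store
    cache[key] = result
    return result

def preprocess(inputs):
    executions.append("preprocess")
    return sum(inputs["values"])

run_step("preprocess", "v1", {"values": [1, 2, 3]}, preprocess)
run_step("preprocess", "v1", {"values": [1, 2, 3]}, preprocess)  # cached, skipped
run_step("preprocess", "v2", {"values": [1, 2, 3]}, preprocess)  # code changed, re-runs
print(executions)  # ['preprocess', 'preprocess']
```

This is also why changing a component's image, packages, or parameters invalidates its cache entry: any of them alters the fingerprint.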
Compiled Pipeline YAML Structure
When you compile a pipeline, it produces a YAML (or JSON) specification. Here's what the output looks like:
# Simplified pipeline spec
pipelineInfo:
name: iris-classification-pipeline
root:
dag:
tasks:
preprocess-data:
cachingOptions:
enableCache: true
componentRef:
name: comp-preprocess-data
inputs:
parameters:
input_data_uri:
runtimeValue:
constant: gs://your-bucket/data/iris.csv
test_size:
runtimeValue:
constant: 0.2
train-model:
dependentTasks:
- preprocess-data
componentRef:
name: comp-train-model
inputs:
artifacts:
train_data:
taskOutputArtifact:
outputArtifactKey: output_train
producerTask: preprocess-data
evaluate-model:
dependentTasks:
- preprocess-data
- train-model
componentRef:
name: comp-evaluate-model
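Because the compiled spec is plain JSON/YAML, the DAG can also be inspected programmatically, e.g. to list each task's upstream dependencies. A sketch against a dict shaped like the simplified spec above (in practice you would `json.load` the compiled `iris_pipeline.json` instead):

```python
# A dict mirroring the simplified compiled spec.
spec = {
    "root": {
        "dag": {
            "tasks": {
                "preprocess-data": {},
                "train-model": {"dependentTasks": ["preprocess-data"]},
                "evaluate-model": {
                    "dependentTasks": ["preprocess-data", "train-model"]
                },
            }
        }
    }
}

# Map each task to its upstream dependencies (empty list if it has none).
deps = {
    task: info.get("dependentTasks", [])
    for task, info in spec["root"]["dag"]["tasks"].items()
}
for task, upstream in deps.items():
    print(f"{task} <- {upstream}")
```

This kind of check is handy in CI, e.g. asserting that an expensive training task never loses its dependency on data validation.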