Lab 8: Full MLOps on GCP

Difficulty: Advanced · Estimated time: ~6 hours

Objective

Build a complete MLOps pipeline on GCP:

  1. Cloud Function trigger for new data in GCS
  2. BigQuery ML for model training in SQL
  3. Cloud Run for model serving
  4. Cloud Logging for monitoring
  5. Pub/Sub trigger for automated retraining
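Before Step 1, make sure the services the pipeline touches are enabled. Assuming a fresh project, something like the following works (these are the standard API names; Eventarc and Cloud Build are needed for 2nd-gen Cloud Functions):

```bash
gcloud services enable \
  cloudfunctions.googleapis.com \
  run.googleapis.com \
  bigquery.googleapis.com \
  pubsub.googleapis.com \
  logging.googleapis.com \
  eventarc.googleapis.com \
  cloudbuild.googleapis.com
```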

Step 1 — Cloud Function for data ingest

```python
# main.py
import functions_framework
from google.cloud import bigquery


@functions_framework.cloud_event
def ingest_data(event):
    """Triggered by a new file in the GCS bucket."""
    bucket = event.data["bucket"]
    name = event.data["name"]

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("features", "FLOAT", mode="REPEATED"),
            bigquery.SchemaField("label", "INTEGER"),
        ],
        # CSV cannot populate REPEATED columns, so the data files are
        # expected as newline-delimited JSON.
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    uri = f"gs://{bucket}/{name}"
    load_job = client.load_table_from_uri(
        uri, "tds_ml.raw_data", job_config=job_config
    )
    load_job.result()  # wait for the load to finish
    print(f"Loaded {uri} into BigQuery")
```
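To wire the function to the bucket, deploy it as a 2nd-gen function with a GCS trigger. `YOUR_BUCKET` and the region below are placeholders, not values from the lab:

```bash
# Deploy with an object-finalized trigger on the bucket
gcloud functions deploy ingest-data \
  --gen2 \
  --runtime=python312 \
  --region=us-central1 \
  --source=. \
  --entry-point=ingest_data \
  --trigger-bucket=YOUR_BUCKET
```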

Step 2 — BigQuery ML training

```sql
-- Train a logistic regression model
CREATE OR REPLACE MODEL `tds_ml.classifier`
OPTIONS(
  model_type='logistic_reg',
  input_label_cols=['label'],
  max_iterations=50
) AS
SELECT * FROM `tds_ml.raw_data`;

-- Evaluate
SELECT * FROM ML.EVALUATE(MODEL `tds_ml.classifier`);

-- Predict
SELECT * FROM ML.PREDICT(MODEL `tds_ml.classifier`,
  (SELECT * FROM `tds_ml.test_data`));
```
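The prediction query above reads from `tds_ml.test_data`, which the handout never creates. One reproducible way to carve it out of `raw_data` is a hash-based split rather than `ORDER BY RAND()`. Sketched here as a small Python helper that renders the SQL (the function name and default split are assumptions, not part of the lab):

```python
def split_sql(source: str, test_table: str, test_pct: int = 20) -> str:
    """Render SQL that materializes a deterministic test split.

    Rows are assigned by hashing their JSON representation with
    FARM_FINGERPRINT, so the same rows land in the test set on
    every rerun.
    """
    return f"""
CREATE OR REPLACE TABLE `{test_table}` AS
SELECT *
FROM `{source}` AS t
WHERE MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), 100) < {test_pct}
""".strip()


print(split_sql("tds_ml.raw_data", "tds_ml.test_data"))
```

Run the rendered statement in the BigQuery console (or via the client library) before the `ML.PREDICT` step.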

Step 3 — Cloud Run model server

```python
from fastapi import FastAPI
from google.cloud import bigquery

app = FastAPI()
client = bigquery.Client()  # create once and reuse across requests


@app.post("/predict")
async def predict(features: list[float]):
    query = """
        SELECT predicted_label, predicted_label_probs
        FROM ML.PREDICT(MODEL `tds_ml.classifier`,
                        (SELECT @features AS features))
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            # A list of floats is an array parameter, not a scalar one
            bigquery.ArrayQueryParameter("features", "FLOAT64", features)
        ]
    )
    results = client.query(query, job_config=job_config).result()
    return [dict(row) for row in results]
```
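FastAPI reads a lone `list[float]` parameter straight from the JSON request body, so the client posts a bare JSON array. A stdlib-only sketch of building that request (the helper name and the service URL in the usage line are hypothetical):

```python
import json
import urllib.request


def build_predict_request(url: str, features: list[float]) -> urllib.request.Request:
    """Build a POST request whose body is the JSON array /predict expects."""
    body = json.dumps(features).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage against a deployed service (URL is a placeholder):
# req = build_predict_request("https://SERVICE_URL/predict", [0.1, 0.2, 0.3])
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```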

Step 4 — Monitoring and alerting

```bash
# Create log-based metric
gcloud logging metrics create prediction_errors \
  --description="Prediction errors" \
  --filter='resource.type="cloud_run_revision" AND jsonPayload.message=~"error"'

# Create alert policy
gcloud alpha monitoring policies create \
  --display-name="Prediction Error Alert" \
  --condition-display-name="Error rate > 5%" \
  --condition-filter='metric.type="logging.googleapis.com/user/prediction_errors"'
```
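The objective and rubric both list Pub/Sub-triggered retraining (item 5), but the handout stops at Step 4. One way to wire it, with placeholder names throughout (`retrain-model` topic, a `retrain` entry point whose body would re-run the `CREATE OR REPLACE MODEL` statement from Step 2):

```bash
# Topic that signals "retrain now"
gcloud pubsub topics create retrain-model

# Function subscribed to the topic; its entry point re-runs the
# Step 2 training statement via the BigQuery client library
gcloud functions deploy retrain \
  --gen2 \
  --runtime=python312 \
  --region=us-central1 \
  --source=. \
  --entry-point=retrain \
  --trigger-topic=retrain-model

# Publish a message to test the wiring end to end
gcloud pubsub topics publish retrain-model --message="retrain"
```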

Submission

Submit a GitHub repo, screenshots of your GCP project, and the live endpoint URL.

Grading rubric

| Criterion | Points |
| --- | --- |
| Cloud Function triggers on GCS upload | 15 |
| BigQuery ML trains and evaluates model | 20 |
| Cloud Run serves predictions | 20 |
| Monitoring dashboard configured | 15 |
| Pub/Sub triggers retraining | 15 |
| End-to-end pipeline documented | 15 |
| **Total** | **100** |