# Lab 8: Full MLOps on GCP
Difficulty: Advanced · Estimated time: ~6 hours
## Objective
Build a complete MLOps pipeline on GCP:
- Cloud Function trigger for new data in GCS
- BigQuery ML for model training in SQL
- Cloud Run for model serving
- Cloud Logging for monitoring
- Pub/Sub trigger for automated retraining
## Step 1 — Cloud Function for data ingest

```python
# main.py
import functions_framework
from google.cloud import bigquery

@functions_framework.cloud_event
def ingest_data(event):
    """Triggered by a new file in the GCS bucket."""
    bucket = event.data["bucket"]
    name = event.data["name"]

    client = bigquery.Client()
    # CSV cannot represent a REPEATED (array) column, so the raw files are
    # expected as newline-delimited JSON.
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("features", "FLOAT", mode="REPEATED"),
            bigquery.SchemaField("label", "INTEGER"),
        ],
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    uri = f"gs://{bucket}/{name}"
    load_job = client.load_table_from_uri(uri, "tds_ml.raw_data", job_config=job_config)
    load_job.result()  # block until the load job finishes
    print(f"Loaded {uri} into BigQuery")
```
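Before deploying, the handler's event parsing can be smoke-tested locally with a stub object. The class and the bucket/object names below are stand-ins for a dry run, not part of the Functions Framework:

```python
class FakeCloudEvent:
    """Minimal stand-in mimicking the .data payload a GCS trigger delivers."""
    def __init__(self, bucket: str, name: str):
        self.data = {"bucket": bucket, "name": name}

# Hypothetical bucket/object names for local testing
event = FakeCloudEvent("tds-ml-data", "uploads/batch_001.json")
uri = f"gs://{event.data['bucket']}/{event.data['name']}"
print(uri)  # gs://tds-ml-data/uploads/batch_001.json
```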
## Step 2 — BigQuery ML training

```sql
-- Train a logistic regression model
CREATE OR REPLACE MODEL `tds_ml.classifier`
OPTIONS(
  model_type='logistic_reg',
  input_label_cols=['label'],
  max_iterations=50
) AS
SELECT * FROM `tds_ml.raw_data`;

-- Evaluate
SELECT * FROM ML.EVALUATE(MODEL `tds_ml.classifier`);

-- Predict
SELECT * FROM ML.PREDICT(MODEL `tds_ml.classifier`,
  (SELECT * FROM `tds_ml.test_data`));
```
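The training statement will be reissued later for automated retraining, so it is convenient to keep it renderable from Python. A small helper, assuming the dataset and model names used in this lab:

```python
def build_training_query(dataset: str = "tds_ml", model: str = "classifier") -> str:
    """Render the Step 2 CREATE MODEL statement for programmatic retraining."""
    return (
        f"CREATE OR REPLACE MODEL `{dataset}.{model}` "
        "OPTIONS(model_type='logistic_reg', "
        "input_label_cols=['label'], max_iterations=50) AS "
        f"SELECT * FROM `{dataset}.raw_data`"
    )

query = build_training_query()
```

Passing this string to `bigquery.Client().query(...)` retrains the model in place.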
## Step 3 — Cloud Run model server

```python
from fastapi import FastAPI
from google.cloud import bigquery

app = FastAPI()
client = bigquery.Client()  # reuse one client across requests

@app.post("/predict")
async def predict(features: list[float]):
    query = """
        SELECT predicted_label, predicted_label_probs
        FROM ML.PREDICT(MODEL `tds_ml.classifier`,
            (SELECT @features AS features))
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            # features is a list, so it must be bound as an array parameter
            bigquery.ArrayQueryParameter("features", "FLOAT64", features)
        ]
    )
    results = client.query(query, job_config=job_config).result()
    return [dict(row) for row in results]
```
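A client calls the endpoint with a JSON array body. A sketch of building such a request — the Cloud Run URL is a placeholder, and actually sending it would use e.g. `requests.post` with a JSON `Content-Type` header:

```python
import json

SERVICE_URL = "https://model-server-xxxx-uc.a.run.app"  # placeholder; use your deployed URL

def build_predict_request(features):
    """Return the (url, body) pair for a POST /predict call."""
    return f"{SERVICE_URL}/predict", json.dumps(features)

url, body = build_predict_request([5.1, 3.5, 1.4, 0.2])
# send with: requests.post(url, data=body, headers={"Content-Type": "application/json"})
```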
## Step 4 — Monitoring and alerting

```bash
# Create a log-based metric counting error lines from the Cloud Run service
gcloud logging metrics create prediction_errors \
  --description="Prediction errors" \
  --log-filter='resource.type="cloud_run_revision" AND jsonPayload.message=~"error"'

# Create an alert policy on that metric
# (tune the threshold and add --notification-channels for delivery)
gcloud alpha monitoring policies create \
  --display-name="Prediction Error Alert" \
  --condition-display-name="Error rate > 5%" \
  --condition-filter='metric.type="logging.googleapis.com/user/prediction_errors" AND resource.type="cloud_run_revision"' \
  --if='> 5' \
  --duration='300s'
```
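The objective and rubric also require a Pub/Sub trigger for automated retraining, which has no step of its own. A minimal sketch of such a function — the `retrain-requests` topic name is an assumption, wired up with `gcloud functions deploy retrain --trigger-topic=retrain-requests`:

```python
# retrain/main.py — sketch of a Pub/Sub-triggered retraining function
TRAINING_QUERY = """
CREATE OR REPLACE MODEL `tds_ml.classifier`
OPTIONS(
  model_type='logistic_reg',
  input_label_cols=['label'],
  max_iterations=50
) AS
SELECT * FROM `tds_ml.raw_data`
"""

def retrain(event=None):
    """Re-run the Step 2 training statement whenever a message arrives.

    In a deployed function, decorate with @functions_framework.cloud_event
    as in Step 1.
    """
    # Imported inside the handler so the module imports without GCP libraries
    from google.cloud import bigquery
    client = bigquery.Client()
    client.query(TRAINING_QUERY).result()  # blocks until training completes
    print("Model tds_ml.classifier retrained")
```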
## Submission
GitHub repo + GCP project screenshots + live endpoint URL
## Grading rubric
| Criterion | Points |
|---|---|
| Cloud Function triggers on GCS upload | 15 |
| BigQuery ML trains and evaluates model | 20 |
| Cloud Run serves predictions | 20 |
| Monitoring dashboard configured | 15 |
| Pub/Sub triggers retraining | 15 |
| End-to-end pipeline documented | 15 |
| Total | 100 |