Pub/Sub + Cloud Functions

Modern ML systems are event-driven. When model drift is detected, a retraining pipeline should trigger automatically. When new data arrives, preprocessing should start immediately. When a prediction log shows anomalies, an alert should fire. Pub/Sub provides the messaging backbone, Cloud Functions provide the serverless compute, and Cloud Scheduler provides the cron trigger — together they create a fully automated MLOps loop.

Architecture

code
┌──────────────┐  publish   ┌──────────────┐    push    ┌──────────────┐
│ Drift Alert  │───────────►│   Pub/Sub    │───────────►│    Cloud     │
│  (Monitor)   │            │    Topic:    │            │   Function   │
└──────────────┘            │   retrain    │            │  (Trigger)   │
                            └──────┬───────┘            └──────┬───────┘
                                   │                           │
┌──────────────┐  publish          │                           ▼
│    Cloud     │───────────────────┤                    ┌──────────────┐
│  Scheduler   │                   │                    │  Vertex AI   │
│   (weekly)   │                   │                    │   Pipeline   │
└──────────────┘                   │                    └──────┬───────┘
                                   │                           │
                                   │ push                      │ deploy
                                   ▼                           ▼
                            ┌──────────────┐            ┌──────────────┐
                            │    Cloud     │            │  Cloud Run   │
                            │   Function   │            │ (New Model)  │
                            │   (Notify)   │            └──────────────┘
                            └──────────────┘

1. Pub/Sub Topics and Subscriptions

Cloud Pub/Sub is a fully managed message bus that decouples producers from consumers. Unacknowledged messages are retained for seven days by default, and delivery is at least once — the same message can be delivered more than once.
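Because delivery is at least once, consumers should be idempotent. A minimal sketch — the in-memory set and the `handle_once` helper are illustrative only; production code would key on the Pub/Sub message ID in a persistent store such as Firestore or Redis:

```python
# Deduplicate deliveries by Pub/Sub message ID before doing real work.
_seen_ids: set[str] = set()  # illustrative; use a persistent store with a TTL in production

def handle_once(message_id: str, handler, payload):
    """Run handler(payload) only the first time a message_id is seen."""
    if message_id in _seen_ids:
        return None  # duplicate delivery: safe to ack without reprocessing
    _seen_ids.add(message_id)
    return handler(payload)
```

On a duplicate delivery the consumer still acks the message (so Pub/Sub stops redelivering) but skips the side effect.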

Create a topic

bash
# Create a topic for retraining triggers
gcloud pubsub topics create ml-retrain-trigger \
  --project=your-project-id

# Create a topic for prediction logging
gcloud pubsub topics create ml-prediction-logs \
  --project=your-project-id

# Create a topic for drift alerts
gcloud pubsub topics create ml-drift-alerts \
  --project=your-project-id

Create a push subscription

bash
# Push subscription delivers to an HTTP endpoint (Cloud Function)
gcloud pubsub subscriptions create ml-retrain-sub \
  --topic=ml-retrain-trigger \
  --push-endpoint=https://us-central1-your-project-id.cloudfunctions.net/retrain-model \
  --push-auth-service-account=pubsub-sa@your-project-id.iam.gserviceaccount.com \
  --ack-deadline=600 \
  --message-retention-duration=604800s   # 7 days

Create a pull subscription

bash
# Pull subscription for custom consumers
gcloud pubsub subscriptions create ml-prediction-logs-sub \
  --topic=ml-prediction-logs \
  --ack-deadline=60

Publish and consume messages

python
# Publisher — send a retraining trigger
import concurrent.futures
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project-id", "ml-retrain-trigger")

message = json.dumps({
    "model_name": "credit-scoring-v1",
    "trigger_reason": "drift_detected",
    "drift_score": 0.08,
    "feature": "income",
    "timestamp": "2025-04-15T10:30:00Z",
}).encode("utf-8")

# Extra keyword arguments become message attributes (string values only)
future = publisher.publish(topic_path, message, origin="model-monitor")
print(f"Published message ID: {future.result()}")

# Subscriber — pull messages
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    "your-project-id", "ml-prediction-logs-sub"
)

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

streaming_pull = subscriber.subscribe(subscription_path, callback=callback)

# subscribe() returns immediately; block on the future to keep consuming
try:
    streaming_pull.result(timeout=60)
except concurrent.futures.TimeoutError:
    streaming_pull.cancel()
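The payload is plain JSON encoded to UTF-8 bytes. A small helper pair (the names `build_retrain_event` and `parse_retrain_event` are hypothetical) keeps the producer and every consumer agreed on the event schema:

```python
import json
from datetime import datetime, timezone

REQUIRED_FIELDS = {"model_name", "trigger_reason", "timestamp"}

def build_retrain_event(model_name: str, trigger_reason: str, **extra) -> bytes:
    """Serialize a retrain-trigger event to the bytes Pub/Sub expects."""
    event = {
        "model_name": model_name,
        "trigger_reason": trigger_reason,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        **extra,
    }
    return json.dumps(event).encode("utf-8")

def parse_retrain_event(data: bytes) -> dict:
    """Decode a retrain-trigger event and check required fields."""
    event = json.loads(data.decode("utf-8"))
    missing = REQUIRED_FIELDS - event.keys()
    if missing:
        raise ValueError(f"event missing fields: {sorted(missing)}")
    return event
```

Rejecting malformed events at the boundary is cheaper than debugging a half-run pipeline later.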

2. Cloud Functions for Event-Driven Triggers

Cloud Functions (2nd gen) run in response to events. They're perfect for lightweight orchestration — triggering pipelines, sending notifications, and routing events.

Automated retraining function

python
# functions/retrain_model/main.py
import base64
import json

import functions_framework
import google.cloud.aiplatform as aiplatform
from google.cloud import pubsub_v1

@functions_framework.cloud_event
def retrain_model(cloud_event):
    """Triggered by a Pub/Sub message from the drift alert or scheduler."""
    # 2nd-gen Pub/Sub triggers deliver the payload base64-encoded
    # inside the CloudEvent
    message_data = json.loads(
        base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
    )

    model_name = message_data.get("model_name", "credit-scoring-v1")
    trigger_reason = message_data.get("trigger_reason", "scheduled")

    print(f"Retraining triggered for {model_name}. Reason: {trigger_reason}")

    # Initialize Vertex AI
    aiplatform.init(project="your-project-id", location="us-central1")

    # Launch the training pipeline
    job = aiplatform.PipelineJob(
        display_name=f"retrain-{model_name}-{trigger_reason}",
        template_path="gs://your-project-ml/pipelines/retrain_pipeline.json",
        parameter_values={
            "model_display_name": model_name,
            "data_source": "bq://your-project.ml_data.training_view",
            "epochs": 10,
            "learning_rate": 0.001,
        },
        enable_caching=True,
    )

    # submit() returns immediately; run() would block until the pipeline
    # finishes and blow past the function's timeout
    job.submit()

    # Publish a status event for the notification function
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("your-project-id", "ml-retrain-complete")
    status_msg = json.dumps({
        "model_name": model_name,
        "job_id": job.display_name,
        "status": "started",
    }).encode("utf-8")
    publisher.publish(topic_path, status_msg)

Function configuration

yaml
# functions/retrain_model/function.yaml
name: retrain-model
description: Triggered by Pub/Sub to launch a Vertex AI retraining pipeline
runtime: python311
entry_point: retrain_model

event_trigger:
  event_type: google.cloud.pubsub.topic.publish
  resource: projects/your-project-id/topics/ml-retrain-trigger
  retry_policy: retry

env_vars:
  - name: PROJECT_ID
    value: your-project-id
  - name: REGION
    value: us-central1

resources:
  cpu: 1
  memory: 512Mi
  timeout: 540s

Deploy the function

bash
# Deploy 2nd gen Cloud Function with Pub/Sub trigger
gcloud functions deploy retrain-model \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=./functions/retrain_model \
  --entry-point=retrain_model \
  --trigger-topic=ml-retrain-trigger \
  --timeout=540 \
  --memory=512Mi \
  --cpu=1 \
  --min-instances=0 \
  --max-instances=1 \
  --set-env-vars="PROJECT_ID=your-project-id,REGION=us-central1"

3. Notification Function

Send Slack/email notifications when important ML events occur:

python
# functions/notify/main.py
import base64
import json
import os

import functions_framework
import requests

# Read the webhook from an env var rather than hardcoding a secret
SLACK_WEBHOOK_URL = os.environ.get(
    "SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
)

@functions_framework.cloud_event
def notify_ml_event(cloud_event):
    """Send a Slack notification for ML pipeline events."""
    message_data = json.loads(
        base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
    )
    event_type = message_data.get("event_type", "unknown")

    severity_emoji = {
        "drift_detected": ":warning:",
        "retrain_started": ":rocket:",
        "retrain_complete": ":white_check_mark:",
        "deploy_success": ":tada:",
        "error": ":rotating_light:",
    }

    emoji = severity_emoji.get(event_type, ":bell:")
    model = message_data.get("model_name", "unknown")

    slack_message = {
        "text": f"{emoji} ML Event: {event_type}",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"{emoji} *{event_type}*\nModel: `{model}`\n"
                        f"Details: {json.dumps(message_data, indent=2)}"
                    ),
                },
            }
        ],
    }

    response = requests.post(SLACK_WEBHOOK_URL, json=slack_message, timeout=10)
    # Raising on failure lets Pub/Sub retry the delivery
    response.raise_for_status()

bash
gcloud functions deploy notify-ml-event \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=./functions/notify \
  --entry-point=notify_ml_event \
  --trigger-topic=ml-retrain-complete \
  --timeout=60 \
  --memory=256Mi

4. Cloud Scheduler for Periodic Jobs

Cloud Scheduler triggers Pub/Sub messages on a cron schedule — useful for weekly retraining, daily data quality checks, and periodic model evaluation.

bash
# Weekly retraining schedule (every Monday at 2 AM UTC)
gcloud scheduler jobs create pubsub weekly-retrain \
  --topic=ml-retrain-trigger \
  --message-body='{"model_name":"credit-scoring-v1","trigger_reason":"scheduled","schedule":"weekly"}' \
  --schedule="0 2 * * 1" \
  --time-zone="UTC" \
  --description="Weekly model retraining trigger"

# Daily data quality check (every day at 6 AM UTC)
gcloud scheduler jobs create pubsub daily-quality-check \
  --topic=ml-quality-check \
  --message-body='{"check_type":"data_quality","frequency":"daily"}' \
  --schedule="0 6 * * *" \
  --time-zone="UTC" \
  --description="Daily data quality monitoring"

# List all scheduled jobs
gcloud scheduler jobs list

# Pause/resume a job
gcloud scheduler jobs pause weekly-retrain
gcloud scheduler jobs resume weekly-retrain
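Cron expressions are easy to get wrong, and a typo means a silent no-op. A simplified sanity check — this is an illustrative sketch, not a gcloud feature, and it ignores ranges, steps, and month/day names:

```python
def validate_cron(expr: str) -> bool:
    """Sanity-check a 5-field cron expression (no ranges, steps, or names)."""
    fields = expr.split()
    if len(fields) != 5:
        return False
    # minute, hour, day-of-month, month, day-of-week (0 = Sunday)
    bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]
    for field, (lo, hi) in zip(fields, bounds):
        if field == "*":
            continue
        for part in field.split(","):
            if not part.isdigit() or not lo <= int(part) <= hi:
                return False
    return True
```

Reading left to right, `"0 2 * * 1"` is minute 0, hour 2, any day of month, any month, day-of-week 1 — Mondays at 02:00.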

5. Connecting Pub/Sub → Cloud Run

Trigger Cloud Run services directly from Pub/Sub for heavier processing:

bash
# Create a Pub/Sub trigger for Cloud Run
gcloud eventarc triggers create trigger-prediction-log \
--destination-run-service=ml-predictor \
--destination-run-region=us-central1 \
--event-filters="type=google.cloud.pubsub.topic.publish" \
--event-filters="topic=ml-prediction-logs" \
--service-account=pubsub-sa@your-project-id.iam.gserviceaccount.com \
--region=us-central1

The Cloud Run service receives the Pub/Sub message as an HTTP POST with the base64-encoded payload in the message.data field:

python
# Inside your Cloud Run FastAPI app
import base64
import json

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/")
async def handle_pubsub(request: Request):
    """Handle a Pub/Sub push delivery."""
    body = await request.json()
    # Pub/Sub base64-encodes the payload in message.data
    message = base64.b64decode(body["message"]["data"]).decode()
    payload = json.loads(message)

    # Process the prediction log
    await process_prediction_log(payload)

    # Returning a 2xx response acknowledges the message
    return {"status": "processed"}
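To unit-test the handler's decoding logic without deploying anything, you can construct the JSON body Pub/Sub POSTs to a push endpoint. The field names below match the push delivery format; the `messageId` value is made up for testing:

```python
import base64
import json

def make_push_envelope(payload: dict, subscription: str) -> dict:
    """Build a JSON body shaped like a Pub/Sub push delivery."""
    return {
        "message": {
            "data": base64.b64encode(json.dumps(payload).encode()).decode(),
            "messageId": "1234567890",  # made-up ID for testing
            "attributes": {"origin": "model-monitor"},
        },
        "subscription": subscription,
    }

def decode_push_envelope(body: dict) -> dict:
    """Mirror the handler's decoding step: base64, then JSON."""
    return json.loads(base64.b64decode(body["message"]["data"]).decode())
```

POSTing such an envelope at the service locally (e.g. with a FastAPI test client) exercises the same path a real push delivery takes.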

Event-Driven MLOps Checklist

  • Create Pub/Sub topics for each ML event type (drift, retrain, deploy, error)
  • Set up Cloud Functions for lightweight orchestration
  • Configure Cloud Scheduler for periodic retraining
  • Add Slack/email notification for critical events
  • Use dead-letter topics for failed message processing
  • Set --min-instances=0 on Cloud Functions to minimize cost
  • Monitor Pub/Sub subscription backlog (a growing count of unacked messages means consumers are failing or falling behind)