Pub/Sub + Cloud Functions
Modern ML systems are event-driven. When model drift is detected, a retraining pipeline should trigger automatically. When new data arrives, preprocessing should start immediately. When a prediction log shows anomalies, an alert should fire. Pub/Sub provides the messaging backbone, Cloud Functions provide the serverless compute, and Cloud Scheduler provides the cron trigger — together they create a fully automated MLOps loop.
Architecture
┌──────────────┐ publish ┌──────────────┐ push ┌──────────────┐
│ Drift Alert │──────────────►│ Pub/Sub │────────────►│ Cloud │
│ (Monitor) │ │ Topic: │ │ Function │
└──────────────┘ │ retrain │ │ (Trigger) │
└──────┬───────┘ └──────┬───────┘
│ │
┌──────────────┐ publish │ ▼
│ Cloud │─────────────────────►│ ┌──────────────┐
│ Scheduler │ │ │ Vertex AI │
│ (weekly) │ │ │ Pipeline │
└──────────────┘ │ └──────┬───────┘
│ │
│ push │ deploy
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Cloud │ │ Cloud Run │
│ Function │ │ (New Model) │
│ (Notify) │ └──────────────┘
└──────────────┘
1. Pub/Sub Topics and Subscriptions
Cloud Pub/Sub is a fully managed messaging service that decouples producers from consumers. Unacknowledged messages are retained for up to 7 days (the default), and delivery is at least once — consumers must tolerate duplicates.
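Because delivery is at least once, the same message can arrive twice, so handlers should be idempotent. A minimal in-memory sketch of message-ID deduplication (a production consumer would back the seen-ID set with Redis or Firestore; handle_once is illustrative, not a Pub/Sub API):

```python
# Track which Pub/Sub message IDs have already been handled
processed_ids = set()

def handle_once(message_id, data, process):
    """Process a message only if its ID hasn't been seen before."""
    if message_id in processed_ids:
        return False  # duplicate delivery: ack without reprocessing
    process(data)
    processed_ids.add(message_id)
    return True

results = []
print(handle_once("msg-1", b"retrain", results.append))  # True
print(handle_once("msg-1", b"retrain", results.append))  # False (duplicate)
print(len(results))  # 1
```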
Create a topic
# Create a topic for retraining triggers
gcloud pubsub topics create ml-retrain-trigger \
  --project=your-project-id
# Create a topic for prediction logging
gcloud pubsub topics create ml-prediction-logs \
  --project=your-project-id
# Create a topic for drift alerts
gcloud pubsub topics create ml-drift-alerts \
  --project=your-project-id
Create a push subscription
# Push subscription delivers to an HTTP endpoint (Cloud Function)
gcloud pubsub subscriptions create ml-retrain-sub \
  --topic=ml-retrain-trigger \
  --push-endpoint=https://us-central1-your-project-id.cloudfunctions.net/retrain-model \
  --push-auth-service-account=pubsub-sa@your-project-id.iam.gserviceaccount.com \
  --ack-deadline=600 \
  --message-retention-duration=604800s
Create a pull subscription
# Pull subscription for custom consumers
gcloud pubsub subscriptions create ml-prediction-logs-sub \
  --topic=ml-prediction-logs \
  --ack-deadline=60
Publish and consume messages
# Publisher — send a retraining trigger
from google.cloud import pubsub_v1
import json
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project-id", "ml-retrain-trigger")
message = json.dumps({
    "model_name": "credit-scoring-v1",
    "trigger_reason": "drift_detected",
    "drift_score": 0.08,
    "feature": "income",
    "timestamp": "2025-04-15T10:30:00Z",
}).encode("utf-8")
future = publisher.publish(topic_path, message, origin="model-monitor")
print(f"Published message ID: {future.result()}")
# Subscriber — pull messages
from concurrent.futures import TimeoutError

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    "your-project-id", "ml-prediction-logs-sub"
)

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

streaming_pull = subscriber.subscribe(subscription_path, callback=callback)

# subscribe() returns immediately; block the main thread so the
# background pull keeps running
with subscriber:
    try:
        streaming_pull.result(timeout=60)
    except TimeoutError:
        streaming_pull.cancel()
        streaming_pull.result()  # wait for shutdown to complete
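Since the trigger payload is plain JSON, consumers should validate it before acting on it. A small sketch using the field names from the example above (parse_trigger and the required-field set are illustrative, not part of the Pub/Sub client):

```python
import json

# Fields every retrain-trigger message must carry (per the payload above)
REQUIRED_FIELDS = {"model_name", "trigger_reason", "timestamp"}

def parse_trigger(raw):
    """Decode and validate a retrain-trigger message body."""
    payload = json.loads(raw.decode("utf-8"))
    missing = REQUIRED_FIELDS - payload.keys()
    if missing:
        raise ValueError(f"trigger message missing fields: {sorted(missing)}")
    return payload

msg = json.dumps({
    "model_name": "credit-scoring-v1",
    "trigger_reason": "drift_detected",
    "timestamp": "2025-04-15T10:30:00Z",
}).encode("utf-8")
print(parse_trigger(msg)["model_name"])  # credit-scoring-v1
```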
2. Cloud Functions for Event-Driven Triggers
Cloud Functions (2nd gen) run in response to events. They're perfect for lightweight orchestration — triggering pipelines, sending notifications, and routing events.
Automated retraining function
# functions/retrain_model/main.py
import base64
import json
import os

import functions_framework
import google.cloud.aiplatform as aiplatform
from google.cloud import pubsub_v1

PROJECT_ID = os.environ.get("PROJECT_ID", "your-project-id")
REGION = os.environ.get("REGION", "us-central1")

@functions_framework.cloud_event
def retrain_model(cloud_event):
    """Triggered by a Pub/Sub message from a drift alert or the scheduler."""
    # 2nd-gen functions receive a CloudEvent; the Pub/Sub payload
    # arrives base64-encoded in message.data
    raw = base64.b64decode(cloud_event.data["message"]["data"])
    message_data = json.loads(raw)
    model_name = message_data.get("model_name", "credit-scoring-v1")
    trigger_reason = message_data.get("trigger_reason", "scheduled")
    print(f"Retraining triggered for {model_name}. Reason: {trigger_reason}")

    # Initialize Vertex AI
    aiplatform.init(project=PROJECT_ID, location=REGION)

    # Launch the training pipeline
    job = aiplatform.PipelineJob(
        display_name=f"retrain-{model_name}-{trigger_reason}",
        template_path="gs://your-project-ml/pipelines/retrain_pipeline.json",
        parameter_values={
            "model_display_name": model_name,
            "data_source": "bq://your-project.ml_data.training_view",
            "epochs": 10,
            "learning_rate": 0.001,
        },
        enable_caching=True,
    )
    # submit() returns immediately; run() would block until the pipeline
    # finishes and blow past the function's 540 s timeout
    job.submit()

    # Publish a "started" event for downstream consumers
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, "ml-retrain-complete")
    completion_msg = json.dumps({
        "model_name": model_name,
        "job_id": job.display_name,
        "status": "started",
    }).encode("utf-8")
    publisher.publish(topic_path, completion_msg)
Function configuration
# functions/retrain_model/function.yaml
name: retrain-model
description: Triggered by Pub/Sub to launch a Vertex AI retraining pipeline
runtime: python311
entry_point: retrain_model
event_trigger:
  event_type: google.cloud.pubsub.topic.v1.messagePublished
  resource: projects/your-project-id/topics/ml-retrain-trigger
  retry_policy: retry
env_vars:
  - name: PROJECT_ID
    value: your-project-id
  - name: REGION
    value: us-central1
resources:
  cpu: 1
  memory: 512Mi
timeout: 540s
Deploy the function
# Deploy 2nd gen Cloud Function with Pub/Sub trigger
gcloud functions deploy retrain-model \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=./functions/retrain_model \
  --entry-point=retrain_model \
  --trigger-topic=ml-retrain-trigger \
  --timeout=540 \
  --memory=512Mi \
  --cpu=1 \
  --min-instances=0 \
  --max-instances=1 \
  --set-env-vars="PROJECT_ID=your-project-id,REGION=us-central1"
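Once deployed, the function can be smoke-tested by publishing directly to the trigger topic from the CLI (the payload fields match the examples above; "manual_test" is just an illustrative reason string):

```shell
# Manually publish a test trigger; the function should log the retrain attempt
gcloud pubsub topics publish ml-retrain-trigger \
  --message='{"model_name":"credit-scoring-v1","trigger_reason":"manual_test"}'
# Then inspect the function's logs
gcloud functions logs read retrain-model --region=us-central1 --gen2
```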
3. Notification Function
Send Slack/email notifications when important ML events occur:
# functions/notify/main.py
import base64
import json

import functions_framework
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

@functions_framework.cloud_event
def notify_ml_event(cloud_event):
    """Send a Slack notification for ML pipeline events."""
    # Decode the base64-encoded Pub/Sub payload from the CloudEvent
    raw = base64.b64decode(cloud_event.data["message"]["data"])
    message_data = json.loads(raw)
    event_type = message_data.get("event_type", "unknown")
    severity_emoji = {
        "drift_detected": ":warning:",
        "retrain_started": ":rocket:",
        "retrain_complete": ":white_check_mark:",
        "deploy_success": ":tada:",
        "error": ":rotating_light:",
    }
    emoji = severity_emoji.get(event_type, ":bell:")
    model = message_data.get("model_name", "unknown")
    slack_message = {
        "text": f"{emoji} ML Event: {event_type}",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"{emoji} *{event_type}*\nModel: `{model}`\n"
                        f"Details: {json.dumps(message_data, indent=2)}"
                    ),
                },
            }
        ],
    }
    response = requests.post(SLACK_WEBHOOK_URL, json=slack_message, timeout=10)
    response.raise_for_status()
gcloud functions deploy notify-ml-event \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=./functions/notify \
  --entry-point=notify_ml_event \
  --trigger-topic=ml-retrain-complete \
  --timeout=60 \
  --memory=256Mi
4. Cloud Scheduler for Periodic Jobs
Cloud Scheduler triggers Pub/Sub messages on a cron schedule — useful for weekly retraining, daily data quality checks, and periodic model evaluation.
# Weekly retraining schedule (every Monday at 2 AM UTC)
gcloud scheduler jobs create pubsub weekly-retrain \
  --location=us-central1 \
  --topic=ml-retrain-trigger \
  --message-body='{"model_name":"credit-scoring-v1","trigger_reason":"scheduled","schedule":"weekly"}' \
  --schedule="0 2 * * 1" \
  --time-zone="UTC" \
  --description="Weekly model retraining trigger"
# Daily data quality check (every day at 6 AM UTC)
gcloud scheduler jobs create pubsub daily-quality-check \
  --location=us-central1 \
  --topic=ml-quality-check \
  --message-body='{"check_type":"data_quality","frequency":"daily"}' \
  --schedule="0 6 * * *" \
  --time-zone="UTC" \
  --description="Daily data quality monitoring"
# List all scheduled jobs
gcloud scheduler jobs list --location=us-central1
# Pause/resume a job
gcloud scheduler jobs pause weekly-retrain --location=us-central1
gcloud scheduler jobs resume weekly-retrain --location=us-central1
5. Connecting Pub/Sub → Cloud Run
Trigger Cloud Run services directly from Pub/Sub for heavier processing:
# Create a Pub/Sub trigger for Cloud Run via Eventarc
gcloud eventarc triggers create trigger-prediction-log \
  --destination-run-service=ml-predictor \
  --destination-run-region=us-central1 \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --transport-topic=projects/your-project-id/topics/ml-prediction-logs \
  --service-account=pubsub-sa@your-project-id.iam.gserviceaccount.com \
  --location=us-central1
The Cloud Run service receives each message as an HTTP POST; the JSON body wraps the base64-encoded payload in a message.data field:
# Inside your Cloud Run FastAPI app
import base64
import json

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/")
async def handle_pubsub(request: Request):
    """Handle a Pub/Sub push delivery."""
    body = await request.json()
    message = base64.b64decode(body["message"]["data"]).decode()
    payload = json.loads(message)
    # Process the prediction log (your own coroutine)
    await process_prediction_log(payload)
    return {"status": "processed"}
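The push envelope is easy to exercise locally: it is plain JSON with a base64-encoded data field. A self-contained round-trip of the (simplified) format, with illustrative payload values:

```python
import base64
import json

# What Pub/Sub POSTs to the push endpoint (simplified envelope)
payload = {"model_name": "credit-scoring-v1", "latency_ms": 42}
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(payload).encode()).decode(),
        "messageId": "1234567890",
        "attributes": {"origin": "ml-predictor"},
    },
    "subscription": "projects/your-project-id/subscriptions/ml-prediction-logs-sub",
}

# What the handler does on receipt: base64-decode, then parse JSON
decoded = json.loads(base64.b64decode(envelope["message"]["data"]).decode())
print(decoded == payload)  # True
```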
Event-Driven MLOps Checklist
- Create Pub/Sub topics for each ML event type (drift, retrain, deploy, error)
- Set up Cloud Functions for lightweight orchestration
- Configure Cloud Scheduler for periodic retraining
- Add Slack/email notification for critical events
- Use dead-letter topics for failed message processing
- Set --min-instances=0 on Cloud Functions to minimize cost
- Monitor Pub/Sub subscription backlog (unacked messages = problems)
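For the dead-letter bullet, a sketch of wiring a dead-letter topic onto the retrain subscription (the topic name ml-dead-letter is an assumption; the Pub/Sub service account also needs publish rights on the dead-letter topic and subscribe rights on the source subscription):

```shell
# Create the dead-letter topic and attach it to the subscription
gcloud pubsub topics create ml-dead-letter
gcloud pubsub subscriptions update ml-retrain-sub \
  --dead-letter-topic=ml-dead-letter \
  --max-delivery-attempts=5
```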