194 Pub/Sub and Topic Routing
194.1 Learning Objectives
By the end of this chapter, you will be able to:
- Implement Pub/Sub Patterns: Build topic-based publish-subscribe messaging
- Match Topics with Wildcards: Use MQTT-style
+and#wildcards for filtering - Handle QoS Levels: Implement QoS 0, 1, and 2 with appropriate acknowledgments
- Manage Subscribers: Register and route messages to interested consumers
- Store Retained Messages: Deliver last-known values to new subscribers
194.2 Introduction
The publish-subscribe (pub/sub) pattern is the foundation of scalable IoT messaging. Instead of devices communicating directly, they publish to topics and subscribe to topics of interest. A central broker handles routing, enabling loose coupling between producers and consumers.
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#E67E22', 'secondaryColor': '#7F8C8D'}}}%%
flowchart LR
subgraph Publishers
P1["Temp Sensor"]
P2["Humidity Sensor"]
P3["Motion Sensor"]
end
subgraph Broker["Message Broker"]
ROUTE["Topic<br/>Router"]
end
subgraph Subscribers
S1["Cloud Service"]
S2["Dashboard"]
S3["Alert System"]
end
P1 -->|"sensors/temp/room1"| ROUTE
P2 -->|"sensors/humidity/room1"| ROUTE
P3 -->|"alerts/motion/entrance"| ROUTE
ROUTE -->|"sensors/#"| S1
ROUTE -->|"sensors/temp/+"| S2
ROUTE -->|"alerts/#"| S3
- Decoupling: Publishers donβt need to know who is listening
- Scalability: Add new subscribers without changing publishers
- Filtering: Subscribers receive only relevant messages
- Reliability: Broker can buffer during disconnections
194.3 Topic Hierarchies
IoT topics follow a hierarchical structure using / as a separator. This creates a tree that enables efficient filtering:
sensors/
βββ temperature/
β βββ room1 <- sensors/temperature/room1
β βββ room2 <- sensors/temperature/room2
β βββ lobby <- sensors/temperature/lobby
βββ humidity/
β βββ room1 <- sensors/humidity/room1
βββ pressure/
βββ outdoor <- sensors/pressure/outdoor
alerts/
βββ motion/
β βββ entrance <- alerts/motion/entrance
βββ fire/
βββ building1 <- alerts/fire/building1
194.3.1 Topic Design Best Practices
| Pattern | Example | Use Case |
|---|---|---|
{type}/{subtype}/{location} |
sensors/temperature/room1 |
Sensor telemetry |
{device}/{action} |
gateway01/command |
Device commands |
{building}/{floor}/{room} |
hq/3/conference |
Location-based |
{version}/{type}/{id} |
v2/sensors/abc123 |
API versioning |
Avoid these common mistakes:
- Too flat:
sensor_temp_room1- loses hierarchical filtering - Too deep:
building/floor/wing/room/wall/sensor/type- hard to manage - Sensitive data in topic:
user/john.doe/password- topics may be logged - Leading/trailing slashes:
/sensors/- creates empty segments
194.4 Topic Matching with Wildcards
MQTT-style wildcards enable flexible subscriptions:
194.4.1 Single-Level Wildcard (+)
Matches exactly one level in the topic hierarchy:
// sensors/+/room1 matches:
// sensors/temperature/room1 β
// sensors/humidity/room1 β
// sensors/temperature/room2 β (different location)
// sensors/a/b/room1 β (two levels, not one)194.4.2 Multi-Level Wildcard (#)
Matches any remaining levels (must be last character):
// sensors/# matches:
// sensors/temperature β
// sensors/temperature/room1 β
// sensors/humidity/room1/raw β
// alerts/motion β (doesn't start with sensors/)194.4.3 Topic Matching Implementation
/**
* Match a topic against a subscription filter
* Supports MQTT wildcards:
* + matches exactly one level (sensors/+/temp matches sensors/room1/temp)
* # matches any remaining levels (sensors/# matches sensors/room1/temp/celsius)
*/
bool matchTopic(const char* topic, const char* filter) {
const char* t = topic;
const char* f = filter;
while (*t && *f) {
// Multi-level wildcard matches everything remaining
if (*f == '#') {
return true;
}
// Single-level wildcard
if (*f == '+') {
// Skip to next separator in topic
while (*t && *t != '/') t++;
f++;
// Skip separator in filter if present
if (*f == '/') f++;
// Skip separator in topic if present
if (*t == '/') t++;
continue;
}
// Exact character match
if (*t != *f) {
return false;
}
t++;
f++;
}
// Both should be at end for exact match
// Or filter ends with # for wildcard match
return (*t == '\0' && *f == '\0') || (*f == '#');
}194.4.4 Topic Matching Examples
// Demonstration of topic matching
const char* topics[] = {
"sensors/temperature/room1",
"sensors/humidity/room1",
"alerts/motion/entrance",
"system/status/gateway"
};
const char* filters[] = {"sensors/#", "sensors/temperature/+", "alerts/#"};
// Results:
// sensors/temperature/room1 vs sensors/# -> MATCH
// sensors/temperature/room1 vs sensors/temperature/+ -> MATCH
// sensors/temperature/room1 vs alerts/# -> no match
// sensors/humidity/room1 vs sensors/# -> MATCH
// sensors/humidity/room1 vs sensors/temperature/+ -> no match
// sensors/humidity/room1 vs alerts/# -> no match
// alerts/motion/entrance vs sensors/# -> no match
// alerts/motion/entrance vs sensors/temperature/+ -> no match
// alerts/motion/entrance vs alerts/# -> MATCH194.5 Subscriber Management
194.5.1 Subscriber Data Structure
Each subscriber has an identity, topic filter, QoS preference, and personal inbox:
// Subscriber information
struct Subscriber {
char clientId[32]; // Unique client identifier
char topicFilter[MAX_TOPIC_LENGTH]; // Subscription topic (may include wildcards)
QoSLevel maxQos; // Maximum QoS this subscriber accepts
bool active; // Is subscription active
MessageQueue inbox; // Personal message queue
uint32_t lastActivity; // Timestamp of last message delivery
};
// Publisher information
struct Publisher {
char clientId[32]; // Unique client identifier
char sensorType[32]; // Type of data published
uint32_t messagesSent; // Statistics
uint32_t lastPublish; // Last publish timestamp
bool active; // Is publisher active
};194.5.2 Register Subscriber
/**
* Register a new subscriber
*/
int registerSubscriber(const char* clientId, const char* topicFilter, QoSLevel maxQos) {
if (subscriberCount >= MAX_SUBSCRIBERS) {
Serial.printf("[BROKER] ERROR: Max subscribers reached\n");
return -1;
}
Subscriber* sub = &subscribers[subscriberCount];
strncpy(sub->clientId, clientId, 31);
strncpy(sub->topicFilter, topicFilter, MAX_TOPIC_LENGTH - 1);
sub->maxQos = maxQos;
sub->active = true;
sub->lastActivity = millis();
initQueue(&sub->inbox, 10); // Each subscriber gets a 10-message inbox
Serial.printf("[BROKER] Subscriber registered: %s -> %s (QoS %d)\n",
clientId, topicFilter, maxQos);
// Deliver retained messages matching this subscription
for (int i = 0; i < MAX_RETAINED_MESSAGES; i++) {
if (retainedMessages[i].valid &&
matchTopic(retainedMessages[i].topic, topicFilter)) {
Serial.printf("[BROKER] Delivering retained message to new subscriber: %s\n",
retainedMessages[i].topic);
enqueue(&sub->inbox, &retainedMessages[i].message);
}
}
return subscriberCount++;
}194.5.3 Publish to Subscribers
The core routing function matches topics and delivers to all matching subscribers:
/**
* Publish a message to all matching subscribers
* This is the core routing function of the broker
*/
int publishMessage(Message* msg) {
int deliveryCount = 0;
Serial.printf("\n[PUBLISH] ID=%lu Topic='%s' QoS=%d Priority=%d Retained=%d\n",
msg->messageId, msg->topic, msg->qos, msg->priority, msg->retained);
Serial.printf("[PUBLISH] Payload: %s\n", msg->payload);
// Store retained message if flagged
if (msg->retained) {
storeRetainedMessage(msg);
}
// Route to all matching subscribers
for (int i = 0; i < subscriberCount; i++) {
if (!subscribers[i].active) continue;
if (matchTopic(msg->topic, subscribers[i].topicFilter)) {
totalTopicMatches++;
// Downgrade QoS to subscriber's maximum
QoSLevel effectiveQos = min(msg->qos, subscribers[i].maxQos);
Message deliveryMsg = *msg;
deliveryMsg.qos = effectiveQos;
// Enqueue to subscriber's inbox
if (enqueue(&subscribers[i].inbox, &deliveryMsg)) {
deliveryCount++;
subscribers[i].lastActivity = millis();
Serial.printf("[ROUTE] -> Subscriber '%s' (filter: %s, QoS: %d->%d)\n",
subscribers[i].clientId, subscribers[i].topicFilter,
msg->qos, effectiveQos);
}
}
}
totalMessagesRouted++;
if (deliveryCount == 0) {
Serial.printf("[ROUTE] No matching subscribers for topic: %s\n", msg->topic);
}
return deliveryCount;
}194.6 QoS Levels Explained
Quality of Service (QoS) defines delivery guarantees. Higher QoS means more reliability but also more overhead.
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#E67E22', 'secondaryColor': '#7F8C8D'}}}%%
sequenceDiagram
participant P as Publisher
participant B as Broker
participant S as Subscriber
Note over P,S: QoS 0 - At Most Once
P->>B: PUBLISH
B->>S: PUBLISH
Note right of S: No acknowledgment<br/>May be lost
Note over P,S: QoS 1 - At Least Once
P->>B: PUBLISH
B->>S: PUBLISH
S-->>B: PUBACK
B-->>P: PUBACK
Note right of S: Guaranteed delivery<br/>May duplicate
Note over P,S: QoS 2 - Exactly Once
P->>B: PUBLISH
B->>S: PUBLISH
S-->>B: PUBREC
B-->>P: PUBREC
P->>B: PUBREL
B->>S: PUBREL
S-->>B: PUBCOMP
B-->>P: PUBCOMP
Note right of S: Four-step handshake<br/>Guarantees single delivery
194.6.1 QoS 0: At-Most-Once
Fire and forget - fastest but no guarantee:
/**
* Simulate QoS 0: At-most-once delivery
* Message is sent once with no acknowledgment
*/
void handleQoS0(Message* msg, Subscriber* sub) {
Serial.printf("[QoS 0] Fire-and-forget delivery to %s\n", sub->clientId);
// Message is already in queue, no further action needed
msg->acknowledged = true;
}194.6.2 QoS 1: At-Least-Once
Acknowledged delivery - reliable but may duplicate:
/**
* Simulate QoS 1: At-least-once delivery
* Message is resent until acknowledged (may cause duplicates)
*/
void handleQoS1(Message* msg, Subscriber* sub) {
Serial.printf("[QoS 1] At-least-once delivery to %s\n", sub->clientId);
// Simulate acknowledgment with random success
bool ackReceived = (random(100) > 20); // 80% success rate
if (ackReceived) {
msg->acknowledged = true;
Serial.printf("[QoS 1] PUBACK received for message %lu\n", msg->messageId);
} else {
msg->deliveryCount++;
qosRetries++;
Serial.printf("[QoS 1] No PUBACK - will retry (attempt %d)\n", msg->deliveryCount);
}
}194.6.3 QoS 2: Exactly-Once
Four-step handshake - guaranteed single delivery but highest overhead:
/**
* Simulate QoS 2: Exactly-once delivery
* Four-step handshake ensures no duplicates
*/
void handleQoS2(Message* msg, Subscriber* sub) {
Serial.printf("[QoS 2] Exactly-once delivery to %s\n", sub->clientId);
// Simulate 4-step handshake: PUBLISH -> PUBREC -> PUBREL -> PUBCOMP
Serial.printf("[QoS 2] Step 1: PUBLISH sent (ID=%lu)\n", msg->messageId);
bool pubrecReceived = (random(100) > 10); // 90% success
if (!pubrecReceived) {
Serial.printf("[QoS 2] PUBREC not received - retrying\n");
qosRetries++;
return;
}
Serial.printf("[QoS 2] Step 2: PUBREC received\n");
Serial.printf("[QoS 2] Step 3: PUBREL sent\n");
bool pubcompReceived = (random(100) > 10); // 90% success
if (!pubcompReceived) {
Serial.printf("[QoS 2] PUBCOMP not received - retrying PUBREL\n");
qosRetries++;
return;
}
Serial.printf("[QoS 2] Step 4: PUBCOMP received - transaction complete\n");
msg->acknowledged = true;
exactlyOnceTransactions++;
}194.6.4 When to Use Each QoS Level
| QoS | Use Case | Example | Trade-off |
|---|---|---|---|
| 0 | High-frequency telemetry | Temperature every second | Fast, may lose some |
| 1 | Important sensor data | Hourly energy readings | Reliable, may duplicate |
| 2 | Critical transactions | Firmware updates, billing | Guaranteed, slower |
- QoS 0 is like dropping a postcard in a mailbox - quick but no confirmation
- QoS 1 is like certified mail - you get a delivery confirmation, but sometimes duplicates arrive
- QoS 2 is like registered mail with signature - guaranteed exactly once but requires multiple steps
194.7 Retained Messages
Retained messages are stored by the broker and delivered to new subscribers. This ensures they receive the last known value immediately:
// Retained message storage
struct RetainedMessage {
char topic[MAX_TOPIC_LENGTH];
Message message;
bool valid;
};
RetainedMessage retainedMessages[MAX_RETAINED_MESSAGES];
/**
* Store a retained message
*/
void storeRetainedMessage(Message* msg) {
// Find existing slot for this topic or empty slot
int slot = -1;
for (int i = 0; i < MAX_RETAINED_MESSAGES; i++) {
if (!retainedMessages[i].valid) {
if (slot < 0) slot = i;
} else if (strcmp(retainedMessages[i].topic, msg->topic) == 0) {
slot = i;
break;
}
}
if (slot >= 0) {
strncpy(retainedMessages[slot].topic, msg->topic, MAX_TOPIC_LENGTH - 1);
memcpy(&retainedMessages[slot].message, msg, sizeof(Message));
retainedMessages[slot].valid = true;
Serial.printf("[BROKER] Retained message stored: %s\n", msg->topic);
}
}194.7.1 Retained Message Use Cases
| Topic | Retained Value | Purpose |
|---|---|---|
sensors/temperature/room1 |
Latest reading | New dashboard shows current temp |
device/gateway01/status |
Online/offline | New monitoring app sees device state |
config/firmware/version |
Current version | New device checks for updates |
194.8 Message Persistence
During network outages, messages must be persisted locally and replayed when connectivity returns:
/**
* Persist messages during network outage
* In real systems, this would write to flash/EEPROM
*/
void persistMessage(Message* msg) {
Serial.printf("[PERSIST] Storing message %lu for offline delivery\n", msg->messageId);
Serial.printf("[PERSIST] Topic: %s, QoS: %d, Payload: %.50s...\n",
msg->topic, msg->qos, msg->payload);
}
/**
* Replay persisted messages when network returns
*/
void replayPersistedMessages() {
Serial.printf("[PERSIST] Network restored - replaying buffered messages\n");
// In real implementation, would read from flash storage
for (int i = 0; i < subscriberCount; i++) {
int pendingCount = getQueueSize(&subscribers[i].inbox);
if (pendingCount > 0) {
Serial.printf("[PERSIST] Subscriber %s has %d pending messages\n",
subscribers[i].clientId, pendingCount);
}
}
}194.9 Summary
This chapter covered the publish-subscribe pattern and topic routing for IoT messaging:
- Topic Hierarchies: Slash-separated paths for efficient filtering
- Wildcard Matching:
+for single level,#for multi-level matching - Subscriber Management: Registration, routing, and per-subscriber inboxes
- QoS Levels: Trade-offs between speed (QoS 0), reliability (QoS 1), and exactness (QoS 2)
- Retained Messages: Last-known values for new subscribers
- Persistence: Buffering during network outages
Key Takeaways:
- Topic hierarchies enable selective subscriptions at scale
- Wildcards reduce the need for multiple subscriptions
- QoS selection is a trade-off between reliability and overhead
- Retained messages ensure new subscribers get current state
194.10 Knowledge Check
Question: Which subscription filter would match the topic sensors/temperature/room1?
Explanation: B. The + wildcard matches exactly one level, so sensors/+/room1 matches sensors/temperature/room1 where + matches temperature.
Question: For a security motion alert that must not be lost or duplicated, which QoS level is most appropriate?
Explanation: C. Security alerts are critical - they must not be lost (rules out QoS 0) and must not duplicate (rules out QoS 1). QoS 2βs four-step handshake guarantees exactly-once delivery.
Question: What happens when a new subscriber subscribes to a topic that has a retained message?
Explanation: A. Retained messages are automatically delivered to new subscribers when they subscribe to a matching topic, ensuring they immediately receive the last known value.
194.12 Whatβs Next
Continue to Queue Lab Challenges to apply these concepts in hands-on exercises including dead letter queues, message deduplication, and flow control.