194  Pub/Sub and Topic Routing

194.1 Learning Objectives

By the end of this chapter, you will be able to:

  • Implement Pub/Sub Patterns: Build topic-based publish-subscribe messaging
  • Match Topics with Wildcards: Use MQTT-style + and # wildcards for filtering
  • Handle QoS Levels: Implement QoS 0, 1, and 2 with appropriate acknowledgments
  • Manage Subscribers: Register and route messages to interested consumers
  • Store Retained Messages: Deliver last-known values to new subscribers

194.2 Introduction

The publish-subscribe (pub/sub) pattern is the foundation of scalable IoT messaging. Instead of devices communicating directly, they publish to topics and subscribe to topics of interest. A central broker handles routing, enabling loose coupling between producers and consumers.

%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#E67E22', 'secondaryColor': '#7F8C8D'}}}%%
flowchart LR
    subgraph Publishers
        P1["Temp Sensor"]
        P2["Humidity Sensor"]
        P3["Motion Sensor"]
    end

    subgraph Broker["Message Broker"]
        ROUTE["Topic<br/>Router"]
    end

    subgraph Subscribers
        S1["Cloud Service"]
        S2["Dashboard"]
        S3["Alert System"]
    end

    P1 -->|"sensors/temp/room1"| ROUTE
    P2 -->|"sensors/humidity/room1"| ROUTE
    P3 -->|"alerts/motion/entrance"| ROUTE

    ROUTE -->|"sensors/#"| S1
    ROUTE -->|"sensors/temp/+"| S2
    ROUTE -->|"alerts/#"| S3

Tip: Why Pub/Sub for IoT?
  1. Decoupling: Publishers don’t need to know who is listening
  2. Scalability: Add new subscribers without changing publishers
  3. Filtering: Subscribers receive only relevant messages
  4. Reliability: Broker can buffer during disconnections

194.3 Topic Hierarchies

IoT topics follow a hierarchical structure using / as a separator. This creates a tree that enables efficient filtering:

sensors/
├── temperature/
│   ├── room1     <- sensors/temperature/room1
│   ├── room2     <- sensors/temperature/room2
│   └── lobby     <- sensors/temperature/lobby
├── humidity/
│   └── room1     <- sensors/humidity/room1
└── pressure/
    └── outdoor   <- sensors/pressure/outdoor

alerts/
├── motion/
│   └── entrance  <- alerts/motion/entrance
└── fire/
    └── building1 <- alerts/fire/building1

194.3.1 Topic Design Best Practices

Pattern                        Example                      Use Case
{type}/{subtype}/{location}    sensors/temperature/room1    Sensor telemetry
{device}/{action}              gateway01/command            Device commands
{building}/{floor}/{room}      hq/3/conference              Location-based
{version}/{type}/{id}          v2/sensors/abc123            API versioning

Warning: Topic Design Anti-Patterns

Avoid these common mistakes:

  • Too flat: sensor_temp_room1 - loses hierarchical filtering
  • Too deep: building/floor/wing/room/wall/sensor/type - hard to manage
  • Sensitive data in topic: user/john.doe/password - topics may be logged
  • Leading/trailing slashes: /sensors/ - creates empty segments
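
Some of these anti-patterns can be caught mechanically before a message is published. The following is a minimal sketch, assuming a project-specific depth limit (MAX_TOPIC_LEVELS) and a hypothetical function name, isValidPublishTopic. MQTT itself permits leading and trailing slashes, so this check enforces the guidance above rather than the protocol. It cannot detect flat names or sensitive data, which remain design-review concerns.

```cpp
// Hypothetical limit for this sketch; choose one that suits your deployment.
const int MAX_TOPIC_LEVELS = 5;

// Returns true if a publish topic avoids the structural anti-patterns:
// empty topic, leading/trailing slash, empty segment, wildcard, too deep.
bool isValidPublishTopic(const char* topic) {
  if (topic == nullptr || topic[0] == '\0') return false;
  if (topic[0] == '/') return false;            // Leading slash

  int levels = 1;
  char prev = '\0';
  for (const char* p = topic; *p; p++) {
    if (*p == '+' || *p == '#') return false;   // Wildcards belong in filters only
    if (*p == '/') {
      if (prev == '/') return false;            // Empty segment, e.g. "a//b"
      levels++;
    }
    prev = *p;
  }
  if (prev == '/') return false;                // Trailing slash
  return levels <= MAX_TOPIC_LEVELS;
}
```

With the limit set to 5, sensors/temperature/room1 passes, while /sensors/ and building/floor/wing/room/wall/sensor/type are rejected.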

194.4 Topic Matching with Wildcards

MQTT-style wildcards enable flexible subscriptions:

194.4.1 Single-Level Wildcard (+)

Matches exactly one level in the topic hierarchy:

// sensors/+/room1 matches:
//   sensors/temperature/room1 ✓
//   sensors/humidity/room1    ✓
//   sensors/temperature/room2 ✗ (different location)
//   sensors/a/b/room1         ✗ (two levels, not one)

194.4.2 Multi-Level Wildcard (#)

Matches any remaining levels (must be last character):

// sensors/# matches:
//   sensors/temperature         ✓
//   sensors/temperature/room1   ✓
//   sensors/humidity/room1/raw  ✓
//   alerts/motion               ✗ (doesn't start with sensors/)
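
Because a wildcard must occupy a whole topic level, and # is only allowed at the end, it can help to reject malformed filters at subscription time. The following is a rough sketch of such a check; isValidFilter is a hypothetical name and is not part of the chapter's broker code.

```cpp
// Returns true if wildcards in a subscription filter are well placed:
//   '+' must fill an entire level, '#' must fill the final level.
bool isValidFilter(const char* filter) {
  if (filter == nullptr || filter[0] == '\0') return false;

  for (const char* p = filter; *p; p++) {
    bool atLevelStart = (p == filter) || (*(p - 1) == '/');
    char next = *(p + 1);

    if (*p == '#') {
      // '#' must start a level and be the very last character
      if (!atLevelStart || next != '\0') return false;
    } else if (*p == '+') {
      // '+' must start a level and be followed by '/' or the end
      if (!atLevelStart || (next != '\0' && next != '/')) return false;
    }
  }
  return true;
}
```

Under these rules sensors/# and sensors/+/room1 are accepted, while sensors/#/room1 and sensors/temp+ are rejected.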

194.4.3 Topic Matching Implementation

/**
 * Match a topic against a subscription filter
 * Supports MQTT wildcards:
 *   + matches exactly one level (sensors/+/temp matches sensors/room1/temp)
 *   # matches any remaining levels (sensors/# matches sensors/room1/temp/celsius)
 *     and also the parent level itself (sensors/# matches sensors)
 * Assumes the filter is well formed (wildcards occupy whole levels).
 */
bool matchTopic(const char* topic, const char* filter) {
  const char* t = topic;
  const char* f = filter;

  while (*f) {
    // Multi-level wildcard matches everything remaining
    if (*f == '#') {
      return true;
    }

    // Single-level wildcard: consume one topic level (which may be empty).
    // Separators are left in place so the exact-match step below compares them.
    if (*f == '+') {
      while (*t && *t != '/') t++;
      f++;
      continue;
    }

    // Topic exhausted first: only a trailing "/#" in the filter still matches
    if (*t == '\0') {
      return (f[0] == '/' && f[1] == '#');
    }

    // Exact character match (separators included)
    if (*t != *f) {
      return false;
    }

    t++;
    f++;
  }

  // Filter exhausted: the topic must be exhausted too
  return *t == '\0';
}

194.4.4 Topic Matching Examples

// Demonstration of topic matching
const char* topics[] = {
  "sensors/temperature/room1",
  "sensors/humidity/room1",
  "alerts/motion/entrance",
  "system/status/gateway"
};

const char* filters[] = {"sensors/#", "sensors/temperature/+", "alerts/#"};

// Results:
// sensors/temperature/room1 vs sensors/#              -> MATCH
// sensors/temperature/room1 vs sensors/temperature/+  -> MATCH
// sensors/temperature/room1 vs alerts/#               -> no match

// sensors/humidity/room1    vs sensors/#              -> MATCH
// sensors/humidity/room1    vs sensors/temperature/+  -> no match
// sensors/humidity/room1    vs alerts/#               -> no match

// alerts/motion/entrance    vs sensors/#              -> no match
// alerts/motion/entrance    vs sensors/temperature/+  -> no match
// alerts/motion/entrance    vs alerts/#               -> MATCH

194.5 Subscriber Management

194.5.1 Subscriber Data Structure

Each subscriber has an identity, topic filter, QoS preference, and personal inbox:

// Subscriber information
struct Subscriber {
  char clientId[32];            // Unique client identifier
  char topicFilter[MAX_TOPIC_LENGTH]; // Subscription topic (may include wildcards)
  QoSLevel maxQos;              // Maximum QoS this subscriber accepts
  bool active;                  // Is subscription active
  MessageQueue inbox;           // Personal message queue
  uint32_t lastActivity;        // Timestamp of last message delivery
};

// Publisher information
struct Publisher {
  char clientId[32];            // Unique client identifier
  char sensorType[32];          // Type of data published
  uint32_t messagesSent;        // Statistics
  uint32_t lastPublish;         // Last publish timestamp
  bool active;                  // Is publisher active
};
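
The Subscriber structure relies on a MessageQueue type and on helper functions (initQueue, enqueue, getQueueSize) that this section uses but does not show. One possible shape is a fixed-size ring buffer, sketched below under stated assumptions: the trimmed-down Message, the QUEUE_CAPACITY_MAX constant, and the dequeue helper are illustrative choices, not the chapter's actual definitions.

```cpp
#include <cstdint>

// Assumed for this sketch; the chapter's real Message carries more fields
// (qos, priority, retained, acknowledged, deliveryCount, ...).
const int QUEUE_CAPACITY_MAX = 16;

struct Message {
  uint32_t messageId;
  char topic[64];
  char payload[128];
};

struct MessageQueue {
  Message items[QUEUE_CAPACITY_MAX];
  int capacity;  // Usable slots (<= QUEUE_CAPACITY_MAX)
  int head;      // Index of the oldest message
  int count;     // Number of queued messages
};

void initQueue(MessageQueue* q, int capacity) {
  q->capacity = (capacity > QUEUE_CAPACITY_MAX) ? QUEUE_CAPACITY_MAX : capacity;
  q->head = 0;
  q->count = 0;
}

// Copies msg into the queue; returns false (message dropped) when full.
bool enqueue(MessageQueue* q, const Message* msg) {
  if (q->count >= q->capacity) return false;
  int tail = (q->head + q->count) % q->capacity;
  q->items[tail] = *msg;
  q->count++;
  return true;
}

// Copies the oldest message into out; returns false when the queue is empty.
bool dequeue(MessageQueue* q, Message* out) {
  if (q->count == 0) return false;
  *out = q->items[q->head];
  q->head = (q->head + 1) % q->capacity;
  q->count--;
  return true;
}

int getQueueSize(const MessageQueue* q) {
  return q->count;
}
```

A fixed-size buffer avoids heap allocation, which is usually preferable on microcontrollers. The cost is that a slow subscriber loses messages once its inbox is full, so the return value of enqueue is worth checking.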

194.5.2 Register Subscriber

/**
 * Register a new subscriber
 */
int registerSubscriber(const char* clientId, const char* topicFilter, QoSLevel maxQos) {
  if (subscriberCount >= MAX_SUBSCRIBERS) {
    Serial.printf("[BROKER] ERROR: Max subscribers reached\n");
    return -1;
  }

  Subscriber* sub = &subscribers[subscriberCount];
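  strncpy(sub->clientId, clientId, sizeof(sub->clientId) - 1);
  sub->clientId[sizeof(sub->clientId) - 1] = '\0';  // strncpy does not guarantee termination
  strncpy(sub->topicFilter, topicFilter, MAX_TOPIC_LENGTH - 1);
  sub->topicFilter[MAX_TOPIC_LENGTH - 1] = '\0';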
  sub->maxQos = maxQos;
  sub->active = true;
  sub->lastActivity = millis();
  initQueue(&sub->inbox, 10);  // Each subscriber gets a 10-message inbox

  Serial.printf("[BROKER] Subscriber registered: %s -> %s (QoS %d)\n",
                clientId, topicFilter, maxQos);

  // Deliver retained messages matching this subscription
  for (int i = 0; i < MAX_RETAINED_MESSAGES; i++) {
    if (retainedMessages[i].valid &&
        matchTopic(retainedMessages[i].topic, topicFilter)) {
      Serial.printf("[BROKER] Delivering retained message to new subscriber: %s\n",
                    retainedMessages[i].topic);
      enqueue(&sub->inbox, &retainedMessages[i].message);
    }
  }

  return subscriberCount++;
}

194.5.3 Publish to Subscribers

The core routing function matches topics and delivers to all matching subscribers:

/**
 * Publish a message to all matching subscribers
 * This is the core routing function of the broker
 */
int publishMessage(Message* msg) {
  int deliveryCount = 0;

  Serial.printf("\n[PUBLISH] ID=%lu Topic='%s' QoS=%d Priority=%d Retained=%d\n",
                msg->messageId, msg->topic, msg->qos, msg->priority, msg->retained);
  Serial.printf("[PUBLISH] Payload: %s\n", msg->payload);

  // Store retained message if flagged
  if (msg->retained) {
    storeRetainedMessage(msg);
  }

  // Route to all matching subscribers
  for (int i = 0; i < subscriberCount; i++) {
    if (!subscribers[i].active) continue;

    if (matchTopic(msg->topic, subscribers[i].topicFilter)) {
      totalTopicMatches++;

      // Downgrade QoS to subscriber's maximum
      QoSLevel effectiveQos = min(msg->qos, subscribers[i].maxQos);
      Message deliveryMsg = *msg;
      deliveryMsg.qos = effectiveQos;

      // Enqueue to subscriber's inbox
      if (enqueue(&subscribers[i].inbox, &deliveryMsg)) {
        deliveryCount++;
        subscribers[i].lastActivity = millis();

        Serial.printf("[ROUTE] -> Subscriber '%s' (filter: %s, QoS: %d->%d)\n",
                      subscribers[i].clientId, subscribers[i].topicFilter,
                      msg->qos, effectiveQos);
      }
    }
  }

  totalMessagesRouted++;

  if (deliveryCount == 0) {
    Serial.printf("[ROUTE] No matching subscribers for topic: %s\n", msg->topic);
  }

  return deliveryCount;
}
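
The QoS downgrade step in the routing loop can be isolated as a small function. This is only a sketch: the chapter uses QoSLevel without showing its definition, so the enum below and the name effectiveQos are assumptions.

```cpp
// Assumed definition; the chapter references QoSLevel but does not show it.
enum QoSLevel { QOS_0 = 0, QOS_1 = 1, QOS_2 = 2 };

// Delivery QoS is the lower of the published QoS and the subscriber's
// maximum. A message is never upgraded above the level it was published at.
QoSLevel effectiveQos(QoSLevel published, QoSLevel subscriberMax) {
  return (published < subscriberMax) ? published : subscriberMax;
}
```

For example, a message published at QoS 2 reaches a subscriber whose maximum is QoS 1 at QoS 1, while a QoS 0 message stays at QoS 0 even for a QoS 2 subscriber.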

194.6 QoS Levels Explained

Quality of Service (QoS) defines delivery guarantees. Higher QoS means more reliability but also more overhead.

%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#E67E22', 'secondaryColor': '#7F8C8D'}}}%%
sequenceDiagram
    participant P as Publisher
    participant B as Broker
    participant S as Subscriber

    Note over P,S: QoS 0 - At Most Once
    P->>B: PUBLISH
    B->>S: PUBLISH
    Note right of S: No acknowledgment<br/>May be lost

    Note over P,S: QoS 1 - At Least Once
    P->>B: PUBLISH
    B->>S: PUBLISH
    S-->>B: PUBACK
    B-->>P: PUBACK
    Note right of S: Guaranteed delivery<br/>May duplicate

    Note over P,S: QoS 2 - Exactly Once
    P->>B: PUBLISH
    B->>S: PUBLISH
    S-->>B: PUBREC
    B-->>P: PUBREC
    P->>B: PUBREL
    B->>S: PUBREL
    S-->>B: PUBCOMP
    B-->>P: PUBCOMP
    Note right of S: Four-step handshake<br/>Guarantees single delivery

194.6.1 QoS 0: At-Most-Once

Fire and forget - fastest but no guarantee:

/**
 * Simulate QoS 0: At-most-once delivery
 * Message is sent once with no acknowledgment
 */
void handleQoS0(Message* msg, Subscriber* sub) {
  Serial.printf("[QoS 0] Fire-and-forget delivery to %s\n", sub->clientId);
  // Message is already in queue, no further action needed
  msg->acknowledged = true;
}

194.6.2 QoS 1: At-Least-Once

Acknowledged delivery - reliable but may duplicate:

/**
 * Simulate QoS 1: At-least-once delivery
 * Message is resent until acknowledged (may cause duplicates)
 */
void handleQoS1(Message* msg, Subscriber* sub) {
  Serial.printf("[QoS 1] At-least-once delivery to %s\n", sub->clientId);

  // Simulate acknowledgment with random success
  bool ackReceived = (random(100) >= 20);  // 80% success rate (values 20..99)

  if (ackReceived) {
    msg->acknowledged = true;
    Serial.printf("[QoS 1] PUBACK received for message %lu\n", msg->messageId);
  } else {
    msg->deliveryCount++;
    qosRetries++;
    Serial.printf("[QoS 1] No PUBACK - will retry (attempt %d)\n", msg->deliveryCount);
  }
}
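
The handler above records a failed attempt but leaves the actual resend to the caller. A bounded retry loop might look like the sketch below. The SendAndWaitAck callback, RetryResult, and deliverQoS1 are hypothetical names introduced for illustration; the callback stands in for whatever transport sends the PUBLISH and waits for a PUBACK.

```cpp
#include <cstdint>

// Hypothetical transport hook: sends the message and reports whether a
// PUBACK arrived before the timeout. Supplied by the caller in this sketch.
typedef bool (*SendAndWaitAck)(uint32_t messageId, bool duplicate);

struct RetryResult {
  bool acknowledged;  // True once a PUBACK was seen
  int attempts;       // Number of sends performed
};

// Resend until acknowledged or until maxAttempts sends have been made.
// The duplicate flag is set on every attempt after the first, mirroring
// the DUP flag an MQTT sender sets on retransmissions.
RetryResult deliverQoS1(uint32_t messageId, int maxAttempts, SendAndWaitAck send) {
  RetryResult result = {false, 0};
  for (int attempt = 1; attempt <= maxAttempts; attempt++) {
    result.attempts = attempt;
    bool duplicate = (attempt > 1);
    if (send(messageId, duplicate)) {
      result.acknowledged = true;
      break;
    }
  }
  return result;
}
```

Capping the number of attempts keeps one unreachable subscriber from blocking the broker indefinitely. What to do after the cap is reached (drop, persist, or alert) is a policy decision outside this sketch.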

194.6.3 QoS 2: Exactly-Once

Four-step handshake - guaranteed single delivery but highest overhead:

/**
 * Simulate QoS 2: Exactly-once delivery
 * Four-step handshake ensures no duplicates
 */
void handleQoS2(Message* msg, Subscriber* sub) {
  Serial.printf("[QoS 2] Exactly-once delivery to %s\n", sub->clientId);

  // Simulate 4-step handshake: PUBLISH -> PUBREC -> PUBREL -> PUBCOMP
  Serial.printf("[QoS 2] Step 1: PUBLISH sent (ID=%lu)\n", msg->messageId);

  bool pubrecReceived = (random(100) >= 10);  // 90% success (values 10..99)
  if (!pubrecReceived) {
    Serial.printf("[QoS 2] PUBREC not received - retrying\n");
    qosRetries++;
    return;
  }
  Serial.printf("[QoS 2] Step 2: PUBREC received\n");

  Serial.printf("[QoS 2] Step 3: PUBREL sent\n");

  bool pubcompReceived = (random(100) >= 10);  // 90% success (values 10..99)
  if (!pubcompReceived) {
    Serial.printf("[QoS 2] PUBCOMP not received - retrying PUBREL\n");
    qosRetries++;
    return;
  }
  Serial.printf("[QoS 2] Step 4: PUBCOMP received - transaction complete\n");

  msg->acknowledged = true;
  exactlyOnceTransactions++;
}

194.6.4 When to Use Each QoS Level

QoS    Use Case                    Example                      Trade-off
0      High-frequency telemetry    Temperature every second     Fast, may lose some
1      Important sensor data       Hourly energy readings       Reliable, may duplicate
2      Critical transactions       Firmware updates, billing    Guaranteed, slower

Tip: For Beginners: QoS Like Mail Delivery
  • QoS 0 is like dropping a postcard in a mailbox - quick but no confirmation
  • QoS 1 is like certified mail - you get a delivery confirmation, but sometimes duplicates arrive
  • QoS 2 is like registered mail with signature - guaranteed exactly once but requires multiple steps

194.7 Retained Messages

Retained messages are stored by the broker and delivered to new subscribers. This ensures they receive the last known value immediately:

// Retained message storage
struct RetainedMessage {
  char topic[MAX_TOPIC_LENGTH];
  Message message;
  bool valid;
};

RetainedMessage retainedMessages[MAX_RETAINED_MESSAGES];

/**
 * Store a retained message
 */
void storeRetainedMessage(Message* msg) {
  // Find existing slot for this topic or empty slot
  int slot = -1;
  for (int i = 0; i < MAX_RETAINED_MESSAGES; i++) {
    if (!retainedMessages[i].valid) {
      if (slot < 0) slot = i;
    } else if (strcmp(retainedMessages[i].topic, msg->topic) == 0) {
      slot = i;
      break;
    }
  }

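  if (slot >= 0) {
    strncpy(retainedMessages[slot].topic, msg->topic, MAX_TOPIC_LENGTH - 1);
    retainedMessages[slot].topic[MAX_TOPIC_LENGTH - 1] = '\0';  // strncpy may not terminate
    memcpy(&retainedMessages[slot].message, msg, sizeof(Message));
    retainedMessages[slot].valid = true;
    Serial.printf("[BROKER] Retained message stored: %s\n", msg->topic);
  } else {
    Serial.printf("[BROKER] WARNING: Retained store full, dropped: %s\n", msg->topic);
  }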
}
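
The function above only ever adds or overwrites entries. In MQTT, publishing a retained message with an empty payload removes the retained value for that topic. The sketch below shows one way to combine storing and clearing. It uses a simplified, self-contained record (RetainedSlot) with assumed sizes rather than the chapter's RetainedMessage, and updateRetained and getRetained are hypothetical names.

```cpp
#include <cstring>

// Assumed sizes and a trimmed-down record for this sketch.
const int MAX_TOPIC_LENGTH = 64;
const int MAX_RETAINED_MESSAGES = 8;

struct RetainedSlot {
  char topic[MAX_TOPIC_LENGTH];
  char payload[64];
  bool valid;
};

RetainedSlot retainedSlots[MAX_RETAINED_MESSAGES] = {};

// Store or clear the retained value for a topic.
// An empty payload frees the slot. Returns true if the table was changed.
bool updateRetained(const char* topic, const char* payload) {
  int slot = -1;
  for (int i = 0; i < MAX_RETAINED_MESSAGES; i++) {
    if (retainedSlots[i].valid && strcmp(retainedSlots[i].topic, topic) == 0) {
      slot = i;  // Existing entry for this topic takes precedence
      break;
    }
    if (!retainedSlots[i].valid && slot < 0) slot = i;  // Remember first free slot
  }

  if (payload[0] == '\0') {
    if (slot >= 0 && retainedSlots[slot].valid) {
      retainedSlots[slot].valid = false;
      return true;
    }
    return false;  // Nothing was retained for this topic
  }

  if (slot < 0) return false;  // Table full
  RetainedSlot* s = &retainedSlots[slot];
  strncpy(s->topic, topic, sizeof(s->topic) - 1);
  s->topic[sizeof(s->topic) - 1] = '\0';
  strncpy(s->payload, payload, sizeof(s->payload) - 1);
  s->payload[sizeof(s->payload) - 1] = '\0';
  s->valid = true;
  return true;
}

// Returns the retained payload for an exact topic, or nullptr if none.
const char* getRetained(const char* topic) {
  for (int i = 0; i < MAX_RETAINED_MESSAGES; i++) {
    if (retainedSlots[i].valid && strcmp(retainedSlots[i].topic, topic) == 0) {
      return retainedSlots[i].payload;
    }
  }
  return nullptr;
}
```

Without a way to clear entries, a retired sensor would keep reporting a stale last-known value to every new subscriber.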

194.7.1 Retained Message Use Cases

Topic                        Retained Value     Purpose
sensors/temperature/room1    Latest reading     New dashboard shows current temp
device/gateway01/status      Online/offline     New monitoring app sees device state
config/firmware/version      Current version    New device checks for updates

194.8 Message Persistence

During network outages, messages must be persisted locally and replayed when connectivity returns:

/**
 * Persist messages during network outage
 * In real systems, this would write to flash/EEPROM
 */
void persistMessage(Message* msg) {
  Serial.printf("[PERSIST] Storing message %lu for offline delivery\n", msg->messageId);
  Serial.printf("[PERSIST] Topic: %s, QoS: %d, Payload: %.50s...\n",
                msg->topic, msg->qos, msg->payload);
}

/**
 * Replay persisted messages when network returns
 */
void replayPersistedMessages() {
  Serial.printf("[PERSIST] Network restored - replaying buffered messages\n");

  // In real implementation, would read from flash storage
  for (int i = 0; i < subscriberCount; i++) {
    int pendingCount = getQueueSize(&subscribers[i].inbox);
    if (pendingCount > 0) {
      Serial.printf("[PERSIST] Subscriber %s has %d pending messages\n",
                    subscribers[i].clientId, pendingCount);
    }
  }
}
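
The two functions above only log what a real implementation would do. As a rough illustration of the buffering and replay logic, the sketch below keeps pending messages in a small RAM ring buffer and replays them oldest-first. Every name here (OfflineBuffer, offlineStore, offlineReplay, SendFn) and the capacity of 4 are assumptions made for the example. A production device would back the buffer with flash so messages survive a reboot.

```cpp
#include <cstdint>
#include <cstring>

const int OFFLINE_CAPACITY = 4;  // Assumed capacity for this sketch

struct PendingMessage {
  uint32_t messageId;
  char topic[64];
  char payload[64];
};

struct OfflineBuffer {
  PendingMessage items[OFFLINE_CAPACITY];
  int head;   // Index of the oldest entry
  int count;  // Number of stored entries
};

void offlineInit(OfflineBuffer* buf) {
  buf->head = 0;
  buf->count = 0;
}

// Store a message while offline. When full, the oldest entry is overwritten
// so the newest data is kept (one possible policy among several).
void offlineStore(OfflineBuffer* buf, uint32_t id,
                  const char* topic, const char* payload) {
  int index;
  if (buf->count == OFFLINE_CAPACITY) {
    index = buf->head;
    buf->head = (buf->head + 1) % OFFLINE_CAPACITY;
  } else {
    index = (buf->head + buf->count) % OFFLINE_CAPACITY;
    buf->count++;
  }
  PendingMessage* m = &buf->items[index];
  m->messageId = id;
  strncpy(m->topic, topic, sizeof(m->topic) - 1);
  m->topic[sizeof(m->topic) - 1] = '\0';
  strncpy(m->payload, payload, sizeof(m->payload) - 1);
  m->payload[sizeof(m->payload) - 1] = '\0';
}

// Hypothetical send hook; returns false if the link dropped again.
typedef bool (*SendFn)(const PendingMessage* msg);

// Replay oldest-first. Stops at the first failed send and keeps the rest
// buffered for the next attempt. Returns the number of messages sent.
int offlineReplay(OfflineBuffer* buf, SendFn send) {
  int sent = 0;
  while (buf->count > 0) {
    if (!send(&buf->items[buf->head])) break;
    buf->head = (buf->head + 1) % OFFLINE_CAPACITY;
    buf->count--;
    sent++;
  }
  return sent;
}
```

Overwriting the oldest entry suits telemetry, where recent readings matter most. For QoS 1 or QoS 2 traffic, rejecting new messages when the buffer is full may be the safer policy.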

194.9 Summary

This chapter covered the publish-subscribe pattern and topic routing for IoT messaging:

  • Topic Hierarchies: Slash-separated paths for efficient filtering
  • Wildcard Matching: + for single level, # for multi-level matching
  • Subscriber Management: Registration, routing, and per-subscriber inboxes
  • QoS Levels: Trade-offs between speed (QoS 0), reliability (QoS 1), and exactness (QoS 2)
  • Retained Messages: Last-known values for new subscribers
  • Persistence: Buffering during network outages

Key Takeaways:

  • Topic hierarchies enable selective subscriptions at scale
  • Wildcards reduce the need for multiple subscriptions
  • QoS selection is a trade-off between reliability and overhead
  • Retained messages ensure new subscribers get current state

194.10 Knowledge Check

Question: Which subscription filter would match the topic sensors/temperature/room1?

Explanation: B. The + wildcard matches exactly one level, so sensors/+/room1 matches sensors/temperature/room1 where + matches temperature.

Question: For a security motion alert that must not be lost or duplicated, which QoS level is most appropriate?

Explanation: C. Security alerts are critical - they must not be lost (rules out QoS 0) and must not duplicate (rules out QoS 1). QoS 2’s four-step handshake guarantees exactly-once delivery.

Question: What happens when a new subscriber subscribes to a topic that has a retained message?

Explanation: A. Retained messages are automatically delivered to new subscribers when they subscribe to a matching topic, ensuring they immediately receive the last known value.

194.12 What’s Next

Continue to Queue Lab Challenges to apply these concepts in hands-on exercises including dead letter queues, message deduplication, and flow control.