193  Message Queue Fundamentals

193.1 Learning Objectives

By the end of this chapter, you will be able to:

  • Understand Queue Data Structures: Implement circular buffers for IoT message handling
  • Implement Queue Operations: Build enqueue, dequeue, and priority queue logic
  • Handle Buffer Management: Manage memory efficiently with limited resources
  • Track Queue Statistics: Monitor queue health with counters and metrics
  • Apply Priority Ordering: Implement message prioritization for critical data

193.2 Introduction

Message queuing is a fundamental communication pattern in IoT systems that enables asynchronous, reliable message delivery between distributed components. This chapter focuses on the core queue data structures and operations that power protocols like MQTT, AMQP, and cloud message brokers.

Understanding queue fundamentals prepares you to:

  • Decouple producers from consumers - Sensors can publish data without waiting for cloud acknowledgment
  • Buffer during network outages - Messages persist locally until delivery succeeds
  • Handle burst traffic - Queues absorb spikes in sensor data
  • Prioritize critical messages - Important alerts get delivered first
Tip: Real-World Relevance

Message queues are everywhere in production IoT:

  • AWS IoT Core uses MQTT with message queuing for millions of devices
  • Azure IoT Hub implements device-to-cloud queuing with configurable retention
  • Industrial SCADA systems use message buffers to handle sensor bursts
  • Smart home hubs queue commands when devices are temporarily offline

Understanding these patterns prepares you for working with any enterprise IoT platform.

193.3 Queue Data Structures

193.3.1 The Message Structure

Every message in an IoT system carries metadata needed for routing and delivery guarantees. The core message structure mirrors what you’ll find in real protocols like MQTT:

// Core message structure - represents a single queued message
struct Message {
  uint32_t messageId;           // Unique identifier for tracking
  char topic[MAX_TOPIC_LENGTH]; // Topic string (e.g., "sensors/temp/room1")
  char payload[MAX_PAYLOAD_SIZE]; // Message content (JSON or binary)
  uint16_t payloadLen;          // Actual payload length
  QoSLevel qos;                 // Quality of service level
  Priority priority;            // Message priority for ordering
  uint32_t timestamp;           // Creation time (millis)
  uint32_t expiry;              // Expiration time (0 = never)
  bool retained;                // Should broker store for new subscribers
  uint8_t deliveryCount;        // Retry counter for QoS 1/2
  bool acknowledged;            // Has subscriber confirmed receipt
};
Note: Message Fields Explained

  Field           Purpose                           Real-World Example
  messageId       Track and deduplicate messages    MQTT Packet Identifier
  topic           Route to interested subscribers   sensors/temperature/room1
  payload         Actual sensor data                {"value": 23.5, "unit": "C"}
  qos             Delivery guarantee level          QoS 0, 1, or 2
  priority        Ordering preference               Critical alerts first
  retained        Persist for new subscribers       Last known temperature
  deliveryCount   Retry tracking                    Stop after 3 failed attempts
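
To make the fields concrete, here is a minimal sketch that packs a temperature reading into a Message. It assumes the MAX_TOPIC_LENGTH and MAX_PAYLOAD_SIZE constants used throughout this chapter and the QoS/Priority enumerations defined in the next subsection; nextMessageId() is a hypothetical helper that returns an incrementing counter.

// Sketch: build a routine telemetry message for a temperature reading.
// nextMessageId() is a hypothetical helper (e.g., a static incrementing counter).
Message makeTemperatureMessage(float celsius) {
  Message msg = {};                               // zero-initialize every field

  msg.messageId = nextMessageId();
  strncpy(msg.topic, "sensors/temp/room1", MAX_TOPIC_LENGTH - 1);

  // Encode the payload as JSON and record its actual length
  msg.payloadLen = snprintf(msg.payload, MAX_PAYLOAD_SIZE,
                            "{\"value\": %.1f, \"unit\": \"C\"}", celsius);

  msg.qos = QOS_AT_LEAST_ONCE;    // acknowledged delivery
  msg.priority = PRIORITY_NORMAL; // routine telemetry
  msg.timestamp = millis();       // creation time
  msg.expiry = 0;                 // never expires
  msg.retained = true;            // broker keeps the latest reading
  msg.deliveryCount = 0;
  msg.acknowledged = false;

  return msg;
}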

193.3.2 QoS and Priority Enumerations

Define delivery guarantee levels and message priorities:

// QoS Level definitions (matching MQTT specification)
enum QoSLevel {
  QOS_AT_MOST_ONCE = 0,   // Fire and forget - no acknowledgment
  QOS_AT_LEAST_ONCE = 1,  // Acknowledged delivery - may duplicate
  QOS_EXACTLY_ONCE = 2    // Two-phase commit - guaranteed single delivery
};

// Message priority levels
enum Priority {
  PRIORITY_LOW = 0,
  PRIORITY_NORMAL = 1,
  PRIORITY_HIGH = 2,
  PRIORITY_CRITICAL = 3
};
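
How a device maps message types onto these levels is a design decision. The sketch below shows one possible policy; the MessageClass enumeration and the specific mapping are assumptions made for illustration, not part of the chapter's core structures.

// Illustrative policy: choose QoS and priority from a (hypothetical) message class.
enum MessageClass { CLASS_DIAGNOSTIC, CLASS_TELEMETRY, CLASS_COMMAND, CLASS_ALERT };

void applyDeliveryPolicy(Message* msg, MessageClass cls) {
  switch (cls) {
    case CLASS_ALERT:        // safety-critical: exactly-once, jump the queue
      msg->qos = QOS_EXACTLY_ONCE;
      msg->priority = PRIORITY_CRITICAL;
      break;
    case CLASS_COMMAND:      // commands: acknowledged, ahead of telemetry
      msg->qos = QOS_AT_LEAST_ONCE;
      msg->priority = PRIORITY_HIGH;
      break;
    case CLASS_TELEMETRY:    // routine readings: acknowledged, normal order
      msg->qos = QOS_AT_LEAST_ONCE;
      msg->priority = PRIORITY_NORMAL;
      break;
    case CLASS_DIAGNOSTIC:   // nice-to-have: fire and forget, first to be dropped
    default:
      msg->qos = QOS_AT_MOST_ONCE;
      msg->priority = PRIORITY_LOW;
      break;
  }
}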

193.3.3 The Circular Buffer Queue

The circular buffer is the most memory-efficient queue implementation for embedded systems. It reuses a fixed-size array by wrapping around when reaching the end:

// Circular buffer queue for efficient memory usage
struct MessageQueue {
  Message messages[MAX_QUEUE_SIZE];
  int head;                     // Next position to dequeue
  int tail;                     // Next position to enqueue
  int count;                    // Current message count
  int maxSize;                  // Queue capacity
  uint32_t totalEnqueued;       // Statistics: total messages added
  uint32_t totalDequeued;       // Statistics: total messages removed
  uint32_t totalDropped;        // Statistics: messages dropped (overflow)
};
[Figure: A circular buffer of size 5. Slots [0] and [1] hold Msg A and Msg B; slots [2]-[4] are empty. HEAD=0 points at the next slot to read, TAIL=2 at the next slot to write, and the slots chain [0] → [1] → [2] → [3] → [4] → back to [0].]
Tip (For Beginners): Understanding Circular Buffers

Imagine a circular buffer like a carousel at an airport baggage claim:

  • Bags (messages) get placed on the carousel at one point (TAIL)
  • Travelers pick them up at another point (HEAD)
  • When the carousel reaches the end, it wraps around to the beginning
  • If too many bags arrive before pickup, some get dropped

This design uses memory efficiently because it never needs to shift elements - just update the HEAD and TAIL pointers.
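
Because the buffer is a plain fixed-size array, its memory cost is known at compile time and no heap allocation is needed - a good fit for microcontrollers where fragmentation is a concern. The short sketch below (an illustration; the actual sizes depend on how MAX_TOPIC_LENGTH, MAX_PAYLOAD_SIZE, and MAX_QUEUE_SIZE are configured in your project) prints the footprint so you can size the queue against available RAM.

// Report the static memory cost of one queue instance
static MessageQueue telemetryQueue;   // a single statically allocated block

void reportQueueFootprint() {
  Serial.printf("[MEM] sizeof(Message)=%u bytes, sizeof(MessageQueue)=%u bytes\n",
                (unsigned)sizeof(Message), (unsigned)sizeof(MessageQueue));
}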

193.4 Queue Operations

193.4.1 Initialize Queue

Set up a queue with empty state and reset all counters:

/**
 * Initialize a message queue
 * Sets up circular buffer with empty state
 */
void initQueue(MessageQueue* q, int maxSize) {
  q->head = 0;
  q->tail = 0;
  q->count = 0;
  q->maxSize = min(maxSize, MAX_QUEUE_SIZE);
  q->totalEnqueued = 0;
  q->totalDequeued = 0;
  q->totalDropped = 0;

  // Clear all message slots
  for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
    q->messages[i].messageId = 0;
    q->messages[i].topic[0] = '\0';
    q->messages[i].payloadLen = 0;
  }
}
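
Typical usage is to declare the queue as a global (or static) object and initialize it once at startup. A minimal sketch, assuming the standard Arduino setup()/loop() model and a requested capacity of 16:

// Sketch: one outbound queue for the whole device
MessageQueue outboundQueue;

void setup() {
  Serial.begin(115200);
  initQueue(&outboundQueue, 16);   // capacity is clamped to MAX_QUEUE_SIZE
}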

193.4.2 Status Check Functions

Simple helper functions to check queue state:

/**
 * Check if queue is empty
 */
bool isQueueEmpty(MessageQueue* q) {
  return q->count == 0;
}

/**
 * Check if queue is full
 */
bool isQueueFull(MessageQueue* q) {
  return q->count >= q->maxSize;
}

/**
 * Get current queue size
 */
int getQueueSize(MessageQueue* q) {
  return q->count;
}

193.4.3 Enqueue Operation

Add a message to the back of the queue:

/**
 * Enqueue a message (add to back of queue)
 * Returns true if successful, false if queue is full
 */
bool enqueue(MessageQueue* q, Message* msg) {
  if (isQueueFull(q)) {
    q->totalDropped++;
    Serial.printf("[QUEUE] DROPPED: Queue full (size=%d), msg=%lu topic=%s\n",
                  q->count, msg->messageId, msg->topic);
    return false;
  }

  // Copy message to queue slot
  memcpy(&q->messages[q->tail], msg, sizeof(Message));

  // Advance tail pointer (circular)
  q->tail = (q->tail + 1) % q->maxSize;
  q->count++;
  q->totalEnqueued++;

  return true;
}
Note: The Modulo Trick

The line q->tail = (q->tail + 1) % q->maxSize is the key to circular behavior:

  • If tail = 4 and maxSize = 5, then (4 + 1) % 5 = 0
  • The pointer wraps from the end back to the beginning
  • This avoids the need for expensive array shifting operations
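
Putting enqueue() together with the helpers above, a producer might look like the following sketch. It reuses the illustrative makeTemperatureMessage() and outboundQueue from earlier; readTemperatureC() and the 5-second interval are further assumptions for the example.

// Sketch: periodically sample a sensor and queue the reading
void loop() {
  static uint32_t lastSample = 0;

  if (millis() - lastSample >= 5000) {        // sample every 5 seconds
    lastSample = millis();

    Message msg = makeTemperatureMessage(readTemperatureC());  // hypothetical sensor read
    if (!enqueue(&outboundQueue, &msg)) {
      // enqueue() already logged and counted the drop;
      // a producer could also slow its sampling rate here
    }
  }
}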

193.4.4 Dequeue Operation

Remove and return a message from the front of the queue:

/**
 * Dequeue a message (remove from front of queue)
 * Returns true if successful, false if queue is empty
 */
bool dequeue(MessageQueue* q, Message* msg) {
  if (isQueueEmpty(q)) {
    return false;
  }

  // Copy message from queue slot
  memcpy(msg, &q->messages[q->head], sizeof(Message));

  // Clear the slot
  q->messages[q->head].messageId = 0;

  // Advance head pointer (circular)
  q->head = (q->head + 1) % q->maxSize;
  q->count--;
  q->totalDequeued++;

  return true;
}
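
A matching consumer drains the queue whenever connectivity is available. The sketch below assumes a hypothetical publishToBroker() that returns true on a successful transmit; the actual send is protocol-specific.

// Sketch: QoS 0 style consumer - remove each message and send it once, no retry
void drainQueueBestEffort(MessageQueue* q) {
  Message msg;

  while (dequeue(q, &msg)) {
    publishToBroker(msg.topic, msg.payload, msg.payloadLen);  // hypothetical transmit
  }
}

Because each message is removed before it is sent, a failed transmission is simply lost. That is acceptable for QoS 0 traffic; the peek() pattern in the next subsection keeps the message queued until delivery is confirmed.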

193.4.5 Peek Operation

View the front message without removing it:

/**
 * Peek at front message without removing
 * Returns pointer to message or NULL if empty
 */
Message* peek(MessageQueue* q) {
  if (isQueueEmpty(q)) {
    return NULL;
  }
  return &q->messages[q->head];
}
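
peek() enables an at-least-once pattern: the front message stays in the queue until the send succeeds, and only then is it dequeued. A sketch, again using the hypothetical publishToBroker():

// Sketch: keep the message queued until transmission is confirmed
void drainQueueAtLeastOnce(MessageQueue* q) {
  Message delivered;

  while (!isQueueEmpty(q)) {
    Message* front = peek(q);                 // inspect without removing

    if (!publishToBroker(front->topic, front->payload, front->payloadLen)) {
      front->deliveryCount++;                 // record the attempt; message stays queued
      break;                                  // stop and retry on the next cycle
    }

    dequeue(q, &delivered);                   // confirmed - now remove it for real
  }
}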

193.4.6 Clear Queue

Remove all messages from the queue:

/**
 * Clear all messages from queue
 */
void clearQueue(MessageQueue* q) {
  q->head = 0;
  q->tail = 0;
  q->count = 0;
}
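
Note that clearQueue() resets only the head, tail, and count fields; the lifetime counters (totalEnqueued, totalDequeued, totalDropped) are left intact, so statistics survive a flush. A small usage sketch - onCleanSessionReconnect() is a hypothetical callback for a broker reconnect that discards pending state:

// Sketch: flush pending messages when the broker requests a clean session
void onCleanSessionReconnect() {
  Serial.printf("[QUEUE] Discarding %d pending messages for clean session\n",
                getQueueSize(&outboundQueue));
  clearQueue(&outboundQueue);
}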

193.5 Priority Queue Operations

For IoT systems handling mixed-priority data (critical alerts vs routine telemetry), priority queuing ensures important messages get delivered first.

193.5.1 Enqueue with Priority

When the queue is full, a high-priority message can displace a low-priority one:

/**
 * Enqueue with priority.
 * If the queue is full, the lowest-priority queued message is dropped to make
 * room - but only when the new message outranks it. The message is then
 * inserted ahead of any lower-priority messages, so dequeue() always returns
 * the highest-priority message first (FIFO within the same priority).
 */
bool enqueuePriority(MessageQueue* q, Message* msg) {
  if (isQueueFull(q)) {
    // Find the lowest-priority message currently in the queue
    int lowestIdx = -1;
    Priority lowestPri = PRIORITY_CRITICAL;

    for (int i = 0; i < q->count; i++) {
      int idx = (q->head + i) % q->maxSize;
      if (q->messages[idx].priority < lowestPri) {
        lowestPri = q->messages[idx].priority;
        lowestIdx = idx;
      }
    }

    if (lowestIdx < 0 || lowestPri >= msg->priority) {
      // Nothing queued is lower priority than the new message - drop the new one
      q->totalDropped++;
      return false;
    }

    Serial.printf("[QUEUE] Dropping low-priority msg %lu for high-priority %lu\n",
                  q->messages[lowestIdx].messageId, msg->messageId);

    // Remove the victim: shift everything behind it forward by one slot
    int idx = lowestIdx;
    int next = (idx + 1) % q->maxSize;
    while (next != q->tail) {
      q->messages[idx] = q->messages[next];
      idx = next;
      next = (next + 1) % q->maxSize;
    }
    q->tail = idx;            // tail steps back over the freed slot
    q->count--;
    q->totalDropped++;
  }

  // Insert in priority order: walk back from the tail, shifting lower-priority
  // messages toward the tail until the correct position is found
  int pos = q->tail;
  while (pos != q->head) {
    int prev = (pos - 1 + q->maxSize) % q->maxSize;
    if (q->messages[prev].priority >= msg->priority) {
      break;                  // existing message ranks equal or higher - insert behind it
    }
    q->messages[pos] = q->messages[prev];
    pos = prev;
  }

  memcpy(&q->messages[pos], msg, sizeof(Message));
  q->tail = (q->tail + 1) % q->maxSize;
  q->count++;
  q->totalEnqueued++;

  return true;
}
Tip: Priority Queue Use Cases

  Priority   Message Type      Example
  CRITICAL   Safety alerts     Motion detected during armed state
  HIGH       System commands   Firmware update triggers
  NORMAL     Sensor data       Temperature readings
  LOW        Diagnostics       Memory usage reports

When a queue fills during a network outage, low-priority diagnostic messages are dropped to make room for critical security alerts.
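
For example, a security alert raised during an outage can displace queued diagnostics. A sketch using the outboundQueue from earlier; nextMessageId() is again a hypothetical ID helper.

// Sketch: queue a critical alert so it can displace low-priority messages
void reportMotionAlert() {
  Message alert = {};
  alert.messageId = nextMessageId();
  strncpy(alert.topic, "alerts/motion/front-door", MAX_TOPIC_LENGTH - 1);
  alert.payloadLen = snprintf(alert.payload, MAX_PAYLOAD_SIZE,
                              "{\"motion\": true, \"armed\": true}");
  alert.qos = QOS_EXACTLY_ONCE;
  alert.priority = PRIORITY_CRITICAL;
  alert.timestamp = millis();

  if (!enqueuePriority(&outboundQueue, &alert)) {
    Serial.println("[ALERT] Queue full of equal or higher priority messages");
  }
}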

193.6 Queue Statistics

Monitoring queue health is essential for detecting backpressure and capacity issues:

/**
 * Print queue statistics
 */
void printQueueStats(MessageQueue* q, const char* name) {
  Serial.printf("[STATS] %s Queue: size=%d/%d, enqueued=%lu, dequeued=%lu, dropped=%lu\n",
                name, q->count, q->maxSize, q->totalEnqueued,
                q->totalDequeued, q->totalDropped);
}

193.6.1 Key Metrics to Monitor

[Figure: Queue health metrics and responses. Utilization (count/maxSize) above 80% suggests a larger queue; a drop rate (dropped/enqueued) above 1% calls for faster processing; declining throughput (dequeued per second) means checking the consumers; rising latency (time spent in the queue) means adding consumers.]
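
The raw counters translate directly into the first two metrics. A minimal sketch - note that dropped messages are never counted in totalEnqueued (enqueue() returns before incrementing it), so the drop rate below is computed against everything producers attempted to queue:

// Sketch: derive utilization and drop rate from the queue counters
float queueUtilization(MessageQueue* q) {
  return q->maxSize > 0 ? (float)q->count / q->maxSize : 0.0f;
}

float queueDropRate(MessageQueue* q) {
  uint32_t offered = q->totalEnqueued + q->totalDropped;   // everything producers tried to queue
  return offered > 0 ? (float)q->totalDropped / offered : 0.0f;
}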
Warning: Queue Overflow Prevention

When totalDropped starts increasing:

  1. Increase queue size - If memory allows
  2. Speed up consumers - Process messages faster
  3. Add backpressure - Slow down producers
  4. Filter at source - Reduce message frequency
  5. Implement priority - Drop low-priority first
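
Tying this together, a periodic health check can compare the derived metrics against thresholds and flag trouble early. The sketch below reuses the queueUtilization() and queueDropRate() helpers above; the 80% and 1% thresholds echo the figure and are illustrative defaults, not hard rules.

// Sketch: call this periodically (e.g., once a minute) from the main loop
void checkQueueHealth(MessageQueue* q, const char* name) {
  float util = queueUtilization(q);
  float drops = queueDropRate(q);

  if (util > 0.80f) {
    Serial.printf("[HEALTH] %s: utilization %.0f%% - enlarge the queue or speed up consumers\n",
                  name, util * 100.0f);
  }
  if (drops > 0.01f) {
    Serial.printf("[HEALTH] %s: drop rate %.1f%% - apply backpressure or filter at the source\n",
                  name, drops * 100.0f);
  }
}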

193.7 Summary

This chapter covered the fundamental queue data structures and operations for IoT message handling:

  • Message Structure: Fields for topic, payload, QoS, priority, and delivery tracking
  • Circular Buffer: Memory-efficient queue implementation using head/tail pointers
  • Enqueue/Dequeue: Adding and removing messages with wraparound logic
  • Priority Queuing: Ensuring critical messages get delivered when queues fill
  • Queue Statistics: Monitoring utilization, drop rate, and throughput

Key Takeaways:

  • Circular buffers efficiently use fixed memory without array shifting
  • The modulo operator enables seamless wraparound behavior
  • Priority queues protect critical data during overflow conditions
  • Statistics help detect capacity issues before message loss

193.8 Knowledge Check

Question: What is the primary advantage of using a circular buffer for message queuing in embedded systems?

Answer: Circular buffers use a fixed-size array and simply update head/tail pointers, avoiding the expensive memory-shifting operations a linear buffer would need.

Question: When a priority queue is full and a CRITICAL priority message arrives, what should happen?

Answer: Lower-priority messages should be dropped to make room for the critical one, ensuring important data gets through during congestion.

Question: In a circular buffer with maxSize=8, if tail is at position 7, what is the next tail position after enqueue?

Answer: Position 0. Using (7 + 1) % 8 = 0, the tail wraps around to the start of the buffer, creating the circular behavior.

193.9 What’s Next

Continue to Pub/Sub and Topic Routing to learn about topic hierarchies, wildcard matching, QoS levels, and the publish-subscribe pattern.