46  Message Queue Fundamentals

In 60 Seconds

Message queues use circular buffers with head/tail pointers and modulo wrap-around to enable asynchronous IoT communication on constrained devices (avoiding expensive array shifts). Priority queues ensure critical alerts (fire, security) get delivered first – when buffers fill during network outages, low-priority messages (diagnostics) are displaced. Size your buffer to hold 2-5 minutes of sensor data at peak rate to survive typical connectivity interruptions.

Key Concepts
  • Message Queue: Data structure holding messages in order, decoupling producers from consumers in time — producers write when convenient, consumers read when ready
  • First-In-First-Out (FIFO): Default queue ordering where messages are processed in the order they were enqueued, ensuring sequenced delivery for ordered IoT event streams
  • Durable Queue: Queue configured to survive broker restarts by persisting messages to disk, preventing IoT data loss during infrastructure maintenance or failures
  • Message Persistence: Storage of enqueued messages to non-volatile storage so they survive broker crashes and can be replayed after recovery
  • Producer-Consumer Decoupling: Queue architecture benefit allowing producers and consumers to operate at different rates and different times without coordination
  • Transient Queue: In-memory-only queue with high throughput but no persistence — suitable for high-rate sensor data where some loss is acceptable
  • Queue Depth: Current number of messages waiting in the queue — a growing queue depth indicates consumer processing cannot keep up with producer rate
  • Acknowledgment Mode: Configuration controlling when a message is removed from queue — auto-acknowledge (on delivery) vs. manual-acknowledge (after successful processing)
Minimum Viable Understanding
  • Message queues use circular buffers to enable asynchronous, reliable communication between IoT components – sensors publish data without waiting for cloud acknowledgment.
  • Circular buffers use fixed memory with head/tail pointers and the modulo operator for wrap-around, avoiding expensive array shifting on constrained embedded devices.
  • Priority queues ensure critical alerts (fire, security) get delivered first by displacing low-priority messages (diagnostics) when the queue is full during network outages.

46.1 Learning Objectives

By the end of this chapter, you will be able to:

  • Construct circular buffer data structures optimized for IoT message handling on constrained devices
  • Implement enqueue, dequeue, and priority queue operations with modulo wrap-around logic
  • Optimize buffer memory allocation for resource-constrained embedded systems
  • Instrument queue health monitoring using utilization counters and throughput metrics
  • Design priority displacement strategies that protect critical alerts during queue overflow

A message queue is like a line at a theme park – everyone waits their turn, and VIPs get to skip ahead!

Sammy the Sensor was sending temperature readings to the cloud, but the internet was SO slow. “I have 10 readings to send, but the cloud can only take one at a time!” he cried.

Max the Microcontroller had an idea. “Let’s make a WAITING LINE! Each reading stands in line and waits its turn.” That is a queue – first in, first out, just like at the ice cream shop!

But then the fire alarm went off! Bella the Battery shouted, “EMERGENCY! The fire alert can’t wait behind 10 temperature readings!” So Lila the LED created a VIP lane – critical alerts skip to the front of the line! That is a priority queue.

The coolest part? Their waiting line was shaped like a CIRCLE (a circular buffer). When the last spot was used, the next message went back to the beginning – like a merry-go-round that never stops!

46.2 Introduction

Message queuing is a fundamental communication pattern in IoT systems that enables asynchronous, reliable message delivery between distributed components. This chapter focuses on the core queue data structures and operations that power protocols like MQTT, AMQP, and cloud message brokers.

Understanding queue fundamentals prepares you to:

  • Decouple producers from consumers - Sensors can publish data without waiting for cloud acknowledgment
  • Buffer during network outages - Messages persist locally until delivery succeeds
  • Handle burst traffic - Queues absorb spikes in sensor data
  • Prioritize critical messages - Important alerts get delivered first
Real-World Relevance

Message queues are everywhere in production IoT:

  • AWS IoT Core uses MQTT with message queuing for millions of devices
  • Azure IoT Hub implements device-to-cloud queuing with configurable retention
  • Industrial SCADA systems use message buffers to handle sensor bursts
  • Smart home hubs queue commands when devices are temporarily offline

Understanding these patterns prepares you for working with any enterprise IoT platform.

46.3 Queue Data Structures

46.3.1 The Message Structure

Every message in an IoT system carries metadata needed for routing and delivery guarantees. The core message structure mirrors what you’ll find in real protocols like MQTT:

// Core message structure - represents a single queued message
struct Message {
  uint32_t messageId;           // Unique identifier for tracking
  char topic[MAX_TOPIC_LENGTH]; // Topic string (e.g., "sensors/temp/room1")
  char payload[MAX_PAYLOAD_SIZE]; // Message content (JSON or binary)
  uint16_t payloadLen;          // Actual payload length
  QoSLevel qos;                 // Quality of service level
  Priority priority;            // Message priority for ordering
  uint32_t timestamp;           // Creation time (millis)
  uint32_t expiry;              // Expiration time (0 = never)
  bool retained;                // Should broker store for new subscribers
  uint8_t deliveryCount;        // Retry counter for QoS 1/2
  bool acknowledged;            // Has subscriber confirmed receipt
};

Message Fields Explained

| Field | Purpose | Real-World Example |
|---|---|---|
| messageId | Track and deduplicate messages | MQTT Packet Identifier |
| topic | Route to interested subscribers | sensors/temperature/room1 |
| payload | Actual sensor data | {"value": 23.5, "unit": "C"} |
| qos | Delivery guarantee level | QoS 0, 1, or 2 |
| priority | Ordering preference | Critical alerts first |
| retained | Persist for new subscribers | Last known temperature |
| deliveryCount | Retry tracking | Stop after 3 failed attempts |
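
To make the field roles concrete, here is a minimal sketch of filling a message with bounds-checked copies. The sizes MAX_TOPIC_LENGTH = 64 and MAX_PAYLOAD_SIZE = 128, the trimmed-down SimpleMessage struct, and the helper name makeMessage are assumptions for illustration; the chapter does not specify them.

```cpp
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Assumed sizes for illustration only.
constexpr size_t MAX_TOPIC_LENGTH = 64;
constexpr size_t MAX_PAYLOAD_SIZE = 128;

// Trimmed-down stand-in for the chapter's Message struct.
struct SimpleMessage {
  uint32_t messageId;
  char topic[MAX_TOPIC_LENGTH];
  char payload[MAX_PAYLOAD_SIZE];
  uint16_t payloadLen;
  uint8_t deliveryCount;
};

// Hypothetical helper: returns false if the topic or payload would not fit,
// so a message is never silently truncated.
bool makeMessage(SimpleMessage* out, uint32_t id,
                 const char* topic, const char* payload) {
  size_t topicLen = strlen(topic);
  size_t payloadLen = strlen(payload);
  if (topicLen >= MAX_TOPIC_LENGTH || payloadLen >= MAX_PAYLOAD_SIZE) {
    return false;
  }
  memset(out, 0, sizeof(*out));               // zero every field first
  out->messageId = id;
  memcpy(out->topic, topic, topicLen);         // terminator is already zero
  memcpy(out->payload, payload, payloadLen);
  out->payloadLen = static_cast<uint16_t>(payloadLen);
  return true;
}
```

Rejecting oversized input up front is one possible design choice; truncating and flagging the message would be another.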

46.3.2 QoS and Priority Enumerations

Define delivery guarantee levels and message priorities:

// QoS Level definitions (matching MQTT specification)
enum QoSLevel {
  QOS_AT_MOST_ONCE = 0,   // Fire and forget - no acknowledgment
  QOS_AT_LEAST_ONCE = 1,  // Acknowledged delivery - may duplicate
  QOS_EXACTLY_ONCE = 2    // Four-step handshake - delivered exactly once
};

// Message priority levels
enum Priority {
  PRIORITY_LOW = 0,
  PRIORITY_NORMAL = 1,
  PRIORITY_HIGH = 2,
  PRIORITY_CRITICAL = 3
};

46.3.3 The Circular Buffer Queue

The circular buffer is the most memory-efficient queue implementation for embedded systems. It reuses a fixed-size array by wrapping around when reaching the end:

// Circular buffer queue for efficient memory usage
struct MessageQueue {
  Message messages[MAX_QUEUE_SIZE];
  int head;                     // Next position to dequeue
  int tail;                     // Next position to enqueue
  int count;                    // Current message count
  int maxSize;                  // Queue capacity
  uint32_t totalEnqueued;       // Statistics: total messages added
  uint32_t totalDequeued;       // Statistics: total messages removed
  uint32_t totalDropped;        // Statistics: messages dropped (overflow)
};

Figure: Circular buffer queue structure showing the head pointer (dequeue) and tail pointer (enqueue), with modulo wrap-around from the last position back to position zero.

Imagine a circular buffer like a carousel at an airport baggage claim:

  • Bags (messages) get placed on the carousel at one point (TAIL)
  • Travelers pick them up at another point (HEAD)
  • When the carousel reaches the end, it wraps around to the beginning
  • If too many bags arrive before pickup, some get dropped

This design uses memory efficiently because it never needs to shift elements - just update the HEAD and TAIL pointers.

46.4 Queue Operations

46.4.1 Initialize Queue

Set up a queue with empty state and reset all counters:

/**
 * Initialize a message queue
 * Sets up circular buffer with empty state
 */
void initQueue(MessageQueue* q, int maxSize) {
  q->head = 0;
  q->tail = 0;
  q->count = 0;
  q->maxSize = min(maxSize, MAX_QUEUE_SIZE);
  q->totalEnqueued = 0;
  q->totalDequeued = 0;
  q->totalDropped = 0;

  // Clear all message slots
  for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
    q->messages[i].messageId = 0;
    q->messages[i].topic[0] = '\0';
    q->messages[i].payloadLen = 0;
  }
}

46.4.2 Status Check Functions

Simple helper functions to check queue state:

/**
 * Check if queue is empty
 */
bool isQueueEmpty(MessageQueue* q) {
  return q->count == 0;
}

/**
 * Check if queue is full
 */
bool isQueueFull(MessageQueue* q) {
  return q->count >= q->maxSize;
}

/**
 * Get current queue size
 */
int getQueueSize(MessageQueue* q) {
  return q->count;
}

46.4.3 Enqueue Operation

Add a message to the back of the queue:

/**
 * Enqueue a message (add to back of queue)
 * Returns true if successful, false if queue is full
 */
bool enqueue(MessageQueue* q, Message* msg) {
  if (isQueueFull(q)) {
    q->totalDropped++;
    Serial.printf("[QUEUE] DROPPED: Queue full (size=%d), msg=%lu topic=%s\n",
                  q->count, msg->messageId, msg->topic);
    return false;
  }

  // Copy message to queue slot
  memcpy(&q->messages[q->tail], msg, sizeof(Message));

  // Advance tail pointer (circular)
  q->tail = (q->tail + 1) % q->maxSize;
  q->count++;
  q->totalEnqueued++;

  return true;
}
The Modulo Trick

The line q->tail = (q->tail + 1) % q->maxSize is the key to circular behavior:

  • If tail = 4 and maxSize = 5, then (4 + 1) % 5 = 0
  • The pointer wraps from the end back to the beginning
  • This avoids the need for expensive array shifting operations
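
The wrap-around rule can be tried in isolation. The sketch below uses two hypothetical helper names, nextIndex and prevIndex, that are not part of this chapter's queue code. The backward step is included because it needs extra care with negative values.

```cpp
// Advance a ring index by one slot, wrapping at capacity.
// Assumes capacity > 0.
inline int nextIndex(int index, int capacity) {
  return (index + 1) % capacity;
}

// Step an index backwards by one slot. Adding capacity first keeps the
// left operand non-negative, because % applied to a negative int gives a
// negative result in C and C++.
inline int prevIndex(int index, int capacity) {
  return (index + capacity - 1) % capacity;
}
```

With a capacity of 5, nextIndex(4, 5) gives 0 and prevIndex(0, 5) gives 4, which is the wrap in both directions.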

46.4.4 Dequeue Operation

Remove and return a message from the front of the queue:

/**
 * Dequeue a message (remove from front of queue)
 * Returns true if successful, false if queue is empty
 */
bool dequeue(MessageQueue* q, Message* msg) {
  if (isQueueEmpty(q)) {
    return false;
  }

  // Copy message from queue slot
  memcpy(msg, &q->messages[q->head], sizeof(Message));

  // Clear the slot
  q->messages[q->head].messageId = 0;

  // Advance head pointer (circular)
  q->head = (q->head + 1) % q->maxSize;
  q->count--;
  q->totalDequeued++;

  return true;
}

46.4.5 Peek Operation

View the front message without removing it:

/**
 * Peek at front message without removing
 * Returns pointer to message or NULL if empty
 */
Message* peek(MessageQueue* q) {
  if (isQueueEmpty(q)) {
    return NULL;
  }
  return &q->messages[q->head];
}
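
Peek pairs naturally with the manual-acknowledge mode from Key Concepts: look at the front message, try to send it, and remove it only after the send is confirmed. The sketch below is a simplified illustration under stated assumptions. IdQueue stores message IDs instead of full Message structs, and the SendFn callback type and the drainWithAck name are hypothetical.

```cpp
#include <stdint.h>

const int ID_QUEUE_CAPACITY = 8;  // assumed size for illustration

// Minimal stand-in queue that stores message IDs only.
struct IdQueue {
  uint32_t ids[ID_QUEUE_CAPACITY];
  int head = 0, tail = 0, count = 0;
};

bool push(IdQueue* q, uint32_t id) {
  if (q->count == ID_QUEUE_CAPACITY) return false;
  q->ids[q->tail] = id;
  q->tail = (q->tail + 1) % ID_QUEUE_CAPACITY;
  q->count++;
  return true;
}

// Returns a pointer to the front ID, or nullptr if the queue is empty.
const uint32_t* peekFront(const IdQueue* q) {
  return q->count == 0 ? nullptr : &q->ids[q->head];
}

void pop(IdQueue* q) {
  if (q->count == 0) return;
  q->head = (q->head + 1) % ID_QUEUE_CAPACITY;
  q->count--;
}

// Hypothetical send callback: returns true once the receiver has confirmed.
typedef bool (*SendFn)(uint32_t id);

// Sends from the front and stops at the first failure, so the unsent
// message stays at the head for a later retry. Returns the number sent.
int drainWithAck(IdQueue* q, SendFn send) {
  int sent = 0;
  while (const uint32_t* front = peekFront(q)) {
    if (!send(*front)) break;  // keep the message queued
    pop(q);                    // remove only after confirmation
    sent++;
  }
  return sent;
}
```

If the link drops mid-drain, nothing is lost: the failed message is still at the head the next time drainWithAck runs.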

46.4.6 Clear Queue

Remove all messages from the queue:

/**
 * Clear all messages from queue
 */
void clearQueue(MessageQueue* q) {
  q->head = 0;
  q->tail = 0;
  q->count = 0;
}

46.5 Priority Queue Operations

For IoT systems handling mixed-priority data (critical alerts vs routine telemetry), priority queuing ensures important messages get delivered first.

46.5.1 Enqueue with Priority

When the queue is full, a high-priority message can displace a low-priority one:

/**
 * Enqueue with priority displacement
 * If the queue has room, the message is appended at the tail (FIFO).
 * If the queue is full, a higher-priority message overwrites the oldest
 * lowest-priority message in place; otherwise the new message is dropped.
 * Note: this handles overflow only. It does not reorder a non-full queue.
 */
bool enqueuePriority(MessageQueue* q, Message* msg) {
  if (isQueueFull(q)) {
    // Find the lowest-priority message; strict '<' keeps the oldest on ties
    int lowestIdx = -1;
    Priority lowestPri = PRIORITY_CRITICAL;

    for (int i = 0; i < q->count; i++) {
      int idx = (q->head + i) % q->maxSize;
      if (q->messages[idx].priority < lowestPri) {
        lowestPri = q->messages[idx].priority;
        lowestIdx = idx;
      }
    }

    // Nothing queued has lower priority than the new message: drop the new one
    if (lowestIdx < 0 || lowestPri >= msg->priority) {
      q->totalDropped++;
      return false;
    }

    Serial.printf("[QUEUE] Dropping low-priority msg %lu for high-priority %lu\n",
                  (unsigned long)q->messages[lowestIdx].messageId,
                  (unsigned long)msg->messageId);

    // Overwrite the displaced message's slot. head, tail, and count stay
    // unchanged because one message leaves and one arrives.
    memcpy(&q->messages[lowestIdx], msg, sizeof(Message));
    q->totalDropped++;
    q->totalEnqueued++;
    return true;
  }

  // Queue has room: normal FIFO append at the tail
  memcpy(&q->messages[q->tail], msg, sizeof(Message));
  q->tail = (q->tail + 1) % q->maxSize;
  q->count++;
  q->totalEnqueued++;

  return true;
}

Priority Queue Use Cases

| Priority | Message Type | Example |
|---|---|---|
| CRITICAL | Safety alerts | Motion detected during armed state |
| HIGH | System commands | Firmware update triggers |
| NORMAL | Sensor data | Temperature readings |
| LOW | Diagnostics | Memory usage reports |

When a queue fills during a network outage, low-priority diagnostic messages are dropped to make room for critical security alerts.
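
Displacement decides which messages survive an overflow, but it does not decide which message leaves first. One way to get "critical first" delivery without shifting array elements is to keep a small FIFO ring per priority level and always read from the highest non-empty one. The sketch below is an alternative design, not the chapter's implementation. The names PriorityLanes, enqueueLane, and dequeueHighest and the lane capacity are assumptions, and it stores message IDs only.

```cpp
#include <stdint.h>

const int NUM_PRIORITIES = 4;  // LOW, NORMAL, HIGH, CRITICAL
const int LANE_CAPACITY = 4;   // assumed per-lane size for illustration

// One FIFO ring per priority level.
struct Lane {
  uint32_t ids[LANE_CAPACITY];
  int head = 0, tail = 0, count = 0;
};

struct PriorityLanes {
  Lane lanes[NUM_PRIORITIES];
};

// Appends to the lane for the given priority (0 = LOW ... 3 = CRITICAL).
bool enqueueLane(PriorityLanes* p, int priority, uint32_t id) {
  if (priority < 0 || priority >= NUM_PRIORITIES) return false;
  Lane* lane = &p->lanes[priority];
  if (lane->count == LANE_CAPACITY) return false;
  lane->ids[lane->tail] = id;
  lane->tail = (lane->tail + 1) % LANE_CAPACITY;
  lane->count++;
  return true;
}

// Takes from the highest non-empty lane; order is FIFO within a lane.
bool dequeueHighest(PriorityLanes* p, uint32_t* outId) {
  for (int pri = NUM_PRIORITIES - 1; pri >= 0; pri--) {
    Lane* lane = &p->lanes[pri];
    if (lane->count == 0) continue;
    *outId = lane->ids[lane->head];
    lane->head = (lane->head + 1) % LANE_CAPACITY;
    lane->count--;
    return true;
  }
  return false;
}
```

The trade-off is memory: each lane reserves its own slots, so a burst of one priority cannot borrow space from another lane.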

46.6 Queue Statistics

Monitoring queue health is essential for detecting backpressure and capacity issues:

/**
 * Print queue statistics
 */
void printQueueStats(MessageQueue* q, const char* name) {
  Serial.printf("[STATS] %s Queue: size=%d/%d, enqueued=%lu, dequeued=%lu, dropped=%lu\n",
                name, q->count, q->maxSize, q->totalEnqueued,
                q->totalDequeued, q->totalDropped);
}

46.6.1 Key Metrics to Monitor

Figure: Queue monitoring metrics, including queue utilization percentage, message drop rate, enqueue and dequeue throughput, and alerting thresholds for capacity warnings.
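
Two of these metrics can be derived directly from the counters the queue already keeps. The sketch below is a minimal illustration. The QueueCounters struct and the function names are assumptions, and the drop rate assumes counters are updated as in the plain enqueue() function, where a dropped message is never counted as enqueued.

```cpp
#include <stdint.h>

// Snapshot of the counters kept by the chapter's MessageQueue.
struct QueueCounters {
  int count;
  int maxSize;
  uint32_t totalEnqueued;
  uint32_t totalDropped;
};

// Percentage of slots currently in use (0-100).
float utilizationPercent(const QueueCounters& c) {
  if (c.maxSize <= 0) return 0.0f;
  return 100.0f * c.count / c.maxSize;
}

// Share of all offered messages that were dropped (0-100).
float dropRatePercent(const QueueCounters& c) {
  uint32_t offered = c.totalEnqueued + c.totalDropped;
  if (offered == 0) return 0.0f;
  return 100.0f * c.totalDropped / offered;
}
```

For a full 100-slot queue that accepted 100 messages and rejected 50, utilization is 100% and the drop rate is one third of the offered messages.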
Queue Overflow Prevention

When totalDropped starts increasing:

  1. Increase queue size - If memory allows
  2. Speed up consumers - Process messages faster
  3. Add backpressure - Slow down producers
  4. Filter at source - Reduce message frequency
  5. Implement priority - Drop low-priority first

Scenario: A remote weather station transmits 1 reading/minute via cellular MQTT. The DevOps team must size the local message queue to survive typical cellular outages.

Given Data:

  • Sensor reading frequency: 1 message/minute = 60 messages/hour
  • Message payload size: 85 bytes (JSON: temperature, humidity, pressure, timestamp)
  • Available RAM for queue: 32 KB (ESP32 with other tasks consuming 480 KB)
  • Cellular outage statistics (6-month analysis):
    • Average outage: 12 minutes
    • 95th percentile outage: 45 minutes
    • Maximum outage: 180 minutes (tower maintenance)

Step 1: Calculate messages during typical outage

  • Average outage: 12 min × 1 msg/min = 12 messages
  • 95th percentile outage: 45 min × 1 msg/min = 45 messages
  • Maximum outage: 180 min × 1 msg/min = 180 messages

Step 2: Calculate memory requirements

Per-message overhead (from code):

  • Message struct: 85 bytes payload + 50 bytes metadata = 135 bytes
  • Circular buffer overhead: 8 bytes per slot (pointers/counters)
  • Total per message: 135 + 8 = 143 bytes

Queue sizes:

  • 12 messages (average): 12 × 143 = 1.7 KB
  • 45 messages (95th percentile): 45 × 143 = 6.4 KB
  • 180 messages (maximum): 180 × 143 = 25.7 KB

Let’s work through the circular buffer memory requirements and access patterns in more detail. For a queue with capacity \(N = 100\) messages, each message \(M = 135\) bytes before slot overhead:

Total memory: \(Memory = N \times M = 100 \times 135 = 13,500\) bytes \(\approx 13.5\) KB (14.3 KB once the 8-byte slot overhead is included)

For enqueue at position \(tail\), the next position uses modulo arithmetic: \[tail_{new} = (tail + 1) \mod N\]

Example: \(tail = 99\), after enqueue: \(tail_{new} = (99 + 1) \mod 100 = 0\) (wraps to start)

Fill ratio when buffer holds \(k\) messages: \(\rho = \frac{k}{N}\)

At 80% capacity (\(k = 80\)): \(\rho = 0.80\). With a reject-when-full policy and no dequeues, the next enqueue is dropped only once the buffer is completely full: \[P_{drop} = \begin{cases} 0 & \text{if } k < N \\ 1 & \text{if } k = N \end{cases}\]

For network outage duration \(T = 180\) minutes at rate \(\lambda = 1\) msg/min: \[Messages = \lambda \times T = 1 \times 180 = 180 \text{ messages}\]

With \(N = 100\), messages dropped: \(180 - 100 = 80\) messages (44% loss). This math drives buffer sizing decisions.

Step 3: Choose queue size with trade-offs

Budget constraint: 32 KB available

  • Option A: 180-message queue = 25.7 KB (survives all outages, 80% memory usage)
  • Option B: 45-message queue = 6.4 KB (survives 95% of outages, 20% memory usage)
  • Option C: 100-message queue = 14.3 KB (survives 98.5% of outages, 45% memory usage)

Decision: Choose Option C (100-message queue)

Rationale:

  • Handles 100 minutes of outage (covers 98.5% of historical cases)
  • Leaves 17.7 KB for future features or deeper queues if needed
  • For the rare >100 min outage, oldest messages dropped (acceptable for weather data)
  • Memory usage: 14.3 ÷ 32 = 45% (healthy headroom)

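Overflow policy: For messages beyond 100, drop the oldest first. A weather reading from 2 hours ago has less value than current data. Note that the enqueue() function shown earlier rejects the newest message when the queue is full, so this policy needs an overwrite-oldest variant of enqueue.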

Key insight: Size queues for 95-99th percentile, not worst-case. The memory/benefit ratio for worst-case is often poor.
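
A drop-oldest policy can be sketched as a small change to enqueue: when the buffer is full, advance head first so the oldest entry is discarded, then write at tail as usual. The code below is an illustration under stated assumptions. ReadingQueue, enqueueDropOldest, and dequeueReading are hypothetical names, readings are plain integers, and the capacity is kept tiny so the behavior is easy to trace.

```cpp
#include <stdint.h>

const int CAPACITY = 4;  // tiny capacity for illustration

struct ReadingQueue {
  uint32_t readings[CAPACITY];
  int head = 0, tail = 0, count = 0;
  uint32_t totalDropped = 0;
};

// Always accepts the new reading. When the queue is full, the oldest
// reading (at head) is discarded to make room.
void enqueueDropOldest(ReadingQueue* q, uint32_t reading) {
  if (q->count == CAPACITY) {
    q->head = (q->head + 1) % CAPACITY;  // discard the oldest entry
    q->count--;
    q->totalDropped++;
  }
  q->readings[q->tail] = reading;
  q->tail = (q->tail + 1) % CAPACITY;
  q->count++;
}

// Removes the oldest remaining reading; returns false if the queue is empty.
bool dequeueReading(ReadingQueue* q, uint32_t* out) {
  if (q->count == 0) return false;
  *out = q->readings[q->head];
  q->head = (q->head + 1) % CAPACITY;
  q->count--;
  return true;
}
```

Enqueuing readings 1 through 6 into this 4-slot queue drops 1 and 2, and the queue then yields 3, 4, 5, 6 in order.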

| Application Type | Queue Depth Target | Overflow Policy | Rationale |
|---|---|---|---|
| Time-series telemetry | 95th percentile outage duration | Drop oldest | Recent data more valuable |
| Critical commands | Worst-case + 50% margin | Drop lowest priority | Commands must not be lost |
| Financial transactions | Disk-backed (unlimited) | Block new writes | Zero data loss required |
| Alert messages | 2× average burst size | Drop duplicates first | Alerts are idempotent |

Queue sizing formula:

Queue Depth = (Message Rate) × (Target Outage Duration) × (Growth Factor)

Growth Factor = 1.5 for memory-constrained (ESP32, Arduino)
              = 3.0 for cloud systems (cheap to over-provision)

Example calculations:

| Scenario | Rate | Outage | Growth | Queue Size |
|---|---|---|---|---|
| Weather station | 1/min | 45 min | 1.5 | 68 messages |
| Industrial PLC | 10/sec | 30 sec | 1.5 | 450 messages |
| Smart meter | 1/hour | 24 hours | 1.5 | 36 messages |
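
The sizing formula can be sketched in integer arithmetic, which avoids floating-point rounding surprises on small microcontrollers. The function name queueDepth and its parameters are assumptions for illustration: the rate is expressed as the interval between messages in milliseconds, and the growth factor as a percentage (150 means 1.5).

```cpp
#include <stdint.h>

// Queue Depth = rate x outage duration x growth factor, rounded up.
// intervalMs:    time between messages (60000 for 1 msg/min)
// outageMs:      target outage duration
// growthPercent: growth factor times 100 (150 means 1.5)
uint32_t queueDepth(uint64_t intervalMs, uint64_t outageMs,
                    uint32_t growthPercent) {
  if (intervalMs == 0) return 0;  // rate undefined
  uint64_t numerator = outageMs * growthPercent;
  uint64_t denominator = intervalMs * 100;
  // Integer ceiling division
  return static_cast<uint32_t>((numerator + denominator - 1) / denominator);
}

// RAM needed for a given depth and per-message size in bytes.
uint32_t queueBytes(uint32_t depth, uint32_t bytesPerMessage) {
  return depth * bytesPerMessage;
}
```

For the weather station row, queueDepth(60000, 2700000, 150) gives 68 messages, and at 143 bytes per slot that is 9,724 bytes.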

Common Mistake: Using Array Shift Instead of Circular Buffer

Scenario: A gateway developer implemented a message queue using a simple array with shift operations on an ESP8266 (80 MHz CPU).

The mistake: Dequeue operation used array.shift() which moves all remaining elements

Performance impact:

  • Queue size: 100 messages
  • Dequeue operation: remove first element, shift remaining 99 forward
  • Cost per shift: 143 bytes × 99 copies = 14,157 bytes moved
  • ESP8266 memory bandwidth: ~10 MB/sec
  • Shift time: 14,157 bytes ÷ 10 MB/sec = 1.4 milliseconds

With 10 dequeues/second: 1.4 ms × 10 = 14 ms/sec = 1.4% CPU just moving memory!

Why circular buffers are better:

Circular buffer dequeue:

  1. Read message at head pointer
  2. Increment head: head = (head + 1) % maxSize
  3. Decrement count: count--

Time: ~10 microseconds (about 140× faster at this queue size). No bulk memory movement, just one message read and pointer arithmetic.

Real-world impact:

| Queue Size | Array Shift Time | Circular Buffer Time | Speed-up |
|---|---|---|---|
| 10 | 0.14 ms | 10 µs | 14× |
| 50 | 0.70 ms | 10 µs | 70× |
| 100 | 1.40 ms | 10 µs | 140× |
| 500 | 7.00 ms | 10 µs | 700× |

Key lesson: On embedded systems, always use circular buffers for queues. Array shifting is O(n), circular buffers are O(1).

Let’s quantify the computational cost difference. For array-based queue with \(N\) messages of size \(M = 143\) bytes:

Array shift cost per dequeue: Copy \((N-1)\) messages forward \[Cost_{shift} = (N-1) \times M \text{ bytes copied}\]

For \(N = 100\): \(Cost_{shift} = 99 \times 143 = 14,157\) bytes

ESP8266 memory bandwidth: \(B = 10\) MB/s = \(10 \times 10^6\) bytes/s

Time per dequeue: \(t_{dequeue} = \frac{14,157}{10 \times 10^6} = 1.416 \times 10^{-3}\) s \(= 1.42\) ms

At processing rate \(\lambda = 10\) dequeues/second, CPU time spent: \[CPU_{usage} = \lambda \times t_{dequeue} = 10 \times 0.00142 = 0.0142 \text{ s/s} = 1.42\%\]

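Circular buffer dequeue: the pointer arithmetic itself is negligible \[t_{pointer} = \frac{10 \text{ cycles}}{80\text{ MHz}} = 0.125 \text{ µs}\] The comparison in this section budgets a rounded \(t_{circular} \approx 10\) µs per dequeue to leave room for reading the message and for call overhead.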

Speedup factor: \(\frac{1.42 \text{ ms}}{10 \text{ µs}} = 142\times\)

For \(N = 500\): speedup increases to \(700\times\). This is why circular buffers are mandatory for embedded queue implementations.
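
The shift-cost formula is easy to evaluate for other queue sizes. The sketch below mirrors \(Cost_{shift} = (N-1) \times M\) and the time estimate. The function names are assumptions, and the bandwidth figure is the chapter's rough ESP8266 estimate rather than a measured value.

```cpp
#include <stdint.h>

// Bytes moved when the front element of an N-slot array queue is removed
// by shifting the remaining N-1 elements forward.
uint32_t shiftBytes(uint32_t n, uint32_t bytesPerMessage) {
  return n == 0 ? 0 : (n - 1) * bytesPerMessage;
}

// Time in microseconds to move those bytes at the given copy bandwidth
// (bytes per second).
double shiftMicros(uint32_t n, uint32_t bytesPerMessage,
                   double bytesPerSecond) {
  return shiftBytes(n, bytesPerMessage) / bytesPerSecond * 1e6;
}
```

With N = 100, M = 143, and 10 MB/s, shiftBytes returns 14,157 and shiftMicros returns roughly 1,416 µs, matching the 1.42 ms figure above.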

46.7 Summary

This chapter covered the fundamental queue data structures and operations for IoT message handling:

  • Message Structure: Fields for topic, payload, QoS, priority, and delivery tracking
  • Circular Buffer: Memory-efficient queue implementation using head/tail pointers
  • Enqueue/Dequeue: Adding and removing messages with wraparound logic
  • Priority Queuing: Ensuring critical messages get delivered when queues fill
  • Queue Statistics: Monitoring utilization, drop rate, and throughput

Key Takeaways:

  • Circular buffers efficiently use fixed memory without array shifting
  • The modulo operator enables seamless wraparound behavior
  • Priority queues protect critical data during overflow conditions
  • Statistics help detect capacity issues before message loss

46.9 How It Works: Circular Buffer Queue

The circular buffer is the most memory-efficient queue for embedded systems. Here’s the complete execution flow:

Step 1: Initialization

Queue state: [empty, empty, empty, empty, empty]
head = 0, tail = 0, count = 0, maxSize = 5

Step 2: First enqueue (message ID 101)

Before: tail=0, count=0
Action: Copy message to messages[0], tail = (0+1) % 5 = 1, count = 1
After:  [101, empty, empty, empty, empty]
        head=0, tail=1, count=1

Step 3: Second enqueue (message ID 102)

Before: tail=1, count=1
Action: Copy message to messages[1], tail = (1+1) % 5 = 2, count = 2
After:  [101, 102, empty, empty, empty]
        head=0, tail=2, count=2

Step 4: Fill queue completely

After 3 more enqueues:
[101, 102, 103, 104, 105]
head=0, tail=0 (wrapped around!), count=5
Queue is FULL (count == maxSize)

Step 5: Attempt enqueue when full (message ID 106)

Check: isQueueFull() returns true
Action: Reject enqueue, increment totalDropped
Result: Message 106 DROPPED

Step 6: First dequeue

Before: head=0, count=5
Action: Copy messages[0] to output, head = (0+1) % 5 = 1, count = 4
After:  [cleared, 102, 103, 104, 105]
        head=1, tail=0, count=4
Return: Message 101

Step 7: Enqueue after dequeue (message ID 106)

Before: tail=0, count=4 (not full anymore)
Action: Copy message to messages[0], tail = (0+1) % 5 = 1, count = 5
After:  [106, 102, 103, 104, 105]
        head=1, tail=1, count=5
The slot that held 101 now holds 106!

Key insight: The circular buffer reuses memory without shifting elements. When tail reaches the end, it wraps to the beginning using modulo arithmetic (tail + 1) % maxSize.

46.10 Incremental Examples

46.10.1 Example 1: Basic Queue Operations

Scenario: IoT gateway receiving temperature readings from 3 sensors.

MessageQueue tempQueue;
initQueue(&tempQueue, 10);  // 10-message buffer

// Sensor 1 reading
Message msg1 = {
  .messageId = 1,
  .topic = "sensors/temp/room1",
  .payload = "{\"value\":22.5}",
  .qos = QOS_AT_MOST_ONCE
};
enqueue(&tempQueue, &msg1);
// Queue: [msg1, empty, empty, ...]

// Sensor 2 reading
Message msg2 = {
  .messageId = 2,
  .topic = "sensors/temp/room2",
  .payload = "{\"value\":23.1}",
  .qos = QOS_AT_MOST_ONCE
};
enqueue(&tempQueue, &msg2);
// Queue: [msg1, msg2, empty, ...]

// Process first message
Message processed;
if (dequeue(&tempQueue, &processed)) {
  publishToMQTT(processed.topic, processed.payload);
  // Sent: msg1 to MQTT broker
}
// Queue (front to back): [msg2]; slot 0 is cleared and head has advanced to 1

46.10.2 Example 2: Priority Queue Behavior

Scenario: Critical alert arrives when queue is full of routine telemetry.

// Queue full with low-priority messages
MessageQueue queue;
initQueue(&queue, 3);

// Designators follow the struct's declaration order (topic before priority)
Message temp1 = {.messageId=1, .topic="temp", .priority=PRIORITY_LOW};
Message temp2 = {.messageId=2, .topic="temp", .priority=PRIORITY_LOW};
Message temp3 = {.messageId=3, .topic="temp", .priority=PRIORITY_LOW};
enqueue(&queue, &temp1);
enqueue(&queue, &temp2);
enqueue(&queue, &temp3);
// Queue: [temp1, temp2, temp3] - FULL

// Critical alert arrives
Message alert = {.messageId=4, .topic="alerts/fire", .priority=PRIORITY_CRITICAL};
enqueuePriority(&queue, &alert);

// What happens inside enqueuePriority():
// 1. Detect queue is full
// 2. Scan for lowest priority message (finds temp1, priority=LOW)
// 3. Compare: alert.priority (CRITICAL) > temp1.priority (LOW)
// 4. Drop temp1, insert alert
// Queue: [alert, temp2, temp3]

// Result: Fire alert delivered, routine temperature reading dropped

46.10.3 Example 3: Queue Overflow Handling

Scenario: Network outage causes queue to fill beyond capacity.

MessageQueue outageBuffer;
initQueue(&outageBuffer, 100);  // 100-message capacity

// Network goes down, sensors keep publishing
for (uint32_t i = 0; i < 150; i++) {
  Message msg = {.messageId = i, .topic = "sensors/data"};
  if (!enqueue(&outageBuffer, &msg)) {
    // Queue is full once messages 0-99 are stored
    Serial.printf("[ERROR] Dropped message %lu\n", (unsigned long)i);
    // Messages 100-149 are DROPPED
  }
}

printQueueStats(&outageBuffer, "Outage");
// Output:
// [STATS] Outage Queue: size=100/100, enqueued=100, dequeued=0, dropped=50
// 50 messages lost during outage!

// Better approach: size queue for expected outage duration
// If sensors send 1 msg/sec and outages last 5 min:
// Queue size = 1 msg/sec × 300 sec = 300 messages minimum

46.12 Try It Yourself

Challenge 1: Implement a Simple Logger

Create a logging queue that captures debug messages in your ESP32 project:

MessageQueue logQueue;

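void setup() {
  Serial.begin(115200);      // required before any Serial.printf output
  initQueue(&logQueue, 20);  // 20 log entries
}

void logMessage(const char* level, const char* message) {
  Message logEntry = {};     // zero all fields, including string terminators
  logEntry.messageId = millis();
  snprintf(logEntry.topic, sizeof(logEntry.topic), "log/%s", level);
  // Leave the last byte untouched so the payload stays null-terminated
  strncpy(logEntry.payload, message, sizeof(logEntry.payload) - 1);
  logEntry.payloadLen = strlen(logEntry.payload);
  logEntry.priority = PRIORITY_LOW;  // routine logs can be displaced
  logEntry.timestamp = millis();

  enqueue(&logQueue, &logEntry);
}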

void loop() {
  logMessage("INFO", "System running");
  delay(1000);

  // Periodically flush logs
  Message log;
  while (dequeue(&logQueue, &log)) {
    Serial.printf("[%lu] %s: %s\n", log.timestamp, log.topic, log.payload);
  }
}

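What to observe: Each log entry is enqueued first and printed only when the flush loop drains the queue. Because loop() flushes on every pass, entries appear about once per second. To see buffering, flush less often (for example, every tenth pass): up to 20 entries wait in the queue, and enqueue() reports drops beyond that.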

Challenge 2: Priority-Based Alert System

Modify the queue to prioritize critical alerts:

// Add to loop():
if (temperatureAboveThreshold) {
  Message alert = {
    .topic = "alerts/temp",
    .payload = "{\"alert\":\"High temperature\"}",
    .priority = PRIORITY_CRITICAL  // designators follow declaration order
  };
  enqueuePriority(&logQueue, &alert);
}

What to observe: When the queue fills with INFO logs, critical alerts still get through by displacing low-priority entries.

Common Pitfalls

Non-durable queues lose all messages on broker restart. For IoT telemetry, alarm events, or command messages that must not be lost, always configure durable queues with disk persistence, even though this reduces throughput by 30-50% compared to in-memory queues.

Distributed queue systems (Kafka) guarantee order within a partition but not across partitions. If IoT events from one device are routed to multiple partitions (due to changing partition keys), they may be consumed out of order. Always route events from the same IoT device to the same partition using device ID as the partition key.
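
The idea of a stable partition key can be sketched with a simple hash: the same device ID always maps to the same partition number. This is an illustration only. The function names are assumptions, FNV-1a is used here because it is short, and Kafka's own default partitioner uses a different hash function, so the partition numbers will not match a real cluster.

```cpp
#include <stdint.h>

// FNV-1a 32-bit hash of a null-terminated device ID string.
uint32_t fnv1a(const char* s) {
  uint32_t h = 2166136261u;           // FNV offset basis
  while (*s) {
    h ^= static_cast<uint8_t>(*s++);
    h *= 16777619u;                   // FNV prime
  }
  return h;
}

// Maps a device ID to a partition in [0, numPartitions).
// Illustrative mapping, not Kafka's actual partitioner.
uint32_t partitionFor(const char* deviceId, uint32_t numPartitions) {
  return numPartitions == 0 ? 0 : fnv1a(deviceId) % numPartitions;
}
```

Because the mapping depends only on the device ID and the partition count, events from one device stay in order as long as the partition count does not change.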

Queue depth is a leading indicator of data loss — queues fill before they overflow. Set up monitoring and alerting on queue depth from day one, with alert thresholds at 50% capacity and emergency procedures at 80% capacity. Discovering a queue overflow after it has been dropping messages is too late.
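
The 50% and 80% thresholds translate into a small check that can run on every enqueue or on a timer. The sketch below is one possible shape. The enum and function names are assumptions, and integer math is used so it runs cheaply on small microcontrollers.

```cpp
// Health levels matching the thresholds described above.
enum DepthLevel { DEPTH_OK, DEPTH_WARNING, DEPTH_EMERGENCY };

// Classifies queue depth: warning at 50% of capacity, emergency at 80%.
// An invalid capacity is treated as an emergency.
DepthLevel classifyDepth(int count, int maxSize) {
  if (maxSize <= 0) return DEPTH_EMERGENCY;
  long scaled = 100L * count;  // compare count/maxSize without floats
  if (scaled >= 80L * maxSize) return DEPTH_EMERGENCY;
  if (scaled >= 50L * maxSize) return DEPTH_WARNING;
  return DEPTH_OK;
}
```

Calling classifyDepth(q->count, q->maxSize) after each enqueue gives an early signal well before totalDropped starts to climb.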

46.13 What’s Next

| If you want to… | Read this |
|---|---|
| Study advanced queue challenges and patterns | Message Queue Lab Challenges |
| Practice with the hands-on queue lab | Message Queue Lab |
| Learn pub/sub patterns that complement queues | Pub/Sub and Topic Routing |
| See how queues fit in gateway architecture | Protocol Bridging Fundamentals |

Continue to Pub/Sub and Topic Routing to learn about topic hierarchies, wildcard matching, QoS levels, and the publish-subscribe pattern.