193 Message Queue Fundamentals
193.1 Learning Objectives
By the end of this chapter, you will be able to:
- Understand Queue Data Structures: Implement circular buffers for IoT message handling
- Implement Queue Operations: Build enqueue, dequeue, and priority queue logic
- Handle Buffer Management: Manage memory efficiently with limited resources
- Track Queue Statistics: Monitor queue health with counters and metrics
- Apply Priority Ordering: Implement message prioritization for critical data
193.2 Introduction
Message queuing is a fundamental communication pattern in IoT systems that enables asynchronous, reliable message delivery between distributed components. This chapter focuses on the core queue data structures and operations that power protocols like MQTT, AMQP, and cloud message brokers.
Understanding queue fundamentals prepares you to:
- Decouple producers from consumers - Sensors can publish data without waiting for cloud acknowledgment
- Buffer during network outages - Messages persist locally until delivery succeeds
- Handle burst traffic - Queues absorb spikes in sensor data
- Prioritize critical messages - Important alerts get delivered first
Message queues are everywhere in production IoT:
- AWS IoT Core uses MQTT with message queuing for millions of devices
- Azure IoT Hub implements device-to-cloud queuing with configurable retention
- Industrial SCADA systems use message buffers to handle sensor bursts
- Smart home hubs queue commands when devices are temporarily offline
Understanding these patterns prepares you for working with any enterprise IoT platform.
193.3 Queue Data Structures
193.3.1 The Message Structure
Every message in an IoT system carries metadata needed for routing and delivery guarantees. The core message structure mirrors what youβll find in real protocols like MQTT:
// Core message structure - represents a single queued message
struct Message {
uint32_t messageId; // Unique identifier for tracking
char topic[MAX_TOPIC_LENGTH]; // Topic string (e.g., "sensors/temp/room1")
char payload[MAX_PAYLOAD_SIZE]; // Message content (JSON or binary)
uint16_t payloadLen; // Actual payload length
QoSLevel qos; // Quality of service level
Priority priority; // Message priority for ordering
uint32_t timestamp; // Creation time (millis)
uint32_t expiry; // Expiration time (0 = never)
bool retained; // Should broker store for new subscribers
uint8_t deliveryCount; // Retry counter for QoS 1/2
bool acknowledged; // Has subscriber confirmed receipt
};| Field | Purpose | Real-World Example |
|---|---|---|
messageId |
Track and deduplicate messages | MQTT Packet Identifier |
topic |
Route to interested subscribers | sensors/temperature/room1 |
payload |
Actual sensor data | {"value": 23.5, "unit": "C"} |
qos |
Delivery guarantee level | QoS 0, 1, or 2 |
priority |
Ordering preference | Critical alerts first |
retained |
Persist for new subscribers | Last known temperature |
deliveryCount |
Retry tracking | Stop after 3 failed attempts |
193.3.2 QoS and Priority Enumerations
Define delivery guarantee levels and message priorities:
// QoS Level definitions (matching MQTT specification)
enum QoSLevel {
QOS_AT_MOST_ONCE = 0, // Fire and forget - no acknowledgment
QOS_AT_LEAST_ONCE = 1, // Acknowledged delivery - may duplicate
QOS_EXACTLY_ONCE = 2 // Two-phase commit - guaranteed single delivery
};
// Message priority levels
enum Priority {
PRIORITY_LOW = 0,
PRIORITY_NORMAL = 1,
PRIORITY_HIGH = 2,
PRIORITY_CRITICAL = 3
};193.3.3 The Circular Buffer Queue
The circular buffer is the most memory-efficient queue implementation for embedded systems. It reuses a fixed-size array by wrapping around when reaching the end:
// Circular buffer queue for efficient memory usage
struct MessageQueue {
Message messages[MAX_QUEUE_SIZE];
int head; // Next position to dequeue
int tail; // Next position to enqueue
int count; // Current message count
int maxSize; // Queue capacity
uint32_t totalEnqueued; // Statistics: total messages added
uint32_t totalDequeued; // Statistics: total messages removed
uint32_t totalDropped; // Statistics: messages dropped (overflow)
};%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#E67E22', 'secondaryColor': '#7F8C8D'}}}%%
flowchart LR
subgraph CircularBuffer["Circular Buffer (size=5)"]
S0["[0]<br/>Msg A"]
S1["[1]<br/>Msg B"]
S2["[2]<br/>Empty"]
S3["[3]<br/>Empty"]
S4["[4]<br/>Empty"]
end
HEAD["HEAD=0<br/>(next to read)"]
TAIL["TAIL=2<br/>(next to write)"]
HEAD --> S0
TAIL --> S2
S0 --> S1 --> S2 --> S3 --> S4 --> S0
Imagine a circular buffer like a carousel at an airport baggage claim:
- Bags (messages) get placed on the carousel at one point (TAIL)
- Travelers pick them up at another point (HEAD)
- When the carousel reaches the end, it wraps around to the beginning
- If too many bags arrive before pickup, some get dropped
This design uses memory efficiently because it never needs to shift elements - just update the HEAD and TAIL pointers.
193.4 Queue Operations
193.4.1 Initialize Queue
Set up a queue with empty state and reset all counters:
/**
* Initialize a message queue
* Sets up circular buffer with empty state
*/
void initQueue(MessageQueue* q, int maxSize) {
q->head = 0;
q->tail = 0;
q->count = 0;
q->maxSize = min(maxSize, MAX_QUEUE_SIZE);
q->totalEnqueued = 0;
q->totalDequeued = 0;
q->totalDropped = 0;
// Clear all message slots
for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
q->messages[i].messageId = 0;
q->messages[i].topic[0] = '\0';
q->messages[i].payloadLen = 0;
}
}193.4.2 Status Check Functions
Simple helper functions to check queue state:
/**
* Check if queue is empty
*/
bool isQueueEmpty(MessageQueue* q) {
return q->count == 0;
}
/**
* Check if queue is full
*/
bool isQueueFull(MessageQueue* q) {
return q->count >= q->maxSize;
}
/**
* Get current queue size
*/
int getQueueSize(MessageQueue* q) {
return q->count;
}193.4.3 Enqueue Operation
Add a message to the back of the queue:
/**
* Enqueue a message (add to back of queue)
* Returns true if successful, false if queue is full
*/
bool enqueue(MessageQueue* q, Message* msg) {
if (isQueueFull(q)) {
q->totalDropped++;
Serial.printf("[QUEUE] DROPPED: Queue full (size=%d), msg=%lu topic=%s\n",
q->count, msg->messageId, msg->topic);
return false;
}
// Copy message to queue slot
memcpy(&q->messages[q->tail], msg, sizeof(Message));
// Advance tail pointer (circular)
q->tail = (q->tail + 1) % q->maxSize;
q->count++;
q->totalEnqueued++;
return true;
}The line q->tail = (q->tail + 1) % q->maxSize is the key to circular behavior:
- If
tail = 4andmaxSize = 5, then(4 + 1) % 5 = 0 - The pointer wraps from the end back to the beginning
- This avoids the need for expensive array shifting operations
193.4.4 Dequeue Operation
Remove and return a message from the front of the queue:
/**
* Dequeue a message (remove from front of queue)
* Returns true if successful, false if queue is empty
*/
bool dequeue(MessageQueue* q, Message* msg) {
if (isQueueEmpty(q)) {
return false;
}
// Copy message from queue slot
memcpy(msg, &q->messages[q->head], sizeof(Message));
// Clear the slot
q->messages[q->head].messageId = 0;
// Advance head pointer (circular)
q->head = (q->head + 1) % q->maxSize;
q->count--;
q->totalDequeued++;
return true;
}193.4.5 Peek Operation
View the front message without removing it:
/**
* Peek at front message without removing
* Returns pointer to message or NULL if empty
*/
Message* peek(MessageQueue* q) {
if (isQueueEmpty(q)) {
return NULL;
}
return &q->messages[q->head];
}193.4.6 Clear Queue
Remove all messages from the queue:
/**
* Clear all messages from queue
*/
void clearQueue(MessageQueue* q) {
q->head = 0;
q->tail = 0;
q->count = 0;
}193.5 Priority Queue Operations
For IoT systems handling mixed-priority data (critical alerts vs routine telemetry), priority queuing ensures important messages get delivered first.
193.5.1 Enqueue with Priority
When the queue is full, a high-priority message can displace a low-priority one:
/**
* Enqueue with priority (insert based on priority level)
* Higher priority messages are placed ahead of lower priority
*/
bool enqueuePriority(MessageQueue* q, Message* msg) {
if (isQueueFull(q)) {
// For priority queue, we may drop lowest priority message
if (msg->priority > PRIORITY_LOW) {
// Find and remove lowest priority message
int lowestIdx = -1;
Priority lowestPri = PRIORITY_CRITICAL;
for (int i = 0; i < q->count; i++) {
int idx = (q->head + i) % q->maxSize;
if (q->messages[idx].priority < lowestPri) {
lowestPri = q->messages[idx].priority;
lowestIdx = idx;
}
}
if (lowestIdx >= 0 && lowestPri < msg->priority) {
Serial.printf("[QUEUE] Dropping low-priority msg %lu for high-priority %lu\n",
q->messages[lowestIdx].messageId, msg->messageId);
// Mark slot as available by shifting messages
q->totalDropped++;
} else {
q->totalDropped++;
return false;
}
} else {
q->totalDropped++;
return false;
}
}
// Simple priority: add to queue then sort
memcpy(&q->messages[q->tail], msg, sizeof(Message));
q->tail = (q->tail + 1) % q->maxSize;
q->count++;
q->totalEnqueued++;
return true;
}| Priority | Message Type | Example |
|---|---|---|
| CRITICAL | Safety alerts | Motion detected during armed state |
| HIGH | System commands | Firmware update triggers |
| NORMAL | Sensor data | Temperature readings |
| LOW | Diagnostics | Memory usage reports |
When a queue fills during a network outage, low-priority diagnostic messages are dropped to make room for critical security alerts.
193.6 Queue Statistics
Monitoring queue health is essential for detecting backpressure and capacity issues:
/**
* Print queue statistics
*/
void printQueueStats(MessageQueue* q, const char* name) {
Serial.printf("[STATS] %s Queue: size=%d/%d, enqueued=%lu, dequeued=%lu, dropped=%lu\n",
name, q->count, q->maxSize, q->totalEnqueued,
q->totalDequeued, q->totalDropped);
}193.6.1 Key Metrics to Monitor
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#E67E22', 'secondaryColor': '#7F8C8D'}}}%%
flowchart TD
subgraph Metrics["Queue Health Metrics"]
UTIL["Utilization<br/>count/maxSize"]
DROP["Drop Rate<br/>dropped/enqueued"]
THRU["Throughput<br/>dequeued/second"]
LAT["Latency<br/>time in queue"]
end
UTIL -->|"> 80%"| WARN1["Consider<br/>larger queue"]
DROP -->|"> 1%"| WARN2["Increase<br/>processing speed"]
THRU -->|"Declining"| WARN3["Check<br/>consumers"]
LAT -->|"Increasing"| WARN4["Add<br/>consumers"]
When totalDropped starts increasing:
- Increase queue size - If memory allows
- Speed up consumers - Process messages faster
- Add backpressure - Slow down producers
- Filter at source - Reduce message frequency
- Implement priority - Drop low-priority first
193.7 Summary
This chapter covered the fundamental queue data structures and operations for IoT message handling:
- Message Structure: Fields for topic, payload, QoS, priority, and delivery tracking
- Circular Buffer: Memory-efficient queue implementation using head/tail pointers
- Enqueue/Dequeue: Adding and removing messages with wraparound logic
- Priority Queuing: Ensuring critical messages get delivered when queues fill
- Queue Statistics: Monitoring utilization, drop rate, and throughput
Key Takeaways:
- Circular buffers efficiently use fixed memory without array shifting
- The modulo operator enables seamless wraparound behavior
- Priority queues protect critical data during overflow conditions
- Statistics help detect capacity issues before message loss
193.8 Knowledge Check
Question: What is the primary advantage of using a circular buffer for message queuing in embedded systems?
Explanation: C. Circular buffers use a fixed-size array and simply update head/tail pointers, avoiding expensive memory shifting operations that would be needed in a linear buffer.
Question: When a priority queue is full and a CRITICAL priority message arrives, what should happen?
Explanation: B. Priority queues should drop lower-priority messages to make room for critical ones, ensuring important data gets through during congestion.
Question: In a circular buffer with maxSize=8, if tail is at position 7, what is the next tail position after enqueue?
Explanation: A. Using (7 + 1) % 8 = 0, the tail wraps around to position 0, creating the circular behavior.
193.10 Whatβs Next
Continue to Pub/Sub and Topic Routing to learn about topic hierarchies, wildcard matching, QoS levels, and the publish-subscribe pattern.