AMQP messages consist of headers, properties (delivery mode, priority, TTL, correlation ID), and a body payload. The protocol supports three delivery guarantee levels – at-most-once, at-least-once, and exactly-once – implemented through publisher confirms and consumer acknowledgments, with exactly-once additionally requiring transactions or consumer-side deduplication. Dead-letter queues capture undeliverable or rejected messages for later investigation, preventing silent data loss.
Learning Objectives
By the end of this chapter, you will be able to:
Analyze Message Structure : Distinguish the roles of the AMQP header, properties, and body sections and explain how each field affects message routing and delivery
Configure Delivery Modes : Select appropriate message persistence, priority, and time-to-live (TTL) settings to match application reliability requirements
Implement Delivery Guarantees : Evaluate and implement at-most-once, at-least-once, and exactly-once semantics based on data criticality and overhead trade-offs
Apply Acknowledgments : Demonstrate correct use of publisher confirms and consumer acknowledgments to construct reliable messaging pipelines
Handle Message Failures : Design dead-letter queue configurations and justify negative acknowledgment strategies for production IoT systems
Key Concepts
Message Properties : AMQP metadata: delivery-mode (persistent/transient), priority, expiration, content-type, correlation-id
Delivery Mode : 1 = transient (memory only, lost on restart), 2 = persistent (disk-written, survives restart)
Acknowledgment Mode : Auto-ack removes messages immediately on delivery; manual-ack waits for explicit consumer confirmation
Message Priority : 0-9 scale for priority queues — higher priority messages are delivered first to consumers
Content Type : MIME type header (e.g., application/json, application/cbor) enabling consumer-side deserialization
Correlation ID : Identifier linking RPC request to reply message — essential for request/reply patterns over queues
Redelivery Flag : Set on messages redelivered after consumer crash — enables idempotent handling of at-least-once delivery
Think of AMQP message delivery like sending registered mail:
At-Most-Once : Regular mail - might get lost, but fastest and cheapest
At-Least-Once : Tracked mail with retry - guaranteed to arrive, might arrive twice if tracking fails
Exactly-Once : Registered mail with signature - guaranteed exactly one delivery, most expensive
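The mail analogy can be pinned down as concrete settings. A sketch of the mapping (the dictionary keys and field names are this sketch's own, not a broker or pika API):

```python
# Illustrative mapping from the mail analogy to AMQP-level settings.
GUARANTEES = {
    "at-most-once":  {"publisher_confirms": False, "consumer_ack": "auto",
                      "may_lose": True,  "may_duplicate": False},
    "at-least-once": {"publisher_confirms": True,  "consumer_ack": "manual",
                      "may_lose": False, "may_duplicate": True},
    "exactly-once":  {"publisher_confirms": True,  "consumer_ack": "manual",
                      "may_lose": False, "may_duplicate": False},
}
```

Note the asymmetry: at-least-once trades possible duplicates for guaranteed delivery, which is why consumers must be idempotent.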
Key terms:
Persistent : Message saved to disk, survives broker restart
ACK : Consumer says "I processed this message successfully"
NACK : Consumer says "I failed to process this, please retry"
Dead Letter Queue : Special queue for messages that can't be processed
Publisher Confirm : Broker tells producer "I received your message"
“My temperature alert says the warehouse is overheating, but the cooling system never turned on!” Sammy the Sensor cried. “What happened to my message?”
Max the Microcontroller pulled up the message logs. “Let’s check. When you sent your alert, did you ask for a publisher confirm ?” Sammy looked blank. “That’s like asking the post office to text you when they receive your package. Without it, you just toss the message and hope for the best.”
“So what should I do?” asked Sammy. “Use at-least-once delivery ,” said Lila the LED. “The broker will keep your message and send back an ACK – an acknowledgment – when the cooling system processes it. If the cooling system crashes before saying ACK, the broker sends the message again. Your alert won’t get lost!”
“But what if a message is truly undeliverable?” Bella the Battery asked. “Then it goes to the dead letter queue ,” Max explained. “Think of it as the ‘lost and found’ box. Engineers can check it later to find out what went wrong. No message just vanishes into thin air!”
Message Structure
An AMQP message consists of multiple sections that provide metadata and payload.
Properties Section
Properties provide message metadata for routing and processing:
Content-Type : MIME type of body (e.g., application/json, text/plain)
Content-Encoding : Encoding applied to the body (e.g., gzip, utf-8)
Correlation-ID : Links related messages (e.g., a UUID for request-reply matching)
Reply-To : Return address queue (e.g., reply_queue_abc123)
Message-ID : Unique identifier (e.g., a UUID for deduplication)
Timestamp : Creation time (Unix timestamp)
Type : Application-specific type (e.g., sensor.reading, order.created)
App-ID : Producing application (e.g., temperature-sensor-01)
Example: Complete message with properties:
import time
import uuid

import pika

properties = pika.BasicProperties(
    content_type='application/json',
    content_encoding='utf-8',
    message_id=str(uuid.uuid4()),           # unique ID for deduplication
    correlation_id=request_correlation_id,  # links reply to request
    reply_to='response_queue',
    timestamp=int(time.time()),
    type='sensor.temperature',
    app_id='weather-station-01',
    delivery_mode=2                         # persistent: survives broker restart
)
Body Section
The body contains the actual message payload:
Can be binary or text
Encoding specified in properties
No size limit (but broker may impose limits)
Common formats: JSON, Protocol Buffers, MessagePack
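For example, a JSON body is just encoded bytes; the consumer relies on the content-type and content-encoding properties to decode it (the reading fields below are illustrative):

```python
import json

# Producer side: encode a sensor reading as a compact JSON body
reading = {"sensor": "temp-01", "value": 22.5, "unit": "C"}
body = json.dumps(reading, separators=(",", ":")).encode("utf-8")

# Properties the producer would attach alongside this body
body_properties = {"content_type": "application/json", "content_encoding": "utf-8"}

# Consumer side: decode using the declared content type and encoding
decoded = json.loads(body.decode(body_properties["content_encoding"]))
```

Without the content-type property the consumer would have to guess the format, which is why it belongs in the message rather than in out-of-band convention.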
Construct an AMQP message by selecting header fields, properties, and a payload format. See how each choice affects the total message size and structure.
viewof msgDurable = Inputs.select(["true (persistent)", "false (transient)"], {value: "true (persistent)", label: "Durable"})
viewof msgPriority = Inputs.range([0, 9], {value: 5, step: 1, label: "Priority (0-9)"})
viewof msgTTL = Inputs.range([0, 300000], {value: 60000, step: 1000, label: "TTL (ms, 0 = no expiry)"})
viewof msgContentType = Inputs.select(["application/json", "text/plain", "application/octet-stream", "application/protobuf"], {value: "application/json", label: "Content-Type"})
viewof msgIncludeCorrelation = Inputs.checkbox(["Include Correlation-ID"], {value: ["Include Correlation-ID"]})
viewof msgIncludeReplyTo = Inputs.checkbox(["Include Reply-To queue"])
viewof msgBodySize = Inputs.range([10, 2000], {value: 200, step: 10, label: "Body payload size (bytes)"})
msgHeaderSize = 8 + (msgDurable === "true (persistent)" ? 1 : 0) + 1 + (msgTTL > 0 ? 4 : 0) + 1 + 4
msgPropsSize = msgContentType.length + 4 + (msgIncludeCorrelation.length > 0 ? 36 : 0) + (msgIncludeReplyTo.length > 0 ? 20 : 0) + 36 + 8 + 10
msgTotalSize = msgHeaderSize + msgPropsSize + msgBodySize + 16
html `
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; border-left: 4px solid #3498DB; margin: 20px 0;">
<div style="font-size: 15px; font-weight: bold; color: #2C3E50; margin-bottom: 15px;">Message Structure Breakdown</div>
<div style="display: flex; gap: 0; border-radius: 8px; overflow: hidden; margin-bottom: 15px; height: 40px;">
<div style="background: #2C3E50; flex: ${ msgHeaderSize} ; display: flex; align-items: center; justify-content: center; color: white; font-size: 11px; font-weight: bold; min-width: 60px;">Header ${ msgHeaderSize} B</div>
<div style="background: #16A085; flex: ${ msgPropsSize} ; display: flex; align-items: center; justify-content: center; color: white; font-size: 11px; font-weight: bold; min-width: 60px;">Properties ${ msgPropsSize} B</div>
<div style="background: #3498DB; flex: ${ msgBodySize} ; display: flex; align-items: center; justify-content: center; color: white; font-size: 11px; font-weight: bold; min-width: 60px;">Body ${ msgBodySize} B</div>
</div>
<div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 12px; margin-bottom: 15px;">
<div style="background: white; padding: 12px; border-radius: 6px; border-top: 3px solid #2C3E50;">
<div style="font-size: 12px; color: #7F8C8D; margin-bottom: 6px;">Header Fields</div>
<div style="font-size: 12px; color: #2C3E50; line-height: 1.7;">
Durable: <strong> ${ msgDurable. split (" " )[0 ]} </strong><br>
Priority: <strong> ${ msgPriority} </strong><br>
TTL: <strong> ${ msgTTL > 0 ? msgTTL + " ms" : "none" } </strong><br>
Delivery-Count: <strong>0</strong>
</div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; border-top: 3px solid #16A085;">
<div style="font-size: 12px; color: #7F8C8D; margin-bottom: 6px;">Properties</div>
<div style="font-size: 12px; color: #2C3E50; line-height: 1.7;">
Content-Type: <strong> ${ msgContentType} </strong><br>
Message-ID: <strong>uuid</strong><br>
${ msgIncludeCorrelation. length > 0 ? "Correlation-ID: <strong>uuid</strong><br>" : "" }
${ msgIncludeReplyTo. length > 0 ? "Reply-To: <strong>reply_q</strong><br>" : "" }
Timestamp: <strong>set</strong>
</div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; border-top: 3px solid #3498DB;">
<div style="font-size: 12px; color: #7F8C8D; margin-bottom: 6px;">Body</div>
<div style="font-size: 12px; color: #2C3E50; line-height: 1.7;">
Format: <strong> ${ msgContentType. split ("/" )[1 ]} </strong><br>
Size: <strong> ${ msgBodySize} bytes</strong><br>
Encoding: <strong>utf-8</strong>
</div>
</div>
</div>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 12px;">
<div style="background: white; padding: 12px; border-radius: 6px;">
<div style="font-size: 12px; color: #7F8C8D;">Total Message Size</div>
<div style="font-size: 22px; font-weight: bold; color: #2C3E50;"> ${ msgTotalSize} bytes</div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px;">
<div style="font-size: 12px; color: #7F8C8D;">Payload Efficiency</div>
<div style="font-size: 22px; font-weight: bold; color: ${ (msgBodySize / msgTotalSize * 100 ) >= 70 ? '#16A085' : (msgBodySize / msgTotalSize * 100 ) >= 40 ? '#E67E22' : '#E74C3C' } ;"> ${ (msgBodySize / msgTotalSize * 100 ). toFixed (1 )} %</div>
</div>
</div>
</div>
`
Delivery Guarantees
AMQP provides configurable delivery semantics to match application requirements.
1. At-Most-Once (0 or 1)
Message delivered once or not at all. Fastest but may lose messages.
Characteristics:
No acknowledgment required
Fastest, lowest overhead
Fire-and-forget pattern
Implementation:
# Producer: no confirms, transient message (fire-and-forget)
channel.basic_publish(
    exchange='sensors',
    routing_key='temperature',
    body='22.5',
    properties=pika.BasicProperties(delivery_mode=1)  # transient: memory only
)

# Consumer: auto-acknowledge
channel.basic_consume(
    queue='sensor_data',
    on_message_callback=callback,
    auto_ack=True  # message removed from the queue immediately on delivery
)
Use cases:
Non-critical telemetry
Sensor readings where occasional loss is acceptable
High-throughput streaming where speed matters more than completeness
2. At-Least-Once (1 or more)
Message guaranteed to be delivered, but may arrive multiple times.
Characteristics:
Requires acknowledgment
Retries on failure
Possible duplicates (consumer must handle)
Implementation:
# Producer: with confirms
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=json.dumps(order),
        properties=pika.BasicProperties(
            delivery_mode=2,              # persistent
            message_id=str(uuid.uuid4())  # for consumer-side deduplication
        ),
        mandatory=True  # required for UnroutableError to be raised
    )
except pika.exceptions.UnroutableError:
    # Handle unroutable message
    retry_or_log(order)

# Consumer: manual acknowledgment
def callback(ch, method, properties, body):
    try:
        process_order(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(
    queue='orders',
    on_message_callback=callback,
    auto_ack=False  # manual ACK required
)
Use cases:
Important events and commands
Order processing (with idempotent handlers)
Notifications that must be delivered
3. Exactly-Once (exactly 1)
Message delivered exactly once with no loss or duplicates.
Characteristics:
Uses transactions or deduplication
Highest reliability, highest overhead
Most complex to implement
Implementation (transaction-based):
# Producer: transactional publishing
channel.tx_select()
try:
    channel.basic_publish(
        exchange='payments',
        routing_key='payment.process',
        body=json.dumps(payment),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    channel.tx_commit()
except Exception:
    channel.tx_rollback()
    raise

# Consumer: with deduplication
def callback(ch, method, properties, body):
    message_id = properties.message_id

    # Already processed? ACK and skip (handles redelivered duplicates)
    if redis.exists(f'processed:{message_id}'):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    try:
        result = process_payment(body)
        redis.setex(f'processed:{message_id}', 86400, '1')  # 24h TTL
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
Use cases:
Financial transactions
Critical commands that cannot be duplicated
Billing and accounting events
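The deduplication idea above can be sketched broker-free: an in-memory set stands in for the Redis store. This works only within a single process; production needs the shared, TTL-expiring store shown in the code above.

```python
# In-memory stand-in for the Redis deduplication store.
processed_ids = set()

def handle_once(message_id, body, process):
    """Run process(body) at most once per message_id.

    Returns True if processed, False if skipped as a duplicate.
    The ID is recorded only after success, so a crash mid-process
    leads to a retry, not a lost message.
    """
    if message_id in processed_ids:
        return False  # duplicate redelivery: caller should still ACK
    process(body)
    processed_ids.add(message_id)
    return True

results = []
handle_once("m-1", "payment-a", results.append)
handle_once("m-1", "payment-a", results.append)  # redelivered duplicate: skipped
handle_once("m-2", "payment-b", results.append)
```

Recording the ID only after successful processing is what turns at-least-once delivery plus deduplication into effectively-exactly-once processing.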
Select a delivery guarantee level and simulate sending messages to see how each mode handles success, failure, and network issues. Adjust the failure rate to observe retries, duplicates, and lost messages.
viewof simGuarantee = Inputs.select(["At-Most-Once", "At-Least-Once", "Exactly-Once"], {value: "At-Least-Once", label: "Delivery guarantee"})
viewof simFailRate = Inputs.range([0, 80], {value: 20, step: 5, label: "Network failure rate (%)"})
viewof simMessageCount = Inputs.range([5, 50], {value: 20, step: 1, label: "Messages to send"})
viewof simRunBtn = Inputs.button("Run Simulation", {reduce: (x) => (x || 0) + 1})
simResults = {
  simRunBtn;
  const count = simMessageCount;
  const failPct = simFailRate / 100;
  const mode = simGuarantee;
  let sent = 0, delivered = 0, lost = 0, duplicates = 0, overhead = 0;
  for (let i = 0; i < count; i++) {
    sent++;
    const fails = Math.random() < failPct;
    if (mode === "At-Most-Once") {
      if (fails) { lost++; } else { delivered++; }
      overhead += 0;
    } else if (mode === "At-Least-Once") {
      if (fails) {
        overhead += 1;
        const retryFails = Math.random() < failPct * 0.5;
        if (retryFails) {
          overhead += 1;
          delivered++;
        } else {
          delivered++;
          if (Math.random() < 0.15) { duplicates++; }
        }
      } else {
        delivered++;
        overhead += 1;
      }
    } else {
      overhead += 3;
      if (fails) {
        overhead += 3;
        delivered++;
      } else {
        delivered++;
      }
    }
  }
  return {sent, delivered, lost, duplicates, overhead, mode};
}
html `
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; border-left: 4px solid #9B59B6; margin: 20px 0;">
<div style="font-size: 15px; font-weight: bold; color: #2C3E50; margin-bottom: 15px;">Simulation Results: ${ simResults. mode } </div>
<div style="display: grid; grid-template-columns: repeat(5, 1fr); gap: 10px; margin-bottom: 15px;">
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Sent</div>
<div style="font-size: 22px; font-weight: bold; color: #2C3E50;"> ${ simResults. sent } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Delivered</div>
<div style="font-size: 22px; font-weight: bold; color: #16A085;"> ${ simResults. delivered } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Lost</div>
<div style="font-size: 22px; font-weight: bold; color: #E74C3C;"> ${ simResults. lost } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Duplicates</div>
<div style="font-size: 22px; font-weight: bold; color: #E67E22;"> ${ simResults. duplicates } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">ACK/TX Overhead</div>
<div style="font-size: 22px; font-weight: bold; color: #7F8C8D;"> ${ simResults. overhead } </div>
</div>
</div>
<svg width="100%" height="60" viewBox="0 0 500 60">
${ Array . from ({length : simResults. sent }, (_, i) => {
const x = 10 + (i % 25 ) * 19 ;
const y = i < 25 ? 10 : 35 ;
const isLost = i < simResults. lost ;
const isDup = ! isLost && i >= simResults. sent - simResults. duplicates ;
const color = isLost ? "#E74C3C" : isDup ? "#E67E22" : "#16A085" ;
return `<rect x=" ${ x} " y=" ${ y} " width="15" height="15" rx="2" fill=" ${ color} " opacity="0.85"/>` ;
}). join ("" )}
</svg>
<div style="display: flex; gap: 15px; font-size: 11px; color: #7F8C8D; margin-top: 8px;">
<span><span style="display:inline-block;width:12px;height:12px;background:#16A085;border-radius:2px;vertical-align:middle;margin-right:4px;"></span> Delivered</span>
<span><span style="display:inline-block;width:12px;height:12px;background:#E74C3C;border-radius:2px;vertical-align:middle;margin-right:4px;"></span> Lost</span>
<span><span style="display:inline-block;width:12px;height:12px;background:#E67E22;border-radius:2px;vertical-align:middle;margin-right:4px;"></span> Duplicate</span>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; margin-top: 12px;">
<div style="font-size: 12px; color: #2C3E50; line-height: 1.7;">
<strong>Insight:</strong>
${ simResults. mode === "At-Most-Once" ? `At-most-once lost ${ simResults. lost } of ${ simResults. sent } messages ( ${ (simResults. lost / simResults. sent * 100 ). toFixed (0 )} %). This is acceptable for periodic sensor telemetry where the next reading replaces the lost one, but dangerous for commands or alerts.` : simResults. mode === "At-Least-Once" ? `At-least-once delivered all messages but produced ${ simResults. duplicates } duplicate(s). Consumers must use message-ID deduplication to handle duplicates safely. The ${ simResults. overhead } ACK round-trips add latency but guarantee delivery.` : `Exactly-once delivered all messages with zero duplicates, but required ${ simResults. overhead } transaction round-trips (3x per message). This overhead is justified for financial transactions but wasteful for high-frequency telemetry.` }
</div>
</div>
</div>
`
Publisher Confirms
Publisher confirms allow producers to know when messages are safely received by the broker.
Implementation:
# Enable publisher confirms (BlockingConnection: publishes now block
# until the broker confirms, raising on failure)
channel.confirm_delivery()

# Synchronous confirm (blocking)
channel.basic_publish(
    exchange='orders',
    routing_key='order.new',
    body=order_json,
    mandatory=True
)
# Raises UnroutableError if the message cannot be routed,
# NackError if the broker rejects it

# Asynchronous confirms (non-blocking, SelectConnection):
# pass the callback to confirm_delivery
def on_confirm(frame):
    if frame.method.NAME == 'Basic.Ack':
        print(f"Message {frame.method.delivery_tag} confirmed")
    else:
        print(f"Message {frame.method.delivery_tag} rejected")

channel.confirm_delivery(ack_nack_callback=on_confirm)
Consumer Acknowledgments
Consumer acknowledgments ensure messages are processed successfully before removal from queues.
ACK Types
Positive ACK : basic_ack() — message removed from queue
Negative ACK : basic_nack() — message requeued or sent to DLQ
Reject : basic_reject() — single-message reject (legacy)
Prefetch Count
Prefetch controls how many unacknowledged messages a consumer can hold.
# Limit to 1 unacknowledged message at a time
channel.basic_qos(prefetch_count=1)

# Best for:
# - Long-running tasks (prevents one consumer hoarding all work)
# - Fair distribution across multiple consumers
Prefetch recommendations by average task duration:
< 100 ms : prefetch 50-100 — reduce round-trip overhead
100 ms - 1 s : prefetch 10-20 — balance throughput and fairness
1 s - 10 s : prefetch 1-5 — prevent consumer overload
> 10 s : prefetch 1 — one task at a time
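The recommendations above can be captured as a small helper; the thresholds are the table's, while the function name is illustrative:

```python
def recommended_prefetch(task_seconds):
    """Return a (low, high) prefetch range for an average task duration."""
    if task_seconds < 0.1:
        return (50, 100)  # fast tasks: amortize ACK round-trips
    if task_seconds < 1:
        return (10, 20)   # balance throughput and fairness
    if task_seconds < 10:
        return (1, 5)     # prevent consumer overload
    return (1, 1)         # long tasks: one at a time for fairness
```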
Adjust the prefetch count and number of consumers to see how messages are distributed. Observe how different prefetch values affect fairness and throughput when consumers have varying processing speeds.
viewof pfPrefetch = Inputs.range([1, 50], {value: 1, step: 1, label: "Prefetch count"})
viewof pfConsumers = Inputs.range([1, 6], {value: 3, step: 1, label: "Number of consumers"})
viewof pfTotalMsgs = Inputs.range([10, 60], {value: 30, step: 1, label: "Total messages in queue"})
viewof pfSpeedVariation = Inputs.select(["Uniform (all same speed)", "Variable (1x, 2x, 3x speeds)", "One slow consumer (10x slower)"], {value: "Variable (1x, 2x, 3x speeds)", label: "Consumer speed profile"})
pfSimulation = {
  const n = pfConsumers;
  const prefetch = pfPrefetch;
  const total = pfTotalMsgs;
  const speeds = [];
  for (let i = 0; i < n; i++) {
    if (pfSpeedVariation === "Uniform (all same speed)") {
      speeds.push(1);
    } else if (pfSpeedVariation === "Variable (1x, 2x, 3x speeds)") {
      speeds.push(1 + i);
    } else {
      speeds.push(i === 0 ? 10 : 1);
    }
  }
  const consumerLoad = Array.from({length: n}, () => 0);
  const consumerUnacked = Array.from({length: n}, () => 0);
  const consumerProcessed = Array.from({length: n}, () => 0);
  let dispatched = 0;
  for (let round = 0; round < total * 3 && dispatched < total; round++) {
    for (let c = 0; c < n && dispatched < total; c++) {
      if (consumerUnacked[c] < prefetch) {
        consumerUnacked[c]++;
        consumerLoad[c]++;
        dispatched++;
      }
    }
    for (let c = 0; c < n; c++) {
      if (consumerUnacked[c] > 0 && round % speeds[c] === 0) {
        consumerUnacked[c]--;
        consumerProcessed[c]++;
      }
    }
  }
  const maxLoad = Math.max(...consumerLoad);
  const minLoad = Math.min(...consumerLoad);
  const fairness = maxLoad > 0 ? (minLoad / maxLoad * 100).toFixed(0) : 100;
  return {consumerLoad, speeds, fairness, n, prefetch, total, maxLoad};
}
html `
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; border-left: 4px solid #16A085; margin: 20px 0;">
<div style="font-size: 15px; font-weight: bold; color: #2C3E50; margin-bottom: 15px;">Message Distribution Across ${ pfSimulation. n } Consumers (prefetch= ${ pfSimulation. prefetch } )</div>
<div style="display: grid; grid-template-columns: repeat( ${ Math . min (pfSimulation. n , 6 )} , 1fr); gap: 10px; margin-bottom: 15px;">
${ pfSimulation. consumerLoad . map ((load, i) => `
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D; margin-bottom: 4px;">Consumer ${ i + 1 } ( ${ pfSimulation. speeds [i]} x)</div>
<div style="font-size: 20px; font-weight: bold; color: #2C3E50;"> ${ load} </div>
<div style="font-size: 11px; color: #7F8C8D;">messages</div>
<div style="margin-top: 8px; background: #ecf0f1; border-radius: 4px; height: 8px; overflow: hidden;">
<div style="height: 100%; width: ${ pfSimulation. maxLoad > 0 ? (load / pfSimulation. maxLoad * 100 ) : 0 } %; background: ${ load === pfSimulation. maxLoad ? '#E67E22' : '#16A085' } ; border-radius: 4px;"></div>
</div>
</div>
` ). join ("" )}
</div>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 12px;">
<div style="background: white; padding: 12px; border-radius: 6px;">
<div style="font-size: 12px; color: #7F8C8D;">Fairness Index</div>
<div style="font-size: 20px; font-weight: bold; color: ${ pfSimulation. fairness >= 80 ? '#16A085' : pfSimulation. fairness >= 50 ? '#E67E22' : '#E74C3C' } ;"> ${ pfSimulation. fairness } %</div>
<div style="font-size: 11px; color: #7F8C8D;"> ${ pfSimulation. fairness >= 80 ? 'Fair distribution' : pfSimulation. fairness >= 50 ? 'Moderate imbalance' : 'Unfair -- reduce prefetch' } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px;">
<div style="font-size: 12px; color: #7F8C8D;">Recommendation</div>
<div style="font-size: 12px; color: #2C3E50; line-height: 1.6; margin-top: 4px;">
${ pfSimulation. prefetch === 1 ? "Prefetch=1 gives fairest distribution but adds ACK latency per message. Good for slow tasks." : pfSimulation. prefetch <= 10 ? "Moderate prefetch balances throughput and fairness. Good for medium-speed tasks." : "High prefetch boosts throughput but fast consumers may hoard messages, starving slower ones." }
</div>
</div>
</div>
</div>
`
Dead Letter Queues
Dead letter queues (DLQ) capture messages that cannot be processed successfully.
DLQ Triggers
Messages are sent to DLQ when:
Consumer rejects with requeue=false : basic_nack(requeue=False)
Message TTL expires : Message exceeded time-to-live
Queue length exceeded : Queue reached maximum length
Message rejected after max retries : Application-level retry exhaustion
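The last trigger is implemented in application code, not by the broker. A minimal sketch of the retry-counting logic (names are illustrative; in RabbitMQ the per-message x-death header can serve as the attempt counter instead of an external map):

```python
MAX_RETRIES = 3
attempts = {}  # message_id -> failed processing attempts so far

def on_processing_failure(message_id):
    """Decide the NACK strategy after a failed processing attempt.

    'requeue'     -> basic_nack(requeue=True): try again later
    'dead-letter' -> basic_nack(requeue=False): route to the DLX
    """
    attempts[message_id] = attempts.get(message_id, 0) + 1
    if attempts[message_id] >= MAX_RETRIES:
        return "dead-letter"
    return "requeue"
```

Without a retry cap, a poison message (one that always fails) would requeue forever and consume the consumer's capacity; the cap bounds that loop and preserves the message in the DLQ for inspection.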
DLQ Configuration
# Declare dead letter exchange
channel.exchange_declare(
    exchange='dlx',
    exchange_type='direct'
)

# Declare dead letter queue
channel.queue_declare(queue='dead_letters')
channel.queue_bind(queue='dead_letters', exchange='dlx', routing_key='failed')

# Declare main queue with DLQ configuration
channel.queue_declare(
    queue='orders',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 300000  # Optional: 5 min TTL
    }
)
Configure a queue with DLQ settings and simulate message processing. Watch how messages flow from the main queue to the dead letter queue under different failure conditions.
viewof dlqTTL = Inputs.range([0, 600], {value: 60, step: 10, label: "Message TTL (seconds, 0 = no TTL)"})
viewof dlqMaxLen = Inputs.range([0, 100], {value: 20, step: 5, label: "Max queue length (0 = unlimited)"})
viewof dlqMaxRetries = Inputs.range([1, 10], {value: 3, step: 1, label: "Max retry attempts"})
viewof dlqIncomingRate = Inputs.range([1, 20], {value: 5, step: 1, label: "Incoming messages/sec"})
viewof dlqProcessRate = Inputs.range([1, 20], {value: 3, step: 1, label: "Processing rate/sec"})
viewof dlqFailPct = Inputs.range([0, 100], {value: 30, step: 5, label: "Processing failure rate (%)"})
dlqSim = {
  const duration = 30;
  const incoming = dlqIncomingRate;
  const processing = dlqProcessRate;
  const failRate = dlqFailPct / 100;
  const maxLen = dlqMaxLen === 0 ? Infinity : dlqMaxLen;
  const maxRetries = dlqMaxRetries;
  const ttl = dlqTTL;
  let queueSize = 0, dlqSize = 0;
  let totalIn = 0, processed = 0, dlqByReject = 0, dlqByTTL = 0, dlqByOverflow = 0;
  const timeline = [];
  for (let t = 0; t < duration; t++) {
    let newMsgs = incoming;
    let overflow = 0;
    if (queueSize + newMsgs > maxLen) {
      overflow = (queueSize + newMsgs) - maxLen;
      newMsgs = Math.max(0, maxLen - queueSize);
      dlqByOverflow += overflow;
      dlqSize += overflow;
    }
    queueSize += newMsgs;
    totalIn += incoming;
    if (ttl > 0 && queueSize > 0) {
      const expired = Math.min(queueSize, Math.floor(queueSize * (1 / ttl)));
      if (t > ttl && expired > 0) {
        queueSize -= expired;
        dlqByTTL += expired;
        dlqSize += expired;
      }
    }
    const canProcess = Math.min(queueSize, processing);
    for (let m = 0; m < canProcess; m++) {
      queueSize--;
      if (Math.random() < failRate) {
        if (Math.random() < 1 / maxRetries) {
          dlqByReject++;
          dlqSize++;
        } else {
          queueSize++;
        }
      } else {
        processed++;
      }
    }
    timeline.push({t, queueSize, dlqSize});
  }
  return {totalIn, processed, dlqSize, dlqByReject, dlqByTTL, dlqByOverflow, queueSize, timeline};
}
html `
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; border-left: 4px solid #E74C3C; margin: 20px 0;">
<div style="font-size: 15px; font-weight: bold; color: #2C3E50; margin-bottom: 15px;">DLQ Simulation (30-second window)</div>
<div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 10px; margin-bottom: 15px;">
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Total Incoming</div>
<div style="font-size: 20px; font-weight: bold; color: #3498DB;"> ${ dlqSim. totalIn } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Processed OK</div>
<div style="font-size: 20px; font-weight: bold; color: #16A085;"> ${ dlqSim. processed } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">Still in Queue</div>
<div style="font-size: 20px; font-weight: bold; color: #E67E22;"> ${ dlqSim. queueSize } </div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px; text-align: center;">
<div style="font-size: 11px; color: #7F8C8D;">In DLQ</div>
<div style="font-size: 20px; font-weight: bold; color: #E74C3C;"> ${ dlqSim. dlqSize } </div>
</div>
</div>
<div style="background: white; padding: 15px; border-radius: 6px; margin-bottom: 12px;">
<div style="font-size: 12px; color: #7F8C8D; margin-bottom: 8px;">DLQ Breakdown by Trigger</div>
<div style="display: flex; gap: 0; border-radius: 6px; overflow: hidden; height: 30px; background: #ecf0f1;">
${ dlqSim. dlqByReject > 0 ? `<div style="background: #E74C3C; flex: ${ dlqSim. dlqByReject } ; display: flex; align-items: center; justify-content: center; color: white; font-size: 10px; font-weight: bold; min-width: ${ dlqSim. dlqByReject > 0 ? 40 : 0 } px;">NACK ${ dlqSim. dlqByReject } </div>` : '' }
${ dlqSim. dlqByTTL > 0 ? `<div style="background: #9B59B6; flex: ${ dlqSim. dlqByTTL } ; display: flex; align-items: center; justify-content: center; color: white; font-size: 10px; font-weight: bold; min-width: 40px;">TTL ${ dlqSim. dlqByTTL } </div>` : '' }
${ dlqSim. dlqByOverflow > 0 ? `<div style="background: #E67E22; flex: ${ dlqSim. dlqByOverflow } ; display: flex; align-items: center; justify-content: center; color: white; font-size: 10px; font-weight: bold; min-width: 40px;">Overflow ${ dlqSim. dlqByOverflow } </div>` : '' }
${ dlqSim. dlqSize === 0 ? `<div style="flex: 1; display: flex; align-items: center; justify-content: center; color: #7F8C8D; font-size: 11px;">No messages in DLQ</div>` : '' }
</div>
<div style="display: flex; gap: 15px; font-size: 11px; color: #7F8C8D; margin-top: 8px;">
<span><span style="display:inline-block;width:10px;height:10px;background:#E74C3C;border-radius:2px;vertical-align:middle;margin-right:3px;"></span> Consumer reject (NACK)</span>
<span><span style="display:inline-block;width:10px;height:10px;background:#9B59B6;border-radius:2px;vertical-align:middle;margin-right:3px;"></span> TTL expired</span>
<span><span style="display:inline-block;width:10px;height:10px;background:#E67E22;border-radius:2px;vertical-align:middle;margin-right:3px;"></span> Queue overflow</span>
</div>
</div>
<div style="background: white; padding: 12px; border-radius: 6px;">
<div style="font-size: 12px; color: #2C3E50; line-height: 1.7;">
<strong>Analysis:</strong>
${ dlqIncomingRate > dlqProcessRate ? `Incoming rate ( ${ dlqIncomingRate} /s) exceeds processing rate ( ${ dlqProcessRate} /s), causing queue buildup.` : `Processing rate ( ${ dlqProcessRate} /s) keeps up with incoming rate ( ${ dlqIncomingRate} /s).` }
${ dlqSim. dlqByOverflow > 0 ? ` Queue length limit caused ${ dlqSim. dlqByOverflow } messages to overflow to DLQ -- consider increasing max queue length or adding consumers.` : '' }
${ dlqSim. dlqByReject > 0 ? ` ${ dlqSim. dlqByReject } messages exhausted retries and were rejected to DLQ -- investigate processing failures.` : '' }
${ dlqSim. dlqSize === 0 ? ' No messages reached the DLQ. The system is handling the current load well.' : '' }
</div>
</div>
</div>
`
Worked Example: Smart Factory Message Sizing
Scenario: A pharmaceutical manufacturing plant monitors 500 machines via AMQP. Each machine publishes three types of messages with different delivery requirements:
Temperature telemetry : every 5 seconds, 120 bytes JSON — occasional loss OK
Quality alert : on threshold breach (~10/hour/machine), 350 bytes JSON — must not be lost
Batch completion event : ~2 per hour per machine, 800 bytes JSON — exactly once (audit trail)
Step 1: Calculate message rates and bandwidth
Telemetry: 500 machines x 1 msg/5s = 100 msg/s
Quality alerts: 500 x 10/hr = 5,000/hr = 1.39 msg/s
Batch events: 500 x 2/hr = 1,000/hr = 0.28 msg/s
Total: ~102 msg/s
Step 2: Calculate per-message AMQP overhead
Each AMQP message includes:
Frame header : 8 bytes
Method frame (basic.publish) : ~40 bytes (exchange name, routing key)
Content header : ~60 bytes (properties: delivery_mode, content_type, timestamp, message_id, correlation_id)
Content body frame : 8 bytes header + payload
Total overhead per message: ~116 bytes
Step 3: Choose delivery mode for each type
| Message type | Guarantee | On-wire size | Rationale |
|---|---|---|---|
| Telemetry | At-most-once | 120 + 116 = 236 B | Next reading in 5 s replaces any loss |
| Quality alert | At-least-once | 350 + 116 = 466 B + ACK (40 B) | Must arrive; dedup via message_id |
| Batch event | Exactly-once | 800 + 116 = 916 B + TX overhead (~200 B) | Audit requires no duplicates |
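In a Node.js client such as amqplib, these per-type choices map onto publish options. The helper below is a hypothetical sketch of that mapping; only the option names (`persistent`, `contentType`, `messageId`, `expiration`) come from amqplib's publish API, and the exactly-once case would additionally require publisher transactions or confirms on the channel:

```javascript
// Sketch: mapping message types to amqplib-style publish options.
// The mapping is illustrative; option names follow amqplib's publish/sendToQueue API.
function publishOptions(type, messageId) {
  switch (type) {
    case "telemetry":
      // Transient (delivery_mode=1): lost on broker restart, acceptable here.
      // expiration is a string of milliseconds; stale readings are dropped.
      return { persistent: false, contentType: "application/json", expiration: "5000" };
    case "alert":
      // Persistent (delivery_mode=2) plus a messageId so consumers can deduplicate.
      return { persistent: true, contentType: "application/json", messageId };
    case "batch":
      // Persistent; the publisher would also wrap sends in a transaction or use confirms.
      return { persistent: true, contentType: "application/json", messageId };
    default:
      throw new Error(`unknown message type: ${type}`);
  }
}

console.log(publishOptions("telemetry").persistent); // → false
```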
Step 4: Aggregate bandwidth
Telemetry: 100 msg/s x 236 B = 23,600 B/s = 189 Kbps
Alerts: 1.39 msg/s x 506 B = 703 B/s = 5.6 Kbps
Batch: 0.28 msg/s x 1,116 B = 312 B/s = 2.5 Kbps
Total: ~197 Kbps (well within a 1 Mbps LAN connection)
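The same aggregation as a quick script, reusing the on-wire sizes from Step 3:

```javascript
// Sketch: aggregate bandwidth from per-type rates and on-wire message sizes
const kbps = (msgRate, bytes) => msgRate * bytes * 8 / 1000;

const telemetry = kbps(100, 236);   // 236 B incl. framing
const alerts = kbps(1.39, 506);     // 466 B + 40 B ACK
const batch = kbps(0.28, 1116);     // 916 B + ~200 B TX overhead

console.log((telemetry + alerts + batch).toFixed(0)); // → 197
```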
AMQP protocol overhead per message consists of multiple frame components:
\[
\text{Overhead}_{\text{AMQP}} = \text{frame}_{\text{header}} + \text{method}_{\text{frame}} + \text{content}_{\text{header}} + \text{body}_{\text{frame}}
\]
\[
= 8B + 40B + 60B + 8B = 116 \text{ bytes minimum}
\]
Message efficiency for different payload sizes:
\[
\text{Efficiency} = \frac{\text{payload}}{\text{payload} + \text{overhead}} \times 100\%
\]
| Payload | Total message size | Efficiency |
|---|---|---|
| 10 B | 126 B | 7.9% (poor) |
| 120 B | 236 B | 50.8% (moderate) |
| 800 B | 916 B | 87.3% (good) |
For telemetry with 120-byte payloads, nearly half the bandwidth is protocol overhead. Batching 10 readings into one 1,200-byte message improves efficiency from 51% to 91%, saving \((10 \times 236B) - (1,200B + 116B) = 1,044B\) per 10 readings (44% reduction).
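The batching arithmetic can be verified directly from the efficiency formula:

```javascript
// Sketch: efficiency gain from batching 10 telemetry readings into one message
const OVERHEAD = 116; // minimum AMQP framing per message, from the formula above

const efficiency = (payload) => payload / (payload + OVERHEAD) * 100;

console.log(efficiency(120).toFixed(1));  // → 50.8  (one 120 B reading per message)
console.log(efficiency(1200).toFixed(1)); // → 91.2  (10 readings per message)

const unbatched = 10 * (120 + OVERHEAD);  // 2,360 B for 10 separate messages
const batched = 1200 + OVERHEAD;          // 1,316 B for one batched message
console.log(unbatched - batched);         // → 1044 bytes saved per 10 readings
```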
Interactive Calculator: AMQP Message Sizing
Show code
viewof payloadSize = Inputs.range([1, 5000], {
  value: 120,
  step: 1,
  label: "Payload size (bytes)"
})
viewof deliveryMode = Inputs.select(
  ["At-Most-Once (no ACK)", "At-Least-Once (with ACK)", "Exactly-Once (with TX)"],
  {value: "At-Least-Once (with ACK)", label: "Delivery mode"}
)
// Constants
frameHeader = 8
methodFrame = 40
contentHeader = 60
bodyFrame = 8
baseOverhead = frameHeader + methodFrame + contentHeader + bodyFrame
ackOverhead = deliveryMode === "At-Least-Once (with ACK)" ? 40 : 0
txOverhead = deliveryMode === "Exactly-Once (with TX)" ? 200 : 0
totalOverhead = baseOverhead + ackOverhead + txOverhead
totalSize = payloadSize + totalOverhead
efficiency = (payloadSize / totalSize * 100).toFixed(1)
efficiencyColor = efficiency >= 80 ? "#16A085" : efficiency >= 50 ? "#E67E22" : "#E74C3C"
html `
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; border-left: 4px solid #2C3E50; margin: 20px 0;">
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin-bottom: 20px;">
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Payload</div>
<div style="font-size: 28px; font-weight: bold; color: #2C3E50;">${payloadSize} bytes</div>
</div>
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Protocol Overhead</div>
<div style="font-size: 28px; font-weight: bold; color: #E67E22;">${totalOverhead} bytes</div>
</div>
</div>
<div style="margin-bottom: 20px;">
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Overhead Breakdown</div>
<div style="font-size: 13px; color: #2C3E50; line-height: 1.8;">
• Frame header: ${frameHeader} B + Method frame: ${methodFrame} B + Content header: ${contentHeader} B + Body frame: ${bodyFrame} B = ${baseOverhead} B base<br>
${deliveryMode === "At-Least-Once (with ACK)" ? `• ACK overhead: ${ackOverhead} B` : ""}
${deliveryMode === "Exactly-Once (with TX)" ? `• Transaction overhead: ${txOverhead} B` : ""}
</div>
</div>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Total Message Size</div>
<div style="font-size: 24px; font-weight: bold; color: #2C3E50;">${totalSize} bytes</div>
</div>
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Payload Efficiency</div>
<div style="font-size: 24px; font-weight: bold; color: ${efficiencyColor};">${efficiency}%</div>
<div style="font-size: 12px; color: #7F8C8D; margin-top: 4px;">
${efficiency >= 80 ? "Excellent" : efficiency >= 50 ? "Moderate" : "Poor"}
</div>
</div>
</div>
<div style="margin-top: 15px; padding: 15px; background: white; border-radius: 6px;">
<div style="font-size: 13px; color: #2C3E50; margin-bottom: 8px;">
<strong>Recommendation:</strong>
${efficiency < 50
? `Consider batching multiple messages to improve efficiency. Batching 10 messages would increase efficiency to ${(payloadSize * 10 / (payloadSize * 10 + totalOverhead) * 100).toFixed(1)}%.`
: efficiency < 80
? `Efficiency is moderate. For high-throughput scenarios, consider batching to reduce overhead.`
: `Excellent efficiency! Overhead is minimal relative to payload size.`}
</div>
</div>
</div>
`
Interactive Calculator: AMQP Bandwidth Requirements
Show code
viewof msgRate = Inputs.range([0.1, 1000], {
  value: 100,
  step: 0.1,
  label: "Message rate (msg/s)"
})
viewof msgPayload = Inputs.range([10, 2000], {
  value: 120,
  step: 10,
  label: "Average payload (bytes)"
})
viewof qosLevel = Inputs.select(
  ["At-Most-Once", "At-Least-Once", "Exactly-Once"],
  {value: "At-Least-Once", label: "QoS level"}
)
// Calculate overhead based on QoS
qosOverhead = qosLevel === "At-Most-Once" ? 116 :
  qosLevel === "At-Least-Once" ? 156 : 316
totalMsgSize = msgPayload + qosOverhead
bytesPerSecond = (msgRate * totalMsgSize).toFixed(0)
kiloBitsPerSecond = (bytesPerSecond * 8 / 1000).toFixed(1)
megaBitsPerSecond = (bytesPerSecond * 8 / 1000000).toFixed(3)
messagesPerHour = (msgRate * 3600).toFixed(0)
dataPerHour = (bytesPerSecond * 3600 / 1024 / 1024).toFixed(1)
bandwidthColor = megaBitsPerSecond < 0.1 ? "#16A085" :
  megaBitsPerSecond < 1 ? "#E67E22" : "#E74C3C"
html `
<div style="background: #f8f9fa; padding: 20px; border-radius: 8px; border-left: 4px solid #2C3E50; margin: 20px 0;">
<div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 15px; margin-bottom: 20px;">
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Message Rate</div>
<div style="font-size: 24px; font-weight: bold; color: #2C3E50;">${msgRate.toFixed(1)} msg/s</div>
<div style="font-size: 12px; color: #7F8C8D; margin-top: 2px;">${Number(messagesPerHour).toLocaleString()} msg/hr</div>
</div>
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">Message Size</div>
<div style="font-size: 24px; font-weight: bold; color: #2C3E50;">${totalMsgSize} B</div>
<div style="font-size: 12px; color: #7F8C8D; margin-top: 2px;">${msgPayload} B payload + ${qosOverhead} B overhead</div>
</div>
<div>
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 5px;">QoS Overhead</div>
<div style="font-size: 24px; font-weight: bold; color: #E67E22;">${qosOverhead} B</div>
<div style="font-size: 12px; color: #7F8C8D; margin-top: 2px;">${qosLevel}</div>
</div>
</div>
<div style="background: white; padding: 15px; border-radius: 6px; margin-bottom: 15px;">
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 10px;">Bandwidth Requirements</div>
<div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 15px;">
<div>
<div style="font-size: 13px; color: #7F8C8D;">Bytes/second</div>
<div style="font-size: 20px; font-weight: bold; color: #2C3E50;">${Number(bytesPerSecond).toLocaleString()}</div>
</div>
<div>
<div style="font-size: 13px; color: #7F8C8D;">Kbps</div>
<div style="font-size: 20px; font-weight: bold; color: ${bandwidthColor};">${kiloBitsPerSecond}</div>
</div>
<div>
<div style="font-size: 13px; color: #7F8C8D;">Mbps</div>
<div style="font-size: 20px; font-weight: bold; color: ${bandwidthColor};">${megaBitsPerSecond}</div>
</div>
</div>
</div>
<div style="background: white; padding: 15px; border-radius: 6px;">
<div style="font-size: 14px; color: #7F8C8D; margin-bottom: 10px;">Hourly Throughput</div>
<div style="font-size: 13px; color: #2C3E50; line-height: 1.8;">
• <strong>${Number(messagesPerHour).toLocaleString()}</strong> messages per hour<br>
• <strong>${dataPerHour} MB</strong> data per hour<br>
• Network recommendation:
${megaBitsPerSecond < 0.01 ? "Low-power networks (LoRaWAN, NB-IoT) suitable" :
  megaBitsPerSecond < 0.1 ? "Standard IoT networks (Zigbee, BLE) suitable" :
  megaBitsPerSecond < 1 ? "Wi-Fi or cellular required" :
  "High-bandwidth network required (Wi-Fi, Ethernet)"}
</div>
</div>
</div>
`
Step 5: Broker memory for persistent sessions
Quality alerts and batch events use persistent delivery (delivery_mode=2):
- Persistent messages per second: 1.39 + 0.28 = 1.67 msg/s
- Average persistent message size: weighted on-wire size (1.39 x 506 + 0.28 x 1,116) / 1.67 ≈ 608 B, or ~810 B including broker internal framing
- If a consumer goes offline for 10 minutes: 1.67 x 600 ≈ 1,002 messages ≈ 810 KB queued
- With 5 consumers, worst case: ~4 MB broker memory for queued messages
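The backlog estimate as a script (the 810 B broker-side message size is the assumption stated above):

```javascript
// Sketch: broker memory needed to absorb a 10-minute consumer outage
const persistentRate = 1.39 + 0.28;     // persistent msg/s (alerts + batch events)
const brokerBytesPerMsg = 810;          // assumed size incl. broker internal framing
const outageSeconds = 600;              // 10-minute outage
const consumers = 5;

const backlogMsgs = persistentRate * outageSeconds;        // ≈ 1,002 messages per queue
const perConsumerBytes = backlogMsgs * brokerBytesPerMsg;  // ≈ 812 KB
const worstCaseBytes = perConsumerBytes * consumers;       // ≈ 4 MB across 5 queues

console.log(Math.round(backlogMsgs), Math.round(worstCaseBytes / 1e6)); // → 1002 4
```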
Conclusion: Telemetry (98% of messages) uses fire-and-forget, keeping broker load minimal. Only the 2% of messages that matter (alerts + batch events) use persistent delivery, consuming modest broker resources. Using exactly-once for all messages would raise bandwidth from ~197 Kbps to ~360 Kbps (roughly 1.8x) by adding ~200 bytes of transaction overhead to every telemetry message. That overhead is wasted on data that is replaced every 5 seconds.
Knowledge Check
Test your understanding of message structure and delivery guarantees.
Summary
This chapter covered AMQP message structure and delivery guarantees:
Message Format : Analyzed header (delivery mode, priority, TTL), properties (content-type, correlation-ID, message-ID), and body sections
Delivery Modes : Compared at-most-once (fast, may lose), at-least-once (guaranteed, may duplicate), and exactly-once (no loss, no duplicates)
Publisher Confirms : Implemented broker acknowledgment to producers for reliable publishing
Consumer Acknowledgments : Applied manual ACK/NACK with prefetch control for processing guarantees
Dead Letter Queues : Configured DLQ for capturing failed messages for investigation
Idempotency : Used message-ID for deduplication in at-least-once scenarios
What’s Next
AMQP Features and Frames
SASL authentication, TLS encryption, and AMQP 1.0 frame types
Secure and inspect the protocol layer that carries the messages you configured here
AMQP Core Architecture
Exchanges, queues, bindings, and the producer-broker-consumer model
Reinforce how message routing decisions upstream affect the delivery guarantees you set
AMQP Implementations and Labs
RabbitMQ broker setup, hands-on queue and DLQ configuration
Apply publisher confirms and DLQ patterns in a running broker environment
MQTT QoS Levels
MQTT at-most-once, at-least-once, and exactly-once equivalents
Compare AMQP delivery semantics with the constrained-device protocol used across most IoT edge devices
CoAP Fundamentals and Architecture
RESTful CoAP reliability via CON/NON message types
Contrast AMQP’s broker-mediated guarantees with CoAP’s direct endpoint reliability model
Protocol Integration Patterns
Bridging AMQP with MQTT, CoAP, and HTTP in IoT gateways
See how the delivery guarantees studied here propagate (or break) across protocol translation boundaries