%% fig-alt: Circuit diagram showing ESP32 connected to four push buttons for generating different priority traffic, four LEDs for indicating queue states, and a potentiometer for controlling the token bucket rate
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart LR
subgraph ESP32["ESP32 DevKit"]
G12["GPIO 12<br/>(Emergency)"]
G13["GPIO 13<br/>(Critical)"]
G14["GPIO 14<br/>(Normal)"]
G15["GPIO 15<br/>(Background)"]
G25["GPIO 25<br/>(Red LED)"]
G26["GPIO 26<br/>(Yellow LED)"]
G27["GPIO 27<br/>(Green LED)"]
G32["GPIO 32<br/>(Blue LED)"]
G34["GPIO 34<br/>(ADC)"]
GND["GND"]
end
subgraph Traffic["Traffic Generators"]
B1["Button 1<br/>Emergency"]
B2["Button 2<br/>Critical"]
B3["Button 3<br/>Normal"]
B4["Button 4<br/>Background"]
end
subgraph Status["Queue Status LEDs"]
LEDR["Red LED<br/>Emergency Q"]
LEDY["Yellow LED<br/>Critical Q"]
LEDG["Green LED<br/>Normal Q"]
LEDB["Blue LED<br/>Background Q"]
end
subgraph Control["Rate Control"]
POT["Potentiometer<br/>Token Rate"]
end
G12 -.->|"Input"| B1
G13 -.->|"Input"| B2
G14 -.->|"Input"| B3
G15 -.->|"Input"| B4
G25 -.->|"Output"| LEDR
G26 -.->|"Output"| LEDY
G27 -.->|"Output"| LEDG
G32 -.->|"Output"| LEDB
G34 -.->|"Analog"| POT
style ESP32 fill:#2C3E50,stroke:#16A085,stroke-width:2px,color:#fff
style Traffic fill:#c0392b,stroke:#2C3E50,stroke-width:2px,color:#fff
style Status fill:#E67E22,stroke:#2C3E50,stroke-width:2px,color:#fff
style Control fill:#16A085,stroke:#2C3E50,stroke-width:2px,color:#fff
207 QoS Management Lab: ESP32 Implementation
207.1 Learning Objectives
By the end of this lab, you will be able to:
- Build Priority Queues: Implement multi-level priority queues with configurable scheduling on ESP32
- Implement Token Bucket: Create traffic shaping with the token bucket algorithm
- Configure Rate Limiting: Build sliding window rate limiters to protect system resources
- Monitor SLA Compliance: Track latency metrics and detect SLA violations in real-time
- Build Policy Engines: Create dynamic QoS policy enforcement based on system state
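As a preview of the rate-limiting objective: the lab code later in this section actually uses a fixed (tumbling) window, a common lightweight stand-in for a true sliding window. Below is a minimal, hardware-free sketch of that pattern; the type and field names here are illustrative, not the lab code's.

```cpp
// Fixed-window rate limiter: count messages admitted in the current window
// and reject once the per-window budget is spent; the counter resets when
// the window expires.
struct WindowLimiter {
    unsigned windowStartMs;  // start of the current window
    unsigned count;          // messages admitted in this window
    unsigned windowMs;       // window duration
    unsigned maxPerWindow;   // budget per window

    bool allow(unsigned nowMs) {
        if (nowMs - windowStartMs >= windowMs) {  // window expired: reset
            windowStartMs = nowMs;
            count = 0;
        }
        if (count < maxPerWindow) {
            ++count;
            return true;
        }
        return false;  // budget exhausted until the next window rolls over
    }
};
```

With a 1000 ms window and a budget of 50, a burst of 100 requests admits the first 50 and rejects the rest until the window rolls over; the tradeoff versus a true sliding window is that up to 2x the budget can pass across a window boundary.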
207.2 What You Will Learn
In this hands-on lab, you will build and experiment with a comprehensive QoS management system on ESP32. The simulation demonstrates:
- Priority Queuing: Multi-level priority queues with configurable scheduling
- Traffic Shaping: Token bucket implementation for rate control
- Rate Limiting: Request throttling to protect system resources
- Service Level Monitoring: Real-time SLA tracking and violation detection
- QoS Policy Engine: Dynamic policy enforcement based on system state
- Metrics Dashboard: Live visualization of QoS performance
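The traffic-shaping idea at the heart of the lab can be seen in isolation before touching any hardware. Here is a minimal token bucket sketch in plain C++; the type and field names are illustrative, not the lab code's.

```cpp
#include <algorithm>

// Token bucket: tokens accumulate at refillRate per second up to maxTokens
// (which bounds burst size); a message is sent only if it can pay its cost.
struct SimpleTokenBucket {
    float tokens;      // current fill level
    float maxTokens;   // bucket capacity
    float refillRate;  // tokens added per second

    // Credit the bucket for elapsedMs milliseconds of refill.
    void refill(unsigned elapsedMs) {
        tokens = std::min(tokens + (elapsedMs / 1000.0f) * refillRate, maxTokens);
    }

    // Try to pay for one message; false means the sender must wait.
    bool tryConsume(float cost) {
        if (tokens < cost) return false;
        tokens -= cost;
        return true;
    }
};
```

A full 100-token bucket refilled at 50 tokens/s permits an initial burst of 100 cost-1 messages, then a sustained 50 msg/s; this refill rate is what the potentiometer tunes in the lab.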
207.3 Lab Components
| Component | Purpose | Simulation Role |
|---|---|---|
| ESP32 DevKit | Main controller | Runs QoS engine |
| Push Buttons (4) | Traffic generators | Simulate different priority messages |
| LEDs (4) | Status indicators | Show queue states and violations |
| Potentiometer | Rate control | Adjust traffic shaping rate |
| Serial Monitor | Dashboard | Real-time QoS metrics display |
207.4 Wokwi Simulator Environment
Wokwi is a free online simulator for Arduino, ESP32, and other microcontrollers. It allows you to build and test IoT projects entirely in your browser without purchasing hardware. This lab demonstrates QoS concepts using standard components.
Launch the simulator below and copy the provided code to explore QoS management interactively.
- Click the + button to add components (search for "Push Button", "LED", "Potentiometer")
- Use the Serial Monitor to see QoS metrics (115200 baud)
- Press buttons to generate messages of different priorities
- Adjust the potentiometer to change the token bucket rate
- Watch LEDs indicate queue states (solid = messages queued, fast blink = SLA violations)
- The code demonstrates production-quality QoS patterns
207.5 Step-by-Step Instructions
207.5.1 Step 1: Set Up the Circuit
- Add 4 Push Buttons: Click + and search for "Push Button", then add 4 buttons
- Add 4 LEDs: Click + and search for "LED", then add Red, Yellow, Green, and Blue LEDs
- Add 1 Potentiometer: Click + and search for "Potentiometer"
- Wire the components to ESP32:
- Button 1 (Emergency Traffic) -> GPIO 12
- Button 2 (Critical Traffic) -> GPIO 13
- Button 3 (Normal Traffic) -> GPIO 14
- Button 4 (Background Traffic) -> GPIO 15
- Red LED (Emergency Queue) -> GPIO 25
- Yellow LED (Critical Queue) -> GPIO 26
- Green LED (Normal Queue) -> GPIO 27
- Blue LED (Background Queue) -> GPIO 32
- Potentiometer signal -> GPIO 34 (ADC)
- Other side of each button -> GND
- All LED cathodes -> GND (with 220 ohm resistors)
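The wiring above can be captured as a small table in code, which is also a convenient place to record two electrical details: the button inputs need pull-ups (internal `INPUT_PULLUP` or external resistors), since a press pulls the pin to GND and the firmware treats LOW as pressed, and GPIO 34 is an input-only ADC pin. This sketch mirrors the lab's pin assignments; the struct and check function themselves are illustrative, not part of the lab code.

```cpp
// One row per wired component; 'inputPullup' marks pins that should be
// configured with pinMode(pin, INPUT_PULLUP) in setup().
struct PinAssignment {
    int gpio;
    const char* role;
    bool inputPullup;
};

const PinAssignment kWiring[] = {
    {12, "Button: Emergency traffic", true},
    {13, "Button: Critical traffic", true},
    {14, "Button: Normal traffic", true},
    {15, "Button: Background traffic", true},
    {25, "LED: Emergency queue", false},
    {26, "LED: Critical queue", false},
    {27, "LED: Normal queue", false},
    {32, "LED: Background queue", false},
    {34, "Potentiometer (ADC; GPIO 34 is input-only)", false},
};

// Sanity check: no GPIO is assigned twice.
bool wiringIsConsistent() {
    const int n = sizeof(kWiring) / sizeof(kWiring[0]);
    for (int i = 0; i < n; ++i)
        for (int j = i + 1; j < n; ++j)
            if (kWiring[i].gpio == kWiring[j].gpio) return false;
    return true;
}
```

Keeping the map in one table makes pin conflicts easy to catch before they show up as puzzling behavior in the simulator.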
207.5.2 Step 2: Understanding the QoS Architecture
This lab implements a complete QoS management system with the following components:
%% fig-alt: QoS system architecture diagram showing message flow from traffic generators through classifiers into priority queues, then through a token bucket traffic shaper, rate limiter, and finally the QoS policy engine that monitors SLAs and adjusts policies dynamically
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart TB
subgraph Input["Traffic Input"]
TG["Traffic<br/>Generators<br/>(Buttons)"]
CL["Classifier<br/>(Priority Assignment)"]
end
subgraph Queues["Priority Queue System"]
Q1["Queue 1: Emergency<br/>Max Latency: 50ms"]
Q2["Queue 2: Critical<br/>Max Latency: 200ms"]
Q3["Queue 3: Normal<br/>Max Latency: 1000ms"]
Q4["Queue 4: Background<br/>Best Effort"]
end
subgraph Shaping["Traffic Shaping"]
TB["Token Bucket<br/>(Rate Control)"]
RL["Rate Limiter<br/>(Overflow Protection)"]
end
subgraph Policy["QoS Policy Engine"]
SLA["SLA Monitor<br/>(Violation Detection)"]
PE["Policy Enforcer<br/>(Dynamic Adjustment)"]
end
subgraph Output["Service Output"]
OUT["Message<br/>Processing"]
MET["Metrics<br/>Dashboard"]
end
TG --> CL
CL --> Q1
CL --> Q2
CL --> Q3
CL --> Q4
Q1 -->|"Strict Priority"| TB
Q2 -->|"Strict Priority"| TB
Q3 -->|"Strict Priority"| TB
Q4 -->|"Strict Priority"| TB
TB --> RL
RL --> OUT
OUT --> SLA
SLA --> PE
PE -.->|"Adjust"| TB
SLA --> MET
style Input fill:#2C3E50,stroke:#16A085,stroke-width:2px,color:#fff
style Queues fill:#E67E22,stroke:#2C3E50,stroke-width:2px,color:#fff
style Shaping fill:#16A085,stroke:#2C3E50,stroke-width:2px,color:#fff
style Policy fill:#7F8C8D,stroke:#2C3E50,stroke-width:2px,color:#fff
style Output fill:#2C3E50,stroke:#16A085,stroke-width:2px,color:#fff
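The "Strict Priority" edges in the diagram mean the scheduler always drains the highest-priority non-empty queue first, spending shaper tokens as it goes. That data path can be modeled at desktop scale in a few lines; queue contents and token costs here are illustrative, not the lab's constants.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Drain queues in strict priority order (index 0 = highest priority),
// paying costPerMsg[p] tokens per message until tokens run short.
// Returns message IDs in the order they were processed.
std::vector<int> drainStrictPriority(std::vector<std::deque<int>>& queues,
                                     float& tokens,
                                     const std::vector<float>& costPerMsg) {
    std::vector<int> processed;
    for (std::size_t p = 0; p < queues.size(); ++p) {
        while (!queues[p].empty() && tokens >= costPerMsg[p]) {
            tokens -= costPerMsg[p];               // pay the shaper
            processed.push_back(queues[p].front());
            queues[p].pop_front();
        }
    }
    return processed;
}
```

This model also makes the starvation risk visible: as long as a higher-priority queue keeps refilling, lower queues never run, which is why the lab pairs strict priority with per-priority token costs and a dynamic policy engine.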
207.5.3 Step 3: Copy the QoS Lab Code
Copy the following code into the Wokwi code editor. This comprehensive implementation demonstrates professional QoS patterns used in production IoT systems.
/*
* QoS Management Lab - ESP32 Implementation
*
* Comprehensive demonstration of Quality of Service concepts for IoT:
* - Priority Queuing with 4 priority levels
* - Token Bucket Traffic Shaping
* - Rate Limiting with a fixed-window counter
* - Service Level Agreement (SLA) Monitoring
* - Dynamic QoS Policy Enforcement
* - Real-time Metrics Dashboard
*
* Hardware Configuration:
* - Button 1 (GPIO 12): Generate EMERGENCY priority traffic
* - Button 2 (GPIO 13): Generate CRITICAL priority traffic
* - Button 3 (GPIO 14): Generate NORMAL priority traffic
* - Button 4 (GPIO 15): Generate BACKGROUND priority traffic
* - Red LED (GPIO 25): Emergency queue status
* - Yellow LED (GPIO 26): Critical queue status
* - Green LED (GPIO 27): Normal queue status
* - Blue LED (GPIO 32): Background queue status
* - Potentiometer (GPIO 34): Token bucket rate control
*
* Serial Monitor: 115200 baud for QoS metrics dashboard
*
* Key QoS Concepts Demonstrated:
* 1. Priority-based message scheduling
* 2. Traffic shaping with token bucket algorithm
* 3. Rate limiting to prevent overload
* 4. SLA violation detection and alerting
* 5. Dynamic policy adjustment based on load
* 6. Queue depth monitoring and overflow handling
*/
#include <Arduino.h>
// ============================================================
// CONFIGURATION CONSTANTS
// ============================================================
// Pin Definitions
#define BTN_EMERGENCY 12 // Emergency traffic generator
#define BTN_CRITICAL 13 // Critical traffic generator
#define BTN_NORMAL 14 // Normal traffic generator
#define BTN_BACKGROUND 15 // Background traffic generator
#define LED_EMERGENCY 25 // Emergency queue status LED
#define LED_CRITICAL 26 // Critical queue status LED
#define LED_NORMAL 27 // Normal queue status LED
#define LED_BACKGROUND 32 // Background queue status LED
#define POT_RATE 34 // Token rate control potentiometer
// Priority Levels (lower number = higher priority)
#define PRIORITY_EMERGENCY 0
#define PRIORITY_CRITICAL 1
#define PRIORITY_NORMAL 2
#define PRIORITY_BACKGROUND 3
#define NUM_PRIORITY_LEVELS 4
// Queue Configuration
#define MAX_QUEUE_SIZE 20 // Maximum messages per queue
#define MESSAGE_PAYLOAD_SIZE 64 // Bytes per message
// SLA Latency Requirements (milliseconds)
#define SLA_EMERGENCY_MS 50 // Emergency: max 50ms latency
#define SLA_CRITICAL_MS 200 // Critical: max 200ms latency
#define SLA_NORMAL_MS 1000 // Normal: max 1 second latency
#define SLA_BACKGROUND_MS 5000 // Background: max 5 seconds (best effort)
// Token Bucket Configuration
#define TOKEN_BUCKET_MAX 100 // Maximum tokens in bucket
#define TOKEN_COST_EMERGENCY 1 // Tokens consumed per emergency message
#define TOKEN_COST_CRITICAL 2 // Tokens consumed per critical message
#define TOKEN_COST_NORMAL 5 // Tokens consumed per normal message
#define TOKEN_COST_BACKGROUND 10 // Tokens consumed per background message
// Rate Limiting Configuration
#define RATE_LIMIT_WINDOW_MS 1000 // Rate window duration
#define RATE_LIMIT_MAX_MSGS 50 // Maximum messages per window
// Timing
#define DEBOUNCE_MS 50 // Button debounce time
#define METRICS_INTERVAL_MS 2000 // Dashboard update interval
#define LED_BLINK_FAST_MS 100 // Fast blink for violations
#define LED_BLINK_SLOW_MS 500 // Slow blink for activity
// ============================================================
// DATA STRUCTURES
// ============================================================
/**
* Message structure representing a unit of IoT data
* Contains priority, payload, and timing information for SLA tracking
*/
struct Message {
uint8_t priority; // 0=Emergency, 3=Background
uint32_t enqueueTime; // When message entered queue
uint32_t processTime; // When message was processed
char payload[MESSAGE_PAYLOAD_SIZE]; // Message content
uint16_t payloadSize; // Actual payload size
uint32_t sequenceNumber; // Unique message ID
bool processed; // Processing status
};
/**
* Priority Queue structure with SLA tracking
* Each priority level has its own queue with configurable SLA requirements
*/
struct PriorityQueue {
Message messages[MAX_QUEUE_SIZE]; // Message buffer
uint8_t head; // Read position
uint8_t tail; // Write position
uint8_t count; // Current message count
uint8_t maxSize; // Maximum capacity
uint32_t slaLatencyMs; // Maximum allowed latency
uint32_t totalEnqueued; // Total messages received
uint32_t totalProcessed; // Total messages processed
uint32_t totalDropped; // Messages dropped (overflow)
uint32_t slaViolations; // SLA violations count
uint64_t totalLatencyMs; // Cumulative latency for averaging
const char* name; // Queue name for logging
};
/**
* Token Bucket structure for traffic shaping
* Controls the rate at which messages can be processed
*/
struct TokenBucket {
float tokens; // Current token count
float maxTokens; // Maximum token capacity
float refillRate; // Tokens added per second
uint32_t lastRefillTime; // Last refill timestamp
};
/**
* Rate Limiter structure using a fixed (tumbling) window
* Tracks request rates and enforces limits
*/
struct RateLimiter {
uint32_t windowStart; // Current window start time
uint32_t messageCount; // Messages in current window
uint32_t windowDurationMs; // Window duration
uint32_t maxMessagesPerWindow; // Maximum allowed messages
uint32_t totalRejected; // Total rejected due to limit
};
/**
* QoS Metrics structure for dashboard reporting
* Aggregates statistics across all queues and components
*/
struct QoSMetrics {
uint32_t totalMessagesReceived; // All messages received
uint32_t totalMessagesProcessed; // Successfully processed
uint32_t totalMessagesDropped; // Dropped due to overflow
uint32_t totalMessagesThrottled; // Rejected by rate limiter
uint32_t totalSLAViolations; // Total SLA breaches
float avgLatencyMs[NUM_PRIORITY_LEVELS]; // Average latency per priority
float throughputMsgsPerSec; // Current throughput
uint32_t lastThroughputCalcTime; // Last throughput calculation
uint32_t messagesInLastSecond; // Messages processed in last second
float tokenBucketLevel; // Current token bucket fill level
float systemLoad; // Estimated system load (0-1)
};
/**
* QoS Policy structure for dynamic adjustment
* Defines thresholds and actions for policy enforcement
*/
struct QoSPolicy {
float highLoadThreshold; // Load level triggering high-load policy
float criticalLoadThreshold; // Load level triggering critical policy
bool emergencyOnlyMode; // Only process emergency messages
bool dropBackgroundTraffic; // Drop background during high load
uint32_t lastPolicyCheck; // Last policy evaluation time
uint32_t policyCheckIntervalMs; // Policy check frequency
};
// ============================================================
// GLOBAL STATE
// ============================================================
// Priority Queues (one per priority level)
PriorityQueue queues[NUM_PRIORITY_LEVELS];
// Traffic Shaping
TokenBucket tokenBucket;
RateLimiter rateLimiter;
// Metrics and Policy
QoSMetrics metrics;
QoSPolicy policy;
// Message Sequencing
uint32_t nextSequenceNumber = 1;
// Button State Tracking
uint32_t lastButtonPress[4] = {0, 0, 0, 0};
bool buttonState[4] = {false, false, false, false};
// LED State for Blinking
uint32_t ledLastToggle[4] = {0, 0, 0, 0};
bool ledState[4] = {false, false, false, false};
// Dashboard Timing
uint32_t lastMetricsDisplay = 0;
// Auto-traffic generation for demonstration
uint32_t lastAutoTraffic = 0;
bool autoTrafficEnabled = true;
uint32_t autoTrafficInterval = 500; // Generate traffic every 500ms
// ============================================================
// QUEUE OPERATIONS
// ============================================================
/**
* Initialize a priority queue with given parameters
*
* @param queue Pointer to the queue structure
* @param name Human-readable queue name
* @param slaLatencyMs Maximum allowed latency for SLA compliance
*/
void initQueue(PriorityQueue* queue, const char* name, uint32_t slaLatencyMs) {
queue->head = 0;
queue->tail = 0;
queue->count = 0;
queue->maxSize = MAX_QUEUE_SIZE;
queue->slaLatencyMs = slaLatencyMs;
queue->totalEnqueued = 0;
queue->totalProcessed = 0;
queue->totalDropped = 0;
queue->slaViolations = 0;
queue->totalLatencyMs = 0;
queue->name = name;
// Clear message buffer
for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
queue->messages[i].processed = true;
queue->messages[i].priority = 255; // Invalid
}
}
/**
* Check if a queue is full
*
* @param queue Pointer to the queue
* @return true if queue is at capacity
*/
bool isQueueFull(PriorityQueue* queue) {
return queue->count >= queue->maxSize;
}
/**
* Check if a queue is empty
*
* @param queue Pointer to the queue
* @return true if queue has no messages
*/
bool isQueueEmpty(PriorityQueue* queue) {
return queue->count == 0;
}
/**
* Enqueue a message into a priority queue
* Handles overflow by dropping the message and tracking the drop
*
* @param queue Pointer to the target queue
* @param priority Message priority level
* @param payload Message content
* @param payloadSize Size of payload in bytes
* @return true if message was enqueued, false if dropped
*/
bool enqueueMessage(PriorityQueue* queue, uint8_t priority,
const char* payload, uint16_t payloadSize) {
// Check for queue overflow
if (isQueueFull(queue)) {
queue->totalDropped++;
metrics.totalMessagesDropped++;
Serial.printf("[DROP] %s queue full, dropping message #%lu\n",
queue->name, nextSequenceNumber);
return false;
}
// Create new message
Message* msg = &queue->messages[queue->tail];
msg->priority = priority;
msg->enqueueTime = millis();
msg->processTime = 0;
msg->sequenceNumber = nextSequenceNumber++;
msg->payloadSize = min(payloadSize, (uint16_t)(MESSAGE_PAYLOAD_SIZE - 1));
strncpy(msg->payload, payload, msg->payloadSize);
msg->payload[msg->payloadSize] = '\0';
msg->processed = false;
// Advance tail pointer (circular buffer)
queue->tail = (queue->tail + 1) % queue->maxSize;
queue->count++;
queue->totalEnqueued++;
metrics.totalMessagesReceived++;
return true;
}
/**
* Dequeue a message from a priority queue
* Updates SLA tracking based on message latency
*
* @param queue Pointer to the source queue
* @param outMessage Pointer to store dequeued message
* @return true if message was dequeued, false if queue empty
*/
bool dequeueMessage(PriorityQueue* queue, Message* outMessage) {
if (isQueueEmpty(queue)) {
return false;
}
// Get message at head
Message* msg = &queue->messages[queue->head];
// Calculate latency
uint32_t latency = millis() - msg->enqueueTime;
msg->processTime = millis();
// Check SLA compliance
if (latency > queue->slaLatencyMs) {
queue->slaViolations++;
metrics.totalSLAViolations++;
Serial.printf("[SLA VIOLATION] %s: Message #%lu latency %lums > %lums SLA\n",
queue->name, msg->sequenceNumber, latency, queue->slaLatencyMs);
}
// Update latency statistics
queue->totalLatencyMs += latency;
queue->totalProcessed++;
metrics.totalMessagesProcessed++;
// Copy message to output
memcpy(outMessage, msg, sizeof(Message));
msg->processed = true;
// Advance head pointer
queue->head = (queue->head + 1) % queue->maxSize;
queue->count--;
return true;
}
/**
* Get the fill percentage of a queue
*
* @param queue Pointer to the queue
* @return Fill percentage (0-100)
*/
float getQueueFillPercent(PriorityQueue* queue) {
return (float)queue->count / queue->maxSize * 100.0f;
}
// ============================================================
// TOKEN BUCKET IMPLEMENTATION
// ============================================================
/**
* Initialize the token bucket for traffic shaping
*
* @param maxTokens Maximum token capacity
* @param refillRate Tokens added per second
*/
void initTokenBucket(float maxTokens, float refillRate) {
tokenBucket.tokens = maxTokens; // Start with full bucket
tokenBucket.maxTokens = maxTokens;
tokenBucket.refillRate = refillRate;
tokenBucket.lastRefillTime = millis();
}
/**
* Refill tokens based on elapsed time
* Called before each consume attempt to add tokens proportional to time elapsed
*/
void refillTokens() {
uint32_t now = millis();
uint32_t elapsed = now - tokenBucket.lastRefillTime;
if (elapsed > 0) {
// Add tokens proportional to elapsed time
float tokensToAdd = (elapsed / 1000.0f) * tokenBucket.refillRate;
tokenBucket.tokens = min(tokenBucket.tokens + tokensToAdd,
tokenBucket.maxTokens);
tokenBucket.lastRefillTime = now;
}
}
/**
* Attempt to consume tokens for a message
*
* @param tokensRequired Number of tokens needed
* @return true if tokens were consumed, false if insufficient tokens
*/
bool consumeTokens(float tokensRequired) {
refillTokens();
if (tokenBucket.tokens >= tokensRequired) {
tokenBucket.tokens -= tokensRequired;
return true;
}
return false;
}
/**
* Get token cost for a message based on its priority
* Higher priority messages cost fewer tokens (get more bandwidth)
*
* @param priority Message priority level
* @return Token cost for the message
*/
float getTokenCost(uint8_t priority) {
switch (priority) {
case PRIORITY_EMERGENCY: return TOKEN_COST_EMERGENCY;
case PRIORITY_CRITICAL: return TOKEN_COST_CRITICAL;
case PRIORITY_NORMAL: return TOKEN_COST_NORMAL;
case PRIORITY_BACKGROUND: return TOKEN_COST_BACKGROUND;
default: return TOKEN_COST_BACKGROUND;
}
}
/**
* Get current token bucket fill percentage
*
* @return Fill percentage (0-100)
*/
float getTokenBucketPercent() {
refillTokens();
return (tokenBucket.tokens / tokenBucket.maxTokens) * 100.0f;
}
// ============================================================
// RATE LIMITER IMPLEMENTATION
// ============================================================
/**
* Initialize the rate limiter with window parameters
*
* @param windowDurationMs Duration of the sliding window
* @param maxMessagesPerWindow Maximum messages allowed per window
*/
void initRateLimiter(uint32_t windowDurationMs, uint32_t maxMessagesPerWindow) {
rateLimiter.windowStart = millis();
rateLimiter.messageCount = 0;
rateLimiter.windowDurationMs = windowDurationMs;
rateLimiter.maxMessagesPerWindow = maxMessagesPerWindow;
rateLimiter.totalRejected = 0;
}
/**
* Check if a message should be rate limited
* Uses a fixed (tumbling) window: the count resets when the window expires,
* a lightweight approximation of a true sliding window
*
* @return true if message is allowed, false if rate limited
*/
bool checkRateLimit() {
uint32_t now = millis();
// Check if window has expired
if (now - rateLimiter.windowStart >= rateLimiter.windowDurationMs) {
// Reset window
rateLimiter.windowStart = now;
rateLimiter.messageCount = 0;
}
// Check if within limit
if (rateLimiter.messageCount < rateLimiter.maxMessagesPerWindow) {
rateLimiter.messageCount++;
return true;
}
// Rate limit exceeded
rateLimiter.totalRejected++;
metrics.totalMessagesThrottled++;
return false;
}
/**
* Get current rate (messages per second)
*
* @return Current message rate
*/
float getCurrentRate() {
uint32_t elapsed = millis() - rateLimiter.windowStart;
if (elapsed == 0) return 0;
return (float)rateLimiter.messageCount / (elapsed / 1000.0f);
}
// ============================================================
// QoS POLICY ENGINE
// ============================================================
/**
* Initialize QoS policy with default thresholds
*/
void initPolicy() {
policy.highLoadThreshold = 0.7f; // 70% load triggers high-load mode
policy.criticalLoadThreshold = 0.9f; // 90% load triggers critical mode
policy.emergencyOnlyMode = false;
policy.dropBackgroundTraffic = false;
policy.lastPolicyCheck = millis();
policy.policyCheckIntervalMs = 500; // Check every 500ms
}
/**
* Calculate current system load based on queue fill levels
* Weighted average favoring high-priority queues
*
* @return Load factor (0.0 to 1.0)
*/
float calculateSystemLoad() {
float weightedLoad = 0;
float totalWeight = 0;
// Weight high-priority queues more heavily
float weights[] = {4.0f, 3.0f, 2.0f, 1.0f};
for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
weightedLoad += getQueueFillPercent(&queues[i]) / 100.0f * weights[i];
totalWeight += weights[i];
}
return weightedLoad / totalWeight;
}
/**
* Evaluate and update QoS policy based on current system state
* Adjusts policy flags for emergency-only mode and background dropping
*/
void evaluatePolicy() {
uint32_t now = millis();
// Only evaluate periodically
if (now - policy.lastPolicyCheck < policy.policyCheckIntervalMs) {
return;
}
policy.lastPolicyCheck = now;
// Calculate current load
metrics.systemLoad = calculateSystemLoad();
// Update policy based on load
bool wasEmergencyOnly = policy.emergencyOnlyMode;
bool wasDropping = policy.dropBackgroundTraffic;
if (metrics.systemLoad >= policy.criticalLoadThreshold) {
// Critical load: emergency only
policy.emergencyOnlyMode = true;
policy.dropBackgroundTraffic = true;
} else if (metrics.systemLoad >= policy.highLoadThreshold) {
// High load: drop background
policy.emergencyOnlyMode = false;
policy.dropBackgroundTraffic = true;
} else {
// Normal load
policy.emergencyOnlyMode = false;
policy.dropBackgroundTraffic = false;
}
// Log policy changes
if (policy.emergencyOnlyMode != wasEmergencyOnly) {
Serial.printf("[POLICY] Emergency-only mode: %s\n",
policy.emergencyOnlyMode ? "ENABLED" : "DISABLED");
}
if (policy.dropBackgroundTraffic != wasDropping) {
Serial.printf("[POLICY] Background traffic dropping: %s\n",
policy.dropBackgroundTraffic ? "ENABLED" : "DISABLED");
}
}
/**
* Check if a message should be accepted based on current policy
*
* @param priority Message priority level
* @return true if message should be accepted, false if policy rejects it
*/
bool policyAllowsMessage(uint8_t priority) {
// Emergency always allowed
if (priority == PRIORITY_EMERGENCY) {
return true;
}
// Emergency-only mode blocks all non-emergency
if (policy.emergencyOnlyMode) {
return false;
}
// High load mode drops background
if (policy.dropBackgroundTraffic && priority == PRIORITY_BACKGROUND) {
return false;
}
return true;
}
// ============================================================
// PRIORITY SCHEDULER
// ============================================================
/**
* Process messages from queues in priority order
* Implements strict priority scheduling with token bucket rate control
*
* @return Number of messages processed in this call
*/
uint8_t processQueues() {
uint8_t processed = 0;
Message msg;
// Process in strict priority order
for (int priority = 0; priority < NUM_PRIORITY_LEVELS; priority++) {
PriorityQueue* queue = &queues[priority];
// Process all messages in this priority level before moving to next
while (!isQueueEmpty(queue)) {
// Check rate limit
if (!checkRateLimit()) {
Serial.println("[RATE LIMIT] Rate limit exceeded, pausing processing");
return processed;
}
// Check token bucket
float cost = getTokenCost(priority);
if (!consumeTokens(cost)) {
// Not enough tokens - try lower priority or wait
break;
}
// Dequeue and process
if (dequeueMessage(queue, &msg)) {
// Simulate processing (in real system, would send to cloud/actuator)
processed++;
// Update throughput tracking
metrics.messagesInLastSecond++;
}
}
}
return processed;
}
// ============================================================
// TRAFFIC GENERATION
// ============================================================
/**
* Generate a message with specified priority
*
* @param priority Message priority level
*/
void generateTraffic(uint8_t priority) {
char payload[MESSAGE_PAYLOAD_SIZE];
const char* priorityName;
switch (priority) {
case PRIORITY_EMERGENCY:
priorityName = "EMERGENCY";
snprintf(payload, MESSAGE_PAYLOAD_SIZE,
"ALERT: Critical sensor threshold exceeded at %lu", millis());
break;
case PRIORITY_CRITICAL:
priorityName = "CRITICAL";
snprintf(payload, MESSAGE_PAYLOAD_SIZE,
"WARNING: Actuator response required at %lu", millis());
break;
case PRIORITY_NORMAL:
priorityName = "NORMAL";
snprintf(payload, MESSAGE_PAYLOAD_SIZE,
"DATA: Sensor reading value=%.2f at %lu",
random(0, 10000) / 100.0f, millis());
break;
case PRIORITY_BACKGROUND:
priorityName = "BACKGROUND";
snprintf(payload, MESSAGE_PAYLOAD_SIZE,
"LOG: System diagnostic entry at %lu", millis());
break;
default:
return;
}
// Check policy
if (!policyAllowsMessage(priority)) {
Serial.printf("[POLICY BLOCK] %s message rejected by policy\n", priorityName);
return;
}
// Enqueue message
PriorityQueue* queue = &queues[priority];
if (enqueueMessage(queue, priority, payload, strlen(payload))) {
Serial.printf("[ENQUEUE] %s: %s (queue depth: %d/%d)\n",
priorityName, payload, queue->count, queue->maxSize);
}
}
/**
* Generate automatic traffic for demonstration
* Creates a realistic mix of different priority messages
*/
void generateAutoTraffic() {
uint32_t now = millis();
if (!autoTrafficEnabled) return;
if (now - lastAutoTraffic >= autoTrafficInterval) {
lastAutoTraffic = now;
// Probability-based traffic generation
int r = random(100);
if (r < 2) {
// 2% emergency
generateTraffic(PRIORITY_EMERGENCY);
} else if (r < 12) {
// 10% critical
generateTraffic(PRIORITY_CRITICAL);
} else if (r < 52) {
// 40% normal
generateTraffic(PRIORITY_NORMAL);
} else {
// 48% background
generateTraffic(PRIORITY_BACKGROUND);
}
}
}
// ============================================================
// LED STATUS DISPLAY
// ============================================================
/**
* Update LED states to reflect queue status
* - Solid: Queue has messages
* - Blinking fast: SLA violations detected
* - Off: Queue empty
*/
void updateLEDs() {
uint32_t now = millis();
for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
PriorityQueue* queue = &queues[i];
uint8_t ledPin;
switch (i) {
case PRIORITY_EMERGENCY: ledPin = LED_EMERGENCY; break;
case PRIORITY_CRITICAL: ledPin = LED_CRITICAL; break;
case PRIORITY_NORMAL: ledPin = LED_NORMAL; break;
case PRIORITY_BACKGROUND: ledPin = LED_BACKGROUND; break;
default: continue;
}
// Determine LED behavior based on queue state
if (queue->slaViolations > 0) {
// SLA violations: fast blink (hold LED state between toggles)
if (now - ledLastToggle[i] >= LED_BLINK_FAST_MS) {
ledState[i] = !ledState[i];
ledLastToggle[i] = now;
digitalWrite(ledPin, ledState[i] ? HIGH : LOW);
}
} else if (queue->count > 0) {
// Queue has messages: solid on
digitalWrite(ledPin, HIGH);
ledState[i] = true;
} else {
// Queue empty: off
digitalWrite(ledPin, LOW);
ledState[i] = false;
}
}
}
// ============================================================
// METRICS DASHBOARD
// ============================================================
/**
* Calculate and update throughput metrics
*/
void updateThroughputMetrics() {
uint32_t now = millis();
uint32_t elapsed = now - metrics.lastThroughputCalcTime;
if (elapsed >= 1000) {
metrics.throughputMsgsPerSec = (float)metrics.messagesInLastSecond /
(elapsed / 1000.0f);
metrics.messagesInLastSecond = 0;
metrics.lastThroughputCalcTime = now;
}
}
/**
* Calculate average latency for a queue
*
* @param queue Pointer to the queue
* @return Average latency in milliseconds
*/
float calculateAvgLatency(PriorityQueue* queue) {
if (queue->totalProcessed == 0) return 0;
return (float)queue->totalLatencyMs / queue->totalProcessed;
}
/**
* Display comprehensive QoS metrics dashboard
*/
void displayMetrics() {
uint32_t now = millis();
if (now - lastMetricsDisplay < METRICS_INTERVAL_MS) {
return;
}
lastMetricsDisplay = now;
// Update derived metrics
updateThroughputMetrics();
metrics.tokenBucketLevel = getTokenBucketPercent();
// Clear screen and display header
Serial.println("\n========================================");
Serial.println(" QoS MANAGEMENT DASHBOARD");
Serial.println("========================================");
Serial.printf("Uptime: %lu seconds\n", millis() / 1000);
Serial.println();
// System Overview
Serial.println("--- SYSTEM OVERVIEW ---");
Serial.printf("System Load: %.1f%%\n", metrics.systemLoad * 100);
Serial.printf("Throughput: %.1f msg/sec\n", metrics.throughputMsgsPerSec);
Serial.printf("Token Bucket: %.1f%%\n", metrics.tokenBucketLevel);
Serial.printf("Current Rate: %.1f msg/sec\n", getCurrentRate());
Serial.println();
// Policy Status
Serial.println("--- POLICY STATUS ---");
Serial.printf("Emergency Only: %s\n", policy.emergencyOnlyMode ? "YES" : "no");
Serial.printf("Drop Background: %s\n", policy.dropBackgroundTraffic ? "YES" : "no");
Serial.println();
// Queue Statistics
Serial.println("--- QUEUE STATISTICS ---");
Serial.println("Priority | Depth | Enqueued | Processed | Dropped | SLA Viol | Avg Lat");
Serial.println("------------|-------|----------|-----------|---------|----------|--------");
const char* names[] = {"EMERGENCY", "CRITICAL", "NORMAL", "BACKGROUND"};
for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
PriorityQueue* q = &queues[i];
Serial.printf("%-11s | %2d/%2d | %8lu | %9lu | %7lu | %8lu | %6.1fms\n",
names[i],
q->count, q->maxSize,
q->totalEnqueued,
q->totalProcessed,
q->totalDropped,
q->slaViolations,
calculateAvgLatency(q));
}
Serial.println();
// Aggregate Metrics
Serial.println("--- AGGREGATE METRICS ---");
Serial.printf("Total Received: %lu\n", metrics.totalMessagesReceived);
Serial.printf("Total Processed: %lu\n", metrics.totalMessagesProcessed);
Serial.printf("Total Dropped: %lu\n", metrics.totalMessagesDropped);
Serial.printf("Total Throttled: %lu\n", metrics.totalMessagesThrottled);
Serial.printf("Total SLA Viols: %lu\n", metrics.totalSLAViolations);
Serial.println();
// SLA Compliance
Serial.println("--- SLA COMPLIANCE ---");
for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
PriorityQueue* q = &queues[i];
float compliance = 100.0f;
if (q->totalProcessed > 0) {
compliance = 100.0f * (1.0f - (float)q->slaViolations / q->totalProcessed);
}
Serial.printf("%s: %.2f%% (target: %lums)\n",
names[i], compliance, q->slaLatencyMs);
}
Serial.println();
// Instructions
Serial.println("--- CONTROLS ---");
Serial.println("BTN1=Emergency, BTN2=Critical, BTN3=Normal, BTN4=Background");
Serial.println("POT=Token Rate (turn to adjust traffic shaping)");
Serial.println("========================================\n");
}
// ============================================================
// BUTTON HANDLING
// ============================================================
/**
* Read buttons and generate traffic based on presses
*/
void handleButtons() {
uint32_t now = millis();
struct ButtonConfig {
uint8_t pin;
uint8_t priority;
};
ButtonConfig buttons[] = {
{BTN_EMERGENCY, PRIORITY_EMERGENCY},
{BTN_CRITICAL, PRIORITY_CRITICAL},
{BTN_NORMAL, PRIORITY_NORMAL},
{BTN_BACKGROUND, PRIORITY_BACKGROUND}
};
for (int i = 0; i < 4; i++) {
bool currentState = (digitalRead(buttons[i].pin) == LOW);
// Debounced press detection
if (currentState && !buttonState[i] &&
(now - lastButtonPress[i] >= DEBOUNCE_MS)) {
// Button pressed
generateTraffic(buttons[i].priority);
lastButtonPress[i] = now;
}
buttonState[i] = currentState;
}
}
/**
* Read potentiometer and adjust token bucket rate
*/
void handlePotentiometer() {
int potValue = analogRead(POT_RATE);
// Map potentiometer (12-bit ESP32 ADC, 0-4095) to token refill rate (10 to 200 tokens/sec)
float newRate = (float)map(potValue, 0, 4095, 10, 200);
// Only update on a meaningful change (>5 tokens/sec) to avoid log spam from ADC noise
if (fabsf(newRate - tokenBucket.refillRate) > 5.0f) {
tokenBucket.refillRate = newRate;
Serial.printf("[CONFIG] Token refill rate adjusted to %.1f tokens/sec\n", newRate);
}
}
// ============================================================
// INITIALIZATION
// ============================================================
/**
* Initialize all QoS system components
*/
void initQoSSystem() {
// Initialize priority queues
initQueue(&queues[PRIORITY_EMERGENCY], "EMERGENCY", SLA_EMERGENCY_MS);
initQueue(&queues[PRIORITY_CRITICAL], "CRITICAL", SLA_CRITICAL_MS);
initQueue(&queues[PRIORITY_NORMAL], "NORMAL", SLA_NORMAL_MS);
initQueue(&queues[PRIORITY_BACKGROUND], "BACKGROUND", SLA_BACKGROUND_MS);
// Initialize token bucket (50 tokens, 50 tokens/sec refill)
initTokenBucket(TOKEN_BUCKET_MAX, 50.0f);
// Initialize rate limiter (50 messages per second)
initRateLimiter(RATE_LIMIT_WINDOW_MS, RATE_LIMIT_MAX_MSGS);
// Initialize policy engine
initPolicy();
// Initialize metrics
memset(&metrics, 0, sizeof(metrics));
metrics.lastThroughputCalcTime = millis();
Serial.println("[INIT] QoS Management System initialized");
Serial.println("[INIT] Priority Levels: EMERGENCY > CRITICAL > NORMAL > BACKGROUND");
Serial.printf("[INIT] SLA Targets: %dms, %dms, %dms, %dms\n",
SLA_EMERGENCY_MS, SLA_CRITICAL_MS, SLA_NORMAL_MS, SLA_BACKGROUND_MS);
Serial.printf("[INIT] Token Bucket: %d tokens max, %.1f tokens/sec refill\n",
TOKEN_BUCKET_MAX, tokenBucket.refillRate);
Serial.printf("[INIT] Rate Limit: %d messages per %dms window\n",
RATE_LIMIT_MAX_MSGS, RATE_LIMIT_WINDOW_MS);
}
// ============================================================
// ARDUINO SETUP AND LOOP
// ============================================================
void setup() {
// Initialize serial communication
Serial.begin(115200);
delay(1000);
Serial.println("\n\n");
Serial.println("================================================");
Serial.println(" QoS MANAGEMENT LAB - ESP32 IMPLEMENTATION");
Serial.println("================================================");
Serial.println();
// Configure GPIO pins
pinMode(BTN_EMERGENCY, INPUT_PULLUP);
pinMode(BTN_CRITICAL, INPUT_PULLUP);
pinMode(BTN_NORMAL, INPUT_PULLUP);
pinMode(BTN_BACKGROUND, INPUT_PULLUP);
pinMode(LED_EMERGENCY, OUTPUT);
pinMode(LED_CRITICAL, OUTPUT);
pinMode(LED_NORMAL, OUTPUT);
pinMode(LED_BACKGROUND, OUTPUT);
// Initialize all LEDs off
digitalWrite(LED_EMERGENCY, LOW);
digitalWrite(LED_CRITICAL, LOW);
digitalWrite(LED_NORMAL, LOW);
digitalWrite(LED_BACKGROUND, LOW);
// Initialize QoS system
initQoSSystem();
// Startup LED test
Serial.println("[INIT] LED test sequence...");
uint8_t pins[] = {LED_EMERGENCY, LED_CRITICAL, LED_NORMAL, LED_BACKGROUND};
for (int i = 0; i < 4; i++) {
digitalWrite(pins[i], HIGH);
delay(200);
digitalWrite(pins[i], LOW);
}
Serial.println("[INIT] System ready!");
Serial.println("[INIT] Press buttons to generate traffic or wait for auto-generation");
Serial.println();
}
void loop() {
// Handle user input
handleButtons();
handlePotentiometer();
// Generate automatic traffic for demonstration
generateAutoTraffic();
// Evaluate and apply QoS policy
evaluatePolicy();
// Process messages from priority queues
processQueues();
// Update LED status indicators
updateLEDs();
// Display metrics dashboard
displayMetrics();
// Small delay to prevent busy-waiting
delay(10);
}
207.6 Expected Outcomes
After running the lab, you should observe:
- Priority Processing: Emergency messages (Button 1) are always processed first, even when other queues are full
- Traffic Shaping: Adjusting the potentiometer changes how quickly messages are processed
- Rate Limiting: Rapid button pressing eventually triggers rate limiting
- SLA Violations: If queue processing is too slow, SLA violations are logged
- Policy Enforcement: Under high load, background traffic is automatically dropped
- LED Indicators: LEDs reflect queue states (solid = messages waiting, blinking = SLA violations)
207.7 Challenge Exercises
Modify the processQueues() function to implement weighted fair queuing instead of strict priority. Each priority level should get a proportional share of bandwidth:
- Emergency: 40% of bandwidth
- Critical: 30% of bandwidth
- Normal: 20% of bandwidth
- Background: 10% of bandwidth
Hint: Track credits per queue and serve queues in round-robin fashion based on their credits.
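The hint above can be sketched as a deficit-round-robin scheduler. This is a minimal illustration, not the lab's implementation: the `WfqState`, `wfqRound`, and `COST` names are assumptions, and a real version would call the lab's dequeue/process functions instead of counting integers.

```cpp
#include <cassert>

// Deficit-round-robin sketch of weighted fair queuing. Each round, every
// queue's credit grows by its weight; serving one message costs COST
// credits, so long-run bandwidth shares follow the 40/30/20/10 weights.
const int NUM_LEVELS = 4;
const int WEIGHTS[NUM_LEVELS] = {40, 30, 20, 10};  // EMERGENCY..BACKGROUND
const int COST = 10;  // credits consumed per message served

struct WfqState {
    int credits[NUM_LEVELS] = {0, 0, 0, 0};
};

// One scheduling round: serve each queue while it has messages and credit.
// 'pending' is decremented in place; 'served' reports messages sent per level.
void wfqRound(WfqState& s, int pending[NUM_LEVELS], int served[NUM_LEVELS]) {
    for (int i = 0; i < NUM_LEVELS; i++) {
        s.credits[i] += WEIGHTS[i];
        served[i] = 0;
        while (pending[i] > 0 && s.credits[i] >= COST) {
            s.credits[i] -= COST;
            pending[i]--;
            served[i]++;
        }
        if (pending[i] == 0) s.credits[i] = 0;  // idle queues don't hoard credit
    }
}
```

With all four queues backlogged, each round serves 4/3/2/1 messages across the levels, matching the 40/30/20/10 split. Zeroing an idle queue's credit prevents a long-quiet queue from bursting unfairly when its traffic returns.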
Implement a priority aging mechanism where messages waiting too long in lower-priority queues get their effective priority increased. This prevents starvation while still respecting priorities.
Implementation ideas:
- Track time in queue for each message
- Calculate effective priority as base priority minus (wait_time / AGING_FACTOR)
- Messages should never exceed emergency priority through aging
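The effective-priority formula above can be sketched as a single pure function. The `AGING_FACTOR_MS` value of 2000 ms is an assumed tuning constant, not a value from the lab code:

```cpp
#include <cassert>
#include <cstdint>

// Priority aging sketch: lower numbers mean higher priority (EMERGENCY = 0,
// BACKGROUND = 3). Every AGING_FACTOR_MS of queue time promotes a waiting
// message by one level, clamped so it can never exceed EMERGENCY.
const uint32_t AGING_FACTOR_MS = 2000;  // assumed: one promotion per 2 s of waiting

uint8_t effectivePriority(uint8_t basePriority, uint32_t waitTimeMs) {
    uint32_t boost = waitTimeMs / AGING_FACTOR_MS;
    // Clamp at 0 (EMERGENCY): aging raises priority but never past the top.
    return (boost >= basePriority) ? 0 : (uint8_t)(basePriority - boost);
}
```

The scheduler would then compare queues by `effectivePriority()` of their head message rather than by queue index, so a starved BACKGROUND message eventually outranks fresh NORMAL traffic.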
Replace the token bucket with a leaky bucket implementation:
- Messages enter a queue (bucket)
- Messages leave at a constant rate (leak)
- If the bucket overflows, messages are dropped
Compare the behavior differences between token bucket (allows bursts) and leaky bucket (constant rate).
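A minimal sketch of the leaky-bucket admission rule follows; the names are illustrative, and a real version would hold the queued messages themselves rather than just an occupancy level:

```cpp
#include <cassert>
#include <cstdint>

// Leaky bucket: arrivals fill the bucket, which drains at a constant rate.
// An arrival that would overflow the capacity is dropped, so output is
// smoothed to leakRatePerSec -- unlike a token bucket, which allows bursts.
struct LeakyBucket {
    float level;           // current occupancy, in messages
    float capacity;        // maximum queued messages
    float leakRatePerSec;  // constant drain rate
};

// Drain for elapsedMs since the last call, then try to admit one message.
// Returns true if admitted, false if the bucket would overflow (drop).
bool leakyBucketAdmit(LeakyBucket& b, uint32_t elapsedMs) {
    b.level -= b.leakRatePerSec * (elapsedMs / 1000.0f);
    if (b.level < 0.0f) b.level = 0.0f;
    if (b.level + 1.0f > b.capacity) return false;
    b.level += 1.0f;
    return true;
}
```

Contrast with the lab's token bucket: the token bucket can admit up to TOKEN_BUCKET_MAX messages in one burst, while this bucket's output never exceeds the leak rate for long.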
Simulate variable network conditions by:
- Adding random processing delays
- Randomly dropping messages (packet loss)
- Varying the token refill rate automatically
Observe how the QoS system adapts to changing conditions.
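The delay and loss parts can be sketched as a small helper applied before each message is processed. The `lossPercent` and `maxDelayMs` knobs are illustrative assumptions; on the ESP32 you would typically seed the generator from hardware entropy rather than use a fixed seed:

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

// Network-impairment sketch: decide whether a message is "lost" and, if it
// survives, how much artificial processing delay to add before serving it.
struct Impairment {
    bool dropped;           // simulated packet loss
    uint32_t extraDelayMs;  // simulated extra processing latency
};

Impairment impair(int lossPercent, uint32_t maxDelayMs) {
    Impairment r;
    r.dropped = (rand() % 100) < lossPercent;  // lose lossPercent% of messages
    r.extraDelayMs = r.dropped ? 0 : (uint32_t)(rand() % (maxDelayMs + 1));
    return r;
}
```

Calling `impair()` in the processing path (and adding `extraDelayMs` to the service time) should make SLA violations and policy changes visible on the dashboard as conditions degrade.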
Create a two-level hierarchy:
1. Tier 1: Emergency and Critical share one queue group
2. Tier 2: Normal and Background share another queue group
Apply weighted fair queuing between tiers (70% Tier 1, 30% Tier 2), then strict priority within each tier.
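The two-level scheme can be sketched as follows. The 50-credit service cost and the helper names are assumptions; queue indices follow the lab's EMERGENCY=0..BACKGROUND=3 ordering:

```cpp
#include <cassert>

// Hierarchical scheduling sketch: 70/30 credits decide which tier serves
// next; strict priority picks the queue inside the winning tier.
// Tier 1 holds queues {0,1} (EMERGENCY, CRITICAL); Tier 2 holds {2,3}.
const int TIER_WEIGHTS[2] = {70, 30};
const int TIER_COST = 50;  // credits consumed per message served

struct TierState {
    int credits[2] = {0, 0};
};

// Grant one round of credits, then return the queue index to serve,
// or -1 if no tier has both a pending message and enough credit.
int hierPickNext(TierState& s, const int pending[4]) {
    for (int t = 0; t < 2; t++) s.credits[t] += TIER_WEIGHTS[t];
    for (int t = 0; t < 2; t++) {
        bool hasMsg = pending[2 * t] > 0 || pending[2 * t + 1] > 0;
        if (hasMsg && s.credits[t] >= TIER_COST) {
            s.credits[t] -= TIER_COST;
            return (pending[2 * t] > 0) ? 2 * t : 2 * t + 1;  // strict priority
        }
    }
    return -1;
}
```

As in the weighted fair queuing exercise, a production version would also cap the credit an idle tier can accumulate, so a long-quiet tier cannot burst when its traffic returns.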
207.8 Summary
In this lab, you implemented a complete QoS management system on ESP32 including:
- Priority Queues: Four-level priority system with SLA tracking
- Token Bucket: Traffic shaping with configurable refill rate
- Rate Limiter: Sliding window protection against overload
- Policy Engine: Dynamic load-based policy adjustment
- Metrics Dashboard: Real-time visibility into QoS performance
207.9 What's Next
- QoS in Real-World IoT: Apply these concepts to industrial IoT and smart buildings, and explore protocol-level QoS
- SDN Fundamentals: Learn how Software-Defined Networking enables programmable QoS policies