207  QoS Management Lab: ESP32 Implementation

207.1 Learning Objectives

By the end of this lab, you will be able to:

  • Build Priority Queues: Implement multi-level priority queues with configurable scheduling on ESP32
  • Implement Token Bucket: Create traffic shaping with the token bucket algorithm
  • Configure Rate Limiting: Build sliding window rate limiters to protect system resources
  • Monitor SLA Compliance: Track latency metrics and detect SLA violations in real-time
  • Build Policy Engines: Create dynamic QoS policy enforcement based on system state

207.2 What You Will Learn

In this hands-on lab, you will build and experiment with a comprehensive QoS management system on ESP32. The simulation demonstrates:

  1. Priority Queuing: Multi-level priority queues with configurable scheduling
  2. Traffic Shaping: Token bucket implementation for rate control
  3. Rate Limiting: Request throttling to protect system resources
  4. Service Level Monitoring: Real-time SLA tracking and violation detection
  5. QoS Policy Engine: Dynamic policy enforcement based on system state
  6. Metrics Dashboard: Live visualization of QoS performance

207.3 Lab Components

Component Purpose Simulation Role
ESP32 DevKit Main controller Runs QoS engine
Push Buttons (4) Traffic generators Simulate different priority messages
LEDs (4) Status indicators Show queue states and violations
Potentiometer Rate control Adjust traffic shaping rate
Serial Monitor Dashboard Real-time QoS metrics display

207.4 Wokwi Simulator Environment

NoteAbout Wokwi

Wokwi is a free online simulator for Arduino, ESP32, and other microcontrollers. It allows you to build and test IoT projects entirely in your browser without purchasing hardware. This lab demonstrates QoS concepts using standard components.

Launch the simulator below and copy the provided code to explore QoS management interactively.

TipSimulator Tips
  • Click the + button to add components (search for β€œPush Button”, β€œLED”, β€œPotentiometer”)
  • Use the Serial Monitor to see QoS metrics (115200 baud)
  • Press buttons to generate messages of different priorities
  • Adjust the potentiometer to change the token bucket rate
  • Watch LEDs indicate queue states (green = healthy, red = overloaded)
  • The code demonstrates production-quality QoS patterns

207.5 Step-by-Step Instructions

207.5.1 Step 1: Set Up the Circuit

  1. Add 4 Push Buttons: Click + and search for β€œPush Button” - add 4 buttons
  2. Add 4 LEDs: Click + and search for β€œLED” - add Red, Yellow, Green, and Blue LEDs
  3. Add 1 Potentiometer: Click + and search for β€œPotentiometer”
  4. Wire the components to ESP32:
    • Button 1 (Emergency Traffic) -> GPIO 12
    • Button 2 (Critical Traffic) -> GPIO 13
    • Button 3 (Normal Traffic) -> GPIO 14
    • Button 4 (Background Traffic) -> GPIO 15
    • Red LED (Emergency Queue) -> GPIO 25
    • Yellow LED (Critical Queue) -> GPIO 26
    • Green LED (Normal Queue) -> GPIO 27
    • Blue LED (Background Queue) -> GPIO 32
    • Potentiometer signal -> GPIO 34 (ADC)
    • All button other pins -> GND
    • All LED cathodes -> GND (with 220 ohm resistors)

%% fig-alt: Circuit diagram showing ESP32 connected to four push buttons for generating different priority traffic, four LEDs for indicating queue states, and a potentiometer for controlling the token bucket rate
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart LR
    subgraph ESP32["ESP32 DevKit"]
        G12["GPIO 12<br/>(Emergency)"]
        G13["GPIO 13<br/>(Critical)"]
        G14["GPIO 14<br/>(Normal)"]
        G15["GPIO 15<br/>(Background)"]
        G25["GPIO 25<br/>(Red LED)"]
        G26["GPIO 26<br/>(Yellow LED)"]
        G27["GPIO 27<br/>(Green LED)"]
        G32["GPIO 32<br/>(Blue LED)"]
        G34["GPIO 34<br/>(ADC)"]
        GND["GND"]
    end

    subgraph Traffic["Traffic Generators"]
        B1["Button 1<br/>Emergency"]
        B2["Button 2<br/>Critical"]
        B3["Button 3<br/>Normal"]
        B4["Button 4<br/>Background"]
    end

    subgraph Status["Queue Status LEDs"]
        LEDR["Red LED<br/>Emergency Q"]
        LEDY["Yellow LED<br/>Critical Q"]
        LEDG["Green LED<br/>Normal Q"]
        LEDB["Blue LED<br/>Background Q"]
    end

    subgraph Control["Rate Control"]
        POT["Potentiometer<br/>Token Rate"]
    end

    G12 -.->|"Input"| B1
    G13 -.->|"Input"| B2
    G14 -.->|"Input"| B3
    G15 -.->|"Input"| B4
    G25 -.->|"Output"| LEDR
    G26 -.->|"Output"| LEDY
    G27 -.->|"Output"| LEDG
    G32 -.->|"Output"| LEDB
    G34 -.->|"Analog"| POT

    style ESP32 fill:#2C3E50,stroke:#16A085,stroke-width:2px,color:#fff
    style Traffic fill:#c0392b,stroke:#2C3E50,stroke-width:2px,color:#fff
    style Status fill:#E67E22,stroke:#2C3E50,stroke-width:2px,color:#fff
    style Control fill:#16A085,stroke:#2C3E50,stroke-width:2px,color:#fff

207.5.2 Step 2: Understanding the QoS Architecture

This lab implements a complete QoS management system with the following components:

%% fig-alt: QoS system architecture diagram showing message flow from traffic generators through classifiers into priority queues, then through a token bucket traffic shaper, rate limiter, and finally the QoS policy engine that monitors SLAs and adjusts policies dynamically
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart TB
    subgraph Input["Traffic Input"]
        TG["Traffic<br/>Generators<br/>(Buttons)"]
        CL["Classifier<br/>(Priority Assignment)"]
    end

    subgraph Queues["Priority Queue System"]
        Q1["Queue 1: Emergency<br/>Max Latency: 50ms"]
        Q2["Queue 2: Critical<br/>Max Latency: 200ms"]
        Q3["Queue 3: Normal<br/>Max Latency: 1000ms"]
        Q4["Queue 4: Background<br/>Best Effort"]
    end

    subgraph Shaping["Traffic Shaping"]
        TB["Token Bucket<br/>(Rate Control)"]
        RL["Rate Limiter<br/>(Overflow Protection)"]
    end

    subgraph Policy["QoS Policy Engine"]
        SLA["SLA Monitor<br/>(Violation Detection)"]
        PE["Policy Enforcer<br/>(Dynamic Adjustment)"]
    end

    subgraph Output["Service Output"]
        OUT["Message<br/>Processing"]
        MET["Metrics<br/>Dashboard"]
    end

    TG --> CL
    CL --> Q1
    CL --> Q2
    CL --> Q3
    CL --> Q4

    Q1 -->|"Strict Priority"| TB
    Q2 -->|"Strict Priority"| TB
    Q3 -->|"Strict Priority"| TB
    Q4 -->|"Strict Priority"| TB

    TB --> RL
    RL --> OUT
    OUT --> SLA
    SLA --> PE
    PE -.->|"Adjust"| TB
    SLA --> MET

    style Input fill:#2C3E50,stroke:#16A085,stroke-width:2px,color:#fff
    style Queues fill:#E67E22,stroke:#2C3E50,stroke-width:2px,color:#fff
    style Shaping fill:#16A085,stroke:#2C3E50,stroke-width:2px,color:#fff
    style Policy fill:#7F8C8D,stroke:#2C3E50,stroke-width:2px,color:#fff
    style Output fill:#2C3E50,stroke:#16A085,stroke-width:2px,color:#fff

207.5.3 Step 3: Copy the QoS Lab Code

Copy the following code into the Wokwi code editor. This comprehensive implementation demonstrates professional QoS patterns used in production IoT systems.

/*
 * QoS Management Lab - ESP32 Implementation
 *
 * Comprehensive demonstration of Quality of Service concepts for IoT:
 * - Priority Queuing with 4 priority levels
 * - Token Bucket Traffic Shaping
 * - Rate Limiting with sliding window
 * - Service Level Agreement (SLA) Monitoring
 * - Dynamic QoS Policy Enforcement
 * - Real-time Metrics Dashboard
 *
 * Hardware Configuration:
 * - Button 1 (GPIO 12): Generate EMERGENCY priority traffic
 * - Button 2 (GPIO 13): Generate CRITICAL priority traffic
 * - Button 3 (GPIO 14): Generate NORMAL priority traffic
 * - Button 4 (GPIO 15): Generate BACKGROUND priority traffic
 * - Red LED (GPIO 25): Emergency queue status
 * - Yellow LED (GPIO 26): Critical queue status
 * - Green LED (GPIO 27): Normal queue status
 * - Blue LED (GPIO 32): Background queue status
 * - Potentiometer (GPIO 34): Token bucket rate control
 *
 * Serial Monitor: 115200 baud for QoS metrics dashboard
 *
 * Key QoS Concepts Demonstrated:
 * 1. Priority-based message scheduling
 * 2. Traffic shaping with token bucket algorithm
 * 3. Rate limiting to prevent overload
 * 4. SLA violation detection and alerting
 * 5. Dynamic policy adjustment based on load
 * 6. Queue depth monitoring and overflow handling
 */

#include <Arduino.h>

// ============================================================
// CONFIGURATION CONSTANTS
// ============================================================

// Pin Definitions
#define BTN_EMERGENCY    12    // Emergency traffic generator
#define BTN_CRITICAL     13    // Critical traffic generator
#define BTN_NORMAL       14    // Normal traffic generator
#define BTN_BACKGROUND   15    // Background traffic generator

#define LED_EMERGENCY    25    // Emergency queue status LED
#define LED_CRITICAL     26    // Critical queue status LED
#define LED_NORMAL       27    // Normal queue status LED
#define LED_BACKGROUND   32    // Background queue status LED

#define POT_RATE         34    // Token rate control potentiometer

// Priority Levels (lower number = higher priority)
#define PRIORITY_EMERGENCY   0
#define PRIORITY_CRITICAL    1
#define PRIORITY_NORMAL      2
#define PRIORITY_BACKGROUND  3
#define NUM_PRIORITY_LEVELS  4

// Queue Configuration
#define MAX_QUEUE_SIZE       20   // Maximum messages per queue
#define MESSAGE_PAYLOAD_SIZE 64   // Bytes per message

// SLA Latency Requirements (milliseconds)
#define SLA_EMERGENCY_MS     50   // Emergency: max 50ms latency
#define SLA_CRITICAL_MS      200  // Critical: max 200ms latency
#define SLA_NORMAL_MS        1000 // Normal: max 1 second latency
#define SLA_BACKGROUND_MS    5000 // Background: max 5 seconds (best effort)

// Token Bucket Configuration
#define TOKEN_BUCKET_MAX     100  // Maximum tokens in bucket
#define TOKEN_COST_EMERGENCY 1    // Tokens consumed per emergency message
#define TOKEN_COST_CRITICAL  2    // Tokens consumed per critical message
#define TOKEN_COST_NORMAL    5    // Tokens consumed per normal message
#define TOKEN_COST_BACKGROUND 10  // Tokens consumed per background message

// Rate Limiting Configuration
#define RATE_LIMIT_WINDOW_MS 1000 // Sliding window duration
#define RATE_LIMIT_MAX_MSGS  50   // Maximum messages per window

// Timing
#define DEBOUNCE_MS          50   // Button debounce time
#define METRICS_INTERVAL_MS  2000 // Dashboard update interval
#define LED_BLINK_FAST_MS    100  // Fast blink for violations
#define LED_BLINK_SLOW_MS    500  // Slow blink for activity

// ============================================================
// DATA STRUCTURES
// ============================================================

/**
 * Message structure representing a unit of IoT data
 * Contains priority, payload, and timing information for SLA tracking
 */
struct Message {
    uint8_t priority;                      // 0=Emergency, 3=Background
    uint32_t enqueueTime;                  // When message entered queue
    uint32_t processTime;                  // When message was processed
    char payload[MESSAGE_PAYLOAD_SIZE];    // Message content
    uint16_t payloadSize;                  // Actual payload size
    uint32_t sequenceNumber;               // Unique message ID
    bool processed;                        // Processing status
};

/**
 * Priority Queue structure with SLA tracking
 * Each priority level has its own queue with configurable SLA requirements
 */
struct PriorityQueue {
    Message messages[MAX_QUEUE_SIZE];      // Message buffer
    uint8_t head;                          // Read position
    uint8_t tail;                          // Write position
    uint8_t count;                         // Current message count
    uint8_t maxSize;                       // Maximum capacity
    uint32_t slaLatencyMs;                 // Maximum allowed latency
    uint32_t totalEnqueued;                // Total messages received
    uint32_t totalProcessed;               // Total messages processed
    uint32_t totalDropped;                 // Messages dropped (overflow)
    uint32_t slaViolations;                // SLA violations count
    uint64_t totalLatencyMs;               // Cumulative latency for averaging
    const char* name;                      // Queue name for logging
};

/**
 * Token Bucket structure for traffic shaping
 * Controls the rate at which messages can be processed
 */
struct TokenBucket {
    float tokens;                          // Current token count
    float maxTokens;                       // Maximum token capacity
    float refillRate;                      // Tokens added per second
    uint32_t lastRefillTime;               // Last refill timestamp
};

/**
 * Rate Limiter structure using sliding window
 * Tracks request rates and enforces limits
 */
struct RateLimiter {
    uint32_t windowStart;                  // Current window start time
    uint32_t messageCount;                 // Messages in current window
    uint32_t windowDurationMs;             // Window duration
    uint32_t maxMessagesPerWindow;         // Maximum allowed messages
    uint32_t totalRejected;                // Total rejected due to limit
};

/**
 * QoS Metrics structure for dashboard reporting
 * Aggregates statistics across all queues and components
 */
struct QoSMetrics {
    uint32_t totalMessagesReceived;        // All messages received
    uint32_t totalMessagesProcessed;       // Successfully processed
    uint32_t totalMessagesDropped;         // Dropped due to overflow
    uint32_t totalMessagesThrottled;       // Rejected by rate limiter
    uint32_t totalSLAViolations;           // Total SLA breaches
    float avgLatencyMs[NUM_PRIORITY_LEVELS]; // Average latency per priority
    float throughputMsgsPerSec;            // Current throughput
    uint32_t lastThroughputCalcTime;       // Last throughput calculation
    uint32_t messagesInLastSecond;         // Messages processed in last second
    float tokenBucketLevel;                // Current token bucket fill level
    float systemLoad;                      // Estimated system load (0-1)
};

/**
 * QoS Policy structure for dynamic adjustment
 * Defines thresholds and actions for policy enforcement
 */
struct QoSPolicy {
    float highLoadThreshold;               // Load level triggering high-load policy
    float criticalLoadThreshold;           // Load level triggering critical policy
    bool emergencyOnlyMode;                // Only process emergency messages
    bool dropBackgroundTraffic;            // Drop background during high load
    uint32_t lastPolicyCheck;              // Last policy evaluation time
    uint32_t policyCheckIntervalMs;        // Policy check frequency
};

// ============================================================
// GLOBAL STATE
// ============================================================

// Priority Queues (one per priority level)
PriorityQueue queues[NUM_PRIORITY_LEVELS];

// Traffic Shaping
TokenBucket tokenBucket;
RateLimiter rateLimiter;

// Metrics and Policy
QoSMetrics metrics;
QoSPolicy policy;

// Message Sequencing
uint32_t nextSequenceNumber = 1;

// Button State Tracking
uint32_t lastButtonPress[4] = {0, 0, 0, 0};
bool buttonState[4] = {false, false, false, false};

// LED State for Blinking
uint32_t ledLastToggle[4] = {0, 0, 0, 0};
bool ledState[4] = {false, false, false, false};

// Dashboard Timing
uint32_t lastMetricsDisplay = 0;

// Auto-traffic generation for demonstration
uint32_t lastAutoTraffic = 0;
bool autoTrafficEnabled = true;
uint32_t autoTrafficInterval = 500; // Generate traffic every 500ms

// ============================================================
// QUEUE OPERATIONS
// ============================================================

/**
 * Initialize a priority queue with given parameters
 *
 * @param queue Pointer to the queue structure
 * @param name Human-readable queue name
 * @param slaLatencyMs Maximum allowed latency for SLA compliance
 */
void initQueue(PriorityQueue* queue, const char* name, uint32_t slaLatencyMs) {
    queue->head = 0;
    queue->tail = 0;
    queue->count = 0;
    queue->maxSize = MAX_QUEUE_SIZE;
    queue->slaLatencyMs = slaLatencyMs;
    queue->totalEnqueued = 0;
    queue->totalProcessed = 0;
    queue->totalDropped = 0;
    queue->slaViolations = 0;
    queue->totalLatencyMs = 0;
    queue->name = name;

    // Clear message buffer
    for (int i = 0; i < MAX_QUEUE_SIZE; i++) {
        queue->messages[i].processed = true;
        queue->messages[i].priority = 255; // Invalid
    }
}

/**
 * Check if a queue is full
 *
 * @param queue Pointer to the queue
 * @return true if queue is at capacity
 */
bool isQueueFull(PriorityQueue* queue) {
    return queue->count >= queue->maxSize;
}

/**
 * Check if a queue is empty
 *
 * @param queue Pointer to the queue
 * @return true if queue has no messages
 */
bool isQueueEmpty(PriorityQueue* queue) {
    return queue->count == 0;
}

/**
 * Enqueue a message into a priority queue
 * Handles overflow by dropping the message and tracking the drop
 *
 * @param queue Pointer to the target queue
 * @param priority Message priority level
 * @param payload Message content
 * @param payloadSize Size of payload in bytes
 * @return true if message was enqueued, false if dropped
 */
bool enqueueMessage(PriorityQueue* queue, uint8_t priority,
                    const char* payload, uint16_t payloadSize) {
    // Check for queue overflow
    if (isQueueFull(queue)) {
        queue->totalDropped++;
        metrics.totalMessagesDropped++;
        Serial.printf("[DROP] %s queue full, dropping message #%lu\n",
                     queue->name, nextSequenceNumber);
        return false;
    }

    // Create new message
    Message* msg = &queue->messages[queue->tail];
    msg->priority = priority;
    msg->enqueueTime = millis();
    msg->processTime = 0;
    msg->sequenceNumber = nextSequenceNumber++;
    msg->payloadSize = min(payloadSize, (uint16_t)(MESSAGE_PAYLOAD_SIZE - 1));
    strncpy(msg->payload, payload, msg->payloadSize);
    msg->payload[msg->payloadSize] = '\0';
    msg->processed = false;

    // Advance tail pointer (circular buffer)
    queue->tail = (queue->tail + 1) % queue->maxSize;
    queue->count++;
    queue->totalEnqueued++;
    metrics.totalMessagesReceived++;

    return true;
}

/**
 * Dequeue a message from a priority queue
 * Updates SLA tracking based on message latency
 *
 * @param queue Pointer to the source queue
 * @param outMessage Pointer to store dequeued message
 * @return true if message was dequeued, false if queue empty
 */
bool dequeueMessage(PriorityQueue* queue, Message* outMessage) {
    if (isQueueEmpty(queue)) {
        return false;
    }

    // Get message at head
    Message* msg = &queue->messages[queue->head];

    // Calculate latency
    uint32_t latency = millis() - msg->enqueueTime;
    msg->processTime = millis();

    // Check SLA compliance
    if (latency > queue->slaLatencyMs) {
        queue->slaViolations++;
        metrics.totalSLAViolations++;
        Serial.printf("[SLA VIOLATION] %s: Message #%lu latency %lums > %lums SLA\n",
                     queue->name, msg->sequenceNumber, latency, queue->slaLatencyMs);
    }

    // Update latency statistics
    queue->totalLatencyMs += latency;
    queue->totalProcessed++;
    metrics.totalMessagesProcessed++;

    // Copy message to output
    memcpy(outMessage, msg, sizeof(Message));
    msg->processed = true;

    // Advance head pointer
    queue->head = (queue->head + 1) % queue->maxSize;
    queue->count--;

    return true;
}

/**
 * Get the fill percentage of a queue
 *
 * @param queue Pointer to the queue
 * @return Fill percentage (0-100)
 */
float getQueueFillPercent(PriorityQueue* queue) {
    return (float)queue->count / queue->maxSize * 100.0f;
}

// ============================================================
// TOKEN BUCKET IMPLEMENTATION
// ============================================================

/**
 * Initialize the token bucket for traffic shaping
 *
 * @param maxTokens Maximum token capacity
 * @param refillRate Tokens added per second
 */
void initTokenBucket(float maxTokens, float refillRate) {
    tokenBucket.tokens = maxTokens;  // Start with full bucket
    tokenBucket.maxTokens = maxTokens;
    tokenBucket.refillRate = refillRate;
    tokenBucket.lastRefillTime = millis();
}

/**
 * Refill tokens based on elapsed time
 * Called before each consume attempt to add tokens proportional to time elapsed
 */
void refillTokens() {
    uint32_t now = millis();
    uint32_t elapsed = now - tokenBucket.lastRefillTime;

    if (elapsed > 0) {
        // Add tokens proportional to elapsed time
        float tokensToAdd = (elapsed / 1000.0f) * tokenBucket.refillRate;
        tokenBucket.tokens = min(tokenBucket.tokens + tokensToAdd,
                                  tokenBucket.maxTokens);
        tokenBucket.lastRefillTime = now;
    }
}

/**
 * Attempt to consume tokens for a message
 *
 * @param tokensRequired Number of tokens needed
 * @return true if tokens were consumed, false if insufficient tokens
 */
bool consumeTokens(float tokensRequired) {
    refillTokens();

    if (tokenBucket.tokens >= tokensRequired) {
        tokenBucket.tokens -= tokensRequired;
        return true;
    }

    return false;
}

/**
 * Get token cost for a message based on its priority
 * Higher priority messages cost fewer tokens (get more bandwidth)
 *
 * @param priority Message priority level
 * @return Token cost for the message
 */
float getTokenCost(uint8_t priority) {
    switch (priority) {
        case PRIORITY_EMERGENCY:  return TOKEN_COST_EMERGENCY;
        case PRIORITY_CRITICAL:   return TOKEN_COST_CRITICAL;
        case PRIORITY_NORMAL:     return TOKEN_COST_NORMAL;
        case PRIORITY_BACKGROUND: return TOKEN_COST_BACKGROUND;
        default:                  return TOKEN_COST_BACKGROUND;
    }
}

/**
 * Get current token bucket fill percentage
 *
 * @return Fill percentage (0-100)
 */
float getTokenBucketPercent() {
    refillTokens();
    return (tokenBucket.tokens / tokenBucket.maxTokens) * 100.0f;
}

// ============================================================
// RATE LIMITER IMPLEMENTATION
// ============================================================

/**
 * Initialize the rate limiter with sliding window parameters
 *
 * @param windowDurationMs Duration of the sliding window
 * @param maxMessagesPerWindow Maximum messages allowed per window
 */
void initRateLimiter(uint32_t windowDurationMs, uint32_t maxMessagesPerWindow) {
    rateLimiter.windowStart = millis();
    rateLimiter.messageCount = 0;
    rateLimiter.windowDurationMs = windowDurationMs;
    rateLimiter.maxMessagesPerWindow = maxMessagesPerWindow;
    rateLimiter.totalRejected = 0;
}

/**
 * Check if a message should be rate limited
 * Updates window and counts, returns whether message is allowed
 *
 * @return true if message is allowed, false if rate limited
 */
bool checkRateLimit() {
    uint32_t now = millis();

    // Check if window has expired
    if (now - rateLimiter.windowStart >= rateLimiter.windowDurationMs) {
        // Reset window
        rateLimiter.windowStart = now;
        rateLimiter.messageCount = 0;
    }

    // Check if within limit
    if (rateLimiter.messageCount < rateLimiter.maxMessagesPerWindow) {
        rateLimiter.messageCount++;
        return true;
    }

    // Rate limit exceeded
    rateLimiter.totalRejected++;
    metrics.totalMessagesThrottled++;
    return false;
}

/**
 * Get current rate (messages per second)
 *
 * @return Current message rate
 */
float getCurrentRate() {
    uint32_t elapsed = millis() - rateLimiter.windowStart;
    if (elapsed == 0) return 0;
    return (float)rateLimiter.messageCount / (elapsed / 1000.0f);
}

// ============================================================
// QoS POLICY ENGINE
// ============================================================

/**
 * Initialize QoS policy with default thresholds
 */
void initPolicy() {
    policy.highLoadThreshold = 0.7f;      // 70% load triggers high-load mode
    policy.criticalLoadThreshold = 0.9f;   // 90% load triggers critical mode
    policy.emergencyOnlyMode = false;
    policy.dropBackgroundTraffic = false;
    policy.lastPolicyCheck = millis();
    policy.policyCheckIntervalMs = 500;    // Check every 500ms
}

/**
 * Calculate current system load based on queue fill levels
 * Weighted average favoring high-priority queues
 *
 * @return Load factor (0.0 to 1.0)
 */
float calculateSystemLoad() {
    float weightedLoad = 0;
    float totalWeight = 0;

    // Weight high-priority queues more heavily
    float weights[] = {4.0f, 3.0f, 2.0f, 1.0f};

    for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
        weightedLoad += getQueueFillPercent(&queues[i]) / 100.0f * weights[i];
        totalWeight += weights[i];
    }

    return weightedLoad / totalWeight;
}

/**
 * Evaluate and update QoS policy based on current system state
 * Adjusts policy flags for emergency-only mode and background dropping
 */
void evaluatePolicy() {
    uint32_t now = millis();

    // Only evaluate periodically
    if (now - policy.lastPolicyCheck < policy.policyCheckIntervalMs) {
        return;
    }
    policy.lastPolicyCheck = now;

    // Calculate current load
    metrics.systemLoad = calculateSystemLoad();

    // Update policy based on load
    bool wasEmergencyOnly = policy.emergencyOnlyMode;
    bool wasDropping = policy.dropBackgroundTraffic;

    if (metrics.systemLoad >= policy.criticalLoadThreshold) {
        // Critical load: emergency only
        policy.emergencyOnlyMode = true;
        policy.dropBackgroundTraffic = true;
    } else if (metrics.systemLoad >= policy.highLoadThreshold) {
        // High load: drop background
        policy.emergencyOnlyMode = false;
        policy.dropBackgroundTraffic = true;
    } else {
        // Normal load
        policy.emergencyOnlyMode = false;
        policy.dropBackgroundTraffic = false;
    }

    // Log policy changes
    if (policy.emergencyOnlyMode != wasEmergencyOnly) {
        Serial.printf("[POLICY] Emergency-only mode: %s\n",
                     policy.emergencyOnlyMode ? "ENABLED" : "DISABLED");
    }
    if (policy.dropBackgroundTraffic != wasDropping) {
        Serial.printf("[POLICY] Background traffic dropping: %s\n",
                     policy.dropBackgroundTraffic ? "ENABLED" : "DISABLED");
    }
}

/**
 * Check if a message should be accepted based on current policy
 *
 * @param priority Message priority level
 * @return true if message should be accepted, false if policy rejects it
 */
bool policyAllowsMessage(uint8_t priority) {
    // Emergency always allowed
    if (priority == PRIORITY_EMERGENCY) {
        return true;
    }

    // Emergency-only mode blocks all non-emergency
    if (policy.emergencyOnlyMode) {
        return false;
    }

    // High load mode drops background
    if (policy.dropBackgroundTraffic && priority == PRIORITY_BACKGROUND) {
        return false;
    }

    return true;
}

// ============================================================
// PRIORITY SCHEDULER
// ============================================================

/**
 * Process messages from queues in priority order
 * Implements strict priority scheduling with token bucket rate control
 *
 * @return Number of messages processed in this call
 */
uint8_t processQueues() {
    uint8_t processed = 0;
    Message msg;

    // Process in strict priority order
    for (int priority = 0; priority < NUM_PRIORITY_LEVELS; priority++) {
        PriorityQueue* queue = &queues[priority];

        // Process all messages in this priority level before moving to next
        while (!isQueueEmpty(queue)) {
            // Check rate limit
            if (!checkRateLimit()) {
                Serial.println("[RATE LIMIT] Rate limit exceeded, pausing processing");
                return processed;
            }

            // Check token bucket
            float cost = getTokenCost(priority);
            if (!consumeTokens(cost)) {
                // Not enough tokens - try lower priority or wait
                break;
            }

            // Dequeue and process
            if (dequeueMessage(queue, &msg)) {
                // Simulate processing (in real system, would send to cloud/actuator)
                processed++;

                // Update throughput tracking
                metrics.messagesInLastSecond++;
            }
        }
    }

    return processed;
}

// ============================================================
// TRAFFIC GENERATION
// ============================================================

/**
 * Generate a message with specified priority
 *
 * @param priority Message priority level
 */
void generateTraffic(uint8_t priority) {
    char payload[MESSAGE_PAYLOAD_SIZE];
    const char* priorityName;

    switch (priority) {
        case PRIORITY_EMERGENCY:
            priorityName = "EMERGENCY";
            snprintf(payload, MESSAGE_PAYLOAD_SIZE,
                    "ALERT: Critical sensor threshold exceeded at %lu", millis());
            break;
        case PRIORITY_CRITICAL:
            priorityName = "CRITICAL";
            snprintf(payload, MESSAGE_PAYLOAD_SIZE,
                    "WARNING: Actuator response required at %lu", millis());
            break;
        case PRIORITY_NORMAL:
            priorityName = "NORMAL";
            snprintf(payload, MESSAGE_PAYLOAD_SIZE,
                    "DATA: Sensor reading value=%.2f at %lu",
                    random(0, 10000) / 100.0f, millis());
            break;
        case PRIORITY_BACKGROUND:
            priorityName = "BACKGROUND";
            snprintf(payload, MESSAGE_PAYLOAD_SIZE,
                    "LOG: System diagnostic entry at %lu", millis());
            break;
        default:
            return;
    }

    // Check policy
    if (!policyAllowsMessage(priority)) {
        Serial.printf("[POLICY BLOCK] %s message rejected by policy\n", priorityName);
        return;
    }

    // Enqueue message
    PriorityQueue* queue = &queues[priority];
    if (enqueueMessage(queue, priority, payload, strlen(payload))) {
        Serial.printf("[ENQUEUE] %s: %s (queue depth: %d/%d)\n",
                     priorityName, payload, queue->count, queue->maxSize);
    }
}

/**
 * Generate automatic traffic for demonstration
 * Creates a realistic mix of different priority messages
 */
void generateAutoTraffic() {
    uint32_t now = millis();

    if (!autoTrafficEnabled) return;

    if (now - lastAutoTraffic >= autoTrafficInterval) {
        lastAutoTraffic = now;

        // Probability-based traffic generation
        int r = random(100);

        if (r < 2) {
            // 2% emergency
            generateTraffic(PRIORITY_EMERGENCY);
        } else if (r < 12) {
            // 10% critical
            generateTraffic(PRIORITY_CRITICAL);
        } else if (r < 52) {
            // 40% normal
            generateTraffic(PRIORITY_NORMAL);
        } else {
            // 48% background
            generateTraffic(PRIORITY_BACKGROUND);
        }
    }
}

// ============================================================
// LED STATUS DISPLAY
// ============================================================

/**
 * Update LED states to reflect queue status
 * - Solid: Queue has messages
 * - Blinking fast: SLA violations detected
 * - Off: Queue empty
 */
void updateLEDs() {
    uint32_t now = millis();

    for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
        PriorityQueue* queue = &queues[i];
        uint8_t ledPin;

        switch (i) {
            case PRIORITY_EMERGENCY:  ledPin = LED_EMERGENCY; break;
            case PRIORITY_CRITICAL:   ledPin = LED_CRITICAL; break;
            case PRIORITY_NORMAL:     ledPin = LED_NORMAL; break;
            case PRIORITY_BACKGROUND: ledPin = LED_BACKGROUND; break;
            default: continue;
        }

        // Determine LED behavior based on queue state
        if (queue->slaViolations > 0 &&
            (now - ledLastToggle[i] >= LED_BLINK_FAST_MS)) {
            // SLA violations: fast blink
            ledState[i] = !ledState[i];
            ledLastToggle[i] = now;
            digitalWrite(ledPin, ledState[i] ? HIGH : LOW);
        } else if (queue->count > 0) {
            // Queue has messages: solid on
            digitalWrite(ledPin, HIGH);
            ledState[i] = true;
        } else {
            // Queue empty: off
            digitalWrite(ledPin, LOW);
            ledState[i] = false;
        }
    }
}

// ============================================================
// METRICS DASHBOARD
// ============================================================

/**
 * Calculate and update throughput metrics
 */
void updateThroughputMetrics() {
    uint32_t now = millis();
    uint32_t elapsed = now - metrics.lastThroughputCalcTime;

    if (elapsed >= 1000) {
        metrics.throughputMsgsPerSec = (float)metrics.messagesInLastSecond /
                                       (elapsed / 1000.0f);
        metrics.messagesInLastSecond = 0;
        metrics.lastThroughputCalcTime = now;
    }
}

/**
 * Calculate average latency for a queue
 *
 * @param queue Pointer to the queue
 * @return Average latency in milliseconds
 */
float calculateAvgLatency(PriorityQueue* queue) {
    if (queue->totalProcessed == 0) return 0;
    return (float)queue->totalLatencyMs / queue->totalProcessed;
}

/**
 * Display comprehensive QoS metrics dashboard
 */
void displayMetrics() {
    uint32_t now = millis();

    if (now - lastMetricsDisplay < METRICS_INTERVAL_MS) {
        return;
    }
    lastMetricsDisplay = now;

    // Update derived metrics
    updateThroughputMetrics();
    metrics.tokenBucketLevel = getTokenBucketPercent();

    // Clear screen and display header
    Serial.println("\n========================================");
    Serial.println("       QoS MANAGEMENT DASHBOARD");
    Serial.println("========================================");
    Serial.printf("Uptime: %lu seconds\n", millis() / 1000);
    Serial.println();

    // System Overview
    Serial.println("--- SYSTEM OVERVIEW ---");
    Serial.printf("System Load:      %.1f%%\n", metrics.systemLoad * 100);
    Serial.printf("Throughput:       %.1f msg/sec\n", metrics.throughputMsgsPerSec);
    Serial.printf("Token Bucket:     %.1f%%\n", metrics.tokenBucketLevel);
    Serial.printf("Current Rate:     %.1f msg/sec\n", getCurrentRate());
    Serial.println();

    // Policy Status
    Serial.println("--- POLICY STATUS ---");
    Serial.printf("Emergency Only:   %s\n", policy.emergencyOnlyMode ? "YES" : "no");
    Serial.printf("Drop Background:  %s\n", policy.dropBackgroundTraffic ? "YES" : "no");
    Serial.println();

    // Queue Statistics
    Serial.println("--- QUEUE STATISTICS ---");
    Serial.println("Priority    | Depth | Enqueued | Processed | Dropped | SLA Viol | Avg Lat");
    Serial.println("------------|-------|----------|-----------|---------|----------|--------");

    const char* names[] = {"EMERGENCY", "CRITICAL", "NORMAL", "BACKGROUND"};

    for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
        PriorityQueue* q = &queues[i];
        Serial.printf("%-11s | %2d/%2d | %8lu | %9lu | %7lu | %8lu | %6.1fms\n",
                     names[i],
                     q->count, q->maxSize,
                     q->totalEnqueued,
                     q->totalProcessed,
                     q->totalDropped,
                     q->slaViolations,
                     calculateAvgLatency(q));
    }
    Serial.println();

    // Aggregate Metrics
    Serial.println("--- AGGREGATE METRICS ---");
    Serial.printf("Total Received:   %lu\n", metrics.totalMessagesReceived);
    Serial.printf("Total Processed:  %lu\n", metrics.totalMessagesProcessed);
    Serial.printf("Total Dropped:    %lu\n", metrics.totalMessagesDropped);
    Serial.printf("Total Throttled:  %lu\n", metrics.totalMessagesThrottled);
    Serial.printf("Total SLA Viols:  %lu\n", metrics.totalSLAViolations);
    Serial.println();

    // SLA Compliance
    Serial.println("--- SLA COMPLIANCE ---");
    for (int i = 0; i < NUM_PRIORITY_LEVELS; i++) {
        PriorityQueue* q = &queues[i];
        float compliance = 100.0f;
        if (q->totalProcessed > 0) {
            compliance = 100.0f * (1.0f - (float)q->slaViolations / q->totalProcessed);
        }
        Serial.printf("%s: %.2f%% (target: %lums)\n",
                     names[i], compliance, q->slaLatencyMs);
    }
    Serial.println();

    // Instructions
    Serial.println("--- CONTROLS ---");
    Serial.println("BTN1=Emergency, BTN2=Critical, BTN3=Normal, BTN4=Background");
    Serial.println("POT=Token Rate (turn to adjust traffic shaping)");
    Serial.println("========================================\n");
}

// ============================================================
// BUTTON HANDLING
// ============================================================

/**
 * Read buttons and generate traffic based on presses
 */
void handleButtons() {
    uint32_t now = millis();

    struct ButtonConfig {
        uint8_t pin;
        uint8_t priority;
    };

    ButtonConfig buttons[] = {
        {BTN_EMERGENCY, PRIORITY_EMERGENCY},
        {BTN_CRITICAL, PRIORITY_CRITICAL},
        {BTN_NORMAL, PRIORITY_NORMAL},
        {BTN_BACKGROUND, PRIORITY_BACKGROUND}
    };

    for (int i = 0; i < 4; i++) {
        bool currentState = (digitalRead(buttons[i].pin) == LOW);

        // Debounced press detection
        if (currentState && !buttonState[i] &&
            (now - lastButtonPress[i] >= DEBOUNCE_MS)) {
            // Button pressed
            generateTraffic(buttons[i].priority);
            lastButtonPress[i] = now;
        }

        buttonState[i] = currentState;
    }
}

/**
 * Read potentiometer and adjust token bucket rate
 */
void handlePotentiometer() {
    int potValue = analogRead(POT_RATE);

    // Map potentiometer to token refill rate (10 to 200 tokens/sec)
    float newRate = map(potValue, 0, 4095, 10, 200);

    // Only update if significantly changed
    if (abs(newRate - tokenBucket.refillRate) > 5) {
        tokenBucket.refillRate = newRate;
        Serial.printf("[CONFIG] Token refill rate adjusted to %.1f tokens/sec\n", newRate);
    }
}

// ============================================================
// INITIALIZATION
// ============================================================

/**
 * Initialize all QoS system components
 */
void initQoSSystem() {
    // Initialize priority queues
    initQueue(&queues[PRIORITY_EMERGENCY], "EMERGENCY", SLA_EMERGENCY_MS);
    initQueue(&queues[PRIORITY_CRITICAL], "CRITICAL", SLA_CRITICAL_MS);
    initQueue(&queues[PRIORITY_NORMAL], "NORMAL", SLA_NORMAL_MS);
    initQueue(&queues[PRIORITY_BACKGROUND], "BACKGROUND", SLA_BACKGROUND_MS);

    // Initialize token bucket (50 tokens, 50 tokens/sec refill)
    initTokenBucket(TOKEN_BUCKET_MAX, 50.0f);

    // Initialize rate limiter (50 messages per second)
    initRateLimiter(RATE_LIMIT_WINDOW_MS, RATE_LIMIT_MAX_MSGS);

    // Initialize policy engine
    initPolicy();

    // Initialize metrics
    memset(&metrics, 0, sizeof(metrics));
    metrics.lastThroughputCalcTime = millis();

    Serial.println("[INIT] QoS Management System initialized");
    Serial.println("[INIT] Priority Levels: EMERGENCY < CRITICAL < NORMAL < BACKGROUND");
    Serial.printf("[INIT] SLA Targets: %dms, %dms, %dms, %dms\n",
                 SLA_EMERGENCY_MS, SLA_CRITICAL_MS, SLA_NORMAL_MS, SLA_BACKGROUND_MS);
    Serial.printf("[INIT] Token Bucket: %d tokens max, %.1f tokens/sec refill\n",
                 TOKEN_BUCKET_MAX, tokenBucket.refillRate);
    Serial.printf("[INIT] Rate Limit: %d messages per %dms window\n",
                 RATE_LIMIT_MAX_MSGS, RATE_LIMIT_WINDOW_MS);
}

// ============================================================
// ARDUINO SETUP AND LOOP
// ============================================================

void setup() {
    // Initialize serial communication
    Serial.begin(115200);
    delay(1000);

    Serial.println("\n\n");
    Serial.println("================================================");
    Serial.println("    QoS MANAGEMENT LAB - ESP32 IMPLEMENTATION");
    Serial.println("================================================");
    Serial.println();

    // Configure GPIO pins
    pinMode(BTN_EMERGENCY, INPUT_PULLUP);
    pinMode(BTN_CRITICAL, INPUT_PULLUP);
    pinMode(BTN_NORMAL, INPUT_PULLUP);
    pinMode(BTN_BACKGROUND, INPUT_PULLUP);

    pinMode(LED_EMERGENCY, OUTPUT);
    pinMode(LED_CRITICAL, OUTPUT);
    pinMode(LED_NORMAL, OUTPUT);
    pinMode(LED_BACKGROUND, OUTPUT);

    // Initialize all LEDs off
    digitalWrite(LED_EMERGENCY, LOW);
    digitalWrite(LED_CRITICAL, LOW);
    digitalWrite(LED_NORMAL, LOW);
    digitalWrite(LED_BACKGROUND, LOW);

    // Initialize QoS system
    initQoSSystem();

    // Startup LED test
    Serial.println("[INIT] LED test sequence...");
    for (int i = 0; i < 4; i++) {
        uint8_t pins[] = {LED_EMERGENCY, LED_CRITICAL, LED_NORMAL, LED_BACKGROUND};
        digitalWrite(pins[i], HIGH);
        delay(200);
        digitalWrite(pins[i], LOW);
    }

    Serial.println("[INIT] System ready!");
    Serial.println("[INIT] Press buttons to generate traffic or wait for auto-generation");
    Serial.println();
}

void loop() {
    // Handle user input
    handleButtons();
    handlePotentiometer();

    // Generate automatic traffic for demonstration
    generateAutoTraffic();

    // Evaluate and apply QoS policy
    evaluatePolicy();

    // Process messages from priority queues
    processQueues();

    // Update LED status indicators
    updateLEDs();

    // Display metrics dashboard
    displayMetrics();

    // Small delay to prevent busy-waiting
    delay(10);
}

207.6 Expected Outcomes

After running the lab, you should observe:

  1. Priority Processing: Emergency messages (Button 1) are always processed first, even when other queues are full
  2. Traffic Shaping: Adjusting the potentiometer changes how quickly messages are processed
  3. Rate Limiting: Rapid button pressing eventually triggers rate limiting
  4. SLA Violations: If queue processing is too slow, SLA violations are logged
  5. Policy Enforcement: Under high load, background traffic is automatically dropped
  6. LED Indicators: LEDs reflect queue states (solid = messages waiting, blinking = SLA violations)

207.7 Challenge Exercises

TipChallenge 1: Implement Weighted Fair Queuing

Modify the processQueues() function to implement weighted fair queuing instead of strict priority. Each priority level should get a proportional share of bandwidth: - Emergency: 40% of bandwidth - Critical: 30% of bandwidth - Normal: 20% of bandwidth - Background: 10% of bandwidth

Hint: Track credits per queue and serve queues in round-robin fashion based on their credits.

TipChallenge 2: Add Priority Aging

Implement a priority aging mechanism where messages waiting too long in lower-priority queues get their effective priority increased. This prevents starvation while still respecting priorities.

Implementation ideas: - Track time in queue for each message - Calculate effective priority as base priority minus (wait_time / AGING_FACTOR) - Messages should never exceed emergency priority through aging

TipChallenge 3: Implement Leaky Bucket

Replace the token bucket with a leaky bucket implementation: - Messages enter a queue (bucket) - Messages leave at a constant rate (leak) - If bucket overflows, messages are dropped

Compare the behavior differences between token bucket (allows bursts) and leaky bucket (constant rate).

TipChallenge 4: Add Network Condition Simulation

Simulate variable network conditions by: - Adding random processing delays - Randomly dropping messages (packet loss) - Varying the token refill rate automatically

Observe how the QoS system adapts to changing conditions.

TipChallenge 5: Implement Hierarchical Queuing

Create a two-level hierarchy: 1. Tier 1: Emergency and Critical share one queue group 2. Tier 2: Normal and Background share another queue group

Apply weighted fair queuing between tiers (70% Tier 1, 30% Tier 2), then strict priority within each tier.

207.8 Summary

In this lab, you implemented a complete QoS management system on ESP32 including:

  • Priority Queues: Four-level priority system with SLA tracking
  • Token Bucket: Traffic shaping with configurable refill rate
  • Rate Limiter: Sliding window protection against overload
  • Policy Engine: Dynamic load-based policy adjustment
  • Metrics Dashboard: Real-time visibility into QoS performance

207.9 What’s Next

  • QoS in Real-World IoT: Apply these concepts to industrial IoT, smart buildings, and learn about protocol-level QoS
  • SDN Fundamentals: Learn how Software-Defined Networking enables programmable QoS policies