1302 Hands-On Lab: Basic Stream Processing

1302.1 Lab: Build a Real-Time Stream Processing System

~45 min | Intermediate-Advanced | P10.C14.LAB01

1302.1.1 Learning Objectives

By completing this hands-on lab, you will be able to:

  • Implement continuous data streaming from multiple sensors
  • Create windowed aggregations (tumbling and sliding windows)
  • Build event detection and alerting systems
  • Design circular buffers for memory-efficient data management
  • Calculate and display throughput statistics
  • Understand the relationship between window size and detection latency

Note: What You’ll Build

A complete stream processing system on ESP32 that demonstrates core streaming concepts: continuous sensor data ingestion, windowed aggregations, event detection with threshold alerts, circular buffer management, and real-time throughput monitoring. This lab brings theoretical stream processing concepts to life on embedded hardware.

1302.1.2 Stream Processing Concepts Demonstrated

This lab implements several key stream processing patterns at the embedded level:

| Concept | Implementation | Real-World Analogy |
| --- | --- | --- |
| Continuous Streaming | Sensors read every 100 ms | Kafka consumer polling |
| Tumbling Windows | Non-overlapping 5-second windows | Hourly billing aggregates |
| Sliding Windows | 10-sample moving average | Real-time trend smoothing |
| Event Detection | Threshold crossing alerts | Anomaly detection triggers |
| Circular Buffer | Fixed-size ring buffer | Bounded memory streaming |
| Throughput Stats | Events per second tracking | Pipeline monitoring |

1302.1.3 Wokwi Simulator

Use the embedded Wokwi simulator (or open wokwi.com) to build your stream processing system. The circuit configuration and code follow in the next two sections.

1302.1.4 Circuit Setup

Connect multiple sensors and LEDs to the ESP32:

| Component | ESP32 Pin | Purpose |
| --- | --- | --- |
| Temperature Sensor (NTC) | GPIO 34 | Primary data stream |
| Light Sensor (LDR) | GPIO 35 | Secondary data stream |
| Red LED | GPIO 18 | High temperature alert |
| Yellow LED | GPIO 19 | Rate of change alert |
| Green LED | GPIO 21 | Normal operation indicator |
| Potentiometer | GPIO 32 | Adjustable threshold |

Add this diagram.json configuration in Wokwi:

{
  "version": 1,
  "author": "IoT Class - Stream Processing Lab",
  "editor": "wokwi",
  "parts": [
    { "type": "wokwi-esp32-devkit-v1", "id": "esp", "top": 0, "left": 0 },
    { "type": "wokwi-ntc-temperature-sensor", "id": "temp1", "top": -100, "left": 100 },
    { "type": "wokwi-photoresistor-sensor", "id": "ldr1", "top": -100, "left": 220 },
    { "type": "wokwi-potentiometer", "id": "pot1", "top": -100, "left": 340 },
    { "type": "wokwi-led", "id": "led_red", "top": 150, "left": 100, "attrs": { "color": "red" } },
    { "type": "wokwi-led", "id": "led_yellow", "top": 150, "left": 150, "attrs": { "color": "yellow" } },
    { "type": "wokwi-led", "id": "led_green", "top": 150, "left": 200, "attrs": { "color": "green" } },
    { "type": "wokwi-resistor", "id": "r1", "top": 200, "left": 100, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r2", "top": 200, "left": 150, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r3", "top": 200, "left": 200, "attrs": { "value": "220" } }
  ],
  "connections": [
    ["esp:GND.1", "temp1:GND", "black", ["h0"]],
    ["esp:3V3", "temp1:VCC", "red", ["h0"]],
    ["esp:D34", "temp1:OUT", "green", ["h0"]],
    ["esp:GND.1", "ldr1:GND", "black", ["h0"]],
    ["esp:3V3", "ldr1:VCC", "red", ["h0"]],
    ["esp:D35", "ldr1:AO", "orange", ["h0"]],
    ["esp:GND.1", "pot1:GND", "black", ["h0"]],
    ["esp:3V3", "pot1:VCC", "red", ["h0"]],
    ["esp:D32", "pot1:SIG", "purple", ["h0"]],
    ["esp:D18", "led_red:A", "red", ["h0"]],
    ["led_red:C", "r1:1", "black", ["h0"]],
    ["r1:2", "esp:GND.2", "black", ["h0"]],
    ["esp:D19", "led_yellow:A", "yellow", ["h0"]],
    ["led_yellow:C", "r2:1", "black", ["h0"]],
    ["r2:2", "esp:GND.2", "black", ["h0"]],
    ["esp:D21", "led_green:A", "green", ["h0"]],
    ["led_green:C", "r3:1", "black", ["h0"]],
    ["r3:2", "esp:GND.2", "black", ["h0"]]
  ]
}

1302.1.5 Complete Arduino Code

Copy this code into the Wokwi editor:

// ============================================================
// Stream Processing Lab: Real-Time IoT Data Pipeline
// Demonstrates: Windowing, Event Detection, Circular Buffers
// ============================================================

// --- Pin Definitions ---
const int TEMP_PIN = 34;        // Temperature sensor (NTC)
const int LIGHT_PIN = 35;       // Light sensor (LDR)
const int THRESHOLD_PIN = 32;   // Potentiometer for threshold

const int LED_RED = 18;         // High temperature alert
const int LED_YELLOW = 19;      // Rate of change alert
const int LED_GREEN = 21;       // Normal operation

// --- Stream Processing Parameters ---
const int SAMPLE_INTERVAL_MS = 100;       // 10 Hz sampling rate
const int TUMBLING_WINDOW_MS = 5000;      // 5-second tumbling windows
const int SLIDING_WINDOW_SIZE = 10;       // 10-sample sliding window

// --- Circular Buffer for Stream Data ---
const int BUFFER_SIZE = 100;              // Store last 100 samples

struct SensorReading {
  unsigned long timestamp;
  float temperature;
  float light;
};

SensorReading circularBuffer[BUFFER_SIZE];
int bufferHead = 0;                       // Write position
int bufferCount = 0;                      // Number of valid samples

// --- Tumbling Window State ---
struct TumblingWindow {
  unsigned long windowStart;
  float tempSum;
  float lightSum;
  float tempMin;
  float tempMax;
  int sampleCount;
};

TumblingWindow currentWindow;

// --- Sliding Window State (for moving average) ---
float tempSlidingBuffer[SLIDING_WINDOW_SIZE];
float lightSlidingBuffer[SLIDING_WINDOW_SIZE];
int slidingIndex = 0;
bool slidingBufferFull = false;

// --- Event Detection State ---
float lastTempAvg = 0;
unsigned long lastAlertTime = 0;
const int ALERT_COOLDOWN_MS = 2000;       // Prevent alert flooding

// --- Throughput Statistics ---
unsigned long totalEventsProcessed = 0;
unsigned long throughputWindowStart = 0;
int eventsInCurrentSecond = 0;
float currentThroughput = 0;

// --- Timing ---
unsigned long lastSampleTime = 0;
unsigned long lastDisplayTime = 0;
const int DISPLAY_INTERVAL_MS = 1000;     // Update display every second

void setup() {
  Serial.begin(115200);
  delay(1000);

  // Configure pins
  pinMode(TEMP_PIN, INPUT);
  pinMode(LIGHT_PIN, INPUT);
  pinMode(THRESHOLD_PIN, INPUT);
  pinMode(LED_RED, OUTPUT);
  pinMode(LED_YELLOW, OUTPUT);
  pinMode(LED_GREEN, OUTPUT);

  // Initialize tumbling window
  resetTumblingWindow();

  // Initialize sliding window buffers
  for (int i = 0; i < SLIDING_WINDOW_SIZE; i++) {
    tempSlidingBuffer[i] = 0;
    lightSlidingBuffer[i] = 0;
  }

  throughputWindowStart = millis();

  printHeader();
}

void printHeader() {
  Serial.println("\n");
  Serial.println("+=================================================================+");
  Serial.println("|     STREAM PROCESSING LAB - Real-Time IoT Pipeline             |");
  Serial.println("+=================================================================+");
  Serial.println("|  Concepts: Tumbling Windows | Sliding Windows | Event Detection |");
  Serial.println("|            Circular Buffers | Throughput Monitoring             |");
  Serial.println("+-----------------------------------------------------------------+\n");
  Serial.println("Adjust potentiometer to change alert threshold (20-80C range)");
  Serial.println("LEDs: RED=High Temp | YELLOW=Rapid Change | GREEN=Normal\n");
}

// --- Circular Buffer Operations ---
void addToCircularBuffer(unsigned long ts, float temp, float light) {
  circularBuffer[bufferHead].timestamp = ts;
  circularBuffer[bufferHead].temperature = temp;
  circularBuffer[bufferHead].light = light;

  bufferHead = (bufferHead + 1) % BUFFER_SIZE;
  if (bufferCount < BUFFER_SIZE) {
    bufferCount++;
  }
}

SensorReading getFromBuffer(int offset) {
  // offset 0 = most recent, offset 1 = second most recent, etc.
  int index = (bufferHead - 1 - offset + BUFFER_SIZE) % BUFFER_SIZE;
  return circularBuffer[index];
}

// --- Tumbling Window Operations ---
void resetTumblingWindow() {
  currentWindow.windowStart = millis();
  currentWindow.tempSum = 0;
  currentWindow.lightSum = 0;
  currentWindow.tempMin = 999;
  currentWindow.tempMax = -999;
  currentWindow.sampleCount = 0;
}

void updateTumblingWindow(float temp, float light) {
  currentWindow.tempSum += temp;
  currentWindow.lightSum += light;
  currentWindow.sampleCount++;

  if (temp < currentWindow.tempMin) currentWindow.tempMin = temp;
  if (temp > currentWindow.tempMax) currentWindow.tempMax = temp;
}

void emitTumblingWindowResults() {
  if (currentWindow.sampleCount == 0) return;

  float avgTemp = currentWindow.tempSum / currentWindow.sampleCount;
  float avgLight = currentWindow.lightSum / currentWindow.sampleCount;

  Serial.println("\n+=============== TUMBLING WINDOW COMPLETE ===============+");
  Serial.print("| Window Duration: ");
  Serial.print(TUMBLING_WINDOW_MS / 1000);
  Serial.println(" seconds (non-overlapping)           |");
  Serial.print("| Samples in Window: ");
  Serial.print(currentWindow.sampleCount);
  Serial.println("                                  |");
  Serial.println("+=========================================================+");
  Serial.print("| TEMP - Avg: ");
  Serial.print(avgTemp, 1);
  Serial.print("C  Min: ");
  Serial.print(currentWindow.tempMin, 1);
  Serial.print("C  Max: ");
  Serial.print(currentWindow.tempMax, 1);
  Serial.println("C     |");
  Serial.print("| LIGHT - Avg: ");
  Serial.print(avgLight, 1);
  Serial.println("%                                   |");
  Serial.println("+---------------------------------------------------------+\n");
}

// --- Sliding Window Operations (Moving Average) ---
void updateSlidingWindow(float temp, float light) {
  tempSlidingBuffer[slidingIndex] = temp;
  lightSlidingBuffer[slidingIndex] = light;

  slidingIndex = (slidingIndex + 1) % SLIDING_WINDOW_SIZE;
  if (slidingIndex == 0) slidingBufferFull = true;
}

float getSlidingAverage(float* buffer) {
  int count = slidingBufferFull ? SLIDING_WINDOW_SIZE : slidingIndex;
  if (count == 0) return 0;

  float sum = 0;
  for (int i = 0; i < count; i++) {
    sum += buffer[i];
  }
  return sum / count;
}

// --- Event Detection ---
void detectEvents(float currentTemp, float slidingAvgTemp, float threshold) {
  unsigned long now = millis();
  bool inCooldown = (now - lastAlertTime) < ALERT_COOLDOWN_MS;

  // Event 1: High temperature threshold crossing
  bool highTempAlert = currentTemp > threshold;

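  // Event 2: Rapid change detection (difference between consecutive sliding averages)
  // fabs() is the floating-point absolute value; abs() may be integer-only on some cores
  float rateOfChange = fabs(slidingAvgTemp - lastTempAvg);
  bool rapidChangeAlert = (rateOfChange > 5.0) && slidingBufferFull;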

  // Update LEDs based on events
  if (highTempAlert) {
    digitalWrite(LED_RED, HIGH);
    digitalWrite(LED_GREEN, LOW);

    if (!inCooldown) {
      Serial.println("!! ALERT: Temperature exceeded threshold!");
      Serial.print("    Current: ");
      Serial.print(currentTemp, 1);
      Serial.print("C > Threshold: ");
      Serial.print(threshold, 1);
      Serial.println("C");
      lastAlertTime = now;
    }
  } else {
    digitalWrite(LED_RED, LOW);
    digitalWrite(LED_GREEN, HIGH);
  }

  if (rapidChangeAlert && !inCooldown) {
    digitalWrite(LED_YELLOW, HIGH);
    Serial.println("!! ALERT: Rapid temperature change detected!");
    Serial.print("    Rate of change: ");
    Serial.print(rateOfChange, 2);
    Serial.println("C per sample interval");
    lastAlertTime = now;
  } else if (!rapidChangeAlert) {
    digitalWrite(LED_YELLOW, LOW);
  }

  lastTempAvg = slidingAvgTemp;
}

// --- Throughput Calculation ---
void updateThroughput() {
  eventsInCurrentSecond++;
  totalEventsProcessed++;

  unsigned long now = millis();
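  unsigned long elapsed = now - throughputWindowStart;
  if (elapsed >= 1000) {
    // Normalize by the actual elapsed time, which can be slightly over 1000 ms
    currentThroughput = eventsInCurrentSecond * 1000.0 / elapsed;
    eventsInCurrentSecond = 0;
    throughputWindowStart = now;
  }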
}

// --- Display Functions ---
void displayStreamStatus(float temp, float light, float slidingTemp,
                         float slidingLight, float threshold) {
  Serial.println("------------------- STREAM STATUS -------------------");

  // Current values
  Serial.print("| RAW     | Temp: ");
  Serial.print(temp, 1);
  Serial.print("C  | Light: ");
  Serial.print(light, 1);
  Serial.println("%");

  // Sliding window averages
  Serial.print("| SLIDING | Temp: ");
  Serial.print(slidingTemp, 1);
  Serial.print("C  | Light: ");
  Serial.print(slidingLight, 1);
  Serial.print("%  (");
  Serial.print(SLIDING_WINDOW_SIZE);
  Serial.println("-sample window)");

  // Threshold
  Serial.print("| THRESHOLD: ");
  Serial.print(threshold, 1);
  Serial.println("C (adjust potentiometer)");

  // Buffer status
  Serial.print("| BUFFER: ");
  Serial.print(bufferCount);
  Serial.print("/");
  Serial.print(BUFFER_SIZE);
  Serial.print(" samples (");
  Serial.print((bufferCount * 100) / BUFFER_SIZE);
  Serial.println("% full)");

  // Throughput
  Serial.print("| THROUGHPUT: ");
  Serial.print(currentThroughput, 0);
  Serial.print(" events/sec | Total: ");
  Serial.println(totalEventsProcessed);
  Serial.println("-----------------------------------------------------\n");
}

void displayBufferVisualization() {
  Serial.print("BUFFER FILL: [");
  int fillBars = (bufferCount * 40) / BUFFER_SIZE;
  for (int i = 0; i < 40; i++) {
    Serial.print(i < fillBars ? "#" : "-");
  }
  Serial.print("] ");
  Serial.print(bufferCount);
  Serial.print("/");
  Serial.println(BUFFER_SIZE);
}

// --- Sensor Reading Functions ---
float readTemperature() {
  int raw = analogRead(TEMP_PIN);
  // Linear stand-in for a real NTC conversion: maps the 12-bit ADC
  // reading (0-4095) onto a simulated 15-85C range
  float temp = map(raw, 0, 4095, 1500, 8500) / 100.0;
  return temp;
}

float readLight() {
  int raw = analogRead(LIGHT_PIN);
  return map(raw, 0, 4095, 0, 100);
}

float readThreshold() {
  int raw = analogRead(THRESHOLD_PIN);
  // Map to 20-80C range
  return map(raw, 0, 4095, 2000, 8000) / 100.0;
}

void loop() {
  unsigned long now = millis();

  // === CONTINUOUS STREAMING: Sample at fixed interval ===
  if (now - lastSampleTime >= SAMPLE_INTERVAL_MS) {
    lastSampleTime = now;

    // Read sensor values (simulating incoming stream)
    float temp = readTemperature();
    float light = readLight();
    float threshold = readThreshold();

    // 1. Add to circular buffer (stream storage)
    addToCircularBuffer(now, temp, light);

    // 2. Update tumbling window aggregation
    updateTumblingWindow(temp, light);

    // 3. Update sliding window for moving average
    updateSlidingWindow(temp, light);

    // 4. Calculate sliding averages
    float slidingTempAvg = getSlidingAverage(tempSlidingBuffer);
    float slidingLightAvg = getSlidingAverage(lightSlidingBuffer);

    // 5. Event detection and alerting
    detectEvents(temp, slidingTempAvg, threshold);

    // 6. Update throughput statistics
    updateThroughput();

    // === TUMBLING WINDOW: Emit results when window closes ===
    if (now - currentWindow.windowStart >= TUMBLING_WINDOW_MS) {
      emitTumblingWindowResults();
      resetTumblingWindow();
    }

    // === DISPLAY: Update every second ===
    if (now - lastDisplayTime >= DISPLAY_INTERVAL_MS) {
      lastDisplayTime = now;
      displayStreamStatus(temp, light, slidingTempAvg,
                          slidingLightAvg, threshold);
      displayBufferVisualization();
    }
  }
}

1302.1.6 Step-by-Step Instructions

1302.1.6.1 Step 1: Set Up the Simulator

  1. Open the Wokwi simulator embedded above (or visit wokwi.com)
  2. Create a new ESP32 project
  3. Click the diagram.json tab and paste the circuit configuration
  4. Replace the default code with the complete Arduino code above

1302.1.6.2 Step 2: Run and Observe Basic Streaming

  1. Click the Play button to start the simulation
  2. Open the Serial Monitor to see the stream output
  3. Observe the continuous data flow:
    • RAW readings are sampled every 100 ms (10 Hz); the status display refreshes once per second
    • SLIDING averages smooth the data
    • BUFFER fills up over time

1302.1.6.3 Step 3: Experiment with Tumbling Windows

  1. Wait 5 seconds for the first tumbling window to complete
  2. Observe the window summary: min, max, average, sample count
  3. Note: Each window is non-overlapping - no sample counted twice
  4. Think about: Why is this useful for billing or hourly reports?

1302.1.6.4 Step 4: Adjust Threshold and Trigger Alerts

  1. Click and drag the potentiometer to lower the threshold
  2. Click the NTC sensor and increase temperature to trigger RED LED
  3. Observe the alert message in Serial Monitor
  4. Note the cooldown: Alerts don’t flood (2-second cooldown)

1302.1.6.5 Step 5: Observe Rapid Change Detection

  1. Quickly change the temperature sensor value (click and drag rapidly)
  2. Watch for YELLOW LED indicating rapid change detected
  3. This demonstrates: Rate-of-change anomaly detection
  4. Real-world analogy: Detecting equipment failure signatures

1302.1.6.6 Step 6: Understand the Circular Buffer

  1. Watch the BUFFER FILL visualization grow
  2. After ~10 seconds, buffer reaches 100% full
  3. New data overwrites old data - fixed memory usage
  4. This is critical: Embedded devices have limited RAM
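
The timings quoted in the steps above follow directly from the sampling interval. The sketch below redoes that arithmetic with the constants from the lab code; the two helper functions are hypothetical and are not part of the lab sketch. It also hints at the link between window size and detection latency: a tumbling window reports only when it closes, so a change can go unreported for up to one full window length.

```cpp
#include <cstdint>

// Values copied from the lab sketch.
constexpr uint32_t SAMPLE_INTERVAL_MS  = 100;   // 10 Hz sampling
constexpr uint32_t TUMBLING_WINDOW_MS  = 5000;  // 5-second tumbling window
constexpr uint32_t SLIDING_WINDOW_SIZE = 10;    // samples in the moving average
constexpr uint32_t BUFFER_SIZE         = 100;   // samples in the circular buffer

// Hypothetical helper: nominal number of samples that fit in a time window.
constexpr uint32_t samplesPerWindow(uint32_t windowMs, uint32_t intervalMs) {
  return windowMs / intervalMs;
}

// Hypothetical helper: time span covered by a given number of samples.
constexpr uint32_t spanMs(uint32_t sampleCount, uint32_t intervalMs) {
  return sampleCount * intervalMs;
}

// Nominal samples per tumbling window, and worst-case wait for its summary.
constexpr uint32_t TUMBLING_SAMPLES  = samplesPerWindow(TUMBLING_WINDOW_MS, SAMPLE_INTERVAL_MS);
constexpr uint32_t TUMBLING_DELAY_MS = TUMBLING_WINDOW_MS;

// Time span smoothed by the moving average, and time to fill the ring buffer.
constexpr uint32_t SLIDING_SPAN_MS = spanMs(SLIDING_WINDOW_SIZE, SAMPLE_INTERVAL_MS);
constexpr uint32_t BUFFER_FILL_MS  = spanMs(BUFFER_SIZE, SAMPLE_INTERVAL_MS);
```

These are nominal figures. On the running sketch the sample count per tumbling window may differ by one, depending on loop timing.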

1302.1.7 Understanding the Output

%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart TB
    subgraph Input["Data Ingestion (10 Hz)"]
        T[Temperature<br/>Sensor]
        L[Light<br/>Sensor]
    end

    subgraph Buffer["Circular Buffer"]
        CB[Ring Buffer<br/>100 samples<br/>Fixed Memory]
    end

    subgraph Windows["Windowed Aggregations"]
        TW[Tumbling Window<br/>5 sec non-overlapping<br/>Min/Max/Avg/Count]
        SW[Sliding Window<br/>10-sample moving avg<br/>Continuous smoothing]
    end

    subgraph Events["Event Detection"]
        TH[Threshold<br/>Crossing]
        RC[Rate of<br/>Change]
    end

    subgraph Output["Outputs"]
        LED[LED<br/>Alerts]
        SER[Serial<br/>Dashboard]
        STAT[Throughput<br/>Stats]
    end

    T --> CB
    L --> CB
    CB --> TW
    CB --> SW
    TW --> SER
    T --> TH
    SW --> RC
    TH --> LED
    RC --> LED
    CB --> STAT
    STAT --> SER

    style Input fill:#16A085,color:#fff
    style Buffer fill:#E67E22,color:#fff
    style Windows fill:#2C3E50,color:#fff
    style Events fill:#7F8C8D,color:#fff
    style Output fill:#16A085,color:#fff

Figure 1302.1: Stream processing lab architecture showing data flow from sensors through buffer and windowing stages to event detection and outputs

1302.1.8 Key Stream Processing Concepts Explained

Tumbling Windows (implemented in this lab):

  • Non-overlapping, fixed-duration windows (5 seconds in this lab)
  • Each event belongs to exactly ONE window
  • Results emitted when the window closes
  • Use case: Hourly billing summaries, periodic aggregates
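
The "exactly one window" property can be sketched as integer division on the event timestamp. This is a minimal illustration that assumes windows are aligned to multiples of the window length since boot. The lab sketch works differently: it restarts each window at the `millis()` value taken when the previous window is reset. Both function names are hypothetical.

```cpp
#include <cstdint>

// Hypothetical helper: index of the tumbling window that contains timestampMs.
// Assumes aligned windows and a non-zero windowMs.
constexpr uint32_t tumblingWindowIndex(uint32_t timestampMs, uint32_t windowMs) {
  return timestampMs / windowMs;
}

// Hypothetical helper: start time of the window that contains timestampMs.
constexpr uint32_t tumblingWindowStart(uint32_t timestampMs, uint32_t windowMs) {
  return tumblingWindowIndex(timestampMs, windowMs) * windowMs;
}
```

With 5000 ms windows, a timestamp of 4999 falls in window 0 and a timestamp of 5000 falls in window 1, so no event is ever counted in two windows.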

Sliding Windows (implemented in this lab):

  • Overlapping windows that “slide” with each new event
  • Each event may belong to MULTIPLE windows
  • Continuously updated averages
  • Use case: Moving averages, trend smoothing, real-time dashboards

Memory Trade-off:

  • Tumbling: O(1) memory per window (just aggregates)
  • Sliding: O(window_size) memory (must store all samples)
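
The O(window_size) cost of a sliding window is visible in the sample array below. This is a sketch of one common variant, not the lab's implementation: it keeps a running sum so each update is O(1), whereas `getSlidingAverage()` in the lab code re-sums the buffer on every call. The class name and its methods are hypothetical.

```cpp
#include <cstddef>

// Hypothetical fixed-size moving average with a running sum.
template <std::size_t N>
class MovingAverage {
  static_assert(N > 0, "window must hold at least one sample");

 public:
  // Add a sample, evicting the oldest one once the window is full.
  void add(float value) {
    if (count_ == N) {
      sum_ -= samples_[next_];  // drop the sample about to be overwritten
    } else {
      ++count_;
    }
    samples_[next_] = value;
    sum_ += value;
    next_ = (next_ + 1) % N;
  }

  // Average of the samples currently in the window (0 if empty).
  float average() const { return count_ == 0 ? 0.0f : sum_ / count_; }

  std::size_t count() const { return count_; }

 private:
  float samples_[N] = {};  // O(N) storage: every sample in the window
  float sum_ = 0.0f;       // running sum of the stored samples
  std::size_t next_ = 0;   // next write position
  std::size_t count_ = 0;  // number of valid samples
};
```

One trade-off to be aware of: a running float sum can accumulate rounding error over very long runs, which re-summing the buffer avoids.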

Why Circular Buffers?

  • Streams are infinite, memory is finite
  • Fixed memory allocation prevents heap fragmentation
  • Oldest data automatically evicted when the buffer is full
  • O(1) insert and O(1) access by offset

Implementation Pattern:

buffer[head] = newData;          // Write at head
head = (head + 1) % BUFFER_SIZE; // Wrap around

Real-World Analogy: Like a security camera that records over the oldest footage - you always have the last N hours, never running out of storage.
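
The write-and-wrap pattern can be packaged as a small generic container. The sketch below mirrors the lab's `addToCircularBuffer()` and `getFromBuffer()` logic, with two assumed additions: a template so it holds any element type, and a bounds check on the read offset. The class and method names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical fixed-capacity ring buffer; the oldest element is overwritten when full.
template <typename T, std::size_t Capacity>
class RingBuffer {
  static_assert(Capacity > 0, "capacity must be non-zero");

 public:
  // O(1) insert: write at head, then wrap around.
  void push(const T& item) {
    data_[head_] = item;
    head_ = (head_ + 1) % Capacity;
    if (count_ < Capacity) ++count_;
  }

  // O(1) read by age: offset 0 = newest, offset 1 = second newest, and so on.
  const T& fromNewest(std::size_t offset) const {
    assert(offset < count_);  // only valid samples may be read
    std::size_t index = (head_ + Capacity - 1 - offset) % Capacity;
    return data_[index];
  }

  std::size_t size() const { return count_; }

 private:
  T data_[Capacity] = {};
  std::size_t head_ = 0;   // next write position
  std::size_t count_ = 0;  // number of valid elements
};
```

Pushing the values 1 through 5 into a `RingBuffer<int, 3>` leaves 3, 4, and 5 in the buffer, with `fromNewest(0)` returning 5.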

Threshold-Based Detection:

  • Simple: value > threshold triggers alert
  • Fast: O(1) computation per event
  • Limitation: Doesn’t detect trends, only instantaneous values

Rate-of-Change Detection:

  • Compares current average to previous average
  • Detects rapid changes even within “normal” range
  • Example: Temperature rising 10C/minute (could indicate fire)
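
To express a change as a rate such as "10C/minute", the difference between two averages has to be divided by the time between them. The lab code compares consecutive sliding averages against a fixed 5.0 without that normalization, so the helper below is an assumed extension rather than the lab's method. Its name and parameters are hypothetical.

```cpp
#include <cstdint>

// Hypothetical helper: rate of change in degrees C per minute between two
// averaged readings taken at prevMs and currMs (millisecond timestamps).
inline float ratePerMinute(float prevAvgC, uint32_t prevMs,
                           float currAvgC, uint32_t currMs) {
  // Unsigned subtraction stays correct across a timestamp rollover.
  uint32_t elapsedMs = currMs - prevMs;
  if (elapsedMs == 0) return 0.0f;  // no time elapsed, no defined rate
  return (currAvgC - prevAvgC) * 60000.0f / static_cast<float>(elapsedMs);
}
```

For example, an average that moves from 20C to 25C over 30 seconds corresponds to a rate of 10C per minute.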

Alert Cooldown (Rate Limiting):

  • Prevents alert flooding during sustained anomalies
  • Production systems often add alert deduplication or exponential backoff
  • Balance: Too short = alert fatigue, Too long = missed events
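
A cooldown gate can be isolated into a small helper so that each alert type has its own timer. This is a sketch under assumptions: the lab code shares one `lastAlertTime` between both alerts, while the hypothetical `AlertCooldown` class below tracks a single alert and allows the very first alert immediately.

```cpp
#include <cstdint>

// Hypothetical cooldown gate for one alert type.
class AlertCooldown {
 public:
  explicit AlertCooldown(uint32_t cooldownMs) : cooldownMs_(cooldownMs) {}

  // Returns true if an alert may be emitted at nowMs and records it;
  // returns false while the cooldown from the previous alert is still running.
  bool tryFire(uint32_t nowMs) {
    // Unsigned subtraction stays correct across a timestamp rollover.
    if (hasFired_ && (nowMs - lastFireMs_) < cooldownMs_) return false;
    hasFired_ = true;
    lastFireMs_ = nowMs;
    return true;
  }

 private:
  uint32_t cooldownMs_;
  uint32_t lastFireMs_ = 0;
  bool hasFired_ = false;
};
```

In an Arduino sketch the timestamp would typically come from `millis()`, for example `if (tempCooldown.tryFire(millis())) { ... }`, where `tempCooldown` is an assumed instance created with the 2000 ms cooldown used in this lab.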

Why Track Throughput?

  • Detect processing bottlenecks before they cause data loss
  • Capacity planning for production deployments
  • Alerting when throughput drops (sensor failure?)

Metrics to Monitor:

  • Events per second (current throughput)
  • Total events processed (lifetime counter)
  • Buffer fill percentage (approaching capacity?)
  • Processing latency (time from read to output)

In Production: Use Prometheus, Grafana, or cloud monitoring to track these metrics with alerting thresholds.
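
Of the metrics listed above, processing latency is the one the lab sketch does not measure. A minimal way to track it, assuming you capture one timestamp when a sample is read and another when its processing finishes, is sketched below. The `LatencyStats` struct is hypothetical; on Arduino the timestamps could come from `micros()`.

```cpp
#include <cstdint>

// Hypothetical tracker for read-to-output latency, in microseconds.
struct LatencyStats {
  uint32_t count = 0;    // number of samples measured
  uint32_t maxUs = 0;    // worst latency seen so far
  uint64_t totalUs = 0;  // sum of all latencies, for the mean

  // readUs: time the sample was read; doneUs: time its processing finished.
  void record(uint32_t readUs, uint32_t doneUs) {
    // Unsigned subtraction stays correct across a timestamp rollover.
    uint32_t latencyUs = doneUs - readUs;
    ++count;
    totalUs += latencyUs;
    if (latencyUs > maxUs) maxUs = latencyUs;
  }

  // Mean latency in microseconds (0 if nothing was recorded).
  double meanUs() const {
    return count == 0 ? 0.0 : static_cast<double>(totalUs) / count;
  }
};
```

Tracking the maximum alongside the mean is a deliberate choice: an occasional slow iteration, for example one that prints a window summary, barely moves the mean but shows up clearly in the maximum.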

1302.1.9 Challenge Exercises

Challenge 1: Add Session Windows

Modify the code to implement session windows that group events by activity periods:

  • Start a new session when first event arrives after 3+ seconds of inactivity
  • Close session and emit results after 3 seconds of no new events
  • Track: session duration, event count, min/max/avg values

Hint: Track lastEventTime and check for gaps exceeding SESSION_GAP_MS
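
One possible starting point for the gap check in the hint is sketched below. It covers only session open and close bookkeeping; the min/max/avg aggregation is left for the exercise. `SESSION_GAP_MS` comes from the hint, while the `Session` struct and both functions are hypothetical. What counts as an "event" (for example, a reading above the threshold) is an assumption you will need to decide for yourself.

```cpp
#include <cstdint>

constexpr uint32_t SESSION_GAP_MS = 3000;  // inactivity gap that ends a session

// Hypothetical session state.
struct Session {
  bool active = false;
  uint32_t startMs = 0;      // time of the first event in the session
  uint32_t lastEventMs = 0;  // time of the most recent event
  uint32_t eventCount = 0;
};

// Call on every loop pass, before onEvent(). Returns true when the session
// has just been closed because no event arrived for SESSION_GAP_MS.
inline bool closeIfIdle(Session& s, uint32_t nowMs) {
  if (s.active && (nowMs - s.lastEventMs) >= SESSION_GAP_MS) {
    s.active = false;
    return true;
  }
  return false;
}

// Call for each event. Returns true if this event opened a new session.
inline bool onEvent(Session& s, uint32_t nowMs) {
  bool opened = false;
  if (!s.active) {
    s.active = true;
    s.startMs = nowMs;
    s.eventCount = 0;
    opened = true;
  }
  s.lastEventMs = nowMs;
  ++s.eventCount;
  return opened;
}
```

When `closeIfIdle()` returns true, the session duration is `lastEventMs - startMs` and `eventCount` holds the number of events to report.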

Challenge 2: Implement Late Data Handling

Add support for simulated “late arriving” data:

  1. Add a button that injects an event with a timestamp 10 seconds in the past
  2. Determine which tumbling window this event belongs to
  3. If window already closed, route to “late data” output
  4. Display count of late events separately

Hint: Compare event timestamp to currentWindow.windowStart

Challenge 3: Multi-Sensor Correlation

Extend event detection to correlate multiple sensors:

  • Alert when BOTH temperature is high AND light is low (equipment overheating in darkness)
  • Track how often both conditions occur simultaneously
  • Implement a “compound event” counter

Hint: Add correlatedAlertCount and check both conditions in detectEvents()

Challenge 4: Adaptive Thresholds

Replace the fixed potentiometer threshold with an adaptive threshold:

  1. Calculate the moving standard deviation of temperature
  2. Set threshold = sliding_average + (2 * standard_deviation)
  3. This creates an adaptive anomaly boundary
  4. Compare alert rates between fixed and adaptive thresholds

Hint: Track sum of squares to calculate variance: variance = (sumSq/n) - (sum/n)^2
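
The variance formula in the hint can be checked with a small accumulator. The sketch below is a simplified starting point: it accumulates over all samples seen so far, whereas a true moving standard deviation would also subtract the samples that leave the window. The `RunningStats` struct is hypothetical.

```cpp
#include <cmath>
#include <cstddef>

// Hypothetical accumulator for mean and population variance.
struct RunningStats {
  double sum = 0.0;    // sum of samples
  double sumSq = 0.0;  // sum of squared samples
  std::size_t n = 0;   // number of samples

  void add(double x) {
    sum += x;
    sumSq += x * x;
    ++n;
  }

  double mean() const { return n == 0 ? 0.0 : sum / n; }

  // variance = (sumSq / n) - (sum / n)^2, clamped at 0 to absorb rounding error.
  double variance() const {
    if (n == 0) return 0.0;
    double m = mean();
    double v = sumSq / n - m * m;
    return v < 0.0 ? 0.0 : v;
  }

  // Adaptive boundary: mean + k standard deviations (k = 2 in the challenge).
  double adaptiveThreshold(double k = 2.0) const {
    return mean() + k * std::sqrt(variance());
  }
};
```

For the samples 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the variance is 4, so the adaptive threshold with k = 2 is 9.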

1302.1.10 Common Issues and Solutions

| Issue | Cause | Solution |
| --- | --- | --- |
| No serial output | Baud rate mismatch | Ensure both code and monitor use 115200 |
| LEDs not lighting | Wrong pin connection | Check diagram.json connections match code |
| Buffer always 100% | Expected behavior | Circular buffer maintains fixed size by design |
| No tumbling window output | Not enough time | Wait full 5 seconds for first window |
| Alerts firing constantly | Threshold too low | Adjust potentiometer higher |

1302.1.11 Real-World Applications

This lab demonstrates patterns used in production stream processing:

| Lab Feature | Production Equivalent |
| --- | --- |
| 10 Hz sampling | Kafka consumer polling |
| Circular buffer | Kafka topic partitions with retention |
| Tumbling windows | Flink/Spark hourly aggregations |
| Sliding windows | Real-time moving averages in Grafana |
| Threshold alerts | PagerDuty integration for anomalies |
| Throughput stats | Prometheus metrics for pipeline health |

1302.2 What’s Next

Continue to Hands-On Lab: Advanced CEP for a 60-minute advanced lab covering pattern matching, complex event processing (CEP), session windows, and statistical aggregation.