%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart TB
subgraph Input["Data Ingestion (10 Hz)"]
T[Temperature<br/>Sensor]
L[Light<br/>Sensor]
end
subgraph Buffer["Circular Buffer"]
CB[Ring Buffer<br/>100 samples<br/>Fixed Memory]
end
subgraph Windows["Windowed Aggregations"]
TW[Tumbling Window<br/>5 sec non-overlapping<br/>Min/Max/Avg/Count]
SW[Sliding Window<br/>10-sample moving avg<br/>Continuous smoothing]
end
subgraph Events["Event Detection"]
TH[Threshold<br/>Crossing]
RC[Rate of<br/>Change]
end
subgraph Output["Outputs"]
LED[LED<br/>Alerts]
SER[Serial<br/>Dashboard]
STAT[Throughput<br/>Stats]
end
T --> CB
L --> CB
CB --> TW
CB --> SW
TW --> SER
SW --> TH
SW --> RC
TH --> LED
RC --> LED
CB --> STAT
STAT --> SER
style Input fill:#16A085,color:#fff
style Buffer fill:#E67E22,color:#fff
style Windows fill:#2C3E50,color:#fff
style Events fill:#7F8C8D,color:#fff
style Output fill:#16A085,color:#fff
1302 Hands-On Lab: Basic Stream Processing
1302.1 Lab: Build a Real-Time Stream Processing System
1302.1.1 Learning Objectives
By completing this hands-on lab, you will be able to:
- Implement continuous data streaming from multiple sensors
- Create windowed aggregations (tumbling and sliding windows)
- Build event detection and alerting systems
- Design circular buffers for memory-efficient data management
- Calculate and display throughput statistics
- Understand the relationship between window size and detection latency
In this lab you will build a complete stream processing system on ESP32 that demonstrates core streaming concepts: continuous sensor data ingestion, windowed aggregations, event detection with threshold alerts, circular buffer management, and real-time throughput monitoring. The lab brings theoretical stream processing concepts to life on embedded hardware.
1302.1.2 Stream Processing Concepts Demonstrated
This lab implements several key stream processing patterns at the embedded level:
| Concept | Implementation | Real-World Analogy |
|---|---|---|
| Continuous Streaming | Sensors read every 100ms | Kafka consumer polling |
| Tumbling Windows | Non-overlapping 5-second windows | Hourly billing aggregates |
| Sliding Windows | 10-sample moving average | Real-time trend smoothing |
| Event Detection | Threshold crossing alerts | Anomaly detection triggers |
| Circular Buffer | Fixed-size ring buffer | Bounded memory streaming |
| Throughput Stats | Events per second tracking | Pipeline monitoring |
1302.1.3 Wokwi Simulator
Use the embedded simulator below to build your stream processing system:
1302.1.4 Circuit Setup
Connect multiple sensors and LEDs to the ESP32:
| Component | ESP32 Pin | Purpose |
|---|---|---|
| Temperature Sensor (NTC) | GPIO 34 | Primary data stream |
| Light Sensor (LDR) | GPIO 35 | Secondary data stream |
| Red LED | GPIO 18 | High temperature alert |
| Yellow LED | GPIO 19 | Rate of change alert |
| Green LED | GPIO 21 | Normal operation indicator |
| Potentiometer | GPIO 32 | Adjustable threshold |
Add this diagram.json configuration in Wokwi:
{
"version": 1,
"author": "IoT Class - Stream Processing Lab",
"editor": "wokwi",
"parts": [
{ "type": "wokwi-esp32-devkit-v1", "id": "esp", "top": 0, "left": 0 },
{ "type": "wokwi-ntc-temperature-sensor", "id": "temp1", "top": -100, "left": 100 },
{ "type": "wokwi-photoresistor-sensor", "id": "ldr1", "top": -100, "left": 220 },
{ "type": "wokwi-potentiometer", "id": "pot1", "top": -100, "left": 340 },
{ "type": "wokwi-led", "id": "led_red", "top": 150, "left": 100, "attrs": { "color": "red" } },
{ "type": "wokwi-led", "id": "led_yellow", "top": 150, "left": 150, "attrs": { "color": "yellow" } },
{ "type": "wokwi-led", "id": "led_green", "top": 150, "left": 200, "attrs": { "color": "green" } },
{ "type": "wokwi-resistor", "id": "r1", "top": 200, "left": 100, "attrs": { "value": "220" } },
{ "type": "wokwi-resistor", "id": "r2", "top": 200, "left": 150, "attrs": { "value": "220" } },
{ "type": "wokwi-resistor", "id": "r3", "top": 200, "left": 200, "attrs": { "value": "220" } }
],
"connections": [
["esp:GND.1", "temp1:GND", "black", ["h0"]],
["esp:3V3", "temp1:VCC", "red", ["h0"]],
["esp:D34", "temp1:OUT", "green", ["h0"]],
["esp:GND.1", "ldr1:GND", "black", ["h0"]],
["esp:3V3", "ldr1:VCC", "red", ["h0"]],
["esp:D35", "ldr1:AO", "orange", ["h0"]],
["esp:GND.1", "pot1:GND", "black", ["h0"]],
["esp:3V3", "pot1:VCC", "red", ["h0"]],
["esp:D32", "pot1:SIG", "purple", ["h0"]],
["esp:D18", "led_red:A", "red", ["h0"]],
["led_red:C", "r1:1", "black", ["h0"]],
["r1:2", "esp:GND.2", "black", ["h0"]],
["esp:D19", "led_yellow:A", "yellow", ["h0"]],
["led_yellow:C", "r2:1", "black", ["h0"]],
["r2:2", "esp:GND.2", "black", ["h0"]],
["esp:D21", "led_green:A", "green", ["h0"]],
["led_green:C", "r3:1", "black", ["h0"]],
["r3:2", "esp:GND.2", "black", ["h0"]]
]
}
1302.1.5 Complete Arduino Code
Copy this code into the Wokwi editor:
// ============================================================
// Stream Processing Lab: Real-Time IoT Data Pipeline
// Demonstrates: Windowing, Event Detection, Circular Buffers
// ============================================================
// --- Pin Definitions ---
const int TEMP_PIN = 34; // Temperature sensor (NTC)
const int LIGHT_PIN = 35; // Light sensor (LDR)
const int THRESHOLD_PIN = 32; // Potentiometer for threshold
const int LED_RED = 18; // High temperature alert
const int LED_YELLOW = 19; // Rate of change alert
const int LED_GREEN = 21; // Normal operation
// --- Stream Processing Parameters ---
const int SAMPLE_INTERVAL_MS = 100; // 10 Hz sampling rate
const int TUMBLING_WINDOW_MS = 5000; // 5-second tumbling windows
const int SLIDING_WINDOW_SIZE = 10; // 10-sample sliding window
// --- Circular Buffer for Stream Data ---
const int BUFFER_SIZE = 100; // Store last 100 samples
struct SensorReading {
unsigned long timestamp;
float temperature;
float light;
};
SensorReading circularBuffer[BUFFER_SIZE];
int bufferHead = 0; // Write position
int bufferCount = 0; // Number of valid samples
// --- Tumbling Window State ---
struct TumblingWindow {
unsigned long windowStart;
float tempSum;
float lightSum;
float tempMin;
float tempMax;
int sampleCount;
};
TumblingWindow currentWindow;
// --- Sliding Window State (for moving average) ---
float tempSlidingBuffer[SLIDING_WINDOW_SIZE];
float lightSlidingBuffer[SLIDING_WINDOW_SIZE];
int slidingIndex = 0;
bool slidingBufferFull = false;
// --- Event Detection State ---
float lastTempAvg = 0;
unsigned long lastAlertTime = 0;
const int ALERT_COOLDOWN_MS = 2000; // Prevent alert flooding
// --- Throughput Statistics ---
unsigned long totalEventsProcessed = 0;
unsigned long throughputWindowStart = 0;
int eventsInCurrentSecond = 0;
float currentThroughput = 0;
// --- Timing ---
unsigned long lastSampleTime = 0;
unsigned long lastDisplayTime = 0;
const int DISPLAY_INTERVAL_MS = 1000; // Update display every second
void setup() {
Serial.begin(115200);
delay(1000);
// Configure pins
pinMode(TEMP_PIN, INPUT);
pinMode(LIGHT_PIN, INPUT);
pinMode(THRESHOLD_PIN, INPUT);
pinMode(LED_RED, OUTPUT);
pinMode(LED_YELLOW, OUTPUT);
pinMode(LED_GREEN, OUTPUT);
// Initialize tumbling window
resetTumblingWindow();
// Initialize sliding window buffers
for (int i = 0; i < SLIDING_WINDOW_SIZE; i++) {
tempSlidingBuffer[i] = 0;
lightSlidingBuffer[i] = 0;
}
throughputWindowStart = millis();
printHeader();
}
void printHeader() {
Serial.println("\n");
Serial.println("+=================================================================+");
Serial.println("| STREAM PROCESSING LAB - Real-Time IoT Pipeline |");
Serial.println("+=================================================================+");
Serial.println("| Concepts: Tumbling Windows | Sliding Windows | Event Detection |");
Serial.println("| Circular Buffers | Throughput Monitoring |");
Serial.println("+-----------------------------------------------------------------+\n");
Serial.println("Adjust potentiometer to change alert threshold (20-80C range)");
Serial.println("LEDs: RED=High Temp | YELLOW=Rapid Change | GREEN=Normal\n");
}
// --- Circular Buffer Operations ---
void addToCircularBuffer(unsigned long ts, float temp, float light) {
circularBuffer[bufferHead].timestamp = ts;
circularBuffer[bufferHead].temperature = temp;
circularBuffer[bufferHead].light = light;
bufferHead = (bufferHead + 1) % BUFFER_SIZE;
if (bufferCount < BUFFER_SIZE) {
bufferCount++;
}
}
SensorReading getFromBuffer(int offset) {
// offset 0 = most recent, offset 1 = second most recent, etc.
int index = (bufferHead - 1 - offset + BUFFER_SIZE) % BUFFER_SIZE;
return circularBuffer[index];
}
// --- Tumbling Window Operations ---
void resetTumblingWindow() {
currentWindow.windowStart = millis();
currentWindow.tempSum = 0;
currentWindow.lightSum = 0;
currentWindow.tempMin = 999;
currentWindow.tempMax = -999;
currentWindow.sampleCount = 0;
}
void updateTumblingWindow(float temp, float light) {
currentWindow.tempSum += temp;
currentWindow.lightSum += light;
currentWindow.sampleCount++;
if (temp < currentWindow.tempMin) currentWindow.tempMin = temp;
if (temp > currentWindow.tempMax) currentWindow.tempMax = temp;
}
void emitTumblingWindowResults() {
if (currentWindow.sampleCount == 0) return;
float avgTemp = currentWindow.tempSum / currentWindow.sampleCount;
float avgLight = currentWindow.lightSum / currentWindow.sampleCount;
Serial.println("\n+=============== TUMBLING WINDOW COMPLETE ===============+");
Serial.print("| Window Duration: ");
Serial.print(TUMBLING_WINDOW_MS / 1000);
Serial.println(" seconds (non-overlapping) |");
Serial.print("| Samples in Window: ");
Serial.print(currentWindow.sampleCount);
Serial.println(" |");
Serial.println("+=========================================================+");
Serial.print("| TEMP - Avg: ");
Serial.print(avgTemp, 1);
Serial.print("C Min: ");
Serial.print(currentWindow.tempMin, 1);
Serial.print("C Max: ");
Serial.print(currentWindow.tempMax, 1);
Serial.println("C |");
Serial.print("| LIGHT - Avg: ");
Serial.print(avgLight, 1);
Serial.println("% |");
Serial.println("+---------------------------------------------------------+\n");
}
// --- Sliding Window Operations (Moving Average) ---
void updateSlidingWindow(float temp, float light) {
tempSlidingBuffer[slidingIndex] = temp;
lightSlidingBuffer[slidingIndex] = light;
slidingIndex = (slidingIndex + 1) % SLIDING_WINDOW_SIZE;
if (slidingIndex == 0) slidingBufferFull = true;
}
float getSlidingAverage(float* buffer) {
int count = slidingBufferFull ? SLIDING_WINDOW_SIZE : slidingIndex;
if (count == 0) return 0;
float sum = 0;
for (int i = 0; i < count; i++) {
sum += buffer[i];
}
return sum / count;
}
// --- Event Detection ---
void detectEvents(float currentTemp, float slidingAvgTemp, float threshold) {
unsigned long now = millis();
bool inCooldown = (now - lastAlertTime) < ALERT_COOLDOWN_MS;
// Event 1: High temperature threshold crossing
bool highTempAlert = currentTemp > threshold;
// Event 2: Rapid change detection (rate of change)
float rateOfChange = fabs(slidingAvgTemp - lastTempAvg); // fabs() avoids integer abs() truncation on some ESP32 core versions
bool rapidChangeAlert = (rateOfChange > 5.0) && slidingBufferFull;
// Update LEDs based on events
if (highTempAlert) {
digitalWrite(LED_RED, HIGH);
digitalWrite(LED_GREEN, LOW);
if (!inCooldown) {
Serial.println("!! ALERT: Temperature exceeded threshold!");
Serial.print(" Current: ");
Serial.print(currentTemp, 1);
Serial.print("C > Threshold: ");
Serial.print(threshold, 1);
Serial.println("C");
lastAlertTime = now;
}
} else {
digitalWrite(LED_RED, LOW);
digitalWrite(LED_GREEN, HIGH);
}
if (rapidChangeAlert && !inCooldown) {
digitalWrite(LED_YELLOW, HIGH);
Serial.println("!! ALERT: Rapid temperature change detected!");
Serial.print(" Rate of change: ");
Serial.print(rateOfChange, 2);
Serial.println("C per sample (moving average)");
lastAlertTime = now;
} else if (!rapidChangeAlert) {
digitalWrite(LED_YELLOW, LOW);
}
lastTempAvg = slidingAvgTemp;
}
// --- Throughput Calculation ---
void updateThroughput() {
eventsInCurrentSecond++;
totalEventsProcessed++;
unsigned long now = millis();
if (now - throughputWindowStart >= 1000) {
currentThroughput = eventsInCurrentSecond;
eventsInCurrentSecond = 0;
throughputWindowStart = now;
}
}
// --- Display Functions ---
void displayStreamStatus(float temp, float light, float slidingTemp,
float slidingLight, float threshold) {
Serial.println("------------------- STREAM STATUS -------------------");
// Current values
Serial.print("| RAW | Temp: ");
Serial.print(temp, 1);
Serial.print("C | Light: ");
Serial.print(light, 1);
Serial.println("%");
// Sliding window averages
Serial.print("| SLIDING | Temp: ");
Serial.print(slidingTemp, 1);
Serial.print("C | Light: ");
Serial.print(slidingLight, 1);
Serial.print("% (");
Serial.print(SLIDING_WINDOW_SIZE);
Serial.println("-sample window)");
// Threshold
Serial.print("| THRESHOLD: ");
Serial.print(threshold, 1);
Serial.println("C (adjust potentiometer)");
// Buffer status
Serial.print("| BUFFER: ");
Serial.print(bufferCount);
Serial.print("/");
Serial.print(BUFFER_SIZE);
Serial.print(" samples (");
Serial.print((bufferCount * 100) / BUFFER_SIZE);
Serial.println("% full)");
// Throughput
Serial.print("| THROUGHPUT: ");
Serial.print(currentThroughput, 0);
Serial.print(" events/sec | Total: ");
Serial.println(totalEventsProcessed);
Serial.println("-----------------------------------------------------\n");
}
void displayBufferVisualization() {
Serial.print("BUFFER FILL: [");
int fillBars = (bufferCount * 40) / BUFFER_SIZE;
for (int i = 0; i < 40; i++) {
Serial.print(i < fillBars ? "#" : "-");
}
Serial.print("] ");
Serial.print(bufferCount);
Serial.print("/");
Serial.println(BUFFER_SIZE);
}
// --- Sensor Reading Functions ---
float readTemperature() {
int raw = analogRead(TEMP_PIN);
// Simulate temperature range 15-85C based on NTC
float temp = map(raw, 0, 4095, 1500, 8500) / 100.0;
return temp;
}
float readLight() {
int raw = analogRead(LIGHT_PIN);
return map(raw, 0, 4095, 0, 100);
}
float readThreshold() {
int raw = analogRead(THRESHOLD_PIN);
// Map to 20-80C range
return map(raw, 0, 4095, 2000, 8000) / 100.0;
}
void loop() {
unsigned long now = millis();
// === CONTINUOUS STREAMING: Sample at fixed interval ===
if (now - lastSampleTime >= SAMPLE_INTERVAL_MS) {
lastSampleTime = now;
// Read sensor values (simulating incoming stream)
float temp = readTemperature();
float light = readLight();
float threshold = readThreshold();
// 1. Add to circular buffer (stream storage)
addToCircularBuffer(now, temp, light);
// 2. Update tumbling window aggregation
updateTumblingWindow(temp, light);
// 3. Update sliding window for moving average
updateSlidingWindow(temp, light);
// 4. Calculate sliding averages
float slidingTempAvg = getSlidingAverage(tempSlidingBuffer);
float slidingLightAvg = getSlidingAverage(lightSlidingBuffer);
// 5. Event detection and alerting
detectEvents(temp, slidingTempAvg, threshold);
// 6. Update throughput statistics
updateThroughput();
// === TUMBLING WINDOW: Emit results when window closes ===
if (now - currentWindow.windowStart >= TUMBLING_WINDOW_MS) {
emitTumblingWindowResults();
resetTumblingWindow();
}
// === DISPLAY: Update every second ===
if (now - lastDisplayTime >= DISPLAY_INTERVAL_MS) {
lastDisplayTime = now;
displayStreamStatus(temp, light, slidingTempAvg,
slidingLightAvg, threshold);
displayBufferVisualization();
}
}
}
1302.1.6 Step-by-Step Instructions
1302.1.6.1 Step 1: Set Up the Simulator
- Open the Wokwi simulator embedded above (or visit wokwi.com)
- Create a new ESP32 project
- Click the diagram.json tab and paste the circuit configuration
- Replace the default code with the complete Arduino code above
1302.1.6.2 Step 2: Run and Observe Basic Streaming
- Click the Play button to start the simulation
- Open the Serial Monitor to see the stream output
- Observe the continuous data flow:
- Sensors are sampled every 100ms (10 Hz); the RAW line in the status display refreshes once per second
- SLIDING averages smooth the data
- BUFFER fills up over time
1302.1.6.3 Step 3: Experiment with Tumbling Windows
- Wait 5 seconds for the first tumbling window to complete
- Observe the window summary: min, max, average, sample count
- Note: Each window is non-overlapping - no sample counted twice
- Think about: Why is this useful for billing or hourly reports?
1302.1.6.4 Step 4: Adjust Threshold and Trigger Alerts
- Click and drag the potentiometer to lower the threshold
- Click the NTC sensor and increase temperature to trigger RED LED
- Observe the alert message in Serial Monitor
- Note the cooldown: Alerts don’t flood (2-second cooldown)
1302.1.6.5 Step 5: Observe Rapid Change Detection
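- Quickly change the temperature sensor value (click and drag rapidly). The alert fires when the 10-sample moving average shifts by more than 5C between consecutive samples, so the raw reading must change by more than 50C within about one second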
- Watch for YELLOW LED indicating rapid change detected
- This demonstrates: Rate-of-change anomaly detection
- Real-world analogy: Detecting equipment failure signatures
1302.1.6.6 Step 6: Understand the Circular Buffer
- Watch the BUFFER FILL visualization grow
- After ~10 seconds, buffer reaches 100% full
- New data overwrites old data - fixed memory usage
- This is critical: Embedded devices have limited RAM
1302.1.7 Understanding the Output
1302.1.8 Key Stream Processing Concepts Explained
Tumbling Windows (implemented in this lab):
- Non-overlapping, fixed-duration windows (5 seconds in our lab)
- Each event belongs to exactly ONE window
- Results emitted when window closes
- Use case: Hourly billing summaries, periodic aggregates
Sliding Windows (implemented in this lab):
- Overlapping windows that “slide” with each new event
- Each event may belong to MULTIPLE windows
- Continuously updated averages
- Use case: Moving averages, trend smoothing, real-time dashboards
Memory Trade-off:
- Tumbling: O(1) memory per window (just aggregates)
- Sliding: O(window_size) memory (must store all samples)
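The lab's getSlidingAverage() re-sums the whole window on every call. One common alternative is to keep a running sum, which makes each update O(1) while memory stays O(window_size). The sketch below is plain C++ so it also compiles off-device; the class name RunningAverage and its methods are hypothetical and not part of the lab code.

```cpp
#include <cassert>
#include <cstddef>

// Moving average over the last N samples that maintains a running sum,
// so each update costs O(1) instead of re-summing the window.
// Memory stays O(N): the samples are still needed to know what to evict.
template <std::size_t N>
class RunningAverage {
    static_assert(N > 0, "window must hold at least one sample");

public:
    void add(float sample) {
        if (count_ == N) {
            sum_ -= samples_[next_];  // remove the oldest sample from the sum
        } else {
            ++count_;
        }
        samples_[next_] = sample;     // overwrite the oldest slot
        sum_ += sample;
        next_ = (next_ + 1) % N;
    }

    // Average of the samples currently in the window.
    float average() const {
        assert(count_ > 0 && "average() needs at least one sample");
        return sum_ / static_cast<float>(count_);
    }

    std::size_t count() const { return count_; }

private:
    float samples_[N] = {};
    float sum_ = 0.0f;
    std::size_t next_ = 0;   // slot that receives the next sample
    std::size_t count_ = 0;  // valid samples, saturates at N
};
```

A float running sum can accumulate rounding error over very long runs, so a real deployment might recompute the sum from the stored samples from time to time.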
Why Circular Buffers?
- Streams are infinite, memory is finite
- Fixed memory allocation prevents heap fragmentation
- Oldest data automatically evicted when buffer full
- O(1) insert and O(1) access by offset
Implementation Pattern:
buffer[head] = newData; // Write at head
head = (head + 1) % BUFFER_SIZE; // Wrap around
Real-World Analogy: Like a security camera that records over the oldest footage - you always have the last N hours, never running out of storage.
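To see the eviction behaviour in isolation, here is a minimal generic sketch of the same pattern. It is plain C++ rather than Arduino-specific code, and the RingBuffer name and its methods are illustrative assumptions; the lab itself uses the global circularBuffer array with addToCircularBuffer() and getFromBuffer().

```cpp
#include <cassert>
#include <cstddef>

// Fixed-capacity ring buffer: push is O(1), and once the buffer is full
// every push overwrites the oldest element.
template <typename T, std::size_t N>
class RingBuffer {
    static_assert(N > 0, "capacity must be positive");

public:
    void push(const T& value) {
        data_[head_] = value;        // write at head
        head_ = (head_ + 1) % N;     // wrap around
        if (count_ < N) ++count_;
    }

    // offset 0 = most recent element, offset 1 = the one before it, etc.
    const T& fromNewest(std::size_t offset) const {
        assert(offset < count_ && "offset beyond stored elements");
        return data_[(head_ + N - 1 - offset) % N];
    }

    const T& oldest() const { return fromNewest(count_ - 1); }
    std::size_t size() const { return count_; }
    bool full() const { return count_ == N; }

private:
    T data_[N] = {};
    std::size_t head_ = 0;   // next write position
    std::size_t count_ = 0;  // number of valid elements
};
```

Pushing the values 1 to 5 into a RingBuffer<int, 3> leaves 3, 4 and 5 stored: the newest element is 5 and the oldest is 3, because 1 and 2 were overwritten.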
Threshold-Based Detection:
- Simple: value > threshold triggers alert
- Fast: O(1) computation per event
- Limitation: Doesn’t detect trends, only instantaneous values
Rate-of-Change Detection:
- Compares current average to previous average
- Detects rapid changes even within “normal” range
- Example: Temperature rising 10C/minute (could indicate fire)
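A limit such as "10C/minute" is easier to apply when the difference between two averages is divided by the elapsed time. The sketch below shows one way to do that; the TimedAverage struct and both function names are hypothetical, and the 32-bit millisecond timestamps are assumed to come from something like millis().

```cpp
#include <cassert>
#include <cstdint>

// A smoothed reading together with the time it was computed.
struct TimedAverage {
    std::uint32_t timestampMs;  // e.g. the value of millis() on the ESP32
    float value;                // smoothed temperature in degrees C
};

// Rate of change in degrees C per second between two smoothed readings.
inline float ratePerSecond(const TimedAverage& previous,
                           const TimedAverage& current) {
    // Unsigned subtraction stays correct across 32-bit timer wraparound.
    const std::uint32_t elapsedMs = current.timestampMs - previous.timestampMs;
    assert(elapsedMs > 0 && "readings must be taken at different times");
    return (current.value - previous.value) * 1000.0f /
           static_cast<float>(elapsedMs);
}

// True when the magnitude of the rate exceeds the limit, rising or falling.
inline bool isRapidChange(float rateCPerSec, float limitCPerSec) {
    return rateCPerSec > limitCPerSec || rateCPerSec < -limitCPerSec;
}
```

For example, a rise from 20C to 21C over 2000 ms is 0.5C per second, which is well above a limit of 10C/minute (about 0.17C per second).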
Alert Cooldown (Rate Limiting):
- Prevents alert flooding during sustained anomalies
- Production systems often add exponential backoff or alert deduplication on top of a fixed cooldown
- Balance: Too short = alert fatigue, Too long = missed events
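The lab uses a fixed 2-second cooldown. As a rough illustration of the exponential backoff idea mentioned above, the following sketch doubles the wait after each repeated alert within one anomaly and resets when the condition clears. The AlertBackoff class and its methods are hypothetical names, not part of the lab code.

```cpp
#include <cassert>
#include <cstdint>

// Alert limiter whose cooldown doubles after every repeated alert,
// up to a maximum, and returns to the base value on reset().
class AlertBackoff {
public:
    AlertBackoff(std::uint32_t baseMs, std::uint32_t maxMs)
        : baseMs_(baseMs), maxMs_(maxMs), cooldownMs_(baseMs) {
        assert(baseMs > 0 && maxMs >= baseMs);
    }

    // Call while the alert condition holds.
    // Returns true when an alert should be emitted at time nowMs.
    bool shouldAlert(std::uint32_t nowMs) {
        if (hasAlerted_ && (nowMs - lastAlertMs_) < cooldownMs_) {
            return false;  // still cooling down
        }
        if (hasAlerted_) {
            // Repeat alert for the same anomaly: double the wait, capped.
            cooldownMs_ = (cooldownMs_ > maxMs_ / 2) ? maxMs_ : cooldownMs_ * 2;
        }
        hasAlerted_ = true;
        lastAlertMs_ = nowMs;
        return true;
    }

    // Call when the alert condition clears.
    void reset() {
        hasAlerted_ = false;
        cooldownMs_ = baseMs_;
    }

    std::uint32_t cooldownMs() const { return cooldownMs_; }

private:
    std::uint32_t baseMs_;
    std::uint32_t maxMs_;
    std::uint32_t cooldownMs_;
    std::uint32_t lastAlertMs_ = 0;
    bool hasAlerted_ = false;
};
```

With a base of 2000 ms and a cap of 8000 ms, a sustained anomaly starting at t = 0 would alert at 0, 2000, 6000 and 14000 ms, then every 8000 ms after that.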
Why Track Throughput?
- Detect processing bottlenecks before they cause data loss
- Capacity planning for production deployments
- Alerting when throughput drops (sensor failure?)
Metrics to Monitor:
- Events per second (current throughput)
- Total events processed (lifetime counter)
- Buffer fill percentage (approaching capacity?)
- Processing latency (time from read to output)
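The lab code tracks the first three metrics but not processing latency. A minimal sketch of a latency tracker is shown below, assuming two microsecond timestamps per sample (on the ESP32 these could come from micros(), taken just before the sensor read and just after the output is written). LatencyStats is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>

// Tracks maximum and average latency between reading a sample
// and finishing its output, in microseconds.
class LatencyStats {
public:
    void record(std::uint32_t readUs, std::uint32_t outputUs) {
        // Unsigned subtraction stays correct across 32-bit timer wraparound.
        const std::uint32_t latencyUs = outputUs - readUs;
        if (count_ == 0 || latencyUs > maxUs_) maxUs_ = latencyUs;
        totalUs_ += latencyUs;
        ++count_;
    }

    std::uint32_t maxUs() const { return maxUs_; }
    std::uint32_t count() const { return count_; }

    float averageUs() const {
        assert(count_ > 0 && "averageUs() needs at least one measurement");
        return static_cast<float>(totalUs_) / static_cast<float>(count_);
    }

private:
    std::uint64_t totalUs_ = 0;  // 64-bit so the sum does not overflow quickly
    std::uint32_t maxUs_ = 0;
    std::uint32_t count_ = 0;
};
```

The maximum is often more informative than the average here, since a single slow iteration (for example a long Serial print) is what causes a missed 100 ms sampling deadline.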
In Production: Use Prometheus, Grafana, or cloud monitoring to track these metrics with alerting thresholds.
1302.1.9 Challenge Exercises
Modify the code to implement session windows that group events by activity periods:
- Start a new session when first event arrives after 3+ seconds of inactivity
- Close session and emit results after 3 seconds of no new events
- Track: session duration, event count, min/max/avg values
Hint: Track lastEventTime and check for gaps exceeding SESSION_GAP_MS
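One possible starting point for this exercise is sketched below. It is a sketch under assumptions, not a reference solution: what counts as an "event" is left to you (for example, only samples above the threshold), and the SessionSummary and SessionWindow names are hypothetical. The gap passed to the constructor plays the role of SESSION_GAP_MS.

```cpp
#include <cassert>
#include <cstdint>

// Aggregates for one session of activity.
struct SessionSummary {
    std::uint32_t startMs;     // time of the first event in the session
    std::uint32_t endMs;       // time of the last event in the session
    std::uint32_t eventCount;
    float minValue;
    float maxValue;
    float sum;

    std::uint32_t durationMs() const { return endMs - startMs; }
    float average() const {
        return eventCount ? sum / static_cast<float>(eventCount) : 0.0f;
    }
};

class SessionWindow {
public:
    explicit SessionWindow(std::uint32_t gapMs) : gapMs_(gapMs) {
        assert(gapMs > 0 && "session gap must be positive");
    }

    // Call periodically. Closes the open session once gapMs has passed
    // without an event; returns true and fills `closed` when that happens.
    bool expire(std::uint32_t nowMs, SessionSummary& closed) {
        if (!active_ || (nowMs - current_.endMs) < gapMs_) return false;
        closed = current_;
        active_ = false;
        return true;
    }

    // Call for every event. Returns true if this event arrived after a gap
    // and therefore closed the previous session (reported in `closed`).
    bool onEvent(std::uint32_t nowMs, float value, SessionSummary& closed) {
        const bool closedPrevious = expire(nowMs, closed);
        if (!active_) {
            current_ = SessionSummary{nowMs, nowMs, 0u, value, value, 0.0f};
            active_ = true;
        }
        current_.endMs = nowMs;
        ++current_.eventCount;
        current_.sum += value;
        if (value < current_.minValue) current_.minValue = value;
        if (value > current_.maxValue) current_.maxValue = value;
        return closedPrevious;
    }

private:
    std::uint32_t gapMs_;
    SessionSummary current_{};
    bool active_ = false;
};
```

In the Arduino loop, expire() would be called on every iteration with millis() so that a session is emitted even when no further event arrives.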
Add support for simulated “late arriving” data:
- Add a button that injects an event with a timestamp 10 seconds in the past
- Determine which tumbling window this event belongs to
- If window already closed, route to “late data” output
- Display count of late events separately
Hint: Compare event timestamp to currentWindow.windowStart
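The routing decision can be sketched as follows. This version assumes tumbling windows aligned to multiples of the window length counted from t = 0, which differs from the lab code, where each window starts at whatever millis() returns in resetTumblingWindow(); in the lab you would compare against currentWindow.windowStart instead. The Route enum and LateDataRouter class are hypothetical, and timer wraparound is ignored for simplicity.

```cpp
#include <cassert>
#include <cstdint>

enum class Route { CurrentWindow, LateData };

// Decides whether an event still belongs to the open tumbling window
// or must go to a separate "late data" output.
class LateDataRouter {
public:
    explicit LateDataRouter(std::uint32_t windowMs) : windowMs_(windowMs) {
        assert(windowMs > 0 && "window length must be positive");
    }

    // Index of the window a timestamp falls into (windows aligned to t = 0).
    std::uint32_t windowIndex(std::uint32_t timestampMs) const {
        return timestampMs / windowMs_;
    }

    // Routes an event by comparing its window with the window open at nowMs.
    Route route(std::uint32_t eventMs, std::uint32_t nowMs) {
        if (windowIndex(eventMs) < windowIndex(nowMs)) {
            ++lateCount_;  // its window has already closed
            return Route::LateData;
        }
        return Route::CurrentWindow;
    }

    std::uint32_t lateCount() const { return lateCount_; }

private:
    std::uint32_t windowMs_;
    std::uint32_t lateCount_ = 0;
};
```

With 5000 ms windows, an event stamped 10 seconds in the past at t = 12500 ms carries timestamp 2500 ms, falls into window 0 while window 2 is open, and is routed to the late output.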
Extend event detection to correlate multiple sensors:
- Alert when BOTH temperature is high AND light is low (equipment overheating in darkness)
- Track how often both conditions occur simultaneously
- Implement a “compound event” counter
Hint: Add correlatedAlertCount and check both conditions in detectEvents()
Replace the fixed potentiometer threshold with an adaptive threshold:
- Calculate the moving standard deviation of temperature
- Set threshold = sliding_average + (2 * standard_deviation)
- This creates an adaptive anomaly boundary
- Compare alert rates between fixed and adaptive thresholds
Hint: Track sum of squares to calculate variance: variance = (sumSq/n) - (sum/n)^2
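The hint's formula can be applied over a sliding window by maintaining both a running sum and a running sum of squares. The sketch below is one way to do that, under a few assumptions: it computes the population variance exactly as in the hint, it uses double accumulators to limit cancellation error in the subtraction, and the AdaptiveThreshold name and its methods are hypothetical.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>

// Sliding-window mean and standard deviation from running sums:
// variance = (sumSq / n) - (sum / n)^2
template <std::size_t N>
class AdaptiveThreshold {
    static_assert(N > 0, "window must hold at least one sample");

public:
    void add(double sample) {
        if (count_ == N) {
            const double oldest = samples_[next_];
            sum_ -= oldest;              // evict the oldest sample
            sumSq_ -= oldest * oldest;
        } else {
            ++count_;
        }
        samples_[next_] = sample;
        sum_ += sample;
        sumSq_ += sample * sample;
        next_ = (next_ + 1) % N;
    }

    double mean() const {
        assert(count_ > 0 && "mean() needs at least one sample");
        return sum_ / static_cast<double>(count_);
    }

    double stddev() const {
        const double m = mean();
        const double variance = sumSq_ / static_cast<double>(count_) - m * m;
        // Rounding can push a near-zero variance slightly negative.
        return variance > 0.0 ? std::sqrt(variance) : 0.0;
    }

    // threshold = sliding_average + k * standard_deviation (k = 2 by default)
    double threshold(double k = 2.0) const { return mean() + k * stddev(); }

private:
    double samples_[N] = {};
    double sum_ = 0.0;
    double sumSq_ = 0.0;
    std::size_t next_ = 0;
    std::size_t count_ = 0;
};
```

For the eight samples 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the standard deviation is 2, so the threshold comes out at 9.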
1302.1.10 Common Issues and Solutions
| Issue | Cause | Solution |
|---|---|---|
| No serial output | Baud rate mismatch | Ensure both code and monitor use 115200 |
| LEDs not lighting | Wrong pin connection | Check diagram.json connections match code |
| Buffer always 100% | Expected behavior | Circular buffer maintains fixed size by design |
| No tumbling window output | Not enough time | Wait full 5 seconds for first window |
| Alerts firing constantly | Threshold too low | Adjust potentiometer higher |
1302.1.11 Real-World Applications
This lab demonstrates patterns used in production stream processing:
| Lab Feature | Production Equivalent |
|---|---|
| 10 Hz sampling | Kafka consumer polling |
| Circular buffer | Kafka topic partitions with retention |
| Tumbling windows | Flink/Spark hourly aggregations |
| Sliding windows | Real-time moving averages in Grafana |
| Threshold alerts | PagerDuty integration for anomalies |
| Throughput stats | Prometheus metrics for pipeline health |
1302.2 What’s Next
Continue to Hands-On Lab: Advanced CEP for a 60-minute advanced lab covering pattern matching, complex event processing (CEP), session windows, and statistical aggregation.