13  Hands-On Lab: Advanced CEP

Chapter | Topic
Stream Processing | Overview of batch vs. stream processing for IoT
Fundamentals | Windowing strategies, watermarks, and event-time semantics
Architectures | Kafka, Flink, and Spark Streaming comparison
Pipelines | End-to-end ingestion, processing, and output design
Challenges | Late data, exactly-once semantics, and backpressure
Pitfalls | Common mistakes and production worked examples
Basic Lab | ESP32 circular buffers, windows, and event detection
Advanced Lab | CEP, pattern matching, and anomaly detection on ESP32
Game & Summary | Interactive review game and module summary
Interoperability | Four levels of IoT interoperability
Interop Fundamentals | Technical, syntactic, semantic, and organizational layers
Interop Standards | SenML, JSON-LD, W3C WoT, and oneM2M
Integration Patterns | Protocol adapters, gateways, and ontology mapping

Learning Objectives

After completing this lab, you will be able to:

  • Implement pattern matching algorithms for detecting temporal event sequences on ESP32
  • Build Complex Event Processing (CEP) rules with multi-condition detection and temporal constraints
  • Design session windows that group events by activity periods with configurable gap timeouts
  • Apply z-score anomaly detection to streaming sensor data for statistical outlier identification
  • Create multi-condition stream filters that combine threshold, range, and cross-sensor logic
  • Detect temporal patterns like “A followed by B within N seconds”

This advanced lab gives you practice building real-time data systems that handle multiple sensor streams simultaneously. Think of moving from cooking a single dish to running an entire restaurant kitchen – you will manage timing, handle errors, and process IoT data as fast as it arrives.

In 60 Seconds

Complex Event Processing (CEP) detects meaningful patterns across event streams by matching sequences with temporal constraints – for example, “motion detected, then temperature rise, then light change within 5 seconds” to identify intrusion. This lab implements CEP on ESP32 with pattern matching, session windows that group events by activity periods, z-score anomaly detection, and multi-condition filtering, bridging embedded stream processing with enterprise patterns used in Apache Flink and Esper.

Key Concepts
  • Complex Event Processing (CEP): Pattern detection across multiple event streams, identifying sequences, thresholds, and temporal patterns that span multiple events
  • Event Pattern: A specific sequence, combination, or temporal arrangement of events that triggers a downstream action (e.g., “3 consecutive temperature readings above 80°C within 10 seconds”)
  • Temporal Join: Combining events from different IoT streams based on time proximity, enabling correlation of events from different sensors that occur within a time window of each other
  • CEP Rule Engine: Component that evaluates incoming events against defined patterns and generates composite events when a pattern matches (examples: Esper, Apache Flink CEP, Drools Fusion)
  • Sliding Window Join: Joining two event streams where events within a configurable time window of each other are considered related (e.g., matching a sensor reading with the nearest calibration event)
  • Pattern Sequence: Ordered event pattern where events must occur in a specific temporal sequence — A then B within 5 seconds then C
  • NFA (Non-deterministic Finite Automaton): State machine model used by CEP engines to efficiently track partial pattern matches across concurrent event sequences
  • State in Flink: Keyed state (scoped to each key) or operator state (scoped to each parallel operator instance), snapshotted at checkpoints so that stateful CEP operations can recover from the last completed checkpoint after a failure

13.1 Lab: Advanced Stream Pattern Matching and Complex Event Processing

~60 min | Advanced | P10.C14.LAB02

What You’ll Build

An advanced stream processing system on ESP32 that demonstrates Complex Event Processing (CEP) concepts: pattern matching across multiple sensors, sequence detection with temporal constraints, session-based windowing, multi-level filtering, and statistical aggregation. This lab bridges embedded stream processing with enterprise CEP patterns used in Apache Flink and Esper.

13.1.1 Advanced Stream Processing Concepts

This lab implements sophisticated patterns found in production stream processing systems:

Concept | Implementation | Enterprise Equivalent
Pattern Matching | Sequence detection (A -> B -> C) | Flink CEP PatternStream
Session Windows | Activity-based grouping | Kafka Streams SessionWindows
Multi-Condition Filters | Compound boolean expressions | SQL WHERE clauses
Stream Joins | Correlating multiple streams | Flink DataStream.join()
Statistical Aggregation | Mean, variance, min, max | Spark aggregateByKey
Temporal Constraints | “Within N seconds” rules | Esper pattern guards

Z-score anomaly detection quantifies how many standard deviations an observation lies from the mean. Readings whose z-score magnitude exceeds a chosen threshold are treated as outliers:

Z-Score Formula: \(z = \frac{x - \mu}{\sigma}\) where \(x\) = observed value, \(\mu\) = mean, \(\sigma\) = standard deviation.

Worked example (temperature sensor stream with historical baseline):

  • Historical mean temperature: \(\mu = 22.5°C\)
  • Historical standard deviation: \(\sigma = 1.2°C\)
  • New reading: \(x = 27.8°C\)
  • Z-score: \(z = \frac{27.8 - 22.5}{1.2} = \frac{5.3}{1.2} = 4.42\)

Threshold: the lab code flags a reading as an anomaly when \(|z| > 2.5\). Here \(z = 4.42 > 2.5\), so the reading is flagged as a statistical anomaly. Under a normal distribution, a deviation of 4.42\(\sigma\) or more above the mean has a one-tailed probability of approximately \(5 \times 10^{-6}\), or roughly 1 in 200,000, so the alert most likely indicates a sensor fault or a genuine environmental change rather than ordinary noise.
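The worked example above can be reproduced with a few lines of portable C++. This is a minimal sketch, not the lab's implementation: the helper names zScore() and isAnomaly() are hypothetical, and the baseline mean and standard deviation are assumed to be supplied by the caller.

```cpp
#include <cassert>
#include <cmath>

// Z-score of x against a baseline mean and standard deviation.
// A non-positive sigma returns 0 so a flat baseline never divides by zero.
float zScore(float x, float mean, float sigma) {
  if (sigma <= 0.0f) return 0.0f;
  return (x - mean) / sigma;
}

// True when |z| exceeds the threshold (2.5 is the value used in the lab code).
bool isAnomaly(float x, float mean, float sigma, float threshold = 2.5f) {
  assert(threshold > 0.0f);  // a zero or negative threshold would flag everything
  return std::fabs(zScore(x, mean, sigma)) > threshold;
}
```

Calling zScore(27.8f, 22.5f, 1.2f) gives approximately 4.42, matching the hand calculation, and isAnomaly() returns true for that reading.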

13.1.2 Wokwi Simulator

Use the embedded simulator below to build your advanced stream processing system:

13.1.3 Circuit Setup

Connect multiple sensors and outputs to the ESP32 for comprehensive stream analysis:

Component | ESP32 Pin | Purpose
Temperature Sensor (NTC) | GPIO 34 | Primary data stream
Light Sensor (LDR) | GPIO 35 | Secondary data stream
Motion Sensor (PIR) | GPIO 33 | Event trigger stream
Potentiometer | GPIO 32 | Configurable threshold
Red LED | GPIO 18 | Pattern match alert
Yellow LED | GPIO 19 | Session active indicator
Green LED | GPIO 21 | Filter pass indicator
Blue LED | GPIO 22 | Anomaly detection

Add this diagram.json configuration in Wokwi:

{
  "version": 1,
  "author": "IoT Class - Advanced Stream Processing Lab",
  "editor": "wokwi",
  "parts": [
    { "type": "wokwi-esp32-devkit-v1", "id": "esp", "top": 0, "left": 0 },
    { "type": "wokwi-ntc-temperature-sensor", "id": "temp1", "top": -120, "left": 80 },
    { "type": "wokwi-photoresistor-sensor", "id": "ldr1", "top": -120, "left": 180 },
    { "type": "wokwi-pir-motion-sensor", "id": "pir1", "top": -120, "left": 280 },
    { "type": "wokwi-potentiometer", "id": "pot1", "top": -120, "left": 380 },
    { "type": "wokwi-led", "id": "led_red", "top": 180, "left": 80, "attrs": { "color": "red" } },
    { "type": "wokwi-led", "id": "led_yellow", "top": 180, "left": 130, "attrs": { "color": "yellow" } },
    { "type": "wokwi-led", "id": "led_green", "top": 180, "left": 180, "attrs": { "color": "green" } },
    { "type": "wokwi-led", "id": "led_blue", "top": 180, "left": 230, "attrs": { "color": "blue" } },
    { "type": "wokwi-resistor", "id": "r1", "top": 230, "left": 80, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r2", "top": 230, "left": 130, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r3", "top": 230, "left": 180, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r4", "top": 230, "left": 230, "attrs": { "value": "220" } }
  ],
  "connections": [
    ["esp:GND.1", "temp1:GND", "black", ["h0"]],
    ["esp:3V3", "temp1:VCC", "red", ["h0"]],
    ["esp:34", "temp1:OUT", "green", ["h0"]],
    ["esp:GND.1", "ldr1:GND", "black", ["h0"]],
    ["esp:3V3", "ldr1:VCC", "red", ["h0"]],
    ["esp:35", "ldr1:AO", "orange", ["h0"]],
    ["esp:GND.1", "pir1:GND", "black", ["h0"]],
    ["esp:3V3", "pir1:VCC", "red", ["h0"]],
    ["esp:33", "pir1:OUT", "purple", ["h0"]],
    ["esp:GND.1", "pot1:GND", "black", ["h0"]],
    ["esp:3V3", "pot1:VCC", "red", ["h0"]],
    ["esp:32", "pot1:SIG", "blue", ["h0"]],
    ["esp:18", "led_red:A", "red", ["h0"]],
    ["led_red:C", "r1:1", "black", ["h0"]],
    ["r1:2", "esp:GND.2", "black", ["h0"]],
    ["esp:19", "led_yellow:A", "yellow", ["h0"]],
    ["led_yellow:C", "r2:1", "black", ["h0"]],
    ["r2:2", "esp:GND.2", "black", ["h0"]],
    ["esp:21", "led_green:A", "green", ["h0"]],
    ["led_green:C", "r3:1", "black", ["h0"]],
    ["r3:2", "esp:GND.2", "black", ["h0"]],
    ["esp:22", "led_blue:A", "blue", ["h0"]],
    ["led_blue:C", "r4:1", "black", ["h0"]],
    ["r4:2", "esp:GND.2", "black", ["h0"]]
  ]
}

13.1.4 Complete Arduino Code

Copy this code into the Wokwi editor:

// ============================================================================
// ADVANCED STREAM PROCESSING LAB: Pattern Matching & Complex Event Processing
// ============================================================================
// Demonstrates: Pattern Detection, Session Windows, Stream Filtering,
//               Complex Event Processing (CEP), Statistical Aggregation
// ============================================================================

#include <Arduino.h>
#include <math.h>

// ====================== PIN DEFINITIONS ======================
const int TEMP_PIN = 34;        // Temperature sensor (NTC)
const int LIGHT_PIN = 35;       // Light sensor (LDR)
const int MOTION_PIN = 33;      // Motion sensor (PIR)
const int THRESHOLD_PIN = 32;   // Potentiometer for threshold

const int LED_RED = 18;         // Pattern match alert
const int LED_YELLOW = 19;      // Session active indicator
const int LED_GREEN = 21;       // Filter pass indicator
const int LED_BLUE = 22;        // Anomaly detection

// ====================== STREAM PARAMETERS ======================
const int SAMPLE_INTERVAL_MS = 100;         // 10 Hz sampling rate
const int SESSION_TIMEOUT_MS = 3000;        // 3 seconds of inactivity closes session
const int PATTERN_WINDOW_MS = 10000;        // 10-second window for pattern matching
const int AGGREGATION_WINDOW_MS = 5000;     // 5-second aggregation window

// ====================== BUFFER SIZES ======================
const int EVENT_BUFFER_SIZE = 50;           // Recent events for pattern matching
const int STREAM_BUFFER_SIZE = 100;         // Circular buffer for raw data
const int PATTERN_MAX_LENGTH = 5;           // Maximum pattern sequence length

// ====================== EVENT TYPES ======================
enum EventType {
  EVT_NONE = 0,
  EVT_TEMP_HIGH = 1,          // Temperature exceeds threshold
  EVT_TEMP_LOW = 2,           // Temperature below threshold
  EVT_TEMP_RISING = 3,        // Temperature increasing rapidly
  EVT_TEMP_FALLING = 4,       // Temperature decreasing rapidly
  EVT_LIGHT_HIGH = 5,         // Light level high
  EVT_LIGHT_LOW = 6,          // Light level low (darkness)
  EVT_MOTION_START = 7,       // Motion detected (rising edge)
  EVT_MOTION_END = 8,         // Motion ended (falling edge)
  EVT_COMPOUND_ALERT = 9      // Multiple conditions met
};

// ====================== DATA STRUCTURES ======================

// Single sensor reading with timestamp
struct SensorReading {
  unsigned long timestamp;
  float temperature;
  float light;
  bool motion;
};

// Event record for pattern matching
struct Event {
  unsigned long timestamp;
  EventType type;
  float value;                // Associated value (temp, light, etc.)
};

// Session window state
struct SessionWindow {
  unsigned long sessionStart;
  unsigned long lastEventTime;
  int eventCount;
  float tempSum;
  float lightSum;
  float tempMin;
  float tempMax;
  bool isActive;
};

// Statistical aggregation state
struct StreamStatistics {
  unsigned long windowStart;
  int count;
  float sum;
  float sumSquares;           // For variance calculation
  float min;
  float max;
};

// Pattern definition for CEP
struct Pattern {
  EventType sequence[PATTERN_MAX_LENGTH];
  int length;
  unsigned long maxDuration;  // Maximum time for pattern to complete
  const char* name;
};

// ====================== GLOBAL STATE ======================

// Circular buffer for raw sensor readings
SensorReading streamBuffer[STREAM_BUFFER_SIZE];
int streamHead = 0;
int streamCount = 0;

// Event buffer for pattern matching
Event eventBuffer[EVENT_BUFFER_SIZE];
int eventHead = 0;
int eventCount = 0;

// Session window
SessionWindow currentSession;

// Statistics for temperature and light streams
StreamStatistics tempStats;
StreamStatistics lightStats;

// Previous values for edge detection
float prevTemp = 0;
float prevLight = 0;
bool prevMotion = false;
float prevTempAvg = 0;

// Pattern matching state
int patternsMatched = 0;
unsigned long lastPatternMatchTime = 0;

// Filter state
int filterPassCount = 0;
int filterFailCount = 0;

// Throughput tracking
unsigned long totalEventsProcessed = 0;
unsigned long throughputWindowStart = 0;
int eventsPerSecond = 0;
int eventsThisSecond = 0;

// Timing
unsigned long lastSampleTime = 0;
unsigned long lastDisplayTime = 0;
const int DISPLAY_INTERVAL_MS = 2000;

// ====================== PATTERN DEFINITIONS ======================
// Define patterns to detect in the event stream

// Pattern 1: Motion -> Temperature Rise -> Light Change (intruder with flashlight)
Pattern patterns[] = {
  {{EVT_MOTION_START, EVT_TEMP_RISING, EVT_LIGHT_HIGH}, 3, 5000, "Intruder-Alert"},
  {{EVT_LIGHT_LOW, EVT_MOTION_START, EVT_LIGHT_HIGH}, 3, 6000, "Room-Entry"},
  {{EVT_TEMP_HIGH, EVT_TEMP_HIGH, EVT_TEMP_HIGH}, 3, 10000, "Sustained-Heat"},
  {{EVT_MOTION_START, EVT_MOTION_END, EVT_MOTION_START}, 3, 8000, "Activity-Burst"}
};
const int NUM_PATTERNS = 4;

void setup() {
  Serial.begin(115200);
  delay(1000);

  // Configure pins
  pinMode(TEMP_PIN, INPUT);
  pinMode(LIGHT_PIN, INPUT);
  pinMode(MOTION_PIN, INPUT);
  pinMode(THRESHOLD_PIN, INPUT);
  pinMode(LED_RED, OUTPUT);
  pinMode(LED_YELLOW, OUTPUT);
  pinMode(LED_GREEN, OUTPUT);
  pinMode(LED_BLUE, OUTPUT);

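  // Initialize session
  currentSession.isActive = false;
  currentSession.eventCount = 0;

  // Seed the edge-detection baselines with a real reading so the first
  // sample is not seen as a temperature jump from 0 (which would emit a
  // spurious EVT_TEMP_RISING and open a session immediately)
  prevTemp = readTemperature();
  prevTempAvg = prevTemp;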

  // Initialize statistics
  resetStatistics(&tempStats);
  resetStatistics(&lightStats);

  throughputWindowStart = millis();

  printHeader();
}

void printHeader() {
  Serial.println("\n");
  Serial.println("+=================================================================+");
  Serial.println("|     ADVANCED STREAM PROCESSING LAB - Complex Event Processing  |");
  Serial.println("+=================================================================+");
  Serial.println("| Pattern Matching | Session Windows | Statistical Aggregation    |");
  Serial.println("| Multi-condition Filtering | Temporal Pattern Detection          |");
  Serial.println("+-----------------------------------------------------------------+\n");
  Serial.println("LEDs: RED=Pattern Match | YELLOW=Session Active");
  Serial.println("      GREEN=Filter Pass | BLUE=Statistical Anomaly\n");
}

// ====================== STATISTICS FUNCTIONS ======================
void resetStatistics(StreamStatistics* stats) {
  stats->windowStart = millis();
  stats->count = 0;
  stats->sum = 0;
  stats->sumSquares = 0;
  stats->min = 999;
  stats->max = -999;
}

void updateStatistics(StreamStatistics* stats, float value) {
  stats->count++;
  stats->sum += value;
  stats->sumSquares += value * value;
  if (value < stats->min) stats->min = value;
  if (value > stats->max) stats->max = value;
}

float getMean(StreamStatistics* stats) {
  if (stats->count == 0) return 0;
  return stats->sum / stats->count;
}

float getVariance(StreamStatistics* stats) {
  if (stats->count < 2) return 0;
  float mean = getMean(stats);
  return (stats->sumSquares / stats->count) - (mean * mean);
}

float getStdDev(StreamStatistics* stats) {
  float var = getVariance(stats);
  if (var <= 0) return 0;  // Guard against negative variance from float rounding
  return sqrt(var);
}

// ====================== STREAM BUFFER FUNCTIONS ======================
void addToStreamBuffer(unsigned long ts, float temp, float light, bool motion) {
  streamBuffer[streamHead].timestamp = ts;
  streamBuffer[streamHead].temperature = temp;
  streamBuffer[streamHead].light = light;
  streamBuffer[streamHead].motion = motion;

  streamHead = (streamHead + 1) % STREAM_BUFFER_SIZE;
  if (streamCount < STREAM_BUFFER_SIZE) streamCount++;
}

// ====================== EVENT BUFFER FUNCTIONS ======================
void addEvent(EventType type, float value) {
  eventBuffer[eventHead].timestamp = millis();
  eventBuffer[eventHead].type = type;
  eventBuffer[eventHead].value = value;

  eventHead = (eventHead + 1) % EVENT_BUFFER_SIZE;
  if (eventCount < EVENT_BUFFER_SIZE) eventCount++;

  totalEventsProcessed++;
}

Event getEvent(int offset) {
  // offset 0 = most recent event
  int index = (eventHead - 1 - offset + EVENT_BUFFER_SIZE) % EVENT_BUFFER_SIZE;
  return eventBuffer[index];
}

// ====================== EVENT DETECTION ======================
void detectEvents(float temp, float light, bool motion, float threshold) {

  // Temperature events
  if (temp > threshold) {
    addEvent(EVT_TEMP_HIGH, temp);
  } else if (temp < threshold - 10) {
    addEvent(EVT_TEMP_LOW, temp);
  }

  // Rate of change events
  float tempChange = temp - prevTemp;
  if (tempChange > 2.0) {
    addEvent(EVT_TEMP_RISING, tempChange);
  } else if (tempChange < -2.0) {
    addEvent(EVT_TEMP_FALLING, tempChange);
  }

  // Light events
  if (light > 70) {
    addEvent(EVT_LIGHT_HIGH, light);
  } else if (light < 30) {
    addEvent(EVT_LIGHT_LOW, light);
  }

  // Motion edge detection
  if (motion && !prevMotion) {
    addEvent(EVT_MOTION_START, 1.0);
  } else if (!motion && prevMotion) {
    addEvent(EVT_MOTION_END, 0.0);
  }

  // Compound event detection
  if (temp > threshold && light < 30 && motion) {
    addEvent(EVT_COMPOUND_ALERT, temp);
  }

  prevTemp = temp;
  prevLight = light;
  prevMotion = motion;
}

// ====================== PATTERN MATCHING ======================
bool matchPattern(Pattern* pattern) {
  if (eventCount < pattern->length) return false;

  // Try every buffered event as a candidate pattern start, oldest first.
  // Anchoring only on the first occurrence of sequence[0] would miss a valid
  // match when a stale start event is followed by a later start that
  // completes in time.
  for (int start = eventCount - 1; start >= pattern->length - 1; start--) {
    Event first = getEvent(start);  // offset 0 = most recent event
    if (first.type != pattern->sequence[0]) continue;

    int matchIndex = 1;
    // Scan forward in time (decreasing offset) for the remaining steps
    for (int i = start - 1; i >= 0 && matchIndex < pattern->length; i--) {
      Event evt = getEvent(i);

      // Events are stored in time order, so once the temporal constraint is
      // exceeded this start candidate cannot succeed
      if (evt.timestamp - first.timestamp > pattern->maxDuration) break;

      if (evt.type == pattern->sequence[matchIndex]) {
        matchIndex++;
      }
    }

    if (matchIndex == pattern->length) return true;
  }

  return false;
}

void checkPatterns() {
  unsigned long now = millis();

  // Debounce pattern matches
  if (now - lastPatternMatchTime < 2000) return;

  for (int p = 0; p < NUM_PATTERNS; p++) {
    if (matchPattern(&patterns[p])) {
      patternsMatched++;
      lastPatternMatchTime = now;

      Serial.println("\n!!! PATTERN MATCHED !!!");
      Serial.print("Pattern: ");
      Serial.println(patterns[p].name);
      Serial.print("Duration limit: ");
      Serial.print(patterns[p].maxDuration / 1000.0);
      Serial.println(" seconds");

      // Flash RED LED (blocking: the delay() calls pause sampling for about 600 ms)
      for (int i = 0; i < 3; i++) {
        digitalWrite(LED_RED, HIGH);
        delay(100);
        digitalWrite(LED_RED, LOW);
        delay(100);
      }
    }
  }
}

// ====================== SESSION WINDOW ======================
void updateSession(float temp, float light, bool motion) {
  unsigned long now = millis();

  // Check if we should start a new session
  if (!currentSession.isActive) {
    // Start session on any significant event
    if (motion || fabs(temp - prevTempAvg) > 3.0) {
      currentSession.isActive = true;
      currentSession.sessionStart = now;
      currentSession.lastEventTime = now;
      currentSession.eventCount = 0;
      currentSession.tempSum = 0;
      currentSession.lightSum = 0;
      currentSession.tempMin = temp;
      currentSession.tempMax = temp;

      Serial.println("\n>>> SESSION STARTED");
      digitalWrite(LED_YELLOW, HIGH);
    }
  }

  // Update active session only when significant activity occurs
  if (currentSession.isActive) {
    // Only update lastEventTime when there is actual sensor activity
    if (motion || fabs(temp - prevTempAvg) > 1.0) {
      currentSession.lastEventTime = now;
    }
    currentSession.eventCount++;
    currentSession.tempSum += temp;
    currentSession.lightSum += light;
    if (temp < currentSession.tempMin) currentSession.tempMin = temp;
    if (temp > currentSession.tempMax) currentSession.tempMax = temp;
  }

  prevTempAvg = getMean(&tempStats);
}

void closeSession() {
  if (!currentSession.isActive) return;

  unsigned long duration = currentSession.lastEventTime - currentSession.sessionStart;

  Serial.println("\n<<< SESSION CLOSED");
  Serial.print("Duration: ");
  Serial.print(duration / 1000.0);
  Serial.println(" seconds");
  Serial.print("Events: ");
  Serial.println(currentSession.eventCount);
  if (currentSession.eventCount > 0) {
    Serial.print("Avg Temp: ");
    Serial.print(currentSession.tempSum / currentSession.eventCount, 1);
    Serial.println("C");
  }
  Serial.print("Temp Range: ");
  Serial.print(currentSession.tempMin, 1);
  Serial.print("C - ");
  Serial.print(currentSession.tempMax, 1);
  Serial.println("C");

  currentSession.isActive = false;
  digitalWrite(LED_YELLOW, LOW);
}

// ====================== STREAM FILTERING ======================
bool applyFilter(float temp, float light, bool motion, float threshold) {
  // Multi-condition filter
  bool condition1 = (temp >= 20 && temp <= 40);  // Normal temp range
  bool condition2 = (light >= 10 && light <= 90);  // Normal light range
  bool condition3 = !motion || temp < threshold;  // No motion, or cool when moving

  bool passes = condition1 && condition2 && condition3;

  if (passes) {
    filterPassCount++;
    digitalWrite(LED_GREEN, HIGH);
  } else {
    filterFailCount++;
    digitalWrite(LED_GREEN, LOW);
  }

  return passes;
}

// ====================== ANOMALY DETECTION ======================
void checkAnomalies(float temp, float light) {
  float tempStdDev = getStdDev(&tempStats);
  float tempMean = getMean(&tempStats);

  // Z-score anomaly detection (requires sufficient samples for reliable statistics)
  if (tempStats.count >= 10 && tempStdDev > 0) {
    float zScore = (temp - tempMean) / tempStdDev;

    if (fabs(zScore) > 2.5) {
      digitalWrite(LED_BLUE, HIGH);
      Serial.println("\n** STATISTICAL ANOMALY DETECTED **");
      Serial.print("Z-score: ");
      Serial.print(zScore, 2);
      Serial.print(" (threshold: +/- 2.5)");
      Serial.print(" Value: ");
      Serial.print(temp, 1);
      Serial.print("C, Mean: ");
      Serial.print(tempMean, 1);
      Serial.print("C, StdDev: ");
      Serial.println(tempStdDev, 2);
    } else {
      digitalWrite(LED_BLUE, LOW);
    }
  }
}

// ====================== THROUGHPUT TRACKING ======================
void updateThroughput() {
  eventsThisSecond++;
  unsigned long now = millis();

  if (now - throughputWindowStart >= 1000) {
    eventsPerSecond = eventsThisSecond;
    eventsThisSecond = 0;
    throughputWindowStart = now;
  }
}

// ====================== DISPLAY FUNCTIONS ======================
void displayStatus(float temp, float light, bool motion, float threshold) {
  Serial.println("\n==================== STREAM STATUS ====================");

  // Current sensor values
  Serial.print("| SENSORS: Temp=");
  Serial.print(temp, 1);
  Serial.print("C  Light=");
  Serial.print(light, 0);
  Serial.print("%  Motion=");
  Serial.println(motion ? "YES" : "NO");

  // Statistics
  Serial.print("| TEMP STATS: Mean=");
  Serial.print(getMean(&tempStats), 1);
  Serial.print("C  StdDev=");
  Serial.print(getStdDev(&tempStats), 2);
  Serial.print("  Min=");
  Serial.print(tempStats.min, 1);
  Serial.print("  Max=");
  Serial.println(tempStats.max, 1);

  // Session info
  if (currentSession.isActive) {
    Serial.print("| SESSION: ACTIVE for ");
    Serial.print((millis() - currentSession.sessionStart) / 1000.0, 1);
    Serial.print("s, ");
    Serial.print(currentSession.eventCount);
    Serial.println(" events");
  } else {
    Serial.println("| SESSION: INACTIVE (waiting for trigger)");
  }

  // Pattern matching
  Serial.print("| PATTERNS MATCHED: ");
  Serial.print(patternsMatched);
  Serial.print("  |  Events in buffer: ");
  Serial.println(eventCount);

  // Filter statistics
  int totalFiltered = filterPassCount + filterFailCount;
  float passRate = totalFiltered > 0 ? (filterPassCount * 100.0 / totalFiltered) : 0;
  Serial.print("| FILTER: ");
  Serial.print(passRate, 1);
  Serial.print("% pass rate (");
  Serial.print(filterPassCount);
  Serial.print(" pass / ");
  Serial.print(filterFailCount);
  Serial.println(" fail)");

  // Throughput
  Serial.print("| THROUGHPUT: ");
  Serial.print(eventsPerSecond);
  Serial.print(" events/sec  |  Total: ");
  Serial.println(totalEventsProcessed);

  Serial.println("========================================================\n");
}

// ====================== SENSOR READING ======================
float readTemperature() {
  int raw = analogRead(TEMP_PIN);
  return map(raw, 0, 4095, 1500, 8500) / 100.0;
}

float readLight() {
  int raw = analogRead(LIGHT_PIN);
  return map(raw, 0, 4095, 0, 100);
}

bool readMotion() {
  return digitalRead(MOTION_PIN) == HIGH;
}

float readThreshold() {
  int raw = analogRead(THRESHOLD_PIN);
  return map(raw, 0, 4095, 2000, 8000) / 100.0;
}

// ====================== MAIN LOOP ======================
void loop() {
  unsigned long now = millis();

  if (now - lastSampleTime >= SAMPLE_INTERVAL_MS) {
    lastSampleTime = now;

    // Read all sensors
    float temp = readTemperature();
    float light = readLight();
    bool motion = readMotion();
    float threshold = readThreshold();

    // 1. Add to circular buffer
    addToStreamBuffer(now, temp, light, motion);

    // 2. Check for statistical anomalies against the window's earlier samples.
    //    Scoring before the update keeps the new reading out of its own
    //    baseline, which would otherwise shrink the z-score of an outlier.
    checkAnomalies(temp, light);

    // 3. Update running statistics
    updateStatistics(&tempStats, temp);
    updateStatistics(&lightStats, light);

    // 4. Detect discrete events
    detectEvents(temp, light, motion, threshold);

    // 5. Check for pattern matches
    checkPatterns();

    // 6. Update session window
    updateSession(temp, light, motion);

    // 7. Apply stream filter
    applyFilter(temp, light, motion, threshold);

    // 8. Update throughput
    updateThroughput();

    // Reset statistics window periodically
    if (now - tempStats.windowStart > AGGREGATION_WINDOW_MS) {
      Serial.println("\n--- Aggregation Window Complete ---");
      Serial.print("Temperature: Mean=");
      Serial.print(getMean(&tempStats), 2);
      Serial.print(", StdDev=");
      Serial.print(getStdDev(&tempStats), 2);
      Serial.print(", Variance=");
      Serial.println(getVariance(&tempStats), 4);

      resetStatistics(&tempStats);
      resetStatistics(&lightStats);
    }

    // Periodic display update
    if (now - lastDisplayTime >= DISPLAY_INTERVAL_MS) {
      lastDisplayTime = now;
      displayStatus(temp, light, motion, threshold);
    }

    // Check session timeout
    if (currentSession.isActive &&
        now - currentSession.lastEventTime > SESSION_TIMEOUT_MS) {
      closeSession();
    }
  }
}

13.1.5 Key CEP Concepts Explained

13.2 Concept 1: Pattern Matching in Streams

What is Pattern Matching? Pattern matching detects specific sequences of events occurring within temporal constraints. Unlike simple threshold alerts, patterns capture complex behaviors like “motion followed by temperature rise within 5 seconds.”

Implementation in this lab:

  • Define patterns as arrays of EventType enum values
  • Each pattern has a maximum duration (temporal constraint)
  • The matchPattern() function scans the event buffer chronologically (oldest to newest) looking for the specified sequence within the time window

Real-World Applications:

  • Intrusion detection: “door open -> motion -> door close” within 30 seconds
  • Equipment failure: “vibration spike -> temperature rise -> pressure drop” within 1 minute
  • Fraud detection: “login from new IP -> large withdrawal -> logout” within 2 minutes
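The "sequence within a deadline" idea can also be expressed as a small state machine that consumes one event at a time, which is closer in spirit to how NFA-based engines track partial matches than the buffer scan used in this lab. The following is a minimal sketch under stated assumptions: SequenceMatcher and feed() are hypothetical names, event types are plain integer codes, timestamps are non-decreasing milliseconds, and only one partial match is tracked at a time (a full NFA would track many concurrently).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical incremental matcher for "steps[0], then steps[1], ... within maxDurationMs".
// Unrelated events between steps are ignored. Only one partial match is tracked.
struct SequenceMatcher {
  static const int MAX_STEPS = 5;
  int steps[MAX_STEPS];
  int length;
  uint32_t maxDurationMs;
  int next = 0;          // index of the step being waited for
  uint32_t startMs = 0;  // timestamp of the event that matched step 0

  SequenceMatcher(const int* seq, int len, uint32_t maxMs)
      : length(len), maxDurationMs(maxMs) {
    assert(len >= 1 && len <= MAX_STEPS);
    for (int i = 0; i < len; i++) steps[i] = seq[i];
  }

  // Consume one event. Returns true only when this event completes the pattern.
  bool feed(int type, uint32_t nowMs) {
    // Abandon a partial match whose deadline has passed
    if (next > 0 && nowMs - startMs > maxDurationMs) next = 0;

    if (type == steps[next]) {
      if (next == 0) startMs = nowMs;
      next++;
      if (next == length) {
        next = 0;  // ready for the next occurrence
        return true;
      }
    } else if (next == 1 && type == steps[0]) {
      // Only the first step has matched so far: a later start leaves more time
      startMs = nowMs;
    }
    return false;
  }
};
```

With the lab's enum values (EVT_MOTION_START = 7, EVT_TEMP_RISING = 3, EVT_LIGHT_HIGH = 5) and a 5000 ms limit, feeding the three events at 0, 2000, and 4000 ms completes the pattern on the third call, while delaying the last event to 6000 ms does not. The trade-off against the buffer scan is memory: the matcher needs only a few bytes of state per pattern, but it cannot look back at events it has already consumed.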

Concept 2: Session Windows

What are Session Windows? Unlike fixed tumbling/sliding windows, session windows are activity-based. They group events that occur close together in time, closing when activity stops for a specified gap duration.

Implementation in this lab:

  • Session starts when a significant event occurs (motion or temperature spike)
  • Session remains active while significant events continue
  • Session closes after 3 seconds of inactivity (no motion or temperature change)
  • Session statistics are emitted upon close

When to Use Session Windows:

  • User behavior analysis (web sessions, app usage)
  • Machine operation cycles (start -> run -> stop)
  • Intermittent sensor activity (motion sensors, pressure sensors)
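To see how the gap timeout alone determines session boundaries, the grouping rule can be applied offline to a list of event timestamps. This is a minimal sketch, assuming timestamps are sorted in ascending order; Session and splitIntoSessions() are hypothetical names, and std::vector is used for brevity rather than the fixed-size state kept by the lab code.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct Session {
  uint32_t startMs;   // timestamp of the first event in the session
  uint32_t endMs;     // timestamp of the last event in the session
  int eventCount;
};

// Groups sorted timestamps into sessions. A new session starts whenever the
// gap since the previous event is strictly greater than gapMs, the same
// comparison the lab uses for SESSION_TIMEOUT_MS.
std::vector<Session> splitIntoSessions(const std::vector<uint32_t>& timestampsMs,
                                       uint32_t gapMs) {
  std::vector<Session> sessions;
  for (uint32_t t : timestampsMs) {
    assert(sessions.empty() || t >= sessions.back().endMs);  // input must be sorted
    if (sessions.empty() || t - sessions.back().endMs > gapMs) {
      sessions.push_back({t, t, 1});
    } else {
      sessions.back().endMs = t;
      sessions.back().eventCount++;
    }
  }
  return sessions;
}
```

For events at 0, 1000, 2500, 7000, 7500, and 12000 ms, a 3000 ms gap produces three sessions (3, 2, and 1 events), while a 5000 ms gap merges everything into a single session because the largest gap between neighbours is 4500 ms.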

Concept 3: Multi-Condition Stream Filtering

What is Stream Filtering? Filtering removes events that don’t meet specified criteria before they enter downstream processing. This reduces noise and focuses analysis on relevant events.

Filter in this lab:

  • Condition 1: Temperature in the normal range (20 to 40°C)
  • Condition 2: Light in the normal range (10 to 90%)
  • Condition 3: Either no motion is present, or the temperature is below the potentiometer threshold
  • Only events passing ALL conditions proceed

Best Practices:

  • Filter early to reduce downstream processing load
  • Log filtered events for auditing
  • Use filter pass rate as a data quality metric
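The practices above are easier to apply when the filter reports which condition rejected a reading instead of a single boolean. The sketch below is one possible arrangement, not the lab's code: FilterResult, evaluateFilter(), and FilterStats are hypothetical names, while the three conditions mirror applyFilter() from the lab.

```cpp
#include <cassert>
#include <cmath>

// Outcome of each individual condition, kept separate so a rejected reading
// can be logged with the reason it failed.
struct FilterResult {
  bool tempInRange;
  bool lightInRange;
  bool motionTempOk;
  bool passes() const { return tempInRange && lightInRange && motionTempOk; }
};

FilterResult evaluateFilter(float temp, float light, bool motion, float threshold) {
  // NaN readings fail every range comparison silently; catch them in debug builds
  assert(!std::isnan(temp) && !std::isnan(light));
  FilterResult r;
  r.tempInRange = (temp >= 20.0f && temp <= 40.0f);
  r.lightInRange = (light >= 10.0f && light <= 90.0f);
  r.motionTempOk = (!motion || temp < threshold);
  return r;
}

// Running pass/fail counters, usable as a simple data quality metric.
struct FilterStats {
  unsigned long passed = 0;
  unsigned long failed = 0;

  void record(const FilterResult& r) {
    if (r.passes()) passed++; else failed++;
  }

  float passRatePercent() const {
    unsigned long total = passed + failed;
    return total == 0 ? 0.0f : 100.0f * passed / total;
  }
};
```

A reading of 45°C with 50% light and no motion fails only tempInRange, so an audit log can record the specific reason. Three passing readings and one failing reading give a pass rate of 75%.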

Concept 4: Statistical Aggregation

What is Statistical Aggregation? Computing statistics (mean, variance, min, max) over streaming data requires incremental algorithms that update with each new value without storing all historical data points.

Naive sum-of-squares approach (used in this lab):

// Incremental mean and variance using running sums
count++;
sum += value;
sumSquares += value * value;
mean = sum / count;
variance = (sumSquares / count) - (mean * mean);

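This approach is simple and adequate for the short 5-second aggregation windows in this lab. Because it subtracts two large, nearly equal numbers, it can lose precision (and even produce a slightly negative variance) when the window is long or the values are large relative to their spread. In those cases, consider Welford’s online algorithm, which is more numerically stable: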

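// Welford's algorithm (more numerically stable)
count++;
delta = value - mean;
mean += delta / count;
M2 += delta * (value - mean);   // second factor uses the updated mean
// Sample variance (n - 1). Divide by count instead to match the population
// variance computed by the running-sum version above.
variance = (count > 1) ? M2 / (count - 1) : 0;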

Benefits of incremental aggregation:

  • O(1) memory regardless of stream length
  • Supports windowed aggregation with periodic reset
  • Computes mean, variance, min, and max in a single pass
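Welford's update can be packaged as a small accumulator that could stand in for the running sums in StreamStatistics. This is a minimal sketch rather than a drop-in replacement: RunningStats and its member names are hypothetical, and it offers both variance definitions so the result can be compared with getVariance() in the lab.

```cpp
#include <cassert>

// Hypothetical accumulator using Welford's online algorithm.
struct RunningStats {
  long count = 0;
  float mean = 0.0f;
  float m2 = 0.0f;  // running sum of squared deviations from the mean

  void add(float value) {
    count++;
    float delta = value - mean;
    mean += delta / count;
    m2 += delta * (value - mean);  // second factor uses the updated mean
  }

  // Population variance (divide by n), the definition used by getVariance() in the lab
  float populationVariance() const {
    return count > 0 ? m2 / count : 0.0f;
  }

  // Sample variance (divide by n - 1); requires at least two values
  float sampleVariance() const {
    assert(count >= 2);
    return m2 / (count - 1);
  }
};
```

Adding 1000000, 1000001, and 1000002 yields a mean of 1000001, a population variance of about 0.667, and a sample variance of 1.0. The same inputs stress the running-sum formula, because the squared values are near 10^12 and exceed the precision of a 32-bit float.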

13.2.2 Challenge Exercises

Challenge 1: Add More Patterns

Define and implement additional patterns:

  1. Equipment-Startup: EVT_MOTION_START -> EVT_LIGHT_HIGH -> EVT_TEMP_RISING within 8 seconds
  2. Night-Activity: EVT_LIGHT_LOW -> EVT_MOTION_START -> EVT_LIGHT_LOW within 10 seconds

Add these to the patterns[] array and verify detection.

Challenge 2: Implement Hopping Windows

Add hopping window support alongside session windows:

  • Window size: 10 seconds
  • Hop interval: 2 seconds (5 overlapping windows)
  • Track: average temperature per window
  • Display: trend comparison across overlapping windows
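One possible starting point for this challenge is a helper that lists which hopping windows a timestamp belongs to. The sketch assumes windows start at multiples of the hop interval and cover the half-open range [start, start + size); the name `windowsFor` is hypothetical, and `hopMs` must be greater than zero.

```cpp
#include <cstdint>
#include <vector>

// Start times (ms) of every hopping window that contains timestamp tMs,
// newest window first. Windows start at multiples of hopMs and cover
// [start, start + sizeMs).
std::vector<uint32_t> windowsFor(uint32_t tMs, uint32_t sizeMs, uint32_t hopMs) {
    std::vector<uint32_t> starts;
    uint32_t latest = (tMs / hopMs) * hopMs;   // last window starting at or before tMs
    for (uint32_t k = 0; k * hopMs < sizeMs; ++k) {
        uint32_t back = k * hopMs;
        if (back > latest) break;              // no windows before time zero
        starts.push_back(latest - back);
    }
    return starts;
}
```

With a 10-second window and a 2-second hop, any timestamp from 8 seconds onward falls into 5 windows, so each temperature sample would update 5 running averages.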
Challenge 3: Add Pattern Negation

Extend pattern matching to support “NOT” conditions:

  • Pattern: EVT_MOTION_START -> NOT(EVT_TEMP_HIGH) within 5 seconds -> EVT_MOTION_END
  • This detects motion periods without temperature spikes (normal activity vs alert-worthy activity)
Challenge 4: Implement Stream Joins

Add a join operation that correlates temperature and motion events:

  • When motion starts, capture temperature at that moment
  • When motion ends, capture temperature again
  • Calculate temperature delta during motion period
  • Alert if temperature rises more than 5C during any motion session
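The four steps above amount to a small piece of join state keyed on the motion session. The following sketch shows one way to hold it; `MotionTempJoin` and its method names are illustrative, and the caller is assumed to pass the most recent temperature reading when each motion event fires.

```cpp
// Joins motion start/end events with the temperature stream by remembering
// the temperature at motion start and comparing it at motion end.
struct MotionTempJoin {
    bool inMotion = false;
    float startTempC = 0.0f;
    float lastDeltaC = 0.0f;   // temperature change over the last session

    void onMotionStart(float currentTempC) {
        inMotion = true;
        startTempC = currentTempC;
    }

    // Returns true when the rise during the session exceeds alertRiseC.
    bool onMotionEnd(float currentTempC, float alertRiseC = 5.0f) {
        if (!inMotion) return false;   // end event without a matching start
        inMotion = false;
        lastDeltaC = currentTempC - startTempC;
        return lastDeltaC > alertRiseC;
    }
};
```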

13.2.3 Expected Outcomes

After completing this lab, you should observe:

| Feature | Expected Behavior |
| --- | --- |
| Pattern Matching | RED LED flashes when event sequence matches defined patterns |
| Session Windows | YELLOW LED on during activity, session summary printed on timeout |
| Stream Filtering | GREEN LED indicates readings pass all filter conditions |
| Statistical Anomaly | BLUE LED lights when values exceed 2.5 standard deviations |
| Event Detection | Console shows real-time event detection as sensor values change |
| Aggregation Stats | 5-second window statistics with count, avg, stddev, min, max |

Try It: Buffer Capacity and Throughput Calculator

Explore how sampling rate, buffer size, and processing time interact. This calculator shows whether the ESP32 can keep up with the incoming data stream or if events will be lost.

13.2.4 Real-World Applications

This lab demonstrates patterns used in production stream processing systems:

| Lab Feature | Production Equivalent | Real-World Use Case |
| --- | --- | --- |
| Pattern matching | Apache Flink CEP | Fraud detection in banking |
| Session windows | Kafka Streams sessions | User behavior analytics |
| Stream filtering | WHERE clauses in ksqlDB | Data quality enforcement |
| Statistical aggregation | Spark Streaming aggregates | Real-time dashboards |
| Anomaly detection | Z-score in Grafana alerts | Infrastructure monitoring |
| Event correlation | Complex Event Processing | Security incident detection |

Key Takeaway

Complex Event Processing (CEP) goes beyond simple threshold alerts by detecting meaningful sequences of events with temporal constraints. The pattern “motion, then temperature rise, then light change within 5 seconds” captures a real-world scenario (intrusion) that no single sensor reading could detect. This lab demonstrates that even on ESP32, you can implement pattern matching, session windows, z-score anomaly detection, and multi-condition filtering – the same patterns used in enterprise Flink CEP, Esper, and Kafka Streams applications.

The Sensor Squad becomes DETECTIVES – spotting sneaky patterns in data!

The Sensor Squad has leveled up! Instead of just watching for high temperatures, they now look for PATTERNS – like detectives solving a mystery!

Mystery 1: The Intruder Pattern Max the Microcontroller programs a special rule: “If I see MOTION, then TEMPERATURE RISE, then LIGHT CHANGE – all within 5 seconds – something suspicious is happening!”

Sammy the Sensor asks: “Why not just alert on motion?”

Lila the LED explains: “Because the office cat triggers motion sensors all the time! But a cat does not cause temperature to rise AND lights to turn on. Only a PERSON does all three. That is why we look for the PATTERN, not just one event!”

Mystery 2: The Activity Session Bella the Battery watches a factory machine. It starts working (session begins!), runs for a while, then stops (session ends!). “I track everything that happens during each work session – how long it ran, the average temperature, and whether anything was unusual.”

“It is like chapters in a book,” says Bella. “Each session is one complete story. When the machine takes a 3-second break, the chapter ends and I write a summary!”

Mystery 3: The Statistical Oddball Max keeps track of average temperature and how much it normally varies. When a reading is WAY outside normal (more than 2.5 standard deviations), the BLUE LED lights up!

“It is like noticing one kid in class who is 7 feet tall,” explains Sammy. “Most kids are between 4 and 5 feet. One kid being 7 feet is so unusual, something must be special about them!”

The Sensor Squad’s Detective Toolkit:

  • Pattern matching = “Did events happen in THIS specific order?”
  • Session windows = “What happened during this activity period?”
  • Statistical anomaly = “Is this reading WEIRD compared to normal?”
  • Multi-condition filtering = “Does this event pass ALL my tests?”

13.2.5 Try This at Home!

Be a pattern detective! Sit by a window and watch for this pattern: “Car passes, then a person walks by, then another car passes – all within 2 minutes.” How many times does this pattern happen in 30 minutes? Now try a different pattern. This is exactly what CEP does – looking for specific sequences in a stream of events!

An industrial safety system uses ESP32 with the CEP lab code to detect the “Intruder-Alert” pattern: EVT_MOTION_START -> EVT_TEMP_RISING -> EVT_LIGHT_HIGH within 5 seconds. Calculate end-to-end latency from first event to pattern match alert.

Pattern definition (from lab code):

Pattern patterns[] = {
  {{EVT_MOTION_START, EVT_TEMP_RISING, EVT_LIGHT_HIGH}, 3, 5000, "Intruder-Alert"}
};
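To make the pattern semantics concrete, here is a hedged sketch of how such a definition might be matched against a buffer of timestamped events. The lab's actual `matchPattern()` is not reproduced here; the `Event` struct, the enum values, `PATTERN_MAX_LENGTH = 5`, and the function `matchesSequence` are assumptions chosen to fit the initializer above, and the pattern length is assumed to be at least 1.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

enum EventType { EVT_MOTION_START, EVT_TEMP_RISING, EVT_LIGHT_HIGH, EVT_MOTION_END };

struct Event {
    EventType type;
    uint32_t timestampMs;
};

const int PATTERN_MAX_LENGTH = 5;   // assumed capacity

struct Pattern {
    EventType sequence[PATTERN_MAX_LENGTH];
    int length;             // number of events in the sequence
    uint32_t maxDuration;   // ms allowed from first to last event
    const char* name;
};

// Returns true if events (oldest first) contain the pattern's sequence in
// order, allowing unrelated events in between, with every matched event
// no later than maxDuration after the first one.
bool matchesSequence(const std::vector<Event>& events, const Pattern& p) {
    for (size_t start = 0; start < events.size(); ++start) {
        if (events[start].type != p.sequence[0]) continue;
        int matched = 1;
        for (size_t i = start + 1; i < events.size() && matched < p.length; ++i) {
            if (events[i].timestampMs - events[start].timestampMs > p.maxDuration) {
                break;                       // window expired for this start event
            }
            if (events[i].type == p.sequence[matched]) matched++;
        }
        if (matched == p.length) return true;
    }
    return false;
}
```

Trying every candidate start event means a stale first event does not block a later, valid sequence. The cost is a nested scan, which stays small for a buffer of a few dozen events.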

Timing breakdown:

Event 1: Motion detected

  • Physical event: t=0ms (PIR sensor triggers)
  • GPIO read latency: ~10us (digitalRead in polling loop)
  • Edge detection comparison: ~1us
  • Event buffer write: ~5us (addEvent() function)
  • Total: t~0.02ms (motion event buffered)

Event 2: Temperature rising

  • Physical event: t=1500ms (body heat detected by NTC sensor)
  • ADC read latency: ~100us (analogRead() with 12-bit SAR ADC on ESP32)
  • Temperature calculation: ~10us (map() and division)
  • Rate-of-change comparison: ~1us
  • Event buffer write: ~5us
  • Total: t~1500.1ms (temp event buffered)

Event 3: Light change

  • Physical event: t=3000ms (flashlight turns on)
  • ADC read latency: ~100us (LDR photoresistor)
  • Threshold comparison: ~1us
  • Event buffer write: ~5us
  • Total: t~3000.1ms (light event buffered)

Pattern matching execution:

  • checkPatterns() called every 100ms (from main loop at SAMPLE_INTERVAL_MS)
  • Last call before Event 3: t=3000ms
  • Next call after Event 3: t=3100ms
  • Pattern matching scan: ~50us (iterates through up to 50 events with simple integer comparisons – at 240 MHz the ESP32 runs 240 clock cycles per microsecond)
  • Alert generation: ~2ms (Serial.println + LED flash setup)
  • Total: ~3102ms from first event

Latency calculation:

  • Physical event sequence duration: 3000ms (motion -> temp -> light)
  • Worst-case detection delay: ~102ms (polling interval + matching + output)
  • Total end-to-end latency: ~3,102ms (well within 5-second pattern window)
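The arithmetic above can be captured in a small helper so the effect of changing the polling interval is easy to recompute. This is an illustrative sketch; the struct and function names are not part of the lab code, and all values are in milliseconds.

```cpp
// Timing inputs for the detection path, all in milliseconds.
struct LatencyBudget {
    double pollIntervalMs;   // how often the pattern check runs
    double scanMs;           // time to scan the event buffer
    double alertMs;          // time to emit the alert
};

// Worst case: the final event lands just after a poll, so it waits almost a
// full interval before the next scan and alert.
double worstCaseDetectionDelayMs(const LatencyBudget& b) {
    return b.pollIntervalMs + b.scanMs + b.alertMs;
}

// Total time from the first physical event to the alert.
double endToEndLatencyMs(double sequenceDurationMs, const LatencyBudget& b) {
    return sequenceDurationMs + worstCaseDetectionDelayMs(b);
}
```

Plugging in the figures from this example (100 ms polling, 0.05 ms scan, 2 ms alert, 3000 ms sequence) gives a detection delay of about 102 ms and an end-to-end latency of about 3102 ms.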

Latency optimization opportunities:

  1. Reduce polling interval: Change SAMPLE_INTERVAL_MS from 100ms to 10ms – reduces worst-case detection delay to ~12ms
    • Trade-off: 10x more CPU active time, reduces battery life by ~8%
  2. Event-driven pattern checking: Call checkPatterns() immediately after addEvent() instead of periodic polling
    • Trade-off: Pattern check runs on every event, but each check takes only ~50us so overhead is negligible
  3. Hardware interrupts for motion: Use attachInterrupt() on the PIR pin so a motion edge is captured within microseconds instead of waiting for the next poll
    • Trade-off: ISR must be fast; queue the event and match in loop()
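The "queue the event and match in loop()" trade-off in option 3 could look like the single-producer, single-consumer ring buffer below. This is a simplified, portable sketch: `EventQueue` is a hypothetical name, and relying on `volatile` alone is an assumption that glosses over memory-ordering concerns on a dual-core ESP32, where a FreeRTOS queue written with `xQueueSendFromISR()` would be the more robust choice.

```cpp
#include <cstdint>

// Capacity must be a power of two so the index mask works.
const uint8_t QUEUE_CAPACITY = 16;

// Single-producer (ISR), single-consumer (loop) ring buffer of event codes.
// One slot is always left empty, so it holds QUEUE_CAPACITY - 1 items.
struct EventQueue {
    volatile uint8_t head = 0;   // written only by the producer
    volatile uint8_t tail = 0;   // written only by the consumer
    volatile uint8_t slots[QUEUE_CAPACITY];

    // Intended for the ISR: store and return quickly; drop when full.
    bool push(uint8_t eventCode) {
        uint8_t next = static_cast<uint8_t>((head + 1) & (QUEUE_CAPACITY - 1));
        if (next == tail) return false;      // queue full, event dropped
        slots[head] = eventCode;
        head = next;
        return true;
    }

    // Intended for loop(): returns false when nothing is pending.
    bool pop(uint8_t& eventCode) {
        if (tail == head) return false;
        eventCode = slots[tail];
        tail = static_cast<uint8_t>((tail + 1) & (QUEUE_CAPACITY - 1));
        return true;
    }
};
```

In this arrangement the interrupt handler would only call `push()`, and `loop()` would drain the queue with `pop()` before running the pattern check, keeping the ISR short.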

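Recommended optimization for production: Use event-driven pattern checking (call checkPatterns() immediately after addEvent()). Expected latency to the pattern match: physical sequence (3000ms) + detection (<1ms) = ~3001ms. The sequence completes with roughly 2 seconds (about 40%) of the 5-second pattern window to spare, and the detection overhead itself accounts for well under 0.1% of that window.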

| Pattern Complexity | ESP32 Performance | Memory Usage | Best Use Case |
| --- | --- | --- | --- |
| Simple sequence (2-3 events) | <1ms match time | 2 KB state | Real-time safety (e.g., motion->door->alarm) |
| Moderate sequence (4-5 events) | 1-5ms match time | 5 KB state | Equipment monitoring (e.g., startup sequence validation) |
| Complex temporal (5+ events with negation) | 5-20ms match time | 10-20 KB state | Behavior analysis (e.g., user activity patterns) |
| Statistical patterns (count, duration) | 1-10ms match time | 5 KB state (rolling stats) | Anomaly detection (e.g., “10+ events in 1 minute”) |
| Multi-stream correlation | 10-50ms match time | 20-50 KB state (multiple buffers) | Cross-sensor fusion (e.g., temp + vibration + pressure) |

Performance constraints (ESP32 at 240 MHz):

  • Event buffer size: 50-100 events is a practical ceiling (bounded by per-scan time and by how much of the 520 KB SRAM the rest of the firmware leaves free)
  • Pattern match frequency: 10-1000 Hz depending on CPU budget
  • State retention: 5-60 seconds depending on buffer size and event rate

Selection heuristic:

  1. Safety-critical (<10ms latency)? – Simple 2-3 event patterns only, event-driven matching
  2. Moderate latency (10-100ms acceptable)? – Up to 5-event patterns with 100ms polling
  3. Complex behavior analysis (>1s latency acceptable)? – Use gateway-tier CEP (Raspberry Pi, not ESP32)
  4. Need statistical patterns (count/duration)? – Use session windows with periodic aggregation
  5. Multi-sensor correlation? – Offload to fog tier (ESP32 does edge filtering only)
Common Mistake: Not Handling Pattern Timeout Edge Cases in CEP

Developers implement Complex Event Processing patterns but forget to handle partial pattern matches that never complete – leaving state in memory indefinitely and causing memory leaks.

What goes wrong: The lab code defines the “Intruder-Alert” pattern: EVT_MOTION_START -> EVT_TEMP_RISING -> EVT_LIGHT_HIGH within 5 seconds. A security guard walks past the sensor, triggering EVT_MOTION_START at t=0s. The temperature sensor is faulty and never triggers EVT_TEMP_RISING. The partial pattern (only Event 1 matched) is never explicitly discarded: it keeps occupying buffer space until it happens to be overwritten, and in any design that tracks partial matches in their own list it would stay in memory indefinitely.

Why it fails: The matchPattern() function in the lab code searches the event buffer but does not explicitly expire partial matches that exceed the maxDuration window. While the circular buffer eventually overwrites old events, the matching algorithm still scans them unnecessarily.

The correct approach (production CEP system):

1. Add pattern expiration logic:

#include <algorithm>  // std::remove_if
#include <vector>

struct PartialMatch {
    unsigned long startTime;   // millis() when the first event matched
    int matchedCount;          // number of events matched so far
    EventType sequence[PATTERN_MAX_LENGTH];
};

std::vector<PartialMatch> activeMatches;  // Track partial pattern matches

void checkPatternsWithExpiration() {
    unsigned long now = millis();

    // Expire old partial matches (unsigned subtraction stays correct
    // across millis() rollover)
    activeMatches.erase(
        std::remove_if(activeMatches.begin(), activeMatches.end(),
            [now](const PartialMatch& m) {
                return (now - m.startTime) > PATTERN_TIMEOUT_MS;
            }),
        activeMatches.end()
    );

    // Check new events against active partial matches
    // ... pattern matching logic ...
}

2. Limit active partial matches:

const size_t MAX_ACTIVE_PATTERNS = 10;  // Prevent memory exhaustion

void addPartialMatch(const PartialMatch& newMatch) {
    if (activeMatches.size() >= MAX_ACTIVE_PATTERNS) {
        // Evict oldest partial match (smallest startTime)
        auto oldest = std::min_element(activeMatches.begin(), activeMatches.end(),
            [](const PartialMatch& a, const PartialMatch& b) {
                return a.startTime < b.startTime;
            });
        activeMatches.erase(oldest);
    }
    activeMatches.push_back(newMatch);
}

3. Monitor partial match expiration rate:

int partialMatchesExpired = 0;

void trackExpiredPatterns(PartialMatch& expired) {
    partialMatchesExpired++;
    if (partialMatchesExpired % 100 == 0) {
        Serial.print("WARNING: ");
        Serial.print(partialMatchesExpired);
        Serial.println(" partial patterns expired without completion");
        // This may indicate faulty sensors or incorrect pattern definitions
    }
}

4. Implement pattern confidence scoring:

struct PatternMatch {
    Pattern* pattern;
    float confidence;  // 0.0-1.0 based on event timing
    unsigned long completionTime;
};

float calculateConfidence(Pattern* pattern, unsigned long actualDuration) {
    // Patterns completed quickly get higher confidence
    float timeRatio = (float)actualDuration / pattern->maxDuration;
    return 1.0f - (timeRatio * 0.5f);  // 1.0 if instant, 0.5 if at the time limit
}

5. Handle partial matches in graceful degradation:

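// If the partial-match list has reached its cap, reduce pattern complexity.
// (With the cap enforced in addPartialMatch(), size() never exceeds
// MAX_ACTIVE_PATTERNS, so compare against the cap itself.)
if (activeMatches.size() >= MAX_ACTIVE_PATTERNS) {
    Serial.println("WARN: Pattern matching overloaded, using simplified patterns");
    // Temporarily disable complex 5-event patterns, keep only critical 2-event patterns
    enableSimplifiedPatterns = true;
}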

Real consequence: A smart building deployed CEP code for intrusion detection across 100 sensors. Over 3 months, partial pattern matches from false triggers (employees triggering motion without completing the full pattern) accumulated in memory. The ESP32’s event buffer fragmented, pattern matching became progressively slower, and after 90 days the system crashed with a watchdog timer reset. Post-mortem analysis revealed 15,000+ partial pattern matches in memory.

The fix: implemented pattern expiration (discard partial matches older than 2x maxDuration), limited active partial matches to 50, and added monitoring for expiration rate. After deployment, the system maintained stable <5% memory usage for partial patterns indefinitely. The lesson: CEP systems MUST explicitly manage partial pattern state with timeouts and capacity limits, or memory leaks from incomplete patterns will eventually crash the system.

13.3 Concept Relationships

This advanced lab implements production patterns at embedded scale, showing that the core streaming concepts carry over from cluster-sized engines to a single microcontroller.

Common Pitfalls

Overly general patterns (e.g., “any two consecutive high readings”) produce thousands of false positive alerts in production IoT systems. Always validate CEP patterns against historical data to measure precision and recall before deployment. Use specific conditions (value thresholds, device type filters, time constraints) to achieve >95% precision.

CEP engines track in-progress pattern matches in operator state. Under system restart without proper checkpointing, all partial matches are lost — a 4-event pattern that was 3 events complete is discarded, potentially missing a critical composite event. Always configure CEP operator state checkpointing intervals compatible with acceptable event detection latency after recovery.

CEP patterns without time constraints accumulate partial matches indefinitely — a pattern “A then B” will remember every A event and check every future B event, consuming unbounded memory. Always specify maximum time windows for pattern completion (A then B within 30 seconds) to enable efficient state cleanup.

Real IoT event streams contain late arrivals, duplicates, and gaps from sensor failures. CEP patterns that work correctly on clean ordered test data often produce incorrect results on real IoT streams with out-of-order events. Always test CEP rules against replay of real captured IoT data before production deployment.

13.6 What’s Next

| If you want to… | Read this |
| --- | --- |
| Study the architectures behind CEP systems | Stream Processing Architectures |
| Practice with the basic streaming lab first | Lab: Stream Processing |
| Build complete IoT streaming pipelines | Building IoT Streaming Pipelines |
| Review pitfalls to avoid in production CEP | Common Pitfalls and Worked Examples |

Continue to Interactive Game and Summary for the Data Stream Challenge game and chapter summary with key takeaways.