1301  Hands-On Lab: Advanced CEP

1301.1 Lab: Advanced Stream Pattern Matching and Complex Event Processing

~60 min | Advanced | P10.C14.LAB02

1301.1.1 Learning Objectives

By completing this advanced hands-on lab, you will be able to:

  • Implement pattern matching algorithms for detecting event sequences in streams
  • Build complex event processing (CEP) rules for multi-condition detection
  • Design session windows that group events by activity periods
  • Create stream filters with multiple configurable conditions
  • Implement comprehensive stream aggregation (count, sum, average, variance)
  • Detect temporal patterns like “A followed by B within N seconds”

Note: What You’ll Build

An advanced stream processing system on ESP32 that demonstrates Complex Event Processing (CEP) concepts: pattern matching across multiple sensors, sequence detection with temporal constraints, session-based windowing, multi-level filtering, and statistical aggregation. This lab bridges embedded stream processing with enterprise CEP patterns used in Apache Flink and Esper.

1301.1.2 Advanced Stream Processing Concepts

This lab implements sophisticated patterns found in production stream processing systems:

Concept | Implementation | Enterprise Equivalent
Pattern Matching | Sequence detection (A->B->C) | Flink CEP PatternStream
Session Windows | Activity-based grouping | Kafka Streams SessionWindows
Multi-Condition Filters | Compound boolean expressions | SQL WHERE clauses
Stream Joins | Correlating multiple streams (Challenge 4) | Flink DataStream.join()
Statistical Aggregation | Mean, variance, min, max | Spark aggregateByKey
Temporal Constraints | “Within N seconds” rules | Esper pattern guards

1301.1.3 Wokwi Simulator

Build and run this lab in the Wokwi ESP32 simulator, using the circuit and code from the sections below.

1301.1.4 Circuit Setup

Connect multiple sensors and outputs to the ESP32 for comprehensive stream analysis:

Component | ESP32 Pin | Purpose
Temperature Sensor (NTC) | GPIO 34 | Primary data stream
Light Sensor (LDR) | GPIO 35 | Secondary data stream
Motion Sensor (PIR) | GPIO 33 | Event trigger stream
Potentiometer | GPIO 32 | Configurable threshold
Red LED | GPIO 18 | Pattern match alert
Yellow LED | GPIO 19 | Session active indicator
Green LED | GPIO 21 | Filter pass indicator
Blue LED | GPIO 22 | Anomaly detection

Add this diagram.json configuration in Wokwi:

{
  "version": 1,
  "author": "IoT Class - Advanced Stream Processing Lab",
  "editor": "wokwi",
  "parts": [
    { "type": "wokwi-esp32-devkit-v1", "id": "esp", "top": 0, "left": 0 },
    { "type": "wokwi-ntc-temperature-sensor", "id": "temp1", "top": -120, "left": 80 },
    { "type": "wokwi-photoresistor-sensor", "id": "ldr1", "top": -120, "left": 180 },
    { "type": "wokwi-pir-motion-sensor", "id": "pir1", "top": -120, "left": 280 },
    { "type": "wokwi-potentiometer", "id": "pot1", "top": -120, "left": 380 },
    { "type": "wokwi-led", "id": "led_red", "top": 180, "left": 80, "attrs": { "color": "red" } },
    { "type": "wokwi-led", "id": "led_yellow", "top": 180, "left": 130, "attrs": { "color": "yellow" } },
    { "type": "wokwi-led", "id": "led_green", "top": 180, "left": 180, "attrs": { "color": "green" } },
    { "type": "wokwi-led", "id": "led_blue", "top": 180, "left": 230, "attrs": { "color": "blue" } },
    { "type": "wokwi-resistor", "id": "r1", "top": 230, "left": 80, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r2", "top": 230, "left": 130, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r3", "top": 230, "left": 180, "attrs": { "value": "220" } },
    { "type": "wokwi-resistor", "id": "r4", "top": 230, "left": 230, "attrs": { "value": "220" } }
  ],
  "connections": [
    ["esp:GND.1", "temp1:GND", "black", ["h0"]],
    ["esp:3V3", "temp1:VCC", "red", ["h0"]],
    ["esp:D34", "temp1:OUT", "green", ["h0"]],
    ["esp:GND.1", "ldr1:GND", "black", ["h0"]],
    ["esp:3V3", "ldr1:VCC", "red", ["h0"]],
    ["esp:D35", "ldr1:AO", "orange", ["h0"]],
    ["esp:GND.1", "pir1:GND", "black", ["h0"]],
    ["esp:3V3", "pir1:VCC", "red", ["h0"]],
    ["esp:D33", "pir1:OUT", "purple", ["h0"]],
    ["esp:GND.1", "pot1:GND", "black", ["h0"]],
    ["esp:3V3", "pot1:VCC", "red", ["h0"]],
    ["esp:D32", "pot1:SIG", "blue", ["h0"]],
    ["esp:D18", "led_red:A", "red", ["h0"]],
    ["led_red:C", "r1:1", "black", ["h0"]],
    ["r1:2", "esp:GND.2", "black", ["h0"]],
    ["esp:D19", "led_yellow:A", "yellow", ["h0"]],
    ["led_yellow:C", "r2:1", "black", ["h0"]],
    ["r2:2", "esp:GND.2", "black", ["h0"]],
    ["esp:D21", "led_green:A", "green", ["h0"]],
    ["led_green:C", "r3:1", "black", ["h0"]],
    ["r3:2", "esp:GND.2", "black", ["h0"]],
    ["esp:D22", "led_blue:A", "blue", ["h0"]],
    ["led_blue:C", "r4:1", "black", ["h0"]],
    ["r4:2", "esp:GND.2", "black", ["h0"]]
  ]
}

1301.1.5 Complete Arduino Code

Copy this code into the Wokwi editor:

// ============================================================================
// ADVANCED STREAM PROCESSING LAB: Pattern Matching & Complex Event Processing
// ============================================================================
// Demonstrates: Pattern Detection, Session Windows, Stream Filtering,
//               Complex Event Processing (CEP), Statistical Aggregation
// ============================================================================

#include <Arduino.h>
#include <math.h>

// ====================== PIN DEFINITIONS ======================
const int TEMP_PIN = 34;        // Temperature sensor (NTC)
const int LIGHT_PIN = 35;       // Light sensor (LDR)
const int MOTION_PIN = 33;      // Motion sensor (PIR)
const int THRESHOLD_PIN = 32;   // Potentiometer for threshold

const int LED_RED = 18;         // Pattern match alert
const int LED_YELLOW = 19;      // Session active indicator
const int LED_GREEN = 21;       // Filter pass indicator
const int LED_BLUE = 22;        // Anomaly detection

// ====================== STREAM PARAMETERS ======================
const int SAMPLE_INTERVAL_MS = 100;         // 10 Hz sampling rate
const int SESSION_TIMEOUT_MS = 3000;        // 3 seconds of inactivity closes session
const int PATTERN_WINDOW_MS = 10000;        // 10-second window for pattern matching
const int AGGREGATION_WINDOW_MS = 5000;     // 5-second aggregation window

// ====================== BUFFER SIZES ======================
const int EVENT_BUFFER_SIZE = 50;           // Recent events for pattern matching
const int STREAM_BUFFER_SIZE = 100;         // Circular buffer for raw data
const int PATTERN_MAX_LENGTH = 5;           // Maximum pattern sequence length

// ====================== EVENT TYPES ======================
enum EventType {
  EVT_NONE = 0,
  EVT_TEMP_HIGH = 1,          // Temperature exceeds threshold
  EVT_TEMP_LOW = 2,           // Temperature below threshold
  EVT_TEMP_RISING = 3,        // Temperature increasing rapidly
  EVT_TEMP_FALLING = 4,       // Temperature decreasing rapidly
  EVT_LIGHT_HIGH = 5,         // Light level high
  EVT_LIGHT_LOW = 6,          // Light level low (darkness)
  EVT_MOTION_START = 7,       // Motion detected (rising edge)
  EVT_MOTION_END = 8,         // Motion ended (falling edge)
  EVT_COMPOUND_ALERT = 9      // Multiple conditions met
};

// ====================== DATA STRUCTURES ======================

// Single sensor reading with timestamp
struct SensorReading {
  unsigned long timestamp;
  float temperature;
  float light;
  bool motion;
};

// Event record for pattern matching
struct Event {
  unsigned long timestamp;
  EventType type;
  float value;                // Associated value (temp, light, etc.)
};

// Session window state
struct SessionWindow {
  unsigned long sessionStart;
  unsigned long lastEventTime;
  int eventCount;
  float tempSum;
  float lightSum;
  float tempMin;
  float tempMax;
  bool isActive;
};

// Statistical aggregation state
struct StreamStatistics {
  unsigned long windowStart;
  int count;
  float sum;
  float sumSquares;           // For variance calculation
  float min;
  float max;
};

// Pattern definition for CEP
struct Pattern {
  EventType sequence[PATTERN_MAX_LENGTH];
  int length;
  unsigned long maxDuration;  // Maximum time for pattern to complete
  const char* name;
};

// ====================== GLOBAL STATE ======================

// Circular buffer for raw sensor readings
SensorReading streamBuffer[STREAM_BUFFER_SIZE];
int streamHead = 0;
int streamCount = 0;

// Event buffer for pattern matching
Event eventBuffer[EVENT_BUFFER_SIZE];
int eventHead = 0;
int eventCount = 0;

// Session window
SessionWindow currentSession;

// Statistics for temperature and light streams
StreamStatistics tempStats;
StreamStatistics lightStats;

// Previous values for edge detection
float prevTemp = 0;
float prevLight = 0;
bool prevMotion = false;
float prevTempAvg = 0;

// Pattern matching state
int patternsMatched = 0;
unsigned long lastPatternMatchTime = 0;

// Filter state
int filterPassCount = 0;
int filterFailCount = 0;

// Throughput tracking
unsigned long totalEventsProcessed = 0;
unsigned long throughputWindowStart = 0;
int eventsPerSecond = 0;
int eventsThisSecond = 0;

// Timing
unsigned long lastSampleTime = 0;
unsigned long lastDisplayTime = 0;
const int DISPLAY_INTERVAL_MS = 2000;

// ====================== PATTERN DEFINITIONS ======================
// Define patterns to detect in the event stream

// Pattern 1: Motion -> Temperature Rise -> Light Change (intruder with flashlight)
Pattern patterns[] = {
  {{EVT_MOTION_START, EVT_TEMP_RISING, EVT_LIGHT_HIGH}, 3, 5000, "Intruder-Alert"},
  {{EVT_LIGHT_LOW, EVT_MOTION_START, EVT_LIGHT_HIGH}, 3, 6000, "Room-Entry"},
  {{EVT_TEMP_HIGH, EVT_TEMP_HIGH, EVT_TEMP_HIGH}, 3, 10000, "Sustained-Heat"},
  {{EVT_MOTION_START, EVT_MOTION_END, EVT_MOTION_START}, 3, 8000, "Activity-Burst"}
};
const int NUM_PATTERNS = 4;

void setup() {
  Serial.begin(115200);
  delay(1000);

  // Configure pins
  pinMode(TEMP_PIN, INPUT);
  pinMode(LIGHT_PIN, INPUT);
  pinMode(MOTION_PIN, INPUT);
  pinMode(THRESHOLD_PIN, INPUT);
  pinMode(LED_RED, OUTPUT);
  pinMode(LED_YELLOW, OUTPUT);
  pinMode(LED_GREEN, OUTPUT);
  pinMode(LED_BLUE, OUTPUT);

  // Initialize session
  currentSession.isActive = false;
  currentSession.eventCount = 0;

  // Initialize statistics
  resetStatistics(&tempStats);
  resetStatistics(&lightStats);

  throughputWindowStart = millis();

  printHeader();
}

void printHeader() {
  Serial.println("\n");
  Serial.println("+=================================================================+");
  Serial.println("|     ADVANCED STREAM PROCESSING LAB - Complex Event Processing  |");
  Serial.println("+=================================================================+");
  Serial.println("| Pattern Matching | Session Windows | Statistical Aggregation    |");
  Serial.println("| Multi-condition Filtering | Temporal Pattern Detection          |");
  Serial.println("+-----------------------------------------------------------------+\n");
  Serial.println("LEDs: RED=Pattern Match | YELLOW=Session Active");
  Serial.println("      GREEN=Filter Pass | BLUE=Statistical Anomaly\n");
}

// ====================== STATISTICS FUNCTIONS ======================
void resetStatistics(StreamStatistics* stats) {
  stats->windowStart = millis();
  stats->count = 0;
  stats->sum = 0;
  stats->sumSquares = 0;
  stats->min = 999;
  stats->max = -999;
}

void updateStatistics(StreamStatistics* stats, float value) {
  stats->count++;
  stats->sum += value;
  stats->sumSquares += value * value;
  if (value < stats->min) stats->min = value;
  if (value > stats->max) stats->max = value;
}

float getMean(StreamStatistics* stats) {
  if (stats->count == 0) return 0;
  return stats->sum / stats->count;
}

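float getVariance(StreamStatistics* stats) {
  if (stats->count < 2) return 0;
  float mean = getMean(stats);
  // Population variance as E[x^2] - mean^2. Clamp at 0 because float rounding
  // can make the difference slightly negative, and sqrt() would then return NaN.
  float variance = (stats->sumSquares / stats->count) - (mean * mean);
  return variance > 0 ? variance : 0;
}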

float getStdDev(StreamStatistics* stats) {
  return sqrt(getVariance(stats));
}

// ====================== STREAM BUFFER FUNCTIONS ======================
void addToStreamBuffer(unsigned long ts, float temp, float light, bool motion) {
  streamBuffer[streamHead].timestamp = ts;
  streamBuffer[streamHead].temperature = temp;
  streamBuffer[streamHead].light = light;
  streamBuffer[streamHead].motion = motion;

  streamHead = (streamHead + 1) % STREAM_BUFFER_SIZE;
  if (streamCount < STREAM_BUFFER_SIZE) streamCount++;
}

// ====================== EVENT BUFFER FUNCTIONS ======================
void addEvent(EventType type, float value) {
  eventBuffer[eventHead].timestamp = millis();
  eventBuffer[eventHead].type = type;
  eventBuffer[eventHead].value = value;

  eventHead = (eventHead + 1) % EVENT_BUFFER_SIZE;
  if (eventCount < EVENT_BUFFER_SIZE) eventCount++;

  totalEventsProcessed++;
}

Event getEvent(int offset) {
  // offset 0 = most recent event
  int index = (eventHead - 1 - offset + EVENT_BUFFER_SIZE) % EVENT_BUFFER_SIZE;
  return eventBuffer[index];
}

// ====================== EVENT DETECTION ======================
void detectEvents(float temp, float light, bool motion, float threshold) {
  unsigned long now = millis();

  // Temperature events
  if (temp > threshold) {
    addEvent(EVT_TEMP_HIGH, temp);
  } else if (temp < threshold - 10) {
    addEvent(EVT_TEMP_LOW, temp);
  }

  // Rate of change events
  float tempChange = temp - prevTemp;
  if (tempChange > 2.0) {
    addEvent(EVT_TEMP_RISING, tempChange);
  } else if (tempChange < -2.0) {
    addEvent(EVT_TEMP_FALLING, tempChange);
  }

  // Light events
  if (light > 70) {
    addEvent(EVT_LIGHT_HIGH, light);
  } else if (light < 30) {
    addEvent(EVT_LIGHT_LOW, light);
  }

  // Motion edge detection
  if (motion && !prevMotion) {
    addEvent(EVT_MOTION_START, 1.0);
  } else if (!motion && prevMotion) {
    addEvent(EVT_MOTION_END, 0.0);
  }

  // Compound event detection
  if (temp > threshold && light < 30 && motion) {
    addEvent(EVT_COMPOUND_ALERT, temp);
  }

  prevTemp = temp;
  prevLight = light;
  prevMotion = motion;
}

// ====================== PATTERN MATCHING ======================
bool matchPattern(Pattern* pattern) {
  if (eventCount < pattern->length) return false;

  unsigned long now = millis();
  int matchIndex = 0;
  unsigned long patternStart = 0;

  // Scan the event buffer from oldest to newest so the sequence is matched
  // in chronological order (getEvent(0) is the most recent event)
  for (int i = eventCount - 1; i >= 0 && matchIndex < pattern->length; i--) {
    Event evt = getEvent(i);

    if (evt.type == pattern->sequence[matchIndex]) {
      if (matchIndex == 0) {
        patternStart = evt.timestamp;
      }

      // Check temporal constraint
      if (evt.timestamp - patternStart > pattern->maxDuration) {
        // Pattern took too long, restart search
        matchIndex = 0;
        patternStart = 0;
        if (evt.type == pattern->sequence[0]) {
          patternStart = evt.timestamp;
          matchIndex = 1;
        }
      } else {
        matchIndex++;
      }
    }
  }

  return (matchIndex == pattern->length);
}

void checkPatterns() {
  unsigned long now = millis();

  // Debounce pattern matches
  if (now - lastPatternMatchTime < 2000) return;

  for (int p = 0; p < NUM_PATTERNS; p++) {
    if (matchPattern(&patterns[p])) {
      patternsMatched++;
      lastPatternMatchTime = now;

      Serial.println("\n!!! PATTERN MATCHED !!!");
      Serial.print("Pattern: ");
      Serial.println(patterns[p].name);
      Serial.print("Duration limit: ");
      Serial.print(patterns[p].maxDuration / 1000.0);
      Serial.println(" seconds");

      // Flash RED LED
      for (int i = 0; i < 3; i++) {
        digitalWrite(LED_RED, HIGH);
        delay(100);
        digitalWrite(LED_RED, LOW);
        delay(100);
      }
    }
  }
}

// ====================== SESSION WINDOW ======================
void updateSession(float temp, float light, bool motion) {
  unsigned long now = millis();

  // Check if we should start a new session
  if (!currentSession.isActive) {
    // Start session on any significant event
    if (motion || abs(temp - prevTempAvg) > 3.0) {
      currentSession.isActive = true;
      currentSession.sessionStart = now;
      currentSession.eventCount = 0;
      currentSession.tempSum = 0;
      currentSession.lightSum = 0;
      currentSession.tempMin = temp;
      currentSession.tempMax = temp;

      Serial.println("\n>>> SESSION STARTED");
      digitalWrite(LED_YELLOW, HIGH);
    }
  }

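  // Update active session
  if (currentSession.isActive) {
    // Only real activity refreshes the inactivity timer. Refreshing it on
    // every sample would prevent the timeout below from ever firing.
    if (motion || fabs(temp - prevTempAvg) > 3.0) {
      currentSession.lastEventTime = now;
    }
    currentSession.eventCount++;
    currentSession.tempSum += temp;
    currentSession.lightSum += light;
    if (temp < currentSession.tempMin) currentSession.tempMin = temp;
    if (temp > currentSession.tempMax) currentSession.tempMax = temp;

    // Close the session after SESSION_TIMEOUT_MS without activity
    if (now - currentSession.lastEventTime > SESSION_TIMEOUT_MS) {
      closeSession();
    }
  }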

  prevTempAvg = getMean(&tempStats);
}

void closeSession() {
  if (!currentSession.isActive) return;

  unsigned long duration = currentSession.lastEventTime - currentSession.sessionStart;

  Serial.println("\n<<< SESSION CLOSED");
  Serial.print("Duration: ");
  Serial.print(duration / 1000.0);
  Serial.println(" seconds");
  Serial.print("Events: ");
  Serial.println(currentSession.eventCount);
  Serial.print("Avg Temp: ");
  Serial.print(currentSession.tempSum / currentSession.eventCount, 1);
  Serial.println("C");
  Serial.print("Temp Range: ");
  Serial.print(currentSession.tempMin, 1);
  Serial.print("C - ");
  Serial.print(currentSession.tempMax, 1);
  Serial.println("C");

  currentSession.isActive = false;
  digitalWrite(LED_YELLOW, LOW);
}

// ====================== STREAM FILTERING ======================
bool applyFilter(float temp, float light, bool motion, float threshold) {
  // Multi-condition filter
  bool condition1 = (temp >= 20 && temp <= 40);  // Normal temp range
  bool condition2 = (light >= 10 && light <= 90);  // Normal light range
  bool condition3 = !motion || temp < threshold;  // No motion, or cool when moving

  bool passes = condition1 && condition2 && condition3;

  if (passes) {
    filterPassCount++;
    digitalWrite(LED_GREEN, HIGH);
  } else {
    filterFailCount++;
    digitalWrite(LED_GREEN, LOW);
  }

  return passes;
}

// ====================== ANOMALY DETECTION ======================
void checkAnomalies(float temp, float light) {
  float tempStdDev = getStdDev(&tempStats);
  float tempMean = getMean(&tempStats);

  // Z-score anomaly detection
  if (tempStats.count >= 10) {
    float zScore = (tempStdDev > 0) ? (temp - tempMean) / tempStdDev : 0;

    if (abs(zScore) > 2.5) {
      digitalWrite(LED_BLUE, HIGH);
      Serial.println("\n** STATISTICAL ANOMALY DETECTED **");
      Serial.print("Z-score: ");
      Serial.print(zScore, 2);
      Serial.print(" (threshold: +/- 2.5)");
      Serial.print(" Value: ");
      Serial.print(temp, 1);
      Serial.print("C, Mean: ");
      Serial.print(tempMean, 1);
      Serial.print("C, StdDev: ");
      Serial.println(tempStdDev, 2);
    } else {
      digitalWrite(LED_BLUE, LOW);
    }
  }
}

// ====================== THROUGHPUT TRACKING ======================
void updateThroughput() {
  eventsThisSecond++;
  unsigned long now = millis();

  if (now - throughputWindowStart >= 1000) {
    eventsPerSecond = eventsThisSecond;
    eventsThisSecond = 0;
    throughputWindowStart = now;
  }
}

// ====================== DISPLAY FUNCTIONS ======================
void displayStatus(float temp, float light, bool motion, float threshold) {
  Serial.println("\n==================== STREAM STATUS ====================");

  // Current sensor values
  Serial.print("| SENSORS: Temp=");
  Serial.print(temp, 1);
  Serial.print("C  Light=");
  Serial.print(light, 0);
  Serial.print("%  Motion=");
  Serial.println(motion ? "YES" : "NO");

  // Statistics
  Serial.print("| TEMP STATS: Mean=");
  Serial.print(getMean(&tempStats), 1);
  Serial.print("C  StdDev=");
  Serial.print(getStdDev(&tempStats), 2);
  Serial.print("  Min=");
  Serial.print(tempStats.min, 1);
  Serial.print("  Max=");
  Serial.println(tempStats.max, 1);

  // Session info
  if (currentSession.isActive) {
    Serial.print("| SESSION: ACTIVE for ");
    Serial.print((millis() - currentSession.sessionStart) / 1000.0, 1);
    Serial.print("s, ");
    Serial.print(currentSession.eventCount);
    Serial.println(" events");
  } else {
    Serial.println("| SESSION: INACTIVE (waiting for trigger)");
  }

  // Pattern matching
  Serial.print("| PATTERNS MATCHED: ");
  Serial.print(patternsMatched);
  Serial.print("  |  Events in buffer: ");
  Serial.println(eventCount);

  // Filter statistics
  int totalFiltered = filterPassCount + filterFailCount;
  float passRate = totalFiltered > 0 ? (filterPassCount * 100.0 / totalFiltered) : 0;
  Serial.print("| FILTER: ");
  Serial.print(passRate, 1);
  Serial.print("% pass rate (");
  Serial.print(filterPassCount);
  Serial.print(" pass / ");
  Serial.print(filterFailCount);
  Serial.println(" fail)");

  // Throughput
  Serial.print("| THROUGHPUT: ");
  Serial.print(eventsPerSecond);
  Serial.print(" events/sec  |  Total: ");
  Serial.println(totalEventsProcessed);

  Serial.println("========================================================\n");
}

// ====================== SENSOR READING ======================
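float readTemperature() {
  int raw = analogRead(TEMP_PIN);
  // Simplified linear mapping of the 12-bit ADC value to 15-85 C for simulation;
  // a physical NTC would need a beta or Steinhart-Hart conversion instead
  return map(raw, 0, 4095, 1500, 8500) / 100.0;
}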

float readLight() {
  int raw = analogRead(LIGHT_PIN);
  return map(raw, 0, 4095, 0, 100);
}

bool readMotion() {
  return digitalRead(MOTION_PIN) == HIGH;
}

float readThreshold() {
  int raw = analogRead(THRESHOLD_PIN);
  return map(raw, 0, 4095, 2000, 8000) / 100.0;
}

// ====================== MAIN LOOP ======================
void loop() {
  unsigned long now = millis();

  if (now - lastSampleTime >= SAMPLE_INTERVAL_MS) {
    lastSampleTime = now;

    // Read all sensors
    float temp = readTemperature();
    float light = readLight();
    bool motion = readMotion();
    float threshold = readThreshold();

    // 1. Add to circular buffer
    addToStreamBuffer(now, temp, light, motion);

    // 2. Update running statistics
    updateStatistics(&tempStats, temp);
    updateStatistics(&lightStats, light);

    // 3. Detect discrete events
    detectEvents(temp, light, motion, threshold);

    // 4. Check for pattern matches
    checkPatterns();

    // 5. Update session window
    updateSession(temp, light, motion);

    // 6. Apply stream filter
    applyFilter(temp, light, motion, threshold);

    // 7. Check for statistical anomalies
    checkAnomalies(temp, light);

    // 8. Update throughput
    updateThroughput();

    // Reset statistics window periodically
    if (now - tempStats.windowStart > AGGREGATION_WINDOW_MS) {
      Serial.println("\n--- Aggregation Window Complete ---");
      Serial.print("Temperature: Mean=");
      Serial.print(getMean(&tempStats), 2);
      Serial.print(", StdDev=");
      Serial.print(getStdDev(&tempStats), 2);
      Serial.print(", Variance=");
      Serial.println(getVariance(&tempStats), 4);

      resetStatistics(&tempStats);
      resetStatistics(&lightStats);
    }

    // Periodic display update
    if (now - lastDisplayTime >= DISPLAY_INTERVAL_MS) {
      lastDisplayTime = now;
      displayStatus(temp, light, motion, threshold);
    }

    // Check session timeout
    if (currentSession.isActive &&
        now - currentSession.lastEventTime > SESSION_TIMEOUT_MS) {
      closeSession();
    }
  }
}

1301.1.6 Key CEP Concepts Explained

What is Pattern Matching?

Pattern matching detects specific sequences of events occurring within temporal constraints. Unlike simple threshold alerts, patterns capture complex behaviors like “motion followed by temperature rise within 5 seconds.”

Implementation in this lab:

  • Define patterns as arrays of EventType enum values
  • Each pattern has a maximum duration (temporal constraint)
  • The matchPattern() function searches the event buffer for sequences

Real-World Applications:

  • Intrusion detection: “door open -> motion -> door close” within 30 seconds
  • Equipment failure: “vibration spike -> temperature rise -> pressure drop” within 1 minute
  • Fraud detection: “login from new IP -> large withdrawal -> logout” within 2 minutes
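
The idea of "A followed by B followed by C within N milliseconds" can be sketched in portable C++ as shown here. This is a minimal illustration, not the lab's matchPattern(): the names Ev, TimedEvent, and matchesWithin are hypothetical, the events are assumed to be stored oldest first, and other events are allowed to occur between the pattern steps.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical event type and record for this sketch (not the lab's types).
enum class Ev : uint8_t { MotionStart, TempRising, LightHigh, Other };

struct TimedEvent {
  uint32_t timestampMs;
  Ev type;
};

// Returns true if `pattern` occurs as an ordered subsequence of `events`
// (oldest first) and every matched step lies within `maxDurationMs` of the
// first matched step.
bool matchesWithin(const TimedEvent* events, size_t eventCount,
                   const Ev* pattern, size_t patternLen,
                   uint32_t maxDurationMs) {
  if (patternLen == 0) return true;
  // Try every occurrence of the first pattern step as a starting point.
  for (size_t start = 0; start < eventCount; ++start) {
    if (events[start].type != pattern[0]) continue;
    const uint32_t startMs = events[start].timestampMs;
    size_t matched = 1;
    for (size_t i = start + 1; i < eventCount && matched < patternLen; ++i) {
      // Unsigned subtraction stays correct across a millis()-style wraparound.
      if (events[i].timestampMs - startMs > maxDurationMs) break;
      if (events[i].type == pattern[matched]) ++matched;
    }
    if (matched == patternLen) return true;
  }
  return false;
}
```

Restarting from each candidate first step costs more than a single pass, but it avoids missing a valid match when an earlier, expired attempt overlaps a later one.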

What are Session Windows?

Unlike fixed tumbling/sliding windows, session windows are activity-based. They group events that occur close together in time, closing when activity stops for a specified gap duration.

Implementation in this lab:

  • Session starts when a significant event occurs (motion or temperature spike)
  • Session remains active while events continue
  • Session closes after 3 seconds of inactivity
  • Session statistics are emitted upon close

When to Use Session Windows:

  • User behavior analysis (web sessions, app usage)
  • Machine operation cycles (start -> run -> stop)
  • Intermittent sensor activity (motion sensors, pressure sensors)
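
The gap-based open and close rule can be isolated into a few lines. The sketch below is an illustration under assumptions, not the lab's updateSession(): GapSession and feedSession are hypothetical names, the caller decides what counts as "activity", and timestamps are passed in so the logic can be exercised without hardware.

```cpp
#include <cstdint>

// Hypothetical session state for this sketch.
struct GapSession {
  bool active = false;
  uint32_t startMs = 0;
  uint32_t lastActivityMs = 0;
  uint32_t events = 0;
};

// Feeds one sample into the session. `activity` is true when the sample
// counts as activity (for example motion or a temperature jump).
// Returns true only on the call that closes a session.
bool feedSession(GapSession& s, uint32_t nowMs, bool activity, uint32_t gapMs) {
  if (activity) {
    if (!s.active) {            // first activity opens a new session
      s.active = true;
      s.startMs = nowMs;
      s.events = 0;
    }
    s.lastActivityMs = nowMs;   // only activity extends the session
    ++s.events;
    return false;
  }
  // Idle sample: close the session once the inactivity gap is exceeded.
  if (s.active && nowMs - s.lastActivityMs > gapMs) {
    s.active = false;
    return true;
  }
  return false;
}
```

The key design point is that idle samples never touch lastActivityMs; if they did, the gap could never grow and the session would stay open indefinitely.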

What is Stream Filtering?

Filtering removes events that don’t meet specified criteria before they enter downstream processing. This reduces noise and focuses analysis on relevant events.

Filter in this lab:

  • Condition 1: Temperature in normal range (20-40C)
  • Condition 2: Light in normal range (10-90%)
  • Condition 3: No motion during high temperature
  • Only events passing ALL conditions proceed

Best Practices:

  • Filter early to reduce downstream processing load
  • Log filtered events for auditing
  • Use filter pass rate as a data quality metric
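
One way to make the three conditions configurable is to move the limits into a struct instead of hard-coding them. The following is a sketch under assumptions: Reading, FilterConfig, and passesFilter are hypothetical names that do not appear in the lab code, and the limits in the usage note mirror the lab's ranges.

```cpp
// Hypothetical reading and configuration types for this sketch.
struct Reading {
  float tempC;
  float lightPct;
  bool motion;
};

struct FilterConfig {
  float tempMinC;
  float tempMaxC;
  float lightMinPct;
  float lightMaxPct;
  float motionTempLimitC;  // with motion present, temperature must stay below this
};

// Returns true only if the reading satisfies every condition (logical AND).
bool passesFilter(const Reading& r, const FilterConfig& c) {
  const bool tempOk = r.tempC >= c.tempMinC && r.tempC <= c.tempMaxC;
  const bool lightOk = r.lightPct >= c.lightMinPct && r.lightPct <= c.lightMaxPct;
  const bool motionOk = !r.motion || r.tempC < c.motionTempLimitC;
  return tempOk && lightOk && motionOk;
}
```

With `FilterConfig cfg{20.0f, 40.0f, 10.0f, 90.0f, 30.0f};`, a reading of 25 C, 50% light, and no motion passes, while 35 C with motion fails the third condition.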

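What is Statistical Aggregation?

Computing statistics (mean, variance, min, max) over streaming data requires incremental algorithms that update with each new value without storing all data.

The lab code keeps a running count, sum, and sum of squares in updateStatistics(), and getVariance() derives the population variance as E[x^2] - mean^2. This needs only O(1) memory, but it can lose precision when the mean is large relative to the spread. Welford’s algorithm is a numerically more stable alternative:

// Incremental mean and variance calculation (Welford)
count++;
delta = value - mean;
mean += delta / count;
M2 += delta * (value - mean);
variance = M2 / (count - 1);   // sample variance, valid for count > 1

Benefits of incremental aggregation:

  • O(1) memory regardless of stream length
  • Numerically stable for floating-point arithmetic (Welford’s update)
  • Supports windowed aggregation with reset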
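
For readers who want to try Welford's update in place of the sum-of-squares approach, a compile-ready version might look like the sketch below. RunningStats is a hypothetical name, not part of the lab code; it exposes both the population variance (divide by count, as the lab's getVariance() does) and the sample variance (divide by count - 1).

```cpp
#include <cmath>
#include <cstdint>

// Hypothetical running-statistics accumulator using Welford's update.
struct RunningStats {
  uint32_t count = 0;
  double mean = 0.0;
  double m2 = 0.0;  // sum of squared deviations from the running mean

  void add(double x) {
    ++count;
    const double delta = x - mean;
    mean += delta / count;
    m2 += delta * (x - mean);  // uses the already-updated mean
  }

  double populationVariance() const { return count > 0 ? m2 / count : 0.0; }
  double sampleVariance() const { return count > 1 ? m2 / (count - 1) : 0.0; }
  double populationStdDev() const { return std::sqrt(populationVariance()); }

  // Start a new aggregation window.
  void reset() { count = 0; mean = 0.0; m2 = 0.0; }
};
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 this yields a mean of 5 and a population variance of 4, so the population standard deviation is 2.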

1301.1.7 Challenge Exercises

Challenge 1: Add More Patterns

Define and implement additional patterns:

  1. Equipment-Startup: EVT_MOTION_START -> EVT_LIGHT_HIGH -> EVT_TEMP_RISING within 8 seconds
  2. Night-Activity: EVT_LIGHT_LOW -> EVT_MOTION_START -> EVT_LIGHT_LOW within 10 seconds

Add these to the patterns[] array and verify detection.

Challenge 2: Implement Hopping Windows

Add hopping window support alongside session windows:

  • Window size: 10 seconds
  • Hop interval: 2 seconds (5 overlapping windows)
  • Track: average temperature per window
  • Display: trend comparison across overlapping windows
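
As a possible starting point for this challenge, a 10-second window with a 2-second hop can be built from five 2-second buckets, so no raw samples need to be stored. This is only a sketch under assumptions: HoppingAverage, kBuckets, and kHopMs are hypothetical names, time is assumed to start near 0, and samples are assumed to arrive more often than once per hop.

```cpp
#include <cstddef>
#include <cstdint>

constexpr size_t kBuckets = 5;      // 5 hops of 2 s cover one 10 s window
constexpr uint32_t kHopMs = 2000;   // hop interval

// Hypothetical hopping-window average built from per-hop buckets.
struct HoppingAverage {
  float sum[kBuckets] = {};
  uint32_t count[kBuckets] = {};
  size_t current = 0;               // bucket currently receiving samples
  uint32_t bucketStartMs = 0;       // start time of the current bucket

  // Average over all buckets, i.e. the most recent window of up to 10 s.
  float average() const {
    float total = 0.0f;
    uint32_t n = 0;
    for (size_t i = 0; i < kBuckets; ++i) {
      total += sum[i];
      n += count[i];
    }
    return n > 0 ? total / n : 0.0f;
  }

  // Adds one sample. When a hop boundary has been crossed, writes the average
  // of the window that just ended to windowAvgOut and returns true.
  bool add(uint32_t nowMs, float value, float& windowAvgOut) {
    bool emitted = false;
    if (nowMs - bucketStartMs >= kHopMs) {
      windowAvgOut = average();
      emitted = true;
      current = (current + 1) % kBuckets;  // reuse the oldest bucket
      sum[current] = 0.0f;
      count[current] = 0;
      bucketStartMs += kHopMs;
    }
    sum[current] += value;
    ++count[current];
    return emitted;
  }
};
```

Comparing successive emitted averages gives the trend across overlapping windows that the challenge asks for.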

Challenge 3: Add Pattern Negation

Extend pattern matching to support “NOT” conditions:

  • Pattern: EVT_MOTION_START -> NOT(EVT_TEMP_HIGH) within 5 seconds -> EVT_MOTION_END
  • This detects motion periods without temperature spikes (normal activity vs alert-worthy activity)
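
One reading of this pattern is: motion starts, motion ends within the time limit, and no high-temperature event occurs in between. The sketch below implements that interpretation only; Ev, TimedEvent, and quietMotion are hypothetical names, and the events are assumed to be ordered oldest first.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical event type and record for this sketch.
enum class Ev : uint8_t { MotionStart, MotionEnd, TempHigh, Other };

struct TimedEvent {
  uint32_t timestampMs;
  Ev type;
};

// Returns true if some MotionStart is followed by MotionEnd within `windowMs`
// with no TempHigh event between the two.
bool quietMotion(const TimedEvent* events, size_t n, uint32_t windowMs) {
  bool armed = false;       // true after MotionStart, until resolved
  uint32_t startMs = 0;
  for (size_t i = 0; i < n; ++i) {
    const TimedEvent& e = events[i];
    if (e.type == Ev::MotionStart) {
      armed = true;
      startMs = e.timestampMs;
      continue;
    }
    if (!armed) continue;
    if (e.timestampMs - startMs > windowMs) {
      armed = false;        // time limit exceeded
    } else if (e.type == Ev::TempHigh) {
      armed = false;        // negated event occurred: attempt is cancelled
    } else if (e.type == Ev::MotionEnd) {
      return true;          // sequence completed without the negated event
    }
  }
  return false;
}
```

The negated step is not matched like the other steps; it acts as a cancel condition while the matcher waits for the next positive step.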

Challenge 4: Implement Stream Joins

Add a join operation that correlates temperature and motion events:

  • When motion starts, capture temperature at that moment
  • When motion ends, capture temperature again
  • Calculate temperature delta during motion period
  • Alert if temperature rises more than 5C during any motion session
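
A possible starting point for this join is a small state machine that remembers the temperature at the rising edge of motion and reports the difference at the falling edge. MotionTempJoin and its members are hypothetical names for this sketch; the 5C alert threshold from the challenge is left to the caller.

```cpp
// Hypothetical join state: correlates the motion stream with the
// temperature stream over one motion period.
struct MotionTempJoin {
  bool inMotion = false;
  float tempAtStartC = 0.0f;

  // Call once per sample. Returns true on the sample where motion ends and
  // writes (temperature at end - temperature at start) to deltaOut.
  bool update(bool motion, float tempC, float& deltaOut) {
    if (motion && !inMotion) {          // rising edge: capture start temperature
      inMotion = true;
      tempAtStartC = tempC;
    } else if (!motion && inMotion) {   // falling edge: emit the joined result
      inMotion = false;
      deltaOut = tempC - tempAtStartC;
      return true;
    }
    return false;
  }
};
```

In the lab's loop this would be called with the values from readMotion() and readTemperature(), raising an alert whenever update() returns true and the reported delta exceeds 5.0.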

1301.1.8 Expected Outcomes

After completing this lab, you should observe:

Feature | Expected Behavior
Pattern Matching | RED LED flashes when the event sequence matches a defined pattern
Session Windows | YELLOW LED on during activity; session summary printed on timeout
Stream Filtering | GREEN LED indicates the reading passes all filter conditions
Statistical Anomaly | BLUE LED lights when a temperature reading deviates more than 2.5 standard deviations from the window mean
Event Detection | Console shows real-time event detection as sensor values change
Aggregation Stats | Every 5 seconds the console prints the window's temperature mean, standard deviation, and variance

1301.1.9 Real-World Applications

This lab demonstrates patterns used in production stream processing systems:

Lab Feature | Production Equivalent | Real-World Use Case
Pattern matching | Apache Flink CEP | Fraud detection in banking
Session windows | Kafka Streams sessions | User behavior analytics
Stream filtering | WHERE clauses in ksqlDB | Data quality enforcement
Statistical aggregation | Spark Streaming aggregates | Real-time dashboards
Anomaly detection | Z-score in Grafana alerts | Infrastructure monitoring
Event correlation | Complex Event Processing | Security incident detection

1301.2 What’s Next

Continue to Interactive Game and Summary for the Data Stream Challenge game and chapter summary with key takeaways.