Implement pattern matching algorithms for detecting temporal event sequences on ESP32
Build Complex Event Processing (CEP) rules with multi-condition detection and temporal constraints
Design session windows that group events by activity periods with configurable gap timeouts
Apply z-score anomaly detection to streaming sensor data for statistical outlier identification
Create multi-condition stream filters that combine threshold, range, and cross-sensor logic
Detect temporal patterns like “A followed by B within N seconds”
For Beginners: Advanced Stream Processing Lab
This advanced lab gives you practice building real-time data systems that handle multiple sensor streams simultaneously. Think of moving from cooking a single dish to running an entire restaurant kitchen – you will manage timing, handle errors, and process IoT data as fast as it arrives.
In 60 Seconds
Complex Event Processing (CEP) detects meaningful patterns across event streams by matching sequences with temporal constraints – for example, “motion detected, then temperature rise, then light change within 5 seconds” to identify intrusion. This lab implements CEP on ESP32 with pattern matching, session windows that group events by activity periods, z-score anomaly detection, and multi-condition filtering, bridging embedded stream processing with enterprise patterns used in Apache Flink and Esper.
Key Concepts
Complex Event Processing (CEP): Pattern detection across multiple event streams, identifying sequences, thresholds, and temporal patterns that span multiple events
Event Pattern: A specific sequence, combination, or temporal arrangement of events that triggers a downstream action (e.g., “3 consecutive temperature readings above 80°C within 10 seconds”)
Temporal Join: Combining events from different IoT streams based on time proximity, enabling correlation of events from different sensors that occur within a time window of each other
CEP Rule Engine: Component that evaluates incoming events against defined patterns (e.g., Esper, Apache Flink CEP, Drools Fusion) and generates composite events when patterns match
Sliding Window Join: Joining two event streams where events within a configurable time window of each other are considered related (e.g., matching a sensor reading with the nearest calibration event)
Pattern Sequence: Ordered event pattern where events must occur in a specific temporal sequence — A then B within 5 seconds then C
NFA (Non-deterministic Finite Automaton): State machine model used by CEP engines to efficiently track partial pattern matches across concurrent event sequences
Operator State in Flink: Per-key or per-operator state maintained across checkpoints, enabling stateful CEP operations to survive failures and recover from last checkpoint
13.1 Lab: Advanced Stream Pattern Matching and Complex Event Processing
~60 min | Advanced | P10.C14.LAB02
What You’ll Build
An advanced stream processing system on ESP32 that demonstrates Complex Event Processing (CEP) concepts: pattern matching across multiple sensors, sequence detection with temporal constraints, session-based windowing, multi-level filtering, and statistical aggregation. This lab bridges embedded stream processing with enterprise CEP patterns used in Apache Flink and Esper.
13.1.1 Advanced Stream Processing Concepts
This lab implements sophisticated patterns found in production stream processing systems:
Concept | Implementation | Enterprise Equivalent
--- | --- | ---
Pattern Matching | Sequence detection (A->B->C) | Flink CEP PatternStream
Session Windows | Activity-based grouping | Kafka Streams SessionWindows
Multi-Condition Filters | Compound boolean expressions | SQL WHERE clauses
Stream Joins | Correlating multiple streams | Flink DataStream.join()
Statistical Aggregation | Mean, variance, min, max | Spark aggregateByKey
Temporal Constraints | “Within N seconds” rules | Esper pattern guards
Try It: Temporal Pattern Matching Explorer
Adjust the event arrival times and temporal window to see whether a three-event pattern (A then B then C) matches within the allowed duration.
Threshold: \(|z| > 2.5\) = anomaly (as used in the lab code). This reading has \(z = 4.42 > 2.5\), flagged as statistical anomaly. Under the normal distribution, a 4.42\(\sigma\) deviation corresponds to a one-tailed probability of approximately \(5 \times 10^{-6}\), or roughly 1 in 200,000 – a high-confidence alert indicating likely sensor failure or genuine environmental anomaly.
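The z-score check described above can be sketched in a few lines of C++. This is a minimal illustration, not the lab's actual code: the names `RunningStats`, `zScore`, and `isAnomaly` are hypothetical, and it assumes the population variance computed from running sums. Only the 2.5 threshold comes from the text.

```cpp
#include <cmath>
#include <cstdint>

// Running statistics for one sensor stream (hypothetical helper type).
struct RunningStats {
    uint32_t count = 0;
    double sum = 0.0;
    double sumSquares = 0.0;

    void add(double value) {
        count++;
        sum += value;
        sumSquares += value * value;
    }

    double mean() const { return count ? sum / count : 0.0; }

    // Population standard deviation; clamped so rounding never yields sqrt of a negative.
    double stddev() const {
        if (count < 2) return 0.0;
        double m = mean();
        double variance = sumSquares / count - m * m;
        return variance > 0.0 ? std::sqrt(variance) : 0.0;
    }
};

// z = (x - mean) / stddev; returns 0 while the spread is still unknown.
double zScore(const RunningStats& stats, double value) {
    double sd = stats.stddev();
    return sd > 0.0 ? (value - stats.mean()) / sd : 0.0;
}

// Flags a reading whose |z| exceeds the threshold (2.5 in this lab).
bool isAnomaly(const RunningStats& stats, double value, double threshold = 2.5) {
    return std::fabs(zScore(stats, value)) > threshold;
}
```

In a sketch like this, the reading would typically be tested against the statistics before being added to them, so that an outlier does not dilute its own z-score.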
13.1.2 Wokwi Simulator
Use the embedded simulator below to build your advanced stream processing system:
13.1.3 Circuit Setup
Connect multiple sensors and outputs to the ESP32 for comprehensive stream analysis:
Component | ESP32 Pin | Purpose
--- | --- | ---
Temperature Sensor (NTC) | GPIO 34 | Primary data stream
Light Sensor (LDR) | GPIO 35 | Secondary data stream
Motion Sensor (PIR) | GPIO 33 | Event trigger stream
Potentiometer | GPIO 32 | Configurable threshold
Red LED | GPIO 18 | Pattern match alert
Yellow LED | GPIO 19 | Session active indicator
Green LED | GPIO 21 | Filter pass indicator
Blue LED | GPIO 22 | Anomaly detection
Add this diagram.json configuration in Wokwi:
    {
      "version": 1,
      "author": "IoT Class - Advanced Stream Processing Lab",
      "editor": "wokwi",
      "parts": [
        {"type": "wokwi-esp32-devkit-v1", "id": "esp", "top": 0, "left": 0},
        {"type": "wokwi-ntc-temperature-sensor", "id": "temp1", "top": -120, "left": 80},
        {"type": "wokwi-photoresistor-sensor", "id": "ldr1", "top": -120, "left": 180},
        {"type": "wokwi-pir-motion-sensor", "id": "pir1", "top": -120, "left": 280},
        {"type": "wokwi-potentiometer", "id": "pot1", "top": -120, "left": 380},
        {"type": "wokwi-led", "id": "led_red", "top": 180, "left": 80, "attrs": {"color": "red"}},
        {"type": "wokwi-led", "id": "led_yellow", "top": 180, "left": 130, "attrs": {"color": "yellow"}},
        {"type": "wokwi-led", "id": "led_green", "top": 180, "left": 180, "attrs": {"color": "green"}},
        {"type": "wokwi-led", "id": "led_blue", "top": 180, "left": 230, "attrs": {"color": "blue"}},
        {"type": "wokwi-resistor", "id": "r1", "top": 230, "left": 80, "attrs": {"value": "220"}},
        {"type": "wokwi-resistor", "id": "r2", "top": 230, "left": 130, "attrs": {"value": "220"}},
        {"type": "wokwi-resistor", "id": "r3", "top": 230, "left": 180, "attrs": {"value": "220"}},
        {"type": "wokwi-resistor", "id": "r4", "top": 230, "left": 230, "attrs": {"value": "220"}}
      ],
      "connections": [
        ["esp:GND.1", "temp1:GND", "black", ["h0"]],
        ["esp:3V3", "temp1:VCC", "red", ["h0"]],
        ["esp:34", "temp1:OUT", "green", ["h0"]],
        ["esp:GND.1", "ldr1:GND", "black", ["h0"]],
        ["esp:3V3", "ldr1:VCC", "red", ["h0"]],
        ["esp:35", "ldr1:OUT", "orange", ["h0"]],
        ["esp:GND.1", "pir1:GND", "black", ["h0"]],
        ["esp:3V3", "pir1:VCC", "red", ["h0"]],
        ["esp:33", "pir1:OUT", "purple", ["h0"]],
        ["esp:GND.1", "pot1:GND", "black", ["h0"]],
        ["esp:3V3", "pot1:VCC", "red", ["h0"]],
        ["esp:32", "pot1:SIG", "blue", ["h0"]],
        ["esp:18", "led_red:A", "red", ["h0"]],
        ["led_red:C", "r1:1", "black", ["h0"]],
        ["r1:2", "esp:GND.2", "black", ["h0"]],
        ["esp:19", "led_yellow:A", "yellow", ["h0"]],
        ["led_yellow:C", "r2:1", "black", ["h0"]],
        ["r2:2", "esp:GND.2", "black", ["h0"]],
        ["esp:21", "led_green:A", "green", ["h0"]],
        ["led_green:C", "r3:1", "black", ["h0"]],
        ["r3:2", "esp:GND.2", "black", ["h0"]],
        ["esp:22", "led_blue:A", "blue", ["h0"]],
        ["led_blue:C", "r4:1", "black", ["h0"]],
        ["r4:2", "esp:GND.2", "black", ["h0"]]
      ]
    }
What is Pattern Matching? Pattern matching detects specific sequences of events occurring within temporal constraints. Unlike simple threshold alerts, patterns capture complex behaviors like “motion followed by temperature rise within 5 seconds.”
Implementation in this lab:
Define patterns as arrays of EventType enum values
Each pattern has a maximum duration (temporal constraint)
The matchPattern() function scans the event buffer chronologically (oldest to newest) looking for the specified sequence within the time window
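The three points above can be sketched as follows. This is an illustrative reconstruction, not the lab's own matchPattern(): the `Event` and `Pattern` layouts, the enum ordering, and the function signature are assumptions. It scans oldest to newest and allows unrelated events between the steps of a sequence.

```cpp
#include <cstddef>
#include <cstdint>

// Event names follow the lab text; the numeric ordering is an assumption.
enum EventType : uint8_t { EVT_MOTION_START, EVT_TEMP_RISING, EVT_LIGHT_HIGH, EVT_LIGHT_LOW };

struct Event {
    EventType type;
    uint32_t timestampMs;
};

constexpr size_t PATTERN_MAX_LENGTH = 5;  // assumed upper bound

struct Pattern {
    EventType sequence[PATTERN_MAX_LENGTH];  // expected order of events
    size_t length;                           // number of used entries in sequence
    uint32_t maxDurationMs;                  // temporal constraint for the whole sequence
};

// Returns true if the buffer (ordered oldest to newest) contains the
// pattern's sequence, starting at any event, within maxDurationMs.
bool matchPattern(const Event* events, size_t eventCount, const Pattern& p) {
    if (p.length == 0) return false;
    for (size_t start = 0; start < eventCount; ++start) {
        if (events[start].type != p.sequence[0]) continue;
        size_t matched = 1;
        for (size_t i = start + 1; i < eventCount && matched < p.length; ++i) {
            // Stop once the time window measured from the first event is exceeded.
            if (events[i].timestampMs - events[start].timestampMs > p.maxDurationMs) break;
            if (events[i].type == p.sequence[matched]) ++matched;
        }
        if (matched == p.length) return true;
    }
    return false;
}
```

Passing the buffer and timestamps in as parameters keeps the function free of Arduino calls, so it can be exercised on a desktop compiler before it is flashed to the ESP32.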
Real-World Applications:
Intrusion detection: “door open -> motion -> door close” within 30 seconds
Equipment failure: “vibration spike -> temperature rise -> pressure drop” within 1 minute
Fraud detection: “login from new IP -> large withdrawal -> logout” within 2 minutes
Concept 2: Session Windows
What are Session Windows? Unlike fixed tumbling/sliding windows, session windows are activity-based. They group events that occur close together in time, closing when activity stops for a specified gap duration.
Implementation in this lab:
Session starts when a significant event occurs (motion or temperature spike)
Session remains active while significant events continue
Session closes after 3 seconds of inactivity (no motion or temperature change)
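The session lifecycle described above can be sketched as a small state holder. The type and member names here are hypothetical, and timestamps are passed in rather than read from millis() so the logic can be tested off-device. The 3-second gap would be supplied as `gapMs = 3000`.

```cpp
#include <cstdint>

// Minimal session window tracker (illustrative sketch, not the lab's code).
struct SessionWindow {
    uint32_t gapTimeoutMs;
    bool active = false;
    uint32_t startMs = 0;
    uint32_t lastEventMs = 0;
    uint32_t eventCount = 0;

    explicit SessionWindow(uint32_t gapMs) : gapTimeoutMs(gapMs) {}

    // Call for every significant event (motion or temperature spike).
    void onEvent(uint32_t nowMs) {
        if (!active) {
            active = true;  // first significant event opens a new session
            startMs = nowMs;
            eventCount = 0;
        }
        lastEventMs = nowMs;
        ++eventCount;
    }

    // Call periodically; returns true exactly once, when the gap timeout closes the session.
    bool closeIfIdle(uint32_t nowMs) {
        if (active && nowMs - lastEventMs >= gapTimeoutMs) {
            active = false;
            return true;
        }
        return false;
    }

    // Time between the first and last event of the current or most recent session.
    uint32_t durationMs() const { return lastEventMs - startMs; }
};
```

When closeIfIdle() returns true, a caller would print the session summary and switch off the session indicator LED.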
Adjust the session gap timeout and see how the same event stream is split into different sessions. Events closer together than the gap timeout are grouped into one session.
What is Stream Filtering? Filtering removes events that don’t meet specified criteria before they enter downstream processing. This reduces noise and focuses analysis on relevant events.
Filter in this lab:
Condition 1: Temperature in normal range (20-40°C)
Condition 2: Light in normal range (10-90%)
Condition 3: No motion during high temperature
Only events passing ALL conditions proceed
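A compact way to express the three conditions is one boolean per condition, combined with a logical AND. The sketch below is hypothetical: `SensorReading` and `passesFilter` are illustrative names, and `HIGH_TEMP_C` is an assumed cut-off, since the text does not say what counts as "high temperature".

```cpp
// One sample from the three sensor streams (hypothetical layout).
struct SensorReading {
    float temperatureC;
    float lightPercent;
    bool motion;
};

// Assumed cut-off for "high temperature"; not specified in the lab text.
constexpr float HIGH_TEMP_C = 35.0f;

// Returns true only when all three filter conditions hold.
bool passesFilter(const SensorReading& r) {
    bool tempInRange = r.temperatureC >= 20.0f && r.temperatureC <= 40.0f;   // Condition 1
    bool lightInRange = r.lightPercent >= 10.0f && r.lightPercent <= 90.0f;  // Condition 2
    bool noMotionWhenHot = !(r.motion && r.temperatureC > HIGH_TEMP_C);      // Condition 3
    return tempInRange && lightInRange && noMotionWhenHot;
}
```

Keeping each condition in its own named variable makes it straightforward to log which condition rejected a reading.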
Best Practices:
Filter early to reduce downstream processing load
Log filtered events for auditing
Use filter pass rate as a data quality metric
Try It: Multi-Condition Stream Filter
Set sensor readings and see in real time which filter conditions pass or fail. The stream filter only passes readings where ALL conditions are satisfied simultaneously.
What is Statistical Aggregation? Computing statistics (mean, variance, min, max) over streaming data requires incremental algorithms that update with each new value without storing all historical data points.
Naive sum-of-squares approach (used in this lab):
    // Incremental mean and variance using running sums
    count++;
    sum += value;
    sumSquares += value * value;
    mean = sum / count;
    variance = (sumSquares / count) - (mean * mean);
This approach is simple and efficient for short aggregation windows (5 seconds in this lab). For longer windows or values with large magnitudes, consider Welford’s online algorithm, which provides better numerical stability.
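A minimal sketch of Welford's update is shown here for comparison. The struct name and members are illustrative, and it reports the population variance so that its results line up with the running-sums version.

```cpp
#include <cmath>
#include <cstdint>

// Welford's online algorithm for mean and variance (illustrative sketch).
struct WelfordStats {
    uint32_t count = 0;
    double mean = 0.0;
    double m2 = 0.0;  // running sum of squared deviations from the current mean

    void add(double value) {
        ++count;
        double delta = value - mean;   // deviation from the old mean
        mean += delta / count;         // update the mean incrementally
        m2 += delta * (value - mean);  // uses both the old and the new mean
    }

    // Population variance (divide by count), matching the running-sums version.
    double variance() const { return count > 0 ? m2 / count : 0.0; }
    double stddev() const { return std::sqrt(variance()); }
};
```

Because it never subtracts two large, nearly equal sums, this form avoids the cancellation error that can make the sum-of-squares variance inaccurate or slightly negative.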
Equipment-Startup: EVT_MOTION_START -> EVT_LIGHT_HIGH -> EVT_TEMP_RISING within 8 seconds
Night-Activity: EVT_LIGHT_LOW -> EVT_MOTION_START -> EVT_LIGHT_LOW within 10 seconds
Add these to the patterns[] array and verify detection.
Challenge 2: Implement Hopping Windows
Add hopping window support alongside session windows:
Window size: 10 seconds
Hop interval: 2 seconds (5 overlapping windows)
Track: average temperature per window
Display: trend comparison across overlapping windows
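One building block for this challenge is deciding which overlapping windows a reading belongs to. The helper below is a hypothetical starting point, not a full solution: it assumes windows are aligned to multiples of the hop interval starting at t=0, that each window covers [start, start + size), and that hopMs is greater than zero.

```cpp
#include <cstdint>
#include <vector>

// Returns the start times (ms) of every hopping window that contains tMs,
// newest window first. Precondition: hopMs > 0.
std::vector<uint32_t> windowsContaining(uint32_t tMs, uint32_t sizeMs, uint32_t hopMs) {
    std::vector<uint32_t> starts;
    uint32_t latest = (tMs / hopMs) * hopMs;  // most recent window start at or before tMs
    for (uint32_t s = latest;; s -= hopMs) {
        if (tMs - s >= sizeMs) break;  // this window has already ended
        starts.push_back(s);
        if (s < hopMs) break;          // reached the first window; avoid unsigned wrap-around
    }
    return starts;
}
```

With a 10-second window and a 2-second hop, a reading taken after the first 8 seconds falls into five windows, which matches the overlap count stated above.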
Challenge 3: Add Pattern Negation
Extend pattern matching to support “NOT” conditions:
Pattern: EVT_MOTION_START -> NOT(EVT_TEMP_HIGH) within 5 seconds -> EVT_MOTION_END
This detects motion periods without temperature spikes (normal activity vs alert-worthy activity)
Challenge 4: Implement Stream Joins
Add a join operation that correlates temperature and motion events:
When motion starts, capture temperature at that moment
When motion ends, capture temperature again
Calculate temperature delta during motion period
Alert if temperature rises more than 5°C during any motion session
13.2.3 Expected Outcomes
After completing this lab, you should observe:
Feature | Expected Behavior
--- | ---
Pattern Matching | RED LED flashes when event sequence matches defined patterns
Session Windows | YELLOW LED on during activity, session summary printed on timeout
Stream Filtering | GREEN LED indicates readings pass all filter conditions
Statistical Anomaly | BLUE LED lights when values exceed 2.5 standard deviations
Event Detection | Console shows real-time event detection as sensor values change
Aggregation Stats | 5-second window statistics with count, avg, stddev, min, max
Try It: Buffer Capacity and Throughput Calculator
Explore how sampling rate, buffer size, and processing time interact. This calculator shows whether the ESP32 can keep up with the incoming data stream or if events will be lost.
This lab demonstrates patterns used in production stream processing systems:
Lab Feature | Production Equivalent | Real-World Use Case
--- | --- | ---
Pattern matching | Apache Flink CEP | Fraud detection in banking
Session windows | Kafka Streams sessions | User behavior analytics
Stream filtering | WHERE clauses in ksqlDB | Data quality enforcement
Statistical aggregation | Spark Streaming aggregates | Real-time dashboards
Anomaly detection | Z-score in Grafana alerts | Infrastructure monitoring
Event correlation | Complex Event Processing | Security incident detection
Key Takeaway
Complex Event Processing (CEP) goes beyond simple threshold alerts by detecting meaningful sequences of events with temporal constraints. The pattern “motion, then temperature rise, then light change within 5 seconds” captures a real-world scenario (intrusion) that no single sensor reading could detect. This lab demonstrates that even on ESP32, you can implement pattern matching, session windows, z-score anomaly detection, and multi-condition filtering – the same patterns used in enterprise Flink CEP, Esper, and Kafka Streams applications.
For Kids: Meet the Sensor Squad!
The Sensor Squad becomes DETECTIVES – spotting sneaky patterns in data!
The Sensor Squad has leveled up! Instead of just watching for high temperatures, they now look for PATTERNS – like detectives solving a mystery!
Mystery 1: The Intruder Pattern
Max the Microcontroller programs a special rule: “If I see MOTION, then TEMPERATURE RISE, then LIGHT CHANGE – all within 5 seconds – something suspicious is happening!”
Sammy the Sensor asks: “Why not just alert on motion?”
Lila the LED explains: “Because the office cat triggers motion sensors all the time! But a cat does not cause temperature to rise AND lights to turn on. Only a PERSON does all three. That is why we look for the PATTERN, not just one event!”
Mystery 2: The Activity Session
Bella the Battery watches a factory machine. It starts working (session begins!), runs for a while, then stops (session ends!). “I track everything that happens during each work session – how long it ran, the average temperature, and whether anything was unusual.”
“It is like chapters in a book,” says Bella. “Each session is one complete story. When the machine takes a 3-second break, the chapter ends and I write a summary!”
Mystery 3: The Statistical Oddball
Max keeps track of average temperature and how much it normally varies. When a reading is WAY outside normal (more than 2.5 standard deviations), the BLUE LED lights up!
“It is like noticing one kid in class who is 7 feet tall,” explains Sammy. “Most kids are between 4 and 5 feet. One kid being 7 feet is so unusual, something must be special about them!”
The Sensor Squad’s Detective Toolkit:
Pattern matching = “Did events happen in THIS specific order?”
Session windows = “What happened during this activity period?”
Statistical anomaly = “Is this reading WEIRD compared to normal?”
Multi-condition filtering = “Does this event pass ALL my tests?”
13.2.5 Try This at Home!
Be a pattern detective! Sit by a window and watch for this pattern: “Car passes, then a person walks by, then another car passes – all within 2 minutes.” How many times does this pattern happen in 30 minutes? Now try a different pattern. This is exactly what CEP does – looking for specific sequences in a stream of events!
Worked Example: Calculating Pattern Match Latency for Complex Event Processing
An industrial safety system uses ESP32 with the CEP lab code to detect the “Intruder-Alert” pattern: EVT_MOTION_START -> EVT_TEMP_RISING -> EVT_LIGHT_HIGH within 5 seconds. Calculate end-to-end latency from first event to pattern match alert.
GPIO read latency: ~10us (digitalRead in polling loop)
Edge detection comparison: ~1us
Event buffer write: ~5us (addEvent() function)
Total: t~0.02ms (motion event buffered)
Event 2: Temperature rising
Physical event: t=1500ms (body heat detected by NTC sensor)
ADC read latency: ~100us (analogRead() with 12-bit SAR ADC on ESP32)
Temperature calculation: ~10us (map() and division)
Rate-of-change comparison: ~1us
Event buffer write: ~5us
Total: t~1500.1ms (temp event buffered)
Event 3: Light change
Physical event: t=3000ms (flashlight turns on)
ADC read latency: ~100us (LDR photoresistor)
Threshold comparison: ~1us
Event buffer write: ~5us
Total: t~3000.1ms (light event buffered)
Pattern matching execution:
checkPatterns() called every 100ms (from main loop at SAMPLE_INTERVAL_MS)
Last call before Event 3: t=3000ms
Next call after Event 3: t=3100ms
Pattern matching scan: ~50us (iterates through up to 50 events with simple integer comparisons – at 240 MHz the ESP32 runs 240 clock cycles per microsecond)
Alert generation: ~2ms (Serial.println + LED flash setup)
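Putting the figures above together gives the end-to-end number. The helper below is an illustrative sketch with hypothetical names. It assumes checkPatterns() runs at exact multiples of the polling interval measured from t=0, and it uses only the timings listed in this example.

```cpp
#include <cmath>

// Latency summary for one pattern match (hypothetical type).
struct LatencyBreakdown {
    double detectionTimeMs;        // time of the alert, measured from the first event (t=0)
    double delayAfterLastEventMs;  // time between the last event being buffered and the alert
};

// lastEventBufferedMs: when the final event of the pattern entered the buffer
// pollIntervalMs: how often the pattern check runs
// scanMs, alertMs: cost of the pattern scan and of raising the alert
LatencyBreakdown patternLatency(double lastEventBufferedMs, double pollIntervalMs,
                                double scanMs, double alertMs) {
    // The match is only noticed at the next polling boundary after the last event.
    double nextPollMs = std::ceil(lastEventBufferedMs / pollIntervalMs) * pollIntervalMs;
    double detection = nextPollMs + scanMs + alertMs;
    return {detection, detection - lastEventBufferedMs};
}
```

With the values from this example, patternLatency(3000.1, 100.0, 0.05, 2.0) yields an alert at about 3102 ms after the first event, roughly 102 ms after the light event is buffered. The polling interval accounts for almost all of that delay, while the scan and alert steps add only about 2 ms.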
Moderate latency (10-100ms acceptable)? – Up to 5-event patterns with 100ms polling
Complex behavior analysis (>1s latency acceptable)? – Use gateway-tier CEP (Raspberry Pi, not ESP32)
Need statistical patterns (count/duration)? – Use session windows with periodic aggregation
Multi-sensor correlation? – Offload to fog tier (ESP32 does edge filtering only)
Common Mistake: Not Handling Pattern Timeout Edge Cases in CEP
Developers implement Complex Event Processing patterns but forget to handle partial pattern matches that never complete – leaving state in memory indefinitely and causing memory leaks.
What goes wrong: The lab code defines the “Intruder-Alert” pattern: EVT_MOTION_START -> EVT_TEMP_RISING -> EVT_LIGHT_HIGH within 5 seconds. A security guard walks past the sensor, triggering EVT_MOTION_START at t=0s. The temperature sensor is faulty and never triggers EVT_TEMP_RISING. Unless it is explicitly expired, the partial match (only Event 1 matched) is still tracked long after the 5-second window has passed, consuming buffer space and scan time.
Why it fails: The matchPattern() function in the lab code searches the event buffer but does not explicitly expire partial matches that exceed the maxDuration window. While the circular buffer eventually overwrites old events, the matching algorithm still scans them unnecessarily.
The correct approach (production CEP system):
1. Add pattern expiration logic:
    #include <algorithm>
    #include <vector>

    struct PartialMatch {
        unsigned long startTime;
        int matchedCount;
        EventType sequence[PATTERN_MAX_LENGTH];
    };

    std::vector<PartialMatch> activeMatches;  // Track partial pattern matches

    void checkPatternsWithExpiration() {
        unsigned long now = millis();

        // Expire old partial matches
        activeMatches.erase(
            std::remove_if(activeMatches.begin(), activeMatches.end(),
                           [now](const PartialMatch& m) {
                               return (now - m.startTime) > PATTERN_TIMEOUT_MS;
                           }),
            activeMatches.end());

        // Check new events against active partial matches
        // ... pattern matching logic ...
    }

    int partialMatchesExpired = 0;

    void trackExpiredPatterns(PartialMatch& expired) {
        partialMatchesExpired++;
        if (partialMatchesExpired % 100 == 0) {
            Serial.print("WARNING: ");
            Serial.print(partialMatchesExpired);
            Serial.println(" partial patterns expired without completion");
            // This may indicate faulty sensors or incorrect pattern definitions
        }
    }
4. Implement pattern confidence scoring:
    struct PatternMatch {
        Pattern* pattern;
        float confidence;  // 0.0-1.0 based on event timing
        unsigned long completionTime;
    };

    float calculateConfidence(Pattern* pattern, unsigned long actualDuration) {
        // Patterns completed quickly get higher confidence
        float timeRatio = (float)actualDuration / pattern->maxDuration;
        return 1.0f - (timeRatio * 0.5f);  // 100% if instant, 50% if at limit
    }
5. Handle partial matches in graceful degradation:
    // If too many partial matches accumulate, reduce pattern complexity
    if (activeMatches.size() > 50) {
        Serial.println("WARN: Pattern matching overloaded, using simplified patterns");
        // Temporarily disable complex 5-event patterns, keep only critical 2-event patterns
        enableSimplifiedPatterns = true;
    }
Real consequence: A smart building deployed CEP code for intrusion detection across 100 sensors. Over 3 months, partial pattern matches from false triggers (employees triggering motion without completing the full pattern) accumulated in memory. The ESP32’s event buffer fragmented, pattern matching became progressively slower, and after 90 days the system crashed with a watchdog timer reset. Post-mortem analysis revealed 15,000+ partial pattern matches in memory.
The fix: implemented pattern expiration (discard partial matches older than 2x maxDuration), limited active partial matches to 50, and added monitoring for expiration rate. After deployment, the system maintained stable <5% memory usage for partial patterns indefinitely. The lesson: CEP systems MUST explicitly manage partial pattern state with timeouts and capacity limits, or memory leaks from incomplete patterns will eventually crash the system.
13.3 Concept Relationships
This advanced lab implements production patterns at embedded scale:
Overly general patterns (e.g., “any two consecutive high readings”) produce thousands of false positive alerts in production IoT systems. Always validate CEP patterns against historical data to measure precision and recall before deployment. Use specific conditions (value thresholds, device type filters, time constraints) to achieve >95% precision.
2. Not Handling Partial Pattern Matches Under System Restart
CEP engines track in-progress pattern matches in operator state. Under system restart without proper checkpointing, all partial matches are lost — a 4-event pattern that was 3 events complete is discarded, potentially missing a critical composite event. Always configure CEP operator state checkpointing intervals compatible with acceptable event detection latency after recovery.
3. Defining Patterns Without Specifying Time Constraints
CEP patterns without time constraints accumulate partial matches indefinitely — a pattern “A then B” will remember every A event and check every future B event, consuming unbounded memory. Always specify maximum time windows for pattern completion (A then B within 30 seconds) to enable efficient state cleanup.
4. Testing CEP Rules Only on Clean, Ordered Event Streams
Real IoT event streams contain late arrivals, duplicates, and gaps from sensor failures. CEP patterns that work correctly on clean ordered test data often produce incorrect results on real IoT streams with out-of-order events. Always test CEP rules against replay of real captured IoT data before production deployment.