%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart TB
subgraph Edge["Edge Layer (ms latency)"]
E1[Sensor Data]
E2[Threshold Check]
E3[Critical Alert]
E1 --> E2 --> E3
end
subgraph Fog["Fog/Gateway Layer (s latency)"]
F1[Feature Extraction]
F2[ML Model Inference]
F3[Pattern Alert]
F1 --> F2 --> F3
end
subgraph Cloud["Cloud Layer (min latency)"]
C1[Model Training]
C2[Deep Analysis]
C3[Trend Detection]
C1 --> C2 --> C3
end
Edge --> Fog
Fog --> Cloud
Cloud -.->|Model Updates| Fog
Cloud -.->|Threshold Updates| Edge
style Edge fill:#16A085,color:#fff
style Fog fill:#E67E22,color:#fff
style Cloud fill:#2C3E50,color:#fff
1355 Real-Time Anomaly Detection Pipelines
1355.1 Learning Objectives
By the end of this chapter, you will be able to:
- Design Three-Tier Architecture: Build anomaly detection systems spanning edge, fog, and cloud
- Balance Latency and Accuracy: Choose where to deploy detection based on requirements
- Handle Concept Drift: Implement adaptive systems that evolve with changing data patterns
- Build Production Pipelines: Create end-to-end systems from data ingestion to alerting
Core Concept: Production anomaly detection requires a layered architecture - simple statistical methods at the edge for immediate response, ML models at gateways for deeper analysis, and cloud systems for training and pattern discovery.
Why It Matters: Edge-only detection is fast but misses complex patterns. Cloud-only detection has high latency and network dependency. The right architecture balances both.
Key Takeaway: Deploy threshold detection at the edge (<100ms), ML models at gateways (~1s), and deep learning in the cloud (~10s). Use feedback loops to continuously improve all layers.
1355.2 Prerequisites
Before diving into this chapter, you should be familiar with:
- Statistical Methods: Methods suitable for edge deployment
- Machine Learning Methods: Methods requiring cloud or gateway resources
- Anomaly Types: Understanding which anomalies to detect where
1355.3 Introduction
Moving from algorithms to production systems requires careful architecture design balancing latency, accuracy, and resource constraints.
1355.4 End-to-End Architecture
A production anomaly detection system typically spans three tiers:
Pipeline Stages:
- Data Ingestion
  - Protocols: MQTT (lightweight, edge), Kafka (high-throughput, cloud)
  - Buffering: Handle network interruptions with local queues
  - Sampling: Adaptive sampling - increase frequency when anomalies suspected
- Feature Extraction
  - Windowing: Sliding windows (e.g., 10-second windows, 5-second overlap)
  - Aggregation: Mean, std, min, max, percentiles per window
  - Domain-specific: FFT for vibration, wavelets for signals
  - Normalization: Scale features to common range (0-1 or z-score)
- Model Scoring
  - Edge: Simple statistical methods (<10ms per sample)
  - Gateway: Trained ML models (10-100ms per batch)
  - Cloud: Deep learning ensembles (1-10s per batch)
- Alert Generation
  - Thresholding: Convert confidence scores to a binary decision
  - Hysteresis: Require N consecutive anomalies to reduce false alarms (see the sketch after this list)
  - Severity levels: Critical (act immediately) vs Warning (monitor)
- Feedback Loop
  - Operator confirmation: Anomaly true/false?
  - Model updates: Retrain with confirmed labels
  - Threshold tuning: Adjust based on false positive rate
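The hysteresis and severity steps can be combined in a few lines. The sketch below is a minimal illustration, assuming anomaly scores normalized to 0-1; the class name and threshold values are illustrative, not recommended settings.

# Hysteresis + severity levels for alert generation (class name and thresholds illustrative)
from dataclasses import dataclass

@dataclass
class Alert:
    severity: str    # "critical" or "warning"
    message: str

class HysteresisAlerter:
    """Suppress one-off spikes: alert only after N consecutive anomalous windows."""

    def __init__(self, n_consecutive=3, warning_score=0.6, critical_score=0.9):
        self.n_consecutive = n_consecutive
        self.warning_score = warning_score    # Assumes scores normalized to 0-1
        self.critical_score = critical_score
        self.streak = 0

    def update(self, anomaly_score):
        """Returns an Alert once the streak is long enough, otherwise None."""
        if anomaly_score >= self.warning_score:
            self.streak += 1
        else:
            self.streak = 0                   # Any normal window resets the streak
        if self.streak < self.n_consecutive:
            return None
        if anomaly_score >= self.critical_score:
            return Alert("critical", "Act immediately: sustained critical anomaly")
        return Alert("warning", "Monitor: sustained moderate anomaly")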
1355.5 Edge vs Cloud Detection
Decision Framework:
| Requirement | Deploy At Edge If… | Deploy In Cloud If… |
|---|---|---|
| Latency | <100ms required | 1-10s acceptable |
| Criticality | Safety-critical (motor shutdown) | Informational (dashboards) |
| Complexity | Simple patterns (out-of-range) | Complex temporal/spatial |
| Connectivity | Intermittent or unreliable | Always-on, high bandwidth |
| Power | Battery-powered devices | Mains-powered or cloud |
Hybrid Approach (Recommended):
- Edge: Detect critical anomalies (safety), filter obvious noise
- Cloud: Detect subtle patterns, perform root cause analysis, retrain models
Example: Factory Motor Monitoring
# Edge: Simple threshold detection (runs on ESP32 microcontroller)
def edge_detector(temperature, vibration):
    """
    Runs every 100ms on edge device
    Returns: (is_critical, should_alert)
    """
    critical = (temperature > 95) or (vibration > 5.0)
    if critical:
        # Immediate action: Stop motor (stop_motor is a device-specific helper)
        stop_motor()
        return True, True
    # Send to gateway for advanced detection
    return False, False
# Gateway: ML-based detection (Raspberry Pi)
from collections import deque

import numpy as np

class GatewayDetector:
    def __init__(self):
        self.model = load_isolation_forest()  # Pre-trained (assumed sklearn IsolationForest)
        self.window = deque(maxlen=100)       # 10 seconds at 10Hz

    def update(self, sample):
        """
        Called every 100ms; runs ML scoring once the 10-second window has filled
        sample: [temp, vibration, current, rpm]
        """
        self.window.append(sample)
        if len(self.window) == 100:
            # Extract features from window
            features = self.extract_features(self.window)
            # ML prediction: decision_function is negative for anomalies
            anomaly_score = self.model.decision_function([features])[0]
            if anomaly_score < -0.1:  # Threshold
                send_alert("Moderate anomaly detected")
                send_to_cloud(self.window)  # For deep analysis
            return anomaly_score < -0.1
        return False

    def extract_features(self, window):
        """Extract statistical features from time window"""
        window_array = np.array(window)
        return np.concatenate([
            np.mean(window_array, axis=0),  # Mean of each sensor
            np.std(window_array, axis=0),   # Std of each sensor
            np.max(window_array, axis=0),   # Max of each sensor
            np.min(window_array, axis=0),   # Min of each sensor
        ])
# Cloud: Deep learning pattern analysis (runs every 10s)
class CloudDetector:
    def __init__(self):
        self.lstm_model = load_lstm_model()  # Pre-trained

    def analyze_pattern(self, historical_windows):
        """
        Analyze long-term patterns (hours/days)
        historical_windows: Last 1000 windows (100s of data)
        """
        # LSTM prediction: forecast the latest window from the preceding ones
        predicted_next = self.lstm_model.predict(historical_windows[:-1])
        actual_next = historical_windows[-1]
        error = np.mean((predicted_next - actual_next) ** 2)
        if error > THRESHOLD:
            # Pattern anomaly: gradual degradation detected
            create_maintenance_ticket("Bearing wear pattern detected")
            predict_time_to_failure()
        return error

1355.6 Handling Concept Drift
The Problem: “Normal” changes over time:
- Sensors degrade and drift
- Seasonal patterns (summer vs winter temperatures)
- Equipment wear (vibration baseline increases)
- Process changes (new production schedules)
Strategies:
- Continuous Retraining
  - Retrain models weekly/monthly on recent data
  - Use sliding window (last 30 days, not all historical data)
  - Automate via MLOps pipelines
- Adaptive Thresholds
  - Dynamic thresholds that adjust to recent baseline
  - Example: “Flag if >3 sigma from last 7 days mean” (not all-time mean)
- Ensemble Models
  - Combine short-term (catches sudden changes) and long-term (catches drift) models
  - Example: Z-score (1-day window) + ARIMA (30-day window)
- Human-in-the-Loop
  - Operator labels false positives/negatives
  - Feedback trains model incrementally (online learning)
Implementation:
import time
from collections import deque

from sklearn.ensemble import IsolationForest

class AdaptiveDetector:
    def __init__(self, retrain_interval=86400):  # Retrain daily
        self.model = IsolationForest()
        self.training_buffer = deque(maxlen=10000)  # Keep last 10k samples
        self.last_retrain = time.time()
        self.retrain_interval = retrain_interval
        self.is_fitted = False

    def update(self, sample):
        """Process new sample and periodically retrain"""
        # Predict with current model (everything counts as normal until the first fit)
        is_anomaly = self.is_fitted and self.model.predict([sample])[0] == -1
        # Add to training buffer (if confirmed normal)
        if not is_anomaly:
            self.training_buffer.append(sample)
        # Check if time to retrain
        if time.time() - self.last_retrain > self.retrain_interval:
            self.retrain()
        return is_anomaly

    def retrain(self):
        """Retrain model on recent data"""
        if len(self.training_buffer) > 1000:
            print(f"Retraining model on {len(self.training_buffer)} samples")
            self.model.fit(list(self.training_buffer))
            self.is_fitted = True
        self.last_retrain = time.time()

The Mistake: Training anomaly detection models once on historical data and deploying them permanently, assuming the definition of “normal” remains constant over the system’s operational lifetime.
Why It Happens: Model training is resource-intensive and disrupts operations. “If it ain’t broke, don’t fix it” thinking prevails. Concept drift is gradual and invisible - performance degrades slowly without obvious failure points. Teams lack automated drift detection and retraining pipelines.
The Fix: Design for continuous learning from the start:
Drift detection: Monitor model performance indicators continuously - track the distribution of anomaly scores over time using the Page-Hinkley test or the ADWIN algorithm. If the score distribution shifts significantly (e.g., KL divergence > 0.1), trigger an investigation.
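A minimal sketch of the Page-Hinkley side of that monitoring, applied to the stream of anomaly scores; the delta and threshold constants are illustrative and must be tuned per deployment.

# Page-Hinkley test on the stream of anomaly scores (delta/threshold are illustrative)
class PageHinkley:
    def __init__(self, delta=0.005, threshold=50.0):
        self.delta = delta            # Magnitude of change tolerated before accumulating
        self.threshold = threshold    # Alarm when cumulative deviation exceeds this
        self.n = 0
        self.mean = 0.0
        self.cum_sum = 0.0            # m_t: cumulative deviation above the running mean
        self.min_cum_sum = 0.0        # M_t: minimum of m_t seen so far

    def update(self, score):
        """Feed one anomaly score; returns True when an upward shift in the mean is detected."""
        self.n += 1
        self.mean += (score - self.mean) / self.n   # Running mean of scores
        self.cum_sum += score - self.mean - self.delta
        self.min_cum_sum = min(self.min_cum_sum, self.cum_sum)
        return (self.cum_sum - self.min_cum_sum) > self.threshold

# detector = PageHinkley()
# for score in anomaly_scores:          # hypothetical stream of gateway scores
#     if detector.update(score):
#         trigger_investigation()       # hypothetical alerting hook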
Scheduled retraining: Establish periodic retraining cycles (weekly, monthly) using recent data. For seasonal systems (HVAC, agriculture), retrain at season boundaries.
Online learning: For fast-drifting environments, use incremental learning algorithms (online Isolation Forest, streaming autoencoders) that update continuously without full retraining.
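As a simpler stand-in for those algorithms, the sketch below illustrates the incremental idea with an exponentially weighted z-score: the baseline mean and variance are updated on every sample, so slow drift is absorbed into the baseline without any retraining step. All constants and the class name are illustrative.

# Streaming z-score with an exponentially weighted baseline (illustrative constants)
import math

class StreamingZScoreDetector:
    """Baseline updates one sample at a time, so slow drift does not pile up false alarms."""

    def __init__(self, alpha=0.01, z_threshold=3.0, warmup=50):
        self.alpha = alpha              # Smaller alpha = slower-moving baseline
        self.z_threshold = z_threshold
        self.warmup = warmup            # Samples to observe before scoring
        self.mean = None
        self.var = 0.0
        self.n = 0

    def update(self, x):
        """Score one sample against the current baseline, then fold it into the baseline."""
        self.n += 1
        if self.mean is None:           # First sample only initializes the baseline
            self.mean = x
            return False
        std = math.sqrt(self.var) if self.var > 0 else float("inf")
        is_anomaly = self.n > self.warmup and abs(x - self.mean) / std > self.z_threshold
        # Standard exponentially weighted mean/variance recursions
        diff = x - self.mean
        self.mean += self.alpha * diff
        self.var = (1 - self.alpha) * (self.var + self.alpha * diff * diff)
        return is_anomaly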
Human-in-the-loop feedback: When operators dismiss alerts as false positives or confirm true anomalies, feed this labeled data back into the training pipeline. Over 6 months, this feedback can improve precision by 20-40%.
Model versioning with A/B testing: Deploy new models alongside old ones, compare performance on held-out data, automatically promote better performers.
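A minimal champion/challenger sketch of that promotion logic, assuming each model exposes a hypothetical predict(sample) -> bool interface and that operator feedback supplies the ground-truth labels; all names here are illustrative.

# Champion/challenger promotion on operator-labeled feedback (all names illustrative)
class ShadowDeployment:
    def __init__(self, champion, challenger, min_labels=200):
        self.champion = champion          # Model currently driving alerts
        self.challenger = challenger      # Candidate model, scored silently in shadow
        self.min_labels = min_labels      # Require enough feedback before deciding
        self.records = []                 # (champion_pred, challenger_pred, true_label)

    def score(self, sample):
        """Both models score each sample, but only the champion's verdict is acted on."""
        return self.champion.predict(sample), self.challenger.predict(sample)

    def add_feedback(self, champ_pred, chall_pred, is_true_anomaly):
        """Record operator confirmation (true anomaly) or dismissal (false positive)."""
        self.records.append((champ_pred, chall_pred, is_true_anomaly))

    def maybe_promote(self):
        """Swap in the challenger if its precision on labeled feedback is higher."""
        if len(self.records) < self.min_labels:
            return False

        def precision(idx):
            flagged = [r for r in self.records if r[idx]]
            return sum(r[2] for r in flagged) / len(flagged) if flagged else 0.0

        if precision(1) > precision(0):   # Challenger wins on the labeled data
            self.champion, self.challenger = self.challenger, self.champion
            self.records.clear()
            return True
        return False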
Reality check: A predictive maintenance model trained in summer may have a 30% higher false positive rate in winter due to thermal effects on vibration patterns.
1355.7 Summary
Production anomaly detection requires layered architecture:
- Edge: Fast threshold detection for safety-critical anomalies (<100ms)
- Gateway: ML models for pattern detection (~1s latency)
- Cloud: Deep learning and continuous retraining (~10s latency)
Key Takeaway: Design for concept drift from the start. Implement feedback loops and scheduled retraining to maintain detection accuracy over time.
1355.8 What’s Next
- Performance Metrics: Learn to evaluate detection accuracy and tune thresholds for your domain
- Anomaly Detection Lab: Build a working anomaly detection system on ESP32