1355  Real-Time Anomaly Detection Pipelines

1355.1 Learning Objectives

By the end of this chapter, you will be able to:

  • Design Three-Tier Architecture: Build anomaly detection systems spanning edge, fog, and cloud
  • Balance Latency and Accuracy: Choose where to deploy detection based on requirements
  • Handle Concept Drift: Implement adaptive systems that evolve with changing data patterns
  • Build Production Pipelines: Create end-to-end systems from data ingestion to alerting

Tip: Minimum Viable Understanding - Production Pipelines

Core Concept: Production anomaly detection requires a layered architecture - simple statistical methods at the edge for immediate response, ML models at gateways for deeper analysis, and cloud systems for training and pattern discovery.

Why It Matters: Edge-only detection is fast but misses complex patterns. Cloud-only detection has high latency and network dependency. The right architecture balances both.

Key Takeaway: Deploy threshold detection at the edge (<100ms), ML models at gateways (~1s), and deep learning in the cloud (~10s). Use feedback loops to continuously improve all layers.

1355.2 Prerequisites

Before diving into this chapter, you should be familiar with:

  • P10.C01.U05 (Advanced, ~15 min)

1355.3 Introduction

Moving from algorithms to production systems requires an architecture that carefully balances latency, accuracy, and resource constraints.

1355.4 End-to-End Architecture

A production anomaly detection system typically spans three tiers:

%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%

flowchart TB
    subgraph Edge["Edge Layer (ms latency)"]
        E1[Sensor Data]
        E2[Threshold Check]
        E3[Critical Alert]
        E1 --> E2 --> E3
    end

    subgraph Fog["Fog/Gateway Layer (s latency)"]
        F1[Feature Extraction]
        F2[ML Model Inference]
        F3[Pattern Alert]
        F1 --> F2 --> F3
    end

    subgraph Cloud["Cloud Layer (min latency)"]
        C1[Model Training]
        C2[Deep Analysis]
        C3[Trend Detection]
        C1 --> C2 --> C3
    end

    Edge --> Fog
    Fog --> Cloud
    Cloud -.->|Model Updates| Fog
    Cloud -.->|Threshold Updates| Edge

    style Edge fill:#16A085,color:#fff
    style Fog fill:#E67E22,color:#fff
    style Cloud fill:#2C3E50,color:#fff

Figure 1355.1: Three-tier anomaly detection architecture balancing latency and accuracy

Pipeline Stages:

  1. Data Ingestion
    • Protocols: MQTT (lightweight, edge), Kafka (high-throughput, cloud)
    • Buffering: Handle network interruptions with local queues
    • Sampling: Adaptive sampling - increase frequency when anomalies suspected
  2. Feature Extraction
    • Windowing: Sliding windows (e.g., 10-second windows, 5-second overlap)
    • Aggregation: Mean, std, min, max, percentiles per window
    • Domain-specific: FFT for vibration, wavelets for signals
    • Normalization: Scale features to common range (0-1 or z-score)
  3. Model Scoring
    • Edge: Simple statistical methods (<10ms per sample)
    • Gateway: Trained ML models (10-100ms per batch)
    • Cloud: Deep learning ensembles (1-10s per batch)
  4. Alert Generation
    • Thresholding: Convert continuous anomaly scores into binary alert decisions
    • Hysteresis: Require N consecutive anomalies before alerting to reduce false alarms (see the sketch after this list)
    • Severity levels: Critical (act immediately) vs Warning (monitor)
  5. Feedback Loop
    • Operator confirmation: Anomaly true/false?
    • Model updates: Retrain with confirmed labels
    • Threshold tuning: Adjust based on false positive rate
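
The windowing/aggregation in stage 2 and the hysteresis rule in stage 4 can be sketched in a few lines of Python. The feature set, window handling, and alert policy below are illustrative assumptions, not a prescribed implementation:

import numpy as np

def window_features(samples):
    """Aggregate one sliding window of raw samples (rows = time steps,
    columns = sensors) into a flat statistical feature vector."""
    arr = np.array(samples)
    return np.concatenate([
        arr.mean(axis=0),                # Mean per sensor
        arr.std(axis=0),                 # Std per sensor
        arr.min(axis=0),                 # Min per sensor
        arr.max(axis=0),                 # Max per sensor
        np.percentile(arr, 95, axis=0),  # 95th percentile per sensor
    ])

class HysteresisAlerter:
    """Raise an alert only after n_consecutive anomalous windows."""
    def __init__(self, n_consecutive=3):
        self.n_consecutive = n_consecutive
        self.run_length = 0

    def update(self, window_is_anomalous):
        """Return True when an alert should fire."""
        self.run_length = self.run_length + 1 if window_is_anomalous else 0
        return self.run_length >= self.n_consecutive

With n_consecutive=3 and 10-second windows, a single noisy window is ignored and an alert fires only after roughly 30 seconds of sustained anomalous behavior - the trade-off hysteresis makes between alert latency and false-alarm rate.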

1355.5 Edge vs Cloud Detection

Decision Framework:

Requirement    Deploy At Edge If…                  Deploy In Cloud If…
Latency        <100ms required                     1-10s acceptable
Criticality    Safety-critical (motor shutdown)    Informational (dashboards)
Complexity     Simple patterns (out-of-range)      Complex temporal/spatial
Connectivity   Intermittent or unreliable          Always-on, high bandwidth
Power          Battery-powered devices             Mains-powered or cloud

Hybrid Approach (Recommended):

  • Edge: Detect critical anomalies (safety), filter obvious noise
  • Cloud: Detect subtle patterns, perform root cause analysis, retrain models

Example: Factory Motor Monitoring

# Edge: Simple threshold detection (runs on ESP32 microcontroller)
def edge_detector(temperature, vibration):
    """
    Runs every 100ms on edge device
    Returns: (is_critical, should_alert)
    """
    critical = (temperature > 95) or (vibration > 5.0)
    if critical:
        # Immediate action: Stop motor
        stop_motor()
        return True, True

    # Send to gateway for advanced detection
    return False, False

# Gateway: ML-based detection (Raspberry Pi)
from collections import deque

import numpy as np

class GatewayDetector:
    def __init__(self):
        self.model = load_isolation_forest()  # Pre-trained
        self.window = deque(maxlen=100)  # 10 seconds at 10Hz

    def update(self, sample):
        """
        Called every 100ms; runs ML inference once the 10-second window is full
        sample: [temp, vibration, current, rpm]
        """
        self.window.append(sample)

        if len(self.window) == 100:
            # Extract features from window
            features = self.extract_features(self.window)

            # ML prediction (assumes a scikit-learn IsolationForest, whose
            # decision_function() returns negative scores for anomalies)
            anomaly_score = self.model.decision_function([features])[0]

            if anomaly_score < -0.1:  # Threshold tuned on validation data
                send_alert("Moderate anomaly detected")
                send_to_cloud(self.window)  # For deep analysis

            return anomaly_score < -0.1

        return False

    def extract_features(self, window):
        """Extract statistical features from time window"""
        window_array = np.array(window)
        # Concatenate per-sensor statistics into one flat feature vector
        return np.concatenate([
            np.mean(window_array, axis=0),  # Mean of each sensor
            np.std(window_array, axis=0),   # Std of each sensor
            np.max(window_array, axis=0),   # Max of each sensor
            np.min(window_array, axis=0),   # Min of each sensor
        ])

# Cloud: Deep learning pattern analysis (runs every 10s)
THRESHOLD = 0.5  # Prediction-error threshold (placeholder; tune on validation data)

class CloudDetector:
    def __init__(self):
        self.lstm_model = load_lstm_model()  # Pre-trained

    def analyze_pattern(self, historical_windows):
        """
        Analyze long-term patterns (hours/days)
        historical_windows: Last 1000 windows (100s of data)
        """
        # LSTM prediction: forecast the latest window from the preceding
        # history, then compare it against what was actually observed
        predicted_next = self.lstm_model.predict(historical_windows[:-1])
        actual_next = historical_windows[-1]

        error = np.mean((predicted_next - actual_next)**2)

        if error > THRESHOLD:
            # Pattern anomaly: gradual degradation detected
            create_maintenance_ticket("Bearing wear pattern detected")
            predict_time_to_failure()

        return error

1355.6 Handling Concept Drift

The Problem: “Normal” changes over time:

  • Sensors degrade and drift
  • Seasonal patterns (summer vs winter temperatures)
  • Equipment wear (vibration baseline increases)
  • Process changes (new production schedules)

Strategies:

  1. Continuous Retraining
    • Retrain models weekly/monthly on recent data
    • Use sliding window (last 30 days, not all historical data)
    • Automate via MLOps pipelines
  2. Adaptive Thresholds
    • Dynamic thresholds that adjust to recent baseline
    • Example: “Flag if >3 sigma from the last 7 days’ mean” (not the all-time mean); see the sketch after this list
  3. Ensemble Models
    • Combine short-term (catches sudden changes) and long-term (catches drift) models
    • Example: Z-score (1-day window) + ARIMA (30-day window)
  4. Human-in-the-Loop
    • Operator labels false positives/negatives
    • Feedback trains model incrementally (online learning)
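
A minimal sketch of strategy 2 (adaptive thresholds), assuming a single scalar sensor sampled about once per minute; the 7-day baseline and 3-sigma rule come from the example above, while the class name and the warm-up length are illustrative choices:

from collections import deque

import numpy as np

class RollingThresholdDetector:
    """Flag values more than k sigma away from a rolling baseline mean."""
    def __init__(self, window_size=7 * 24 * 60, k=3.0):  # ~7 days at 1 sample/min
        self.baseline = deque(maxlen=window_size)
        self.k = k

    def update(self, value):
        is_anomaly = False
        if len(self.baseline) > 100:  # Wait for a minimal warm-up baseline
            mean = np.mean(self.baseline)
            std = np.std(self.baseline) + 1e-9  # Avoid divide-by-zero
            is_anomaly = abs(value - mean) > self.k * std
        if not is_anomaly:
            # Only normal values enter the baseline, so the threshold tracks
            # slow drift without absorbing outliers into "normal"
            self.baseline.append(value)
        return is_anomaly

Because anomalous values are excluded from the baseline, the threshold follows slow drift but never silently absorbs a sudden shift; a persistent stream of alerts is then itself a signal that the new regime needs operator review or retraining.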

Implementation (continuous retraining):

import time
from collections import deque

from sklearn.ensemble import IsolationForest

class AdaptiveDetector:
    def __init__(self, retrain_interval=86400):  # Retrain daily (seconds)
        self.model = IsolationForest()
        self.is_fitted = False
        self.training_buffer = deque(maxlen=10000)  # Keep last 10k samples
        self.last_retrain = time.time()
        self.retrain_interval = retrain_interval

    def update(self, sample):
        """Process new sample and periodically retrain"""
        # Predict with current model (everything counts as normal until
        # the model has been fitted for the first time)
        is_anomaly = self.is_fitted and self.model.predict([sample])[0] == -1

        # Add to training buffer (if confirmed normal)
        if not is_anomaly:
            self.training_buffer.append(sample)

        # Check if time to retrain
        if time.time() - self.last_retrain > self.retrain_interval:
            self.retrain()

        return is_anomaly

    def retrain(self):
        """Retrain model on recent data"""
        if len(self.training_buffer) > 1000:
            print(f"Retraining model on {len(self.training_buffer)} samples")
            self.model.fit(list(self.training_buffer))
            self.is_fitted = True
            self.last_retrain = time.time()

Caution - Pitfall: Ignoring Concept Drift in Production Systems

The Mistake: Training anomaly detection models once on historical data and deploying them permanently, assuming the definition of “normal” remains constant over the system’s operational lifetime.

Why It Happens: Model training is resource-intensive and disrupts operations. “If it ain’t broke, don’t fix it” thinking prevails. Concept drift is gradual and invisible - performance degrades slowly without obvious failure points. Teams lack automated drift detection and retraining pipelines.

The Fix: Design for continuous learning from the start:

  1. Drift detection: Monitor model performance indicators continuously - track the distribution of anomaly scores over time using the Page-Hinkley test or the ADWIN algorithm. If the score distribution shifts significantly (e.g., KL divergence > 0.1), trigger an investigation (see the sketch below).

  2. Scheduled retraining: Establish periodic retraining cycles (weekly, monthly) using recent data. For seasonal systems (HVAC, agriculture), retrain at season boundaries.

  3. Online learning: For fast-drifting environments, use incremental learning algorithms (online Isolation Forest, streaming autoencoders) that update continuously without full retraining.

  4. Human-in-the-loop feedback: When operators dismiss alerts as false positives or confirm true anomalies, feed this labeled data back into the training pipeline. Over 6 months, this feedback can improve precision by 20-40%.

  5. Model versioning with A/B testing: Deploy new models alongside old ones, compare performance on held-out data, automatically promote better performers.

Reality check: a predictive maintenance model trained in summer may have 30% higher false positive rate in winter due to thermal effects on vibration patterns.
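
As a sketch of the drift-detection idea in step 1: histogram recent anomaly scores and compare them against a reference distribution captured at deployment time, using the KL divergence available through scipy.stats.entropy. The bin count, smoothing, and trigger value below are illustrative (the 0.1 trigger is the one quoted above):

import numpy as np
from scipy.stats import entropy

class ScoreDriftMonitor:
    """Detect drift by comparing recent anomaly-score distributions
    against a reference distribution captured at deployment time."""
    def __init__(self, reference_scores, bins=20, kl_threshold=0.1):
        self.bin_edges = np.histogram_bin_edges(reference_scores, bins=bins)
        self.reference = self._histogram(reference_scores)
        self.kl_threshold = kl_threshold

    def _histogram(self, scores):
        counts, _ = np.histogram(scores, bins=self.bin_edges)
        # Laplace smoothing so empty bins don't produce infinite divergence
        return (counts + 1) / (counts.sum() + len(counts))

    def drift_detected(self, recent_scores):
        recent = self._histogram(recent_scores)
        kl = entropy(recent, self.reference)  # KL(recent || reference)
        return kl > self.kl_threshold

When drift_detected() fires, the pipeline can trigger the scheduled-retraining path described above immediately rather than waiting for the next calendar-based cycle.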

1355.7 Summary

Production anomaly detection requires layered architecture:

  • Edge: Fast threshold detection for safety-critical anomalies (<100ms)
  • Gateway: ML models for pattern detection (~1s latency)
  • Cloud: Deep learning and continuous retraining (~10s latency)

Key Takeaway: Design for concept drift from the start. Implement feedback loops and scheduled retraining to maintain detection accuracy over time.

1355.8 What’s Next