1303  Building IoT Streaming Pipelines

1303.1 Building IoT Streaming Pipelines

⏱️ ~15 min | ⭐⭐⭐ Advanced | 📋 P10.C14.U04

A complete IoT streaming pipeline transforms raw sensor data into actionable insights through multiple processing stages. This section covers the design and implementation of production-ready streaming pipelines.

1303.1.1 Pipeline Architecture

Figure 1303.1: Complete IoT streaming pipeline from edge devices through ingestion, enrichment, aggregation, and output stages

1303.1.2 Stage 1: Data Ingestion

The ingestion layer handles receiving data from diverse IoT sources and normalizing it for processing.

Design Principles:

  1. Protocol Translation: Convert MQTT, CoAP, HTTP into unified message format
  2. Schema Validation: Validate incoming data against defined schemas
  3. Partitioning: Route messages to appropriate partitions for parallel processing
  4. Backpressure: Handle traffic spikes without data loss
# Kafka producer with IoT-optimized configuration
import json
import logging

from confluent_kafka import Producer

logger = logging.getLogger(__name__)

producer_config = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'iot-gateway',
    'acks': 'all',  # Wait for all replicas
    'retries': 10,  # Retry on transient failures
    'batch.size': 65536,  # 64KB batches for efficiency
    'linger.ms': 5,  # Wait up to 5ms for batching
    'compression.type': 'lz4',  # Fast compression
}

def on_delivery(err, msg):
    if err:
        logger.error(f"Delivery failed: {err}")
        # Implement retry or dead-letter queue

producer = Producer(producer_config)

def publish_sensor_reading(sensor_id, reading):
    # Partition by sensor_id for ordered per-sensor processing
    producer.produce(
        topic='iot-raw-events',
        key=sensor_id.encode(),
        value=json.dumps(reading).encode(),
        callback=on_delivery
    )
    producer.poll(0)  # Trigger delivery callbacks
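
The producer above assumes readings have already been validated. A minimal sketch of schema validation (design principle 2) at the gateway, using the jsonschema library; the schema fields are illustrative and a real deployment would typically pull schemas from a schema registry:

# Gateway-side schema validation before publishing (illustrative schema;
# a production system would normally fetch schemas from a registry)
import jsonschema

READING_SCHEMA = {
    "type": "object",
    "properties": {
        "sensor_id": {"type": "string"},
        "temperature": {"type": "number"},
        "humidity": {"type": "number", "minimum": 0, "maximum": 100},
        "timestamp": {"type": "string"},  # ISO-8601 expected
    },
    "required": ["sensor_id", "temperature", "timestamp"],
}

def validate_reading(reading: dict) -> bool:
    """Return True if the reading conforms; invalid readings should go to a dead-letter topic."""
    try:
        jsonschema.validate(instance=reading, schema=READING_SCHEMA)
        return True
    except jsonschema.ValidationError as err:
        logger.warning(f"Schema violation, routing to dead-letter queue: {err.message}")
        return False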

1303.1.3 Stage 2: Stream Processing

The processing layer applies transformations, aggregations, and analytics to the data stream.

Common Operations:

  1. Parsing: Extract structured data from raw messages
  2. Filtering: Remove invalid or irrelevant events
  3. Enrichment: Add metadata, geolocation, device info
  4. Aggregation: Compute statistics over windows
  5. Alerting: Detect anomalies and threshold violations
# Flink processing pipeline
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define source table from Kafka
t_env.execute_sql("""
    CREATE TABLE sensor_readings (
        sensor_id STRING,
        temperature DOUBLE,
        humidity DOUBLE,
        `timestamp` TIMESTAMP(3),  -- backticks required: TIMESTAMP is a reserved word in Flink SQL
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'iot-raw-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Tumbling window aggregation
result = t_env.sql_query("""
    SELECT
        sensor_id,
        TUMBLE_START(`timestamp`, INTERVAL '5' MINUTE) AS window_start,
        AVG(temperature) AS avg_temp,
        MAX(temperature) AS max_temp,
        MIN(temperature) AS min_temp,
        COUNT(*) AS reading_count
    FROM sensor_readings
    GROUP BY sensor_id, TUMBLE(`timestamp`, INTERVAL '5' MINUTE)
""")

1303.1.4 Stage 3: Output and Actions

The output layer delivers processed results to consumers and triggers automated actions.

Output Destinations:

  1. Time-Series Databases: InfluxDB, TimescaleDB for metrics storage
  2. Data Lakes: S3, HDFS for historical analysis
  3. Real-Time Dashboards: WebSocket pushes to Grafana, custom UIs
  4. Alerting Systems: PagerDuty, Slack, SMS for critical alerts
  5. Control Systems: Actuator commands, automated responses
# Multi-sink output configuration: route each alert to the appropriate destination
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def route_message(severity: str, message: str) -> str:
    """Route messages based on severity"""
    if severity == 'CRITICAL':
        # Immediate alert (send_pagerduty_alert is an application-specific helper
        # assumed to be defined elsewhere)
        send_pagerduty_alert(message)
        return 'pagerduty'
    elif severity == 'WARNING':
        # Log to monitoring
        return 'monitoring'
    else:
        # Standard metrics storage
        return 'timeseries'
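
Processed results also need concrete sink tables. A minimal sketch that declares a Kafka sink for the windowed aggregates computed in Stage 2 and submits the job; the iot-aggregates topic name is illustrative:

# Declare a sink table for the Stage 2 aggregates and submit the streaming job
# (the iot-aggregates topic name is illustrative)
t_env.execute_sql("""
    CREATE TABLE sensor_aggregates (
        sensor_id STRING,
        window_start TIMESTAMP(3),
        avg_temp DOUBLE,
        max_temp DOUBLE,
        min_temp DOUBLE,
        reading_count BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'iot-aggregates',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Writing the Stage 2 result into the sink starts the pipeline
result.execute_insert('sensor_aggregates')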

1303.2 Interactive Pipeline Demo

Experiment with a simplified streaming pipeline using Observable JavaScript:

This interactive demo simulates a real streaming pipeline with:

  • Configurable event rate: Adjust how fast sensor readings arrive
  • Windowed aggregation: Compute average temperature over configurable window sizes
  • Anomaly detection: Flag readings above the threshold
  • Real-time alerts: Display detected anomalies immediately
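
For readers without the interactive environment, a minimal Python sketch that mirrors the demo's logic: generate readings, average over a fixed-size window, and flag readings above a threshold. The rates, window size, and threshold are illustrative:

# Plain-Python approximation of the demo: generate readings, window-average, flag anomalies
import random
from collections import deque

WINDOW_SIZE = 20          # readings per window (illustrative)
ANOMALY_THRESHOLD = 30.0  # degrees Celsius (illustrative)

window = deque(maxlen=WINDOW_SIZE)

def next_reading() -> float:
    """Simulate one sensor reading: mostly normal values with occasional spikes."""
    spike = 15.0 if random.random() < 0.05 else 0.0
    return random.gauss(22.0, 1.5) + spike

for i in range(200):
    reading = next_reading()
    window.append(reading)
    window_avg = sum(window) / len(window)
    if reading > ANOMALY_THRESHOLD:
        print(f"event {i}: ALERT reading={reading:.1f}C window_avg={window_avg:.1f}C")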

1303.3 Real-World Case Study: Netflix Streaming Analytics

Netflix processes over 8 million events per second from their streaming platform to provide real-time recommendations, detect playback issues, and optimize content delivery.

1303.3.1 System Architecture

Scale:

  • 230+ million subscribers globally
  • 8 million events/second during peak hours
  • 1.3 petabytes of data processed daily
  • Sub-second latency for recommendation updates

Pipeline:

  1. Event Collection: Client devices (TV, mobile, browser) send playback events every 30 seconds
    • Play, pause, seek, error, buffering events
    • Device capabilities, network conditions
  2. Real-time Processing (Apache Flink):
    • Detect playback failures within 5 seconds
    • Calculate real-time popularity metrics
    • Update recommendation models continuously
  3. Windowed Aggregations:
    • 1-minute tumbling windows for error rates by region
    • 10-minute sliding windows for trending content
    • Session windows for viewing sessions (gap timeout: 30 minutes; sketched after this list)
  4. Outputs:
    • Real-time dashboards for operations teams
    • A/B testing metrics for product features
    • Content delivery network (CDN) optimization
    • Personalized homepage updated within 1 second
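
The session-window step can be expressed in Flink SQL. Netflix's actual schema and code are not public, so the table and column names below are illustrative; the sketch assumes a StreamTableEnvironment t_env and a registered playback_events table with an event-time attribute, as in the earlier stages:

# Session windows with a 30-minute inactivity gap (illustrative table and column names;
# assumes t_env and a playback_events table with event-time attribute `event_time` are registered)
viewing_sessions = t_env.sql_query("""
    SELECT
        user_id,
        SESSION_START(`event_time`, INTERVAL '30' MINUTE) AS session_start,
        SESSION_END(`event_time`, INTERVAL '30' MINUTE) AS session_end,
        COUNT(*) AS events_in_session
    FROM playback_events
    GROUP BY user_id, SESSION(`event_time`, INTERVAL '30' MINUTE)
""")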

Key Challenges Solved:

  • Backpressure during outages: When CDN fails, millions of error events flood the pipeline. Kafka buffering prevents data loss.
  • Global clock skew: Devices in different timezones and with inaccurate clocks require watermark strategies with a 5-minute lag (sketched after this list).
  • Exactly-once for billing: View duration metrics must be exact for royalty payments to content creators.
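
In Flink SQL, that clock-skew tolerance maps directly onto the watermark declaration used in the ingestion DDL earlier. A hedged sketch of the 5-minute variant, defining the playback_events table assumed above with illustrative topic and column names:

# Watermark with a 5-minute lag to tolerate device clock skew
# (illustrative table, topic, and column names)
t_env.execute_sql("""
    CREATE TABLE playback_events (
        user_id STRING,
        event_type STRING,
        `event_time` TIMESTAMP(3),
        WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' MINUTE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'playback-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")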

Results:

  • 99.99% uptime for streaming analytics
  • $1 billion annual savings through network optimization
  • 80% of viewing driven by the recommendation engine

1303.4 Understanding Check

You’re designing a stream processing system for a smart grid utility with the following requirements:

  • Scale: 100,000 smart meters
  • Frequency: Each meter reports power consumption every 15 seconds
  • Detection: Identify power theft within 1 minute of occurrence
  • Theft Pattern: Consumption drops by >50% compared to historical average but meter remains connected

Question 1: What is your event rate?

Solution:

  • 100,000 meters × (1 reading / 15 seconds) ≈ 6,667 events per second
  • Peak rate (considering network retries): ~10,000 events per second

Question 2: What windowing strategy would you use for theft detection?

Solution: Sliding windows with:

  • Window size: 1 hour (for the historical average)
  • Slide interval: 15 seconds (update on each reading)
  • Rationale: The current reading must be compared against a recent average, updated continuously

Alternative approach: Two windows:

  • Long tumbling window (24 hours) for the baseline average
  • Short sliding window (1 minute) for current consumption
  • Alert when ratio < 0.5

Question 3: Design a simple pipeline to detect anomalies

Solution:

%% fig-alt: "Anomaly detection pipeline showing five stages: Stage 1 Ingest receives MQTT data from meters and sends to Kafka topic partitioned by meter_id; Stage 2 Calculate Baseline uses a 1-hour sliding window with 15-second slides grouped by meter_id to compute average consumption; Stage 3 Compare Current vs Baseline joins current readings with baseline, calculates ratio, and flags meters where ratio is below 0.5 and meter is connected; Stage 4 Filter and Enrich filters flagged meters and enriches with customer data, timestamps, and severity; Stage 5 Output routes high severity alerts to PagerDuty, all alerts to fraud database, and updates real-time dashboard map."
flowchart TD
    subgraph S1["Stage 1: Ingest"]
        S1A["MQTT from meters β†’ Kafka topic"]
        S1B["Partition by meter_id for parallel processing"]
    end

    subgraph S2["Stage 2: Calculate Baseline"]
        S2A["Sliding window: 1 hour, slide 15 sec"]
        S2B["Group by: meter_id"]
        S2C["Compute: average_consumption"]
    end

    subgraph S3["Stage 3: Compare Current vs Baseline"]
        S3A["Join current reading with baseline"]
        S3B["Calculate: ratio = current / baseline"]
        S3C["Flag if: ratio < 0.5 AND meter_connected"]
    end

    subgraph S4["Stage 4: Filter & Enrich"]
        S4A["Filter: only flagged meters"]
        S4B["Enrich: customer_id, address, account_status"]
        S4C["Add: timestamp, severity, evidence"]
    end

    subgraph S5["Stage 5: Output"]
        S5A["High severity β†’ PagerDuty for investigation"]
        S5B["All alerts β†’ Fraud analytics database"]
        S5C["Dashboard β†’ Real-time map of suspected theft"]
    end

    S1 --> S2
    S2 --> S3
    S3 --> S4
    S4 --> S5

Technology Choice: Apache Flink. Rationale: sophisticated event-time windowing, exactly-once semantics (critical for fraud detection), and sub-second latency.
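
A hedged sketch of the Stage 3 comparison from the diagram above, assuming the baseline and current-reading tables produced by Stages 1 and 2 are already registered; all table and column names are illustrative:

# Stage 3 sketch: join each current reading against its meter's baseline and flag suspected theft
# (current_readings and baseline_averages are assumed to be registered by Stages 1 and 2;
# all names here are illustrative)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

suspected_theft = t_env.sql_query("""
    SELECT
        c.meter_id,
        c.consumption AS current_consumption,
        b.avg_consumption AS baseline,
        c.consumption / b.avg_consumption AS ratio
    FROM current_readings AS c
    JOIN baseline_averages AS b
        ON c.meter_id = b.meter_id
    WHERE c.consumption / b.avg_consumption < 0.5
      AND c.meter_connected = TRUE
""")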

1303.5 What’s Next

Continue to Handling Real-World Challenges to learn about late data handling, exactly-once processing, backpressure management, and fault tolerance strategies.