⏱️ ~15 min | ●●● Advanced | 📍 P10.C14.U04
A complete IoT streaming pipeline transforms raw sensor data into actionable insights through multiple processing stages. This section covers the design and implementation of production-ready streaming pipelines.
1303.1.1 Pipeline Architecture
Figure 1303.1: Complete IoT streaming pipeline from edge devices through ingestion, enrichment, aggregation, and output stages
1303.1.2 Stage 1: Data Ingestion
The ingestion layer handles receiving data from diverse IoT sources and normalizing it for processing.
Design Principles:
Protocol Translation: Convert MQTT, CoAP, and HTTP into a unified message format
Schema Validation: Validate incoming data against defined schemas
Partitioning: Route messages to appropriate partitions for parallel processing
Backpressure: Handle traffic spikes without data loss
# Kafka producer with IoT-optimized configuration
import json
import logging

from confluent_kafka import Producer

logger = logging.getLogger(__name__)

producer_config = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'iot-gateway',
    'acks': 'all',               # Wait for all replicas
    'retries': 10,               # Retry on transient failures
    'batch.size': 65536,         # 64KB batches for efficiency
    'linger.ms': 5,              # Wait up to 5ms for batching
    'compression.type': 'lz4',   # Fast compression
}

def on_delivery(err, msg):
    if err:
        logger.error(f"Delivery failed: {err}")
        # Implement retry or dead-letter queue here

producer = Producer(producer_config)

def publish_sensor_reading(sensor_id, reading):
    # Partition by sensor_id for ordered per-sensor processing
    producer.produce(
        topic='iot-raw-events',
        key=sensor_id.encode(),
        value=json.dumps(reading).encode(),
        callback=on_delivery,
    )
    producer.poll(0)  # Trigger delivery callbacks
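Schema validation, the second design principle above, is typically enforced at the gateway before publishing. A minimal sketch using the jsonschema library; the schema fields are illustrative assumptions, and it reuses logger and publish_sensor_reading from the block above:

# Hypothetical gateway-side schema check before publishing (fields are assumptions)
from jsonschema import validate, ValidationError

SENSOR_SCHEMA = {
    "type": "object",
    "properties": {
        "sensor_id": {"type": "string"},
        "temperature": {"type": "number"},
        "humidity": {"type": "number", "minimum": 0, "maximum": 100},
        "timestamp": {"type": "string"},  # ISO 8601 expected
    },
    "required": ["sensor_id", "temperature", "timestamp"],
}

def validate_and_publish(sensor_id, reading):
    try:
        validate(instance=reading, schema=SENSOR_SCHEMA)
    except ValidationError as err:
        logger.warning(f"Rejected malformed reading: {err.message}")
        return  # Or route to a dead-letter topic
    publish_sensor_reading(sensor_id, reading)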
Knowledge Check: An IoT gateway receives data from 10,000 sensors using MQTT, CoAP, and HTTP protocols. For downstream processing efficiency, all data needs to be in a common format. Which component should handle protocol translation?
Option A: Stream processor (Flink/Kafka Streams) should translate protocols. Incorrect: stream processors should focus on business logic (aggregation, pattern detection), not protocol translation; mixing protocol parsing with analytics creates tight coupling and reduces reusability.
Option B (correct): Ingestion layer should translate to a unified format before stream processing. The ingestion layer (API gateway, MQTT broker, protocol adapters) should normalize all protocols to a common message format (e.g., JSON with a standard schema) before publishing to Kafka. This decouples protocol handling from analytics and lets the stream processor focus on business logic.
Option C: Database layer should handle translation during storage. Incorrect: by the time data reaches the database, real-time processing opportunities have passed; translation must happen early in the pipeline to enable stream processing on normalized data.
Option D: Each downstream consumer should translate independently. Incorrect: having N consumers each implement M protocol translations creates O(N×M) complexity; centralizing translation in the ingestion layer means each protocol is implemented once and consumed by all downstream systems.
1303.1.3 Stage 2: Stream Processing
The processing layer applies transformations, aggregations, and analytics to the data stream.
Common Operations:
Parsing: Extract structured data from raw messages
Filtering: Remove invalid or irrelevant events
Enrichment: Add metadata, geolocation, device info
Aggregation: Compute statistics over windows
Alerting: Detect anomalies and threshold violations
# Flink processing pipeline
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define source table from Kafka
# Note: `timestamp` is a reserved word in Flink SQL and must be escaped with backticks
t_env.execute_sql("""
    CREATE TABLE sensor_readings (
        sensor_id STRING,
        temperature DOUBLE,
        humidity DOUBLE,
        `timestamp` TIMESTAMP(3),
        WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'iot-raw-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Tumbling window aggregation: five-minute per-sensor statistics
result = t_env.sql_query("""
    SELECT
        sensor_id,
        TUMBLE_START(`timestamp`, INTERVAL '5' MINUTE) AS window_start,
        AVG(temperature) AS avg_temp,
        MAX(temperature) AS max_temp,
        MIN(temperature) AS min_temp,
        COUNT(*) AS reading_count
    FROM sensor_readings
    GROUP BY sensor_id, TUMBLE(`timestamp`, INTERVAL '5' MINUTE)
""")
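The query above only defines the computation; nothing runs until the result is written to a sink and the job is submitted. A minimal sketch, assuming an output Kafka topic named iot-aggregates:

# Sink table for the windowed aggregates (topic name is an assumption)
t_env.execute_sql("""
    CREATE TABLE sensor_aggregates (
        sensor_id STRING,
        window_start TIMESTAMP(3),
        avg_temp DOUBLE,
        max_temp DOUBLE,
        min_temp DOUBLE,
        reading_count BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'iot-aggregates',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Submitting the insert launches the continuous streaming job
result.execute_insert("sensor_aggregates").wait()

Tumbling-window aggregates are append-only (each window is emitted once when it closes), so a plain Kafka sink suffices here.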
1303.1.4 Stage 3: Output and Actions
The output layer delivers processed results to consumers and triggers automated actions.
Output Destinations:
Time-Series Databases: InfluxDB, TimescaleDB for metrics storage
Data Lakes: S3, HDFS for historical analysis
Real-Time Dashboards: WebSocket pushes to Grafana, custom UIs
Alerting Systems: PagerDuty, Slack, SMS for critical alerts
Control Systems: Actuator commands, automated responses
# Multi-sink output: route each message based on severity
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def route_message(severity: str, message: str) -> str:
    """Route messages based on severity"""
    if severity == 'CRITICAL':
        # Immediate alert (send_pagerduty_alert is defined elsewhere)
        send_pagerduty_alert(message)
        return 'pagerduty'
    elif severity == 'WARNING':
        # Log to monitoring
        return 'monitoring'
    else:
        # Standard metrics storage
        return 'timeseries'
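Once registered, the routing function can be applied in SQL so downstream jobs fan messages out by destination. A minimal sketch, assuming an alerts table with sensor_id, severity, and message columns:

# Register the UDF and tag each alert with its target sink (table and columns assumed)
t_env.create_temporary_function("route_message", route_message)

routed = t_env.sql_query("""
    SELECT sensor_id, severity, message,
           route_message(severity, message) AS target_sink
    FROM alerts
""")

Note that calling an external alerting service inside a UDF is a side effect; in production, a dedicated sink or async I/O operator is usually a cleaner home for it.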
1303.2 Interactive Pipeline Demo
Experiment with a simplified streaming pipeline using Observable JavaScript:
This interactive demo simulates a real streaming pipeline with:
Configurable event rate: Adjust how fast sensor readings arrive
Windowed aggregation: Compute average temperature over configurable window sizes
Anomaly detection: Flag readings above the threshold
Real-time alerts: Display detected anomalies immediately
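The demo runs in the browser, but its core loop is easy to reproduce in Python. A minimal, self-contained sketch; the window size, threshold, and simulated sensor values are all illustrative assumptions:

# Minimal stand-in for the interactive demo: windowed averaging + threshold alerts
import random
from collections import deque

WINDOW_SIZE = 20   # readings per window (illustrative)
THRESHOLD = 30.0   # anomaly threshold in °C (illustrative)

window = deque(maxlen=WINDOW_SIZE)

def process_reading(temp):
    window.append(temp)
    avg = sum(window) / len(window)
    if temp > THRESHOLD:
        print(f"ALERT: {temp:.1f}°C exceeds threshold (window avg {avg:.1f}°C)")
    return avg

for _ in range(100):
    # Simulated sensor: mostly normal readings with occasional spikes
    reading = random.gauss(22, 2) + (15 if random.random() < 0.05 else 0)
    process_reading(reading)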
1303.3 Real-World Case Study: Netflix Streaming Analytics
Netflix processes over 8 million events per second from their streaming platform to provide real-time recommendations, detect playback issues, and optimize content delivery.
1303.3.1 System Architecture
Scale:
230+ million subscribers globally
8 million events/second during peak hours
1.3 petabytes of data processed daily
Sub-second latency for recommendation updates
Windowing strategies (sketched in Flink SQL after this list):
1-minute tumbling windows for error rates by region
10-minute sliding windows for trending content
Session windows for viewing sessions (gap timeout: 30 minutes)
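These three window shapes map directly onto Flink SQL group windows. Hedged sketches follow; the playback_events table, its columns, and the 1-minute slide for the trending query are assumptions, not Netflix's actual schema:

# Illustrative Flink SQL for the three window shapes (schema names are assumptions)

# 1-minute tumbling windows: error rate by region
t_env.sql_query("""
    SELECT region,
           TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
           SUM(CASE WHEN event_type = 'error' THEN 1 ELSE 0 END)
               / CAST(COUNT(*) AS DOUBLE) AS error_rate
    FROM playback_events
    GROUP BY region, TUMBLE(event_time, INTERVAL '1' MINUTE)
""")

# 10-minute sliding (hopping) windows for trending content; 1-minute slide assumed
t_env.sql_query("""
    SELECT title_id, COUNT(*) AS plays
    FROM playback_events
    GROUP BY title_id, HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
""")

# Session windows: a viewing session ends after a 30-minute inactivity gap
t_env.sql_query("""
    SELECT user_id,
           SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
           COUNT(*) AS events_in_session
    FROM playback_events
    GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE)
""")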
Outputs:
Real-time dashboards for operations teams
A/B testing metrics for product features
Content delivery network (CDN) optimization
Personalized homepage updated within 1 second
Key Challenges Solved:
Backpressure during outages: When a CDN node fails, millions of error events flood the pipeline. Kafka buffering absorbs the spike without data loss.
Global clock skew: Devices span time zones and often have inaccurate clocks, so watermark strategies allow a 5-minute lag.
Exactly-once for billing: View duration metrics must be exact for royalty payments to content creators.
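The latter two challenges map directly onto configuration. A hedged PyFlink sketch; the checkpoint interval is an assumption:

# Tolerating clock skew and guaranteeing exactly-once state updates
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

env = StreamExecutionEnvironment.get_execution_environment()

# Checkpoint every 60s (interval is an assumption); EXACTLY_ONCE is Flink's default mode
env.enable_checkpointing(60_000, CheckpointingMode.EXACTLY_ONCE)

# In the source DDL, allow 5 minutes of lateness before windows close:
#   WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE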
Results:
99.99% uptime for streaming analytics
$1 billion annual savings through network optimization
80% of viewing driven by the recommendation engine
1303.4 Understanding Check
Scenario: Smart Grid Processing
You're designing a stream processing system for a smart grid utility with the following requirements:
Scale: 100,000 smart meters
Frequency: Each meter reports power consumption every 15 seconds
Detection: Identify power theft within 1 minute of occurrence
Theft Pattern: Consumption drops by >50% compared to historical average but meter remains connected
Question 1: What is your event rate?
Solution:
100,000 meters × (1 reading / 15 seconds) ≈ 6,667 events per second
Peak rate (considering network retries): ~10,000 events per second
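The same back-of-the-envelope calculation in Python; the 1.5× retry headroom factor is an assumption:

# Back-of-the-envelope event rate (retry headroom factor is an assumption)
meters = 100_000
report_interval_s = 15

base_rate = meters / report_interval_s  # ≈ 6,667 events/second
peak_rate = base_rate * 1.5             # ≈ 10,000 events/second with retries
print(f"base: {base_rate:,.0f}/s, peak: {peak_rate:,.0f}/s")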
Question 2: What windowing strategy would you use for theft detection?
Solution: Sliding windows with:
Window size: 1 hour (for historical average)
Slide interval: 15 seconds (update on each reading)
Rationale: Need to compare the current reading against a recent average, updated continuously
Alternative approach: Two windows (sketched in code after this list):
Long tumbling window (24 hours) for baseline average
Short sliding window (1 minute) for current consumption
Alert when ratio < 0.5
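A minimal in-memory sketch of the two-window comparison; in a real deployment this state would live in the stream processor, and all constants and names here are illustrative:

# Per-meter theft heuristic: compare short-window average to long-window baseline
from collections import defaultdict, deque

BASELINE_LEN = 5760   # 24 h of 15-second readings
CURRENT_LEN = 4       # 1 minute of 15-second readings
RATIO_THRESHOLD = 0.5

baseline = defaultdict(lambda: deque(maxlen=BASELINE_LEN))
current = defaultdict(lambda: deque(maxlen=CURRENT_LEN))

def on_reading(meter_id, kw, connected):
    baseline[meter_id].append(kw)
    current[meter_id].append(kw)
    base_avg = sum(baseline[meter_id]) / len(baseline[meter_id])
    cur_avg = sum(current[meter_id]) / len(current[meter_id])
    # Flag only when the meter is still connected (per the theft pattern)
    if connected and base_avg > 0 and cur_avg / base_avg < RATIO_THRESHOLD:
        print(f"Possible theft at meter {meter_id}: "
              f"current {cur_avg:.2f} kW vs baseline {base_avg:.2f} kW")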
Question 3: Design a simple pipeline to detect anomalies
Solution:
%% fig-alt: "Anomaly detection pipeline showing five stages: Stage 1 Ingest receives MQTT data from meters and sends to Kafka topic partitioned by meter_id; Stage 2 Calculate Baseline uses a 1-hour sliding window with 15-second slides grouped by meter_id to compute average consumption; Stage 3 Compare Current vs Baseline joins current readings with baseline, calculates ratio, and flags meters where ratio is below 0.5 and meter is connected; Stage 4 Filter and Enrich filters flagged meters and enriches with customer data, timestamps, and severity; Stage 5 Output routes high severity alerts to PagerDuty, all alerts to fraud database, and updates real-time dashboard map."
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart TD
subgraph S1["Stage 1: Ingest"]
S1A["MQTT from meters β Kafka topic"]
S1B["Partition by meter_id for parallel processing"]
end
subgraph S2["Stage 2: Calculate Baseline"]
S2A["Sliding window: 1 hour, slide 15 sec"]
S2B["Group by: meter_id"]
S2C["Compute: average_consumption"]
end
subgraph S3["Stage 3: Compare Current vs Baseline"]
S3A["Join current reading with baseline"]
S3B["Calculate: ratio = current / baseline"]
S3C["Flag if: ratio < 0.5 AND meter_connected"]
end
subgraph S4["Stage 4: Filter & Enrich"]
S4A["Filter: only flagged meters"]
S4B["Enrich: customer_id, address, account_status"]
S4C["Add: timestamp, severity, evidence"]
end
subgraph S5["Stage 5: Output"]
S5A["High severity β PagerDuty for investigation"]
S5B["All alerts β Fraud analytics database"]
S5C["Dashboard β Real-time map of suspected theft"]
end
S1 --> S2
S2 --> S3
S3 --> S4
S4 --> S5
Continue to Handling Real-World Challenges to learn about late data handling, exactly-once processing, backpressure management, and fault tolerance strategies.