10 Integration Patterns for IoT

| Chapter | Topic |
|---|---|
| Stream Processing | Overview of batch vs. stream processing for IoT |
| Fundamentals | Windowing strategies, watermarks, and event-time semantics |
| Architectures | Kafka, Flink, and Spark Streaming comparison |
| Pipelines | End-to-end ingestion, processing, and output design |
| Challenges | Late data, exactly-once semantics, and backpressure |
| Pitfalls | Common mistakes and production worked examples |
| Basic Lab | ESP32 circular buffers, windows, and event detection |
| Advanced Lab | CEP, pattern matching, and anomaly detection on ESP32 |
| Game & Summary | Interactive review game and module summary |
| Interoperability | Four levels of IoT interoperability |
| Interop Fundamentals | Technical, syntactic, semantic, and organizational layers |
| Interop Standards | SenML, JSON-LD, W3C WoT, and oneM2M |
| Integration Patterns | Protocol adapters, gateways, and ontology mapping |

In 60 Seconds

IoT integration patterns solve the challenge of connecting heterogeneous devices using different protocols. The protocol adapter pattern provides a common interface for MQTT, CoAP, and HTTP devices, while unified gateways centralize translation through a single integration point. For complex enterprise deployments, ontology mapping with RDF enables semantic integration across vendors using different vocabularies.

Key Concepts
  • Integration Pattern: Reusable architectural solution for a common IoT integration challenge (point-to-point, hub-and-spoke, ESB, event-driven)
  • Point-to-Point Integration: Direct connections between each pair of systems; manageable for 2-3 systems, but N systems need N×(N-1)/2 connections, so maintenance effort grows roughly with N²
  • Hub-and-Spoke Integration: Central broker or integration platform handling all message routing, reducing N-system integration to N connections (one per system, to the hub)
  • Enterprise Service Bus (ESB): Middleware platform providing mediation, transformation, routing, and protocol conversion for IoT system integration at scale
  • Event-Driven Architecture (EDA): Integration approach where systems communicate through events rather than direct API calls, reducing temporal coupling and enabling replay
  • API Gateway: Integration pattern providing a unified endpoint for heterogeneous backend IoT services, handling authentication, rate limiting, and protocol translation
  • Data Integration vs. Process Integration: Data integration synchronizes information across systems; process integration coordinates workflows across organizational and system boundaries
  • iPaaS (Integration Platform as a Service): Cloud-based integration platform (MuleSoft, Dell Boomi, Azure Integration Services) providing connectors, transformations, and orchestration for IoT integration

Learning Objectives

After completing this chapter, you will be able to:

  • Design protocol adapter patterns for heterogeneous IoT systems
  • Implement unified gateway architectures for multi-protocol environments
  • Choose between ESB and microservices integration approaches
  • Apply ontology mapping for semantic integration across vendors
  • Design production-ready interoperability solutions with redundancy

Integration patterns are like a multilingual reception desk at an international hotel. Guests arrive speaking different languages (MQTT, CoAP, HTTP), but the reception desk translates everything into one common language so the hotel staff can serve everyone equally. In IoT, a unified gateway acts as that reception desk – it accepts data from devices using different protocols and presents it through a single, consistent interface. This chapter shows you the design patterns that make this possible.

10.1 Prerequisites

Before diving into this chapter, you should be familiar with:

Figure 10.1: Data fabric provides unified access across heterogeneous IoT data sources. Rather than point-to-point integrations, the fabric layer abstracts underlying storage technologies, enabling applications to consume data through consistent APIs regardless of whether the source is a time-series database, object store, or real-time stream.

10.2 Protocol Adapter Pattern

⏱️ ~15 min | ⭐⭐ Intermediate | 📋 P10.C11.U07

The adapter pattern abstracts communication differences through a common interface, allowing applications to interact uniformly with devices using different protocols.

10.2.1 Abstract Base Class Design

from abc import ABC, abstractmethod
from typing import Dict, Any, Optional

class ProtocolAdapter(ABC):
    """Abstract base class for protocol adapters"""

    @abstractmethod
    def register_device(self, device_id: str, config: Dict) -> bool:
        """Register a new device with the adapter"""
        pass

    @abstractmethod
    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        """Read current data from device"""
        pass

    @abstractmethod
    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        """Send command to device"""
        pass

    @abstractmethod
    def disconnect(self, device_id: str) -> bool:
        """Disconnect and cleanup device connection"""
        pass

10.2.2 Concrete MQTT Adapter

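import json
import paho.mqtt.client as mqtt

class MQTTAdapter(ProtocolAdapter):
    def __init__(self, broker: str, port: int = 1883):
        self.broker = broker
        self.port = port
        # paho-mqtt 2.x requires the callback API version as the first argument
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.devices: Dict[str, Dict] = {}
        self.topic_to_device: Dict[str, str] = {}  # data_topic -> device_id
        self.latest_data: Dict[str, Dict] = {}

        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.connect(broker, port)
        self.client.loop_start()  # callbacks run on a background network thread

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        # Re-subscribe on every (re)connect so subscriptions survive broker restarts
        for topic in list(self.topic_to_device):
            client.subscribe(topic)

    def _on_message(self, client, userdata, message):
        # Direct lookup instead of iterating self.devices, which the main
        # thread may be modifying at the same time
        device_id = self.topic_to_device.get(message.topic)
        if device_id is None:
            return
        try:
            self.latest_data[device_id] = json.loads(message.payload)
        except ValueError:  # covers JSONDecodeError and invalid UTF-8
            pass  # ignore malformed payloads instead of crashing the callback

    def register_device(self, device_id: str, config: Dict) -> bool:
        self.devices[device_id] = config
        self.topic_to_device[config['data_topic']] = device_id
        self.client.subscribe(config['data_topic'])
        return True

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        # Most recent message, or None if the device has not published yet
        return self.latest_data.get(device_id)

    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        config = self.devices.get(device_id)
        if not config:
            return False
        payload = json.dumps({"command": command, **params})
        info = self.client.publish(config['command_topic'], payload)
        return info.rc == mqtt.MQTT_ERR_SUCCESS

    def disconnect(self, device_id: str) -> bool:
        config = self.devices.pop(device_id, None)
        if config is None:
            return False
        self.topic_to_device.pop(config['data_topic'], None)
        self.latest_data.pop(device_id, None)
        self.client.unsubscribe(config['data_topic'])
        return True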

10.2.3 Concrete CoAP Adapter

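import asyncio
import json
from aiocoap import Context, Message, GET, PUT

class CoAPAdapter(ProtocolAdapter):
    def __init__(self):
        self.devices: Dict[str, Dict] = {}
        self.context: Optional[Context] = None
        # One private event loop reused for every call: the aiocoap Context is
        # tied to the loop that created it, so asyncio.run() (a fresh loop per
        # call) would break from the second request onward.
        self.loop = asyncio.new_event_loop()

    async def _ensure_context(self):
        if self.context is None:
            self.context = await Context.create_client_context()

    @staticmethod
    def _uri(config: Dict) -> str:
        return f"coap://{config['host']}:{config['port']}/{config['resource']}"

    def register_device(self, device_id: str, config: Dict) -> bool:
        self.devices[device_id] = config
        return True

    async def read_data_async(self, device_id: str) -> Optional[Dict[str, Any]]:
        config = self.devices.get(device_id)
        if not config:
            return None
        await self._ensure_context()
        request = Message(code=GET, uri=self._uri(config))
        response = await self.context.request(request).response
        if not response.code.is_successful():
            return None
        return json.loads(response.payload.decode())

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        # Synchronous wrapper; do not call from code already running in an event loop
        return self.loop.run_until_complete(self.read_data_async(device_id))

    async def send_command_async(self, device_id: str, command: str, params: Dict) -> bool:
        config = self.devices.get(device_id)
        if not config:
            return False
        await self._ensure_context()
        # Assumes the device accepts commands as a JSON PUT to its configured resource
        payload = json.dumps({"command": command, **params}).encode()
        request = Message(code=PUT, uri=self._uri(config), payload=payload)
        response = await self.context.request(request).response
        return response.code.is_successful()

    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        return self.loop.run_until_complete(
            self.send_command_async(device_id, command, params))

    def disconnect(self, device_id: str) -> bool:
        return self.devices.pop(device_id, None) is not None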

10.2.4 Why Abstract Methods Matter
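Declaring the interface methods with @abstractmethod means Python refuses to instantiate an adapter that leaves any of them out, so a half-finished adapter fails at construction time rather than in the middle of a request. The sketch below uses a trimmed two-method copy of ProtocolAdapter and two hypothetical adapters to show this. Note that the check only verifies that each method exists, not that it does anything useful: a stub that returns a constant (or a body that is just `pass`) still satisfies it.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class ProtocolAdapter(ABC):
    """Trimmed two-method version of the interface above, for illustration."""

    @abstractmethod
    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        ...

    @abstractmethod
    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        ...


class IncompleteAdapter(ProtocolAdapter):
    """Hypothetical adapter that forgets to implement send_command."""

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        return {"temp": 22.5}


class StubAdapter(IncompleteAdapter):
    """Implements every abstract method, even if only as a stub."""

    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        return False


try:
    IncompleteAdapter()  # raises TypeError: send_command is still abstract
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

stub = StubAdapter()  # succeeds: the ABC checks presence, not behavior
```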

10.3 Unified Gateway Architecture

⏱️ ~10 min | ⭐⭐ Intermediate | 📋 P10.C11.U08

A unified gateway centralizes protocol translation, providing applications with a single integration point regardless of underlying device protocols.

10.3.1 Gateway Implementation

import logging

logger = logging.getLogger(__name__)

class UnifiedGateway:
    def __init__(self):
        self.adapters: Dict[str, ProtocolAdapter] = {}
        self.device_registry: Dict[str, str] = {}  # device_id -> protocol

    def register_adapter(self, protocol: str, adapter: ProtocolAdapter):
        """Register a protocol adapter"""
        self.adapters[protocol] = adapter

    def register_device(self, device_id: str, protocol: str, config: Dict) -> bool:
        """Register device with appropriate adapter"""
        if protocol not in self.adapters:
            raise ValueError(f"No adapter for protocol: {protocol}")

        success = self.adapters[protocol].register_device(device_id, config)
        if success:
            self.device_registry[device_id] = protocol
        return success

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        """Read data from any device using appropriate adapter"""
        protocol = self.device_registry.get(device_id)
        if not protocol:
            return None
        return self.adapters[protocol].read_data(device_id)

    def read_all(self) -> Dict[str, Dict[str, Any]]:
        """Read data from all registered devices"""
        results = {}
        for device_id in list(self.device_registry):
            try:
                data = self.read_data(device_id)
            except Exception:
                # One unreachable device (e.g. a CoAP timeout) must not
                # abort the read for every other device
                logger.exception("read failed for %s", device_id)
                continue
            if data is not None:  # keep valid-but-empty payloads
                results[device_id] = data
        return results

10.3.2 Usage Example

# Initialize gateway
gateway = UnifiedGateway()

# Register protocol adapters
gateway.register_adapter("mqtt", MQTTAdapter("mqtt.example.com"))
gateway.register_adapter("coap", CoAPAdapter())
# An HTTP adapter would be registered the same way; HTTPAdapter is not
# defined in this chapter, so the line is left commented out:
# gateway.register_adapter("http", HTTPAdapter())

# Register devices (protocol-specific details hidden)
gateway.register_device("temp-001", "mqtt", {
    "data_topic": "sensors/temp-001/data",
    "command_topic": "sensors/temp-001/cmd"
})

gateway.register_device("occupancy-002", "coap", {
    "host": "192.168.1.50",
    "port": 5683,
    "resource": "status"
})

# Application uses unified interface
all_data = gateway.read_all()
# Example result (an MQTT device appears only after its first message arrives):
# {"temp-001": {"temp": 22.5}, "occupancy-002": {"occupied": True}}

10.4 Enterprise Service Bus (ESB) Pattern

⏱️ ~10 min | ⭐⭐ Intermediate | 📋 P10.C11.U09

For complex enterprise IoT ecosystems, the Enterprise Service Bus provides centralized routing, transformation, and orchestration.

10.4.1 ESB Capabilities

| Capability | Description | IoT Example |
|---|---|---|
| Protocol Mediation | Translate between protocols | MQTT → SOAP for ERP |
| Data Transformation | Convert data formats | SenML → JSON-LD → XML |
| Content-Based Routing | Route by message content | Temperature >40°C → alerts |
| Orchestration | Coordinate multi-step workflows | Sensor → rules → DB → notify |
| QoS Management | Guaranteed delivery, throttling | Priority queuing for alarms |
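
Content-based routing is the easiest of these capabilities to show in a few lines. The sketch below is not any ESB product's API: it is a hypothetical in-process router where each rule pairs a predicate on the message body with a destination name (the "alerts", "energy-analytics", and "archive" names are made up), and the 40 °C threshold comes from the table.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


@dataclass
class Route:
    name: str                           # destination, e.g. a queue or topic name
    matches: Callable[[Message], bool]  # predicate evaluated on message content


class ContentBasedRouter:
    """Send each message to every destination whose predicate matches."""

    def __init__(self, default: str = "archive"):
        self.routes: List[Route] = []
        self.default = default  # used when no rule matches

    def add_route(self, name: str, matches: Callable[[Message], bool]) -> None:
        self.routes.append(Route(name, matches))

    def route(self, message: Message) -> List[str]:
        targets = [r.name for r in self.routes if r.matches(message)]
        return targets or [self.default]


router = ContentBasedRouter()
router.add_route(
    "alerts",
    lambda m: m.get("type") == "temperature" and m.get("value", 0) > 40,
)
router.add_route("energy-analytics", lambda m: m.get("type") == "power")

hot = router.route({"type": "temperature", "value": 42.0})
normal = router.route({"type": "temperature", "value": 21.1})
power = router.route({"type": "power", "value": 1523.5})
```

The point is that the routing decision depends on what the message says, not on who sent it; an ESB would typically hold such rules as configuration rather than code.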

10.4.2 When to Use ESB vs Microservices

| Factor | ESB | Microservices |
|---|---|---|
| Scale | 100+ devices/services | Any scale |
| Complexity | Complex transformations | Simple point-to-point |
| Governance | Centralized control needed | Decentralized OK |
| Latency | Acceptable latency increase | Ultra-low latency needed |
| Team Structure | Central integration team | Distributed teams |

10.5 Semantic Integration with Ontology Mapping

⏱️ ~10 min | ⭐⭐⭐ Advanced | 📋 P10.C11.U10

When integrating sensors from different manufacturers using different vocabularies, ontology mapping enables semantic integration.

10.5.1 The Challenge

Manufacturer A uses airTemp, B uses temperature, C uses AT. All mean the same thing but machines cannot know this without semantic mapping.

10.5.2 Solution: RDF Ontology Mapping

# Ontology mapping file (mapping.ttl)
@prefix manA: <http://vendor-a.com/ont#> .
@prefix manB: <http://vendor-b.com/ont#> .
@prefix manC: <http://vendor-c.com/ont#> .
@prefix sosa: <http://www.w3.org/ns/sosa/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .

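# Map all vendor terms to the common SOSA property for literal results
# (sosa:hasSimpleResult holds a plain value such as 21.4; sosa:hasResult
# instead points to a separate Result node)
manA:airTemp owl:equivalentProperty sosa:hasSimpleResult .
manB:temperature owl:equivalentProperty sosa:hasSimpleResult .
manC:AT owl:equivalentProperty sosa:hasSimpleResult .

10.5.3 Query Across Heterogeneous Data

With the mappings loaded into a triple store that applies OWL reasoning (or with the inferred triples materialized in advance), and with each reading stored as an observation linked to its sensor, a single SPARQL query works across all vendors:

PREFIX sosa: <http://www.w3.org/ns/sosa/>

SELECT ?sensor ?temperature
WHERE {
  ?observation sosa:madeBySensor ?sensor .
  ?observation sosa:hasSimpleResult ?temperature .
}

This query finds temperature readings regardless of whether the original data used airTemp, temperature, or AT. It returns every simple result, though, so in a store that also holds other kinds of observations you would add a sosa:observedProperty pattern to restrict it to temperature.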
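
An RDF store with a reasoner is not the only way to get this effect. As a lightweight alternative (a plain lookup table, not OWL reasoning), the sketch below renames each vendor's property to a shared term before the data reaches the application. The vendor field names are the ones from 10.5.1; the vendor keys, sensor IDs, and flat record layout are hypothetical simplifications.

```python
from typing import Any, Dict, List, Tuple

# Vendor-specific property name -> shared property name. This dict plays the
# role of the equivalence declarations, but lives in application code.
PROPERTY_MAP: Dict[str, Dict[str, str]] = {
    "vendor-a": {"airTemp": "sosa:hasSimpleResult"},
    "vendor-b": {"temperature": "sosa:hasSimpleResult"},
    "vendor-c": {"AT": "sosa:hasSimpleResult"},
}


def normalize(vendor: str, sensor: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Rename vendor fields to shared terms; unknown fields are kept as-is."""
    mapping = PROPERTY_MAP.get(vendor, {})
    out: Dict[str, Any] = {"sosa:madeBySensor": sensor}
    for field, value in record.items():
        out[mapping.get(field, field)] = value
    return out


def query_results(observations: List[Dict[str, Any]]) -> List[Tuple[str, Any]]:
    """Counterpart of the SPARQL query: (sensor, result) for every observation."""
    return [
        (obs["sosa:madeBySensor"], obs["sosa:hasSimpleResult"])
        for obs in observations
        if "sosa:hasSimpleResult" in obs
    ]


observations = [
    normalize("vendor-a", "a-01", {"airTemp": 21.4}),
    normalize("vendor-b", "b-07", {"temperature": 22.0}),
    normalize("vendor-c", "c-3", {"AT": 20.9}),
]
rows = query_results(observations)
```

The trade-off is that this table is invisible to other systems, whereas an ontology mapping can be published, queried, and extended by anyone integrating with the same data.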

10.6 Worked Example: Smart Building Gateway Design

Scenario: A commercial building integrator must unify data from three vendor systems that use different protocols and data formats. The building management system needs a single API to access all sensor data for HVAC optimization.

Given:

  • Vendor A (HVAC): Modbus TCP, registers as 16-bit integers, temperature in 0.1°F increments
  • Vendor B (Occupancy): BACnet/IP, JSON objects, occupancy as boolean with timestamp
  • Vendor C (Energy): MQTT, SenML format, power in watts with ISO 8601 timestamps
  • Target output: Unified JSON-LD with SOSA ontology annotations
  • 200 sensors total (80 HVAC, 60 occupancy, 60 energy meters)

Solution:

Step 1: Analyze source data formats

Vendor A (Modbus) raw reading:

Register 40001: 0x02BC (700 decimal) → 70.0°F

Vendor B (BACnet) message:

{"objectId": "BI:1", "presentValue": true, "timestamp": "2024-01-15T10:30:00"}

Vendor C (SenML) message:

[{"bn": "urn:dev:meter:1234:", "n": "power", "v": 1523.5, "u": "W", "t": 1705315800}]
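
Two details of this SenML record matter for the adapter: the full sensor name is the base name bn concatenated with n, and a t value this large is an absolute Unix time rather than an offset (RFC 8428 treats time values of 2^28 or more as absolute and smaller ones as relative to the current time). A minimal sketch of resolving a SenML pack, assuming only the bn, bt, bu, n, u, v, and t fields appear:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

RELATIVE_TIME_LIMIT = 2 ** 28  # RFC 8428: smaller time values are relative to "now"


def resolve_senml(pack: List[Dict[str, Any]], now: float) -> List[Dict[str, Any]]:
    """Resolve base fields so every record carries its full name, unit and time.

    Handles only bn, bt, bu, n, u, v and t; other SenML fields are ignored.
    """
    base_name, base_time, base_unit = "", 0.0, None
    resolved = []
    for rec in pack:
        # Base fields apply to this record and to all records after it
        base_name = rec.get("bn", base_name)
        base_time = rec.get("bt", base_time)
        base_unit = rec.get("bu", base_unit)

        t = base_time + rec.get("t", 0.0)
        if t < RELATIVE_TIME_LIMIT:
            t = now + t  # relative time: offset from the current time
        resolved.append({
            "name": base_name + rec.get("n", ""),
            "value": rec["v"],
            "unit": rec.get("u", base_unit),
            "time": datetime.fromtimestamp(t, tz=timezone.utc).isoformat(),
        })
    return resolved


pack = [{"bn": "urn:dev:meter:1234:", "n": "power", "v": 1523.5,
         "u": "W", "t": 1705315800}]
records = resolve_senml(pack, now=1705316000.0)
```

For this record the resolved time is 2024-01-15T10:50:00+00:00; isoformat() writes the UTC offset as +00:00 rather than Z, and both are valid ISO 8601.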

Step 2: Design canonical data model

Create unified schema using JSON-LD with SOSA ontology:

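{
  "@context": {
    "sosa": "http://www.w3.org/ns/sosa/",
    "qudt": "http://qudt.org/schema/qudt/",
    "unit": "http://qudt.org/vocab/unit/",
    "quantitykind": "http://qudt.org/vocab/quantitykind/",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
    "sosa:madeBySensor": {"@type": "@id"},
    "sosa:observedProperty": {"@type": "@id"},
    "qudt:unit": {"@type": "@id"},
    "sosa:resultTime": {"@type": "xsd:dateTime"}
  },
  "@type": "sosa:Observation",
  "sosa:madeBySensor": "building-a/floor-2/zone-1/temp-01",
  "sosa:observedProperty": "quantitykind:Temperature",
  "sosa:hasSimpleResult": 21.1,
  "qudt:unit": "unit:DEG_C",
  "sosa:resultTime": "2024-01-15T10:30:00Z"
}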

Step 3: Implement protocol adapters

| Source | Transformation Required |
|---|---|
| Modbus (A) | Read register → divide by 10 → convert °F to °C → wrap in JSON-LD |
| BACnet (B) | Parse JSON → map objectId to sensor URI → add ontology context |
| MQTT/SenML (C) | Parse SenML → convert Unix timestamp to ISO 8601 → map to SOSA |

Temperature conversion: \(T_C = (T_F - 32) \times \frac{5}{9}\), where \(T_F\) is the register value divided by 10.
Example: 70.0°F → (70.0 - 32) × 5/9 ≈ 21.1°C
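
The Modbus row is the most involved transformation, so here it is as a pure function: raw register value in, canonical JSON-LD observation (as a Python dict) out. The caller supplies the sensor ID and timestamp, the actual Modbus read is left out, and decoding the register as a signed 16-bit value is an assumption (it lets sub-zero °F readings survive). Unit and quantity-kind IRIs use QUDT's unit and quantitykind vocabularies.

```python
from typing import Any, Dict

# JSON-LD context; IRI-valued properties are typed with @id
CONTEXT: Dict[str, Any] = {
    "sosa": "http://www.w3.org/ns/sosa/",
    "qudt": "http://qudt.org/schema/qudt/",
    "unit": "http://qudt.org/vocab/unit/",
    "quantitykind": "http://qudt.org/vocab/quantitykind/",
    "sosa:madeBySensor": {"@type": "@id"},
    "sosa:observedProperty": {"@type": "@id"},
    "qudt:unit": {"@type": "@id"},
}


def register_to_fahrenheit(raw: int) -> float:
    """Decode a 16-bit register as signed tenths of a degree Fahrenheit."""
    if not 0 <= raw <= 0xFFFF:
        raise ValueError(f"not a 16-bit register value: {raw}")
    signed = raw - 0x10000 if raw & 0x8000 else raw  # two's complement (assumed)
    return signed / 10.0


def modbus_to_observation(raw: int, sensor_id: str, result_time: str) -> Dict[str, Any]:
    """Turn one raw Modbus reading into a canonical SOSA observation."""
    temp_f = register_to_fahrenheit(raw)
    temp_c = round((temp_f - 32) * 5 / 9, 1)
    return {
        "@context": CONTEXT,
        "@type": "sosa:Observation",
        "sosa:madeBySensor": sensor_id,
        "sosa:observedProperty": "quantitykind:Temperature",
        "sosa:hasSimpleResult": temp_c,
        "qudt:unit": "unit:DEG_C",
        "sosa:resultTime": result_time,
    }


obs = modbus_to_observation(0x02BC, "building-a/floor-2/zone-1/temp-01",
                            "2024-01-15T10:30:00Z")
```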

Step 4: Calculate gateway sizing

| Vendor | Sensors | Poll Rate | Messages/sec |
|---|---|---|---|
| A (Modbus) | 80 | 1/min | 1.3 |
| B (BACnet) | 60 | 1/min | 1.0 |
| C (MQTT) | 60 | 1/sec | 60.0 |

Total throughput: ≈62.3 messages/sec, i.e. 80 + 60 + 3,600 = 3,740 messages/minute
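
The throughput figure is each vendor's sensor count times its reporting rate, summed. A small sketch that reproduces it from the table's values, with rates expressed in messages per sensor per second:

```python
from typing import Dict, Tuple

# vendor -> (sensor count, messages per sensor per second), from the table above
FLEET: Dict[str, Tuple[int, float]] = {
    "A (Modbus)": (80, 1 / 60),
    "B (BACnet)": (60, 1 / 60),
    "C (MQTT)": (60, 1.0),
}

per_vendor = {name: count * rate for name, (count, rate) in FLEET.items()}
total_per_sec = sum(per_vendor.values())
total_per_min = total_per_sec * 60

for name, rate in per_vendor.items():
    print(f"{name}: {rate:.1f} msg/s")
print(f"Total: {total_per_sec:.1f} msg/s, {total_per_min:,.0f} msg/min")
```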

Gateway requirements:

  • CPU: Low (simple transformations); an ARM Cortex-A53 is sufficient
  • Memory: 512 MB (protocol stacks + buffers)
  • Storage: 4 GB (24-hour local cache for offline operation)

Step 5: Define semantic mapping rules

mappings:
  vendor_a:
    source_field: "register_value"
    transform: "(value / 10 - 32) * 5 / 9"   # tenths of °F -> °C
    target: "sosa:hasSimpleResult"
    unit: "unit:DEG_C"

  vendor_b:
    source_field: "presentValue"
    transform: "boolean_to_int"
    target: "sosa:hasSimpleResult"
    # SOSA itself defines no Occupancy property; in practice use a term
    # from a building or project vocabulary here
    observed_property: "sosa:Occupancy"

  vendor_c:
    source_field: "v"
    transform: "passthrough"
    target: "sosa:hasSimpleResult"
    unit_mapping:
      "W": "unit:W"
      "kWh": "unit:KiloW-HR"

Result: The gateway provides a single REST API endpoint returning JSON-LD formatted observations from all 200 sensors with consistent units (metric), timestamps (ISO 8601), and semantic annotations (SOSA/QUDT). Building management software queries one API instead of three proprietary interfaces.
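
The transform strings in the Step 5 mapping rules need something to execute them. One option, sketched below, is a whitelist of pre-written functions keyed by the exact transform string, so that unknown transforms are rejected; running the expression strings through eval() would also work but would let anyone who can edit the mapping file execute arbitrary code. The rules are shown as a subset of the Step 5 fields, as yaml.safe_load() from PyYAML might return them; TRANSFORMS and apply_mapping are hypothetical names.

```python
from typing import Any, Callable, Dict

# Whitelist of transforms the gateway knows how to run, keyed by the exact
# string used in the mapping rules. Unknown strings are rejected, never eval()'d.
TRANSFORMS: Dict[str, Callable[[Any], Any]] = {
    "(value / 10 - 32) * 5 / 9": lambda value: (value / 10 - 32) * 5 / 9,
    "boolean_to_int": lambda value: int(bool(value)),
    "passthrough": lambda value: value,
}

# Subset of the Step 5 rules (unit fields omitted)
MAPPINGS: Dict[str, Dict[str, Any]] = {
    "vendor_a": {"source_field": "register_value",
                 "transform": "(value / 10 - 32) * 5 / 9",
                 "target": "sosa:hasSimpleResult"},
    "vendor_b": {"source_field": "presentValue",
                 "transform": "boolean_to_int",
                 "target": "sosa:hasSimpleResult"},
    "vendor_c": {"source_field": "v",
                 "transform": "passthrough",
                 "target": "sosa:hasSimpleResult"},
}


def apply_mapping(vendor: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Apply one vendor's rule to a raw record, returning {target: value}."""
    rule = MAPPINGS[vendor]
    transform = TRANSFORMS.get(rule["transform"])
    if transform is None:
        raise ValueError(f"unknown transform for {vendor}: {rule['transform']!r}")
    value = record[rule["source_field"]]
    return {rule["target"]: transform(value)}
```

Adding a new transform then means reviewing and deploying code, which is usually the right amount of friction for logic that rewrites every reading from a vendor.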

Key Insight: Interoperability requires addressing all four levels: Technical (protocol adapters for Modbus/BACnet/MQTT), Syntactic (JSON-LD output format), Semantic (SOSA ontology + unit conversion), and Organizational (consistent naming conventions like building/floor/zone/sensor). Skipping any level creates integration debt that compounds as the system scales.

10.7 Production Considerations

10.7.1 Pitfall: Missing Data Lineage Tracking


The Mistake: Transforming and aggregating IoT data through multiple pipeline stages without tracking provenance, making it impossible to trace anomalies back to their source.

Why It Happens: Lineage tracking adds storage overhead and pipeline complexity. Initial deployments focus on getting data flowing.

The Fix: Embed lineage metadata from the start using correlation IDs that flow through all transformations. Each processing stage should emit: source_id, transform_id, timestamp, and version. Store lineage in a queryable format (graph database or lineage service like Apache Atlas).
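
A minimal sketch of this fix, assuming messages are Python dicts and each pipeline stage is a plain function: ingest assigns a correlation ID, and a hypothetical with_lineage wrapper appends one entry (source_id, transform_id, timestamp, version) per stage, so a bad value can be traced back through every step. The transform IDs and version strings are made up, and persisting the trail to a graph database or lineage service is out of scope.

```python
import time
import uuid
from typing import Any, Callable, Dict

Message = Dict[str, Any]
Payload = Dict[str, Any]


def ingest(source_id: str, payload: Payload) -> Message:
    """Wrap a raw payload with a correlation ID and an empty lineage trail."""
    return {
        "correlation_id": str(uuid.uuid4()),
        "source_id": source_id,
        "payload": payload,
        "lineage": [],
    }


def with_lineage(transform_id: str, version: str,
                 fn: Callable[[Payload], Payload]) -> Callable[[Message], Message]:
    """Wrap a payload transform so it records which stage changed the data, and when."""
    def stage(msg: Message) -> Message:
        entry = {
            "source_id": msg["source_id"],
            "transform_id": transform_id,
            "timestamp": time.time(),
            "version": version,
        }
        # New message; the correlation ID is carried through unchanged
        return {**msg, "payload": fn(msg["payload"]),
                "lineage": msg["lineage"] + [entry]}
    return stage


to_celsius = with_lineage("f_to_c", "1.2.0",
                          lambda p: {"temp_c": round((p["temp_f"] - 32) * 5 / 9, 1)})
tag_zone = with_lineage("add_zone", "0.3.1",
                        lambda p: {**p, "zone": "floor-2/zone-1"})

msg = tag_zone(to_celsius(ingest("temp-001", {"temp_f": 70.0})))
```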

10.7.2 Pitfall: Undocumented Unit Assumptions


The Mistake: Processing sensor data without explicit unit documentation, leading to silent conversion errors when data from different sources uses different measurement units.

The Fix: Make units explicit and machine-readable in every data payload using standards like SenML ("u": "Cel") or QUDT ontology references. Never assume implicit units. Implement unit validation at ingestion: reject or flag data missing unit annotations.
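
Ingestion-time unit validation can be as small as the sketch below, which checks each SenML record for a unit (either u on the record or an earlier bu base unit) and splits the pack into accepted and flagged records. ALLOWED_UNITS is a hypothetical project whitelist built from SenML unit symbols; a real deployment would derive it from the SenML units registry and its own needs.

```python
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]

# Hypothetical whitelist; symbols follow the SenML units registry
ALLOWED_UNITS = {"Cel", "W", "%RH", "Pa", "lx"}


def validate_units(pack: List[Record]) -> Tuple[List[Record], List[Tuple[Record, str]]]:
    """Split SenML records into (accepted, flagged-with-reason)."""
    accepted: List[Record] = []
    flagged: List[Tuple[Record, str]] = []
    base_unit = None
    for rec in pack:
        base_unit = rec.get("bu", base_unit)  # base unit applies to later records
        unit = rec.get("u", base_unit)
        if unit is None:
            flagged.append((rec, "missing unit"))
        elif unit not in ALLOWED_UNITS:
            flagged.append((rec, f"unexpected unit {unit!r}"))
        else:
            accepted.append({**rec, "u": unit})  # make the unit explicit on every record
    return accepted, flagged


accepted, flagged = validate_units([
    {"n": "temp", "v": 21.1, "u": "Cel"},
    {"n": "temp", "v": 70.0},               # no unit: °F or °C?
    {"n": "temp", "v": 70.0, "u": "degF"},  # not in the whitelist
])
```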

10.7.3 Pitfall: Stale Device Registry


The Mistake: Enriching streaming data with device metadata from a registry that is updated asynchronously, causing enrichment failures when devices are added or moved.

The Fix: Implement registry change propagation as events in the same streaming platform. Use changelog-based enrichment (Kafka KTable pattern) where registry updates automatically propagate to join results. Set cache TTLs appropriate to your change frequency.
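
The changelog idea can be illustrated without Kafka. In the sketch below, registry updates and sensor readings are modelled as one ordered event stream for simplicity; the enricher applies each registry change to a local latest-value table before handling the readings behind it, so a newly added or moved device is never joined against stale metadata. The event shapes are hypothetical, and the sketch ignores partitioning, persistence, and TTLs.

```python
from typing import Any, Dict, Iterable, Iterator, Optional

Event = Dict[str, Any]


class RegistryTable:
    """Latest-value-per-key view of a registry changelog (KTable-like)."""

    def __init__(self) -> None:
        self._rows: Dict[str, Dict[str, Any]] = {}

    def apply(self, device_id: str, metadata: Optional[Dict[str, Any]]) -> None:
        if metadata is None:
            self._rows.pop(device_id, None)  # tombstone: device removed
        else:
            self._rows[device_id] = metadata

    def get(self, device_id: str) -> Optional[Dict[str, Any]]:
        return self._rows.get(device_id)


def enrich(events: Iterable[Event], table: RegistryTable) -> Iterator[Event]:
    """Apply registry changes in stream order; join readings against current state."""
    for event in events:
        if event["kind"] == "registry":
            table.apply(event["device_id"], event.get("metadata"))
        elif event["kind"] == "reading":
            meta = table.get(event["device_id"])
            yield {**event, "metadata": meta, "enriched": meta is not None}


stream = [
    {"kind": "registry", "device_id": "temp-001", "metadata": {"zone": "floor-2/zone-1"}},
    {"kind": "reading", "device_id": "temp-001", "value": 21.1},
    {"kind": "registry", "device_id": "temp-001", "metadata": {"zone": "floor-3/zone-4"}},  # moved
    {"kind": "reading", "device_id": "temp-001", "value": 20.8},
    {"kind": "reading", "device_id": "temp-999", "value": 19.0},  # never registered
]
results = list(enrich(stream, RegistryTable()))
```

Readings from unregistered devices come out marked enriched=False rather than being dropped, so they can be routed to a review queue instead of disappearing silently.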

10.9 Summary

  • Protocol adapter pattern abstracts communication differences through a common interface (register_device, read_data, send_command) with concrete implementations for each protocol, enabling applications to interact uniformly without hardcoding protocol details.
  • Unified gateway architecture centralizes protocol translation, providing applications with a single integration point and enabling centralized data validation, routing, and security enforcement across heterogeneous device ecosystems.
  • Enterprise Service Bus (ESB) provides centralized routing, transformation, and orchestration for complex enterprise IoT deployments (>100 devices/services), with capabilities including protocol mediation, content-based routing, and guaranteed delivery.
  • Ontology mapping using RDF and SPARQL enables semantic integration across vendors using different vocabularies by linking terms to common reference ontologies (SOSA/SSN), allowing unified queries regardless of original terminology.
  • Production deployments require redundant gateways, data lineage tracking, explicit unit documentation, and registry synchronization to avoid common pitfalls that cause silent failures at scale.

10.10 Concept Relationships

Integration patterns bridge theory and implementation:

These connections show patterns as bridges between concepts and code.

10.11 See Also

Implementation Frameworks:

  • Apache Camel – Enterprise integration patterns with 300+ connectors
  • Spring Integration – Java-based messaging and integration
  • Node-RED – Visual flow-based programming for IoT integration

Gateway Solutions:

Protocol gateway throughput determines max device capacity. Calculate gateway sizing for multi-protocol IoT deployment:

Message Processing Rate: \(R = \frac{N_{cores} \times E}{T_{avg}}\) where \(N_{cores}\) = core count, \(E\) = parallelization efficiency (0.7–0.9), \(T_{avg}\) = weighted average processing time per message.

Worked example (smart building gateway with 200 sensors, 3 protocols):

  • Gateway: 4-core ARM processor, 85% efficiency
  • MQTT adapter: 0.5 ms/message (simple JSON parse)
  • CoAP adapter: 1.2 ms/message (binary decode + ACK)
  • Modbus adapter: 2.0 ms/message (register read + conversion)
  • Weighted average by device count (this variant assumes 80 MQTT, 60 CoAP, and 60 Modbus devices, each reporting at the same rate): \(T_{avg} = \frac{80(0.5) + 60(1.2) + 60(2.0)}{200} = \frac{232}{200} = 1.16\) ms/message

Each core processes \(\frac{1000}{1.16} \approx 862\) messages/sec. With 4 cores at 85% efficiency: \(R = \frac{4 \times 0.85}{1.16\ \text{ms}} \approx 2{,}931\) messages/sec.

From the worked example in Section 10.6, actual load is 62.3 msg/sec (80 Modbus sensors at 1/min + 60 BACnet sensors at 1/min + 60 MQTT meters at 1/sec). Gateway utilization: \(\frac{62.3}{2{,}931} \approx 2.1\%\), leaving ample headroom for growth.
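
The capacity estimate can be reproduced in a few lines. The per-protocol timings, device mix, core count, and 85% efficiency are the example's assumed values, and weighting by device count assumes every device sends at the same rate.

```python
from typing import Dict, Tuple

# protocol -> (device count, processing time per message in ms), example values
MIX: Dict[str, Tuple[int, float]] = {
    "mqtt": (80, 0.5),
    "coap": (60, 1.2),
    "modbus": (60, 2.0),
}


def weighted_avg_ms(mix: Dict[str, Tuple[int, float]]) -> float:
    """Average processing time per message, weighted by device count."""
    total_devices = sum(n for n, _ in mix.values())
    return sum(n * t for n, t in mix.values()) / total_devices


def capacity_msgs_per_sec(cores: int, efficiency: float, t_avg_ms: float) -> float:
    """R = N_cores * E / T_avg, with T_avg converted from ms to seconds."""
    return cores * efficiency / (t_avg_ms / 1000.0)


t_avg = weighted_avg_ms(MIX)
capacity = capacity_msgs_per_sec(cores=4, efficiency=0.85, t_avg_ms=t_avg)
load = 62.3  # msg/s, from the Section 10.6 sizing table
utilization = load / capacity
```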

10.13 What’s Next

| If you want to… | Read this |
|---|---|
| Study the fundamentals of interoperability | Interoperability Fundamentals |
| Learn IoT interoperability standards | Interoperability Standards |
| See the complete interoperability overview | Interoperability |
| Understand protocol bridging in practice | Communication and Protocol Bridging |


10.14 Resources

10.14.1 Standards Organizations

10.14.2 Libraries