1313 Integration Patterns for IoT
Learning Objectives
After completing this chapter, you will be able to:
- Design protocol adapter patterns for heterogeneous IoT systems
- Implement unified gateway architectures for multi-protocol environments
- Choose between ESB and microservices integration approaches
- Apply ontology mapping for semantic integration across vendors
- Design production-ready interoperability solutions with redundancy
1313.1 Prerequisites
Before diving into this chapter, you should be familiar with:
- Interoperability Fundamentals: Understanding the four interoperability levels and challenges
- Interoperability Standards: Knowledge of SenML, JSON-LD, and W3C WoT for standardized data exchange
- Networking Basics: Familiarity with MQTT, CoAP, and HTTP protocols
1313.2 Protocol Adapter Pattern
The adapter pattern abstracts communication differences through a common interface, allowing applications to interact uniformly with devices using different protocols.
1313.2.1 Abstract Base Class Design
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class ProtocolAdapter(ABC):
    """Common interface every protocol-specific adapter must implement.

    Applications talk to devices only through these four operations, so the
    wire protocol used by a given device stays hidden behind its adapter.
    """

    @abstractmethod
    def register_device(self, device_id: str, config: Dict[str, Any]) -> bool:
        """Register a new device with the adapter.

        Args:
            device_id: Identifier the application uses for the device.
            config: Protocol-specific settings for reaching the device.

        Returns:
            True if the device was registered, False otherwise.
        """

    @abstractmethod
    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        """Read current data from the device.

        Returns:
            The reading as a dict, or None when no data is available.
        """

    @abstractmethod
    def send_command(self, device_id: str, command: str, params: Dict[str, Any]) -> bool:
        """Send a command to the device.

        Args:
            device_id: Identifier of the target device.
            command: Name of the command to execute.
            params: Command arguments.

        Returns:
            True if the command was sent, False otherwise.
        """

    @abstractmethod
    def disconnect(self, device_id: str) -> bool:
        """Disconnect the device and clean up its connection state.

        Returns:
            True if the device was disconnected, False otherwise.
        """

# 1313.2.2 Concrete MQTT Adapter
import paho.mqtt.client as mqtt
class MQTTAdapter(ProtocolAdapter):
    """Protocol adapter that exchanges JSON messages with devices over MQTT.

    Each device is described by a config dict holding a ``data_topic``
    (readings published by the device) and a ``command_topic`` (commands
    sent to it). The most recent decoded reading per device is cached and
    served by ``read_data``.
    """

    def __init__(self, broker: str, port: int = 1883):
        """Connect to the broker and start the client's network loop.

        Args:
            broker: Hostname or IP address of the MQTT broker.
            port: Broker TCP port.
        """
        self.broker = broker
        self.port = port
        # NOTE(review): paho-mqtt 2.x expects a callback API version argument
        # here -- confirm against the installed paho-mqtt release.
        self.client = mqtt.Client()
        self.devices: Dict[str, Dict] = {}      # device_id -> config
        self.latest_data: Dict[str, Dict] = {}  # device_id -> last reading
        self.client.on_message = self._on_message
        self.client.connect(broker, port)
        self.client.loop_start()

    def _on_message(self, client, userdata, message):
        """Cache the decoded payload for every device using the topic."""
        topic = message.topic
        # Iterate over a snapshot: loop_start() delivers callbacks from a
        # background thread, so the registry may change while we scan it.
        matching = [
            device_id
            for device_id, config in list(self.devices.items())
            if topic == config.get('data_topic')
        ]
        if not matching:
            return
        try:
            reading = json.loads(message.payload)
        except ValueError:
            # Malformed JSON or undecodable bytes: keep the last good reading
            # instead of letting the error escape into the network loop.
            return
        for device_id in matching:
            self.latest_data[device_id] = reading

    def register_device(self, device_id: str, config: Dict) -> bool:
        """Store the device config and subscribe to its data topic.

        Returns:
            True on success; False when ``config`` has no ``data_topic``
            (nothing is stored in that case).
        """
        data_topic = config.get('data_topic')
        if not data_topic:
            return False
        self.devices[device_id] = config
        self.client.subscribe(data_topic)
        return True

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        """Return the latest cached reading, or None if none has arrived."""
        return self.latest_data.get(device_id)

    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        """Publish a JSON command on the device's command topic.

        Returns:
            True once the message was handed to the client; False when the
            device is unknown or has no ``command_topic``.
        """
        config = self.devices.get(device_id)
        if not config or 'command_topic' not in config:
            return False
        # Build the body so a stray "command" key in params cannot replace
        # the actual command name.
        body = {"command": command}
        body.update((key, value) for key, value in params.items() if key != "command")
        self.client.publish(config['command_topic'], json.dumps(body))
        return True

    def disconnect(self, device_id: str) -> bool:
        """Forget the device, drop its cached reading and unsubscribe.

        Returns:
            True if the device was registered, False otherwise.
        """
        config = self.devices.pop(device_id, None)
        if config is None:
            return False
        self.latest_data.pop(device_id, None)
        topic = config.get('data_topic')
        # Keep the subscription alive while another device shares the topic.
        still_used = any(
            other.get('data_topic') == topic for other in self.devices.values()
        )
        if topic and not still_used:
            self.client.unsubscribe(topic)
        return True

# 1313.2.3 Concrete CoAP Adapter
from aiocoap import Context, Message, GET, PUT
class CoAPAdapter(ProtocolAdapter):
    """Protocol adapter that reads from and commands devices over CoAP.

    Device config dicts carry ``host``, ``port`` and ``resource``. Readings
    are decoded from JSON and commands are sent as JSON via PUT to the same
    resource.
    """

    _REQUIRED_KEYS = ('host', 'port', 'resource')

    def __init__(self):
        self.devices: Dict[str, Dict] = {}  # device_id -> config
        self.context = None                 # lazily created client context

    async def _ensure_context(self):
        """Create the client context on first use."""
        if not self.context:
            self.context = await Context.create_client_context()

    @staticmethod
    def _resource_uri(config: Dict) -> str:
        """Build the coap:// URI of the device resource."""
        return f"coap://{config['host']}:{config['port']}/{config['resource']}"

    def _run_blocking(self, make_coro):
        """Run an adapter coroutine to completion from synchronous code.

        asyncio.run() creates and closes a fresh event loop on every call, so
        a context cached by a previous call would belong to a closed loop.
        Each blocking call therefore works with its own temporary context and
        shuts it down afterwards; any context owned by async callers is set
        aside and restored untouched.
        """
        import asyncio

        async def _scoped():
            saved, self.context = self.context, None
            try:
                return await make_coro()
            finally:
                temporary, self.context = self.context, saved
                if temporary is not None:
                    await temporary.shutdown()

        return asyncio.run(_scoped())

    def register_device(self, device_id: str, config: Dict) -> bool:
        """Store the device config.

        Returns:
            True on success; False when ``host``, ``port`` or ``resource``
            is missing from ``config``.
        """
        if any(key not in config for key in self._REQUIRED_KEYS):
            return False
        self.devices[device_id] = config
        return True

    async def read_data_async(self, device_id: str) -> Optional[Dict[str, Any]]:
        """GET the device resource and decode its JSON payload.

        Returns:
            The decoded reading, or None when the device is not registered.
        """
        config = self.devices.get(device_id)
        if not config:
            # Unknown device: answer without opening a network context.
            return None
        await self._ensure_context()
        request = Message(code=GET, uri=self._resource_uri(config))
        response = await self.context.request(request).response
        return json.loads(response.payload.decode())

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        """Blocking wrapper around ``read_data_async``."""
        return self._run_blocking(lambda: self.read_data_async(device_id))

    async def send_command_async(self, device_id: str, command: str, params: Dict) -> bool:
        """PUT a JSON command to the device resource.

        Returns:
            True when the device answered with a success code; False when
            the device is not registered or the response code is an error.
        """
        config = self.devices.get(device_id)
        if not config:
            return False
        await self._ensure_context()
        # Build the body so a stray "command" key in params cannot replace
        # the actual command name.
        body = {"command": command}
        body.update((key, value) for key, value in params.items() if key != "command")
        request = Message(
            code=PUT,
            uri=self._resource_uri(config),
            payload=json.dumps(body).encode(),
        )
        response = await self.context.request(request).response
        return response.code.is_successful()

    def send_command(self, device_id: str, command: str, params: Dict) -> bool:
        """Blocking wrapper around ``send_command_async``."""
        return self._run_blocking(
            lambda: self.send_command_async(device_id, command, params)
        )

    def disconnect(self, device_id: str) -> bool:
        """Forget the device; True if it was registered, False otherwise."""
        return self.devices.pop(device_id, None) is not None

# 1313.2.4 Why Abstract Methods Matter
1313.3 Unified Gateway Architecture
A unified gateway centralizes protocol translation, providing applications with a single integration point regardless of underlying device protocols.
1313.3.1 Gateway Implementation
class UnifiedGateway:
    """Single integration point that routes device calls to protocol adapters.

    Adapters are registered per protocol name; devices are registered against
    one of those protocols. Every device operation is then forwarded to the
    adapter that owns the device.
    """

    def __init__(self):
        self.adapters: Dict[str, "ProtocolAdapter"] = {}
        self.device_registry: Dict[str, str] = {}  # device_id -> protocol

    def register_adapter(self, protocol: str, adapter: "ProtocolAdapter") -> None:
        """Register a protocol adapter under the given protocol name."""
        self.adapters[protocol] = adapter

    def register_device(self, device_id: str, protocol: str, config: Dict) -> bool:
        """Register a device with the adapter for its protocol.

        Returns:
            Whatever the adapter's ``register_device`` returned; the device
            is tracked by the gateway only when that value is truthy.

        Raises:
            ValueError: If no adapter is registered for ``protocol``.
        """
        if protocol not in self.adapters:
            raise ValueError(f"No adapter for protocol: {protocol}")
        success = self.adapters[protocol].register_device(device_id, config)
        if success:
            self.device_registry[device_id] = protocol
        return success

    def _adapter_for(self, device_id: str) -> Optional["ProtocolAdapter"]:
        """Return the adapter owning the device, or None if it is unknown."""
        protocol = self.device_registry.get(device_id)
        if not protocol:
            return None
        return self.adapters[protocol]

    def read_data(self, device_id: str) -> Optional[Dict[str, Any]]:
        """Read data from any device through its adapter.

        Returns:
            The adapter's reading, or None when the device is not registered.
        """
        adapter = self._adapter_for(device_id)
        if adapter is None:
            return None
        return adapter.read_data(device_id)

    def send_command(self, device_id: str, command: str, params: Optional[Dict] = None) -> bool:
        """Send a command to any device through its adapter.

        Args:
            device_id: Identifier of the target device.
            command: Name of the command to execute.
            params: Command arguments; an empty dict is sent when omitted.

        Returns:
            The adapter's result, or False when the device is not registered.
        """
        adapter = self._adapter_for(device_id)
        if adapter is None:
            return False
        return adapter.send_command(device_id, command, params if params is not None else {})

    def disconnect(self, device_id: str) -> bool:
        """Disconnect a device from its adapter and drop it from the registry.

        Returns:
            The adapter's result, or False when the device is not registered.
        """
        adapter = self._adapter_for(device_id)
        if adapter is None:
            return False
        released = adapter.disconnect(device_id)
        self.device_registry.pop(device_id, None)
        return released

    def read_all(self) -> Dict[str, Dict[str, Any]]:
        """Read data from all registered devices.

        Devices whose reading is None or empty are left out of the result.
        Exceptions raised by an adapter propagate to the caller.
        """
        results = {}
        for device_id in self.device_registry:
            data = self.read_data(device_id)
            if data:
                results[device_id] = data
        return results

# 1313.3.2 Usage Example
# Create the gateway that fronts every protocol
gateway = UnifiedGateway()

# Plug in one adapter per protocol
gateway.register_adapter(protocol="mqtt", adapter=MQTTAdapter("mqtt.example.com"))
gateway.register_adapter(protocol="coap", adapter=CoAPAdapter())
# NOTE(review): HTTPAdapter is not defined in the listings shown here --
# confirm it is provided elsewhere before running this example.
gateway.register_adapter(protocol="http", adapter=HTTPAdapter())

# Register devices; only the config dict is protocol-specific
gateway.register_device(
    device_id="temp-001",
    protocol="mqtt",
    config={
        "data_topic": "sensors/temp-001/data",
        "command_topic": "sensors/temp-001/cmd",
    },
)
gateway.register_device(
    device_id="occupancy-002",
    protocol="coap",
    config={
        "host": "192.168.1.50",
        "port": 5683,
        "resource": "status",
    },
)

# The application reads every device through the same call
all_data = gateway.read_all()
# Returns: {"temp-001": {"temp": 22.5}, "occupancy-002": {"occupied": True}}
1313.4 Enterprise Service Bus (ESB) Pattern
For complex enterprise IoT ecosystems, the Enterprise Service Bus provides centralized routing, transformation, and orchestration.
1313.4.1 ESB Capabilities
| Capability | Description | IoT Example |
|---|---|---|
| Protocol Mediation | Translate between protocols | MQTT → SOAP for ERP |
| Data Transformation | Convert data formats | SenML → JSON-LD → XML |
| Content-Based Routing | Route by message content | Temperature >40°C → alerts |
| Orchestration | Coordinate multi-step workflows | Sensor → rules → DB → notify |
| QoS Management | Guaranteed delivery, throttling | Priority queuing for alarms |
1313.4.2 When to Use ESB vs Microservices
| Factor | ESB | Microservices |
|---|---|---|
| Scale | 100+ devices/services | Any scale |
| Complexity | Complex transformations | Simple point-to-point |
| Governance | Centralized control needed | Decentralized OK |
| Latency | Acceptable latency increase | Ultra-low latency needed |
| Team Structure | Central integration team | Distributed teams |
1313.5 Semantic Integration with Ontology Mapping
When integrating sensors from different manufacturers using different vocabularies, ontology mapping enables semantic integration.
1313.5.1 The Challenge
Manufacturer A uses airTemp, B uses temperature, C uses AT. All mean the same thing but machines cannot know this without semantic mapping.
1313.5.2 Solution: RDF Ontology Mapping
# Ontology mapping file (mapping.ttl)
@prefix manA: <http://vendor-a.com/ont#> .
@prefix manB: <http://vendor-b.com/ont#> .
@prefix manC: <http://vendor-c.com/ont#> .
@prefix sosa: <http://www.w3.org/ns/sosa/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
# Map all vendor terms to common SOSA concept
manA:airTemp owl:equivalentProperty sosa:hasResult .
manB:temperature owl:equivalentProperty sosa:hasResult .
manC:AT owl:equivalentProperty sosa:hasResult .
1313.5.3 Query Across Heterogeneous Data
With mappings loaded, SPARQL queries work across all vendors:
PREFIX sosa: <http://www.w3.org/ns/sosa/>
SELECT ?sensor ?temperature
WHERE {
?observation sosa:madeBySensor ?sensor .
?observation sosa:hasResult ?temperature .
}
This query finds temperature readings regardless of whether the original data used airTemp, temperature, or AT.
1313.6 Worked Example: Smart Building Gateway Design
Scenario: A commercial building integrator must unify data from three vendor systems that use different protocols and data formats. The building management system needs a single API to access all sensor data for HVAC optimization.
Given:
- Vendor A (HVAC): Modbus TCP, registers as 16-bit integers, temperature in 0.1°F increments
- Vendor B (Occupancy): BACnet/IP, JSON objects, occupancy as boolean with timestamp
- Vendor C (Energy): MQTT, SenML format, power in watts with Unix epoch timestamps (SenML "t" field)
- Target output: Unified JSON-LD with SOSA ontology annotations
- 200 sensors total (80 HVAC, 60 occupancy, 60 energy meters)
Solution:
Step 1: Analyze source data formats
Vendor A (Modbus) raw reading:
Register 40001: 0x02BC (700 decimal) → 70.0°F
Vendor B (BACnet) message:
{"objectId": "AI:1", "presentValue": true, "timestamp": "2024-01-15T10:30:00"}
Vendor C (SenML) message:
[{"bn": "urn:dev:meter:1234:", "n": "power", "v": 1523.5, "u": "W", "t": 1705315800}]
Step 2: Design canonical data model
Create unified schema using JSON-LD with SOSA ontology:
{
"@context": {
"sosa": "http://www.w3.org/ns/sosa/",
"qudt": "http://qudt.org/schema/qudt/"
},
"@type": "sosa:Observation",
"sosa:madeBySensor": "building-a/floor-2/zone-1/temp-01",
"sosa:observedProperty": "qudt:Temperature",
"sosa:hasSimpleResult": 21.1,
"qudt:unit": "qudt:DEG_C",
"sosa:resultTime": "2024-01-15T10:30:00Z"
}
Step 3: Implement protocol adapters
| Source | Transformation Required |
|---|---|
| Modbus (A) | Read register → divide by 10 → convert °F to °C → wrap in JSON-LD |
| BACnet (B) | Parse JSON → map objectId to sensor URI → add ontology context |
| MQTT/SenML (C) | Parse SenML → convert Unix timestamp to ISO 8601 → map to SOSA |
Temperature conversion: °C = (°F − 32) × 5/9
Example: 70.0°F → (70.0 − 32) × 5/9 ≈ 21.1°C
Step 4: Calculate gateway sizing
| Vendor | Sensors | Poll Rate | Messages/sec |
|---|---|---|---|
| A (Modbus) | 80 | 1/min | 1.3 |
| B (BACnet) | 60 | 1/min | 1.0 |
| C (MQTT) | 60 | 1/sec | 60.0 |
Total throughput: ≈62.3 messages/sec ≈ 3,740 messages/minute (80 + 60 per-minute polls plus 60 × 60 per-second messages)
Gateway requirements:
- CPU: Low (simple transformations) - ARM Cortex-A53 sufficient
- Memory: 512 MB (protocol stacks + buffer)
- Storage: 4 GB (24-hour local cache for offline operation)
Step 5: Define semantic mapping rules
mappings:
  vendor_a:
    source_field: "register_value"
    transform: "(value / 10 - 32) * 5 / 9"
    target: "sosa:hasSimpleResult"
    unit: "qudt:DEG_C"
  vendor_b:
    source_field: "presentValue"
    transform: "boolean_to_int"
    target: "sosa:hasSimpleResult"
    observed_property: "sosa:Occupancy"
  vendor_c:
    source_field: "v"
    transform: "passthrough"
    target: "sosa:hasSimpleResult"
    unit_mapping:
      "W": "qudt:W"
      "kWh": "qudt:KiloW-HR"
Result: The gateway provides a single REST API endpoint returning JSON-LD formatted observations from all 200 sensors with consistent units (metric), timestamps (ISO 8601), and semantic annotations (SOSA/QUDT). Building management software queries one API instead of three proprietary interfaces.
Key Insight: Interoperability requires addressing all four levels—Technical (protocol adapters for Modbus/BACnet/MQTT), Syntactic (JSON-LD output format), Semantic (SOSA ontology + unit conversion), and Organizational (consistent naming conventions like building/floor/zone/sensor). Skipping any level creates integration debt that compounds as the system scales.
1313.7 Production Considerations
1313.7.1 Pitfall: Missing Data Lineage Tracking
The Mistake: Transforming and aggregating IoT data through multiple pipeline stages without tracking provenance, making it impossible to trace anomalies back to their source.
Why It Happens: Lineage tracking adds storage overhead and pipeline complexity. Initial deployments focus on getting data flowing.
The Fix: Embed lineage metadata from the start using correlation IDs that flow through all transformations. Each processing stage should emit: source_id, transform_id, timestamp, and version. Store lineage in a queryable format (graph database or lineage service like Apache Atlas).
1313.7.2 Pitfall: Undocumented Unit Assumptions
The Mistake: Processing sensor data without explicit unit documentation, leading to silent conversion errors when data from different sources uses different measurement units.
The Fix: Make units explicit and machine-readable in every data payload using standards like SenML ("u": "Cel") or QUDT ontology references. Never assume implicit units. Implement unit validation at ingestion: reject or flag data missing unit annotations.
1313.7.3 Pitfall: Stale Device Registry
The Mistake: Enriching streaming data with device metadata from a registry that is updated asynchronously, causing enrichment failures when devices are added or moved.
The Fix: Implement registry change propagation as events in the same streaming platform. Use changelog-based enrichment (Kafka KTable pattern) where registry updates automatically propagate to join results. Set cache TTLs appropriate to your change frequency.
1313.8 Visual Reference Gallery
1313.9 Summary
- Protocol adapter pattern abstracts communication differences through a common interface (register_device, read_data, send_command) with concrete implementations for each protocol, enabling applications to interact uniformly without hardcoding protocol details.
- Unified gateway architecture centralizes protocol translation, providing applications with a single integration point and enabling centralized data validation, routing, and security enforcement across heterogeneous device ecosystems.
- Enterprise Service Bus (ESB) provides centralized routing, transformation, and orchestration for complex enterprise IoT deployments (>100 devices/services), with capabilities including protocol mediation, content-based routing, and guaranteed delivery.
- Ontology mapping using RDF and SPARQL enables semantic integration across vendors using different vocabularies by linking terms to common reference ontologies (SOSA/SSN), allowing unified queries regardless of original terminology.
- Production deployments require redundant gateways, data lineage tracking, explicit unit documentation, and registry synchronization to avoid common pitfalls that cause silent failures at scale.
1313.10 What’s Next
Now that you understand integration patterns:
- Edge Processing: Explore Edge Compute Patterns for local data handling
- Big Data: Scale up with Big Data Overview
- Data Storage: Learn storage patterns in Data Storage and Databases
Test Your Knowledge: - Data Analytics Quiz - Knowledge Gaps - Data
Data Management: - Data Storage - Database patterns - Big Data Overview - Scaling data systems - Data in the Cloud - Cloud data services
Protocols & Standards: - IoT Protocols Overview - Protocol landscape - MQTT - Pub/sub messaging - CoAP - REST for IoT
Architecture: - IoT Reference Models - Standard frameworks
Learning Hubs: - Quiz Navigator - Test your understanding
1313.11 Resources
1313.11.1 Standards Organizations
1313.11.2 Libraries
- RDFLib - Python RDF library
- Apache Jena - Java RDF and SPARQL
- Eclipse Paho - MQTT clients
- aiocoap - Python CoAP library