191 Gateway Protocol Translation Lab
191.1 Learning Objectives
By the end of this chapter, you will be able to:
- Implement I2C Communication: Read temperature sensors via I2C protocol
- Implement SPI Communication: Read accelerometer data via SPI interface
- Publish to MQTT: Format and publish sensor data to cloud broker
- Add Edge Processing: Implement local data aggregation and anomaly detection
- Handle Protocol Challenges: Manage timing mismatches and data formatting
191.2 Introduction
Hands-on Wokwi simulation implementing a complete gateway that bridges I2C/SPI sensors to MQTT cloud protocols.
191.3 Gateway Protocol Translation Lab
191.3.1 Lab Introduction: What Youβll Learn
This hands-on lab demonstrates the core concepts of IoT gateway architecture using an ESP32 microcontroller simulation. Youβll explore how gateways serve as the critical bridge between sensor networks and cloud platforms, handling protocol translation, data transformation, and edge processing.
After completing this lab, you will be able to:
- Implement Protocol Bridging: Translate between simulated MQTT-style messages and HTTP-style requests on a single device
- Transform Data Formats: Convert between JSON, binary, and human-readable formats at the gateway layer
- Apply Edge Preprocessing: Filter, aggregate, and compress sensor data before βcloudβ transmission
- Aggregate Multiple Devices: Combine readings from multiple simulated sensors into unified messages
- Implement Local Caching: Buffer data during simulated network outages and batch transmissions
Think of an IoT gateway like a translator at the United Nations:
- Sensors speak different βlanguagesβ (I2C, SPI, analog signals)
- Cloud platforms only understand specific βlanguagesβ (HTTP, MQTT)
- The gateway sits in the middle, translating everything so everyone can communicate!
In this lab, your ESP32 acts as that translator, showing you exactly what happens inside a real IoT gateway.
191.3.2 Wokwi Simulation Environment
Wokwi is a free online simulator for Arduino, ESP32, and other microcontrollers. It allows you to experiment with gateway concepts without purchasing hardware. The simulation below includes an ESP32 with virtual sensors to demonstrate gateway operations.
Launch the simulator below and paste the provided code to explore gateway concepts interactively.
- Click the green Play button to start the simulation
- Watch the Serial Monitor (115200 baud) for gateway output
- The simulation demonstrates concepts - no actual network connectivity
- Modify the code to experiment with different scenarios
- Use Stop and Play to restart with code changes
191.3.3 Complete Gateway Simulation Code
Copy and paste this code into the Wokwi editor. The code demonstrates a comprehensive IoT gateway implementation with protocol translation, data transformation, edge processing, device aggregation, and local caching.
/*
* ============================================================================
* IoT GATEWAY PROTOCOL TRANSLATION LAB
* ============================================================================
*
* This simulation demonstrates the five core functions of an IoT gateway:
* 1. Protocol Bridging (MQTT-style to HTTP-style translation)
* 2. Data Format Transformation (JSON, Binary, Human-readable)
* 3. Edge Preprocessing and Filtering
* 4. Device Aggregation
* 5. Local Caching and Buffering
*
* Educational Purpose: Understand gateway architecture without real hardware
*
* Author: IoT Class Educational Series
* License: MIT
* ============================================================================
*/
#include <Arduino.h>
#include <ArduinoJson.h>
// ============================================================================
// CONFIGURATION CONSTANTS
// ============================================================================
// Gateway Identity
const char* GATEWAY_ID = "GW-ESP32-001";
const char* GATEWAY_VERSION = "1.0.0";
// Simulated Network Settings
const char* MQTT_BROKER = "mqtt.example.com";
const int MQTT_PORT = 1883;
const char* HTTP_ENDPOINT = "https://api.cloud.example.com/v1/telemetry";
// Timing Configuration (milliseconds)
const unsigned long SENSOR_READ_INTERVAL = 2000; // Read sensors every 2s
const unsigned long AGGREGATION_INTERVAL = 10000; // Aggregate every 10s
const unsigned long CACHE_FLUSH_INTERVAL = 30000; // Flush cache every 30s
const unsigned long NETWORK_CHECK_INTERVAL = 5000; // Check network every 5s
// Edge Processing Thresholds
const float TEMP_CHANGE_THRESHOLD = 0.5; // Only report if change > 0.5C
const float HUMIDITY_CHANGE_THRESHOLD = 2.0; // Only report if change > 2%
const int MOTION_DEBOUNCE_MS = 1000; // Ignore motion for 1s after trigger
// Cache Configuration
const int MAX_CACHE_SIZE = 50; // Maximum cached messages
const int BATCH_SIZE = 10; // Messages per batch transmission
// ============================================================================
// DATA STRUCTURES
// ============================================================================
// Represents a single sensor reading
struct SensorReading {
char sensorId[20];
char sensorType[15];
float value;
char unit[10];
unsigned long timestamp;
int quality; // 0-100 signal quality
};
// Represents a cached message waiting for transmission
struct CachedMessage {
char topic[50];
char payload[256];
unsigned long timestamp;
int priority; // 1=critical, 2=normal, 3=low
int retryCount;
bool valid;
};
// Gateway statistics
struct GatewayStats {
unsigned long messagesReceived;
unsigned long messagesTransmitted;
unsigned long messagesCached;
unsigned long messagesDropped;
unsigned long protocolTranslations;
unsigned long bytesProcessed;
unsigned long bytesSaved; // By compression/aggregation
unsigned long uptime;
};
// ============================================================================
// GLOBAL STATE
// ============================================================================
// Simulated sensor values
float currentTemp = 22.5;
float currentHumidity = 45.0;
float currentPressure = 1013.25;
bool motionDetected = false;
int lightLevel = 500;
// Previous values for change detection
float lastReportedTemp = 0;
float lastReportedHumidity = 0;
unsigned long lastMotionTime = 0;
// Cache for offline buffering
CachedMessage messageCache[MAX_CACHE_SIZE];
int cacheHead = 0;
int cacheCount = 0;
// Aggregation buffer
SensorReading aggregationBuffer[10];
int aggregationCount = 0;
// Network simulation state
bool networkConnected = true;
int networkFailureCounter = 0;
unsigned long lastNetworkCheck = 0;
// Timing trackers
unsigned long lastSensorRead = 0;
unsigned long lastAggregation = 0;
unsigned long lastCacheFlush = 0;
// Statistics
GatewayStats stats = {0, 0, 0, 0, 0, 0, 0, 0};
// ============================================================================
// UTILITY FUNCTIONS
// ============================================================================
/**
* Generates a simulated timestamp (seconds since boot)
*/
unsigned long getTimestamp() {
return millis() / 1000;
}
/**
* Formats a timestamp as ISO-8601 string
*/
void formatTimestamp(unsigned long ts, char* buffer, size_t len) {
// In real implementation, would use NTP time
// For simulation, we use relative time
snprintf(buffer, len, "2024-01-15T%02lu:%02lu:%02luZ",
(ts / 3600) % 24, (ts / 60) % 60, ts % 60);
}
/**
* Calculates a simple checksum for data integrity
*/
uint16_t calculateChecksum(const char* data, size_t len) {
uint16_t sum = 0;
for (size_t i = 0; i < len; i++) {
sum += (uint8_t)data[i];
}
return sum;
}
/**
* Simulates network connectivity with occasional failures
*/
bool checkNetworkStatus() {
// Simulate network going down every ~30 seconds for ~10 seconds
unsigned long cycleTime = millis() % 40000;
if (cycleTime > 30000 && cycleTime < 40000) {
if (networkConnected) {
Serial.println("\n[NETWORK] Connection lost - entering offline mode");
networkConnected = false;
networkFailureCounter++;
}
return false;
} else {
if (!networkConnected) {
Serial.println("\n[NETWORK] Connection restored - flushing cache");
networkConnected = true;
}
return true;
}
}
// ============================================================================
// SENSOR SIMULATION
// ============================================================================
/**
* Simulates reading from multiple sensors
* In real gateway: Would read from I2C, SPI, analog pins, etc.
*/
void simulateSensorReadings() {
// Temperature: Gradual drift with some noise
currentTemp += random(-10, 11) / 20.0;
currentTemp = constrain(currentTemp, 15.0, 35.0);
// Humidity: Inverse correlation with temperature
currentHumidity = 70.0 - currentTemp + random(-5, 6);
currentHumidity = constrain(currentHumidity, 20.0, 90.0);
// Pressure: Slow variation
currentPressure += random(-5, 6) / 10.0;
currentPressure = constrain(currentPressure, 980.0, 1040.0);
// Motion: Random events
if (random(100) < 5) { // 5% chance per reading
if (millis() - lastMotionTime > MOTION_DEBOUNCE_MS) {
motionDetected = true;
lastMotionTime = millis();
}
} else {
motionDetected = false;
}
// Light: Sinusoidal pattern simulating day/night
lightLevel = 500 + (int)(300 * sin(millis() / 60000.0));
}
// ============================================================================
// PROTOCOL TRANSLATION FUNCTIONS
// ============================================================================
/**
* Converts sensor data to MQTT-style message format
* MQTT uses topics and compact payloads
*/
void formatAsMqttMessage(const SensorReading& reading, char* topic,
char* payload, size_t payloadLen) {
// MQTT topic hierarchy: gateway/sensorType/sensorId
snprintf(topic, 50, "iot/%s/%s/%s", GATEWAY_ID,
reading.sensorType, reading.sensorId);
// Create JSON payload (compact for MQTT)
StaticJsonDocument<200> doc;
doc["v"] = reading.value;
doc["u"] = reading.unit;
doc["t"] = reading.timestamp;
doc["q"] = reading.quality;
serializeJson(doc, payload, payloadLen);
stats.protocolTranslations++;
}
/**
* Converts sensor data to HTTP-style request format
* HTTP uses REST endpoints and verbose payloads
*/
void formatAsHttpRequest(const SensorReading& reading, char* request,
size_t requestLen) {
// Build full HTTP-style payload with headers
StaticJsonDocument<400> doc;
// Device identification
doc["gatewayId"] = GATEWAY_ID;
doc["sensorId"] = reading.sensorId;
doc["sensorType"] = reading.sensorType;
// Measurement data
JsonObject measurement = doc.createNestedObject("measurement");
measurement["value"] = reading.value;
measurement["unit"] = reading.unit;
measurement["quality"] = reading.quality;
// Metadata
char timeStr[30];
formatTimestamp(reading.timestamp, timeStr, sizeof(timeStr));
doc["timestamp"] = timeStr;
doc["apiVersion"] = "v1";
// Simulate HTTP request format
char jsonPayload[300];
serializeJson(doc, jsonPayload, sizeof(jsonPayload));
snprintf(request, requestLen,
"POST /api/v1/telemetry HTTP/1.1\r\n"
"Host: api.cloud.example.com\r\n"
"Content-Type: application/json\r\n"
"X-Gateway-ID: %s\r\n"
"Content-Length: %d\r\n"
"\r\n%s",
GATEWAY_ID, strlen(jsonPayload), jsonPayload);
stats.protocolTranslations++;
}
/**
* Translates MQTT-style message to HTTP format (protocol bridging)
*/
void translateMqttToHttp(const char* mqttTopic, const char* mqttPayload,
char* httpRequest, size_t httpLen) {
Serial.println("\n[TRANSLATION] MQTT -> HTTP Protocol Bridge");
Serial.printf(" Input Topic: %s\n", mqttTopic);
Serial.printf(" Input Payload: %s\n", mqttPayload);
// Parse MQTT payload
StaticJsonDocument<200> mqttDoc;
deserializeJson(mqttDoc, mqttPayload);
// Extract components from topic
// Format: iot/gateway/sensorType/sensorId
char topicCopy[50];
strncpy(topicCopy, mqttTopic, sizeof(topicCopy));
char* parts[5];
int partCount = 0;
char* token = strtok(topicCopy, "/");
while (token && partCount < 5) {
parts[partCount++] = token;
token = strtok(NULL, "/");
}
// Build HTTP request
StaticJsonDocument<400> httpDoc;
httpDoc["source"] = "mqtt-bridge";
httpDoc["originalTopic"] = mqttTopic;
if (partCount >= 4) {
httpDoc["gateway"] = parts[1];
httpDoc["sensorType"] = parts[2];
httpDoc["sensorId"] = parts[3];
}
httpDoc["value"] = mqttDoc["v"];
httpDoc["unit"] = mqttDoc["u"];
httpDoc["quality"] = mqttDoc["q"];
char jsonBody[350];
serializeJsonPretty(httpDoc, jsonBody, sizeof(jsonBody));
snprintf(httpRequest, httpLen,
"POST /api/v1/mqtt-bridge HTTP/1.1\r\n"
"Content-Type: application/json\r\n"
"X-Translated-From: MQTT\r\n"
"\r\n%s", jsonBody);
Serial.println(" Output HTTP Request:");
Serial.println(httpRequest);
stats.protocolTranslations++;
}
// ============================================================================
// DATA TRANSFORMATION FUNCTIONS
// ============================================================================
/**
* Converts reading to binary format (compact for transmission)
* Reduces payload size by ~60% compared to JSON
*/
void formatAsBinary(const SensorReading& reading, uint8_t* buffer,
size_t* len) {
// Binary format:
// [0-3]: timestamp (uint32)
// [4-7]: value (float)
// [8]: quality (uint8)
// [9]: sensor type code (uint8)
// [10-11]: checksum (uint16)
memcpy(buffer, &reading.timestamp, 4);
memcpy(buffer + 4, &reading.value, 4);
buffer[8] = (uint8_t)reading.quality;
// Encode sensor type as single byte
if (strcmp(reading.sensorType, "temperature") == 0) buffer[9] = 0x01;
else if (strcmp(reading.sensorType, "humidity") == 0) buffer[9] = 0x02;
else if (strcmp(reading.sensorType, "pressure") == 0) buffer[9] = 0x03;
else if (strcmp(reading.sensorType, "motion") == 0) buffer[9] = 0x04;
else if (strcmp(reading.sensorType, "light") == 0) buffer[9] = 0x05;
else buffer[9] = 0xFF;
uint16_t checksum = calculateChecksum((char*)buffer, 10);
memcpy(buffer + 10, &checksum, 2);
*len = 12;
Serial.println("\n[TRANSFORM] JSON -> Binary Conversion");
Serial.printf(" Original JSON size: ~%d bytes\n", 80);
Serial.printf(" Binary size: %d bytes\n", *len);
Serial.printf(" Compression: %.1f%%\n", (1.0 - (*len / 80.0)) * 100);
stats.bytesSaved += (80 - *len);
}
/**
* Converts binary back to JSON (for demonstration)
*/
void binaryToJson(const uint8_t* buffer, char* json, size_t jsonLen) {
uint32_t timestamp;
float value;
memcpy(×tamp, buffer, 4);
memcpy(&value, buffer + 4, 4);
uint8_t quality = buffer[8];
uint8_t typeCode = buffer[9];
const char* sensorType;
switch (typeCode) {
case 0x01: sensorType = "temperature"; break;
case 0x02: sensorType = "humidity"; break;
case 0x03: sensorType = "pressure"; break;
case 0x04: sensorType = "motion"; break;
case 0x05: sensorType = "light"; break;
default: sensorType = "unknown";
}
snprintf(json, jsonLen,
"{\"timestamp\":%lu,\"value\":%.2f,\"type\":\"%s\",\"quality\":%d}",
timestamp, value, sensorType, quality);
}
/**
* Formats data as human-readable text (for debugging/display)
*/
void formatAsHumanReadable(const SensorReading& reading, char* output,
size_t len) {
char timeStr[30];
formatTimestamp(reading.timestamp, timeStr, sizeof(timeStr));
snprintf(output, len,
"Sensor Report:\n"
" ID: %s\n"
" Type: %s\n"
" Value: %.2f %s\n"
" Quality: %d/100\n"
" Time: %s",
reading.sensorId, reading.sensorType,
reading.value, reading.unit, reading.quality, timeStr);
}
// ============================================================================
// EDGE PREPROCESSING FUNCTIONS
// ============================================================================
/**
* Applies threshold filtering - only report significant changes
*/
bool shouldReportReading(const SensorReading& reading) {
bool shouldReport = false;
if (strcmp(reading.sensorType, "temperature") == 0) {
float delta = abs(reading.value - lastReportedTemp);
shouldReport = (delta >= TEMP_CHANGE_THRESHOLD);
if (shouldReport) {
lastReportedTemp = reading.value;
}
} else if (strcmp(reading.sensorType, "humidity") == 0) {
float delta = abs(reading.value - lastReportedHumidity);
shouldReport = (delta >= HUMIDITY_CHANGE_THRESHOLD);
if (shouldReport) {
lastReportedHumidity = reading.value;
}
} else if (strcmp(reading.sensorType, "motion") == 0) {
// Always report motion events (edge-triggered)
shouldReport = (reading.value > 0);
} else {
// Report other sensors always (could add more filters)
shouldReport = true;
}
if (!shouldReport) {
Serial.printf("[FILTER] Suppressed %s reading (below threshold)\n",
reading.sensorType);
stats.bytesSaved += 80; // Estimate bytes saved
}
return shouldReport;
}
/**
* Calculates statistics over aggregation window
*/
void calculateAggregateStats(const char* sensorType, float* avg,
float* min, float* max, int* count) {
*avg = 0; *min = 999999; *max = -999999; *count = 0;
for (int i = 0; i < aggregationCount; i++) {
if (strcmp(aggregationBuffer[i].sensorType, sensorType) == 0) {
float val = aggregationBuffer[i].value;
*avg += val;
if (val < *min) *min = val;
if (val > *max) *max = val;
(*count)++;
}
}
if (*count > 0) {
*avg /= *count;
}
}
// ============================================================================
// DEVICE AGGREGATION FUNCTIONS
// ============================================================================
/**
* Adds reading to aggregation buffer
*/
void addToAggregationBuffer(const SensorReading& reading) {
if (aggregationCount < 10) {
aggregationBuffer[aggregationCount++] = reading;
}
}
/**
* Creates aggregated message from multiple sensor readings
*/
void createAggregatedMessage(char* message, size_t len) {
StaticJsonDocument<600> doc;
doc["gatewayId"] = GATEWAY_ID;
doc["aggregationType"] = "time-window";
doc["windowSeconds"] = AGGREGATION_INTERVAL / 1000;
doc["timestamp"] = getTimestamp();
JsonArray sensors = doc.createNestedArray("sensors");
// Aggregate temperature
float avg, minVal, maxVal;
int count;
calculateAggregateStats("temperature", &avg, &minVal, &maxVal, &count);
if (count > 0) {
JsonObject temp = sensors.createNestedObject();
temp["type"] = "temperature";
temp["avg"] = serialized(String(avg, 2));
temp["min"] = serialized(String(minVal, 2));
temp["max"] = serialized(String(maxVal, 2));
temp["count"] = count;
}
// Aggregate humidity
calculateAggregateStats("humidity", &avg, &minVal, &maxVal, &count);
if (count > 0) {
JsonObject hum = sensors.createNestedObject();
hum["type"] = "humidity";
hum["avg"] = serialized(String(avg, 2));
hum["min"] = serialized(String(minVal, 2));
hum["max"] = serialized(String(maxVal, 2));
hum["count"] = count;
}
serializeJsonPretty(doc, message, len);
// Calculate data reduction
int originalSize = aggregationCount * 80; // Estimate individual messages
int aggregatedSize = strlen(message);
float reduction = (1.0 - (float)aggregatedSize / originalSize) * 100;
Serial.println("\n[AGGREGATION] Created aggregated message:");
Serial.println(message);
Serial.printf(" Original: %d individual messages (~%d bytes)\n",
aggregationCount, originalSize);
Serial.printf(" Aggregated: 1 message (%d bytes)\n", aggregatedSize);
Serial.printf(" Data reduction: %.1f%%\n", reduction);
stats.bytesSaved += (originalSize - aggregatedSize);
// Clear buffer
aggregationCount = 0;
}
// ============================================================================
// LOCAL CACHING FUNCTIONS
// ============================================================================
/**
* Adds message to local cache (for offline operation)
*/
bool cacheMessage(const char* topic, const char* payload, int priority) {
if (cacheCount >= MAX_CACHE_SIZE) {
// Cache full - drop lowest priority message
int dropIndex = -1;
int lowestPriority = 0;
for (int i = 0; i < MAX_CACHE_SIZE; i++) {
if (messageCache[i].valid &&
messageCache[i].priority > lowestPriority) {
lowestPriority = messageCache[i].priority;
dropIndex = i;
}
}
if (dropIndex >= 0 && priority < lowestPriority) {
messageCache[dropIndex].valid = false;
cacheCount--;
stats.messagesDropped++;
Serial.printf("[CACHE] Dropped low-priority message to make room\n");
} else {
Serial.printf("[CACHE] Cannot cache - buffer full\n");
stats.messagesDropped++;
return false;
}
}
// Find empty slot
for (int i = 0; i < MAX_CACHE_SIZE; i++) {
if (!messageCache[i].valid) {
strncpy(messageCache[i].topic, topic,
sizeof(messageCache[i].topic));
strncpy(messageCache[i].payload, payload,
sizeof(messageCache[i].payload));
messageCache[i].timestamp = getTimestamp();
messageCache[i].priority = priority;
messageCache[i].retryCount = 0;
messageCache[i].valid = true;
cacheCount++;
stats.messagesCached++;
Serial.printf("[CACHE] Stored message (priority %d): %d/%d\n",
priority, cacheCount, MAX_CACHE_SIZE);
return true;
}
}
return false;
}
/**
* Retrieves and sends cached messages when network available
*/
void flushCache() {
if (cacheCount == 0) return;
Serial.println("\n[CACHE] Flushing cached messages...");
int batchCount = 0;
int flushed = 0;
// Send in priority order (1 = critical first)
for (int priority = 1; priority <= 3 && batchCount < BATCH_SIZE; priority++) {
for (int i = 0; i < MAX_CACHE_SIZE && batchCount < BATCH_SIZE; i++) {
if (messageCache[i].valid && messageCache[i].priority == priority) {
Serial.printf(" Sending cached: %s\n", messageCache[i].topic);
// Simulate transmission
messageCache[i].valid = false;
cacheCount--;
flushed++;
batchCount++;
stats.messagesTransmitted++;
}
}
}
Serial.printf("[CACHE] Flushed %d messages, %d remaining\n",
flushed, cacheCount);
}
// ============================================================================
// MAIN GATEWAY PROCESSING LOOP
// ============================================================================
/**
* Processes a single sensor and demonstrates gateway functions
*/
void processSensor(const char* sensorId, const char* sensorType,
float value, const char* unit) {
SensorReading reading;
strncpy(reading.sensorId, sensorId, sizeof(reading.sensorId));
strncpy(reading.sensorType, sensorType, sizeof(reading.sensorType));
reading.value = value;
strncpy(reading.unit, unit, sizeof(reading.unit));
reading.timestamp = getTimestamp();
reading.quality = 85 + random(15); // 85-100 quality
stats.messagesReceived++;
stats.bytesProcessed += sizeof(SensorReading);
// === EDGE PREPROCESSING: Apply threshold filtering ===
if (!shouldReportReading(reading)) {
return; // Filtered out - don't process further
}
// === DEVICE AGGREGATION: Add to buffer ===
addToAggregationBuffer(reading);
// === DATA TRANSFORMATION: Show different formats ===
char mqttTopic[50];
char mqttPayload[200];
formatAsMqttMessage(reading, mqttTopic, mqttPayload, sizeof(mqttPayload));
Serial.println("\n========================================");
Serial.printf("[GATEWAY] Processing: %s\n", sensorType);
Serial.println("========================================");
// Show MQTT format
Serial.println("\n[FORMAT 1] MQTT Style:");
Serial.printf(" Topic: %s\n", mqttTopic);
Serial.printf(" Payload: %s\n", mqttPayload);
// === PROTOCOL TRANSLATION: MQTT to HTTP ===
char httpRequest[512];
translateMqttToHttp(mqttTopic, mqttPayload, httpRequest, sizeof(httpRequest));
// Show binary format
uint8_t binaryData[20];
size_t binaryLen;
formatAsBinary(reading, binaryData, &binaryLen);
Serial.print(" Binary (hex): ");
for (size_t i = 0; i < binaryLen; i++) {
Serial.printf("%02X ", binaryData[i]);
}
Serial.println();
// === LOCAL CACHING: Store for offline operation ===
if (!networkConnected) {
int priority = (strcmp(sensorType, "motion") == 0) ? 1 : 2;
cacheMessage(mqttTopic, mqttPayload, priority);
} else {
stats.messagesTransmitted++;
}
}
/**
* Prints gateway statistics
*/
void printStatistics() {
Serial.println("\n========================================");
Serial.println(" GATEWAY STATISTICS SUMMARY ");
Serial.println("========================================");
Serial.printf(" Uptime: %lu seconds\n", millis() / 1000);
Serial.printf(" Messages Received: %lu\n", stats.messagesReceived);
Serial.printf(" Messages Transmitted: %lu\n", stats.messagesTransmitted);
Serial.printf(" Messages Cached: %lu\n", stats.messagesCached);
Serial.printf(" Messages Dropped: %lu\n", stats.messagesDropped);
Serial.printf(" Protocol Translations: %lu\n", stats.protocolTranslations);
Serial.printf(" Bytes Processed: %lu\n", stats.bytesProcessed);
Serial.printf(" Bytes Saved (edge processing): %lu\n", stats.bytesSaved);
Serial.printf(" Cache Utilization: %d/%d\n", cacheCount, MAX_CACHE_SIZE);
Serial.printf(" Network Status: %s\n", networkConnected ? "CONNECTED" : "OFFLINE");
Serial.printf(" Network Failures: %d\n", networkFailureCounter);
Serial.println("========================================\n");
}
// ============================================================================
// ARDUINO SETUP AND LOOP
// ============================================================================
void setup() {
Serial.begin(115200);
delay(1000);
// Initialize random seed
randomSeed(analogRead(0));
// Initialize cache
for (int i = 0; i < MAX_CACHE_SIZE; i++) {
messageCache[i].valid = false;
}
// Print startup banner
Serial.println("\n");
Serial.println("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ");
Serial.println("β β");
Serial.println("β IoT GATEWAY PROTOCOL TRANSLATION LAB β");
Serial.println("β β");
Serial.println("β This simulation demonstrates: β");
Serial.println("β 1. Protocol Bridging (MQTT <-> HTTP) β");
Serial.println("β 2. Data Format Transformation β");
Serial.println("β 3. Edge Preprocessing and Filtering β");
Serial.println("β 4. Device Aggregation β");
Serial.println("β 5. Local Caching and Buffering β");
Serial.println("β β");
Serial.println("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ");
Serial.println();
Serial.printf("Gateway ID: %s\n", GATEWAY_ID);
Serial.printf("Version: %s\n", GATEWAY_VERSION);
Serial.printf("Simulated MQTT Broker: %s:%d\n", MQTT_BROKER, MQTT_PORT);
Serial.printf("Simulated HTTP Endpoint: %s\n", HTTP_ENDPOINT);
Serial.println("\nStarting gateway operations...\n");
}
void loop() {
unsigned long currentTime = millis();
// === CHECK NETWORK STATUS ===
if (currentTime - lastNetworkCheck >= NETWORK_CHECK_INTERVAL) {
checkNetworkStatus();
lastNetworkCheck = currentTime;
}
// === READ AND PROCESS SENSORS ===
if (currentTime - lastSensorRead >= SENSOR_READ_INTERVAL) {
simulateSensorReadings();
// Process each simulated sensor
processSensor("TEMP-001", "temperature", currentTemp, "C");
processSensor("HUM-001", "humidity", currentHumidity, "%");
// Process motion only when detected
if (motionDetected) {
processSensor("MOT-001", "motion", 1.0, "event");
}
// Process light sensor occasionally
if (random(100) < 30) { // 30% chance
processSensor("LUX-001", "light", lightLevel, "lux");
}
lastSensorRead = currentTime;
}
// === PERFORM AGGREGATION ===
if (currentTime - lastAggregation >= AGGREGATION_INTERVAL) {
if (aggregationCount > 0) {
char aggregatedMsg[600];
createAggregatedMessage(aggregatedMsg, sizeof(aggregatedMsg));
}
lastAggregation = currentTime;
}
// === FLUSH CACHE IF NETWORK AVAILABLE ===
if (currentTime - lastCacheFlush >= CACHE_FLUSH_INTERVAL) {
if (networkConnected && cacheCount > 0) {
flushCache();
}
// Print statistics periodically
printStatistics();
lastCacheFlush = currentTime;
}
delay(100); // Small delay for simulation stability
}191.3.4 Understanding the Code: Key Gateway Components
The gateway translates between MQTT and HTTP protocols:
// MQTT message format (compact, topic-based)
Topic: iot/GW-ESP32-001/temperature/TEMP-001
Payload: {"v":23.5,"u":"C","t":1234567890,"q":92}
// Translated to HTTP request (verbose, endpoint-based)
POST /api/v1/telemetry HTTP/1.1
Content-Type: application/json
{
"gatewayId": "GW-ESP32-001",
"sensorId": "TEMP-001",
"measurement": {
"value": 23.5,
"unit": "C"
}
}Key insight: Protocol translation adds overhead but enables interoperability. MQTTβs compact format saves bandwidth, while HTTPβs verbosity adds context and compatibility.
The code demonstrates three data formats:
| Format | Size | Use Case |
|---|---|---|
| JSON | ~80 bytes | Human-readable, debugging |
| Binary | 12 bytes | Bandwidth-constrained links |
| Human-readable | ~200 bytes | Console/display output |
Binary encoding reduces payload by 85% - critical for LPWAN (LoRa, Sigfox) where every byte costs energy.
The shouldReportReading() function demonstrates threshold-based filtering:
- Temperature: Only report changes > 0.5Β°C
- Humidity: Only report changes > 2%
- Motion: Always report (event-based)
Result: 40-80% data reduction with minimal information loss.
Multiple readings are combined into statistical summaries:
{
"gatewayId": "GW-ESP32-001",
"windowSeconds": 10,
"sensors": [{
"type": "temperature",
"avg": 23.45,
"min": 22.8,
"max": 24.1,
"count": 5
}]
}5 individual messages (400 bytes) become 1 aggregated message (200 bytes) - 50% reduction plus cloud processing savings.
When network connectivity fails:
- Messages are stored in a local buffer (50 messages)
- Priority queue ensures critical data (motion) is preserved
- Cache flushes when connectivity returns
- Batch transmission reduces connection overhead
191.3.5 Lab Challenges
Modify the formatAsHttpRequest() function to also generate CoAP-style messages. CoAP uses different methods (GET, PUT, POST, DELETE) and compact option encoding.
Hint: CoAP messages use option numbers instead of text headers: - Option 3: Uri-Host - Option 11: Uri-Path - Option 12: Content-Format
Add a function that detects temperature anomalies using a simple moving average:
bool isAnomaly(float currentValue, float* history, int historyLen) {
// Calculate moving average
// Flag as anomaly if current value > 2 standard deviations from mean
}When an anomaly is detected, immediately send to cloud (bypass aggregation) and set priority=1.
Implement simple dictionary-based compression for repeated strings:
- Create a dictionary mapping common strings to single bytes
- Replace strings like βtemperatureβ with 0x01
- Compare compressed vs uncompressed size
Target: Achieve 40%+ compression on JSON payloads.
Add rate limiting to prevent overwhelming the cloud endpoint:
// Maximum 10 messages per minute per sensor type
// Queue excess messages for delayed transmission
// Drop oldest if queue exceeds threshold191.3.6 Expected Outcomes
When running the simulation, you should observe:
- Protocol Translation Output: MQTT and HTTP message formats side-by-side
- Data Reduction Statistics: Bytes saved through filtering and aggregation
- Network Failure Handling: Messages cached during simulated outages
- Cache Flush Operations: Batched transmission when connectivity returns
- Statistics Summary: Comprehensive gateway performance metrics
Sample Output:
[GATEWAY] Processing: temperature
[FORMAT 1] MQTT Style:
Topic: iot/GW-ESP32-001/temperature/TEMP-001
Payload: {"v":23.52,"u":"C","t":45,"q":91}
[TRANSLATION] MQTT -> HTTP Protocol Bridge
Input Topic: iot/GW-ESP32-001/temperature/TEMP-001
Input Payload: {"v":23.52,"u":"C","t":45,"q":91}
Output HTTP Request:
POST /api/v1/mqtt-bridge HTTP/1.1
Content-Type: application/json
{
"source": "mqtt-bridge",
"gateway": "GW-ESP32-001",
"value": 23.52
}
[TRANSFORM] JSON -> Binary Conversion
Original JSON size: ~80 bytes
Binary size: 12 bytes
Compression: 85.0%
- Gateways are not simple routers - They perform complex protocol translation, data transformation, and edge intelligence
- Edge processing dramatically reduces bandwidth - Filtering and aggregation can achieve 90%+ data reduction
- Offline capability is essential - Local caching ensures data integrity during network outages
- Protocol choice impacts efficiency - Binary formats are 5-10x more compact than JSON
- Aggregation trades latency for efficiency - Batching reduces connections but adds delay
191.4 Summary
This chapter covered hands-on wokwi simulation implementing a complete gateway that bridges i2c/spi sensors to mqtt cloud protocols.
Key Takeaways: - Protocol bridging requires understanding timing semantics, not just message translation - Gateway processor requirements depend on edge processing complexity - Different protocols (I2C, SPI, UART, MQTT) have fundamentally different characteristics - Effective gateways implement buffering, state management, and data transformation
191.6 Whatβs Next
Continue to Message Queue Lab to learn about build a message broker simulation to understand pub/sub patterns, qos levels, topic routing, and message queuing.