1263 Big Data Pipeline Configurator
Interactive Tool for Designing IoT Data Processing Pipelines
1263.1 IoT Big Data Pipeline Designer
Design and visualize complete big data pipelines for IoT applications. This interactive tool helps you configure each layer of your data architecture, estimate throughput and costs, identify bottlenecks, and receive scaling recommendations.
Tool Overview
This configurator guides you through designing a complete IoT data pipeline with five key layers:
- Data Sources: IoT sensors, logs, APIs, databases
- Ingestion Layer: Kafka, Kinesis, MQTT brokers, HTTP endpoints
- Processing Layer: Spark Streaming, Flink, Storm, custom processors
- Storage Layer: HDFS, S3, time-series DB, data lake
- Analytics Layer: Batch processing, real-time dashboards, ML pipelines
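Two rules of thumb tie these layers together and are worth keeping in mind while using the tool. First, a linear pipeline can sustain no more than its lowest-capacity layer. Second, end-to-end latency is approximately the sum of the per-layer latencies. The sketch below applies both rules. `summarizePipeline` is a hypothetical helper, not part of the configurator, and the per-unit figures are taken from the tool's MQTT and Storm entries.

```javascript
// Hypothetical helper (not part of the configurator): summarize a linear pipeline.
// Each layer is { name, capacityPerUnit (msg/s), units, latencyMs }.
function summarizePipeline(layers, offeredLoadMsgPerSec) {
  // Total capacity of a layer = per-unit capacity × number of units.
  const withCapacity = layers.map(l => ({ ...l, capacity: l.capacityPerUnit * l.units }));
  // The bottleneck is the layer with the smallest total capacity.
  const bottleneck = withCapacity.reduce((min, l) => (l.capacity < min.capacity ? l : min));
  return {
    bottleneck: bottleneck.name,
    maxThroughput: bottleneck.capacity,
    // Share of the bottleneck's capacity consumed by the offered load.
    bottleneckUtilization: offeredLoadMsgPerSec / bottleneck.capacity,
    // Latencies add up along a chain of stages.
    totalLatencyMs: layers.reduce((sum, l) => sum + l.latencyMs, 0)
  };
}

// 150,000 msg/s through MQTT (2 brokers) into Storm (3 nodes).
const summary = summarizePipeline([
  { name: "MQTT", capacityPerUnit: 100000, units: 2, latencyMs: 10 },
  { name: "Storm", capacityPerUnit: 100000, units: 3, latencyMs: 50 }
], 150000);
console.log(summary);
// summary.bottleneck is "MQTT", maxThroughput 200000,
// bottleneckUtilization 0.75, totalLatencyMs 60
```

Here MQTT is the bottleneck. A third broker would raise the ceiling to 300,000 msg/s, matching the Storm layer.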
How to Use This Tool
- Configure data sources: select source types, device counts, and per-device message rates
- Select a technology for each pipeline layer
- Hover over a technology button to see its specs, strengths, and weaknesses in the Component Details panel
- Toggle Show Flow to animate data moving through the pipeline canvas
- Review latency, cost, and scalability metrics
- Check pipeline validation for potential issues
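The scalability figures and capacity warnings follow a simple sizing rule that appears in `calculateMetrics`. Each layer gets max(minUnits, ceil(load / per-unit throughput)) units, and a warning appears once utilization passes 80%. The sketch below restates that rule as a standalone function. It also reports how many extra units would bring utilization down to a 70% target, the headroom level the tool's suggested fixes aim for. `sizeLayer` and its `targetUtil` parameter are hypothetical names used only for illustration.

```javascript
// Hypothetical helper restating the tool's sizing rule: provision enough units
// for the load, but never fewer than the technology's minimum.
function sizeLayer(loadMsgPerSec, capacityPerUnit, minUnits, targetUtil = 0.7) {
  const units = Math.max(minUnits, Math.ceil(loadMsgPerSec / capacityPerUnit));
  const utilization = loadMsgPerSec / (units * capacityPerUnit);
  // Units required to keep utilization at or below the target (70% by default).
  const unitsForTarget = Math.max(
    minUnits,
    Math.ceil(loadMsgPerSec / (capacityPerUnit * targetUtil))
  );
  return { units, utilization, extraUnitsForHeadroom: unitsForTarget - units };
}

// 10,000 devices at 25 msg/s each = 250,000 msg/s into Storm
// (100,000 msg/s per node, minimum 3 nodes, per the tool's figures).
const storm = sizeLayer(10000 * 25, 100000, 3);
console.log(storm);
```

At 250,000 msg/s the three Storm nodes run at about 83%, above the 80% warning threshold. A fourth node brings utilization down to 62.5%.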
{
// ============================================
// Big Data Pipeline Configurator - Enhanced
// IoT Data Architecture Design Tool
// ============================================
const d3 = await require("d3@7");
// IEEE Color palette
const colors = {
navy: "#2C3E50",
teal: "#16A085",
orange: "#E67E22",
gray: "#7F8C8D",
lightGray: "#ECF0F1",
white: "#FFFFFF",
green: "#27AE60",
red: "#E74C3C",
darkGray: "#34495E",
purple: "#9B59B6",
blue: "#3498DB",
yellow: "#F1C40F",
cyan: "#00BCD4",
pink: "#E91E63"
};
// Data source types
const dataSources = {
sensors: {
name: "IoT Sensors",
icon: "📡",
color: colors.teal,
avgMsgSize: 256,
avgRate: 10,
description: "Temperature, humidity, motion, etc."
},
logs: {
name: "System Logs",
icon: "📝",
color: colors.blue,
avgMsgSize: 512,
avgRate: 100,
description: "Application and system logs"
},
apis: {
name: "External APIs",
icon: "🔌",
color: colors.purple,
avgMsgSize: 1024,
avgRate: 5,
description: "Third-party data feeds"
},
databases: {
name: "Database CDC",
icon: "🗄️",
color: colors.orange,
avgMsgSize: 2048,
avgRate: 50,
description: "Change data capture streams"
},
files: {
name: "File Upload",
icon: "📁",
color: colors.gray,
avgMsgSize: 10240,
avgRate: 1,
description: "Batch file ingestion"
}
};
// Layer configurations with detailed specs
const layers = {
ingestion: {
name: "Ingestion Layer",
icon: "📥",
color: colors.teal,
options: {
kafka: {
name: "Apache Kafka",
shortName: "Kafka",
throughput: 1000000,
latency: 5,
costPerHour: 0.25,
minUnits: 3,
strengths: ["High throughput", "Durable", "Mature ecosystem"],
weaknesses: ["Operational complexity", "ZooKeeper dependency (pre-KRaft versions)"],
useCases: ["High-volume streaming", "Event sourcing"]
},
kinesis: {
name: "AWS Kinesis",
shortName: "Kinesis",
throughput: 1000,
latency: 200,
costPerHour: 0.015,
minUnits: 1,
strengths: ["Managed service", "AWS integration", "Auto-scaling"],
weaknesses: ["AWS lock-in", "Shard limitations"],
useCases: ["AWS-native apps", "Serverless pipelines"]
},
mqtt: {
name: "MQTT Broker",
shortName: "MQTT",
throughput: 100000,
latency: 10,
costPerHour: 0.10,
minUnits: 2,
strengths: ["Lightweight", "IoT-native", "Low bandwidth"],
weaknesses: ["No replay", "Limited durability"],
useCases: ["Constrained devices", "Real-time telemetry"]
},
http: {
name: "HTTP/REST API",
shortName: "HTTP",
throughput: 50000,
latency: 50,
costPerHour: 0.05,
minUnits: 2,
strengths: ["Universal", "Simple", "Stateless"],
weaknesses: ["Higher overhead", "No streaming"],
useCases: ["Legacy integration", "Request-response"]
},
pubsub: {
name: "Google Pub/Sub",
shortName: "Pub/Sub",
throughput: 10000000,
latency: 100,
costPerHour: 0.00004,
minUnits: 0,
strengths: ["Serverless", "Global", "No provisioning"],
weaknesses: ["GCP lock-in", "Cost at scale"],
useCases: ["Global distribution", "Variable workloads"]
}
}
},
processing: {
name: "Processing Layer",
icon: "⚙️",
color: colors.orange,
options: {
spark: {
name: "Spark Streaming",
shortName: "Spark",
throughput: 500000,
latency: 500,
costPerHour: 0.15,
minUnits: 2,
strengths: ["Unified batch/stream", "ML integration", "SQL support"],
weaknesses: ["Micro-batch latency", "Memory intensive"],
useCases: ["ETL pipelines", "ML workflows"]
},
flink: {
name: "Apache Flink",
shortName: "Flink",
throughput: 1000000,
latency: 10,
costPerHour: 0.12,
minUnits: 2,
strengths: ["True streaming", "Exactly-once", "Low latency"],
weaknesses: ["Steeper learning curve", "Smaller ecosystem"],
useCases: ["Real-time analytics", "Event-driven apps"]
},
storm: {
name: "Apache Storm",
shortName: "Storm",
throughput: 100000,
latency: 50,
costPerHour: 0.10,
minUnits: 3,
strengths: ["Low latency", "At-least-once", "Simple topology"],
weaknesses: ["No native state", "Dated ecosystem"],
useCases: ["Simple transformations", "Legacy systems"]
},
custom: {
name: "Custom Processor",
shortName: "Custom",
throughput: 200000,
latency: 100,
costPerHour: 0.08,
minUnits: 1,
strengths: ["Full control", "Domain-specific", "Lightweight"],
weaknesses: ["Maintenance burden", "No ecosystem"],
useCases: ["Simple transforms", "Edge processing"]
}
}
},
storage: {
name: "Storage Layer",
icon: "💾",
color: colors.purple,
options: {
hdfs: {
name: "Apache HDFS",
shortName: "HDFS",
throughput: 100,
costPerTB: 15,
minUnits: 3,
strengths: ["On-premise", "High throughput", "Hadoop ecosystem"],
weaknesses: ["Operational overhead", "Not cloud-native"],
useCases: ["On-prem data lakes", "Batch analytics"]
},
s3: {
name: "Amazon S3 / Object Store",
shortName: "S3",
throughput: 3500,
costPerTB: 23,
minUnits: 0,
strengths: ["Unlimited scale", "11 9s durability", "Pay-per-use"],
weaknesses: ["Latency", "Request pricing"],
useCases: ["Data lakes", "Archive storage"]
},
tsdb: {
name: "Time-Series DB",
shortName: "TSDB",
throughput: 500000,
costPerTB: 50,
minUnits: 3,
strengths: ["Optimized queries", "Compression", "Retention policies"],
weaknesses: ["Specialized use", "Schema constraints"],
useCases: ["Metrics", "IoT telemetry", "Monitoring"]
},
datalake: {
name: "Delta Lake / Iceberg",
shortName: "Lake",
throughput: 200,
costPerTB: 25,
minUnits: 0,
strengths: ["ACID transactions", "Time travel", "Schema evolution"],
weaknesses: ["Compute overhead", "Learning curve"],
useCases: ["Analytics lakehouse", "ML features"]
}
}
},
analytics: {
name: "Analytics Layer",
icon: "📊",
color: colors.blue,
options: {
batch: {
name: "Batch Analytics",
shortName: "Batch",
latency: 60000,
costPerHour: 0.10,
minUnits: 2,
strengths: ["Cost-effective", "Complex queries", "Full dataset"],
weaknesses: ["High latency", "Stale results"],
useCases: ["Daily reports", "Historical analysis"]
},
realtime: {
name: "Real-time Streaming",
shortName: "RT Stream",
latency: 100,
costPerHour: 0.20,
minUnits: 3,
strengths: ["Low latency", "Live dashboards", "Alerting"],
weaknesses: ["Higher cost", "Limited aggregations"],
useCases: ["Monitoring", "Anomaly detection"]
},
ml: {
name: "ML Pipeline",
shortName: "ML",
latency: 1000,
costPerHour: 0.50,
minUnits: 2,
strengths: ["Predictive insights", "Automation", "Pattern detection"],
weaknesses: ["Complexity", "Training overhead"],
useCases: ["Predictive maintenance", "Forecasting"]
},
hybrid: {
name: "Lambda (Batch + RT)",
shortName: "Lambda",
latency: 500,
costPerHour: 0.30,
minUnits: 4,
strengths: ["Best of both", "Flexible queries", "Complete views"],
weaknesses: ["Complexity", "Dual maintenance"],
useCases: ["Enterprise analytics", "Mixed workloads"]
}
}
}
};
// Pipeline state
let state = {
sources: {
sensors: { enabled: true, count: 10000, rate: 10 },
logs: { enabled: false, count: 100, rate: 100 },
apis: { enabled: false, count: 10, rate: 5 },
databases: { enabled: false, count: 5, rate: 50 },
files: { enabled: false, count: 1, rate: 1 }
},
selectedIngestion: "kafka",
selectedProcessing: "flink",
selectedStorage: "datalake",
selectedAnalytics: "realtime",
animationSpeed: 1,
showDataFlow: true
};
// Create main container
const container = d3.create("div")
.style("font-family", "system-ui, -apple-system, sans-serif")
.style("max-width", "1200px")
.style("margin", "0 auto");
// Header
const header = container.append("div")
.style("text-align", "center")
.style("margin-bottom", "15px")
.style("padding", "15px")
.style("background", `linear-gradient(135deg, ${colors.navy} 0%, ${colors.darkGray} 100%)`)
.style("border-radius", "12px")
.style("color", colors.white);
header.append("div")
.style("font-size", "22px")
.style("font-weight", "bold")
.text("Big Data Pipeline Configurator");
header.append("div")
.style("font-size", "13px")
.style("opacity", "0.8")
.style("margin-top", "5px")
.text("Design, visualize, and optimize IoT data architectures");
// Main layout - three columns
const mainLayout = container.append("div")
.style("display", "grid")
.style("grid-template-columns", "280px 1fr 280px")
.style("gap", "15px");
// ============================================
// LEFT PANEL - Data Sources & Configuration
// ============================================
const leftPanel = mainLayout.append("div");
// Data Sources Section
const sourcesSection = leftPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.navy}`)
.style("border-radius", "12px")
.style("padding", "12px")
.style("margin-bottom", "12px");
sourcesSection.append("div")
.style("font-weight", "bold")
.style("color", colors.navy)
.style("margin-bottom", "12px")
.style("font-size", "14px")
.style("border-bottom", `2px solid ${colors.teal}`)
.style("padding-bottom", "6px")
.text("📱 Data Sources");
// Source toggles and configuration
Object.entries(dataSources).forEach(([key, source]) => {
const sourceRow = sourcesSection.append("div")
.style("margin-bottom", "10px")
.style("padding", "8px")
.style("background", state.sources[key].enabled ? `${source.color}11` : colors.lightGray)
.style("border-radius", "8px")
.style("border", `1px solid ${state.sources[key].enabled ? source.color : colors.lightGray}`)
.attr("id", `source-${key}`);
const headerRow = sourceRow.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("align-items", "center")
.style("cursor", "pointer")
.on("click", function() {
state.sources[key].enabled = !state.sources[key].enabled;
updateUI();
});
headerRow.append("div")
.style("display", "flex")
.style("align-items", "center")
.style("gap", "6px")
.html(`<span style="font-size:14px">${source.icon}</span>
<span style="font-size:11px;font-weight:bold;color:${source.color}">${source.name}</span>`);
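headerRow.append("input")
.attr("type", "checkbox")
.property("checked", state.sources[key].enabled)
.style("cursor", "pointer")
// Keep checkbox clicks from also reaching the row's click handler,
// so each click updates the state (and re-renders) once, via "change"
.on("click", event => event.stopPropagation())
.on("change", function() {
state.sources[key].enabled = this.checked;
updateUI();
});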
// Expandable config
const configDiv = sourceRow.append("div")
.attr("id", `config-${key}`)
.style("display", state.sources[key].enabled ? "block" : "none")
.style("margin-top", "8px")
.style("padding-top", "8px")
.style("border-top", `1px solid ${colors.lightGray}`);
// Device count
const countRow = configDiv.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("align-items", "center")
.style("margin-bottom", "6px");
countRow.append("label")
.style("font-size", "10px")
.style("color", colors.gray)
.text("Count:");
countRow.append("input")
.attr("type", "number")
.attr("min", "1")
.attr("max", "10000000")
.property("value", state.sources[key].count)
.style("width", "80px")
.style("padding", "4px")
.style("border", `1px solid ${colors.lightGray}`)
.style("border-radius", "4px")
.style("font-size", "11px")
.on("input", function() {
state.sources[key].count = parseInt(this.value) || 1;
updateUI();
});
// Message rate
const rateRow = configDiv.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("align-items", "center");
rateRow.append("label")
.style("font-size", "10px")
.style("color", colors.gray)
.text("Rate (msg/s):");
rateRow.append("input")
.attr("type", "number")
.attr("min", "0.1")
.attr("max", "10000")
.attr("step", "0.1")
.property("value", state.sources[key].rate)
.style("width", "80px")
.style("padding", "4px")
.style("border", `1px solid ${colors.lightGray}`)
.style("border-radius", "4px")
.style("font-size", "11px")
.on("input", function() {
state.sources[key].rate = parseFloat(this.value) || 1;
updateUI();
});
});
// Throughput Summary
const throughputBox = leftPanel.append("div")
.style("background", `linear-gradient(135deg, ${colors.teal}22 0%, ${colors.teal}11 100%)`)
.style("border", `2px solid ${colors.teal}`)
.style("border-radius", "12px")
.style("padding", "12px")
.style("margin-bottom", "12px");
throughputBox.append("div")
.style("font-size", "10px")
.style("color", colors.gray)
.style("text-transform", "uppercase")
.text("Total Pipeline Throughput");
const throughputValue = throughputBox.append("div")
.attr("id", "throughput-value")
.style("font-size", "24px")
.style("font-weight", "bold")
.style("color", colors.teal);
const throughputDetail = throughputBox.append("div")
.attr("id", "throughput-detail")
.style("font-size", "11px")
.style("color", colors.gray);
// Layer Selection Panels
function createLayerPanel(layerKey) {
const layer = layers[layerKey];
const panel = leftPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${layer.color}`)
.style("border-radius", "12px")
.style("padding", "10px")
.style("margin-bottom", "10px");
panel.append("div")
.style("font-weight", "bold")
.style("color", layer.color)
.style("margin-bottom", "8px")
.style("font-size", "13px")
.text(`${layer.icon} ${layer.name}`);
const grid = panel.append("div")
.attr("id", `${layerKey}-grid`)
.style("display", "grid")
.style("grid-template-columns", "1fr 1fr")
.style("gap", "5px");
return grid;
}
const ingestionGrid = createLayerPanel("ingestion");
const processingGrid = createLayerPanel("processing");
const storageGrid = createLayerPanel("storage");
const analyticsGrid = createLayerPanel("analytics");
// ============================================
// CENTER PANEL - Pipeline Canvas
// ============================================
const centerPanel = mainLayout.append("div");
// Pipeline Visualization Canvas
const canvasSection = centerPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.navy}`)
.style("border-radius", "12px")
.style("padding", "15px")
.style("margin-bottom", "15px");
const canvasHeader = canvasSection.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("align-items", "center")
.style("margin-bottom", "10px");
canvasHeader.append("div")
.style("font-weight", "bold")
.style("color", colors.navy)
.style("font-size", "14px")
.text("🔄 Pipeline Visualization");
// Animation toggle
const animToggle = canvasHeader.append("div")
.style("display", "flex")
.style("align-items", "center")
.style("gap", "8px");
animToggle.append("label")
.style("font-size", "11px")
.style("color", colors.gray)
.text("Show Flow:");
animToggle.append("input")
.attr("type", "checkbox")
.property("checked", state.showDataFlow)
.on("change", function() {
state.showDataFlow = this.checked;
updatePipelineVisualization();
});
const canvasSvg = canvasSection.append("div")
.attr("id", "pipeline-canvas");
// Latency Breakdown
const latencySection = centerPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.cyan}`)
.style("border-radius", "12px")
.style("padding", "15px")
.style("margin-bottom", "15px");
latencySection.append("div")
.style("font-weight", "bold")
.style("color", colors.cyan)
.style("margin-bottom", "10px")
.style("font-size", "14px")
.text("⏱️ Latency Analysis");
const latencyContent = latencySection.append("div")
.attr("id", "latency-content");
// ============================================
// RIGHT PANEL - Metrics & Recommendations
// ============================================
const rightPanel = mainLayout.append("div");
// Cost Estimation
const costSection = rightPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.green}`)
.style("border-radius", "12px")
.style("padding", "12px")
.style("margin-bottom", "12px");
costSection.append("div")
.style("font-weight", "bold")
.style("color", colors.green)
.style("margin-bottom", "10px")
.style("font-size", "14px")
.text("💰 Monthly Cost Estimate");
const costContent = costSection.append("div")
.attr("id", "cost-content");
// Scalability Metrics
const scaleSection = rightPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.orange}`)
.style("border-radius", "12px")
.style("padding", "12px")
.style("margin-bottom", "12px");
scaleSection.append("div")
.style("font-weight", "bold")
.style("color", colors.orange)
.style("margin-bottom", "10px")
.style("font-size", "14px")
.text("📈 Scalability Recommendations");
const scaleContent = scaleSection.append("div")
.attr("id", "scale-content");
// Pipeline Validation
const validationSection = rightPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.red}`)
.style("border-radius", "12px")
.style("padding", "12px")
.style("margin-bottom", "12px");
validationSection.append("div")
.style("font-weight", "bold")
.style("color", colors.red)
.style("margin-bottom", "10px")
.style("font-size", "14px")
.text("✅ Pipeline Validation");
const validationContent = validationSection.append("div")
.attr("id", "validation-content");
// Component Configuration Panel
const configSection = rightPanel.append("div")
.style("background", colors.white)
.style("border", `2px solid ${colors.purple}`)
.style("border-radius", "12px")
.style("padding", "12px");
configSection.append("div")
.style("font-weight", "bold")
.style("color", colors.purple)
.style("margin-bottom", "10px")
.style("font-size", "14px")
.text("🔧 Component Details");
const configContent = configSection.append("div")
.attr("id", "config-content");
// ============================================
// Calculation Functions
// ============================================
function calculateTotalThroughput() {
let totalMsgPerSec = 0;
let totalBytesPerSec = 0;
Object.entries(state.sources).forEach(([key, source]) => {
if (source.enabled) {
const msgRate = source.count * source.rate;
totalMsgPerSec += msgRate;
totalBytesPerSec += msgRate * dataSources[key].avgMsgSize;
}
});
return {
msgPerSec: totalMsgPerSec,
bytesPerSec: totalBytesPerSec,
mbPerSec: totalBytesPerSec / (1024 * 1024),
gbPerDay: (totalBytesPerSec * 86400) / (1024 * 1024 * 1024),
tbPerMonth: (totalBytesPerSec * 86400 * 30) / (1024 * 1024 * 1024 * 1024)
};
}
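// Worked example (not used by the tool): with the default state, 10,000 sensors × 10 msg/s
// = 100,000 msg/s. At 256 bytes per message that is 25,600,000 B/s ≈ 24.4 MB/s,
// ≈ 2,060 GB/day, and ≈ 60.3 TB per 30-day month (1024-based units, as computed above).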
function calculateMetrics() {
const throughput = calculateTotalThroughput();
const ingestion = layers.ingestion.options[state.selectedIngestion];
const processing = layers.processing.options[state.selectedProcessing];
const storage = layers.storage.options[state.selectedStorage];
const analytics = layers.analytics.options[state.selectedAnalytics];
// Calculate required units
const ingestionUnits = Math.max(ingestion.minUnits, Math.ceil(throughput.msgPerSec / ingestion.throughput));
const processingUnits = Math.max(processing.minUnits, Math.ceil(throughput.msgPerSec / processing.throughput));
const storageUnits = Math.max(storage.minUnits, 1);
const analyticsUnits = Math.max(analytics.minUnits, 1);
// Calculate costs
const hoursPerMonth = 730;
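// Pub/Sub is usage-billed rather than provisioned, so its cost is modeled from monthly
// message volume: messages per month × costPerHour / 10,000 (costPerHour acts as a volume rate here)
const ingestionCost = state.selectedIngestion === "pubsub" ?
(throughput.msgPerSec * 3600 * hoursPerMonth * ingestion.costPerHour) / 10000 :
ingestionUnits * ingestion.costPerHour * hoursPerMonth;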
const processingCost = processingUnits * processing.costPerHour * hoursPerMonth;
// Covers one (30-day) month of newly ingested data; retained data accumulates month over month
const storageCost = throughput.tbPerMonth * storage.costPerTB;
const analyticsCost = analyticsUnits * analytics.costPerHour * hoursPerMonth;
const networkCost = throughput.tbPerMonth * 50; // $50/TB egress estimate
// Calculate latencies
const totalLatency = ingestion.latency + processing.latency + analytics.latency;
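// Worked example (not used by the tool), default state: 100,000 msg/s through Kafka → Flink → Lake → RT Stream
//   ingestion:  max(3, ceil(100,000 / 1,000,000)) = 3 units × $0.25/h × 730 h = $547.50
//   processing: max(2, 1) = 2 nodes × $0.12/h × 730 h = $175.20
//   storage:    ≈ 60.35 TB × $25/TB ≈ $1,509
//   analytics:  max(3, 1) = 3 nodes × $0.20/h × 730 h = $438.00
//   network:    ≈ 60.35 TB × $50/TB ≈ $3,017
//   total ≈ $5,687/month, with network egress the largest item;
//   end-to-end latency = 5 + 10 + 100 = 115 ms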
// Capacity utilization
const ingestionUtil = (throughput.msgPerSec / (ingestionUnits * ingestion.throughput)) * 100;
const processingUtil = (throughput.msgPerSec / (processingUnits * processing.throughput)) * 100;
// Validation checks
const validations = [];
if (ingestionUtil > 80) {
validations.push({
severity: ingestionUtil > 100 ? "critical" : "warning",
message: `Ingestion at ${ingestionUtil.toFixed(0)}% capacity`,
// Units needed to bring utilization down to a 70% target, minus the units already provisioned
fix: `Add ${Math.ceil(throughput.msgPerSec / (ingestion.throughput * 0.7)) - ingestionUnits} more ${ingestion.shortName} units`
});
}
if (processingUtil > 80) {
validations.push({
severity: processingUtil > 100 ? "critical" : "warning",
message: `Processing at ${processingUtil.toFixed(0)}% capacity`,
// Nodes needed to bring utilization down to a 70% target, minus the nodes already provisioned
fix: `Add ${Math.ceil(throughput.msgPerSec / (processing.throughput * 0.7)) - processingUnits} more ${processing.shortName} nodes`
});
}
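// With the latency figures above the sum never exceeds 800 ms, so a 1,000 ms threshold
// could never fire; 500 ms flags the Spark micro-batch combinations
if (totalLatency > 500 && state.selectedAnalytics === "realtime") {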
validations.push({
severity: "warning",
message: `High latency (${totalLatency}ms) for real-time analytics`,
fix: "Consider Flink for lower processing latency"
});
}
if (throughput.tbPerMonth > 10 && state.selectedStorage === "tsdb") {
validations.push({
severity: "info",
message: `High volume (${throughput.tbPerMonth.toFixed(1)} TB/mo) for TSDB`,
fix: "Consider tiered storage with S3 for cold data"
});
}
if (state.selectedIngestion === "mqtt" && throughput.msgPerSec > 50000) {
validations.push({
severity: "warning",
message: "MQTT may struggle at high throughput",
fix: "Consider Kafka for better scalability"
});
}
if (validations.length === 0) {
validations.push({
severity: "success",
message: "Pipeline configuration looks healthy",
fix: "No immediate issues detected"
});
}
// Scaling recommendations
const recommendations = [];
if (throughput.msgPerSec > 100000) {
recommendations.push({
icon: "🔧",
title: "Partition Strategy",
detail: `Use ${Math.max(24, Math.ceil(throughput.msgPerSec / 10000))} partitions`
});
}
if (throughput.tbPerMonth > 5) {
recommendations.push({
icon: "🗜️",
title: "Enable Compression",
detail: "Use Snappy/ZSTD for 3-5x reduction"
});
}
recommendations.push({
icon: "📐",
title: "Architecture Pattern",
detail: throughput.msgPerSec > 500000 ? "Lambda (batch + speed)" : "Kappa (single stream)"
});
if (state.selectedAnalytics === "ml" && throughput.msgPerSec > 100000) {
recommendations.push({
icon: "🤖",
title: "ML Feature Store",
detail: "Add feature store for training/serving"
});
}
return {
throughput,
units: { ingestion: ingestionUnits, processing: processingUnits, storage: storageUnits, analytics: analyticsUnits },
costs: {
ingestion: ingestionCost,
processing: processingCost,
storage: storageCost,
analytics: analyticsCost,
network: networkCost,
total: ingestionCost + processingCost + storageCost + analyticsCost + networkCost
},
latency: {
ingestion: ingestion.latency,
processing: processing.latency,
analytics: analytics.latency,
total: totalLatency
},
utilization: { ingestion: ingestionUtil, processing: processingUtil },
validations,
recommendations
};
}
// ============================================
// Update Functions
// ============================================
function updateLayerGrids() {
Object.entries(layers).forEach(([layerKey, layer]) => {
const grid = container.select(`#${layerKey}-grid`);
grid.html("");
const selected = layerKey === "ingestion" ? state.selectedIngestion :
layerKey === "processing" ? state.selectedProcessing :
layerKey === "storage" ? state.selectedStorage : state.selectedAnalytics;
Object.entries(layer.options).forEach(([key, option]) => {
const isSelected = selected === key;
grid.append("button")
.style("padding", "6px 4px")
.style("font-size", "10px")
.style("font-weight", isSelected ? "bold" : "normal")
.style("background", isSelected ? layer.color : colors.lightGray)
.style("color", isSelected ? colors.white : colors.navy)
.style("border", `2px solid ${layer.color}`)
.style("border-radius", "6px")
.style("cursor", "pointer")
.style("transition", "all 0.2s ease")
.text(option.shortName)
.on("click", function() {
if (layerKey === "ingestion") state.selectedIngestion = key;
else if (layerKey === "processing") state.selectedProcessing = key;
else if (layerKey === "storage") state.selectedStorage = key;
else state.selectedAnalytics = key;
updateUI();
})
.on("mouseenter", function() {
if (!isSelected) d3.select(this).style("background", `${layer.color}44`);
showComponentDetails(layerKey, key);
})
.on("mouseleave", function() {
if (!isSelected) d3.select(this).style("background", colors.lightGray);
});
});
});
}
function showComponentDetails(layerKey, optionKey) {
const layer = layers[layerKey];
const option = layer.options[optionKey];
const content = container.select("#config-content");
content.html("");
content.append("div")
.style("font-weight", "bold")
.style("color", layer.color)
.style("margin-bottom", "8px")
.text(`${layer.icon} ${option.name}`);
// Specs
const specs = [];
if (option.throughput) specs.push(`Throughput: ${option.throughput.toLocaleString()} msg/s`);
if (option.latency) specs.push(`Latency: ${option.latency}ms`);
if (option.costPerHour) specs.push(`Cost: $${option.costPerHour}/hr/unit`);
if (option.costPerTB) specs.push(`Cost: $${option.costPerTB}/TB/mo`);
specs.forEach(spec => {
content.append("div")
.style("font-size", "10px")
.style("color", colors.navy)
.style("padding", "2px 0")
.text(spec);
});
// Strengths
if (option.strengths) {
content.append("div")
.style("font-size", "10px")
.style("color", colors.green)
.style("margin-top", "8px")
.style("font-weight", "bold")
.text("Strengths:");
option.strengths.forEach(s => {
content.append("div")
.style("font-size", "9px")
.style("color", colors.gray)
.text(`✓ ${s}`);
});
}
// Weaknesses
if (option.weaknesses) {
content.append("div")
.style("font-size", "10px")
.style("color", colors.red)
.style("margin-top", "6px")
.style("font-weight", "bold")
.text("Weaknesses:");
option.weaknesses.forEach(w => {
content.append("div")
.style("font-size", "9px")
.style("color", colors.gray)
.text(`✗ ${w}`);
});
}
}
function updatePipelineVisualization() {
const canvasDiv = container.select("#pipeline-canvas");
canvasDiv.html("");
const width = 580;
const height = 280;
const svg = canvasDiv.append("svg")
.attr("viewBox", `0 0 ${width} ${height}`)
.attr("width", "100%")
.style("background", `linear-gradient(180deg, ${colors.lightGray}22 0%, ${colors.white} 100%)`)
.style("border-radius", "8px");
const metrics = calculateMetrics();
const ingestion = layers.ingestion.options[state.selectedIngestion];
const processing = layers.processing.options[state.selectedProcessing];
const storage = layers.storage.options[state.selectedStorage];
const analytics = layers.analytics.options[state.selectedAnalytics];
// Define pipeline stages
const stages = [
{
name: "Sources",
icon: "📱",
x: 50,
color: colors.navy,
detail: `${Object.values(state.sources).filter(s => s.enabled).length} types`
},
{
name: ingestion.shortName,
icon: "📥",
x: 160,
color: colors.teal,
detail: `${metrics.units.ingestion} units`
},
{
name: processing.shortName,
icon: "⚙️",
x: 280,
color: colors.orange,
detail: `${metrics.units.processing} nodes`
},
{
name: storage.shortName,
icon: "💾",
x: 400,
color: colors.purple,
detail: `${metrics.throughput.tbPerMonth.toFixed(1)} TB/mo`
},
{
name: analytics.shortName,
icon: "📊",
x: 520,
color: colors.blue,
detail: `${metrics.units.analytics} nodes`
}
];
const centerY = height / 2;
// Draw connections with animated data flow
for (let i = 0; i < stages.length - 1; i++) {
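const startX = stages[i].x + 40;
// Stop outside the next node's outer ring (r = 42); nodes are drawn afterwards
// and their white fill would otherwise paint over the arrowhead
const endX = stages[i + 1].x - 44;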
// Background pipe
svg.append("line")
.attr("x1", startX)
.attr("y1", centerY)
.attr("x2", endX)
.attr("y2", centerY)
.attr("stroke", colors.lightGray)
.attr("stroke-width", 12)
.attr("stroke-linecap", "round");
// Data flow line
if (state.showDataFlow) {
const flowLine = svg.append("line")
.attr("x1", startX)
.attr("y1", centerY)
.attr("x2", endX)
.attr("y2", centerY)
.attr("stroke", stages[i].color)
.attr("stroke-width", 4)
.attr("stroke-dasharray", "15,10")
.attr("stroke-linecap", "round");
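// Animate the dash offset; stop looping once a re-render has removed this SVG,
// otherwise transitions keep running on detached nodes after every update
function animateDash() {
flowLine.attr("stroke-dashoffset", 0)
.transition()
.duration(2000 / state.animationSpeed)
.ease(d3.easeLinear)
.attr("stroke-dashoffset", -50)
.on("end", () => { if (svg.node().parentNode) animateDash(); });
}
animateDash();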
// Data particles
const particleCount = Math.min(5, Math.ceil(metrics.throughput.msgPerSec / 100000));
for (let p = 0; p < particleCount; p++) {
const particle = svg.append("circle")
.attr("r", 4)
.attr("fill", stages[i].color)
.attr("opacity", 0.8);
function animateParticle() {
particle
.attr("cx", startX)
.attr("cy", centerY)
.transition()
.duration((1500 + p * 300) / state.animationSpeed)
.delay(p * 200)
.ease(d3.easeLinear)
.attr("cx", endX)
// Stop looping once a re-render has removed this SVG from the canvas
.on("end", () => { if (svg.node().parentNode) animateParticle(); });
}
animateParticle();
}
}
// Arrow head
svg.append("polygon")
.attr("points", `${endX-8},${centerY-6} ${endX},${centerY} ${endX-8},${centerY+6}`)
.attr("fill", stages[i + 1].color);
}
// Draw stage nodes
stages.forEach((stage, i) => {
const g = svg.append("g")
.attr("transform", `translate(${stage.x}, ${centerY})`);
// Outer glow for selected
g.append("circle")
.attr("r", 42)
.attr("fill", "none")
.attr("stroke", stage.color)
.attr("stroke-width", 2)
.attr("opacity", 0.3);
// Main circle
g.append("circle")
.attr("r", 35)
.attr("fill", colors.white)
.attr("stroke", stage.color)
.attr("stroke-width", 3);
// Icon
g.append("text")
.attr("y", -3)
.attr("text-anchor", "middle")
.attr("font-size", "22px")
.text(stage.icon);
// Label
g.append("text")
.attr("y", 58)
.attr("text-anchor", "middle")
.attr("font-size", "11px")
.attr("font-weight", "bold")
.attr("fill", stage.color)
.text(stage.name);
// Detail
g.append("text")
.attr("y", 72)
.attr("text-anchor", "middle")
.attr("font-size", "9px")
.attr("fill", colors.gray)
.text(stage.detail);
});
// Throughput label
const throughputText = metrics.throughput.msgPerSec >= 1000000 ?
`${(metrics.throughput.msgPerSec / 1000000).toFixed(2)}M msg/s` :
metrics.throughput.msgPerSec >= 1000 ?
`${(metrics.throughput.msgPerSec / 1000).toFixed(0)}K msg/s` :
`${metrics.throughput.msgPerSec.toFixed(0)} msg/s`;
svg.append("text")
.attr("x", width / 2)
.attr("y", 25)
.attr("text-anchor", "middle")
.attr("font-size", "12px")
.attr("font-weight", "bold")
.attr("fill", colors.navy)
.text(`Data Flow: ${throughputText} (${metrics.throughput.mbPerSec.toFixed(1)} MB/s)`);
// Latency label
svg.append("text")
.attr("x", width / 2)
.attr("y", height - 15)
.attr("text-anchor", "middle")
.attr("font-size", "11px")
.attr("fill", colors.gray)
.text(`End-to-end latency: ~${metrics.latency.total}ms`);
}
function updateLatencyDisplay(metrics) {
const content = container.select("#latency-content");
content.html("");
const latencies = [
{ name: "Ingestion", value: metrics.latency.ingestion, color: colors.teal },
{ name: "Processing", value: metrics.latency.processing, color: colors.orange },
{ name: "Analytics", value: metrics.latency.analytics, color: colors.blue }
];
// Latency bar chart
const maxLatency = Math.max(...latencies.map(l => l.value));
latencies.forEach(lat => {
const row = content.append("div")
.style("margin-bottom", "8px");
const header = row.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("font-size", "11px")
.style("margin-bottom", "3px");
header.append("span")
.style("color", lat.color)
.style("font-weight", "bold")
.text(lat.name);
header.append("span")
.style("color", colors.navy)
.text(`${lat.value}ms`);
const barBg = row.append("div")
.style("height", "8px")
.style("background", colors.lightGray)
.style("border-radius", "4px")
.style("overflow", "hidden");
barBg.append("div")
.style("width", `${(lat.value / maxLatency) * 100}%`)
.style("height", "100%")
.style("background", lat.color)
.style("border-radius", "4px");
});
// Total latency
content.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("padding-top", "8px")
.style("border-top", `2px solid ${colors.cyan}`)
.style("margin-top", "8px")
.style("font-weight", "bold")
.html(`<span style="color:${colors.navy}">Total E2E</span>
<span style="color:${colors.cyan}">${metrics.latency.total}ms</span>`);
}
function updateCostDisplay(metrics) {
const content = container.select("#cost-content");
content.html("");
const costs = [
{ name: "Ingestion", value: metrics.costs.ingestion, color: colors.teal },
{ name: "Processing", value: metrics.costs.processing, color: colors.orange },
{ name: "Storage", value: metrics.costs.storage, color: colors.purple },
{ name: "Analytics", value: metrics.costs.analytics, color: colors.blue },
{ name: "Network", value: metrics.costs.network, color: colors.gray }
];
costs.forEach(cost => {
const row = content.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("padding", "4px 0")
.style("font-size", "11px");
row.append("span")
.style("color", cost.color)
.style("font-weight", "bold")
.text(cost.name);
row.append("span")
.style("color", colors.navy)
.text(`$${cost.value.toLocaleString(undefined, {maximumFractionDigits: 0})}`);
});
content.append("div")
.style("display", "flex")
.style("justify-content", "space-between")
.style("padding", "8px 0")
.style("margin-top", "8px")
.style("border-top", `2px solid ${colors.green}`)
.style("font-weight", "bold")
.html(`<span style="color:${colors.navy}">Total Monthly</span>
<span style="color:${colors.green};font-size:16px">$${metrics.costs.total.toLocaleString(undefined, {maximumFractionDigits: 0})}</span>`);
}
function updateScaleDisplay(metrics) {
const content = container.select("#scale-content");
content.html("");
metrics.recommendations.forEach(rec => {
const item = content.append("div")
.style("display", "flex")
.style("gap", "8px")
.style("padding", "8px")
.style("background", colors.lightGray)
.style("border-radius", "6px")
.style("margin-bottom", "6px");
item.append("span")
.style("font-size", "16px")
.text(rec.icon);
const text = item.append("div");
text.append("div")
.style("font-size", "11px")
.style("font-weight", "bold")
.style("color", colors.navy)
.text(rec.title);
text.append("div")
.style("font-size", "10px")
.style("color", colors.gray)
.text(rec.detail);
});
}
function updateValidationDisplay(metrics) {
const content = container.select("#validation-content");
content.html("");
// Severity lookup tables (built once, not per validation item)
const severityColors = {
critical: colors.red,
warning: colors.orange,
info: colors.blue,
success: colors.green
};
const severityIcons = {
critical: "🔴",
warning: "🟡",
info: "🔵",
success: "✅"
};
metrics.validations.forEach(v => {
const item = content.append("div")
.style("background", `${severityColors[v.severity]}11`)
.style("border-left", `3px solid ${severityColors[v.severity]}`)
.style("padding", "8px")
.style("margin-bottom", "6px")
.style("border-radius", "0 6px 6px 0");
item.append("div")
.style("font-size", "11px")
.style("font-weight", "bold")
.style("color", severityColors[v.severity])
.text(`${severityIcons[v.severity]} ${v.message}`);
item.append("div")
.style("font-size", "10px")
.style("color", colors.gray)
.style("margin-top", "4px")
.text(`→ ${v.fix}`);
});
}
function updateThroughputDisplay(metrics) {
const throughputText = metrics.throughput.msgPerSec >= 1000000 ?
`${(metrics.throughput.msgPerSec / 1000000).toFixed(2)}M msg/s` :
metrics.throughput.msgPerSec >= 1000 ?
`${(metrics.throughput.msgPerSec / 1000).toFixed(1)}K msg/s` :
`${metrics.throughput.msgPerSec.toFixed(0)} msg/s`;
container.select("#throughput-value").text(throughputText);
container.select("#throughput-detail")
.text(`${metrics.throughput.mbPerSec.toFixed(2)} MB/s | ${metrics.throughput.gbPerDay.toFixed(1)} GB/day`);
}
function updateSourcePanels() {
Object.entries(state.sources).forEach(([key, source]) => {
const sourceDiv = container.select(`#source-${key}`);
const sourceInfo = dataSources[key];
sourceDiv
.style("background", source.enabled ? `${sourceInfo.color}11` : colors.lightGray)
.style("border-color", source.enabled ? sourceInfo.color : colors.lightGray);
container.select(`#config-${key}`)
.style("display", source.enabled ? "block" : "none");
});
}
// Main update function
function updateUI() {
const metrics = calculateMetrics();
updateSourcePanels();
updateThroughputDisplay(metrics);
updateLayerGrids();
updatePipelineVisualization();
updateLatencyDisplay(metrics);
updateCostDisplay(metrics);
updateScaleDisplay(metrics);
updateValidationDisplay(metrics);
// Show default component details
showComponentDetails("ingestion", state.selectedIngestion);
}
// Initialize
updateUI();
return container.node();
}
1263.2 Understanding Big Data Pipelines for IoT
1263.2.1 Pipeline Architecture Layers
A complete IoT big data pipeline consists of five key layers, each with specific responsibilities:
| Layer | Purpose | Key Technologies | Key Considerations |
|---|---|---|---|
| Data Sources | Generate raw data | Sensors, APIs, DBs | Volume, velocity, variety |
| Ingestion | Receive and buffer data | Kafka, MQTT, Kinesis | Throughput, durability, ordering |
| Processing | Transform and enrich | Flink, Spark, Storm | Latency, exactly-once, windowing |
| Storage | Persist for analysis | S3, HDFS, TSDB | Cost, query performance, retention |
| Analytics | Generate insights | Presto, Dashboards, ML | Concurrency, latency, accuracy |
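The throughput figures the tool reports (msg/s, MB/s, GB/day) follow from simple arithmetic on the data-source settings. Below is a minimal sketch, assuming decimal units (1 MB = 10^6 bytes) and a constant average rate. `estimateThroughput` is a hypothetical helper, not the tool's own `calculateMetrics`, whose formula may differ. The example reuses the 256-byte, 10 msg/s defaults the tool assigns to IoT sensors, with an assumed fleet of 10,000 devices.

```javascript
// Hypothetical helper: estimate pipeline volume from source settings.
// Assumes decimal units (1 MB = 1e6 bytes) and a steady average rate;
// real IoT traffic is bursty, so capacity should be sized for peaks.
function estimateThroughput({ devices, msgPerSecPerDevice, avgMsgBytes }) {
  const msgPerSec = devices * msgPerSecPerDevice;
  const bytesPerSec = msgPerSec * avgMsgBytes;
  return {
    msgPerSec,
    mbPerSec: bytesPerSec / 1e6,
    gbPerDay: (bytesPerSec * 86400) / 1e9, // 86,400 seconds per day
  };
}

// Assumed fleet: 10,000 sensors, 256-byte readings, 10 msg/s each
const t = estimateThroughput({ devices: 10000, msgPerSecPerDevice: 10, avgMsgBytes: 256 });
console.log(t);
```

Even a modest fleet adds up quickly: 100,000 msg/s of small readings is about 25.6 MB/s, or more than 2 TB per day before compression.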
1263.2.2 Technology Selection Guide
Tip: Ingestion Layer Selection
| Technology | Best For | Throughput (approx.) | Latency (typical) |
|---|---|---|---|
| Apache Kafka | High-volume, on-premises | 1M+ msg/s per cluster | ~5ms |
| AWS Kinesis | AWS-native, serverless | 1K records/s (1 MB/s) per shard | ~200ms |
| MQTT | Constrained devices | 100K msg/s per broker | ~10ms |
| HTTP/REST | Legacy integration | 50K req/s | ~50ms |
| Google Pub/Sub | Global, auto-scaling | 10M+ msg/s | ~100ms |
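The throughput column translates into a capacity plan: how many Kinesis shards or Kafka partitions a target load needs. The following is a sketch under stated assumptions. The Kinesis limits are the per-shard write quotas (1,000 records/s and 1 MB/s). The Kafka per-partition produce and consume rates (10 and 5 MB/s) are placeholders to replace with measurements from your own cluster. Both functions are hypothetical helpers.

```javascript
// Hypothetical helper: Kinesis shards needed for a write load.
// Each shard accepts up to 1,000 records/s AND up to 1 MB/s of writes,
// so the shard count must satisfy whichever limit binds first.
function kinesisShards(recordsPerSec, mbPerSec) {
  return Math.max(1, Math.ceil(recordsPerSec / 1000), Math.ceil(mbPerSec / 1));
}

// Hypothetical helper: Kafka partition rule of thumb. Enough partitions that
// neither producers nor consumers hit their per-partition throughput ceiling.
// Per-partition rates are ASSUMPTIONS; measure them on your own hardware.
function kafkaPartitions(targetMbPerSec, producerMbPerPartition, consumerMbPerPartition) {
  return Math.max(
    1,
    Math.ceil(targetMbPerSec / producerMbPerPartition),
    Math.ceil(targetMbPerSec / consumerMbPerPartition)
  );
}

// Target: 100,000 msg/s of 256-byte messages = 25.6 MB/s
console.log(kinesisShards(100000, 25.6));   // record-count limit dominates
console.log(kafkaPartitions(25.6, 10, 5));  // consumer side dominates
```

Taking the maximum over several constraints is the key design choice. For messages smaller than about 1 KB, the record-count limit binds before the byte-rate limit, which is typical of IoT telemetry.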
Tip: Processing Layer Selection
| Technology | Processing Model | Best For | Latency (typical) |
|---|---|---|---|
| Apache Flink | True streaming | Real-time, exactly-once | ~10ms |
| Spark Streaming | Micro-batch | ETL, ML pipelines | ~500ms |
| Apache Storm | True streaming | Simple topologies | ~50ms |
| Custom | Varies | Domain-specific logic | ~100ms |
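Windowing, listed among the processing-layer considerations above, groups an unbounded stream into finite time buckets before aggregating. The sketch below computes a per-sensor average over fixed, non-overlapping (tumbling) event-time windows. It is plain JavaScript over a finite array, not the API of Flink, Spark, or Storm. It also ignores late or out-of-order events, which engines such as Flink handle with watermarks.

```javascript
// Keyed tumbling-window average over a finite array of events.
// Each event is assigned to the window [windowStart, windowStart + windowMs).
// No watermarks or late-data handling: this is an illustration only.
function tumblingWindowAverages(events, windowMs) {
  const windows = new Map(); // key: "sensorId|windowStart" -> accumulator
  for (const { sensorId, ts, value } of events) {
    const windowStart = Math.floor(ts / windowMs) * windowMs;
    const key = `${sensorId}|${windowStart}`;
    const acc = windows.get(key) ?? { sensorId, windowStart, sum: 0, count: 0 };
    acc.sum += value;
    acc.count += 1;
    windows.set(key, acc);
  }
  // Emit one result per (sensor, window) pair
  return [...windows.values()].map(({ sensorId, windowStart, sum, count }) => ({
    sensorId,
    windowStart,
    avg: sum / count,
  }));
}

const events = [
  { sensorId: "t1", ts: 1000, value: 20 },
  { sensorId: "t1", ts: 4000, value: 22 },
  { sensorId: "t1", ts: 6000, value: 30 },
  { sensorId: "t2", ts: 2000, value: 50 },
];
console.log(tumblingWindowAverages(events, 5000));
```

A true-streaming engine keeps these accumulators in state and emits each window once it closes. A micro-batch engine applies the same updates once per batch interval, which puts a floor under its latency.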
1263.2.3 Cost Optimization Strategies
- Right-size resources: Start small and scale based on actual metrics
- Use compression: Cut storage and network volume, often by 2-5x depending on the data and codec (Snappy favors speed; ZSTD favors compression ratio)
- Implement data lifecycle policies: Automatically tier, archive, or delete old data
- Choose serverless where appropriate: Pay-per-use pricing suits variable workloads
- Monitor and adjust: Use metrics to identify over-provisioned resources
- Batch when latency allows: Larger batches amortize per-request and per-job overhead
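Compression and lifecycle policies compound. At steady state, storage holds roughly daily volume × retention days ÷ compression ratio. Below is a minimal sketch of that estimate. The 4:1 compression ratio and the $0.02 per GB-month price are hypothetical placeholders, not quotes for any provider, and request, transfer, and tiering costs are ignored. The input volume of about 2,212 GB/day corresponds to 100,000 msg/s of 256-byte messages.

```javascript
// Hypothetical helper: steady-state storage footprint and monthly cost.
// Assumes data older than retentionDays is deleted, a uniform compression
// ratio, and a flat price per GB-month (all placeholders).
function monthlyStorageCost({ gbPerDay, retentionDays, compressionRatio, pricePerGbMonth }) {
  const storedGb = (gbPerDay * retentionDays) / compressionRatio;
  return { storedGb, monthlyCost: storedGb * pricePerGbMonth };
}

// 100,000 msg/s x 256 bytes = 25.6 MB/s = 2,211.84 GB/day (decimal units)
const base = { gbPerDay: 2211.84, pricePerGbMonth: 0.02 }; // price is hypothetical
const keepEverything = monthlyStorageCost({ ...base, retentionDays: 365, compressionRatio: 1 });
const withLifecycle = monthlyStorageCost({ ...base, retentionDays: 90, compressionRatio: 4 });
console.log(keepEverything, withLifecycle);
```

Under these assumptions, cutting retention from 365 to 90 days and compressing 4:1 shrinks the steady-state footprint by a factor of about 16 (365 × 4 ÷ 90).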
1263.2.4 Common Architecture Patterns
| Pattern | Description | When to Use |
|---|---|---|
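| Kappa | Single stream-processing path; history is recomputed by replaying the log | Simpler pipelines, < 500K msg/s (rule of thumb) |
| Lambda | Separate batch and speed (streaming) layers, merged at query time | Complex analytics combining historical and real-time views |
| Lakehouse | Unified batch and streaming storage on open table formats (e.g., Delta Lake, Apache Iceberg) | Modern analytics, ML workloads |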
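The key mechanical difference between Lambda and Kappa is where results are reconciled. In Lambda, a serving layer merges a batch view, complete up to the last batch run, with a speed-layer view that covers only newer events. Kappa has no merge step because a single streaming job recomputes history by replaying the log. The following is a minimal sketch of the Lambda merge for an additive metric (per-device event counts), with hypothetical in-memory views standing in for a serving store.

```javascript
// Lambda serving-layer merge for an additive metric (event counts).
// batchView: counts up to the last batch job; speedView: counts since then.
// Returns a new object and leaves both input views unchanged.
function mergeViews(batchView, speedView) {
  const merged = { ...batchView };
  for (const [key, count] of Object.entries(speedView)) {
    merged[key] = (merged[key] ?? 0) + count;
  }
  return merged;
}

const batchView = { dev1: 1000, dev2: 500 }; // hypothetical: up to the last batch run
const speedView = { dev1: 12, dev3: 4 };     // hypothetical: events since that run
console.log(mergeViews(batchView, speedView)); // { dev1: 1012, dev2: 500, dev3: 4 }
```

Plain addition works only because counts are additive. Averages or distinct counts need mergeable intermediate state in both layers, for example sum-and-count pairs or a mergeable HyperLogLog sketch.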
1263.2.5 Bottleneck Identification
| Symptom | Likely Cause | Solution |
|---|---|---|
| High ingestion latency | Insufficient partitions | Increase partition count |
| Processing lag growing | Underprovisioned workers | Add processing nodes (parallelism is capped by the partition count) |
| Query timeouts | Large data scans | Partition data, add indexes |
| Storage costs growing | No data lifecycle | Implement retention policies |
| Network saturation | Uncompressed data | Enable compression |
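For the "Processing lag growing" row, the usual signal is consumer lag. For each partition, it is the log-end offset minus the consumer group's committed offset, which is the LAG column reported by `kafka-consumer-groups.sh --describe`. The heuristic sketch below works on that value: `totalLag` sums lag across partitions, and `isLagGrowing` flags lag that rose at every consecutive sample. The input shapes and the strictly-increasing rule are assumptions. A production alert would likely use a longer, smoothed window.

```javascript
// Total consumer lag across partitions: log-end offset minus committed offset.
// Input objects are a hypothetical shape, not a Kafka client API.
function totalLag(partitions) {
  return partitions.reduce(
    (sum, p) => sum + Math.max(0, p.logEndOffset - p.committedOffset),
    0
  );
}

// Heuristic: lag is "growing" if it increased at every consecutive sample.
function isLagGrowing(lagSamples) {
  if (lagSamples.length < 2) return false;
  return lagSamples.every((lag, i) => i === 0 || lag > lagSamples[i - 1]);
}

const snapshot = [
  { partition: 0, logEndOffset: 15000, committedOffset: 14200 },
  { partition: 1, logEndOffset: 15100, committedOffset: 15100 },
];
console.log(totalLag(snapshot));               // 800
console.log(isLagGrowing([800, 1500, 2600]));  // true: consider adding consumers
console.log(isLagGrowing([800, 300, 900]));    // false: not consistently rising
```

When lag keeps growing, adding consumers helps only up to the partition count. Consumers in a group beyond that number sit idle, so raising partitions may have to come first.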
1263.3 Related Topics
- Stream Processing - Deep dive into stream processing concepts
- Edge Data Acquisition - Patterns for handling IoT data at scale
- Time-Series Databases - Specialized storage for IoT metrics
- Database Selection Tool - Choose the right database for your workload
Interactive tool created for the IoT Class Textbook