%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#ecf0f1'}}}%%
flowchart TB
subgraph Original["Original Approach (Slow)"]
O1[Image Input<br/>65ms per image] --> O2[Decode Image<br/>10ms]
O2 --> O3[Detect Vehicles<br/>50ms<br/>BOTTLENECK]
O3 --> O4[Extract Metadata<br/>5ms]
O4 --> O5[Capacity:<br/>15 images/sec<br/>Need: 10,000/sec<br/>99.85% shortfall]
end
subgraph Optimized["Smart Sampling (Fast)"]
N1{Every 10th<br/>Frame?} -->|Yes| N2[Full Processing<br/>65ms]
N1 -->|No| N3[Motion Detection<br/>2ms]
N2 --> N4[Cache Result]
N3 -->|Motion| N5[Use Cached Data]
N3 -->|No Motion| N6[vehicles = 0]
N4 & N5 & N6 --> N7[Average: 8.3ms/frame<br/>Capacity: 120/sec/executor<br/>x100 executors = 12,000/sec]
end
style O3 fill:#E74C3C,stroke:#2C3E50,color:#fff
style O5 fill:#E74C3C,stroke:#2C3E50,color:#fff
style N7 fill:#27AE60,stroke:#2C3E50,color:#fff
1260 Big Data Operations
Learning Objectives
After completing this chapter, you will be able to:
- Monitor big data pipeline health with key metrics
- Debug common issues (Kafka lag, OOM errors, late arrivals)
- Implement production monitoring checklists
- Avoid the seven deadly pitfalls of IoT big data
1260.1 Debugging and Monitoring IoT Big Data Pipelines
Building a big data pipeline is just the beginning - keeping it healthy in production requires comprehensive monitoring and debugging strategies. This section covers practical approaches for tracking pipeline health, diagnosing failures, and optimizing performance.
1260.1.1 Key Metrics to Track
| Metric Category | Metric | Description | Alert Threshold | Tool |
|---|---|---|---|---|
| Data Flow | Message Latency | Time from sensor to storage | > 5 seconds | Kafka metrics, Grafana |
| Data Flow | Throughput | Messages per second | < 80% of expected | Spark UI, Prometheus |
| Data Flow | Backlog Size | Pending unprocessed messages | > 10,000 messages | Kafka lag monitoring |
| Data Flow | Processing Rate | Records processed per second | Declining trend | Spark streaming metrics |
| Data Quality | Error Rate | Failed messages / total messages | > 1% | Custom validators |
| Data Quality | Schema Violations | Records with format errors | > 0.1% | Schema registry alerts |
| Data Quality | Duplicate Rate | Duplicate message IDs | > 0.5% | Deduplication counters |
| Data Quality | Missing Data | Expected vs. actual sensors reporting | > 5% of devices offline | Device heartbeat tracking |
| Infrastructure | Storage Growth | GB per day | > 120% of expected | HDFS metrics, S3 CloudWatch |
| Infrastructure | CPU Utilization | Spark executor CPU | > 80% sustained | Cluster manager |
| Infrastructure | Memory Pressure | JVM heap usage | > 85% | JVM metrics, GC logs |
| Infrastructure | Network I/O | Data transfer rates | Saturation | Network monitoring |
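To make these thresholds actionable, a minimal alert-check sketch is shown below. The metric names and the single `metrics` dict are illustrative assumptions; the actual values would come from whatever backend you use (Prometheus, the Spark UI, a Kafka lag exporter).

```python
# Hypothetical metric names; values are pulled from your metrics backend.
ALERT_RULES = {
    "message_latency_seconds": lambda v: v > 5,        # Data Flow: message latency
    "consumer_lag_messages": lambda v: v > 10_000,     # Data Flow: backlog size
    "error_rate": lambda v: v > 0.01,                  # Data Quality: error rate
    "executor_cpu_utilization": lambda v: v > 0.80,    # Infrastructure: CPU sustained
    "jvm_heap_usage": lambda v: v > 0.85,              # Infrastructure: memory pressure
}

def evaluate_alerts(metrics: dict) -> list:
    """Return the names of metrics that breach their alert thresholds."""
    return [name for name, breached in ALERT_RULES.items()
            if name in metrics and breached(metrics[name])]

# Example: evaluate_alerts({"message_latency_seconds": 7.2, "error_rate": 0.002})
# -> ["message_latency_seconds"]
```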
1260.1.2 Common Issues and Diagnosis
| Symptom | Likely Cause | Investigation Steps | Resolution |
|---|---|---|---|
| High Latency | Network congestion | Check queue depths, network bandwidth | Increase partitions, add brokers |
| Missing Data | Sensor offline | Check device heartbeats, last-seen timestamps | Investigate device connectivity |
| Duplicate Records | Retry logic | Check message IDs, producer configs | Implement idempotent writes |
| Schema Errors | Format mismatch | Validate against schema registry | Update schema, fix producers |
| OOM Errors | Memory leaks | Analyze heap dumps, GC logs | Increase memory, optimize code |
| Slow Queries | Missing partitions | Check query plans, partition strategy | Add partitions, update indexes |
| Kafka Lag Growing | Consumer too slow | Check processing time per batch | Scale consumers, optimize logic |
| Data Skew | Uneven partitioning | Analyze partition distribution | Repartition by better key |
1260.2 Debugging Common Big Data Issues
1260.2.1 Kafka Consumer Lag Growing
Symptoms:
Consumer lag: 50,000 messages (growing)
Processing time: 500ms per batch
Records per batch: 100
Diagnosis:
# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group iot-processor --describe
# Example output showing lag:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
sensors 0 1000000 1050000 50000
Root Cause Analysis:
# Calculate processing capacity
records_per_sec = 100 / 0.5 # 200 records/sec
arrival_rate = 500 # records/sec from producers
# Capacity shortfall
shortfall = arrival_rate - records_per_sec  # 300 records/sec falling behind
Solutions:
Scale consumers (if partitions > consumers):
# Add 2 more consumer instances
# Rebalancing will distribute partitions across them
Optimize processing (if consumers == partitions):
# Before: Processing each record individually
for record in batch:
    process(record)  # 5ms per record
# After: Batch processing
process_batch(batch)  # 50ms for 100 records (0.5ms per record)
Increase partitions (if more parallelism is needed):
# Add partitions to the topic (irreversible!)
kafka-topics.sh --alter --topic sensors \
  --partitions 20 --bootstrap-server localhost:9092
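For the "optimize processing" option, here is a minimal consumer sketch that pulls records in batches rather than one at a time. It assumes the kafka-python client and a `process_batch` function like the one shown above.

```python
from kafka import KafkaConsumer  # assumes the kafka-python client is installed

consumer = KafkaConsumer(
    "sensors",
    bootstrap_servers="localhost:9092",
    group_id="iot-processor",
    enable_auto_commit=False,      # commit only after a batch is fully processed
)

while True:
    # poll() returns {TopicPartition: [records]} with up to max_records in total
    polled = consumer.poll(timeout_ms=1000, max_records=500)
    records = [r for partition_records in polled.values() for r in partition_records]
    if records:
        process_batch(records)     # one pass over the whole batch (see above)
        consumer.commit()          # advance offsets so the lag actually shrinks
```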
1260.2.2 Out-of-Memory (OOM) Errors
Symptoms:
ERROR Executor: Exception in task 1.0
java.lang.OutOfMemoryError: Java heap space
Root Cause Options:
A. Data Skew (one partition much larger)
# Check partition sizes
from pyspark.sql.functions import spark_partition_id
df.groupBy(spark_partition_id()).count().show()
# Output showing skew:
# Partition 0: 1M records
# Partition 1: 100k records
# Partition 2: 5M records  <- PROBLEM
Solution: Repartition by a better key
# Before: Partitioned by sensor_id (some sensors very chatty)
df = df.repartition("sensor_id")
# After: Partition by hash of sensor_id and timestamp
from pyspark.sql.functions import hash, col
df = df.repartition(20, hash(col("sensor_id"), col("timestamp")))
B. Insufficient Memory Configuration
# Check current config
spark.conf.get("spark.executor.memory") # "1g"
# Increase memory
spark-submit \
  --executor-memory 4g \
  --conf spark.executor.memoryOverhead=1g \
  --conf spark.memory.fraction=0.8 \
  job.py
C. Too Much Data in Memory (cache/persist)
# Before: Caching the entire dataset
df.cache()  # May exceed memory!
# After: Cache only the needed columns
df.select("sensor_id", "temperature").cache()
# Or use disk persistence
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
1260.2.3 Latency Spikes
Symptoms:
Average latency: 2 seconds
P99 latency: 45 seconds <- SPIKE
Diagnosis:
# Check Spark Streaming batch times
# Spark UI -> Streaming tab -> Batch Details
# Look for:
# - Scheduling delay (queuing time before processing)
# - Processing time (actual computation)
# - Total delay (end-to-end)
# Example output:
Batch Time: 2024-01-15 10:05:00
Scheduling Delay: 30s <- PROBLEM (queuing)
Processing Time: 3s
Total Delay: 33s
Root Cause: Batch processing time > batch interval
Batch interval: 10 seconds
Avg processing time: 12 seconds
-> Queuing builds up over time
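If the job is a Structured Streaming query rather than the DStream-style job shown in the UI above, a rough programmatic version of the same check looks like this (a sketch, assuming a running `StreamingQuery` handle named `query`):

```python
# Compare arrival rate with processing rate for the most recent micro-batch.
progress = query.lastProgress                  # dict for the last batch, or None
if progress:
    arriving = progress.get("inputRowsPerSecond") or 0.0
    processing = progress.get("processedRowsPerSecond") or 0.0
    batch_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    if processing and arriving > processing:
        print(f"Falling behind: {arriving:.0f} rows/s in vs {processing:.0f} rows/s out "
              f"(last batch took {batch_ms} ms)")
```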
Solutions:
- Optimize Processing Logic
# Before: Multiple passes over data
filtered = df.filter(...)
aggregated = filtered.groupBy(...).agg(...)
joined = aggregated.join(...)
# After: Single optimized query with a broadcast join
from pyspark.sql.functions import broadcast
result = df.filter(...) \
    .groupBy(...).agg(...) \
    .join(broadcast(small_df), ...)  # Broadcast the small table (here named small_df)
- Scale Cluster
# Increase parallelism
spark.conf.set("spark.default.parallelism", 200)
spark.conf.set("spark.sql.shuffle.partitions", 200)Symptoms:
2024-01-15: Processing 1M records/hour
2024-01-16: Errors spike, only 200k records/hour
Error: "Missing field: humidity"
Diagnosis:
# Check schema registry
# Example: Sensor firmware update added "humidity_v2" field
# Old schema:
{"sensor_id": "string", "temp": "float", "humidity": "float"}
# New schema:
{"sensor_id": "string", "temp": "float", "humidity_v2": "float"}Solution: Handle Schema Evolution - Check for new field (humidity_v2) first - Fall back to old field (humidity) if new field missing - Use default value (None) if both missing - Alternative: Use schema registry (Confluent Schema Registry) for automatic backward/forward compatibility
1260.3 Production Monitoring Checklist
Before deploying a big data pipeline to production, ensure:
Infrastructure Monitoring:
- [ ] CPU, memory, disk, and network metrics collected
- [ ] Alerting configured for resource exhaustion (>80% threshold)
- [ ] Log aggregation enabled (ELK, Splunk, CloudWatch Logs)
- [ ] Cluster autoscaling configured (if using cloud)
Data Quality Monitoring:
- [ ] Schema validation alerts for format changes
- [ ] Data completeness tracking (expected vs. actual record counts)
- [ ] Duplicate detection and alerting
- [ ] Quarantine zone monitoring (review regularly)
- [ ] Data lineage documented (know where data comes from and goes to)
Performance Monitoring:
- [ ] End-to-end latency tracked (sensor to storage to query)
- [ ] Throughput metrics (records/sec, GB/day)
- [ ] Kafka consumer lag alerts (<1,000 messages)
- [ ] Spark job duration tracking (alert if >2x baseline)
- [ ] Query performance benchmarks (alert if >2x slower)
Operational:
- [ ] Runbooks for common issues (documented resolution steps)
- [ ] On-call rotation and escalation paths defined
- [ ] Automated failure recovery (restart failed jobs)
- [ ] Regular capacity planning reviews (weekly/monthly)
- [ ] Disaster recovery tested (restore from backup)
Cost Monitoring:
- [ ] Cloud costs tracked by service (S3, EC2, EMR)
- [ ] Storage growth monitored (enforce retention policies)
- [ ] Unused resources identified (idle clusters)
- [ ] Cost per million messages tracked
1260.4 Real-World Debugging Example
Scenario: Smart city processes 10,000 traffic cameras, each sending 1 image/sec.
Issue Detected:
Alert: Kafka consumer lag = 500,000 messages (increasing)
Storage growth: 200 GB/day (expected: 80 GB/day)
Query latency: 30s (expected: 5s)
Investigation:
Step 1: Check Kafka metrics
# Consumer lag by partition
PARTITION LAG
0 50,000
1 48,000
...
9 52,000
# Conclusion: All partitions lagging equally (not a skew issue)
# Problem is overall processing capacity
Step 2: Check Spark processing time
Batch interval: 60 seconds
Processing time: 75 seconds <- PROBLEM!
Records per batch: 60,000 images
Step 3: Profile processing logic
Root Cause: Vehicle detection on every frame is too expensive!
Solution: Smart Sampling Strategy (see the sketch after this list)
- 90% of frames: lightweight motion detection only (2ms)
- 10% of frames: full vehicle detection + caching (65ms)
- Average processing: 0.9 x 2ms + 0.1 x 65ms = 8.3ms per frame
- Capacity: 120 images/sec per executor x 100 executors = 12,000 images/sec
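A minimal sketch of this tiered logic; the `detect_vehicles` and `detect_motion` functions stand in for the real models and are not part of the original pipeline:

```python
FULL_DETECTION_EVERY = 10        # run the expensive model on every 10th frame
_cached_counts = {}              # camera_id -> last full vehicle count

def process_frame(camera_id, frame_index, image):
    """Full detection on keyframes, cheap motion check on the rest."""
    if frame_index % FULL_DETECTION_EVERY == 0:
        count = detect_vehicles(image)           # ~65ms: full detection
        _cached_counts[camera_id] = count        # cache for the next 9 frames
        return count
    if detect_motion(image):                     # ~2ms: lightweight check
        return _cached_counts.get(camera_id, 0)  # reuse the cached result
    return 0                                     # no motion -> vehicles = 0
```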
Results After Optimization:
Kafka consumer lag: 0 messages (cleared in 2 hours)
Storage growth: 75 GB/day (compressed processed data)
Query latency: 3 seconds (partitioned by camera and time)
Cost savings: 60% reduction in compute (fewer resources needed)
Key Lessons:
1. Profile before optimizing - identified vehicle detection as the bottleneck
2. Domain-specific optimization - traffic changes slowly; frame-by-frame analysis is unnecessary
3. Tiered processing - full processing for keyframes, lightweight checks for the rest
4. Compression - store processed metadata (KB), not raw images (MB)
1260.5 Seven Deadly Pitfalls of IoT Big Data
The Mistake: Keep all raw IoT data indefinitely because “we might need it later”
Why It’s Bad:
- Storage costs explode: 1 billion readings/day x 100 bytes x 365 days = 36.5 TB/year
- At $0.023/GB/month: $840/month = $10,080/year
- Most raw data is never queried after 30 days
The Fix: Implement tiered storage + downsampling
| Data Tier | Retention | Storage | Monthly Cost |
|---|---|---|---|
| Hot (raw) | 7 days | Fast SSD | $100 |
| Warm (1-min avg) | 90 days | Slower | $20 |
| Cold (hourly avg) | Forever | S3 Glacier | $2 |
Savings: $840/month to $122/month = 85% reduction
The Mistake: Close windows immediately without waiting for late arrivals
What Goes Wrong:
- Sensor loses network for 30 seconds
- Data arrives 45 seconds late
- Count is wrong because late data is excluded
- Reports show mysterious “dips” that didn’t happen
The Fix: Use watermarks with appropriate tolerance (2 minutes for most IoT scenarios)
Tradeoff: Results delayed by 2 minutes, but accurate
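In Spark Structured Streaming, that tolerance is a single line. Below is a sketch that assumes a streaming DataFrame `readings` with event_time, sensor_id, and temperature columns already parsed from the source:

```python
from pyspark.sql.functions import avg, col, window

per_minute = (
    readings
    .withWatermark("event_time", "2 minutes")   # accept data up to 2 minutes late
    .groupBy(window(col("event_time"), "1 minute"), col("sensor_id"))
    .agg(avg("temperature").alias("avg_temp"))
)

query = (
    per_minute.writeStream
    .outputMode("update")    # windows are corrected as late data arrives
    .format("console")
    .start()
)
```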
The Mistake: Trust all sensor data blindly
What Goes Wrong:
- Faulty sensor reports temperature = 500C (impossible)
- Bad data corrupts analytics: “Average temperature = 120C”
- Triggers false alerts (fire alarm goes off)
The Fix: Implement data quality rules
| Validation | Check | Action |
|---|---|---|
| Range Check | -50C to 100C | Quarantine out-of-range |
| Rate of Change | < 10C per second | Flag sudden spikes |
| Sensor Health | Last reading < 10 min ago | Mark offline |
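Here is a PySpark sketch of the first two rules; it assumes a DataFrame `df` with sensor_id, event_time, and temperature columns, and takes the thresholds from the table above:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("sensor_id").orderBy("event_time")

checked = (
    df
    # Range check: quarantine physically impossible readings
    .withColumn("out_of_range",
                (F.col("temperature") < -50) | (F.col("temperature") > 100))
    # Rate-of-change check: compare with the previous reading from the same sensor
    .withColumn("prev_temp", F.lag("temperature").over(w))
    .withColumn("prev_time", F.lag("event_time").over(w))
    .withColumn("rate_c_per_sec",
                F.abs(F.col("temperature") - F.col("prev_temp"))
                / F.greatest(F.col("event_time").cast("long")
                             - F.col("prev_time").cast("long"), F.lit(1)))
    .withColumn("sudden_spike", F.coalesce(F.col("rate_c_per_sec") > 10, F.lit(False)))
)

quarantine = checked.filter(F.col("out_of_range") | F.col("sudden_spike"))  # review later
clean = checked.filter(~F.col("out_of_range") & ~F.col("sudden_spike"))
```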
The Mistake: No deletion policy, keep everything forever
What Happens:
- Month 1: 2 TB of data, fine
- Month 6: 12 TB, queries slowing
- Month 12: 24 TB, disk 95% full
- Month 13: Database crashes
The Fix: Implement retention policies from day 1
| Tier | Granularity | Retention | Purpose |
|---|---|---|---|
| Raw | Every reading | 7 days | Real-time debugging |
| 1-minute | AVG/MIN/MAX | 90 days | Recent trends |
| Hourly | AVG/MIN/MAX | Forever | Long-term analysis |
Space Savings: 1 year raw (24 TB) to hourly aggregates (200 GB) = 99% reduction
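A minimal PySpark downsampling sketch for the hourly tier, assuming a DataFrame `raw` of readings with sensor_id, event_time, and temperature columns; the output path is illustrative:

```python
from pyspark.sql import functions as F

hourly = (
    raw
    .groupBy("sensor_id", F.date_trunc("hour", F.col("event_time")).alias("hour"))
    .agg(F.avg("temperature").alias("avg_temp"),
         F.min("temperature").alias("min_temp"),
         F.max("temperature").alias("max_temp"))
)

# Keep the hourly tier indefinitely; a separate retention job can drop raw data after 7 days.
hourly.write.mode("append").partitionBy("hour").parquet("s3://example-bucket/telemetry/hourly/")
```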
The Mistake: Using Spark, Hadoop, or Kafka for datasets that easily fit in memory on a laptop.
Symptoms:
- Team spends more time managing infrastructure than analyzing data
- Simple queries that should take seconds take minutes (cluster overhead)
- Cloud bills are 10-100x higher than needed
The Fix: Apply the “laptop test” before choosing big data tools:
# Quick sizing check
data_per_day_mb = num_sensors * readings_per_second * 86400 * bytes_per_reading / 1e6
data_per_year_gb = data_per_day_mb * 365 / 1000
print(f"Daily: {data_per_day_mb:.1f} MB, Yearly: {data_per_year_gb:.1f} GB")
# Decision thresholds:
# < 10 GB/day -> Pandas on laptop, PostgreSQL/InfluxDB (single node)
# 10-100 GB/day -> Time-series DB cluster, consider Kafka
# > 100 GB/day -> Now you might need Spark/Hadoop
Example: 1000 sensors at 1 reading/minute at 100 bytes = 144 MB/day = 52 GB/year. Fits on a $50 SSD. No Hadoop needed!
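Plugging the worked example from the text into the sizing check above:

```python
num_sensors = 1000
readings_per_second = 1 / 60          # one reading per minute
bytes_per_reading = 100

data_per_day_mb = num_sensors * readings_per_second * 86400 * bytes_per_reading / 1e6
data_per_year_gb = data_per_day_mb * 365 / 1000
print(f"Daily: {data_per_day_mb:.1f} MB, Yearly: {data_per_year_gb:.1f} GB")
# Daily: 144.0 MB, Yearly: 52.6 GB -> well under the 10 GB/day threshold
```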
The Mistake: Rigidly defined schema that breaks when sensors change
What Goes Wrong:
- New sensor models have additional fields
- Old code crashes when encountering new fields
- Millions of old records have to be migrated
- Downtime during migration
The Fix: Use a schema evolution approach
- Store flexible JSON objects or use Avro/Parquet with optional fields
- Use a schema registry for automatic backward/forward compatibility
- Tag data with a schema version
The Rule: “Always design for future sensor types. Your schema will evolve.”
The Mistake: Send all raw sensor data to the cloud
The Cost (Cloud-Only):
- Bandwidth: 5 MB/second x 86,400 seconds/day = 432 GB/day
- Cloud ingress: $0.09/GB x 432 GB/day = $38.88/day = $1,166/month
The Fix: Edge processing
| Approach | Data Sent | Monthly Cost | Savings |
|---|---|---|---|
| Cloud-only | 432 GB/day | $1,166 | Baseline |
| Edge processing | 7.2 GB/day (alerts only) | $19.50 | 98% |
The Rule: “Process data as close to the source as possible. Only send insights to cloud, not raw data.”
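A minimal edge-side filtering sketch of that rule; the `store_locally` and `publish_to_cloud` helpers and the 40C threshold are illustrative placeholders, not part of the text:

```python
TEMP_ALERT_C = 40.0   # illustrative alert threshold

def handle_reading(reading: dict) -> None:
    store_locally(reading)                      # hypothetical local buffer / edge DB
    if reading["temperature"] > TEMP_ALERT_C:   # only insights/alerts leave the site
        publish_to_cloud({
            "sensor_id": reading["sensor_id"],
            "event": "high_temperature",
            "value": reading["temperature"],
        })
```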
1260.5.1 Summary of Common Mistakes
| Mistake | Impact | Fix | Savings/Improvement |
|---|---|---|---|
| Storing everything | $10K+/year storage costs | Tiered storage + downsampling | 85% cost reduction |
| Ignoring late data | Incorrect analytics | Watermarks + allowed lateness | Accurate results |
| No sensor validation | Bad data corrupts insights | Validation rules + quarantine | Prevents false alerts |
| Wrong database type | 45-second queries | Time-series database (InfluxDB) | 22x faster |
| No retention policy | Disk runs out | Automatic retention + aggregates | 99% space savings |
| Cloud-only processing | $1,166/month bandwidth | Edge processing + cloud for insights | 98% cost reduction |
| Rigid schemas | Code breaks with changes | Schema evolution (Avro/Parquet) | Zero downtime updates |
1260.6 Summary
- Key monitoring metrics include message latency (<5 seconds), Kafka consumer lag (<10,000 messages), error rate (<1%), and CPU utilization (<80% sustained).
- Common debugging patterns: Kafka lag means scale consumers or optimize processing; OOM errors indicate data skew or insufficient memory; latency spikes suggest batch processing time exceeds interval.
- Production checklist covers infrastructure monitoring, data quality tracking, performance metrics, operational procedures, and cost management.
- Seven deadly pitfalls include storing everything, ignoring late data, no sensor validation, wrong database choice, no retention policy, cloud-only processing, and rigid schemas.
1260.7 What’s Next
Now that you understand operational best practices, continue to:
- Big Data Case Studies - See real-world implementations
- Big Data Overview - Return to the chapter index