1260  Big Data Operations

Learning Objectives

After completing this chapter, you will be able to:

  • Monitor big data pipeline health with key metrics
  • Debug common issues (Kafka lag, OOM errors, late arrivals)
  • Implement production monitoring checklists
  • Avoid the seven deadly pitfalls of IoT big data

1260.1 Debugging and Monitoring IoT Big Data Pipelines

Building a big data pipeline is just the beginning: keeping it healthy in production requires comprehensive monitoring and debugging strategies. This section covers practical approaches for tracking pipeline health, diagnosing failures, and optimizing performance.

1260.1.1 Key Metrics to Track

| Metric Category | Metric | Description | Alert Threshold | Tool |
|---|---|---|---|---|
| Data Flow | Message Latency | Time from sensor to storage | > 5 seconds | Kafka metrics, Grafana |
| Data Flow | Throughput | Messages per second | < 80% of expected | Spark UI, Prometheus |
| Data Flow | Backlog Size | Pending unprocessed messages | > 10,000 messages | Kafka lag monitoring |
| Data Flow | Processing Rate | Records processed/sec | Declining trend | Spark streaming metrics |
| Data Quality | Error Rate | Failed messages / total | > 1% | Custom validators |
| Data Quality | Schema Violations | Records with format errors | > 0.1% | Schema registry alerts |
| Data Quality | Duplicate Rate | Duplicate message IDs | > 0.5% | Deduplication counters |
| Data Quality | Missing Data | Expected vs actual sensors | > 5% devices offline | Device heartbeat tracking |
| Infrastructure | Storage Growth | GB per day | > 120% of expected | HDFS metrics, S3 CloudWatch |
| Infrastructure | CPU Utilization | Spark executor CPU | > 80% sustained | Cluster manager |
| Infrastructure | Memory Pressure | JVM heap usage | > 85% | JVM metrics, GC logs |
| Infrastructure | Network I/O | Data transfer rates | Saturation | Network monitoring |
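
As a minimal illustration of how these thresholds translate into alerts, the sketch below checks a few of them in plain Python. The metric names and the get_metric() callable are hypothetical stand-ins for whatever your monitoring stack (Prometheus, CloudWatch, Grafana alerting) actually exposes.

# Minimal threshold-check sketch; get_metric() is a placeholder for a real
# monitoring API (e.g. a Prometheus query client).
ALERT_THRESHOLDS = {
    "message_latency_sec": 5,         # > 5 s from sensor to storage
    "consumer_lag_messages": 10_000,  # > 10,000 pending messages
    "error_rate": 0.01,               # > 1% failed messages
    "heap_usage": 0.85,               # > 85% JVM heap
}

def check_thresholds(get_metric):
    """Return (metric, value, threshold) tuples for every breached limit."""
    alerts = []
    for metric, limit in ALERT_THRESHOLDS.items():
        value = get_metric(metric)            # current value from monitoring
        if value is not None and value > limit:
            alerts.append((metric, value, limit))
    return alerts

# Example usage with a fake metric source:
fake_metrics = {"message_latency_sec": 7.2, "consumer_lag_messages": 1200,
                "error_rate": 0.002, "heap_usage": 0.91}
for name, value, limit in check_thresholds(fake_metrics.get):
    print(f"ALERT: {name}={value} exceeds threshold {limit}")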

1260.1.2 Common Issues and Diagnosis

| Symptom | Likely Cause | Investigation Steps | Resolution |
|---|---|---|---|
| High Latency | Network congestion | Check queue depths, network bandwidth | Increase partitions, add brokers |
| Missing Data | Sensor offline | Check device heartbeats, last-seen timestamps | Investigate device connectivity |
| Duplicate Records | Retry logic | Check message IDs, producer configs | Implement idempotent writes |
| Schema Errors | Format mismatch | Validate against schema registry | Update schema, fix producers |
| OOM Errors | Memory leaks | Analyze heap dumps, GC logs | Increase memory, optimize code |
| Slow Queries | Missing partitions | Check query plans, partition strategy | Add partitions, update indexes |
| Kafka Lag Growing | Consumer too slow | Check processing time per batch | Scale consumers, optimize logic |
| Data Skew | Uneven partitioning | Analyze partition distribution | Repartition by better key |

1260.2 Debugging Common Big Data Issues

1260.2.1 Kafka Consumer Lag Keeps Growing

Symptoms:

Consumer lag: 50,000 messages (growing)
Processing time: 500ms per batch
Records per batch: 100

Diagnosis:

# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group iot-processor --describe

# Example output showing lag:
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
sensors  0          1000000         1050000         50000

Root Cause Analysis:

# Calculate processing capacity
records_per_sec = 100 / 0.5  # 200 records/sec
arrival_rate = 500  # records/sec from producers

# Capacity shortfall
shortfall = arrival_rate - records_per_sec  # 300 records/sec falling behind

Solutions:

  1. Scale consumers (if partitions > consumers):

    # Add 2 more consumer instances
    # Rebalancing will distribute partitions
  2. Optimize processing (if consumers == partitions; see also the consumer sketch after this list):

    # Before: Processing each record individually
    for record in batch:
        process(record)  # 5ms per record
    
    # After: Batch processing
    process_batch(batch)  # 50ms for 100 records (0.5ms per record)
  3. Increase partitions (if need more parallelism):

    # Add partitions to topic (irreversible!)
    kafka-topics.sh --alter --topic sensors \
      --partitions 20 --bootstrap-server localhost:9092
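
The sketch below shows what solutions 1 and 2 look like together, using the third-party kafka-python package (an assumption; any Kafka client follows the same pattern). Every instance started with the same group_id shares the topic's partitions, and polling in batches amortizes per-record overhead; process_batch() is a placeholder for your own logic.

# Batched consumer sketch (assumes: pip install kafka-python).
from kafka import KafkaConsumer

def process_batch(records):
    # Placeholder: batch/vectorized processing is far cheaper than per-record calls
    print(f"processed {len(records)} records")

consumer = KafkaConsumer(
    "sensors",
    bootstrap_servers="localhost:9092",
    group_id="iot-processor",   # instances sharing this group_id split the partitions
    enable_auto_commit=False,
)

while True:
    # poll() returns {TopicPartition: [records]}; batching amortizes overhead
    batches = consumer.poll(timeout_ms=1000, max_records=500)
    for tp, records in batches.items():
        process_batch(records)
    if batches:
        consumer.commit()       # commit offsets only after the batch is processed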

1260.2.2 Out-of-Memory (OOM) Errors

Symptoms:

ERROR Executor: Exception in task 1.0
java.lang.OutOfMemoryError: Java heap space

Root Cause Options:

A. Data Skew (one partition much larger)

# Check partition sizes
from pyspark.sql.functions import spark_partition_id
df.groupBy(spark_partition_id()).count().show()

# Output showing skew:
# Partition 0: 1M records
# Partition 1: 100k records
# Partition 2: 5M records <- PROBLEM

Solution: Repartition by better key

# Before: Partitioned by sensor_id (some sensors very chatty)
df = df.repartition("sensor_id")

# After: Partition by hash of sensor_id and timestamp
from pyspark.sql.functions import hash, col
df = df.repartition(20, hash(col("sensor_id"), col("timestamp")))

B. Insufficient Memory Configuration

# Check current config
spark.conf.get("spark.executor.memory")  # "1g"

# Increase memory
spark-submit \
  --executor-memory 4g \
  --conf spark.executor.memoryOverhead=1g \
  --conf spark.memory.fraction=0.8 \
  job.py

C. Too Much Data in Memory (cache/persist)

# Before: Caching entire dataset
df.cache()  # May exceed memory!

# After: Cache only needed columns
df.select("sensor_id", "temperature").cache()

# Or use disk persistence (spills partitions that don't fit in memory)
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

1260.2.3 Latency Spikes

Symptoms:

Average latency: 2 seconds
P99 latency: 45 seconds <- SPIKE

Diagnosis:

# Check Spark Streaming batch times
# Spark UI -> Streaming tab -> Batch Details

# Look for:
# - Scheduling delay (queuing time before processing)
# - Processing time (actual computation)
# - Total delay (end-to-end)

# Example output:
Batch Time: 2024-01-15 10:05:00
Scheduling Delay: 30s  <- PROBLEM (queuing)
Processing Time: 3s
Total Delay: 33s

Root Cause: Batch processing time > batch interval

Batch interval: 10 seconds
Avg processing time: 12 seconds
-> Queuing builds up over time

Solutions:

  1. Optimize Processing Logic
# Before: Multiple passes over data
filtered = df.filter(...)
aggregated = filtered.groupBy(...).agg(...)
joined = aggregated.join(...)

# After: Single optimized query with a broadcast join for the small table
from pyspark.sql.functions import broadcast
result = df.filter(...) \
    .groupBy(...).agg(...) \
    .join(broadcast(small_table), ...)  # small_table: the small lookup DataFrame
  2. Scale Cluster
# Increase shuffle parallelism for subsequent queries
spark.conf.set("spark.sql.shuffle.partitions", 200)
# spark.default.parallelism is read when the SparkContext starts, so set it at
# submit time instead: spark-submit --conf spark.default.parallelism=200

1260.2.4 Schema Changes Break Processing

Symptoms:

2024-01-15: Processing 1M records/hour
2024-01-16: Errors spike, only 200k records/hour
Error: "Missing field: humidity"

Diagnosis:

# Check schema registry
# Example: Sensor firmware update added "humidity_v2" field

# Old schema:
{"sensor_id": "string", "temp": "float", "humidity": "float"}

# New schema:
{"sensor_id": "string", "temp": "float", "humidity_v2": "float"}

Solution: Handle schema evolution gracefully:

  • Check for the new field (humidity_v2) first
  • Fall back to the old field (humidity) if the new field is missing
  • Use a default value (None) if both are missing
  • Alternative: use a schema registry (Confluent Schema Registry) for automatic backward/forward compatibility
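
A minimal PySpark sketch of that fallback, assuming the payloads have already been parsed into a DataFrame that may contain either column; names follow the example schemas above.

# Prefer humidity_v2, fall back to humidity, default to NULL if both are absent.
from pyspark.sql.functions import coalesce, col, lit

def with_humidity(df):
    new = col("humidity_v2") if "humidity_v2" in df.columns else lit(None)
    old = col("humidity") if "humidity" in df.columns else lit(None)
    return df.withColumn("humidity", coalesce(new, old).cast("float"))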

1260.3 Production Monitoring Checklist


Before deploying a big data pipeline to production, ensure:

Infrastructure Monitoring:

- [ ] CPU, memory, disk, network metrics collected
- [ ] Alerting configured for resource exhaustion (>80% threshold)
- [ ] Log aggregation enabled (ELK, Splunk, CloudWatch Logs)
- [ ] Cluster autoscaling configured (if using cloud)

Data Quality Monitoring:

- [ ] Schema validation alerts for format changes
- [ ] Data completeness tracking (expected vs actual record counts)
- [ ] Duplicate detection and alerting
- [ ] Quarantine zone monitoring (review regularly)
- [ ] Data lineage documented (know where data comes from and goes to)

Performance Monitoring:

- [ ] End-to-end latency tracked (sensor to storage to query)
- [ ] Throughput metrics (records/sec, GB/day)
- [ ] Kafka consumer lag alerts (<1000 messages)
- [ ] Spark job duration tracking (alert if >2x baseline)
- [ ] Query performance benchmarks (alert if >2x slower)

Operational:

- [ ] Runbooks for common issues (documented resolution steps)
- [ ] On-call rotation and escalation paths defined
- [ ] Automated failure recovery (restart failed jobs)
- [ ] Regular capacity planning reviews (weekly/monthly)
- [ ] Disaster recovery tested (restore from backup)

Cost Monitoring:

- [ ] Cloud costs tracked by service (S3, EC2, EMR)
- [ ] Storage growth monitored (enforce retention policies)
- [ ] Unused resources identified (idle clusters)
- [ ] Cost per million messages tracked

1260.4 Real-World Debugging Example

Tip: Smart City Traffic Analysis

Scenario: Smart city processes 10,000 traffic cameras, each sending 1 image/sec.

Issue Detected:

Alert: Kafka consumer lag = 500,000 messages (increasing)
Storage growth: 200 GB/day (expected: 80 GB/day)
Query latency: 30s (expected: 5s)

Investigation:

Step 1: Check Kafka metrics

# Consumer lag by partition
PARTITION  LAG
0          50,000
1          48,000
...
9          52,000

# Conclusion: All partitions lagging equally (not a skew issue)
# Problem is overall processing capacity

Step 2: Check Spark processing time

Batch interval: 60 seconds
Processing time: 75 seconds <- PROBLEM!
Records per batch: 60,000 images

Step 3: Profile processing logic

%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#ecf0f1'}}}%%
flowchart TB
    subgraph Original["Original Approach (Slow)"]
        O1[Image Input<br/>65ms per image] --> O2[Decode Image<br/>10ms]
        O2 --> O3[Detect Vehicles<br/>50ms<br/>BOTTLENECK]
        O3 --> O4[Extract Metadata<br/>5ms]
        O4 --> O5[Capacity:<br/>15 images/sec<br/>Need: 10,000/sec<br/>99.85% shortfall]
    end

    subgraph Optimized["Smart Sampling (Fast)"]
        N1{Every 10th<br/>Frame?} -->|Yes| N2[Full Processing<br/>65ms]
        N1 -->|No| N3[Motion Detection<br/>2ms]
        N2 --> N4[Cache Result]
        N3 -->|Motion| N5[Use Cached Data]
        N3 -->|No Motion| N6[vehicles = 0]
        N4 & N5 & N6 --> N7[Average: 8.3ms/frame<br/>Capacity: 120/sec/executor<br/>x100 executors = 12,000/sec]
    end

    style O3 fill:#E74C3C,stroke:#2C3E50,color:#fff
    style O5 fill:#E74C3C,stroke:#2C3E50,color:#fff
    style N7 fill:#27AE60,stroke:#2C3E50,color:#fff

Figure 1260.1: Traffic Camera Processing Before and After Smart Sampling Optimization

Root Cause: Vehicle detection on every frame is too expensive!

Solution: Smart Sampling Strategy (see the sketch below):

  • 90% of frames: lightweight motion detection only (2 ms)
  • 10% of frames: full vehicle detection plus caching (65 ms)
  • Average processing: 0.9 x 2 ms + 0.1 x 65 ms = 8.3 ms per frame
  • Capacity: 120 images/sec per executor x 100 executors = 12,000 images/sec
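
A minimal sketch of the sampling policy, assuming frames arrive per camera with a running index; detect_vehicles() and detect_motion() are placeholders for the real image-processing code.

last_full_result = {}   # most recent full detection per camera

def detect_vehicles(frame):
    # Placeholder for the expensive model inference (~65 ms per frame)
    return {"vehicles": 0}

def detect_motion(frame):
    # Placeholder for a cheap frame-difference check (~2 ms per frame)
    return False

def process_frame(camera_id, frame_index, frame):
    if frame_index % 10 == 0:               # keyframe: full detection + cache
        result = detect_vehicles(frame)
        last_full_result[camera_id] = result
        return result
    if detect_motion(frame):                # motion: reuse the cached detection
        return last_full_result.get(camera_id, {"vehicles": 0})
    return {"vehicles": 0}                  # static scene: nothing new to report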

Results After Optimization:

Kafka consumer lag: 0 messages (cleared in 2 hours)
Storage growth: 75 GB/day (compressed processed data)
Query latency: 3 seconds (partitioned by camera and time)
Cost savings: 60% reduction in compute (fewer resources needed)

Key Lessons:

  1. Profile before optimizing: identified vehicle detection as the bottleneck.
  2. Domain-specific optimization: traffic changes slowly; frame-by-frame analysis is unnecessary.
  3. Tiered processing: full processing for keyframes, lightweight checks for the rest.
  4. Compression: store processed metadata (KB), not raw images (MB).

1260.5 Seven Deadly Pitfalls of IoT Big Data

Caution: Pitfall 1: Storing Everything “Just in Case”

The Mistake: Keep all raw IoT data indefinitely because “we might need it later”

Why It’s Bad:

  • Storage costs explode: 1 billion readings/day x 100 bytes x 365 days = 36.5 TB/year
  • At $0.023/GB/month, a full year of raw data costs roughly $840/month ($10,080/year)
  • Most raw data is never queried after 30 days

The Fix: Implement tiered storage + downsampling

| Data Tier | Retention | Storage | Monthly Cost |
|---|---|---|---|
| Hot (raw) | 7 days | Fast SSD | $100 |
| Warm (1-min avg) | 90 days | Slower storage | $20 |
| Cold (hourly avg) | Forever | S3 Glacier | $2 |

Savings: $840/month to $122/month = 85% reduction
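
A minimal PySpark sketch of the warm-tier downsampling step; the S3 paths and column names (sensor_id, event_time, temperature) are illustrative assumptions.

# Downsample raw readings to 1-minute aggregates for the warm tier.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, avg, min as min_, max as max_

spark = SparkSession.builder.appName("downsample-warm-tier").getOrCreate()

raw = spark.read.parquet("s3://iot-lake/raw/")          # hot tier (7-day retention)

warm = (raw
        .groupBy("sensor_id", window("event_time", "1 minute"))
        .agg(avg("temperature").alias("avg_temp"),
             min_("temperature").alias("min_temp"),
             max_("temperature").alias("max_temp")))

warm.write.mode("append").partitionBy("sensor_id").parquet("s3://iot-lake/warm/")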

Caution: Pitfall 2: Ignoring Late-Arriving Data

The Mistake: Close windows immediately without waiting for late arrivals

What Goes Wrong:

  • Sensor loses network for 30 seconds
  • Data arrives 45 seconds late
  • Count is wrong because the late data was excluded
  • Reports show mysterious “dips” that didn’t happen

The Fix: Use watermarks with appropriate tolerance (2 minutes for most IoT scenarios)

Tradeoff: Results delayed by 2 minutes, but accurate
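
A minimal Structured Streaming sketch of the watermark fix; the built-in rate source stands in for a real Kafka stream, and the 2-minute tolerance matches the recommendation above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("late-data-demo").getOrCreate()

# In production this would be a Kafka source; the rate source is a test stand-in.
readings = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
            .withColumnRenamed("timestamp", "event_time")
            .withColumnRenamed("value", "sensor_id"))

counts = (readings
          .withWatermark("event_time", "2 minutes")     # accept data up to 2 min late
          .groupBy(window(col("event_time"), "1 minute"), "sensor_id")
          .count())

query = counts.writeStream.outputMode("update").format("console").start()
query.awaitTermination()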

Caution: Pitfall 3: Not Handling Sensor Failures

The Mistake: Trust all sensor data blindly

What Goes Wrong:

  • Faulty sensor reports temperature = 500C (impossible)
  • Bad data corrupts analytics: “Average temperature = 120C”
  • Triggers false alerts (fire alarm goes off)

The Fix: Implement data quality rules

| Validation | Check | Action |
|---|---|---|
| Range Check | -50C to 100C | Quarantine out-of-range readings |
| Rate of Change | < 10C per second | Flag sudden spikes |
| Sensor Health | Last reading < 10 min ago | Mark offline |
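
A minimal PySpark sketch of these rules, assuming a batch DataFrame with sensor_id, event_time, and temperature columns; the rate-of-change rule is approximated as the jump between consecutive readings.

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, abs as abs_, coalesce, lit

def split_valid_quarantine(readings):
    w = Window.partitionBy("sensor_id").orderBy("event_time")
    checked = (readings
               .withColumn("prev_temp", lag("temperature").over(w))
               .withColumn("in_range", col("temperature").between(-50, 100))
               .withColumn("spike", coalesce(
                   abs_(col("temperature") - col("prev_temp")) > 10, lit(False))))
    valid = checked.filter(col("in_range") & ~col("spike"))
    quarantined = checked.filter(~col("in_range") | col("spike"))  # review, don't drop
    return valid, quarantined
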
Caution: Pitfall 4: No Data Retention Policy

The Mistake: No deletion policy, keep everything forever

What Happens:

  • Month 1: 2 TB of data, fine
  • Month 6: 12 TB, queries slowing
  • Month 12: 24 TB, disk 95% full
  • Month 13: database crashes

The Fix: Implement retention policies from day 1

| Tier | Granularity | Retention | Purpose |
|---|---|---|---|
| Raw | Every reading | 7 days | Real-time debugging |
| 1-minute | AVG/MIN/MAX | 90 days | Recent trends |
| Hourly | AVG/MIN/MAX | Forever | Long-term analysis |

Space Savings: 1 year raw (24 TB) to hourly aggregates (200 GB) = 99% reduction

Caution: Pitfall 5: Over-engineering Small Data

The Mistake: Using Spark, Hadoop, or Kafka for datasets that easily fit in memory on a laptop.

Symptoms:

  • Team spends more time managing infrastructure than analyzing data
  • Simple queries that should take seconds take minutes (cluster overhead)
  • Cloud bills are 10-100x higher than needed

The Fix: Apply the “laptop test” before choosing big data tools:

# Quick sizing check
data_per_day_mb = num_sensors * readings_per_second * 86400 * bytes_per_reading / 1e6
data_per_year_gb = data_per_day_mb * 365 / 1000

print(f"Daily: {data_per_day_mb:.1f} MB, Yearly: {data_per_year_gb:.1f} GB")

# Decision thresholds:
# < 10 GB/day -> Pandas on laptop, PostgreSQL/InfluxDB (single node)
# 10-100 GB/day -> Time-series DB cluster, consider Kafka
# > 100 GB/day -> Now you might need Spark/Hadoop

Example: 1000 sensors at 1 reading/minute at 100 bytes = 144 MB/day = 52 GB/year. Fits on a $50 SSD. No Hadoop needed!

Caution: Pitfall 6: Not Planning for Schema Evolution

The Mistake: Rigidly defined schema that breaks when sensors change

What Goes Wrong:

  • New sensor models have additional fields
  • Old code crashes when encountering the new fields
  • Millions of old records have to be migrated
  • Downtime during the migration

The Fix: Use a schema evolution approach:

  • Store flexible JSON objects, or use Avro/Parquet with optional fields
  • Use a schema registry for automatic backward/forward compatibility
  • Tag data with a schema version

The Rule: “Always design for future sensor types. Your schema will evolve.”
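
One concrete flavor of this approach is sketched below: a batch is written with a schema_version tag, and old and new Parquet files are read back with schema merging so missing optional fields simply come back as NULL. The path and column names are illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("schema-evolution-demo").getOrCreate()

# Writers tag each batch with its schema version.
new_batch = spark.createDataFrame(
    [("s1", 21.5, 48.2)], ["sensor_id", "temp", "humidity_v2"]
).withColumn("schema_version", lit(2))
new_batch.write.mode("append").parquet("/tmp/readings")

# Readers merge per-file schemas; fields absent from older files are NULL.
all_readings = spark.read.option("mergeSchema", "true").parquet("/tmp/readings")
all_readings.printSchema()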

Caution: Pitfall 7: Processing All Data in Cloud (Ignoring Edge)

The Mistake: Sending all raw sensor data to the cloud

The Cost (Cloud-Only):

  • Bandwidth: 5 MB/second x 86,400 seconds = 432 GB/day
  • Data transfer at $0.09/GB = $38.88/day, roughly $1,166/month

The Fix: Edge processing

| Approach | Data Sent | Monthly Cost | Savings |
|---|---|---|---|
| Cloud-only | 432 GB/day | $1,166 | Baseline |
| Edge processing | 7.2 GB/day (alerts only) | $19.50 | 98% |

The Rule: “Process data as close to the source as possible. Only send insights to cloud, not raw data.”
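
A minimal sketch of what edge-side aggregation can look like on the device; read_sensor() and publish() are placeholders for the actual sensor driver and MQTT/HTTPS client, and the 80C alert threshold is an assumption.

import statistics
import time

def read_sensor():
    return 22.0                 # placeholder: read from hardware

def publish(topic, payload):
    print(topic, payload)       # placeholder: send via MQTT/HTTPS

window = []
while True:
    window.append(read_sensor())
    if len(window) >= 60:                        # one minute of 1 Hz readings
        summary = {"avg": statistics.mean(window),
                   "max": max(window), "min": min(window)}
        if summary["max"] > 80:                  # assumed alert threshold
            publish("alerts/temperature", summary)
        publish("summaries/temperature", summary)  # 1 message/min instead of 60
        window.clear()
    time.sleep(1)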

1260.5.1 Summary of Common Mistakes

| Mistake | Impact | Fix | Savings/Improvement |
|---|---|---|---|
| Storing everything | $10K+/year storage costs | Tiered storage + downsampling | 85% cost reduction |
| Ignoring late data | Incorrect analytics | Watermarks + allowed lateness | Accurate results |
| No sensor validation | Bad data corrupts insights | Validation rules + quarantine | Prevents false alerts |
| Wrong database type | 45-second queries | Time-series database (InfluxDB) | 22x faster |
| No retention policy | Disk runs out | Automatic retention + aggregates | 99% space savings |
| Cloud-only processing | $1,166/month bandwidth | Edge processing + cloud for insights | 98% cost reduction |
| Rigid schemas | Code breaks with changes | Schema evolution (Avro/Parquet) | Zero downtime updates |

1260.6 Summary

  • Key monitoring metrics include message latency (<5 seconds), Kafka consumer lag (<10,000 messages), error rate (<1%), and CPU utilization (<80% sustained).
  • Common debugging patterns: Kafka lag means scale consumers or optimize processing; OOM errors indicate data skew or insufficient memory; latency spikes suggest batch processing time exceeds interval.
  • Production checklist covers infrastructure monitoring, data quality tracking, performance metrics, operational procedures, and cost management.
  • Seven deadly pitfalls include storing everything, ignoring late data, not handling sensor failures, no retention policy, over-engineering small data, rigid schemas, and cloud-only processing.

1260.7 What’s Next

Now that you understand operational best practices, continue to: