1304 Common Pitfalls and Worked Examples

1304.1 Common Pitfalls

⏱️ ~15 min | ⭐⭐⭐ Advanced | 📋 P10.C14.U06

Warning: Common Pitfall: Non-Idempotent Message Processing

The mistake: Without idempotency, retried messages cause duplicate processing. This leads to double counting, double billing, or repeated actions.

Symptoms:

  • Duplicate actions (double billing, double counting)
  • Data inconsistency
  • Retry storms cause cascading effects
  • Incorrect totals and aggregations

Wrong approach:

# Non-idempotent: Each message increments a shared counter
counter = 0

def process(msg):
    global counter
    counter += msg.value  # Duplicate messages double-count!

# Non-idempotent command
def process_command(cmd):
    if cmd.action == "dispense":
        dispense_item()  # Retry = dispense twice!

Correct approach:

# Idempotent: Use message ID for deduplication
processed_ids = set()

def process(msg):
    global counter
    if msg.id in processed_ids:
        return  # Already processed
    processed_ids.add(msg.id)
    counter += msg.value

# Idempotent command with request ID
completed = {}

def process_command(cmd):
    if cmd.request_id in completed:
        return completed[cmd.request_id]  # Return the same result for a retried request
    result = dispense_item()
    completed[cmd.request_id] = result
    return result

How to avoid:

  • Include unique message IDs
  • Track processed message IDs
  • Design idempotent operations
  • Use database constraints to prevent duplicates (see the sketch after this list)
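
The last point deserves a concrete sketch: a unique constraint lets the database reject duplicates even when in-memory tracking is lost. A minimal example using Python's built-in sqlite3 module (table and column names are illustrative, not from the original):

import sqlite3

conn = sqlite3.connect(":memory:")   # Illustrative; any database with unique constraints works
conn.execute("""
    CREATE TABLE processed_messages (
        message_id TEXT PRIMARY KEY,   -- Duplicates are rejected by the database itself
        amount     INTEGER NOT NULL
    )
""")

def process_once(msg_id, amount):
    """Apply a message's effects only if its ID has never been recorded."""
    try:
        conn.execute(
            "INSERT INTO processed_messages (message_id, amount) VALUES (?, ?)",
            (msg_id, amount),
        )
    except sqlite3.IntegrityError:
        return  # Duplicate delivery: the primary-key constraint rejected it
    conn.commit()
    # ...apply side effects (update counters, dispense item, etc.) here...

process_once("msg-001", 500)
process_once("msg-001", 500)   # Retried delivery of the same message is a no-op
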
Warning: Common Pitfall: Missing Backpressure Handling

The mistake: Without backpressure, slow consumers are overwhelmed by fast producers. This causes memory exhaustion, dropped messages, and system crashes during traffic spikes.

Symptoms:

  • Memory exhaustion
  • System crashes under load
  • Lost data during traffic spikes
  • Increasing latency

Wrong approach:

# Unbounded queue - memory grows indefinitely
queue = []
def producer():
    while True:
        queue.append(generate_data())

def consumer():
    while queue:
        process(queue.pop(0))  # Slower than producer
# Eventually: MemoryError (or the process is OOM-killed)

Correct approach:

# Bounded queue with backpressure
from queue import Queue

queue = Queue(maxsize=1000)

def producer():
    while True:
        queue.put(generate_data(), block=True)  # Blocks when full

# Or drop/sample under pressure
from random import random

def producer_with_sampling():
    while True:
        data = generate_data()
        if queue.full() and random() >= 0.1:
            continue  # Queue is full: drop ~90% of items
        queue.put(data)  # Keep ~10% under pressure; blocks briefly if still full

How to avoid:

  • Use bounded buffers and queues
  • Implement blocking producers
  • Add sampling under load
  • Monitor queue depths
  • Scale consumers dynamically (a sketch combining these last two points follows this list)
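
A minimal sketch of queue-depth monitoring plus naive consumer scaling, using only the standard library. The process() function is assumed to exist as in the examples above, and the thresholds are illustrative:

import threading
import time
from queue import Queue

queue = Queue(maxsize=1000)   # Same bounded queue as above

def consumer():
    while True:
        item = queue.get()
        process(item)          # process() is assumed, as in the examples above
        queue.task_done()

def monitor_and_scale(max_consumers=8, high_watermark=0.8):
    consumers = [threading.Thread(target=consumer, daemon=True)]
    consumers[0].start()
    while True:
        depth = queue.qsize()
        print(f"queue depth: {depth}/{queue.maxsize}, consumers: {len(consumers)}")
        # Scale out (never back down, in this sketch) while depth stays high
        if depth > high_watermark * queue.maxsize and len(consumers) < max_consumers:
            extra = threading.Thread(target=consumer, daemon=True)
            extra.start()
            consumers.append(extra)
        time.sleep(5)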

1304.2 Worked Example: Real-Time Fraud Detection for Payment Terminals

Scenario: A payment processor operates 50,000 IoT payment terminals (card readers, POS systems) generating 2,000 transactions per second. The fraud team needs to detect suspicious patterns in real time:

  • Velocity attacks: The same card used at multiple distant locations within minutes
  • Amount anomalies: Unusual transaction amounts for a given merchant
  • Burst attacks: Many small transactions in rapid succession

Goal: Design a stream processing pipeline that detects fraud within 500ms of transaction arrival while ensuring exactly-once processing to prevent false positives and missed detections.

What we do: Define the transaction event schema and set up Kafka ingestion.

Event Schema:

# Transaction event from payment terminal
transaction_schema = {
    "transaction_id": "uuid",       # Unique identifier
    "card_hash": "string",          # Anonymized card identifier
    "terminal_id": "string",        # Payment terminal ID
    "merchant_id": "string",        # Merchant identifier
    "amount_cents": "int",          # Transaction amount in cents
    "currency": "string",           # ISO currency code
    "location": {
        "lat": "double",
        "lon": "double"
    },
    "timestamp": "timestamp",       # Event time (when card was swiped)
    "terminal_time": "timestamp"    # Processing time (when terminal sent)
}

# Example event
{
    "transaction_id": "txn-a1b2c3d4",
    "card_hash": "hash_xyz789",
    "terminal_id": "term_12345",
    "merchant_id": "merch_grocery_001",
    "amount_cents": 4523,
    "currency": "USD",
    "location": {"lat": 40.7128, "lon": -74.0060},
    "timestamp": "2026-01-10T14:23:45.123Z",
    "terminal_time": "2026-01-10T14:23:45.456Z"
}

Kafka Topic Configuration:

# Kafka topic for raw transactions
topic_config = {
    "name": "transactions-raw",
    "partitions": 32,              # Parallelism for 2K TPS
    "replication_factor": 3,       # Durability
    "retention_ms": 7 * 24 * 3600 * 1000,  # 7 days
    "cleanup_policy": "delete"
}

# Partition by card_hash for ordered per-card processing
# This ensures all transactions for a card go to same partition
producer.send(
    "transactions-raw",
    key=transaction["card_hash"],
    value=transaction
)

Why: Partitioning by card_hash ensures all transactions for a single card arrive at the same Flink task instance in order. This is critical for velocity detection (same card at multiple locations) without expensive cross-partition coordination.
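
The producer.send call above assumes a producer that serializes keys and values. A minimal sketch of that setup, assuming the kafka-python client (the broker address is a placeholder):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",                  # Placeholder broker address
    key_serializer=lambda k: k.encode("utf-8"),      # card_hash string -> bytes
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # transaction dict -> JSON bytes
    acks="all",                                      # Wait for all in-sync replicas
)

Kafka's default partitioner hashes the key bytes, so every transaction carrying the same card_hash is routed to the same partition, which is exactly what the ordering argument above relies on.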

What we do: Implement sliding windows to detect velocity and burst attacks.

Flink Pipeline for Velocity Detection:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import SlidingEventTimeWindows
from pyflink.common.time import Time

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(32)

# Source: Kafka transactions (kafka_source: a Kafka consumer/source configured elsewhere)
transactions = env.add_source(kafka_source)

# Velocity detection: Same card, different locations within 5 minutes
velocity_alerts = (
    transactions
    .key_by(lambda t: t["card_hash"])
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),    # Window size: 5 minutes
        Time.seconds(30)    # Slide: evaluate every 30 seconds
    ))
    .apply(VelocityDetector())
)

class VelocityDetector:
    def apply(self, card_hash, window, transactions):
        """Detect impossible travel (>100km in <5 min)."""
        locations = [(t["location"], t["timestamp"]) for t in transactions]

        for i, (loc1, time1) in enumerate(locations):
            for loc2, time2 in locations[i+1:]:
                distance_km = haversine(loc1, loc2)
                time_diff_min = abs((time2 - time1).total_seconds()) / 60

                if time_diff_min > 0:
                    speed_kmh = distance_km / (time_diff_min / 60)

                    # Flag if speed > 500 km/h (impossible by car)
                    if speed_kmh > 500:
                        yield FraudAlert(
                            card_hash=card_hash,
                            alert_type="VELOCITY_ATTACK",
                            confidence=min(0.99, speed_kmh / 1000),
                            transactions=[t["transaction_id"] for t in transactions],
                            details=f"Speed: {speed_kmh:.0f} km/h over {distance_km:.1f} km"
                        )
                        return  # One alert per window
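
The detector classes in this pipeline are simplified stand-ins for Flink window functions, and they rely on a haversine helper and a FraudAlert container that the snippets do not define. A minimal sketch of both, assuming locations are lat/lon dicts and timestamps are datetime objects:

import math
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

def haversine(loc1, loc2):
    """Great-circle distance in kilometers between two {"lat", "lon"} points."""
    lat1, lon1 = math.radians(loc1["lat"]), math.radians(loc1["lon"])
    lat2, lon2 = math.radians(loc2["lat"]), math.radians(loc2["lon"])
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))  # Earth radius ~6371 km

@dataclass
class FraudAlert:
    card_hash: str
    alert_type: str
    confidence: float
    transactions: list
    details: str
    window_end: Optional[datetime] = None   # Set by the windowing layer; used in dedup keys
    # The alert sink later also expects id, created_at, and to_json(); omitted here for brevity.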

Burst Attack Detection:

# Burst detection: >10 transactions in 2 minutes from same card
burst_alerts = (
    transactions
    .key_by(lambda t: t["card_hash"])
    .window(SlidingEventTimeWindows.of(
        Time.minutes(2),    # Window size: 2 minutes
        Time.seconds(15)    # Slide: check every 15 seconds
    ))
    .apply(BurstDetector())
)

class BurstDetector:
    def apply(self, card_hash, window, transactions):
        """Detect card testing attacks (many small transactions)."""
        txn_list = list(transactions)
        count = len(txn_list)
        avg_amount = sum(t["amount_cents"] for t in txn_list) / count

        # Flag: >10 transactions OR >5 transactions with avg <$5
        if count > 10 or (count > 5 and avg_amount < 500):
            yield FraudAlert(
                card_hash=card_hash,
                alert_type="BURST_ATTACK",
                confidence=min(0.95, count / 20),
                transactions=[t["transaction_id"] for t in txn_list],
                details=f"{count} txns, avg ${avg_amount/100:.2f}"
            )

Why: Sliding windows allow continuous evaluation without waiting for window boundaries. A 5-minute window sliding every 30 seconds catches fraud quickly while reducing computational overhead compared to per-event evaluation.
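
A quick back-of-the-envelope check of that trade-off: with a 300-second window sliding every 30 seconds, each transaction is assigned to a fixed number of overlapping windows, so the per-event cost stays bounded.

window_size_s = 5 * 60          # 300-second window
slide_s = 30                    # evaluated every 30 seconds

windows_per_event = window_size_s // slide_s     # 10 overlapping windows per transaction
assignments_per_second = 2000 * windows_per_event
print(windows_per_event, assignments_per_second)  # 10 20000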

What we do: Configure checkpointing and idempotent sinks to prevent duplicates.

Checkpointing Configuration:

from pyflink.datastream import CheckpointingMode
# The classes below also ship with PyFlink; exact import paths vary by version
from pyflink.datastream import ExternalizedCheckpointCleanup
from pyflink.datastream.state_backend import RocksDBStateBackend

# Enable exactly-once checkpointing
env.enable_checkpointing(
    interval=10000,  # Checkpoint every 10 seconds
    mode=CheckpointingMode.EXACTLY_ONCE
)

# Checkpoint configuration
env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
env.get_checkpoint_config().set_checkpoint_timeout(60000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
env.get_checkpoint_config().enable_externalized_checkpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
)

# State backend for large state (card history)
env.set_state_backend(RocksDBStateBackend("s3://flink-state/checkpoints"))

Idempotent Alert Sink:

class IdempotentAlertSink:
    """Ensures each alert is processed exactly once."""

    def __init__(self, redis_client, alert_topic, db_cursor):
        self.redis = redis_client
        self.cursor = db_cursor  # PostgreSQL cursor used for the audit inserts below
        self.kafka_producer = KafkaProducer(
            bootstrap_servers="kafka:9092",
            enable_idempotence=True,  # Kafka's enable.idempotence producer setting
            acks="all"
        )
        self.alert_topic = alert_topic

    def invoke(self, alert: FraudAlert):
        # Deduplication key: alert_type + card + window_end
        dedup_key = f"alert:{alert.alert_type}:{alert.card_hash}:{alert.window_end}"

        # Check if already processed (Redis SET NX with TTL)
        if self.redis.set(dedup_key, "1", nx=True, ex=3600):
            # First time seeing this alert - process it
            self.kafka_producer.send(
                self.alert_topic,
                key=alert.card_hash,
                value=alert.to_json()
            )
            # Also write to PostgreSQL for audit
            self.write_to_postgres(alert)
        # else: Already processed, skip (idempotent)

    def write_to_postgres(self, alert):
        """Idempotent insert using ON CONFLICT."""
        self.cursor.execute("""
            INSERT INTO fraud_alerts
                (alert_id, card_hash, alert_type, confidence, created_at)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (alert_id) DO NOTHING
        """, (alert.id, alert.card_hash, alert.alert_type,
              alert.confidence, alert.created_at))

Why: Exactly-once requires coordination between Flink checkpoints, Kafka transactions, and downstream sinks. The Redis deduplication key and PostgreSQL ON CONFLICT ensure that even if a failure causes reprocessing, each alert is delivered exactly once.
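
The Redis half of that coordination can be exercised on its own. A minimal sketch of the SET NX deduplication pattern, assuming a local Redis reachable through the redis-py client (connection details are placeholders):

import redis

r = redis.Redis(host="localhost", port=6379)

def deliver_once(dedup_key, deliver):
    """Invoke deliver() only the first time dedup_key is seen within the TTL."""
    if r.set(dedup_key, "1", nx=True, ex=3600):   # SET NX: succeeds only if the key is new
        deliver()
        return True
    return False   # Duplicate (e.g. replay after a Flink restart): skip

key = "alert:VELOCITY_ATTACK:hash_xyz789:2026-01-10T14:25:00Z"
deliver_once(key, lambda: print("alert published"))   # First delivery runs
deliver_once(key, lambda: print("alert published"))   # Replay is a no-op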

What we do: Configure watermarks and allowed lateness for out-of-order events.

Watermark Strategy:

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner

# Watermark: event_time - 30 seconds (allow 30s network delay)
class TransactionTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, transaction, record_timestamp):
        # Event time in epoch milliseconds, taken from the card-swipe timestamp
        return int(transaction["timestamp"].timestamp() * 1000)

watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(30))
    .with_timestamp_assigner(TransactionTimestampAssigner())
)

transactions_with_watermarks = transactions.assign_timestamps_and_watermarks(
    watermark_strategy
)

# Window configuration with allowed lateness
# (late_data_tag is an OutputTag, defined elsewhere, for routing very late records)
velocity_alerts = (
    transactions_with_watermarks
    .key_by(lambda t: t["card_hash"])
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(30)))
    .allowed_lateness(Time.minutes(2))  # Accept late data up to 2 min
    .side_output_late_data(late_data_tag)  # Capture very late data
    .apply(VelocityDetector())
)

# Handle late data separately (log for investigation)
late_transactions = velocity_alerts.get_side_output(late_data_tag)
late_transactions.add_sink(late_data_sink)  # Write to audit log

Late Data Handling Policy:

# Terminal clock drift detection
from datetime import datetime, timezone

class ClockDriftMonitor:
    def process(self, transaction):
        event_time = transaction["timestamp"]
        processing_time = datetime.now(timezone.utc)
        drift_seconds = (processing_time - event_time).total_seconds()

        # Flag terminals with >60s clock drift
        if abs(drift_seconds) > 60:
            yield TerminalAlert(
                terminal_id=transaction["terminal_id"],
                alert_type="CLOCK_DRIFT",
                drift_seconds=drift_seconds
            )

Why: Payment terminals may have network delays or clock drift. A 30-second watermark delay handles normal network latency, while 2-minute allowed lateness captures most edge cases. Very late data goes to side output for manual review rather than being silently dropped.
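
The routing rule implied by those two settings can be written out directly. A simplified, conceptual sketch (not Flink's actual implementation) of how a record belonging to a given window is classified under the 30-second watermark bound and 2-minute allowed lateness:

from datetime import datetime, timedelta, timezone

WATERMARK_DELAY = timedelta(seconds=30)
ALLOWED_LATENESS = timedelta(minutes=2)

# The watermark trails the largest event time seen so far by 30 seconds
max_event_time_seen = datetime(2026, 1, 10, 14, 25, 0, tzinfo=timezone.utc)
watermark = max_event_time_seen - WATERMARK_DELAY

def classify(window_end):
    """Classify a record arriving for a window that ends at window_end."""
    if watermark < window_end:
        return "on time: buffered until the watermark passes the window end"
    if watermark < window_end + ALLOWED_LATENESS:
        return "late but accepted: the window fires again with the late record"
    return "too late: routed to the side output for audit"

print(classify(datetime(2026, 1, 10, 14, 24, 30, tzinfo=timezone.utc)))
# -> late but accepted (the watermark has reached the window end, but is within 2 minutes)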

What we do: Add metrics and alerting for pipeline reliability.

Key Metrics:

# Flink metrics exposed to Prometheus
pipeline_metrics = {
    # Throughput metrics
    "transactions_processed_total": Counter,
    "transactions_processed_rate": Gauge,  # TPS

    # Latency metrics
    "processing_latency_ms": Histogram,    # End-to-end latency
    "event_time_lag_ms": Gauge,            # How far behind real-time

    # Fraud detection metrics
    "fraud_alerts_total": Counter,
    "fraud_alerts_by_type": Counter,       # Labeled by alert_type

    # Health metrics
    "checkpoint_duration_ms": Histogram,
    "checkpoint_failures_total": Counter,
    "late_events_total": Counter,
    "backpressure_ratio": Gauge
}

# Alert rules (Prometheus/Alertmanager)
alert_rules = """
- alert: HighProcessingLatency
  expr: histogram_quantile(0.99, rate(processing_latency_ms_bucket[5m])) > 500
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: "Fraud detection latency exceeds 500ms SLA"

- alert: CheckpointFailures
  expr: increase(checkpoint_failures_total[5m]) > 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Flink checkpoint failures detected"

- alert: HighBackpressure
  expr: backpressure_ratio > 0.5
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Pipeline backpressure indicates capacity issues"
"""

Why: Real-time fraud detection is useless if the pipeline is slow or losing data. These metrics ensure the team knows immediately if processing latency exceeds the 500ms SLA or if checkpoints fail (risking exactly-once guarantees).
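
A minimal sketch of registering a few of these metrics with the prometheus_client library in a standalone Python process. In a real Flink job you would instead register them through Flink's metric groups and the Prometheus reporter; the names and buckets below simply follow the table above, and the port is a placeholder:

from prometheus_client import Counter, Gauge, Histogram, start_http_server

transactions_processed_total = Counter(
    "transactions_processed_total", "Transactions processed by the pipeline")
fraud_alerts_by_type = Counter(
    "fraud_alerts_by_type", "Fraud alerts emitted", ["alert_type"])
processing_latency_ms = Histogram(
    "processing_latency_ms", "End-to-end processing latency in milliseconds",
    buckets=(50, 100, 200, 500, 1000))
event_time_lag_ms = Gauge(
    "event_time_lag_ms", "Gap between processing time and event time in milliseconds")

start_http_server(9100)   # Expose /metrics for Prometheus to scrape

# Example instrumentation inside the processing path
transactions_processed_total.inc()
processing_latency_ms.observe(85)
fraud_alerts_by_type.labels(alert_type="VELOCITY_ATTACK").inc()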

Outcome: A production fraud detection pipeline processing 2,000 TPS with <200ms average latency and exactly-once delivery guarantees.

Pipeline Performance:

| Metric | Target | Achieved |
|--------|--------|----------|
| Throughput | 2,000 TPS | 2,500 TPS (headroom) |
| P50 Latency | <200ms | 85ms |
| P99 Latency | <500ms | 340ms |
| False Positive Rate | <1% | 0.3% |
| Missed Fraud Rate | <0.1% | 0.05% |
| Exactly-Once Delivery | 100% | 100% (verified) |

Architecture Summary:

%% fig-alt: "Fraud detection architecture showing data flow from 50,000 terminals through Kafka with 32 partitions to Flink with 32 parallel tasks. Flink runs three detectors in parallel: Velocity Detector using 5-minute sliding windows, Burst Detector using 2-minute sliding windows, and Amount Anomaly Detector maintaining per-merchant state. All detectors feed into an Alert Router which outputs to three destinations: Kafka for alerts, PostgreSQL for audit logging, and Redis for deduplication. The final output goes to a Risk Dashboard and Auto-Block API."
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart LR
    T["Terminals<br/>(50K)"] --> K["Kafka<br/>(32 partitions)"]
    K --> F["Flink<br/>(32 parallel tasks)"]

    F --> V["Velocity Detector<br/>(5-min sliding window)"]
    F --> B["Burst Detector<br/>(2-min sliding window)"]
    F --> A["Amount Anomaly<br/>(per-merchant state)"]

    V --> AR["Alert Router"]
    B --> AR
    A --> AR

    AR --> KA["Kafka (alerts)"]
    AR --> PG["PostgreSQL (audit)"]
    AR --> RD["Redis (dedup)"]

    AR --> RDA["Risk Dashboard +<br/>Auto-Block API"]

Key Decisions Made:

  1. Kafka partitioning by card_hash: Ensures per-card ordering without coordination
  2. Sliding windows: Balance detection speed vs computational cost
  3. 30-second watermarks: Handle network delays while keeping latency low
  4. Redis deduplication: Ensures exactly-once even across Flink restarts
  5. RocksDB state backend: Handles large per-card state (transaction history)
  6. Side output for late data: Never silently drop data, always audit

1304.3 What’s Next

Continue to the Hands-On Lab: Basic Stream Processing, a 45-minute Wokwi lab that implements these streaming concepts on an ESP32.