```python
# Idempotent event processing: use the message ID for deduplication
processed_ids = set()
counter = 0

def process(msg):
    global counter
    if msg.id in processed_ids:
        return  # Already processed
    processed_ids.add(msg.id)
    counter += msg.value

# Idempotent command handling with a request ID
completed = {}

def process_command(cmd):
    if cmd.request_id in completed:
        return completed[cmd.request_id]  # Return the same result as before
    result = dispense_item()
    completed[cmd.request_id] = result
    return result
```
How to avoid:
- Include unique message IDs in every event
- Track processed message IDs
- Design idempotent operations
- Use database constraints to prevent duplicates (a minimal sketch follows below)
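Database constraints deserve their own illustration: a unique key lets the database reject duplicates even across process restarts, when an in-memory set would be lost. The sketch below is a minimal example assuming PostgreSQL via psycopg2 and a hypothetical `processed_messages` table with a unique `message_id` column; none of these names come from the examples above.

```python
import psycopg2

conn = psycopg2.connect("dbname=events")  # illustrative connection string

def process_once(msg):
    """Process msg at most once, relying on a UNIQUE(message_id) constraint."""
    with conn, conn.cursor() as cur:
        # ON CONFLICT DO NOTHING: a duplicate inserts zero rows instead of failing
        cur.execute(
            """
            INSERT INTO processed_messages (message_id)
            VALUES (%s)
            ON CONFLICT (message_id) DO NOTHING
            """,
            (msg.id,),
        )
        if cur.rowcount == 0:
            return  # Duplicate delivery: skip the side effect
        apply_side_effects(msg)  # hypothetical business logic
```

Because the dedup marker and the side effect share one transaction, a failure rolls both back and the message can safely be retried.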
Knowledge check: A vending machine IoT system processes "dispense item" commands. Due to network instability, a command to dispense a $2 snack is sent twice with the same request_id. The system uses at-least-once delivery without idempotency checks. What happens, and how should it be fixed?

- Two items are dispensed; fix by implementing idempotent operations using request_id. (Correct: without idempotency checks, each message triggers a dispense action, so the customer gets two items for the price of one. The fix is to track completed request_ids and skip duplicates: check whether the request_id is in the completed set before dispensing, then add it after success.)
- One item is dispensed; Kafka automatically deduplicates commands. (Incorrect: Kafka's at-least-once delivery explicitly allows duplicates; it guarantees no message loss, not no duplicates. Without application-level deduplication, both commands execute.)
- Zero items are dispensed; the system detects a conflict and rejects both. (Incorrect: standard at-least-once processing does not detect or reject duplicates automatically; both messages appear as independent, valid commands and both execute.)
- The second command queues until the first completes; no duplicate action. (Incorrect: queueing only orders execution; both commands still run in sequence, dispensing two items.)
The mistake: Without backpressure, slow consumers are overwhelmed by fast producers. This causes memory exhaustion, dropped messages, and system crashes during traffic spikes.
```python
# Bounded queue with backpressure
from queue import Queue, Full
from random import random

queue = Queue(maxsize=1000)

def producer():
    while True:
        queue.put(generate_data(), block=True)  # Blocks when the queue is full

# Or drop/sample under pressure
def producer_with_sampling(data):
    if queue.full():
        if random() < 0.1:  # Keep roughly 10% of items under pressure
            try:
                queue.put_nowait(data)
            except Full:
                pass  # Still full: drop this item
    else:
        queue.put(data)
```
How to avoid:
- Use bounded buffers and queues
- Implement blocking producers
- Add sampling under load
- Monitor queue depths
- Scale consumers dynamically (a combined monitoring/scaling sketch follows below)
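The last two items can be combined: a lightweight monitor samples the queue depth and adds consumer threads while the backlog stays high. The sketch below is illustrative only; the thresholds, the `handle` function, and the `report_metric` hook are assumptions, not part of the original example.

```python
import threading
import time
from queue import Queue

queue = Queue(maxsize=1000)
HIGH_WATER = 800    # assumed threshold: scale up above 80% full
MAX_CONSUMERS = 8   # assumed upper bound on consumer threads

def consume():
    """Hypothetical worker: drain items until the process exits."""
    while True:
        item = queue.get()
        handle(item)        # hypothetical processing function
        queue.task_done()

def monitor_and_scale():
    consumers = []
    while True:
        depth = queue.qsize()
        report_metric("queue_depth", depth)  # hypothetical metrics hook
        if depth > HIGH_WATER and len(consumers) < MAX_CONSUMERS:
            t = threading.Thread(target=consume, daemon=True)
            t.start()
            consumers.append(t)
        time.sleep(5)
```

In production the same idea usually maps to autoscaling consumer instances based on queue-depth metrics rather than spawning in-process threads.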
1304.3 Worked Example: Stream Processing for Payment Terminal Fraud Detection
Scenario: A payment processor operates 50,000 IoT payment terminals (card readers, POS systems) generating 2,000 transactions per second. The fraud team needs to detect suspicious patterns in real time:
- Velocity attacks: the same card used at multiple distant locations within minutes
- Amount anomalies: unusual transaction amounts for a given merchant
- Burst attacks: many small transactions in rapid succession
Goal: Design a stream processing pipeline that detects fraud within 500ms of transaction arrival while ensuring exactly-once processing to prevent false positives and missed detections.
What we do: Define the transaction event schema and set up Kafka ingestion.
Event Schema:
```python
# Transaction event from a payment terminal
transaction_schema = {
    "transaction_id": "uuid",      # Unique identifier
    "card_hash": "string",         # Anonymized card identifier
    "terminal_id": "string",       # Payment terminal ID
    "merchant_id": "string",       # Merchant identifier
    "amount_cents": "int",         # Transaction amount in cents
    "currency": "string",          # ISO currency code
    "location": {
        "lat": "double",
        "lon": "double"
    },
    "timestamp": "timestamp",      # Event time (when the card was swiped)
    "terminal_time": "timestamp"   # Processing time (when the terminal sent it)
}

# Example event
{
    "transaction_id": "txn-a1b2c3d4",
    "card_hash": "hash_xyz789",
    "terminal_id": "term_12345",
    "merchant_id": "merch_grocery_001",
    "amount_cents": 4523,
    "currency": "USD",
    "location": {"lat": 40.7128, "lon": -74.0060},
    "timestamp": "2026-01-10T14:23:45.123Z",
    "terminal_time": "2026-01-10T14:23:45.456Z"
}
```
Kafka Topic Configuration:
```python
# Kafka topic for raw transactions
topic_config = {
    "name": "transactions-raw",
    "partitions": 32,                       # Parallelism for 2K TPS
    "replication_factor": 3,                # Durability
    "retention_ms": 7 * 24 * 3600 * 1000,   # 7 days
    "cleanup_policy": "delete"
}

# Partition by card_hash for ordered per-card processing.
# This ensures all transactions for a card go to the same partition.
producer.send("transactions-raw",
              key=transaction["card_hash"],
              value=transaction)
```
Why: Partitioning by card_hash ensures all transactions for a single card arrive at the same Flink task instance in order. This is critical for velocity detection (same card at multiple locations) without expensive cross-partition coordination.
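To make the ordering argument concrete, the sketch below shows the idea behind key-based partition assignment: the same key always hashes to the same partition, so per-card order is preserved within that partition. This is a simplified stand-in; Kafka's default partitioner actually applies murmur2 hashing to the serialized key, not CRC32.

```python
import zlib

NUM_PARTITIONS = 32

def partition_for(card_hash: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Simplified illustration of key-based partitioning (not Kafka's murmur2)."""
    # Deterministic: the same card_hash always maps to the same partition,
    # so all of that card's transactions stay in order on one partition.
    return zlib.crc32(card_hash.encode("utf-8")) % num_partitions

# Example: every transaction for this card lands on the same partition
assert partition_for("hash_xyz789") == partition_for("hash_xyz789")
```

The `producer.send(..., key=transaction["card_hash"])` call above relies on exactly this behavior: identical keys are routed to one partition, which is what preserves per-card order.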
What we do: Implement sliding windows to detect velocity and burst attacks.
Flink Pipeline for Velocity Detection:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import SlidingEventTimeWindows
from pyflink.common.time import Time

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(32)

# Source: Kafka transactions
transactions = env.add_source(kafka_source)

# Velocity detection: same card, different locations within 5 minutes
velocity_alerts = (
    transactions
    .key_by(lambda t: t["card_hash"])
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),    # Window size: 5 minutes
        Time.seconds(30)    # Slide: evaluate every 30 seconds
    ))
    .apply(VelocityDetector())
)

class VelocityDetector:
    def apply(self, card_hash, window, transactions):
        """Detect impossible travel (>100 km in <5 min)."""
        locations = [(t["location"], t["timestamp"]) for t in transactions]
        for i, (loc1, time1) in enumerate(locations):
            for loc2, time2 in locations[i+1:]:
                distance_km = haversine(loc1, loc2)
                time_diff_min = abs((time2 - time1).total_seconds()) / 60
                if time_diff_min > 0:
                    speed_kmh = distance_km / (time_diff_min / 60)
                    # Flag if speed > 500 km/h (impossible by car)
                    if speed_kmh > 500:
                        yield FraudAlert(
                            card_hash=card_hash,
                            alert_type="VELOCITY_ATTACK",
                            confidence=min(0.99, speed_kmh / 1000),
                            transactions=[t["transaction_id"] for t in transactions],
                            details=f"Speed: {speed_kmh:.0f} km/h over {distance_km:.1f} km"
                        )
                        return  # One alert per window
```
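The detector above calls a `haversine` helper that the example does not define. One possible minimal implementation is sketched below, assuming each location is a dict with `lat` and `lon` keys as in the event schema.

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_KM = 6371.0

def haversine(loc1: dict, loc2: dict) -> float:
    """Great-circle distance in km between two {'lat': ..., 'lon': ...} points."""
    lat1, lon1 = radians(loc1["lat"]), radians(loc1["lon"])
    lat2, lon2 = radians(loc2["lat"]), radians(loc2["lon"])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))
```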
Burst Attack Detection:
```python
# Burst detection: >10 transactions in 2 minutes from the same card
burst_alerts = (
    transactions
    .key_by(lambda t: t["card_hash"])
    .window(SlidingEventTimeWindows.of(
        Time.minutes(2),    # Window size: 2 minutes
        Time.seconds(15)    # Slide: check every 15 seconds
    ))
    .apply(BurstDetector())
)

class BurstDetector:
    def apply(self, card_hash, window, transactions):
        """Detect card-testing attacks (many small transactions)."""
        txn_list = list(transactions)
        count = len(txn_list)
        avg_amount = sum(t["amount_cents"] for t in txn_list) / count

        # Flag: >10 transactions OR >5 transactions with avg < $5
        if count > 10 or (count > 5 and avg_amount < 500):
            yield FraudAlert(
                card_hash=card_hash,
                alert_type="BURST_ATTACK",
                confidence=min(0.95, count / 20),
                transactions=[t["transaction_id"] for t in txn_list],
                details=f"{count} txns, avg ${avg_amount/100:.2f}"
            )
```
Why: Sliding windows allow continuous evaluation without waiting for window boundaries. A 5-minute window sliding every 30 seconds catches fraud quickly while reducing computational overhead compared to per-event evaluation.
What we do: Configure checkpointing and idempotent sinks to prevent duplicates.
Checkpointing Configuration:
```python
from pyflink.datastream import CheckpointingMode
# Import paths for these classes can vary slightly between PyFlink versions
from pyflink.datastream.checkpoint_config import ExternalizedCheckpointCleanup
from pyflink.datastream.state_backend import RocksDBStateBackend

# Enable exactly-once checkpointing
env.enable_checkpointing(
    interval=10000,  # Checkpoint every 10 seconds
    mode=CheckpointingMode.EXACTLY_ONCE
)

# Checkpoint configuration
env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
env.get_checkpoint_config().set_checkpoint_timeout(60000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
env.get_checkpoint_config().enable_externalized_checkpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
)

# State backend for large state (card history)
env.set_state_backend(RocksDBStateBackend("s3://flink-state/checkpoints"))
```
Idempotent Alert Sink:
```python
class IdempotentAlertSink:
    """Ensures each alert is processed exactly once."""

    def __init__(self, redis_client, alert_topic):
        self.redis = redis_client
        self.kafka_producer = KafkaProducer(
            bootstrap_servers="kafka:9092",
            enable_idempotence=True,  # Kafka idempotent producer
            acks="all"
        )
        self.alert_topic = alert_topic

    def invoke(self, alert: FraudAlert):
        # Deduplication key: alert_type + card + window_end
        dedup_key = f"alert:{alert.alert_type}:{alert.card_hash}:{alert.window_end}"

        # Check if already processed (Redis SET NX with TTL)
        if self.redis.set(dedup_key, "1", nx=True, ex=3600):
            # First time seeing this alert - process it
            self.kafka_producer.send(
                self.alert_topic,
                key=alert.card_hash,
                value=alert.to_json()
            )
            # Also write to PostgreSQL for audit
            self.write_to_postgres(alert)
        # else: Already processed, skip (idempotent)

    def write_to_postgres(self, alert):
        """Idempotent insert using ON CONFLICT."""
        self.cursor.execute("""
            INSERT INTO fraud_alerts (alert_id, card_hash, alert_type, confidence, created_at)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (alert_id) DO NOTHING
        """, (alert.id, alert.card_hash, alert.alert_type, alert.confidence, alert.created_at))
```
Why: Exactly-once requires coordination between Flink checkpoints, Kafka transactions, and downstream sinks. The Redis deduplication key and PostgreSQL ON CONFLICT ensure that even if a failure causes reprocessing, each alert is delivered exactly once.
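For the Kafka leg of that coordination, the sketch below shows the general shape of a transactional producer using the confluent-kafka client. The configuration values and topic name are assumptions, and in a real Flink job this coordination is performed by the Kafka sink connector rather than hand-written code.

```python
from confluent_kafka import Producer, KafkaException

# Assumed configuration: a stable transactional.id ties restarts to the same
# transaction state so stale in-flight transactions can be fenced and aborted.
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "transactional.id": "fraud-alert-writer-1",
    "enable.idempotence": True,
})

producer.init_transactions()

def publish_alerts_atomically(alerts):
    """Write a batch of alerts so they become visible all-or-nothing."""
    producer.begin_transaction()
    try:
        for alert in alerts:
            producer.produce("fraud-alerts", key=alert.card_hash,
                             value=alert.to_json())
        producer.commit_transaction()
    except KafkaException:
        # On failure, nothing from this batch is exposed to read-committed consumers
        producer.abort_transaction()
        raise
```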
What we do: Configure watermarks and allowed lateness for out-of-order events.
Watermark Strategy:
```python
# WatermarkStrategy and Duration both live in pyflink.common
from pyflink.common import Duration, WatermarkStrategy

# Watermark: event_time - 30 seconds (allow 30s network delay)
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(30))
    .with_timestamp_assigner(
        lambda t, _: t["timestamp"].timestamp() * 1000
    )
)

transactions_with_watermarks = transactions.assign_timestamps_and_watermarks(
    watermark_strategy
)

# Window configuration with allowed lateness
velocity_alerts = (
    transactions_with_watermarks
    .key_by(lambda t: t["card_hash"])
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(30)))
    .allowed_lateness(Time.minutes(2))       # Accept late data up to 2 min
    .side_output_late_data(late_data_tag)    # Capture very late data
    .apply(VelocityDetector())
)

# Handle late data separately (log for investigation)
late_transactions = velocity_alerts.get_side_output(late_data_tag)
late_transactions.add_sink(late_data_sink)  # Write to audit log
```
Why: Payment terminals may have network delays or clock drift. A 30-second watermark delay handles normal network latency, while 2-minute allowed lateness captures most edge cases. Very late data goes to side output for manual review rather than being silently dropped.
What we do: Add metrics and alerting for pipeline reliability.
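The section does not include the metrics code itself, so the sketch below is one possible shape for it, using Flink's user-defined metrics inside a ProcessFunction. The metric names and the 500 ms threshold mirror the SLA described below but are otherwise assumptions.

```python
import time
from pyflink.datastream.functions import ProcessFunction

class LatencyMetrics(ProcessFunction):
    """Tracks end-to-end processing latency against the 500 ms SLA."""

    def open(self, runtime_context):
        group = runtime_context.get_metrics_group()
        self.alerts_emitted = group.counter("fraud_alerts_emitted")   # assumed name
        self.sla_violations = group.counter("latency_sla_violations") # assumed name

    def process_element(self, alert, ctx):
        # Latency = wall-clock time now minus the event time of the element
        latency_ms = time.time() * 1000 - ctx.timestamp()
        if latency_ms > 500:  # assumed SLA threshold
            self.sla_violations.inc()
        self.alerts_emitted.inc()
        yield alert
```

It would typically be attached to the alert stream with `.process(LatencyMetrics())`. Checkpoint duration and failure counts are already exposed by Flink's built-in metrics, so alerting on those usually needs no custom code.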
Why: Real-time fraud detection is useless if the pipeline is slow or losing data. These metrics ensure the team knows immediately if processing latency exceeds the 500ms SLA or if checkpoints fail (risking exactly-once guarantees).
Outcome: A production fraud detection pipeline processing 2,000 TPS with <200ms average latency and exactly-once delivery guarantees.
%% fig-alt: "Fraud detection architecture showing data flow from 50,000 terminals through Kafka with 32 partitions to Flink with 32 parallel tasks. Flink runs three detectors in parallel: Velocity Detector using 5-minute sliding windows, Burst Detector using 2-minute sliding windows, and Amount Anomaly Detector maintaining per-merchant state. All detectors feed into an Alert Router which outputs to three destinations: Kafka for alerts, PostgreSQL for audit logging, and Redis for deduplication. The final output goes to a Risk Dashboard and Auto-Block API."
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#2C3E50', 'primaryTextColor': '#fff', 'primaryBorderColor': '#16A085', 'lineColor': '#16A085', 'secondaryColor': '#E67E22', 'tertiaryColor': '#7F8C8D'}}}%%
flowchart LR
T["Terminals<br/>(50K)"] --> K["Kafka<br/>(32 partitions)"]
K --> F["Flink<br/>(32 parallel tasks)"]
F --> V["Velocity Detector<br/>(5-min sliding window)"]
F --> B["Burst Detector<br/>(2-min sliding window)"]
F --> A["Amount Anomaly<br/>(per-merchant state)"]
V --> AR["Alert Router"]
B --> AR
A --> AR
AR --> KA["Kafka (alerts)"]
AR --> PG["PostgreSQL (audit)"]
AR --> RD["Redis (dedup)"]
AR --> RDA["Risk Dashboard +<br/>Auto-Block API"]
Key Decisions Made:
1. Kafka partitioning by card_hash: ensures per-card ordering without coordination
2. Sliding windows: balance detection speed against computational cost
3. 30-second watermarks: handle network delays while keeping latency low
4. Redis deduplication: ensures exactly-once even across Flink restarts
5. RocksDB state backend: handles large per-card state (transaction history)
6. Side output for late data: never silently drop data, always audit
Knowledge check: A payment fraud detection system must identify "velocity attacks" where the same credit card is used at two locations more than 500 km apart within 5 minutes (impossible by normal travel). Which combination of stream processing features is REQUIRED for this detection?

- Round-robin partitioning + tumbling windows + simple aggregation. (Incorrect: round-robin partitioning scatters card events across partitions, making it impossible to compare locations for the same card without expensive cross-partition coordination; per-card keying is essential.)
- Key-based partitioning by card_id + sliding windows + distance calculation with state. (Correct: key-based partitioning ensures all transactions for a card arrive at the same task, sliding windows enable continuous evaluation rather than waiting for fixed boundaries, and stateful processing stores recent locations so distances between consecutive transactions can be calculated.)
- Global windows + broadcast state + batch aggregation. (Incorrect: global windows do not bound computation, so transactions accumulate forever; broadcast state is for lookup data, not per-card history; and batch aggregation defeats the real-time detection requirement.)
- Session windows + CEP patterns + exactly-once delivery. (Incorrect: session windows group by activity gaps rather than fixed time intervals, so a card used once per day would never trigger windows; CEP is useful, but the core requirement is distance calculation across time-bounded transactions, which sliding windows address.)