A retail platform builds a live customer activity view from two Kafka topics. One topic contains profile updates such as name, address, and loyalty status. The other contains purchases. A Kafka Streams application enriches each purchase with the latest profile and writes the result to an output topic.
The first implementation appears correct. New purchases produce enriched records immediately. Then a customer changes their address and nothing happens. The output remains stale until that customer makes another purchase.
The problem is not consumer lag or a failed broker. It comes from the semantics of the join. A stream-to-table join runs when a new stream record arrives. Updating the table changes the lookup state, but it does not automatically reprocess purchases that were handled earlier.
The fix requires more than replacing one operator. The application must model purchases and profiles according to their real meaning, align keys and partitions, materialize state safely, and choose a join that reacts when either side changes.
The System We Need to Build
Assume the platform has these components:
- ProfileService publishes the latest customer profile.
- PurchaseService publishes completed purchases.
- CustomerActivityService combines both data sources in real time.
- CustomerPortalService reads the latest enriched customer view.
- Kafka stores the input, intermediate, changelog, and output topics.
- Local state stores hold the data required for aggregation and joins.
The required data flow is:
ProfileService                          PurchaseService
      |                                        |
      v                                        v
customer-profile-state                  purchase-events
      |                                        |
      +-----------------+    +-----------------+
                        |    |
                        v    v
               CustomerActivityService
                          |
                          v
               customer-activity-view
                          |
                          v
               CustomerPortalService
The output must change in both cases:
- A new purchase arrives for a customer.
- The customer's profile changes even when no new purchase arrives.
That second requirement is what the naive stream-to-table design fails to satisfy.
Why Stream Processing Fits This Requirement
A scheduled extract-transform-load pipeline could periodically read profiles and purchases, combine them, and replace a reporting table. That approach may be acceptable when results can be several hours old.
This platform needs updates soon after an event occurs. Stream processing fits because it continuously:
- Reads incoming events.
- Transforms or aggregates them.
- Produces updated results.
Kafka Streams is a Java library embedded in an ordinary server application. It does not require a separate processing cluster. The application reads from Kafka, executes a processing topology, and writes results back to Kafka.
A topology is a directed graph of processing steps:
Source topic
|
v
Filter invalid records
|
v
Change key
|
v
Aggregate into state
|
v
Join with another table
|
v
Materialize result
|
v
Output topic
Kafka Streams provides two important abstractions:
- KStream represents a continuous sequence of independent events.
- KTable represents the latest value for each key, similar to a continuously updated table.
Choosing between them is an architectural decision, not merely an API preference.
Diagnose the Naive Join
Suppose purchases are modelled as a KStream and profiles are modelled as a KTable.
Purchase KStream + Profile KTable -> Enriched purchase
When a purchase arrives, Kafka Streams looks up the current profile using the purchase key and emits an enriched result.
The trigger is the purchase event.
Purchase arrives
|
v
Look up current profile
|
v
Emit enriched purchase
A later profile update changes the table state:
Profile changes
|
v
Update profile state store
|
v
No earlier purchase is reprocessed
This behaviour is useful when the desired result is an immutable enriched event that reflects the profile known at purchase-processing time. It is wrong when the output is supposed to be a current customer view.
The design must therefore represent both sides as changing state.
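The trigger behaviour can be reproduced without Kafka. The following is a minimal plain-Java simulation, not Kafka Streams code. The class StreamTableJoinSketch and its method names are hypothetical, and the profile is reduced to an address string. It only models which kind of event causes output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of stream-to-table trigger semantics (illustrative only). */
final class StreamTableJoinSketch {
    // Latest address per customer: the "table" side of the join.
    private final Map<String, String> addressByCustomer = new HashMap<>();
    // Everything emitted downstream, in arrival order.
    private final List<String> emitted = new ArrayList<>();

    /** A table update only changes lookup state; nothing is emitted. */
    void onProfileUpdate(String customerId, String address) {
        addressByCustomer.put(customerId, address);
    }

    /** A stream record triggers one lookup and at most one output record. */
    void onPurchase(String customerId, String purchaseId) {
        String address = addressByCustomer.get(customerId);
        // Inner-join behaviour: no profile yet means no output.
        if (address != null) {
            emitted.add(customerId + ":" + purchaseId + "@" + address);
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Publishing a profile, then a purchase, then a changed address leaves exactly one record in emitted(), and that record still carries the old address. That is the stale view described above.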
Model the Business Meaning Before Writing the Topology
The two inputs have different raw forms:
Profile input
The profile topic already represents the latest state of each customer.
Key: customer identifier
Value: latest customer profile
This naturally becomes a KTable.
Purchase input
The purchase topic is a stream of events.
Key: purchase identifier
Value: purchase details with customer identifier
The application needs a current purchase summary per customer, not an isolated purchase event. It must first:
- Select the customer identifier as the key.
- Group purchases by that key.
- Aggregate them into a summary.
- Materialize the summary as a table.
The result is:
Profile KTable + PurchaseSummary KTable -> CustomerActivity KTable
A table-to-table join reacts when either table changes. A profile update can therefore produce a fresh customer view without waiting for another purchase.
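As a rough illustration of that difference, the sketch below models a table-to-table left join in plain Java. It is not Kafka Streams code: TableTableJoinSketch is a hypothetical class, the profile is reduced to an address, and the purchase summary is reduced to a count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a table-to-table left join (illustrative only). */
final class TableTableJoinSketch {
    private final Map<String, String> addressByCustomer = new HashMap<>();
    private final Map<String, Integer> purchaseCountByCustomer = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    /** A change on the profile side re-evaluates the joined view. */
    void onProfileUpdate(String customerId, String address) {
        addressByCustomer.put(customerId, address);
        emitCurrentView(customerId);
    }

    /** A change on the summary side re-evaluates the joined view as well. */
    void onPurchase(String customerId) {
        purchaseCountByCustomer.merge(customerId, 1, Integer::sum);
        emitCurrentView(customerId);
    }

    private void emitCurrentView(String customerId) {
        String address = addressByCustomer.get(customerId);
        if (address == null) {
            return; // Left join on profiles: no view without a profile.
        }
        int purchases = purchaseCountByCustomer.getOrDefault(customerId, 0);
        emitted.add(customerId + "@" + address + " purchases=" + purchases);
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Here a profile-only change produces a new view that keeps the existing purchase count. A real table-to-table join may coalesce consecutive updates for the same key in its record cache, so the number of emitted records can be lower than in this model.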
The Correct Topology
The corrected architecture is:
purchase-events
      |
      v
Filter unusable purchases
      |
      v
Select customerId as key
      |
      v
Automatic repartition topic
      |
      v
Aggregate purchases per customer
      |
      v
Purchase summary state store        customer-profile-state
      |                                        |
      |                                        v
      |                              Profile state store
      |                                        |
      +-----------------+    +-----------------+
                        |    |
                        v    v
                KTable-to-KTable join
                          |
                          v
                customer-activity-view
Kafka Streams may create internal topics for two purposes:
- Repartition topics redistribute records after the key changes.
- Changelog topics back up state stores for recovery.
These internal topics are part of the application's data architecture. They consume network, disk, and broker resources and must be included in capacity planning and monitoring.
Implement the Topology with the Kafka Streams DSL
The following Java example sketches the corrected topology with the Kafka Streams DSL. The domain classes are illustrative.
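import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// PurchaseEvent, PurchaseSummary, CustomerProfile, and CustomerActivityView are
// application classes; their serdes are assumed to be configured elsewhere.
StreamsBuilder builder = new StreamsBuilder();

// Raw purchases arrive keyed by purchase identifier.
KStream<String, PurchaseEvent> purchases =
    builder.stream("purchase-events");

// Drop unusable records first, then rekey by customer to match the profile key.
KStream<String, PurchaseEvent> purchasesByCustomer = purchases
    .filter((purchaseId, purchase) ->
        purchase != null && purchase.customerId() != null)
    .selectKey((purchaseId, purchase) ->
        purchase.customerId());

// Grouping after selectKey causes a repartition; the summary lives in a named store.
KTable<String, PurchaseSummary> purchaseSummaries =
    purchasesByCustomer
        .groupByKey()
        .aggregate(
            PurchaseSummary::empty,
            (customerId, purchase, currentSummary) ->
                currentSummary.include(purchase),
            Materialized.as("purchase-summary-store")
        );

// Profiles are already keyed by customer identifier and represent current state.
KTable<String, CustomerProfile> profiles =
    builder.table(
        "customer-profile-state",
        Materialized.as("customer-profile-store")
    );

// Left join: re-evaluated when either table changes. The summary argument is
// null until the first purchase, so combine must tolerate a null summary.
KTable<String, CustomerActivityView> customerViews =
    profiles.leftJoin(
        purchaseSummaries,
        CustomerActivityView::combine,
        Materialized.as("customer-activity-store")
    );

// Publish every update of the joined table.
customerViews
    .toStream()
    .to("customer-activity-view");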
The exact serializers and domain classes depend on the application contract. The important stages are:
- selectKey changes the purchase key to customerId.
- groupByKey groups all purchases belonging to one customer.
- aggregate creates a stateful purchase summary.
- builder.table treats profiles as current keyed state.
- leftJoin produces a current view that changes when either table changes.
- toStream converts table updates into output events.
A left join is useful when the platform wants a customer view even before the first purchase exists. An inner join would emit only when both sides have a value.
Why Rekeying Is Required
The raw purchase topic is keyed by purchase identifier, while the profile topic is keyed by customer identifier.
purchase-events:
key = purchaseId
customer-profile-state:
key = customerId
A join cannot match related records reliably when the keys represent different entities.
The application changes the purchase key:
Before:
P-84319 -> customer C-9004
After rekeying:
C-9004 -> purchase P-84319
Changing a key is not just an in-memory operation. Records must be redistributed so that all data for the same customer reaches the same task. Kafka Streams creates a repartition topic to perform that redistribution.
Repartitioning has a cost:
- Extra Kafka writes
- Extra Kafka reads
- Additional broker storage
- Additional network traffic
- More end-to-end latency
The cost is justified when the new key is required for a correct aggregation or join. Avoid repeated key changes that create unnecessary repartition steps.
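The effect of the key change on record placement can be sketched in plain Java. RekeySketch is a hypothetical class, and partitionFor is only a stand-in: Kafka's default partitioner hashes the serialized key bytes with a different function, so real partition numbers will differ.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Shows how the routing key decides which partitions a customer's data reaches. */
final class RekeySketch {
    /** Stand-in partitioner (assumption): hash of the key modulo partition count. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Distinct partitions reached when each record is routed by its key. */
    static Set<Integer> partitionsUsed(List<String> keys, int partitionCount) {
        Set<Integer> partitions = new HashSet<>();
        for (String key : keys) {
            partitions.add(partitionFor(key, partitionCount));
        }
        return partitions;
    }
}
```

With six partitions, three purchases of one customer keyed as P-1, P-2, and P-3 land on three different partitions under this stand-in hash. Keyed by the customer identifier, the same three records share a single partition, which is what a per-customer aggregation needs.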
Co-Partition Inputs Before Joining
Matching keys must be processed together. Topics used in a partitioned join need compatible partitioning:
- The same logical key
- The same key serialization
- The same partition count
- A consistent partitioning algorithm
Consider this broken design:
customer-profile-state:
key = customerId
partitions = 6
purchase-summary:
key = customerId
partitions = 12
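Even though the key values look identical, the same customerId can map to different partition numbers in the two topics, so related records do not reach the same task. Kafka Streams validates the partition counts of joined source topics and fails with an error when they differ, but it cannot detect producers that partition or serialize the same key differently.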
A safer design keeps joined topics aligned:
customer-profile-state:
key = customerId
partitions = N
purchase-summary:
key = customerId
partitions = N
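A small plain-Java check makes the alignment requirement concrete. CoPartitionSketch is a hypothetical class, and its hash-modulo partitioner is an assumption standing in for Kafka's default partitioner.

```java
/** Checks whether one key reaches the same partition index in two topics. */
final class CoPartitionSketch {
    /** Stand-in partitioner (assumption): hash of the key modulo partition count. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /**
     * A join task reads partition i of both inputs, so a key can only be joined
     * when it maps to the same index on each side.
     */
    static boolean meetsInSameTask(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }
}
```

For example, meetsInSameTask("C-0", 6, 12) is false under this stand-in hash, while meetsInSameTask("C-0", 6, 6) is true. Some keys still align by coincidence when the counts differ, which is why a partition mismatch can look like intermittent join misses rather than a total failure.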
Use a GlobalKTable only when broadcasting the complete table to every task is acceptable. It removes co-partitioning constraints for that lookup, but duplicates the full dataset across application instances. That can be inappropriate for a large profile dataset. A GlobalKTable can also be joined only from a KStream, so it does not by itself make the output react to table updates.
Separate Stateless and Stateful Operations
Stateless operations process each event independently. They do not need information from earlier records.
Examples include:
- Filtering invalid records
- Mapping one record to another form
- Expanding one input into several outputs
- Routing records to different topics
- Changing a key
Stateful operations require history or accumulated context.
Examples include:
- Counting purchases per customer
- Maintaining the latest profile
- Aggregating purchase totals
- Joining profiles with purchase summaries
- Calculating results over time windows
Stateful operations are more operationally expensive because the application must store, restore, and protect intermediate state.
A good topology performs cheap stateless validation before expensive stateful processing:
Raw event
|
v
Validate and filter
|
v
Normalize key and fields
|
v
Stateful aggregation or join
This prevents unusable records from consuming state-store space and repartition traffic.
Understand How Local State Remains Durable
Each Kafka Streams task has local state stores associated with its assigned partitions and topology.
A stateful task includes:
- Its input partitions
- Its processing topology
- Its local state stores
Kafka Streams commonly uses RocksDB for persistent local state. Local storage keeps lookups fast because the processing code does not need a remote database request for every event.
Local disks alone are not enough. If an instance fails, its local store may be unavailable. Kafka Streams writes state-store changes to Kafka changelog topics.
Incoming events
|
v
Task updates local RocksDB store
|
v
State changes backed up to changelog topic
When a task moves to another instance, Kafka Streams restores the state from the changelog before normal processing continues.
This produces a useful balance:
- Local state provides low-latency access.
- Kafka changelogs provide fault-tolerant recovery.
- Task reassignment provides horizontal scalability.
The trade-off is recovery time. A large store can require substantial disk I/O, memory, and network traffic during restoration.
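The relationship between a local store and its changelog can be modelled in a few lines of plain Java. ChangelogRestoreSketch is a hypothetical class: a HashMap stands in for RocksDB and a shared list stands in for the changelog topic.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a local store backed by a changelog (illustrative only). */
final class ChangelogRestoreSketch {
    private final Map<String, Integer> localStore = new HashMap<>();
    private final List<Map.Entry<String, Integer>> changelog;

    ChangelogRestoreSketch(List<Map.Entry<String, Integer>> changelog) {
        this.changelog = changelog;
    }

    /** Every local write is also appended to the changelog. */
    void put(String key, int value) {
        localStore.put(key, value);
        changelog.add(new SimpleImmutableEntry<>(key, value));
    }

    Integer get(String key) {
        return localStore.get(key);
    }

    /** A replacement task starts empty and replays the changelog in order. */
    static ChangelogRestoreSketch restore(List<Map.Entry<String, Integer>> changelog) {
        ChangelogRestoreSketch restored = new ChangelogRestoreSketch(changelog);
        for (Map.Entry<String, Integer> change : changelog) {
            restored.localStore.put(change.getKey(), change.getValue());
        }
        return restored;
    }
}
```

The replay loop is where recovery time comes from: its cost grows with the number of changelog records that must be read. Log compaction on the changelog topic limits that growth by keeping only recent values per key, but a store with many keys still takes time to rebuild.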
Understand Tasks, Instances, and Parallelism
Kafka Streams divides the topology into tasks. Tasks are distributed across threads and application instances.
Input partitions
P0 P1 P2 P3
Tasks
T0 T1 T2 T3
Application instances
Instance A: T0, T1
Instance B: T2, T3
A task is an indivisible unit containing its partition assignments, state stores, and topology instance.
Adding application instances can improve parallel processing only when there are enough input partitions and tasks. Deploying more instances than available tasks leaves some capacity unused.
When one instance fails:
Before failure:
Instance A: T0, T1
Instance B: T2, T3
After Instance B fails:
Instance A: T0, T1, T2, T3
Kafka Streams reassigns the failed tasks. Stateful tasks then restore or reuse their stores before they are fully ready.
Partition count is therefore both a Kafka topic decision and a stream-processing capacity decision.
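The bound on parallelism can be illustrated with a simplified assignment function. TaskAssignmentSketch is hypothetical and uses plain round-robin. The real Kafka Streams assignor also considers existing state and stickiness, so actual placements will differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model: one task per input partition, spread round-robin. */
final class TaskAssignmentSketch {
    static Map<String, List<Integer>> assign(int partitionCount, List<String> instances) {
        Map<String, List<Integer>> tasksByInstance = new LinkedHashMap<>();
        for (String instance : instances) {
            tasksByInstance.put(instance, new ArrayList<>());
        }
        // Each task is indivisible, so it is placed on exactly one instance.
        for (int task = 0; task < partitionCount; task++) {
            String owner = instances.get(task % instances.size());
            tasksByInstance.get(owner).add(task);
        }
        return tasksByInstance;
    }
}
```

With four partitions and two instances, each instance receives two tasks. With four partitions and six instances, two instances receive no tasks at all, which is the unused capacity described above.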
Enable Exactly-Once Processing for Kafka State and Outputs
A stateful application often performs three related actions:
- Writes an output record.
- Updates a state store and its changelog.
- Commits the consumed input offset.
A failure between these actions can otherwise cause duplicated output or inconsistent state after a retry.
Kafka Streams can wrap these Kafka operations in an atomic transaction. Enable the supported exactly-once mode with:
application.id=customer-activity-service
bootstrap.servers=kafka-a:9092,kafka-b:9092,kafka-c:9092
processing.guarantee=exactly_once_v2
With this mode, output writes, changelog updates, and input-offset commits succeed or fail together within Kafka.
This guarantee applies to Kafka-managed processing. It does not automatically include an external database write in the same transaction. A topology that writes to a remote database still needs an application-level consistency strategy.
Add Time Windows Only When the Business Question Is Time-Bounded
The customer view may also need a rolling activity metric, such as purchases per reporting period.
An unbounded count answers:
How many purchases has this customer made since processing began?
A windowed count answers:
How many purchases did this customer make during a particular time interval?
Kafka Streams supports several window types:
| Window type | Behaviour | Suitable use |
|---|---|---|
| Tumbling | Fixed size, no overlap | Purchases per day |
| Hopping | Fixed size, overlapping intervals | Moving activity reports |
| Session | Closes after an inactivity gap | Customer browsing sessions |
| Sliding | Based on timestamp distance between related events | Stream-to-stream joins |
A conceptual windowed aggregation looks like this:
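// reportingWindow and allowedLateness are java.time.Duration values
// taken from the business requirement.
KTable<Windowed<String>, Long> purchaseCounts =
    purchasesByCustomer
        .groupByKey()
        .windowedBy(
            TimeWindows.ofSizeAndGrace(
                reportingWindow,
                allowedLateness
            )
        )
        .count();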
The business requirement should determine the window size and grace period. Do not select them only because a code example uses convenient values.
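To show how a record's timestamp selects its window, here is a minimal plain-Java sketch of tumbling-window bucketing. TumblingWindowSketch is a hypothetical class; it assumes windows aligned to the epoch, and it ignores keys and grace periods.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of tumbling-window bucketing (illustrative only). */
final class TumblingWindowSketch {
    /** Start of the fixed-size, non-overlapping window containing the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Counts records per window start, ordered by time. */
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long timestampMs : timestampsMs) {
            counts.merge(windowStart(timestampMs, windowSizeMs), 1L, Long::sum);
        }
        return counts;
    }
}
```

With a one-minute window, timestamps 0 and 59,999 fall into the window starting at 0, while 60,000 starts the next one. The window size therefore changes which events are counted together, not only how often a result appears.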
Prevent Silent Loss from Late Records
Kafka stores records in partition-offset order, but business logic may depend on event time. Event timestamps can be out of order because of:
- Producer clock differences
- Network delays
- Upstream buffering
- Retries
- Events arriving from several topics
Kafka Streams does not automatically know that a timestamp inside the payload is the authoritative business time. Configure a timestamp extractor when event-time processing depends on that field.
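import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public final class PurchaseTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(
        ConsumerRecord<Object, Object> record,
        long partitionTime
    ) {
        Object value = record.value();
        // Use the business time only when the payload is a purchase event.
        if (value instanceof PurchaseEvent) {
            return ((PurchaseEvent) value).occurredAtEpochMillis();
        }
        // Otherwise fall back to the timestamp stored on the Kafka record.
        return record.timestamp();
    }
}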
Late records create another risk. A record can arrive after its event-time window appears complete. Without an appropriate grace period, it may be discarded from the windowed result without producing a normal application exception.
The grace period defines how long the application accepts late records after the window boundary.
A wider grace period:
- Accepts more delayed data.
- Keeps window state longer.
- Uses more disk and memory.
- Delays final results.
A narrow grace period:
- Produces final results sooner.
- Reduces retained window state.
- Increases the risk of excluding late events.
Measure real arrival delays before selecting the grace period.
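The acceptance rule can be approximated in plain Java. GracePeriodSketch is a hypothetical class that assumes tumbling windows, non-negative timestamps, and a stream time equal to the highest timestamp seen so far. It is a simplified model of the behaviour, not the library's implementation.

```java
/** Plain-Java model of late-record acceptance for tumbling windows. */
final class GracePeriodSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Highest event timestamp observed so far; timestamps are assumed non-negative.
    private long streamTimeMs = 0L;

    GracePeriodSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true when the record still counts toward its window. */
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowEndMs = timestampMs - (timestampMs % windowSizeMs) + windowSizeMs;
        // The window stays open until stream time reaches its end plus the grace period.
        return streamTimeMs < windowEndMs + graceMs;
    }
}
```

With a 60-second window and a 10-second grace period, a record stamped 59,000 ms is still accepted after stream time has advanced to 65,000 ms. A record stamped 58,000 ms is rejected once stream time has reached 75,000 ms. The rejection returns false without raising an exception, which mirrors why such drops go unnoticed unless they are measured.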
Choose the Correct Join Type
Different joins answer different questions.
Stream-to-stream join
Use this when both inputs are event streams and related records should match within a time window.
Example:
PurchasePlaced + PaymentConfirmed -> PaidPurchase
The application waits for matching events according to their keys and timestamps. Both streams must be co-partitioned. Records that never find a match within the window follow the selected join semantics.
Stream-to-table join
Use this when each incoming stream event needs the latest value from a table.
Example:
New purchase + current customer profile -> enriched purchase
The stream record triggers the output. A later table update does not recreate old enriched events.
Table-to-table join
Use this when the output represents current state and must change when either side changes.
Example:
Current profile + current purchase summary -> current customer activity view
This is the correct choice for the stale-view problem.
The join model must match the output's meaning. A technically valid join can still produce the wrong business behaviour.
Decide Whether to Query State Stores Directly
The final customer activity table is already materialized in a local state store. Kafka Streams supports interactive queries that let the application read queryable stores directly.
This can reduce the need for another database, but local state is distributed across tasks.
Customer C-9004
|
v
Hash and partition metadata
|
v
Instance B owns the relevant state
A request arriving at Instance A may need to be routed to Instance B. The application must discover which instance owns the key and provide a network endpoint for forwarding requests.
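The routing decision can be sketched as follows. QueryRoutingSketch is a hypothetical class with a hand-built ownership map and a stand-in hash. In a real application the owner would come from the Kafka Streams metadata API (for example KafkaStreams#queryMetadataForKey) rather than from this map.

```java
import java.util.Map;

/** Plain-Java model of instance-aware routing for key lookups. */
final class QueryRoutingSketch {
    private final int partitionCount;
    private final Map<Integer, String> ownerByPartition;

    QueryRoutingSketch(int partitionCount, Map<Integer, String> ownerByPartition) {
        this.partitionCount = partitionCount;
        this.ownerByPartition = ownerByPartition;
    }

    /** Finds the instance whose task holds the state for this key. */
    String ownerOf(String key) {
        // Stand-in hash (assumption); Kafka's partitioner uses a different function.
        int partition = Math.floorMod(key.hashCode(), partitionCount);
        return ownerByPartition.get(partition);
    }

    /** Decides what the instance that received the request should do. */
    String route(String receivingInstance, String key) {
        String owner = ownerOf(key);
        return owner.equals(receivingInstance) ? "serve-locally" : "forward-to:" + owner;
    }
}
```

Every instance needs the same ownership information and a reachable endpoint for its peers. That requirement is the operational cost of serving queries directly from state stores.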
Interactive queries are attractive when:
- The query is a direct key lookup.
- The team accepts instance-aware request routing.
- The state store already contains the required view.
- The application can expose and secure a query endpoint.
Publishing the materialized view to an output topic and loading it into a dedicated query store may be simpler when:
- Queries require secondary indexes or complex filtering.
- Clients should not depend on stream-task placement.
- The serving tier needs an independent scaling model.
- Multiple applications need the same result.
Plan State Restoration Before Production
A stateful application can start slowly after failure because it must restore RocksDB stores from changelog topics.
Common restoration bottlenecks include:
- Heavy disk reads and writes
- Network transfer from Kafka
- Memory pressure from buffers and caches
- Too much state assigned to one task
- Too few partitions to distribute restoration
- Large stores with long histories
A practical recovery plan should:
- Measure state size per task.
- Benchmark restoration from an empty local disk.
- Test several instances restoring concurrently.
- Verify that local disks have enough capacity.
- Tune RocksDB only after measuring the bottleneck.
- Use standby tasks when faster failover justifies additional resources.
- Monitor restoration progress and readiness.
- Confirm that a recovering instance does not receive production traffic too early.
Standby tasks maintain continuously updated replicas of state stores on instances that do not currently own the active task. They can reduce failover time because less state must be rebuilt after reassignment. They also increase storage and processing overhead.
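Standby replicas are requested through configuration. The sketch below builds the settings with plain java.util.Properties. StandbyConfigSketch is a hypothetical helper, and the value of one replica is an illustrative assumption rather than a recommendation.

```java
import java.util.Properties;

/** Builds Kafka Streams settings that request one standby replica per store. */
final class StandbyConfigSketch {
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.setProperty("application.id", "customer-activity-service");
        props.setProperty("bootstrap.servers", "kafka-a:9092,kafka-b:9092,kafka-c:9092");
        // One warm copy of each state store on another instance (the default is 0).
        props.setProperty("num.standby.replicas", "1");
        return props;
    }
}
```

Each replica needs disk space and changelog read capacity on a second instance, so the setting only helps when spare instances are available to host it.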
Monitor the Topology, Not Only the Process
A running Java process does not prove that the streaming result is correct or current.
Monitor at least:
- Input consumer lag
- Output record rate
- Processing throughput
- Processing latency
- Repartition-topic traffic
- Changelog-topic traffic
- State-store size
- Local disk usage
- RocksDB resource use
- State-restoration duration
- Task reassignments
- Deserialization failures
- Records excluded by time-window policies
- Late-record behaviour
- Join match and miss rates
A multi-step topology is difficult to debug when logs show only the final failure. Add trace identifiers to events and preserve them through processing where possible.
Distributed tracing systems such as OpenTelemetry, Jaeger, or Zipkin can help show how one event moves through several processors. Logging should identify the topology stage, key, topic, partition, and offset needed for diagnosis.
Know When Kafka Streams Is Not the Best Choice
Kafka Streams is a strong fit when:
- Kafka is the primary source and sink.
- The application is implemented in Java.
- The team wants a library embedded in a normal service.
- Processing consists of filters, maps, aggregations, windows, and joins.
- Local state plus Kafka changelogs provide an acceptable recovery model.
Consider ksqlDB when the team prefers declarative stream transformations and the processing logic fits its supported operations. It can run persistent streaming queries and expose current materialized results without requiring a custom Java service for every workflow.
Consider Apache Flink when:
- Sources and sinks extend well beyond Kafka.
- The platform needs both bounded batch and unbounded stream processing.
- The workflow requires a larger distributed processing runtime.
- State, checkpointing, and execution planning need more independent control.
- The organization accepts a steeper learning and operational curve.
Managed cloud services may reduce infrastructure work, but they introduce provider-specific capabilities, limits, and costs. The choice should follow the workload and operating model rather than a desire to standardize every pipeline on one tool.
Test the Failure That Created the Stale View
The most important test proves that a profile-only update changes the result.
Test 1: Profile update retriggers output
- Publish a profile for customer C-9004.
- Publish one purchase for the same customer.
- Verify the combined view.
- Publish a new address without publishing another purchase.
- Verify that the customer activity output changes.
A stream-to-table design fails the final verification step. The table-to-table design should pass it.
Test 2: Purchases are rekeyed correctly
- Publish purchases with different purchase IDs but the same customer ID.
- Verify that the records are grouped into one customer summary.
- Inspect partition distribution for unexpected hot keys.
- Confirm that joined topics use compatible partition counts.
Test 3: Either table can change
- Update only the purchase summary.
- Verify that the output changes.
- Update only the profile.
- Verify that the output changes again.
Test 4: Instance failure restores state
- Build non-empty state stores.
- Stop the instance that owns a selected customer key.
- Allow task reassignment.
- Verify state restoration from changelog data.
- Publish another event and confirm correct processing.
Test 5: Transactional failure does not expose partial output
- Begin processing an input record.
- Interrupt the application before the Kafka transaction commits.
- Restart the application.
- Confirm that state, output, and input progress remain consistent.
Test 6: Late event behaviour matches policy
- Publish events with controlled timestamps.
- Deliver one event after the normal window closes but within the grace period.
- Verify that it updates the result.
- Deliver another event after the grace period.
- Confirm that monitoring reveals the exclusion according to policy.
Common Mistakes
Using a stream-to-table join for a mutable current view
The join runs only when the stream side receives a record. Table changes do not recreate past stream results.
Joining records with different logical keys
A purchase ID cannot join directly with a customer ID. Rekey before grouping or joining.
Ignoring partition-count alignment
Matching keys still fail to meet in the same task when joined topics are not co-partitioned correctly.
Treating every operation as stateless
Aggregations, windows, and joins require stored context. They need disk, memory, changelogs, and recovery planning.
Assuming local state is lost on failure
Kafka Streams backs persistent state with changelog topics, so the state of a failed instance can be rebuilt elsewhere. The real risk is restoration time, which can be significant for large stores.
Assuming exactly-once includes an external database
The Kafka transaction covers Kafka outputs, state changelogs, and consumed offsets. External systems remain outside that atomic boundary.
Using processing time when the business requires event time
Offset order and event-time order are not always the same. Configure timestamp extraction deliberately.
Setting no meaningful grace period
Late records can be omitted from windowed results without a conventional exception.
Adding instances without enough partitions
Parallelism is bounded by input partitions and generated tasks.
Exposing interactive queries without routing
The requested key may live on another application instance.
Monitoring only application uptime
A healthy process can still have stale state, growing lag, failed joins, or an overloaded state-restoration path.
Production Checklist
Before releasing a stateful Kafka Streams application, confirm:
- The output is defined as an event stream or current state.
- Every input is modelled deliberately as KStream, KTable, or GlobalKTable.
- Join triggers match the business requirement.
- Keys represent the entity used for grouping and joining.
- Rekeying and repartition costs are understood.
- Joined topics use compatible partitioning.
- State stores have explicit names where operational visibility matters.
- Changelog and repartition topics are included in capacity planning.
- Local disk can hold active and restoring state.
- Exactly-once processing is enabled when required.
- External side effects have their own consistency strategy.
- Event-time extraction is configured when payload time matters.
- Window type, size, and grace period follow measured business behaviour.
- Late records are observable.
- State restoration has been benchmarked.
- Standby state is considered for faster failover.
- Interactive-query routing is implemented or avoided deliberately.
- Lag, state size, disk, restoration, task movement, and topology stages are monitored.
- Tests cover updates from both sides of every stateful join.
Conclusion
The stale customer view was caused by a mismatch between the meaning of the output and the semantics of the join. A stream-to-table join enriches new events with current reference data. It does not maintain a current combined view when the table changes later.
The corrected design rekeys purchase events by customer, aggregates them into a KTable, represents profiles as another KTable, and joins the two tables. That topology reacts when either side changes.
Correct joins are only one part of a production streaming application. Keys, partitions, state stores, changelog topics, exactly-once processing, event time, late data, restoration, and monitoring all influence whether the result remains accurate during normal traffic and failure.
When the business meaning of streams and tables is defined first, Kafka Streams becomes more than a convenient API. It becomes a reliable way to maintain continuously updated state without waiting for another unrelated event to hide a stale result.