When a service creates something important, other parts of the business usually need to react. A route assignment may need to notify a driver, update a schedule, refresh a dashboard, and feed analytics. A simple implementation might call every downstream service directly, but that design becomes fragile as the system grows.
Event-driven design changes the flow. Instead of calling every interested service, the producer publishes a fact: something happened. Consumers subscribe to the topics they care about and process events independently. The producer does not need to know how many consumers exist, where they run, or how long they take.
This article walks through a practical event-driven design in Go. The example uses a transport domain, Apache Pulsar as the broker, Avro as the message contract format, and Go generics to avoid repeating producer and consumer code for every event type.
The Problem: Direct Calls Create a Chain of Failure
Imagine a service responsible for creating route assignments. A route assignment links a vehicle to a route for a specific time window. When a new assignment is stored, several other responsibilities appear:
- A driver may need an alert.
- A timetable may need an update.
- Capacity analytics may need a fresh calculation.
- A monitoring dashboard may need a live refresh.
- A reporting service may need the event for later analysis.
With direct synchronous calls, the assignment service must know every downstream API. The flow starts to look like this:
```
Client
|
v
Assignment API
|
+--> Driver Alert API
|
+--> Timetable API
|
+--> Capacity API
|
+--> Dashboard API
|
+--> Analytics API
```
This creates several problems:
- The assignment service becomes coupled to other teams' APIs.
- One slow dependency increases the total request time.
- One unavailable dependency can make the whole operation fail.
- Scaling must happen across the whole chain, not only where the load exists.
- Adding a new consumer requires changing the producer.
Event-driven design replaces that chain with a brokered flow:
```
Client
|
v
Assignment API
|
v
Topic: route-allocation-events
|
+--> Driver Alert Consumer
|
+--> Timetable Consumer
|
+--> Capacity Consumer
|
+--> Dashboard Consumer
|
+--> Analytics Consumer
```
The assignment service publishes one event. Each consumer handles its own work. If analytics is temporarily offline, the driver alert flow can still continue. If dashboard traffic grows, the dashboard consumer can scale without changing the assignment service.
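That independence can be sketched without a broker. The following is a minimal in-process simulation, not Pulsar behavior: each hypothetical subscription gets its own buffered queue, one goroutine drains each queue, and a consumer whose handler fails does not prevent the others from processing the same events. The names fanOut and errOffline are illustrative only.

```go
package main

import (
	"errors"
	"sync"
)

// fanOut is a toy stand-in for a broker topic: each subscription gets its
// own buffered queue, so consumers read and fail independently.
type fanOut struct {
	queues map[string]chan string
}

// newFanOut creates one queue per subscription. The backlog must be large
// enough for this simulation; a real broker persists the backlog instead.
func newFanOut(subscriptions []string, backlog int) *fanOut {
	f := &fanOut{queues: make(map[string]chan string)}
	for _, name := range subscriptions {
		f.queues[name] = make(chan string, backlog)
	}
	return f
}

// publish appends the event to every subscription's queue. The producer does
// not know how many consumers exist or what they do with the event.
func (f *fanOut) publish(event string) {
	for _, q := range f.queues {
		q <- event
	}
}

// close signals that no more events will be published.
func (f *fanOut) close() {
	for _, q := range f.queues {
		close(q)
	}
}

// consumeAll runs one goroutine per subscription and returns, per
// subscription, how many events were handled successfully.
func (f *fanOut) consumeAll(handlers map[string]func(string) error) map[string]int {
	var mu sync.Mutex
	var wg sync.WaitGroup
	handled := make(map[string]int)
	for name, q := range f.queues {
		wg.Add(1)
		go func(name string, q <-chan string) {
			defer wg.Done()
			for event := range q {
				if err := handlers[name](event); err != nil {
					continue // this consumer's failure stays local
				}
				mu.Lock()
				handled[name]++
				mu.Unlock()
			}
		}(name, q)
	}
	wg.Wait()
	return handled
}

// errOffline simulates an unavailable analytics dependency.
var errOffline = errors.New("analytics offline")
```

Publishing two events and letting the analytics handler fail still leaves the driver alert subscription with both events handled.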
Core Messaging Concepts
Before writing Go code, it helps to define the moving parts.
A producer publishes messages. In the example, the assignment service is a producer because it publishes an event after creating a route assignment.
A consumer receives messages from a topic and performs work. The dashboard service, timetable service, and notification service are consumers.
A broker receives messages, stores them, and delivers them to consumers. In systems such as Pulsar and Kafka, the broker also manages topics and partitions.
A topic is a named stream of related messages. A topic should represent a clear category of events, such as route assignment events or driver alert events.
A partition is a subdivision of a topic. Partitions allow parallel reads and writes while keeping order inside each partition. If ordering matters for a specific route, vehicle, or assignment, choose a stable key so related messages go to the same ordered path.
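To make the key rule concrete, here is a minimal sketch of key-based routing. It assumes a fixed partition count and uses FNV-1a from the standard library purely for illustration; Pulsar's client applies its own configurable hashing scheme, so real partition numbers will differ. The property that carries over is that equal keys always map to the same partition. partitionFor is a hypothetical helper.

```go
package main

import "hash/fnv"

// partitionFor maps a message key to one of n partitions. Any stable hash
// works for the illustration; equal keys always land on the same partition,
// so their relative order is preserved there.
func partitionFor(key string, n int) int {
	if n <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash documents that Write never returns an error
	return int(h.Sum32() % uint32(n))
}
```

With the Pulsar Go client you would normally not compute this yourself: setting Key on pulsar.ProducerMessage (for example, to the vehicle ID) lets the client's router pick the partition.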
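A subscription tells the broker how a consumer group reads from a topic. A shared subscription allows multiple consumer instances to process messages in parallel, without ordering guarantees. A key-shared subscription also spreads messages across instances but delivers all messages with the same key to the same consumer, which preserves per-key order. A failover subscription keeps standby consumers ready when the active one fails.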
A Dead Letter Queue, usually called a DLQ, stores messages that repeatedly fail processing. A DLQ prevents poison messages from blocking a topic forever.
Design Events as Published Contracts
An event is not a remote procedure call. It should not say, "please notify this driver now." It should describe a fact that already happened, such as a route assignment being recorded or an alert being requested.
A good event contract is small, stable, and useful to consumers. Avoid publishing your internal database shape. Every field you expose becomes something another team may depend on.
For a route assignment event, the shared contract might contain these fields:
| Field | Purpose |
|---|---|
| allocationId | Stable identifier of the assignment this event describes |
| vehicleId | Vehicle involved in the assignment |
| driverId | Driver who should receive assignment-related alerts |
| routeId | Route involved in the assignment |
| occurredAt | Time when the event happened |
| eventType | Name used for routing, logs, and diagnostics |
A notification topic should not receive the full route assignment event unless the notification service truly needs every field. A cleaner design is to publish a separate driver alert event with only notification-related information. This keeps the notification topic focused and prevents the notification service from depending on assignment internals.
Avro as the Shared Language
When multiple services exchange messages, raw byte arrays are not enough. Consumers need to know which fields exist, what type each field has, and how compatibility works when the message evolves.
Avro provides a compact schema-based format for structured messages. In a Pulsar-based design, the broker enforces schema compatibility. When a producer or consumer registers an incompatible schema, creation of that producer or subscription should fail early instead of letting bad data move through production.
A safe schema evolution strategy is to add new fields with defaults. That allows newer consumers to read older messages because the missing field can be filled with a known value. Removing or renaming fields is much riskier because existing consumers may still expect them.
You can generate Go types from Avro schema files so your application works with typed structs instead of unstructured data. A typical workflow looks like this:
```sh
go install github.com/hamba/avro/v2/cmd/avrogen@latest
~/go/bin/avrogen -pkg streamports -o internal/streamports/generated_events.go schemas/*.avsc
```
The generated structs become part of your application boundary. Regenerate them when schemas change, and run compatibility checks before deployment.
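One rule from the evolution strategy above, that new fields need defaults, is easy to check mechanically in CI. The sketch below is a hypothetical pre-deployment lint built only on encoding/json. It is not a full Avro compatibility checker (it ignores type changes, removals, and nested records), and the broker's own compatibility check should remain the authority. fieldsAddedWithoutDefault is an illustrative name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// recordSchema holds just the "fields" array of an Avro record schema.
// Each field is kept as a generic map so that a "default": null entry is
// still detected as present.
type recordSchema struct {
	Fields []map[string]any `json:"fields"`
}

// fieldsAddedWithoutDefault lists fields that appear in next but not in prev
// and have no "default". A reader using next cannot fill those fields when
// it decodes data written with prev.
func fieldsAddedWithoutDefault(prev, next string) ([]string, error) {
	var p, n recordSchema
	if err := json.Unmarshal([]byte(prev), &p); err != nil {
		return nil, fmt.Errorf("parse previous schema: %w", err)
	}
	if err := json.Unmarshal([]byte(next), &n); err != nil {
		return nil, fmt.Errorf("parse next schema: %w", err)
	}
	existing := make(map[string]bool, len(p.Fields))
	for _, f := range p.Fields {
		if name, ok := f["name"].(string); ok {
			existing[name] = true
		}
	}
	var missing []string
	for _, f := range n.Fields {
		name, _ := f["name"].(string)
		if _, hasDefault := f["default"]; !existing[name] && !hasDefault {
			missing = append(missing, name)
		}
	}
	return missing, nil
}
```

Running it on the previous and proposed .avsc contents before regenerating the Go structs catches the most common breaking change early.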
Keep Broker Details Outside the Domain
Event-driven code can become messy when business logic imports broker packages directly. A cleaner approach is to use ports and adapters.
In this structure:
- Domain and application code define what they need through interfaces.
- Pulsar code lives in adapters.
- Business handlers receive typed messages and do not know how the broker works.
- Tests can replace the broker adapter with simple fakes.
A practical layout can look like this:
```
transport-service/
  cmd/
    assignment-worker/
      main.go
  internal/
    application/
      route_handler.go
    streamports/
      messaging.go
      generated_events.go
    adapters/
      pulsarstream/
        producer.go
        consumer.go
```
The important rule is simple: the application layer should not depend on Pulsar, Kafka, NATS, or any other broker library. It should depend on small interfaces that express publishing and processing behavior.
Generic Messaging Ports in Go
Most event types need the same infrastructure: encode, publish, receive, decode, acknowledge, retry, and route failed messages. Go generics let you define that pattern once and reuse it for many event types.
Here is an adapted set of generic ports:
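```go
package streamports

import "context"

// Encoder converts an event value into wire bytes.
type Encoder[T any] func(T) ([]byte, error)

// Decoder converts wire bytes back into an event value.
type Decoder[T any] func([]byte) (T, error)

// Envelope carries a decoded event plus the broker metadata a handler may need.
type Envelope[T any] struct {
	ID         string
	Topic      string
	Attempts   int // delivery attempt, as reported by the adapter
	Attributes map[string]string
	Value      T
	Ack        func(context.Context) error
	Nack       func(context.Context) error
}

// Publisher sends events of type T to one topic.
type Publisher[T any] interface {
	Publish(ctx context.Context, event T) (messageID string, err error)
	Close() error
}

// Handler contains the business reaction to one event.
type Handler[T any] interface {
	Handle(ctx context.Context, event Envelope[T]) error
}

// Subscriber feeds received events to a Handler until ctx is cancelled.
type Subscriber[T any] interface {
	Run(ctx context.Context, handler Handler[T]) error
}
```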
The type parameter T is the event type. For route assignments, T might be RouteAllocationRecorded. For driver alerts, it might be DriverAlertRequested.
This design gives you three useful properties:
- Type safety: a route assignment consumer cannot accidentally receive a driver alert struct.
- Reuse: the broker plumbing is written once.
- Testability: handlers can be tested without starting a broker.
Example Event Types
The real structs can be generated from Avro, but the application should treat them like normal Go types.
```go
package streamports

type RouteAllocationRecorded struct {
	AllocationID string `avro:"allocationId"`
	VehicleID    string `avro:"vehicleId"`
	DriverID     string `avro:"driverId"`
	RouteID      string `avro:"routeId"`
	OccurredAt   string `avro:"occurredAt"`
	EventType    string `avro:"eventType"`
}

type DriverAlertRequested struct {
	RecipientID string `avro:"recipientId"`
	Channel     string `avro:"channel"`
	Body        string `avro:"body"`
	OccurredAt  string `avro:"occurredAt"`
	EventType   string `avro:"eventType"`
}
```
Notice that the alert event is not just a copy of the assignment event. It has a different purpose and a different contract. This keeps topics cleaner and reduces accidental coupling.
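Because these event types are plain structs, any pair of functions with the Encoder and Decoder shapes can serve them. As a sketch, here is a generic JSON codec. It is a stand-in for tests or prototypes, not a replacement for the Avro contract: JSON silently ignores unknown fields and has no schema registry behind it. jsonCodec is a hypothetical helper, and the struct is copied locally with json tags so the block compiles on its own.

```go
package main

import "encoding/json"

type Encoder[T any] func(T) ([]byte, error)
type Decoder[T any] func([]byte) (T, error)

// jsonCodec returns an Encoder/Decoder pair for any event type T.
func jsonCodec[T any]() (Encoder[T], Decoder[T]) {
	encode := func(v T) ([]byte, error) { return json.Marshal(v) }
	decode := func(b []byte) (T, error) {
		var v T
		err := json.Unmarshal(b, &v)
		return v, err
	}
	return encode, decode
}

// Local copy of the alert event, tagged for JSON instead of Avro.
type DriverAlertRequested struct {
	RecipientID string `json:"recipientId"`
	Channel     string `json:"channel"`
	Body        string `json:"body"`
	OccurredAt  string `json:"occurredAt"`
	EventType   string `json:"eventType"`
}
```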
Building a Reusable Pulsar Producer
A producer adapter should do broker work only: encode an event, send it to Pulsar, and return the broker message ID. It should not contain business decisions.
```go
package pulsarstream

import (
	"context"
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"
)

// ProducerConfig describes one typed producer.
type ProducerConfig[T any] struct {
	Client pulsar.Client
	Topic  string
	// Schema is registered with the broker, so an incompatible contract makes
	// producer creation fail at startup.
	Schema pulsar.Schema
	// Encode produces the payload bytes; they must match Schema.
	Encode func(T) ([]byte, error)
	// Key optionally derives an ordering key, such as the vehicle ID, so
	// related events reach the same partition.
	Key func(T) string
}

// Producer publishes events of type T to a single topic.
type Producer[T any] struct {
	topic    string
	producer pulsar.Producer
	encode   func(T) ([]byte, error)
	key      func(T) string
}

func NewProducer[T any](cfg ProducerConfig[T]) (*Producer[T], error) {
	if cfg.Client == nil {
		return nil, fmt.Errorf("pulsar client is required")
	}
	if cfg.Topic == "" {
		return nil, fmt.Errorf("topic is required")
	}
	if cfg.Encode == nil {
		return nil, fmt.Errorf("encoder is required")
	}

	opts := pulsar.ProducerOptions{Topic: cfg.Topic}
	if cfg.Schema != nil {
		opts.Schema = cfg.Schema
	}

	nativeProducer, err := cfg.Client.CreateProducer(opts)
	if err != nil {
		return nil, fmt.Errorf("create producer for %s: %w", cfg.Topic, err)
	}

	return &Producer[T]{
		topic:    cfg.Topic,
		producer: nativeProducer,
		encode:   cfg.Encode,
		key:      cfg.Key,
	}, nil
}

func (p *Producer[T]) Publish(ctx context.Context, event T) (string, error) {
	body, err := p.encode(event)
	if err != nil {
		return "", fmt.Errorf("encode event for %s: %w", p.topic, err)
	}

	// Payload is sent as-is, so it must already be encoded for the schema.
	msg := &pulsar.ProducerMessage{
		Payload: body,
		Properties: map[string]string{
			"topic": p.topic,
		},
	}
	if p.key != nil {
		msg.Key = p.key(event)
	}

	id, err := p.producer.Send(ctx, msg)
	if err != nil {
		return "", fmt.Errorf("send event to %s: %w", p.topic, err)
	}
	return fmt.Sprint(id), nil
}

// Close waits for pending sends to be persisted, then releases the producer.
func (p *Producer[T]) Close() error {
	p.producer.Close()
	return nil
}
```
A service can create one concrete publisher per event type:
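```go
// brokerURL, routeAllocationSchema (the .avsc file contents, for example
// loaded with go:embed), and encodeRouteAllocation are assumed to be
// defined elsewhere in the service.
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: brokerURL,
})
if err != nil {
	return fmt.Errorf("create pulsar client: %w", err)
}
defer client.Close()

eventSchema := pulsar.NewAvroSchema(string(routeAllocationSchema), nil)

publisher, err := pulsarstream.NewProducer(pulsarstream.ProducerConfig[streamports.RouteAllocationRecorded]{
	Client: client,
	Topic:  "persistent://public/default/route-allocation-events",
	Schema: eventSchema,
	Encode: encodeRouteAllocation,
})
if err != nil {
	return err
}
defer publisher.Close()
```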
In production, brokerURL and topic names should come from configuration, not from hard-coded values. If a service cannot connect to its broker at startup, failing fast is usually safer than running in a half-working state.
Building a Reusable Consumer
A consumer has the opposite job. It receives a raw message, decodes it, wraps it in an envelope, delegates to a handler, and acknowledges the result.
```go
package pulsarstream

import (
	"context"
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"

	"example.com/transport/internal/streamports"
)

// ConsumerConfig describes one typed subscription.
type ConsumerConfig[T any] struct {
	Client       pulsar.Client
	Topic        string
	Subscription string
	Schema       pulsar.Schema
	Decode       func([]byte) (T, error)
	// DLQ is optional. Without it, a message that keeps failing (including
	// one that cannot be decoded) is redelivered indefinitely.
	DLQ *pulsar.DLQPolicy
}

type Consumer[T any] struct {
	topic    string
	consumer pulsar.Consumer
	decode   func([]byte) (T, error)
}

func NewConsumer[T any](cfg ConsumerConfig[T]) (*Consumer[T], error) {
	if cfg.Client == nil {
		return nil, fmt.Errorf("pulsar client is required")
	}
	if cfg.Topic == "" || cfg.Subscription == "" {
		return nil, fmt.Errorf("topic and subscription are required")
	}
	if cfg.Decode == nil {
		return nil, fmt.Errorf("decoder is required")
	}

	// Shared lets several worker instances split the subscription's messages.
	opts := pulsar.ConsumerOptions{
		Topic:            cfg.Topic,
		SubscriptionName: cfg.Subscription,
		Type:             pulsar.Shared,
		DLQ:              cfg.DLQ,
	}
	if cfg.Schema != nil {
		opts.Schema = cfg.Schema
	}

	nativeConsumer, err := cfg.Client.Subscribe(opts)
	if err != nil {
		return nil, fmt.Errorf("subscribe to %s: %w", cfg.Topic, err)
	}

	return &Consumer[T]{
		topic:    cfg.Topic,
		consumer: nativeConsumer,
		decode:   cfg.Decode,
	}, nil
}

// Run receives, decodes, and dispatches messages until ctx is cancelled or
// the broker returns an error.
func (c *Consumer[T]) Run(ctx context.Context, handler streamports.Handler[T]) error {
	for {
		msg, err := c.consumer.Receive(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			return fmt.Errorf("receive from %s: %w", c.topic, err)
		}

		value, err := c.decode(msg.Payload())
		if err != nil {
			// A malformed payload fails on every delivery; the DLQ policy is
			// what eventually moves it aside.
			c.consumer.Nack(msg)
			continue
		}

		// RedeliveryCount is 0 on the first delivery, so Attempts starts at 1.
		env := streamports.Envelope[T]{
			ID:         fmt.Sprint(msg.ID()),
			Topic:      c.topic,
			Attempts:   int(msg.RedeliveryCount()) + 1,
			Attributes: msg.Properties(),
			Value:      value,
			Ack: func(context.Context) error {
				return c.consumer.Ack(msg)
			},
			Nack: func(context.Context) error {
				c.consumer.Nack(msg)
				return nil
			},
		}

		// Acknowledge only after the handler succeeds; on failure, nack so
		// the broker redelivers the message.
		if err := handler.Handle(ctx, env); err != nil {
			_ = env.Nack(ctx)
			continue
		}
		if err := env.Ack(ctx); err != nil {
			return fmt.Errorf("ack message %s: %w", env.ID, err)
		}
	}
}

// Close releases the underlying Pulsar consumer.
func (c *Consumer[T]) Close() {
	c.consumer.Close()
}
```
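This code uses a shared subscription so multiple worker instances can process messages from the same subscription. That is useful when traffic grows and you want horizontal scaling. A shared subscription does not preserve message order, so when related events must be handled in sequence, set Type to pulsar.KeyShared and publish them with a stable key.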
The handler stays small and business-focused:
```go
package application

import (
	"context"
	"fmt"

	"example.com/transport/internal/streamports"
)

type RouteAllocationHandler struct {
	Alerts streamports.Publisher[streamports.DriverAlertRequested]
}

func (h RouteAllocationHandler) Handle(
	ctx context.Context,
	event streamports.Envelope[streamports.RouteAllocationRecorded],
) error {
	alert := streamports.DriverAlertRequested{
		RecipientID: event.Value.DriverID,
		Channel:     "SMS",
		Body:        fmt.Sprintf("Vehicle %s was assigned to route %s", event.Value.VehicleID, event.Value.RouteID),
		OccurredAt:  event.Value.OccurredAt,
		EventType:   "DriverAlertRequested",
	}

	_, err := h.Alerts.Publish(ctx, alert)
	return err
}
```
The handler does not import Pulsar. It only depends on a publisher interface. That makes it easy to test and easy to move to another broker adapter later.
Handling Failed Messages with a DLQ
Failures in event-driven systems are normal. A downstream database may be unavailable. A message may be malformed. A handler may reject a message because the business state is invalid. Retrying forever is dangerous because one poison message can block useful work.
A DLQ gives you a safe place to move messages that cannot be processed after repeated attempts. Pulsar can be configured with a DLQ policy, and you can also implement manual DLQ routing when you need custom behavior.
The wrapper below signals ordinary failures by returning an error, so the consumer nacks the message and the broker redelivers it. Once the message reaches the attempt limit, the wrapper publishes the original event to a DLQ topic and returns nil, which lets the common consumer policy acknowledge the original message so it does not loop forever.
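```go
package application

import (
	"context"
	"fmt"

	"example.com/transport/internal/streamports"
)

// DLQHandler wraps another handler and moves an event to a dead-letter topic
// once it has failed maxAttempts times. It assumes the consumer adapter sets
// Envelope.Attempts to the delivery count (1 on the first delivery).
type DLQHandler[T any] struct {
	next        streamports.Handler[T]
	failedTopic streamports.Publisher[T]
	maxAttempts int
}

// NewDLQHandler builds the wrapper; an attempt limit below 1 is raised to 1.
func NewDLQHandler[T any](next streamports.Handler[T], dlq streamports.Publisher[T], maxAttempts int) DLQHandler[T] {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	return DLQHandler[T]{next: next, failedTopic: dlq, maxAttempts: maxAttempts}
}

func (h DLQHandler[T]) Handle(ctx context.Context, event streamports.Envelope[T]) error {
	err := h.next.Handle(ctx, event)
	if err == nil {
		return nil
	}
	// Below the limit: return the error so the message is redelivered.
	if event.Attempts < h.maxAttempts {
		return err
	}
	// At the limit: park the event. If that fails, keep the message in the
	// subscription rather than lose it.
	if _, publishErr := h.failedTopic.Publish(ctx, event.Value); publishErr != nil {
		return fmt.Errorf("route message %s to DLQ (handler error: %v): %w", event.ID, err, publishErr)
	}
	// Returning nil lets the consumer acknowledge the original message.
	return nil
}
```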
The exact redelivery count depends on what metadata your broker adapter exposes; Pulsar messages, for example, report a RedeliveryCount() that an adapter can map onto Attempts. Whatever the source, the policy stays the same: retry temporary failures, isolate repeated failures, and keep enough information to diagnose them later.
Tune by Design Goal
Messaging configuration is full of tradeoffs. Do not tune randomly. Decide which goal matters most for a specific topic or consumer, then adjust the client behavior to match that goal.
| Goal | Useful direction | Cost |
|---|---|---|
| Low latency | Send quickly, reduce batching, use smaller queues, shorten redelivery delay, fail fast when buffers are full | Lower throughput and more network calls |
| High throughput | Keep batching enabled, allow larger batches, increase pending messages, prefetch more messages, use shared or key-based parallel consumption | Higher memory use and higher tail latency |
| Resilience | Use retries, DLQ policy, failover or shared subscriptions, reconnect backoff, longer timeouts where needed | More waiting during failure and more buffered work |
| Resource efficiency | Reduce producer pending messages, reduce receiver queue size, keep fewer broker connections, limit client memory | Lower throughput and possibly slower recovery |
For Pulsar clients, examples of relevant settings include producer batching, producer send timeout, pending message limits, consumer receiver queue size, negative-ack redelivery delay, ack grouping, connection timeout, operation timeout, and maximum broker connections.
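One way to keep these decisions explicit is to name a profile per goal and review it like code. The sketch below assumes a hypothetical tuningProfile type whose field names mirror real options in pulsar-client-go (ProducerOptions.DisableBatching, BatchingMaxPublishDelay, and MaxPendingMessages; ConsumerOptions.ReceiverQueueSize and NackRedeliveryDelay). The numbers are placeholders that show direction, not recommended values, and wiring them into the client options is left out so the block runs without the library.

```go
package main

import "time"

// tuningProfile groups settings named after pulsar-client-go options.
type tuningProfile struct {
	DisableBatching         bool
	BatchingMaxPublishDelay time.Duration
	MaxPendingMessages      int
	ReceiverQueueSize       int
	NackRedeliveryDelay     time.Duration
}

var profiles = map[string]tuningProfile{
	// Driver alerts: send immediately, keep buffers small, retry quickly.
	"low-latency": {
		DisableBatching:     true,
		MaxPendingMessages:  100,
		ReceiverQueueSize:   10,
		NackRedeliveryDelay: 5 * time.Second,
	},
	// Analytics: let batches fill and prefetch more messages per consumer.
	"high-throughput": {
		BatchingMaxPublishDelay: 50 * time.Millisecond,
		MaxPendingMessages:      10000,
		ReceiverQueueSize:       2000,
		NackRedeliveryDelay:     time.Minute,
	},
}
```

Keeping profiles in one place makes it obvious which topic was tuned for which goal, and each change can be measured against that goal.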
A few practical rules help:
- For user-visible alerts, prefer lower latency. Avoid large buffers that make messages wait.
- For analytics pipelines, prefer throughput. Batching and prefetching usually matter more than single-message latency.
- For critical business events, prefer resilience. Do not drop messages just because the producer queue is temporarily full unless the application has a clear fallback.
- For small workers with limited memory, reduce queues and buffers before adding more features.
Measure after every tuning change. A setting that improves one topic may harm another.
Testing Event-Driven Go Code
Testing should happen at more than one level.
Unit tests are enough for pure handlers. A handler that receives a typed event and calls a publisher does not need a live broker. Use fake publishers and verify behavior.
```go
package application_test

import (
	"context"
	"testing"

	"example.com/transport/internal/application"
	"example.com/transport/internal/streamports"
)

type savedPublisher[T any] struct {
	events []T
}

func (p *savedPublisher[T]) Publish(ctx context.Context, event T) (string, error) {
	p.events = append(p.events, event)
	return "test-message-id", nil
}

func (p *savedPublisher[T]) Close() error { return nil }

func TestRouteAllocationHandlerPublishesDriverAlert(t *testing.T) {
	alerts := &savedPublisher[streamports.DriverAlertRequested]{}
	handler := application.RouteAllocationHandler{Alerts: alerts}

	input := streamports.Envelope[streamports.RouteAllocationRecorded]{
		ID:    "msg-1",
		Topic: "route-allocation-events",
		Value: streamports.RouteAllocationRecorded{
			AllocationID: "alloc-77",
			VehicleID:    "bus-12",
			DriverID:     "driver-44",
			RouteID:      "route-9",
			OccurredAt:   "2026-01-15T10:00:00Z",
			EventType:    "RouteAllocationRecorded",
		},
	}

	if err := handler.Handle(context.Background(), input); err != nil {
		t.Fatalf("handle event: %v", err)
	}
	if len(alerts.events) != 1 {
		t.Fatalf("expected one alert, got %d", len(alerts.events))
	}
	if alerts.events[0].RecipientID != "driver-44" {
		t.Fatalf("unexpected recipient: %s", alerts.events[0].RecipientID)
	}
}
```
Integration tests are different. Some behavior requires a real broker:
- Schema registration and compatibility rejection
- Real producer and consumer interaction
- Redelivery after negative acknowledgment
- DLQ routing
- Subscription behavior with multiple consumers
For those cases, run a real broker in a controlled test environment, for example with container-based integration tests. Performance is not the purpose of those tests. The goal is to prove that the service works with real infrastructure before deployment.
Observability: Know What Happens After Deployment
Events make systems more flexible, but they also make failures less obvious. A request may finish successfully while one consumer fails later. Observability closes that gap.
Track broker-level metrics and application-level metrics together.
Important broker metrics include:
- Topic throughput
- Subscription backlog
- Consumer lag
- Redelivery count
- DLQ message count
Important application metrics include:
- Processing duration per message
- Successful acknowledgments
- Negative acknowledgments
- Retry count
- DLQ routing count
- Handler error count by reason
Broker metrics explain what happens inside the messaging platform. Application metrics explain what your code did with the messages. You need both.
Logs should be structured and traceable. At minimum, log these fields around publish and consume operations:
```
message_id=...
topic=...
subscription=...
event_type=...
status=...
attempt=...
```
Avoid logging full payloads in production. Payload logs can expose sensitive data, increase storage cost, and make incidents harder to search. Log stable identifiers instead, then use controlled tools to inspect payloads only when necessary.
A strong operational pattern is to deploy producers and consumers as separate pods or processes when their scaling profiles differ. A dashboard consumer, an analytics consumer, and a driver alert consumer usually have different CPU, memory, and latency needs. Independent deployment makes scaling and ownership clearer.
Common Mistakes to Avoid
Publishing Commands Instead of Facts
An event should describe what happened. If the message is really an instruction to another service, you may be hiding a synchronous dependency behind the broker. Use commands only when that is the intentional messaging pattern.
Exposing Too Many Fields
Every field in a shared schema is a promise. If consumers do not need a field, leave it out. A small contract is easier to evolve than a large one.
Mixing Unrelated Event Types in One Topic
A notification topic should carry notification-shaped events. A route assignment topic should carry route assignment events. Mixing unrelated event shapes makes consumers harder to reason about and complicates compatibility.
Acknowledging Before the Work Is Safe
If a consumer acknowledges a message before completing durable work, a crash can lose the message. Acknowledge only after the work is complete or safely recorded.
Retrying Forever Without a DLQ
Retries are useful for temporary failures. They are harmful for permanent failures. Use a DLQ so poison messages do not block the whole stream.
Relying Only on Broker Metrics
Broker metrics can show lag and throughput, but they cannot explain your business failure. Instrument handlers with domain-aware metrics.
Tuning for Throughput on a Low-Latency Topic
Large batches and large receiver queues improve throughput, but they add waiting time. Use different settings for alerting paths and analytics paths.
Production Checklist
Use this checklist before shipping an event-driven Go service:
- Event names describe facts that already happened.
- Each topic has a clear purpose and a stable message contract.
- Schemas expose only fields that consumers truly need.
- Schema evolution rules are documented before deployment.
- Producers and consumers are created during startup and fail fast on invalid schema or broker connection errors.
- Domain code depends on ports, not broker packages.
- Producer and consumer adapters are reusable across event types.
- Consumers acknowledge only after successful processing.
- Failed messages have retry rules and a DLQ path.
- Unit tests cover handlers without a broker.
- Integration tests cover broker-specific behavior with a real broker.
- Metrics include lag, throughput, processing time, ack count, nack count, retry count, and DLQ count.
- Logs include message ID, topic, subscription, event type, and status.
- Payloads are not logged in production unless there is a controlled, temporary reason.
- Scaling decisions are made per consumer, not globally for the whole system.
Conclusion
Event-driven Go systems work best when events are treated as stable facts, not hidden procedure calls. The producer publishes a clear contract, the broker stores and routes messages, and each consumer reacts at its own pace.
Go fits this style well because small interfaces and generics let you keep broker integration reusable without polluting business code. Pulsar provides durable topics, subscriptions, schema enforcement, acknowledgments, and DLQ support. Avro gives teams a shared language that can evolve carefully.
The result is not only a faster system. It is a system that can change. New consumers can appear without rewriting the producer. Slow services can scale independently. Failed messages can be inspected instead of lost. Operators can see lag, retries, and processing time before users notice a problem.
Build the stream around clear contracts, measured tradeoffs, and observable behavior. That is what turns messaging from a technical add-on into a reliable backbone for enterprise services.