Go | Event-Driven Systems
June 13, 2026

Building Event-Driven Go Services with Pulsar, Avro, and Generic Consumers

When a service creates something important, other parts of the business usually need to react. A route assignment may need to notify a driver, update a schedule, refresh a dashboard, and feed analytics. A simple implementation might call every downstream service directly, but that design becomes fragile as the system grows.

Event-driven design changes the flow. Instead of calling every interested service, the producer publishes a fact: something happened. Consumers subscribe to the topics they care about and process events independently. The producer does not need to know how many consumers exist, where they run, or how long they take.

This article walks through a practical event-driven design in Go. The example uses a transport domain, Apache Pulsar as the broker, Avro as the message contract format, and Go generics to avoid repeating producer and consumer code for every event type.

The Problem: Direct Calls Create a Chain of Failure

Imagine a service responsible for creating route assignments. A route assignment links a vehicle to a route for a specific time window. When a new assignment is stored, several other responsibilities appear:

  • A driver may need an alert.
  • A timetable may need an update.
  • Capacity analytics may need a fresh calculation.
  • A monitoring dashboard may need a live refresh.
  • A reporting service may need the event for later analysis.

With direct synchronous calls, the assignment service must know every downstream API. The flow starts to look like this:

Client
  |
  v
Assignment API
  |
  +--> Driver Alert API
  |
  +--> Timetable API
  |
  +--> Capacity API
  |
  +--> Dashboard API
  |
  +--> Analytics API

This creates several problems:

  • The assignment service becomes coupled to other teams' APIs.
  • One slow dependency increases the total request time.
  • One unavailable dependency can make the whole operation fail.
  • Scaling must happen across the whole chain, not only where the load exists.
  • Adding a new consumer requires changing the producer.

Event-driven design replaces that chain with a brokered flow:

Client
  |
  v
Assignment API
  |
  v
Topic: route-allocation-events
  |
  +--> Driver Alert Consumer
  |
  +--> Timetable Consumer
  |
  +--> Capacity Consumer
  |
  +--> Dashboard Consumer
  |
  +--> Analytics Consumer

The assignment service publishes one event. Each consumer handles its own work. If analytics is temporarily offline, the driver alert flow can still continue. If dashboard traffic grows, the dashboard consumer can scale without changing the assignment service.

Core Messaging Concepts

Before writing Go code, it helps to define the moving parts.

A producer publishes messages. In the example, the assignment service is a producer because it publishes an event after creating a route assignment.

A consumer receives messages from a topic and performs work. The dashboard service, timetable service, and notification service are consumers.

A broker receives messages, stores them, and delivers them to consumers. In systems such as Pulsar and Kafka, the broker also manages topics and partitions.

A topic is a named stream of related messages. A topic should represent a clear category of events, such as route assignment events or driver alert events.

A partition is a subdivision of a topic. Partitions allow parallel reads and writes while keeping order inside each partition. If ordering matters for a specific route, vehicle, or assignment, choose a stable key so related messages go to the same ordered path.
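To make the keying idea concrete, here is a minimal sketch of how a stable key can select a partition. It assumes a simple FNV-1a hash and a hypothetical partitionFor helper; real broker clients, including Pulsar's, apply their own hashing and routing rules, so treat this as an illustration of the principle only.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of n partitions.
// FNV-1a is an illustrative choice; a real broker client applies its own hash.
func partitionFor(key string, partitions uint32) uint32 {
	if partitions == 0 {
		return 0 // treat "no partitions configured" as a single partition
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % partitions
}

func main() {
	// Both "route-9" events map to the same partition, so their order is kept.
	for _, key := range []string{"route-9", "route-12", "route-9"} {
		fmt.Printf("key=%s partition=%d\n", key, partitionFor(key, 4))
	}
}
```

Because the mapping depends only on the key and the partition count, every event for the same route follows the same ordered path, while different routes can spread across partitions.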

A subscription tells the broker how a consumer group reads from a topic. A shared subscription allows multiple consumer instances to process messages in parallel. A failover subscription keeps standby consumers ready when the active one fails.

A Dead Letter Queue, usually called a DLQ, stores messages that repeatedly fail processing. A DLQ prevents poison messages from blocking a topic forever.

Design Events as Published Contracts

An event is not a remote procedure call. It should not say, "please notify this driver now." It should describe a fact that already happened, such as a route assignment being recorded or an alert being requested.

A good event contract is small, stable, and useful to consumers. Avoid publishing your internal database shape. Every field you expose becomes something another team may depend on.

For a route assignment event, the shared contract might contain these fields:

| Field | Purpose |
| --- | --- |
| allocationId | Stable identifier for this assignment event |
| vehicleId | Vehicle involved in the assignment |
| driverId | Driver who should receive assignment-related alerts |
| routeId | Route involved in the assignment |
| occurredAt | Time when the event happened |
| eventType | Name used for routing, logs, and diagnostics |

A notification topic should not receive the full route assignment event unless the notification service truly needs every field. A cleaner design is to publish a separate driver alert event with only notification-related information. This keeps the notification topic focused and prevents the notification service from depending on assignment internals.

Avro as the Shared Language

When multiple services exchange messages, raw byte arrays are not enough. Consumers need to know which fields exist, what type each field has, and how compatibility works when the message evolves.

Avro provides a compact schema-based format for structured messages. In a Pulsar-based design, the broker can enforce schema compatibility through its built-in schema registry. When a producer or consumer registers an incompatible schema, creation of that producer or subscription should fail early instead of letting bad data move through production.

A safe schema evolution strategy is to add new fields with defaults. That allows newer consumers to read older messages because the missing field can be filled with a known value. Removing or renaming fields is much riskier because existing consumers may still expect them.
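The default-filling rule can be modeled in a few lines. The sketch below is a simplified stand-in for schema resolution, not a real Avro library: the field type, the resolve function, and the "priority" field are all hypothetical names chosen for illustration.

```go
package main

import "fmt"

// field is a simplified stand-in for a record field in the reader's schema.
type field struct {
	Name       string
	Default    any
	HasDefault bool
}

// resolve builds the record a reader sees: values present in the written
// record are kept, missing fields fall back to the reader's default, and a
// missing field without a default is an error.
func resolve(written map[string]any, reader []field) (map[string]any, error) {
	out := make(map[string]any, len(reader))
	for _, f := range reader {
		if v, ok := written[f.Name]; ok {
			out[f.Name] = v
			continue
		}
		if !f.HasDefault {
			return nil, fmt.Errorf("field %q is missing and has no default", f.Name)
		}
		out[f.Name] = f.Default
	}
	return out, nil
}

func main() {
	// "priority" is a hypothetical field added in a newer schema version.
	reader := []field{
		{Name: "allocationId"},
		{Name: "routeId"},
		{Name: "priority", Default: "normal", HasDefault: true},
	}
	oldMessage := map[string]any{"allocationId": "alloc-77", "routeId": "route-9"}

	record, err := resolve(oldMessage, reader)
	if err != nil {
		fmt.Println("resolve failed:", err)
		return
	}
	fmt.Println("priority:", record["priority"])
}
```

The error branch shows why adding a field without a default is unsafe: an older message simply has nothing to put there.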

You can generate Go types from Avro schema files so your application works with typed structs instead of unstructured data. A typical workflow looks like this:

go install github.com/hamba/avro/v2/cmd/avrogen@latest
~/go/bin/avrogen -pkg streamports -o internal/streamports/generated_events.go schemas/*.avsc

The generated structs become part of your application boundary. Regenerate them when schemas change, and run compatibility checks before deployment.

Keep Broker Details Outside the Domain

Event-driven code can become messy when business logic imports broker packages directly. A cleaner approach is to use ports and adapters.

In this structure:

  • Domain and application code define what they need through interfaces.
  • Pulsar code lives in adapters.
  • Business handlers receive typed messages and do not know how the broker works.
  • Tests can replace the broker adapter with simple fakes.

A practical layout can look like this:

transport-service/
  cmd/
    assignment-worker/
      main.go
  internal/
    application/
      route_handler.go
    streamports/
      messaging.go
      generated_events.go
    adapters/
      pulsarstream/
        producer.go
        consumer.go

The important rule is simple: the application layer should not depend on Pulsar, Kafka, NATS, or any other broker library. It should depend on small interfaces that express publishing and processing behavior.

Generic Messaging Ports in Go

Most event types need the same infrastructure: encode, publish, receive, decode, acknowledge, retry, and route failed messages. Go generics let you define that pattern once and reuse it for many event types.

Here is an adapted set of generic ports:

package streamports

import "context"

type Encoder[T any] func(T) ([]byte, error)
type Decoder[T any] func([]byte) (T, error)

type Envelope[T any] struct {
    ID         string
    Topic      string
    Attempts   int
    Attributes map[string]string
    Value      T

    Ack  func(context.Context) error
    Nack func(context.Context) error
}

type Publisher[T any] interface {
    Publish(ctx context.Context, event T) (messageID string, err error)
    Close() error
}

type Handler[T any] interface {
    Handle(ctx context.Context, event Envelope[T]) error
}

type Subscriber[T any] interface {
    Run(ctx context.Context, handler Handler[T]) error
}

The type parameter T is the event type. For route assignments, T might be RouteAllocationRecorded. For driver alerts, it might be DriverAlertRequested.

This design gives you three useful properties:

  • Type safety: a route assignment consumer cannot accidentally receive a driver alert struct.
  • Reuse: the broker plumbing is written once.
  • Testability: handlers can be tested without starting a broker.
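One small addition that often pairs well with a generic Handler interface is a function adapter, in the spirit of http.HandlerFunc. The sketch below is an assumption layered on top of the ports above, not part of them: HandlerFunc, dispatch, and handledRoute are hypothetical names, and the types are trimmed copies so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed copies of the ports so this sketch is self-contained.
type Envelope[T any] struct {
	ID    string
	Value T
}

type Handler[T any] interface {
	Handle(ctx context.Context, event Envelope[T]) error
}

// HandlerFunc adapts a plain function to Handler[T].
type HandlerFunc[T any] func(ctx context.Context, event Envelope[T]) error

func (f HandlerFunc[T]) Handle(ctx context.Context, event Envelope[T]) error {
	return f(ctx, event)
}

type RouteAllocationRecorded struct{ RouteID string }

// dispatch stands in for broker plumbing: it accepts any Handler[T].
func dispatch[T any](ctx context.Context, h Handler[T], value T) error {
	return h.Handle(ctx, Envelope[T]{ID: "msg-1", Value: value})
}

// handledRoute sends one event through a function-based handler and
// returns the route ID the handler observed.
func handledRoute() string {
	var seen string
	h := HandlerFunc[RouteAllocationRecorded](
		func(_ context.Context, e Envelope[RouteAllocationRecorded]) error {
			seen = e.Value.RouteID
			return nil
		})
	event := RouteAllocationRecorded{RouteID: "route-9"}
	if err := dispatch[RouteAllocationRecorded](context.Background(), h, event); err != nil {
		return ""
	}
	return seen
}

func main() {
	fmt.Println("handler saw:", handledRoute())
}
```

Passing a handler for a different event type to dispatch[RouteAllocationRecorded] would be rejected at compile time, which is the type-safety property described above.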

Example Event Types

The real structs can be generated from Avro, but the application should treat them like normal Go types.

package streamports

type RouteAllocationRecorded struct {
    AllocationID string `avro:"allocationId"`
    VehicleID    string `avro:"vehicleId"`
    DriverID     string `avro:"driverId"`
    RouteID      string `avro:"routeId"`
    OccurredAt   string `avro:"occurredAt"`
    EventType    string `avro:"eventType"`
}

type DriverAlertRequested struct {
    RecipientID string `avro:"recipientId"`
    Channel     string `avro:"channel"`
    Body        string `avro:"body"`
    OccurredAt  string `avro:"occurredAt"`
    EventType   string `avro:"eventType"`
}

Notice that the alert event is not just a copy of the assignment event. It has a different purpose and a different contract. This keeps topics cleaner and reduces accidental coupling.

Building a Reusable Pulsar Producer

A producer adapter should do broker work only: encode an event, send it to Pulsar, and return the broker message ID. It should not contain business decisions.

package pulsarstream

import (
    "context"
    "fmt"

    "github.com/apache/pulsar-client-go/pulsar"
)

type ProducerConfig[T any] struct {
    Client  pulsar.Client
    Topic   string
    Schema  pulsar.Schema
    Encode  func(T) ([]byte, error)
}

type Producer[T any] struct {
    topic    string
    producer pulsar.Producer
    encode   func(T) ([]byte, error)
}

func NewProducer[T any](cfg ProducerConfig[T]) (*Producer[T], error) {
    if cfg.Client == nil {
        return nil, fmt.Errorf("pulsar client is required")
    }
    if cfg.Topic == "" {
        return nil, fmt.Errorf("topic is required")
    }
    if cfg.Encode == nil {
        return nil, fmt.Errorf("encoder is required")
    }

    opts := pulsar.ProducerOptions{Topic: cfg.Topic}
    if cfg.Schema != nil {
        opts.Schema = cfg.Schema
    }

    nativeProducer, err := cfg.Client.CreateProducer(opts)
    if err != nil {
        return nil, fmt.Errorf("create producer for %s: %w", cfg.Topic, err)
    }

    return &Producer[T]{
        topic:    cfg.Topic,
        producer: nativeProducer,
        encode:   cfg.Encode,
    }, nil
}

func (p *Producer[T]) Publish(ctx context.Context, event T) (string, error) {
    body, err := p.encode(event)
    if err != nil {
        return "", fmt.Errorf("encode event for %s: %w", p.topic, err)
    }

    id, err := p.producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: body,
        Properties: map[string]string{
            "topic": p.topic,
        },
    })
    if err != nil {
        return "", fmt.Errorf("send event to %s: %w", p.topic, err)
    }

    return fmt.Sprint(id), nil
}

func (p *Producer[T]) Close() error {
    p.producer.Close()
    return nil
}

A service can create one concrete publisher per event type:

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: brokerURL,
})
if err != nil {
    return fmt.Errorf("connect to pulsar: %w", err)
}
defer client.Close()

eventSchema := pulsar.NewAvroSchema(string(routeAllocationSchema), nil)

publisher, err := NewProducer(ProducerConfig[streamports.RouteAllocationRecorded]{
    Client: client,
    Topic:  "persistent://public/default/route-allocation-events",
    Schema: eventSchema,
    Encode: encodeRouteAllocation,
})
if err != nil {
    return err
}
defer publisher.Close()

In production, brokerURL and topic names should come from configuration, not from hard-coded values. If a service cannot connect to its broker at startup, failing fast is usually safer than running in a half-working state.
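The wiring above refers to an encodeRouteAllocation function without showing it. As a rough sketch of what an encoder and decoder pair can look like, the example below uses JSON from the standard library as a stand-in codec; a real deployment following this article would encode with Avro instead. JSONCodec, roundTrip, and the trimmed event struct are hypothetical names for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Encoder[T any] func(T) ([]byte, error)
type Decoder[T any] func([]byte) (T, error)

// JSONCodec returns a matching encoder and decoder for T.
// JSON is a stand-in here; swap in an Avro codec for production use.
func JSONCodec[T any]() (Encoder[T], Decoder[T]) {
	encode := func(value T) ([]byte, error) {
		return json.Marshal(value)
	}
	decode := func(payload []byte) (T, error) {
		var value T
		err := json.Unmarshal(payload, &value)
		return value, err
	}
	return encode, decode
}

// Trimmed event type so the sketch is self-contained.
type RouteAllocationRecorded struct {
	AllocationID string `json:"allocationId"`
	RouteID      string `json:"routeId"`
}

// roundTrip encodes an event and decodes it again, as a producer and a
// consumer would do on either side of the broker.
func roundTrip(event RouteAllocationRecorded) (RouteAllocationRecorded, error) {
	encode, decode := JSONCodec[RouteAllocationRecorded]()
	payload, err := encode(event)
	if err != nil {
		return RouteAllocationRecorded{}, fmt.Errorf("encode: %w", err)
	}
	return decode(payload)
}

func main() {
	out, err := roundTrip(RouteAllocationRecorded{AllocationID: "alloc-77", RouteID: "route-9"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

A codec like this is also convenient in adapter tests, because the encoder and decoder are guaranteed to agree with each other.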

Building a Reusable Consumer

A consumer has the opposite job. It receives a raw message, decodes it, wraps it in an envelope, delegates to a handler, and acknowledges the result.

package pulsarstream

import (
    "context"
    "fmt"

    "github.com/apache/pulsar-client-go/pulsar"
    "example.com/transport/internal/streamports"
)

type ConsumerConfig[T any] struct {
    Client       pulsar.Client
    Topic        string
    Subscription string
    Schema       pulsar.Schema
    Decode       func([]byte) (T, error)
}

type Consumer[T any] struct {
    topic    string
    consumer pulsar.Consumer
    decode   func([]byte) (T, error)
}

func NewConsumer[T any](cfg ConsumerConfig[T]) (*Consumer[T], error) {
    if cfg.Client == nil {
        return nil, fmt.Errorf("pulsar client is required")
    }
    if cfg.Topic == "" || cfg.Subscription == "" {
        return nil, fmt.Errorf("topic and subscription are required")
    }
    if cfg.Decode == nil {
        return nil, fmt.Errorf("decoder is required")
    }

    opts := pulsar.ConsumerOptions{
        Topic:            cfg.Topic,
        SubscriptionName: cfg.Subscription,
        Type:             pulsar.Shared,
    }
    if cfg.Schema != nil {
        opts.Schema = cfg.Schema
    }

    nativeConsumer, err := cfg.Client.Subscribe(opts)
    if err != nil {
        return nil, fmt.Errorf("subscribe to %s: %w", cfg.Topic, err)
    }

    return &Consumer[T]{
        topic:    cfg.Topic,
        consumer: nativeConsumer,
        decode:   cfg.Decode,
    }, nil
}

func (c *Consumer[T]) Run(ctx context.Context, handler streamports.Handler[T]) error {
    for {
        msg, err := c.consumer.Receive(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return ctx.Err()
            }
            return fmt.Errorf("receive from %s: %w", c.topic, err)
        }

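        value, err := c.decode(msg.Payload())
        if err != nil {
            // A payload that cannot be decoded will fail again on every
            // redelivery. Pair this Nack with a DLQ policy so malformed
            // messages do not loop forever.
            c.consumer.Nack(msg)
            continue
        }

        // Attempts is the 1-based delivery attempt. The broker reports zero
        // redeliveries on first delivery, so add one.
        env := streamports.Envelope[T]{
            ID:         fmt.Sprint(msg.ID()),
            Topic:      c.topic,
            Attempts:   int(msg.RedeliveryCount()) + 1,
            Attributes: msg.Properties(),
            Value:      value,
            Ack: func(context.Context) error {
                return c.consumer.Ack(msg)
            },
            Nack: func(context.Context) error {
                c.consumer.Nack(msg)
                return nil
            },
        }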

        if err := handler.Handle(ctx, env); err != nil {
            _ = env.Nack(ctx)
            continue
        }

        if err := env.Ack(ctx); err != nil {
            return fmt.Errorf("ack message %s: %w", env.ID, err)
        }
    }
}

This code uses a shared subscription so multiple worker instances can process messages from the same subscription. That is useful when traffic grows and you want horizontal scaling.
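Because Run returns ctx.Err() when its context is cancelled, a worker can treat cancellation as a normal stop rather than a failure. The sketch below shows one way to wire that to operating system signals. It uses only the standard library; runLoop and fakeReceive are hypothetical stand-ins for the consumer's Run loop and the broker receive call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runLoop mirrors the shape of Consumer.Run: it keeps receiving until the
// context is cancelled, then returns ctx.Err().
func runLoop(ctx context.Context, receive func(context.Context) error) error {
	for {
		if err := receive(ctx); err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			return fmt.Errorf("receive: %w", err)
		}
	}
}

// fakeReceive stands in for a blocking broker receive call.
func fakeReceive(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

// isCleanShutdown reports whether the loop stopped because it was asked to.
func isCleanShutdown(err error) bool {
	return errors.Is(err, context.Canceled)
}

// stopsCleanly runs the loop with an already cancelled context.
func stopsCleanly() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return isCleanShutdown(runLoop(ctx, fakeReceive))
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: cancel after 50ms instead of waiting for a real signal.
	timer := time.AfterFunc(50*time.Millisecond, stop)
	defer timer.Stop()

	if err := runLoop(ctx, fakeReceive); !isCleanShutdown(err) {
		fmt.Println("worker failed:", err)
		os.Exit(1)
	}
	fmt.Println("worker stopped cleanly")
}
```

In a real worker the timer would be removed, and the consumer and client would be closed after the loop returns.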

The handler stays small and business-focused:

package application

import (
    "context"
    "fmt"

    "example.com/transport/internal/streamports"
)

type RouteAllocationHandler struct {
    Alerts streamports.Publisher[streamports.DriverAlertRequested]
}

func (h RouteAllocationHandler) Handle(
    ctx context.Context,
    event streamports.Envelope[streamports.RouteAllocationRecorded],
) error {
    alert := streamports.DriverAlertRequested{
        RecipientID: event.Value.DriverID,
        Channel:     "SMS",
        Body:        fmt.Sprintf("Vehicle %s was assigned to route %s", event.Value.VehicleID, event.Value.RouteID),
        OccurredAt:  event.Value.OccurredAt,
        EventType:   "DriverAlertRequested",
    }

    _, err := h.Alerts.Publish(ctx, alert)
    return err
}

The handler does not import Pulsar. It only depends on a publisher interface. That makes it easy to test and easy to move to another broker adapter later.

Handling Failed Messages with a DLQ

Failures in event-driven systems are normal. A downstream database may be unavailable. A message may be malformed. A handler may reject a message because the business state is invalid. Retrying forever is dangerous because one poison message can block useful work.

A DLQ gives you a safe place to move messages that cannot be processed after repeated attempts. Pulsar can be configured with a DLQ policy, and you can also implement manual DLQ routing when you need custom behavior.

The wrapper below retries normal failures by returning an error. Once the message reaches the limit, it publishes the original event to a DLQ topic and returns nil, allowing the common consumer policy to acknowledge the original message so it does not loop forever.

package application

import (
    "context"

    "example.com/transport/internal/streamports"
)

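type DLQHandler[T any] struct {
    next        streamports.Handler[T]
    failedTopic streamports.Publisher[T]
    maxAttempts int
}

// NewDLQHandler wraps next so that events still failing at maxAttempts are
// published to failedTopic. The fields are unexported, so other packages
// need this constructor to build the wrapper.
func NewDLQHandler[T any](
    next streamports.Handler[T],
    failedTopic streamports.Publisher[T],
    maxAttempts int,
) DLQHandler[T] {
    return DLQHandler[T]{next: next, failedTopic: failedTopic, maxAttempts: maxAttempts}
}

func (h DLQHandler[T]) Handle(ctx context.Context, event streamports.Envelope[T]) error {
    err := h.next.Handle(ctx, event)
    if err == nil {
        return nil
    }

    // Below the limit: return the error so the consumer nacks the message
    // and the broker redelivers it.
    if event.Attempts < h.maxAttempts {
        return err
    }

    // Limit reached: park the event on the DLQ. If that publish fails, return
    // the error so the original message is redelivered rather than lost.
    if _, publishErr := h.failedTopic.Publish(ctx, event.Value); publishErr != nil {
        return publishErr
    }

    return nil
}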

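The exact redelivery count depends on what metadata your broker adapter exposes. With the Pulsar Go client, a received message exposes RedeliveryCount(), which the adapter can map into Envelope.Attempts. The important policy is consistent: retry temporary failures, isolate repeated failures, and keep enough information to diagnose them later.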
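Not every failure deserves the full retry budget. A malformed payload will fail the same way every time, so it can be isolated immediately. The sketch below shows one possible decision rule, assuming a hypothetical ErrPermanent sentinel that handlers wrap around non-retryable errors and a 1-based attempt number; none of these names come from a library.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrPermanent marks failures that retrying cannot fix (hypothetical sentinel).
var ErrPermanent = errors.New("permanent failure")

type Action string

const (
	Retry      Action = "retry"
	DeadLetter Action = "dead-letter"
)

// decide picks what to do with a failed message. attempt is the 1-based
// delivery attempt; maxAttempts is the total number of tries allowed.
func decide(err error, attempt, maxAttempts int) Action {
	if errors.Is(err, ErrPermanent) || attempt >= maxAttempts {
		return DeadLetter
	}
	return Retry
}

// malformedPayload simulates a failure that will never succeed on retry.
func malformedPayload() error {
	return fmt.Errorf("decode payload: %w", ErrPermanent)
}

// databaseTimeout simulates a temporary failure worth retrying.
func databaseTimeout() error {
	return errors.New("database timeout")
}

func main() {
	fmt.Println(decide(databaseTimeout(), 1, 3))  // temporary, first try
	fmt.Println(decide(databaseTimeout(), 3, 3))  // temporary, budget used up
	fmt.Println(decide(malformedPayload(), 1, 3)) // permanent, no point retrying
}
```

A DLQ wrapper could call a rule like this instead of comparing attempt counts alone, so that permanent failures skip the redelivery delay entirely.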

Tune by Design Goal

Messaging configuration is full of tradeoffs. Do not tune randomly. Decide which goal matters most for a specific topic or consumer, then adjust the client behavior to match that goal.

| Goal | Useful direction | Cost |
| --- | --- | --- |
| Low latency | Send quickly, reduce batching, use smaller queues, shorten redelivery delay, fail fast when buffers are full | Lower throughput and more network calls |
| High throughput | Keep batching enabled, allow larger batches, increase pending messages, prefetch more messages, use shared or key-based parallel consumption | Higher memory use and higher tail latency |
| Resilience | Use retries, DLQ policy, failover or shared subscriptions, reconnect backoff, longer timeouts where needed | More waiting during failure and more buffered work |
| Resource efficiency | Reduce producer pending messages, reduce receiver queue size, keep fewer broker connections, limit client memory | Lower throughput and possibly slower recovery |

For Pulsar clients, examples of relevant settings include producer batching, producer send timeout, pending message limits, consumer receiver queue size, negative-ack redelivery delay, ack grouping, connection timeout, operation timeout, and maximum broker connections.

A few practical rules help:

  1. For user-visible alerts, prefer lower latency. Avoid large buffers that make messages wait.
  2. For analytics pipelines, prefer throughput. Batching and prefetching usually matter more than single-message latency.
  3. For critical business events, prefer resilience. Do not drop messages just because the producer queue is temporarily full unless the application has a clear fallback.
  4. For small workers with limited memory, reduce queues and buffers before adding more features.

Measure after every tuning change. A setting that improves one topic may harm another.

Testing Event-Driven Go Code

Testing should happen at more than one level.

Unit tests are enough for pure handlers. A handler that receives a typed event and calls a publisher does not need a live broker. Use fake publishers and verify behavior.

package application_test

import (
    "context"
    "testing"

    "example.com/transport/internal/application"
    "example.com/transport/internal/streamports"
)

type savedPublisher[T any] struct {
    events []T
}

func (p *savedPublisher[T]) Publish(ctx context.Context, event T) (string, error) {
    p.events = append(p.events, event)
    return "test-message-id", nil
}

func (p *savedPublisher[T]) Close() error { return nil }

func TestRouteAllocationHandlerPublishesDriverAlert(t *testing.T) {
    alerts := &savedPublisher[streamports.DriverAlertRequested]{}
    handler := application.RouteAllocationHandler{Alerts: alerts}

    input := streamports.Envelope[streamports.RouteAllocationRecorded]{
        ID:    "msg-1",
        Topic: "route-allocation-events",
        Value: streamports.RouteAllocationRecorded{
            AllocationID: "alloc-77",
            VehicleID:    "bus-12",
            DriverID:     "driver-44",
            RouteID:      "route-9",
            OccurredAt:   "2026-01-15T10:00:00Z",
            EventType:    "RouteAllocationRecorded",
        },
    }

    if err := handler.Handle(context.Background(), input); err != nil {
        t.Fatalf("handle event: %v", err)
    }

    if len(alerts.events) != 1 {
        t.Fatalf("expected one alert, got %d", len(alerts.events))
    }

    if alerts.events[0].RecipientID != "driver-44" {
        t.Fatalf("unexpected recipient: %s", alerts.events[0].RecipientID)
    }
}

Integration tests are different. Some behavior requires a real broker:

  • Schema registration and compatibility rejection
  • Real producer and consumer interaction
  • Redelivery after negative acknowledgment
  • DLQ routing
  • Subscription behavior with multiple consumers

For those cases, run a real broker in a controlled test environment, for example with container-based integration tests. Performance is not the purpose of those tests. The goal is to prove that the service works with real infrastructure before deployment.

Observability: Know What Happens After Deployment

Events make systems more flexible, but they also make failures less obvious. A request may finish successfully while one consumer fails later. Observability closes that gap.

Track broker-level metrics and application-level metrics together.

Important broker metrics include:

  • Topic throughput
  • Subscription backlog
  • Consumer lag
  • Redelivery count
  • DLQ message count

Important application metrics include:

  • Processing duration per message
  • Successful acknowledgments
  • Negative acknowledgments
  • Retry count
  • DLQ routing count
  • Handler error count by reason

Broker metrics explain what happens inside the messaging platform. Application metrics explain what your code did with the messages. You need both.
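Application metrics can be collected with the same wrapper technique used for handlers elsewhere in this article. The sketch below counts successes and failures and accumulates processing time with atomic counters from the standard library. Metrics, Instrumented, and demoCounts are hypothetical names, and a real service would more likely export these values through its metrics library of choice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// Trimmed copies of the ports so this sketch is self-contained.
type Envelope[T any] struct {
	ID    string
	Value T
}

type Handler[T any] interface {
	Handle(ctx context.Context, event Envelope[T]) error
}

type HandlerFunc[T any] func(ctx context.Context, event Envelope[T]) error

func (f HandlerFunc[T]) Handle(ctx context.Context, event Envelope[T]) error {
	return f(ctx, event)
}

// Metrics holds counters that are safe for concurrent use.
type Metrics struct {
	Succeeded  atomic.Int64
	Failed     atomic.Int64
	TotalNanos atomic.Int64
}

// Instrumented records the outcome and duration of every call to Next.
type Instrumented[T any] struct {
	Next    Handler[T]
	Metrics *Metrics
}

func (h Instrumented[T]) Handle(ctx context.Context, event Envelope[T]) error {
	start := time.Now()
	err := h.Next.Handle(ctx, event)
	h.Metrics.TotalNanos.Add(int64(time.Since(start)))
	if err != nil {
		h.Metrics.Failed.Add(1)
		return err
	}
	h.Metrics.Succeeded.Add(1)
	return nil
}

// demoCounts processes two valid payloads and one empty one.
func demoCounts() (succeeded, failed int64) {
	m := &Metrics{}
	next := HandlerFunc[string](func(_ context.Context, e Envelope[string]) error {
		if e.Value == "" {
			return errors.New("empty payload")
		}
		return nil
	})
	h := Instrumented[string]{Next: next, Metrics: m}
	for _, v := range []string{"a", "", "b"} {
		_ = h.Handle(context.Background(), Envelope[string]{ID: "msg", Value: v})
	}
	return m.Succeeded.Load(), m.Failed.Load()
}

func main() {
	ok, failed := demoCounts()
	fmt.Printf("succeeded=%d failed=%d\n", ok, failed)
}
```

Because the wrapper returns the handler's error unchanged, it does not alter acknowledgment behavior; it only observes it.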

Logs should be structured and traceable. At minimum, log these fields around publish and consume operations:

message_id=...
topic=...
subscription=...
event_type=...
status=...
attempt=...
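These fields map naturally onto structured logging. The sketch below uses log/slog from the standard library (Go 1.21 or later) with a text handler. The newLogger and consumeLogLine helpers and the sample values are assumptions for illustration, and the timestamp is dropped only to keep the demo output stable.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log/slog"
)

// newLogger builds a key=value logger that writes to w.
// The time attribute is removed for the demo; keep it in production.
func newLogger(w io.Writer) *slog.Logger {
	return slog.New(slog.NewTextHandler(w, &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // a zero Attr is discarded
			}
			return a
		},
	}))
}

// consumeLogLine logs one consume operation and returns the rendered line.
func consumeLogLine() string {
	var buf bytes.Buffer
	newLogger(&buf).Info("message consumed",
		"message_id", "msg-1",
		"topic", "route-allocation-events",
		"subscription", "driver-alerts",
		"event_type", "RouteAllocationRecorded",
		"status", "acked",
		"attempt", 1,
	)
	return buf.String()
}

func main() {
	fmt.Print(consumeLogLine())
}
```

Only identifiers and status values are logged here, never the payload itself.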

Avoid logging full payloads in production. Payload logs can expose sensitive data, increase storage cost, and make incidents harder to search. Log stable identifiers instead, then use controlled tools to inspect payloads only when necessary.

A strong operational pattern is to deploy producers and consumers as separate pods or processes when their scaling profiles differ. A dashboard consumer, an analytics consumer, and a driver alert consumer usually have different CPU, memory, and latency needs. Independent deployment makes scaling and ownership clearer.

Common Mistakes to Avoid

Publishing Commands Instead of Facts

An event should describe what happened. If the message is really an instruction to another service, you may be hiding a synchronous dependency behind the broker. Use commands only when that is the intentional messaging pattern.

Exposing Too Many Fields

Every field in a shared schema is a promise. If consumers do not need a field, leave it out. A small contract is easier to evolve than a large one.

Mixing Different Event Shapes in One Topic

A notification topic should carry notification-shaped events. A route assignment topic should carry route assignment events. Mixing unrelated event shapes makes consumers harder to reason about and complicates compatibility.

Acknowledging Before the Work Is Safe

If a consumer acknowledges a message before completing durable work, a crash can lose the message. Acknowledge only after the work is complete or safely recorded.

Retrying Forever Without a DLQ

Retries are useful for temporary failures. They are harmful for permanent failures. Use a DLQ so poison messages do not block the whole stream.

Relying Only on Broker Metrics

Broker metrics can show lag and throughput, but they cannot explain your business failure. Instrument handlers with domain-aware metrics.

Tuning for Throughput on a Low-Latency Topic

Large batches and large receiver queues improve throughput, but they add waiting time. Use different settings for alerting paths and analytics paths.

Production Checklist

Use this checklist before shipping an event-driven Go service:

  • Event names describe facts that already happened.
  • Each topic has a clear purpose and a stable message contract.
  • Schemas expose only fields that consumers truly need.
  • Schema evolution rules are documented before deployment.
  • Producers and consumers are created during startup and fail fast on invalid schema or broker connection errors.
  • Domain code depends on ports, not broker packages.
  • Producer and consumer adapters are reusable across event types.
  • Consumers acknowledge only after successful processing.
  • Failed messages have retry rules and a DLQ path.
  • Unit tests cover handlers without a broker.
  • Integration tests cover broker-specific behavior with a real broker.
  • Metrics include lag, throughput, processing time, ack count, nack count, retry count, and DLQ count.
  • Logs include message ID, topic, subscription, event type, and status.
  • Payloads are not logged in production unless there is a controlled, temporary reason.
  • Scaling decisions are made per consumer, not globally for the whole system.

Conclusion

Event-driven Go systems work best when events are treated as stable facts, not hidden procedure calls. The producer publishes a clear contract, the broker stores and routes messages, and each consumer reacts at its own pace.

Go fits this style well because small interfaces and generics let you keep broker integration reusable without polluting business code. Pulsar provides durable topics, subscriptions, schema enforcement, acknowledgments, and DLQ support. Avro gives teams a shared language that can evolve carefully.

The result is not only a faster system. It is a system that can change. New consumers can appear without rewriting the producer. Slow services can scale independently. Failed messages can be inspected instead of lost. Operators can see lag, retries, and processing time before users notice a problem.

Build the stream around clear contracts, measured tradeoffs, and observable behavior. That is what turns messaging from a technical add-on into a reliable backbone for enterprise services.
