Consuming events

Once the action from Your first Action has committed, there is a row in eventlog.events. Now consume it. There are two paths: the local-event-handler module (no broker required) and Debezium → Kafka (CDC). You can run both side by side against the same outbox.

The outbox is just a table. Anything that reads it can consume it. This lesson sets up both common paths.

                    eventlog.events  (delivered = false)

              ┌───────────┴────────────────┐
              ▼                            ▼
   Local event handler             Debezium CDC
   (EventFanoutJob +               (WAL → Kafka topic)
    EventHandlingJob —
    poll the DB, cluster-exclusive)
              │                            │
   handler.handle(envelope)        SMT routes per event_type
   on whichever JVM runs                   │
   the handling job                        ▼
              │                       Kafka consumers anywhere
              ▼                       (warehouse, billing, …)
   another action / projection

You don’t pick one. They can run simultaneously against the same outbox — the local handler’s delivered=true flip generates an UPDATE row that the Debezium SMTs drop (only INSERTs ship to Kafka), so neither sees the other’s noise.
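To make the "only INSERTs ship" rule concrete, here is a minimal sketch of an insert-only filter over a stream of change events. It is an illustration, not the framework's SMT code: ChangeEvent and InsertOnlyFilter are hypothetical names, and the only real-world detail assumed is that Debezium marks row creations with the op code "c" and updates with "u".

```java
import java.util.List;

final class InsertOnlyFilter {
    /** Keeps only row creations ("c" is Debezium's op code for an INSERT). */
    static List<ChangeEvent> insertsOnly(List<ChangeEvent> changes) {
        return changes.stream()
                .filter(change -> "c".equals(change.op()))
                .toList();
    }

    public static void main(String[] args) {
        var changes = List.of(
                new ChangeEvent("c", "evt-1", false), // action committed, row inserted
                new ChangeEvent("u", "evt-1", true),  // local path flipped delivered=true
                new ChangeEvent("c", "evt-2", false));
        // The update for evt-1 is dropped; only the two inserts are printed.
        insertsOnly(changes).forEach(change -> System.out.println(change.eventId()));
    }
}

/** Hypothetical, stripped-down change event; real Debezium records carry far more fields. */
record ChangeEvent(String op, String eventId, boolean delivered) {}
```

Because the filter keys on the operation type rather than on the delivered column, the Kafka side never has to know that the local handler path exists.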

Path A — Local event handler (start here)

The fastest path: no broker, no CDC, no Kafka cluster. The ekbatan-events:local-event-handler module ships two DistributedJobs — EventFanoutJob materializes per-handler delivery rows in event_notifications; EventHandlingJob polls those rows and calls your typed handler beans. Both jobs are cluster-exclusive (one cluster member runs each); the handler runs in whichever JVM is currently running EventHandlingJob, with retries, backoff, and dead-lettering handled automatically.
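The two-step flow described above (fan out first, then handle) can be sketched in memory. This is a simplified model under stated assumptions, not the module's implementation: the Event, Notification, and Registration records and both method names are hypothetical, and real delivery rows live in event_notifications rather than in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FanoutSketch {
    /** One outbox row, reduced to the fields this sketch needs. */
    record Event(String id, String type) {}

    /** One per-handler delivery row. */
    record Notification(String eventId, String handlerName) {}

    /** Hypothetical registration: a handler name plus the event type it subscribes to. */
    record Registration(String handlerName, String eventType, Consumer<Event> handler) {}

    /** Fan-out step: one delivery row per (event, subscribed handler) pair. */
    static List<Notification> fanOut(List<Event> events, List<Registration> registrations) {
        var rows = new ArrayList<Notification>();
        for (var event : events) {
            for (var reg : registrations) {
                if (reg.eventType().equals(event.type())) {
                    rows.add(new Notification(event.id(), reg.handlerName()));
                }
            }
        }
        return rows;
    }

    /** Handling step: resolve each row's event and handler, invoke it, count the calls. */
    static int handleAll(List<Notification> pending, List<Event> events, List<Registration> registrations) {
        int handled = 0;
        for (var row : pending) {
            var event = events.stream()
                    .filter(e -> e.id().equals(row.eventId())).findFirst().orElseThrow();
            var reg = registrations.stream()
                    .filter(r -> r.handlerName().equals(row.handlerName())).findFirst().orElseThrow();
            reg.handler().accept(event);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        var events = List.of(
                new Event("evt-1", "WalletMoneyDepositedEvent"),
                new Event("evt-2", "SomeOtherEvent")); // no handler subscribes to this one
        var registrations = List.of(
                new Registration("wallet-money-deposited-notifier", "WalletMoneyDepositedEvent",
                        e -> System.out.println("notify " + e.id())),
                new Registration("deposit-audit", "WalletMoneyDepositedEvent",
                        e -> System.out.println("audit " + e.id())));
        var pending = fanOut(events, registrations);
        System.out.println(pending.size() + " delivery rows");
        handleAll(pending, events, registrations);
    }
}
```

Splitting the work this way gives each handler its own delivery record, so one handler failing does not block or repeat delivery to another.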

A.1 Add the module

// build.gradle.kts
dependencies {
    // Pulled transitively by ekbatan-spring-boot-starter — explicit line not needed.
}

The Spring Boot starter already pulls ekbatan-local-event-handler transitively. Nothing to add.

If you are not using the starter, declare the dependency explicitly:

// build.gradle.kts
dependencies {
    implementation("io.github.zyraz-io:ekbatan-local-event-handler:0.1.0")
}

<!-- pom.xml -->
<dependency>
  <groupId>io.github.zyraz-io</groupId>
  <artifactId>ekbatan-local-event-handler</artifactId>
  <version>0.1.0</version>
</dependency>

In a setup that does not already include the job runtime, add ekbatan-distributed-jobs as well:

// build.gradle.kts
dependencies {
    implementation("io.github.zyraz-io:ekbatan-local-event-handler:0.1.0")
    implementation("io.github.zyraz-io:ekbatan-distributed-jobs:0.1.0")  // pulls the job runtime
}

A.2 Write the handler

package io.example.wallet.handler;

import io.ekbatan.di.EkbatanEventHandler;
import io.ekbatan.events.localeventhandler.EventEnvelope;
import io.ekbatan.events.localeventhandler.EventHandler;
import io.example.wallet.model.events.WalletMoneyDepositedEvent;

@EkbatanEventHandler
public class WalletMoneyDepositedNotifier implements EventHandler<WalletMoneyDepositedEvent> {

    @Override
    public String name() {
        // Cluster-stable identifier; stored in event_notifications.handler_name.
        // Renaming this would re-deliver all past events.
        return "wallet-money-deposited-notifier";
    }

    @Override
    public Class<WalletMoneyDepositedEvent> eventType() {
        return WalletMoneyDepositedEvent.class;
    }

    @Override
    public void handle(EventEnvelope<WalletMoneyDepositedEvent> envelope) {
        final var event = envelope.event;
        System.out.printf("Deposited %s to wallet %s — new balance %s%n",
                event.amount, event.modelId, event.newBalance);
        // For real apps: dispatch another action via injected ActionExecutor,
        // call an SMS API, write a projection, etc.
    }
}

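Three required methods: name() returns the cluster-stable identifier stored in event_notifications.handler_name, eventType() returns the event class this handler subscribes to, and handle() receives each matching event wrapped in an EventEnvelope.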

Per the listen-to-yourself pattern, the handler can dispatch its own actions via an injected ActionExecutor to record its work in the same outbox.

A.3 Run

Restart your app. The next deposit you POST will produce both a WalletMoneyDepositedEvent row and a console line from the handler within ~1s (the polling interval). Check eventlog.event_notifications to see the handler-specific delivery record:

SELECT event_id, handler_name, state, attempts, updated_date
FROM eventlog.event_notifications
WHERE handler_name = 'wallet-money-deposited-notifier'
ORDER BY created_date DESC;

state = 'SUCCEEDED' means the handler returned without throwing. Retries, backoff, expiry, and dead-lettering all happen automatically — see Local event handler.
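For intuition about what "retries, backoff, and dead-lettering" usually means, here is a generic sketch of an exponential backoff policy with an attempt budget. Every name and number in it (BASE_DELAY, MAX_DELAY, MAX_ATTEMPTS, both methods) is an assumption made for illustration; the module's actual policy and defaults are documented in Local event handler and may differ.

```java
import java.time.Duration;

final class RetryPolicySketch {
    // Illustrative values only, not the framework's defaults.
    static final Duration BASE_DELAY = Duration.ofSeconds(1);
    static final Duration MAX_DELAY = Duration.ofMinutes(5);
    static final int MAX_ATTEMPTS = 5;

    /** Exponential backoff: BASE_DELAY doubled per prior failure, capped at MAX_DELAY. */
    static Duration delayAfter(int failedAttempts) {
        // Clamp the exponent so the shift can neither go negative nor overflow.
        int exponent = Math.max(0, Math.min(failedAttempts - 1, 20));
        Duration delay = BASE_DELAY.multipliedBy(1L << exponent);
        return delay.compareTo(MAX_DELAY) > 0 ? MAX_DELAY : delay;
    }

    /** True once the attempt budget is spent and the delivery should be dead-lettered. */
    static boolean shouldDeadLetter(int failedAttempts) {
        return failedAttempts >= MAX_ATTEMPTS;
    }

    public static void main(String[] args) {
        for (int attempts = 1; attempts <= MAX_ATTEMPTS; attempts++) {
            System.out.println("after failure " + attempts
                    + ": wait " + delayAfter(attempts).toSeconds() + "s"
                    + ", dead-letter=" + shouldDeadLetter(attempts));
        }
    }
}
```

The attempts column in the query above is the kind of counter such a policy would consult when deciding whether to schedule another try.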

Path B — Debezium CDC → Kafka

When you need fan-out beyond your own application (other services, a data lake, an analytics warehouse), tail the outbox with Debezium. The framework ships two Kafka Connect SMTs that can turn real outbox event rows into Avro or Protobuf bytes before they reach Kafka. A downstream router can then fan the raw topic out to per-model or per-event topics.

B.1 Set up Debezium

Point a Debezium Postgres source connector at the database. Capture only the outbox table:

{
  "name": "wallet-outbox",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "wallet",
    "database.password": "wallet",
    "database.dbname": "wallet",
    "schema.include.list": "eventlog",
    "table.include.list": "eventlog.events",
    "plugin.name": "pgoutput",

    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

    "transforms": "encodeAvro",
    "transforms.encodeAvro.type": "io.ekbatan.events.streaming.debeziumsmt.avro.OutboxToAvroTransform",
    "transforms.encodeAvro.actionEventSchema": "/schemas/ActionEvent.avsc",
    "transforms.encodeAvro.payloadSchemas": "WalletMoneyDepositedEvent:/schemas/WalletMoneyDepositedEvent.avsc",

    "errors.retry.timeout": "600000",
    "errors.retry.delay.max.ms": "30000",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "false"
  }
}

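The OutboxToAvroTransform encodes each inserted outbox row as an ActionEvent using the schema at actionEventSchema, encodes the event payload with the schema mapped to its event type in payloadSchemas, and drops UPDATE rows such as the delivered=true flip.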

For Protobuf, swap OutboxToAvroTransform for OutboxToProtobufTransform and use descriptor-set paths instead of Avro schema paths. See Streaming (Debezium) for the full SMT options and the DLQ caveat for source connector failures.
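One way to submit the connector JSON is through Kafka Connect's REST API, which accepts new connectors via POST /connectors. The sketch below uses the JDK's built-in HTTP client; the Connect address (localhost:8083) and the file name wallet-outbox.json are assumptions about your environment, and RegisterConnector is a hypothetical helper, not part of the framework.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

final class RegisterConnector {
    /** Builds the POST /connectors request carrying the connector definition as JSON. */
    static HttpRequest registerRequest(URI connectBaseUrl, String connectorJson) {
        return HttpRequest.newBuilder(connectBaseUrl.resolve("/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed: Connect listens on localhost:8083 and the JSON above is saved locally.
        var json = Files.readString(Path.of("wallet-outbox.json"));
        var request = registerRequest(URI.create("http://localhost:8083"), json);
        var response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print the status and body so a rejected config is visible immediately.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Any HTTP client works equally well here; the point is only that the connector definition is submitted as a JSON body to the Connect worker.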

B.2 Consume from Kafka

Any Kafka consumer in any language can subscribe. Example with the Avro consumer in Java:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

var props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "warehouse");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

// try-with-resources closes the consumer if the loop exits with an exception.
try (var consumer = new KafkaConsumer<String, byte[]>(props)) {
    consumer.subscribe(List.of("ekbatan.com.example.finance"));
    while (true) {
        for (var record : consumer.poll(Duration.ofSeconds(1))) {
            var bytes = record.value();
            // Decode bytes as ActionEvent, then decode ActionEvent.payload
            // with the schema matching ActionEvent.eventType.
        }
    }
}

The Avro and Protobuf integration tests include small consumers and routers that show the full decode path. They are test scaffolding, not a production consumer framework.
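The second decode step, choosing a payload decoder by event type, can be sketched without any Avro dependency. Everything below is hypothetical scaffolding: the ActionEvent record stands in for the decoded wrapper (only the eventType and payload fields named above), Deposited is a made-up payload type, and the decoder simply reads the bytes as text where a real consumer would use an Avro or Protobuf reader for the matching schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class PayloadDispatchSketch {
    /** Stand-in for the decoded wrapper: just the two fields the decode path relies on. */
    record ActionEvent(String eventType, byte[] payload) {}

    /** Hypothetical decoded payload; a real one would mirror the event's schema. */
    record Deposited(String raw) {}

    /** Event type name to payload decoder; each entry would wrap a schema-specific reader. */
    static final Map<String, Function<byte[], Object>> DECODERS = Map.of(
            "WalletMoneyDepositedEvent",
            bytes -> new Deposited(new String(bytes, StandardCharsets.UTF_8)));

    /** Picks the decoder by eventType; empty when this consumer does not know the type. */
    static Optional<Object> decodePayload(ActionEvent event) {
        return Optional.ofNullable(DECODERS.get(event.eventType()))
                .map(decoder -> decoder.apply(event.payload()));
    }

    public static void main(String[] args) {
        var known = new ActionEvent("WalletMoneyDepositedEvent",
                "amount=10".getBytes(StandardCharsets.UTF_8));
        var unknown = new ActionEvent("SomethingElse", new byte[0]);
        System.out.println(decodePayload(known).isPresent());   // true
        System.out.println(decodePayload(unknown).isPresent()); // false
    }
}
```

Returning an empty result for unknown types lets a consumer skip events it has no schema for instead of failing the whole poll loop; whether skipping is acceptable depends on the consumer.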

What just happened

If you start with Path A and later add Path B, no application code changes. The Debezium connector starts tailing the same table from now on; backfill earlier events by configuring snapshot.mode.

Next

Adding a shard — take the single-database wallet to two shards using EmbeddedBitsShardingStrategy. Your action signatures don’t change.

See also