§ reference

Distributed background jobs

For periodic background work that should run on **at most one** instance across a cluster — daily reports, hourly cleanups, periodic reconciliation — Ekbatan ships `JobRegistry` in the `ekbatan-distributed-jobs` module. It's a thin, opinionated facade over [db-scheduler](https://github.com/kagkarlsson/db-scheduler) that handles the tricky parts (atomic claim across instances, heartbeat-based crash recovery, graceful shutdown, per-task virtual-thread workers) while keeping the user-facing API tiny.

Defining a job

Extend DistributedJob:

@EkbatanDistributedJob
public class DailyReportJob extends DistributedJob {

    private final ReportService reportService;

    public DailyReportJob(ReportService reportService) {
        this.reportService = reportService;
    }

    @Override public String name() {
        return "daily-report";        // cluster-wide unique
    }

    @Override public Schedule schedule() {
        return Schedules.daily(LocalTime.of(2, 0));
    }

    @Override public void execute(ExecutionContext ctx) {
        reportService.generateAndSend();
    }
}

Schedule is db-scheduler’s interface, so any of its implementations work directly: FixedDelay, FixedRate, Cron, Daily, etc.

The @EkbatanDistributedJob annotation marks the class for discovery by the DI integrations (Spring Boot, Quarkus, Micronaut). Without DI, register the job manually — see Wiring without DI below.

Coordination semantics (inherited from db-scheduler)

This is at-most-one per scheduled slot, not at-most-one ever. A daily-at-02:00 job runs once at 02:00 across the cluster; if 02:00 passes while every node is down, the missed slot is picked up by the next live node when it polls.

The scheduled_tasks table

The module needs db-scheduler’s table provisioned in your application’s database. Verbatim PostgreSQL schema:

CREATE TABLE scheduled_tasks (
    task_name            TEXT     NOT NULL,
    task_instance        TEXT     NOT NULL,
    task_data            BYTEA,
    execution_time       TIMESTAMP WITH TIME ZONE NOT NULL,
    picked               BOOLEAN  NOT NULL,
    picked_by            TEXT,
    last_success         TIMESTAMP WITH TIME ZONE,
    last_failure         TIMESTAMP WITH TIME ZONE,
    consecutive_failures INT,
    last_heartbeat       TIMESTAMP WITH TIME ZONE,
    version              BIGINT   NOT NULL,
    priority             SMALLINT,
    PRIMARY KEY (task_name, task_instance)
);

CREATE INDEX execution_time_idx        ON scheduled_tasks (execution_time);
CREATE INDEX last_heartbeat_idx        ON scheduled_tasks (last_heartbeat);
CREATE INDEX priority_execution_time_idx ON scheduled_tasks (priority DESC, execution_time ASC);

For the MySQL/MariaDB equivalents, see db-scheduler’s postgresql_tables.sql and mysql_tables.sql. The TIMESTAMP WITH TIME ZONE column is db-scheduler’s choice (this is the one place the framework deliberately steps off the always-TIMESTAMP rule — db-scheduler owns the table and its schema).

A reference migration lives in ekbatan-integration-tests/distributed-jobs-pg/src/test/resources/db/migration/V0001__create_scheduled_tasks.sql.

Wiring without DI

JobRegistry is a builder facade over a single db-scheduler Scheduler:

import static io.ekbatan.distributedjobs.JobRegistry.jobRegistry;
import static io.ekbatan.core.persistence.ConnectionProvider.hikariConnectionProvider;

var jobsPool = hikariConnectionProvider(jobsDataSourceConfig);

var registry = jobRegistry()
        .connectionProvider(jobsPool)
        .withJob(new DailyReportJob(reportService))
        .withJob(new HourlyCleanupJob(cleanupService))
        .pollInterval(Duration.ofSeconds(10))
        .heartbeatInterval(Duration.ofSeconds(30))
        .shutdownMaxWait(Duration.ofSeconds(30))
        .build();   // a JVM shutdown hook is installed by default

registry.start();

For advanced db-scheduler settings not exposed by the builder (missedHeartbeatsLimit, deleteUnresolvedAfter, custom polling strategy, etc.), customizeScheduler(...) runs last in build() and can override any of Ekbatan’s defaults:

var registry = jobRegistry()
        .connectionProvider(jobsPool)
        .withJob(new DailyReportJob(reportService))
        .customizeScheduler(b -> b
                .missedHeartbeatsLimit(3)
                .deleteUnresolvedAfter(Duration.ofDays(30)))
        .build();

JobRegistry auto-sizes threads(jobs.size()) for the polling batch and swaps in Executors.newVirtualThreadPerTaskExecutor() for workers, so per-job concurrency is governed by virtual-thread scheduling rather than a fixed thread pool.

Wiring via DI

With the @EkbatanDistributedJob annotation in place, the DI integration registers each DistributedJob bean as a managed singleton and adds it to a JobRegistry configured from application.yml:

ekbatan:
  jobs:
    polling-interval: 10s
    heartbeat-interval: 30s
    shutdown-max-wait: 30s

  sharding:
    groups:
      - members:
          - configs:
              primary-config: {  }
              jobs-config:                 # dedicated pool for the scheduler — see next section
                jdbc-url: jdbc:postgresql://primary:5432/app
                username: app
                password: ${APP_PASSWORD}
                maximum-pool-size: 5

JobRegistry.start() is wired to your DI container’s lifecycle (Spring initMethod/destroyMethod, Quarkus @Observes StartupEvent/ShutdownEvent, Micronaut ApplicationEventListener<StartupEvent>).

The ekbatan.jobs.* properties also accept camelCase aliases: polling-interval / pollingInterval, heartbeat-interval / heartbeatInterval, and shutdown-max-wait / shutdownMaxWait.

Use a dedicated connection pool

Use a dedicated ConnectionProvider for JobRegistry — separate from your primary application pool. db-scheduler polls continuously, so you don’t want it competing with normal queries for connections. A small pool is enough (polling + heartbeats are low-volume).

The DI integrations expect this pool under the user-defined jobs-config / jobsConfig slot of the default shard’s first member, as shown above. Both spellings are accepted in external config. Manual wiring must use the canonical Java key: member.configFor("jobsConfig"), not member.configFor("jobs-config").

See also