Set up Kafka Connect cluster with high availability and load balancing

Advanced · 45 min · Updated Apr 24, 2026
Applies to: Ubuntu 24.04, Debian 12, AlmaLinux 9, Rocky Linux 9

Configure a production-ready Kafka Connect cluster with multiple worker nodes, HAProxy load balancing, and Prometheus monitoring. Includes distributed configuration, shared storage setup, and comprehensive health checks for reliable data pipeline processing.

Prerequisites

  • Existing Apache Kafka cluster with at least 3 brokers
  • Minimum 3 servers for Kafka Connect workers (4GB RAM each)
  • 1 server for HAProxy load balancer
  • 1 server for Prometheus monitoring
  • Network connectivity between all servers

What this solves

Kafka Connect provides a scalable framework for streaming data between Apache Kafka and external systems. A single Kafka Connect worker can become a bottleneck and single point of failure. This tutorial shows you how to set up a distributed Kafka Connect cluster with multiple worker nodes, HAProxy load balancing for high availability, and Prometheus monitoring for observability.

Step-by-step configuration

Update system packages

Start by updating system packages on all cluster nodes so you get current security patches and the required Java runtime. Use apt on Debian-based systems and dnf on RHEL-based systems.

# Debian / Ubuntu
sudo apt update && sudo apt upgrade -y
sudo apt install -y openjdk-11-jdk wget curl

# AlmaLinux / Rocky Linux
sudo dnf update -y
sudo dnf install -y java-11-openjdk wget curl

Download and install Kafka Connect

Download the Kafka distribution, which includes Kafka Connect. Older releases such as 2.8.2 are served from the Apache archive rather than the main download mirror. We'll install this on all three worker nodes.

cd /opt
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R $USER:$USER /opt/kafka

Create Kafka Connect user and directories

Create a dedicated user for running Kafka Connect services and set up the required directory structure.

sudo useradd -r -s /bin/false kafka-connect
sudo mkdir -p /var/lib/kafka-connect/{config,plugins,logs}
sudo mkdir -p /etc/kafka-connect
sudo chown -R kafka-connect:kafka-connect /var/lib/kafka-connect
sudo chown -R kafka-connect:kafka-connect /etc/kafka-connect

Configure distributed Kafka Connect on node 1

Create the distributed worker configuration for the first node in /etc/kafka-connect/connect-distributed-node1.properties. This configuration enables cluster coordination through Kafka topics.

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

Cluster coordination topics

group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

REST API configuration

rest.host.name=203.0.113.10
rest.port=8083
rest.advertised.host.name=203.0.113.10
rest.advertised.port=8083

Converter configuration

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Plugin path

plugin.path=/var/lib/kafka-connect/plugins

Worker configuration

task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

Security

security.protocol=PLAINTEXT

Note that Kafka Connect does not read log4j settings from this properties file; point the JVM at /etc/kafka-connect/connect-log4j.properties through the KAFKA_LOG4J_OPTS environment variable instead (for example, in the systemd unit: Environment=KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka-connect/connect-log4j.properties").
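Nodes 2 and 3 (written out in full below) differ from node 1 only in the REST IP, so their files can also be generated with sed rather than typed by hand. This is a dry-run sketch that builds a stand-in node 1 file in a temp directory; in production, point CONF_DIR at /etc/kafka-connect.

```shell
#!/bin/bash
# Dry-run sketch: nodes 2 and 3 differ from node 1 only in the REST IP,
# so their worker files can be derived with sed. This demo works in a temp
# directory; in production, point CONF_DIR at /etc/kafka-connect instead.
set -e
CONF_DIR="$(mktemp -d)"

# Stand-in for the node 1 file written above (only the lines that differ).
cat > "$CONF_DIR/connect-distributed-node1.properties" <<'EOF'
rest.host.name=203.0.113.10
rest.advertised.host.name=203.0.113.10
EOF

for n in 2 3; do
    ip="203.0.113.$((n + 9))"   # node 2 -> .11, node 3 -> .12
    sed "s/203\.0\.113\.10/$ip/g" \
        "$CONF_DIR/connect-distributed-node1.properties" \
        > "$CONF_DIR/connect-distributed-node$n.properties"
done
```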

Configure distributed Kafka Connect on node 2

Create the configuration for the second worker node in /etc/kafka-connect/connect-distributed-node2.properties, with a different REST API binding.

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

Cluster coordination topics (same as node1)

group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

REST API configuration for node2

rest.host.name=203.0.113.11
rest.port=8083
rest.advertised.host.name=203.0.113.11
rest.advertised.port=8083

Converter configuration

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Plugin path

plugin.path=/var/lib/kafka-connect/plugins

Worker configuration

task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

Security

security.protocol=PLAINTEXT

(Log4j settings come from the KAFKA_LOG4J_OPTS environment variable, not from this file.)

Configure distributed Kafka Connect on node 3

Create the configuration for the third worker node in /etc/kafka-connect/connect-distributed-node3.properties to complete the cluster setup.

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

Cluster coordination topics (same as other nodes)

group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

REST API configuration for node3

rest.host.name=203.0.113.12
rest.port=8083
rest.advertised.host.name=203.0.113.12
rest.advertised.port=8083

Converter configuration

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Plugin path

plugin.path=/var/lib/kafka-connect/plugins

Worker configuration

task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

Security

security.protocol=PLAINTEXT

(Log4j settings come from the KAFKA_LOG4J_OPTS environment variable, not from this file.)

Configure logging for all nodes

Set up consistent logging across all Kafka Connect worker nodes in /etc/kafka-connect/connect-log4j.properties.

log4j.rootLogger=INFO, stdout, connectAppender

Console appender

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

File appender

log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=/var/lib/kafka-connect/logs/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

Reduce verbosity for some packages

log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=WARN
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=WARN
log4j.logger.org.reflections.Reflections=ERROR

Create systemd service files

Create systemd service files for each Kafka Connect worker node to enable automatic startup and management. On node 1, create /etc/systemd/system/kafka-connect-node1.service:

[Unit]
Description=Kafka Connect Distributed Worker Node 1
Documentation=https://kafka.apache.org/documentation/#connect
Requires=network.target
After=network.target

[Service]
Type=simple
User=kafka-connect
Group=kafka-connect
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed-node1.properties
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=10

Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
Environment=KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka-connect/connect-log4j.properties"

[Install]
WantedBy=multi-user.target

Create similar service files for nodes 2 and 3, replacing the node number and configuration file path.
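The node 2 and node 3 units can be derived from node 1's with a couple of sed substitutions. A dry-run sketch on a stand-in unit file (in production, write to /etc/systemd/system and run sudo systemctl daemon-reload afterwards):

```shell
#!/bin/bash
# Dry-run sketch: derive the node 2 and node 3 units from node 1's by
# swapping the node number. This demo uses a temp directory; in production,
# write to /etc/systemd/system and run `sudo systemctl daemon-reload` after.
set -e
UNIT_DIR="$(mktemp -d)"

# Stand-in for the node 1 unit shown above (only the lines that differ).
cat > "$UNIT_DIR/kafka-connect-node1.service" <<'EOF'
Description=Kafka Connect Distributed Worker Node 1
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed-node1.properties
EOF

for n in 2 3; do
    sed "s/node1/node$n/g; s/Node 1/Node $n/g" \
        "$UNIT_DIR/kafka-connect-node1.service" \
        > "$UNIT_DIR/kafka-connect-node$n.service"
done
```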

Install and configure HAProxy for load balancing

Install HAProxy on a separate server to provide load balancing and high availability for the Kafka Connect cluster.

# Debian / Ubuntu
sudo apt update
sudo apt install -y haproxy

# AlmaLinux / Rocky Linux
sudo dnf install -y haproxy

Configure HAProxy for Kafka Connect load balancing

Configure HAProxy (in /etc/haproxy/haproxy.cfg) to distribute REST API requests across all Kafka Connect worker nodes with health checks.

global
    daemon
    user haproxy
    group haproxy
    log stdout local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    stats timeout 30s

defaults
    mode http
    log global
    option httplog
    option dontlognull
    timeout connect 5000
    timeout client 50000
    timeout server 50000
    errorfile 400 /etc/haproxy/errors/400.http
    errorfile 403 /etc/haproxy/errors/403.http
    errorfile 408 /etc/haproxy/errors/408.http
    errorfile 500 /etc/haproxy/errors/500.http
    errorfile 502 /etc/haproxy/errors/502.http
    errorfile 503 /etc/haproxy/errors/503.http
    errorfile 504 /etc/haproxy/errors/504.http

Stats page

listen stats
    bind *:8404
    stats enable
    stats uri /stats
    stats refresh 30s
    stats admin if TRUE
    # Expose the built-in Prometheus exporter (HAProxy 2.0+) for scraping
    http-request use-service prometheus-exporter if { path /metrics }

Kafka Connect REST API load balancer

frontend kafka_connect_frontend
    bind *:8083
    default_backend kafka_connect_backend

backend kafka_connect_backend
    balance roundrobin
    option httpchk GET /
    http-check expect status 200
    server connect-node1 203.0.113.10:8083 check inter 5000 rise 2 fall 3
    server connect-node2 203.0.113.11:8083 check inter 5000 rise 2 fall 3
    server connect-node3 203.0.113.12:8083 check inter 5000 rise 2 fall 3

Create cluster coordination topics

Before starting the Kafka Connect workers, create the required coordination topics on your Kafka cluster.

# Create config storage topic
/opt/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic connect-configs \
  --partitions 1 \
  --replication-factor 3 \
  --config cleanup.policy=compact

Create offset storage topic

/opt/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic connect-offsets \
  --partitions 25 \
  --replication-factor 3 \
  --config cleanup.policy=compact

Create status storage topic

/opt/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic connect-status \
  --partitions 5 \
  --replication-factor 3 \
  --config cleanup.policy=compact
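Since all three commands follow one pattern, they can also be driven from a small table. This sketch only prints the three commands; pipe the output to bash (or copy-paste) to run them against a live cluster:

```shell
#!/bin/bash
# Sketch: the three coordination topics follow one pattern, so the commands
# can be generated from a topic:partitions table. This prints each command
# rather than running it; pipe the output to bash to execute for real.
KAFKA_TOPICS=/opt/kafka/bin/kafka-topics.sh
BOOTSTRAP=kafka1:9092

connect_topic_cmds() {
    # topic:partitions pairs; the replication factor is 3 for all of them
    local spec
    for spec in connect-configs:1 connect-offsets:25 connect-status:5; do
        echo "$KAFKA_TOPICS --create --bootstrap-server $BOOTSTRAP" \
             "--topic ${spec%%:*} --partitions ${spec##*:}" \
             "--replication-factor 3 --config cleanup.policy=compact"
    done
}

connect_topic_cmds
```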

Start Kafka Connect cluster

Start all Kafka Connect worker nodes and HAProxy to begin cluster operation.

# Start Kafka Connect workers on each node
sudo systemctl daemon-reload
sudo systemctl enable kafka-connect-node1
sudo systemctl start kafka-connect-node1

Check status

sudo systemctl status kafka-connect-node1

Start HAProxy

sudo systemctl enable haproxy
sudo systemctl start haproxy
sudo systemctl status haproxy

Install Prometheus for monitoring

Set up Prometheus monitoring to track Kafka Connect cluster health and performance metrics.

# Debian / Ubuntu
sudo apt install -y prometheus

# AlmaLinux / Rocky Linux
sudo dnf install -y prometheus

Configure JMX exporter for Kafka Connect metrics

Download and configure the JMX exporter to expose Kafka Connect metrics to Prometheus.

# Download JMX Prometheus exporter
cd /opt/kafka
sudo wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.17.2/jmx_prometheus_javaagent-0.17.2.jar

Then create the exporter rules at /etc/kafka-connect/jmx-exporter-config.yml, the path the systemd unit below passes to the javaagent:

rules:
  # Connect worker metrics
  - pattern: 'kafka.connect<type=connect-worker-metrics><>([^:]+)'
    name: kafka_connect_worker_$1
    type: GAUGE

  # Connect worker rebalance metrics
  - pattern: 'kafka.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
    name: kafka_connect_worker_rebalance_$1
    type: GAUGE

  # Connector metrics
  - pattern: 'kafka.connect<type=connector-metrics, connector=(.+)><>([^:]+)'
    name: kafka_connect_connector_$2
    labels:
      connector: $1
    type: GAUGE

  # Task metrics
  - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(\d+)><>([^:]+)'
    name: kafka_connect_task_$3
    labels:
      connector: $1
      task: $2
    type: GAUGE

Update systemd service with JMX monitoring

Modify the systemd service to include JMX exporter for Prometheus metrics collection.

[Unit]
Description=Kafka Connect Distributed Worker Node 1
Documentation=https://kafka.apache.org/documentation/#connect
Requires=network.target
After=network.target

[Service]
Type=simple
User=kafka-connect
Group=kafka-connect
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed-node1.properties
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=10

Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
Environment=KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka-connect/connect-log4j.properties"
Environment=KAFKA_OPTS="-javaagent:/opt/kafka/jmx_prometheus_javaagent-0.17.2.jar=9404:/etc/kafka-connect/jmx-exporter-config.yml"

[Install]
WantedBy=multi-user.target
Note: Update the JMX port for each node (9404, 9405, 9406) to avoid conflicts when running multiple workers on different servers.
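That per-node port bump is scriptable. A dry-run sketch on stand-in unit files (in production the files live under /etc/systemd/system):

```shell
#!/bin/bash
# Dry-run sketch: bump the JMX exporter port per node (9404 -> 9405/9406)
# in the KAFKA_OPTS line of each unit file. Demo uses a temp directory.
set -e
UNIT_DIR="$(mktemp -d)"

# Stand-ins for the node 2 and node 3 units (only the line that changes).
for n in 2 3; do
    cat > "$UNIT_DIR/kafka-connect-node$n.service" <<'EOF'
Environment=KAFKA_OPTS="-javaagent:/opt/kafka/jmx_prometheus_javaagent-0.17.2.jar=9404:/etc/kafka-connect/jmx-exporter-config.yml"
EOF
done

# Node N listens on 9403+N: node 2 -> 9405, node 3 -> 9406.
for n in 2 3; do
    sed -i "s/=9404:/=$((9403 + n)):/" "$UNIT_DIR/kafka-connect-node$n.service"
done
```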

Configure Prometheus scraping

Add scrape configuration for Kafka Connect JMX metrics and HAProxy stats to /etc/prometheus/prometheus.yml.

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # Kafka Connect JMX metrics
  - job_name: 'kafka-connect'
    static_configs:
      - targets:
        - '203.0.113.10:9404'
        - '203.0.113.11:9405'
        - '203.0.113.12:9406'
    metrics_path: /metrics
    scrape_interval: 30s

  # HAProxy built-in Prometheus exporter (HAProxy 2.0+, exposed on the stats port)
  - job_name: 'haproxy'
    static_configs:
      - targets: ['203.0.113.13:8404']
    metrics_path: /metrics
    scrape_interval: 30s

Create health check script

Create a comprehensive health check script at /usr/local/bin/kafka-connect-health-check.sh to monitor cluster status and individual worker health.

#!/bin/bash
# Kafka Connect cluster health check script
set -e

CONNECT_ENDPOINT="http://203.0.113.13:8083"
LOG_FILE="/var/log/kafka-connect-health.log"

log() {
    echo "$(date): $1" | tee -a "$LOG_FILE"
}

# Check cluster connectivity
check_cluster() {
    if curl -s -f "$CONNECT_ENDPOINT/" > /dev/null; then
        log "Cluster endpoint is healthy"
        return 0
    else
        log "ERROR: Cluster endpoint is not responding"
        return 1
    fi
}

# Check worker nodes
check_workers() {
    local workers
    workers=$(curl -s "$CONNECT_ENDPOINT/" | jq -r '.kafka_cluster_id // "unknown"')
    if [[ "$workers" != "unknown" ]]; then
        log "Workers are connected to Kafka cluster: $workers"
        return 0
    else
        log "ERROR: Workers not properly connected"
        return 1
    fi
}

# Check connector status
check_connectors() {
    local connectors
    connectors=$(curl -s "$CONNECT_ENDPOINT/connectors" | jq -r '. | length')
    log "Active connectors: $connectors"
    # List any failed connectors
    curl -s "$CONNECT_ENDPOINT/connectors" | jq -r '.[]' | while read -r connector; do
        status=$(curl -s "$CONNECT_ENDPOINT/connectors/$connector/status" | jq -r '.connector.state')
        if [[ "$status" == "FAILED" ]]; then
            log "ERROR: Connector $connector is in FAILED state"
        fi
    done
}

# Main health check
main() {
    log "Starting Kafka Connect health check"
    if check_cluster && check_workers; then
        check_connectors
        log "Health check completed successfully"
        exit 0
    else
        log "Health check failed"
        exit 1
    fi
}

main

Make the script executable:

sudo chmod +x /usr/local/bin/kafka-connect-health-check.sh
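The failed-connector branch of the script can be exercised offline against a canned status response. The JSON below is a hypothetical example of what GET /connectors/&lt;name&gt;/status returns; jq is assumed installed, as in the script above:

```shell
#!/bin/bash
# Sketch: the failed-connector check, run against a canned
# /connectors/<name>/status response instead of a live cluster.
# The JSON below is a hypothetical example payload.
set -e
status_json='{"name":"demo-sink","connector":{"state":"FAILED","worker_id":"203.0.113.10:8083"},"tasks":[{"id":0,"state":"FAILED"}]}'

state=$(echo "$status_json" | jq -r '.connector.state')
if [ "$state" = "FAILED" ]; then
    echo "ERROR: Connector demo-sink is in FAILED state"
fi
```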

Set up automated health monitoring

Create a systemd timer to run health checks periodically and alert on failures.

Create /etc/systemd/system/kafka-connect-health.service:

[Unit]
Description=Kafka Connect Health Check

[Service]
Type=oneshot
User=root
ExecStart=/usr/local/bin/kafka-connect-health-check.sh
StandardOutput=journal
StandardError=journal

Then create /etc/systemd/system/kafka-connect-health.timer:

[Unit]
Description=Run Kafka Connect Health Check every 5 minutes
Requires=kafka-connect-health.service

[Timer]
OnBootSec=5min
OnUnitActiveSec=5min
Unit=kafka-connect-health.service

[Install]
WantedBy=timers.target

Enable and start the timer:
sudo systemctl daemon-reload
sudo systemctl enable kafka-connect-health.timer
sudo systemctl start kafka-connect-health.timer

Verify your setup

Test the Kafka Connect cluster functionality and monitoring setup.

# Check cluster status through load balancer
curl -s http://203.0.113.13:8083/ | jq .

List available connector plugins

curl -s http://203.0.113.13:8083/connector-plugins | jq .

Check individual worker nodes

curl -s http://203.0.113.10:8083/ | jq -r '.kafka_cluster_id'
curl -s http://203.0.113.11:8083/ | jq -r '.kafka_cluster_id'
curl -s http://203.0.113.12:8083/ | jq -r '.kafka_cluster_id'

Verify HAProxy stats

curl -s http://203.0.113.13:8404/stats

Check Prometheus metrics

curl -s http://203.0.113.10:9404/metrics | grep kafka_connect

Verify health check timer

sudo systemctl status kafka-connect-health.timer
sudo journalctl -u kafka-connect-health.service -f
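As a final end-to-end test, register a connector through the load balancer. The connector name, file path, and topic below are illustrative; FileStreamSourceConnector ships with the Kafka distribution. The curl call is commented out so the snippet can be dry-run:

```shell
#!/bin/bash
# Sketch: register a connector via the HAProxy endpoint. The name, file,
# and topic are illustrative; FileStreamSourceConnector ships with Kafka.
set -e
payload='{
  "name": "demo-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/var/lib/kafka-connect/demo.txt",
    "topic": "demo-topic"
  }
}'

# Sanity-check the JSON locally before submitting it.
echo "$payload" | jq -e '.name and .config["connector.class"]' > /dev/null

# Submit through the load balancer (commented out for this dry run):
# curl -s -X POST -H "Content-Type: application/json" \
#      --data "$payload" http://203.0.113.13:8083/connectors | jq .
echo "payload OK: $(echo "$payload" | jq -r .name)"
```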

Common issues

Symptom                        | Cause                                      | Fix
Worker fails to start          | Coordination topics missing                | Create topics with /opt/kafka/bin/kafka-topics.sh --create
Workers can't join cluster     | Mismatched group.id                        | Verify all workers use the same group.id=connect-cluster
HAProxy shows backend down     | Worker REST API not responding             | Check worker logs with journalctl -u kafka-connect-node1
Connectors fail to distribute  | Insufficient coordination topic partitions | Increase offset storage partitions to 25+
High memory usage              | Default heap settings too high             | Adjust KAFKA_HEAP_OPTS in the systemd service
JMX metrics not exposed        | JMX exporter not configured                | Verify KAFKA_OPTS includes the javaagent path
