Configure a production-ready Kafka Connect cluster with multiple worker nodes, HAProxy load balancing, and Prometheus monitoring. Includes distributed configuration, shared storage setup, and comprehensive health checks for reliable data pipeline processing.
Prerequisites
- Existing Apache Kafka cluster with at least 3 brokers
- Minimum 3 servers for Kafka Connect workers (4GB RAM each)
- 1 server for HAProxy load balancer
- 1 server for Prometheus monitoring
- Network connectivity between all servers
What this solves
Kafka Connect provides a scalable framework for streaming data between Apache Kafka and external systems. A single Kafka Connect worker can become a bottleneck and single point of failure. This tutorial shows you how to set up a distributed Kafka Connect cluster with multiple worker nodes, HAProxy load balancing for high availability, and Prometheus monitoring for observability.
Step-by-step configuration
Update system packages
Start by updating system packages on all cluster nodes and installing Java 11 and basic tooling.
sudo apt update && sudo apt upgrade -y
sudo apt install -y openjdk-11-jdk wget curl
Download and install Kafka Connect
Download the Kafka distribution, which includes Kafka Connect (this tutorial uses 2.8.2). Install this on all three worker nodes.
cd /opt
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R $USER:$USER /opt/kafka
Create Kafka Connect user and directories
Create a dedicated user for running Kafka Connect services and set up the required directory structure.
sudo useradd -r -s /bin/false kafka-connect
sudo mkdir -p /var/lib/kafka-connect/{config,plugins,logs}
sudo mkdir -p /etc/kafka-connect
sudo chown -R kafka-connect:kafka-connect /var/lib/kafka-connect
sudo chown -R kafka-connect:kafka-connect /etc/kafka-connect
Configure distributed Kafka Connect on node 1
Create /etc/kafka-connect/connect-distributed-node1.properties for the first node. This configuration enables cluster coordination through Kafka topics.
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# Cluster coordination topics
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

# REST API configuration
rest.host.name=203.0.113.10
rest.port=8083
rest.advertised.host.name=203.0.113.10
rest.advertised.port=8083

# Converter configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Plugin path
plugin.path=/var/lib/kafka-connect/plugins

# Worker configuration
task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

# Security (log4j is a JVM option set via KAFKA_LOG4J_OPTS in the systemd unit, not a worker property)
security.protocol=PLAINTEXT
Configure distributed Kafka Connect on node 2
Create /etc/kafka-connect/connect-distributed-node2.properties for the second worker node; only the REST API binding changes.
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# Cluster coordination topics (same as node 1)
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

# REST API configuration for node 2
rest.host.name=203.0.113.11
rest.port=8083
rest.advertised.host.name=203.0.113.11
rest.advertised.port=8083

# Converter configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Plugin path
plugin.path=/var/lib/kafka-connect/plugins

# Worker configuration
task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

# Security (log4j is a JVM option set via KAFKA_LOG4J_OPTS in the systemd unit, not a worker property)
security.protocol=PLAINTEXT
Configure distributed Kafka Connect on node 3
Create /etc/kafka-connect/connect-distributed-node3.properties for the third worker node to complete the cluster.
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# Cluster coordination topics (same as the other nodes)
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

# REST API configuration for node 3
rest.host.name=203.0.113.12
rest.port=8083
rest.advertised.host.name=203.0.113.12
rest.advertised.port=8083

# Converter configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Plugin path
plugin.path=/var/lib/kafka-connect/plugins

# Worker configuration
task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000

# Security (log4j is a JVM option set via KAFKA_LOG4J_OPTS in the systemd unit, not a worker property)
security.protocol=PLAINTEXT
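Since the three worker files differ only in their REST addresses, you can render them from one loop instead of copying by hand. A sketch using the example IPs from this tutorial; CONF_DIR defaults to a scratch directory so you can inspect the output before copying the files into /etc/kafka-connect on each host:

```shell
#!/usr/bin/env bash
# Render one worker properties file per node; only the REST address differs.
set -euo pipefail
CONF_DIR="${CONF_DIR:-./connect-conf}"   # point at /etc/kafka-connect on the real hosts
mkdir -p "$CONF_DIR"
IPS=(203.0.113.10 203.0.113.11 203.0.113.12)
for i in "${!IPS[@]}"; do
  node=$((i + 1))
  cat > "${CONF_DIR}/connect-distributed-node${node}.properties" <<EOF
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
rest.host.name=${IPS[$i]}
rest.port=8083
rest.advertised.host.name=${IPS[$i]}
rest.advertised.port=8083
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
plugin.path=/var/lib/kafka-connect/plugins
EOF
done
echo "Rendered worker configs in $CONF_DIR"
```

Diff the rendered files against your hand-written ones before deploying; the loop keeps the shared settings in exactly one place.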
Configure logging for all nodes
Set up consistent logging by creating /etc/kafka-connect/connect-log4j.properties on every worker node.
log4j.rootLogger=INFO, stdout, connectAppender

# Console appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# File appender
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=/var/lib/kafka-connect/logs/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Reduce verbosity for noisy packages
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=WARN
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=WARN
log4j.logger.org.reflections.Reflections=ERROR
Create systemd service files
Create a systemd service file for each Kafka Connect worker node to enable automatic startup and management. On node 1, create /etc/systemd/system/kafka-connect-node1.service:
[Unit]
Description=Kafka Connect Distributed Worker Node 1
Documentation=https://kafka.apache.org/documentation/#connect
Requires=network.target
After=network.target
[Service]
Type=simple
User=kafka-connect
Group=kafka-connect
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed-node1.properties
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=10
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
Environment=KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka-connect/connect-log4j.properties
[Install]
WantedBy=multi-user.target
Create similar service files for nodes 2 and 3, replacing the node number and configuration file path.
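The node 2 and 3 units can likewise be generated rather than edited by hand. A sketch; UNIT_DIR defaults to a scratch directory so you can review the rendered units before installing them to /etc/systemd/system:

```shell
#!/usr/bin/env bash
# Render a kafka-connect unit file per worker node from one template.
set -euo pipefail
UNIT_DIR="${UNIT_DIR:-./units}"   # use /etc/systemd/system on the real hosts
mkdir -p "$UNIT_DIR"
for n in 1 2 3; do
  cat > "${UNIT_DIR}/kafka-connect-node${n}.service" <<EOF
[Unit]
Description=Kafka Connect Distributed Worker Node ${n}
Documentation=https://kafka.apache.org/documentation/#connect
After=network.target

[Service]
Type=simple
User=kafka-connect
Group=kafka-connect
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed-node${n}.properties
Restart=on-failure
RestartSec=10
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

[Install]
WantedBy=multi-user.target
EOF
done
```

After copying the units into place, run `sudo systemctl daemon-reload` so systemd picks them up.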
Install and configure HAProxy for load balancing
Install HAProxy on a separate server to provide load balancing and high availability for the Kafka Connect cluster.
sudo apt update
sudo apt install -y haproxy
Configure HAProxy for Kafka Connect load balancing
Configure HAProxy to distribute REST API requests across all Kafka Connect worker nodes with health checks. Edit /etc/haproxy/haproxy.cfg:
global
    daemon
    user haproxy
    group haproxy
    log stdout local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    stats timeout 30s

defaults
    mode http
    log global
    option httplog
    option dontlognull
    timeout connect 5000
    timeout client 50000
    timeout server 50000
    errorfile 400 /etc/haproxy/errors/400.http
    errorfile 403 /etc/haproxy/errors/403.http
    errorfile 408 /etc/haproxy/errors/408.http
    errorfile 500 /etc/haproxy/errors/500.http
    errorfile 502 /etc/haproxy/errors/502.http
    errorfile 503 /etc/haproxy/errors/503.http
    errorfile 504 /etc/haproxy/errors/504.http

# Stats page (the prometheus-exporter service requires HAProxy 2.0+)
listen stats
    bind *:8404
    stats enable
    stats uri /stats
    stats refresh 30s
    stats admin if TRUE
    http-request use-service prometheus-exporter if { path /stats/prometheus }

# Kafka Connect REST API load balancer
frontend kafka_connect_frontend
    bind *:8083
    default_backend kafka_connect_backend

backend kafka_connect_backend
    balance roundrobin
    option httpchk GET /
    http-check expect status 200
    server connect-node1 203.0.113.10:8083 check inter 5000 rise 2 fall 3
    server connect-node2 203.0.113.11:8083 check inter 5000 rise 2 fall 3
    server connect-node3 203.0.113.12:8083 check inter 5000 rise 2 fall 3
Create cluster coordination topics
Before starting the Kafka Connect workers, create the required coordination topics on your Kafka cluster.
# Create config storage topic
/opt/kafka/bin/kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic connect-configs \
--partitions 1 \
--replication-factor 3 \
--config cleanup.policy=compact
# Create offset storage topic
/opt/kafka/bin/kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic connect-offsets \
--partitions 25 \
--replication-factor 3 \
--config cleanup.policy=compact
# Create status storage topic
/opt/kafka/bin/kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic connect-status \
--partitions 5 \
--replication-factor 3 \
--config cleanup.policy=compact
Start Kafka Connect cluster
Start all Kafka Connect worker nodes and HAProxy to begin cluster operation.
# Start Kafka Connect workers on each node
sudo systemctl daemon-reload
sudo systemctl enable kafka-connect-node1
sudo systemctl start kafka-connect-node1
# Check status
sudo systemctl status kafka-connect-node1
# Start HAProxy
sudo systemctl enable haproxy
sudo systemctl start haproxy
sudo systemctl status haproxy
Install Prometheus for monitoring
Set up Prometheus monitoring to track Kafka Connect cluster health and performance metrics.
sudo apt install -y prometheus
Configure JMX exporter for Kafka Connect metrics
Download the JMX exporter on each worker node and create a rules file at /etc/kafka-connect/jmx-exporter-config.yml to expose Kafka Connect metrics to Prometheus.
# Download JMX Prometheus exporter
cd /opt/kafka
sudo wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.17.2/jmx_prometheus_javaagent-0.17.2.jar
rules:
  # Connect worker metrics
  - pattern: 'kafka.connect<type=connect-worker-metrics><>([^:]+)'
    name: kafka_connect_worker_$1
    type: GAUGE
  # Connect worker rebalance metrics
  - pattern: 'kafka.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
    name: kafka_connect_worker_rebalance_$1
    type: GAUGE
  # Connector metrics
  - pattern: 'kafka.connect<type=connector-metrics, connector=(.+)><>([^:]+)'
    name: kafka_connect_connector_$2
    labels:
      connector: $1
    type: GAUGE
  # Task metrics
  - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(\d+)><>([^:]+)'
    name: kafka_connect_task_$3
    labels:
      connector: $1
      task: $2
    type: GAUGE
Update systemd service with JMX monitoring
Modify the systemd service to include JMX exporter for Prometheus metrics collection.
[Unit]
Description=Kafka Connect Distributed Worker Node 1
Documentation=https://kafka.apache.org/documentation/#connect
Requires=network.target
After=network.target
[Service]
Type=simple
User=kafka-connect
Group=kafka-connect
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed-node1.properties
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=10
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
Environment=KAFKA_OPTS="-javaagent:/opt/kafka/jmx_prometheus_javaagent-0.17.2.jar=9404:/etc/kafka-connect/jmx-exporter-config.yml"
Environment=KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka-connect/connect-log4j.properties
[Install]
WantedBy=multi-user.target
Configure Prometheus scraping
Add scraping configuration for Kafka Connect JMX metrics and HAProxy stats to /etc/prometheus/prometheus.yml.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # Kafka Connect JMX metrics (each worker exposes the exporter on port 9404)
  - job_name: 'kafka-connect'
    static_configs:
      - targets:
          - '203.0.113.10:9404'
          - '203.0.113.11:9404'
          - '203.0.113.12:9404'
    metrics_path: /metrics
    scrape_interval: 30s
  # HAProxy stats
  - job_name: 'haproxy'
    static_configs:
      - targets: ['203.0.113.13:8404']
    metrics_path: /stats/prometheus
    scrape_interval: 30s
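With scraping in place, a basic alert rule catches a worker dropping out. A minimal sketch; save it as, for example, /etc/prometheus/rules/kafka-connect.yml and reference it from `rule_files` in prometheus.yml (the file path and the two-minute threshold are illustrative choices):

```yaml
groups:
  - name: kafka-connect
    rules:
      - alert: KafkaConnectWorkerDown
        expr: up{job="kafka-connect"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Kafka Connect worker {{ $labels.instance }} is down"
```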
Create health check script
Create a health check script at /usr/local/bin/kafka-connect-health-check.sh to monitor cluster status and individual worker health.
#!/bin/bash
# Kafka Connect cluster health check script
set -e
CONNECT_ENDPOINT="http://203.0.113.13:8083"
LOG_FILE="/var/log/kafka-connect-health.log"
log() {
echo "$(date): $1" | tee -a "$LOG_FILE"
}
# Check cluster connectivity
check_cluster() {
if curl -s -f "$CONNECT_ENDPOINT/" > /dev/null; then
log "Cluster endpoint is healthy"
return 0
else
log "ERROR: Cluster endpoint is not responding"
return 1
fi
}
# Check worker nodes
check_workers() {
local workers
workers=$(curl -s "$CONNECT_ENDPOINT/" | jq -r '.kafka_cluster_id // "unknown"')
if [[ "$workers" != "unknown" ]]; then
log "Workers are connected to Kafka cluster: $workers"
return 0
else
log "ERROR: Workers not properly connected"
return 1
fi
}
# Check connector status
check_connectors() {
local connectors
connectors=$(curl -s "$CONNECT_ENDPOINT/connectors" | jq -r '. | length')
log "Active connectors: $connectors"
# List any failed connectors
curl -s "$CONNECT_ENDPOINT/connectors" | jq -r '.[]' | while read -r connector; do
status=$(curl -s "$CONNECT_ENDPOINT/connectors/$connector/status" | jq -r '.connector.state')
if [[ "$status" == "FAILED" ]]; then
log "ERROR: Connector $connector is in FAILED state"
fi
done
}
# Main health check
main() {
log "Starting Kafka Connect health check"
if check_cluster && check_workers; then
check_connectors
log "Health check completed successfully"
exit 0
else
log "Health check failed"
exit 1
fi
}
main
sudo chmod +x /usr/local/bin/kafka-connect-health-check.sh
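The script relies on jq. On a host where jq isn't available, the connector state can still be pulled from the status JSON with standard tools. A sketch against a hard-coded sample payload (in practice the payload comes from curl against /connectors/&lt;name&gt;/status):

```shell
#!/usr/bin/env bash
# Extract the connector "state" field without jq, using grep and cut.
set -euo pipefail
# Sample payload standing in for: curl -s "$CONNECT_ENDPOINT/connectors/demo/status"
payload='{"name":"demo","connector":{"state":"RUNNING","worker_id":"203.0.113.10:8083"},"tasks":[]}'
state=$(printf '%s' "$payload" | grep -o '"state":"[A-Z]*"' | head -n1 | cut -d'"' -f4)
echo "Connector state: $state"
```

This only reads the first state field, which is the connector's own; per-task states would need a fuller parser, which is why the main script prefers jq.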
Set up automated health monitoring
Create a systemd service and timer to run the health check every five minutes, logging results to the journal. First the service unit, /etc/systemd/system/kafka-connect-health.service:
[Unit]
Description=Kafka Connect Health Check
Wants=kafka-connect-health.timer
[Service]
Type=oneshot
User=root
ExecStart=/usr/local/bin/kafka-connect-health-check.sh
StandardOutput=journal
StandardError=journal
Then the timer unit, /etc/systemd/system/kafka-connect-health.timer:
[Unit]
Description=Run Kafka Connect Health Check every 5 minutes
[Timer]
OnBootSec=5min
OnUnitActiveSec=5min
Unit=kafka-connect-health.service
[Install]
WantedBy=timers.target
sudo systemctl daemon-reload
sudo systemctl enable kafka-connect-health.timer
sudo systemctl start kafka-connect-health.timer
Verify your setup
Test the Kafka Connect cluster functionality and monitoring setup.
# Check cluster status through load balancer
curl -s http://203.0.113.13:8083/ | jq .
# List available connector plugins
curl -s http://203.0.113.13:8083/connector-plugins | jq .
# Check individual worker nodes
curl -s http://203.0.113.10:8083/ | jq -r '.kafka_cluster_id'
curl -s http://203.0.113.11:8083/ | jq -r '.kafka_cluster_id'
curl -s http://203.0.113.12:8083/ | jq -r '.kafka_cluster_id'
# Verify HAProxy stats
curl -s http://203.0.113.13:8404/stats
# Check Prometheus metrics
curl -s http://203.0.113.10:9404/metrics | grep kafka_connect
# Verify health check timer
sudo systemctl status kafka-connect-health.timer
sudo journalctl -u kafka-connect-health.service -f
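With the cluster verified, you can register a first connector through the load balancer. The FileStreamSource connector ships with Kafka, so no extra plugin is needed; the connector name, file path, and topic below are illustrative:

```json
{
  "name": "test-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/var/lib/kafka-connect/test-input.txt",
    "topic": "connect-test"
  }
}
```

Save this as file-source.json and submit it with `curl -s -X POST -H "Content-Type: application/json" -d @file-source.json http://203.0.113.13:8083/connectors`, then check /connectors/test-file-source/status and delete the connector when done.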
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Worker fails to start | Coordination topics missing | Create topics with /opt/kafka/bin/kafka-topics.sh --create |
| Workers can't join cluster | Mismatched group.id | Verify all workers use same group.id=connect-cluster |
| HAProxy shows backend down | Worker REST API not responding | Check worker logs with journalctl -u kafka-connect-node1 |
| Connectors fail to distribute | Too few offset storage partitions | Set offset.storage.partitions (e.g. 25) before the topic is first created |
| High memory usage | Default heap settings too high | Adjust KAFKA_HEAP_OPTS in systemd service |
| JMX metrics not exposed | JMX exporter not configured | Verify KAFKA_OPTS includes javaagent path |
Next steps
- Configure Kafka Connect for database integration with JDBC connectors
- Set up Kafka Schema Registry with Avro serialization for data processing
- Implement Kafka Streams processing applications with Java and Scala for real-time data analytics
- Configure Kafka Connect SSL security and advanced monitoring
- Set up custom Kafka Connect connector development environment
Running this in production?
Automated install script
Run this script on each worker node to automate the Kafka Connect installation (HAProxy and Prometheus are configured separately)
#!/usr/bin/env bash
set -euo pipefail
# Kafka Connect Cluster Install Script
# Configures high availability Kafka Connect cluster with load balancing
readonly SCRIPT_NAME=$(basename "$0")
readonly KAFKA_VERSION="2.8.2"
readonly KAFKA_SCALA_VERSION="2.13"
# Colors
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly NC='\033[0m'
# Default configuration
KAFKA_BROKERS=""
NODE_ID=""
NODE_IP=""
CLUSTER_IPS=()
usage() {
echo "Usage: $SCRIPT_NAME --brokers kafka1:9092,kafka2:9092,kafka3:9092 --node-id 1 --node-ip 203.0.113.10 --cluster-ips 203.0.113.10,203.0.113.11,203.0.113.12"
echo ""
echo "Options:"
echo " --brokers Comma-separated Kafka broker endpoints"
echo " --node-id Node ID (1, 2, or 3)"
echo " --node-ip This node's IP address"
echo " --cluster-ips Comma-separated list of all cluster node IPs"
exit 1
}
log_info() { echo -e "${GREEN}[INFO]${NC} $1"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
cleanup() {
local exit_code=$?
if [ $exit_code -ne 0 ]; then
log_error "Installation failed. Cleaning up..."
systemctl stop kafka-connect 2>/dev/null || true
systemctl disable kafka-connect 2>/dev/null || true
userdel -r kafka-connect 2>/dev/null || true
rm -rf /opt/kafka /var/lib/kafka-connect /etc/kafka-connect 2>/dev/null || true
fi
exit $exit_code
}
trap cleanup ERR INT TERM
parse_args() {
while [[ $# -gt 0 ]]; do
case $1 in
--brokers)
KAFKA_BROKERS="$2"
shift 2
;;
--node-id)
NODE_ID="$2"
shift 2
;;
--node-ip)
NODE_IP="$2"
shift 2
;;
--cluster-ips)
IFS=',' read -ra CLUSTER_IPS <<< "$2"
shift 2
;;
-h|--help)
usage
;;
*)
log_error "Unknown option: $1"
usage
;;
esac
done
if [[ -z "$KAFKA_BROKERS" || -z "$NODE_ID" || -z "$NODE_IP" || ${#CLUSTER_IPS[@]} -eq 0 ]]; then
log_error "Missing required arguments"
usage
fi
if ! [[ "$NODE_ID" =~ ^[1-3]$ ]]; then
log_error "Node ID must be 1, 2, or 3"
exit 1
fi
}
detect_distro() {
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update && apt upgrade -y"
JAVA_PKG="openjdk-11-jdk"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
JAVA_PKG="java-11-openjdk"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
JAVA_PKG="java-11-openjdk"
;;
*)
log_error "Unsupported distribution: $ID"
exit 1
;;
esac
else
log_error "Cannot detect distribution"
exit 1
fi
}
check_prerequisites() {
echo "[1/8] Checking prerequisites..."
if [[ $EUID -ne 0 ]]; then
log_error "This script must be run as root"
exit 1
fi
if ! command -v wget &> /dev/null; then
log_warn "wget not found, will install"
fi
if ! command -v curl &> /dev/null; then
log_warn "curl not found, will install"
fi
log_info "Prerequisites check completed"
}
update_system() {
echo "[2/8] Updating system packages..."
eval "$PKG_UPDATE"
$PKG_INSTALL $JAVA_PKG wget curl
log_info "System packages updated"
}
download_kafka() {
echo "[3/8] Downloading and installing Kafka..."
cd /opt
if [[ ! -f kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz ]]; then
wget "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz"
fi
if [[ -d kafka ]]; then
rm -rf kafka
fi
tar -xzf "kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz"
mv "kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}" kafka
chown -R root:root /opt/kafka
log_info "Kafka installed successfully"
}
create_user_directories() {
echo "[4/8] Creating Kafka Connect user and directories..."
if ! id kafka-connect &>/dev/null; then
useradd -r -s /bin/false kafka-connect
fi
mkdir -p /var/lib/kafka-connect/{config,plugins,logs}
mkdir -p /etc/kafka-connect
mkdir -p /var/log/kafka-connect
chown -R kafka-connect:kafka-connect /var/lib/kafka-connect
chown -R kafka-connect:kafka-connect /etc/kafka-connect
chown -R kafka-connect:kafka-connect /var/log/kafka-connect
chmod 755 /var/lib/kafka-connect /etc/kafka-connect /var/log/kafka-connect
chmod 755 /var/lib/kafka-connect/{config,plugins,logs}
log_info "User and directories created"
}
configure_connect() {
echo "[5/8] Configuring Kafka Connect worker..."
cat > /etc/kafka-connect/connect-distributed.properties << EOF
bootstrap.servers=${KAFKA_BROKERS}
# Cluster coordination topics
group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
# REST API configuration
rest.host.name=${NODE_IP}
rest.port=8083
rest.advertised.host.name=${NODE_IP}
rest.advertised.port=8083
# Converter configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Plugin path
plugin.path=/var/lib/kafka-connect/plugins
# Worker configuration
task.shutdown.graceful.timeout.ms=10000
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
# Security (log4j is configured via KAFKA_LOG4J_OPTS in the systemd unit)
security.protocol=PLAINTEXT
EOF
cat > /etc/kafka-connect/connect-log4j.properties << 'EOF'
log4j.rootLogger=INFO, stdout, connectAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=/var/log/kafka-connect/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
EOF
chown kafka-connect:kafka-connect /etc/kafka-connect/connect-distributed.properties
chown kafka-connect:kafka-connect /etc/kafka-connect/connect-log4j.properties
chmod 644 /etc/kafka-connect/connect-distributed.properties
chmod 644 /etc/kafka-connect/connect-log4j.properties
log_info "Kafka Connect configured for node $NODE_ID"
}
create_systemd_service() {
echo "[6/8] Creating systemd service..."
cat > /etc/systemd/system/kafka-connect.service << EOF
[Unit]
Description=Apache Kafka Connect
Documentation=https://kafka.apache.org/documentation.html
Requires=network.target
After=network.target
[Service]
Type=simple
User=kafka-connect
Group=kafka-connect
ExecStart=/opt/kafka/bin/connect-distributed.sh /etc/kafka-connect/connect-distributed.properties
ExecStop=/bin/kill -TERM \$MAINPID
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=kafka-connect
Environment=KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka-connect/connect-log4j.properties
[Install]
WantedBy=multi-user.target
EOF
chmod 644 /etc/systemd/system/kafka-connect.service
systemctl daemon-reload
log_info "Systemd service created"
}
configure_firewall() {
echo "[7/8] Configuring firewall..."
if command -v firewall-cmd &> /dev/null; then
firewall-cmd --permanent --add-port=8083/tcp
firewall-cmd --reload
elif command -v ufw &> /dev/null; then
ufw allow 8083/tcp
fi
log_info "Firewall configured"
}
verify_installation() {
echo "[8/8] Verifying installation..."
systemctl enable kafka-connect
systemctl start kafka-connect
sleep 10
if systemctl is-active --quiet kafka-connect; then
log_info "Kafka Connect service is running"
else
log_error "Kafka Connect service failed to start"
systemctl status kafka-connect
exit 1
fi
if curl -s "http://${NODE_IP}:8083/" > /dev/null; then
log_info "REST API is responding"
else
log_warn "REST API not yet available (may need more time to start)"
fi
log_info "Installation completed successfully!"
echo ""
echo "Kafka Connect cluster node $NODE_ID is ready"
echo "REST API: http://${NODE_IP}:8083/"
echo "Logs: journalctl -u kafka-connect -f"
echo ""
echo "Next steps:"
echo "1. Install connectors in /var/lib/kafka-connect/plugins/"
echo "2. Set up HAProxy load balancer pointing to all cluster nodes"
echo "3. Configure monitoring with Prometheus"
}
main() {
parse_args "$@"
detect_distro
check_prerequisites
update_system
download_kafka
create_user_directories
configure_connect
create_systemd_service
configure_firewall
verify_installation
}
main "$@"
Review the script before running, then execute it as root on each node, for example: sudo bash install.sh --brokers kafka1:9092,kafka2:9092,kafka3:9092 --node-id 1 --node-ip 203.0.113.10 --cluster-ips 203.0.113.10,203.0.113.11,203.0.113.12