Learn to configure Locust for comprehensive Apache Airflow DAG performance testing, including automated load testing scenarios, metrics collection, and CI/CD integration for production-grade workflow optimization.
Prerequisites
- Existing Apache Airflow installation
- PostgreSQL database
- Python 3.8+
- Root or sudo access
- 4GB+ RAM recommended
What this solves
Apache Airflow DAG performance testing ensures your workflows can handle production workloads without failures or bottlenecks. This tutorial shows you how to set up Locust, a Python-based load testing framework, to simulate realistic DAG execution scenarios, measure performance metrics, and identify optimization opportunities before deploying to production.
Prerequisites
You'll need an existing Apache Airflow installation with PostgreSQL backend. If you don't have Airflow set up yet, follow our Apache Airflow installation guide first.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you have the latest security patches and dependencies.
sudo apt update && sudo apt upgrade -y
sudo apt install -y python3-pip python3-venv git
Create dedicated testing environment
Create a separate directory and virtual environment for your Airflow performance testing setup to isolate dependencies.
mkdir -p /opt/airflow-testing
cd /opt/airflow-testing
python3 -m venv venv
source venv/bin/activate
Install Locust and dependencies
Install Locust along with Apache Airflow client libraries and monitoring tools for comprehensive performance testing.
pip install --upgrade pip
pip install locust==2.17.0 apache-airflow-client==2.8.0 requests psycopg2-binary pandas matplotlib seaborn
Configure Airflow API authentication
Create a dedicated user for performance testing with appropriate permissions to trigger and monitor DAGs.
export AIRFLOW_HOME=/opt/airflow
source /opt/airflow/venv/bin/activate
airflow users create \
--username perf_test \
--firstname Performance \
--lastname Tester \
--role Admin \
--email perf@example.com \
--password SecureTestPass123!
Create Locust performance test scenarios
Create a comprehensive Locust test file that simulates realistic DAG operations including triggering, monitoring, and querying execution status.
import json
import time
import random
from locust import HttpUser, task, between
from datetime import datetime, timedelta
import base64
class AirflowDAGUser(HttpUser):
    """Locust user that load-tests the Airflow stable REST API (v1).

    Simulates a realistic operation mix: triggering DAG runs, polling run
    status, listing task instances, fetching DAG details, and querying run
    history. Task weights (3/2/2/1/1) bias the load toward trigger + poll.
    """

    # Each simulated user pauses 1-3 seconds between tasks.
    wait_time = between(1, 3)

    def on_start(self) -> None:
        """Set up authentication and test data"""
        # NOTE(review): credentials are hard-coded; they must match the user
        # created with `airflow users create` earlier in this guide.
        self.username = "perf_test"
        self.password = "SecureTestPass123!"
        # These DAG IDs must exist in the target Airflow instance.
        self.dag_ids = ["example_dag_1", "example_dag_2", "data_pipeline_dag"]
        # (dag_id, dag_run_id) pairs for runs this user has triggered.
        self.dag_run_ids = []
        # Create basic auth header
        credentials = base64.b64encode(f"{self.username}:{self.password}".encode()).decode()
        self.client.headers.update({"Authorization": f"Basic {credentials}"})
        # Verify connection
        response = self.client.get("/api/v1/dags")
        if response.status_code != 200:
            print(f"Authentication failed: {response.status_code}")

    @task(3)
    def trigger_dag_run(self) -> None:
        """Trigger DAG execution with configuration"""
        dag_id = random.choice(self.dag_ids)
        # Timestamp + random suffix keeps run ids unique across users.
        dag_run_id = f"perf_test_{int(time.time())}_{random.randint(1000, 9999)}"
        payload = {
            "dag_run_id": dag_run_id,
            "conf": {
                "test_mode": True,
                "batch_size": random.randint(100, 1000)
            }
        }
        with self.client.post(
            f"/api/v1/dags/{dag_id}/dagRuns",
            json=payload,
            catch_response=True,
            name="trigger_dag"
        ) as response:
            if response.status_code == 200:
                # Remember the run so the polling tasks below can target it.
                self.dag_run_ids.append((dag_id, dag_run_id))
                response.success()
            else:
                response.failure(f"Failed to trigger DAG: {response.status_code}")

    @task(2)
    def check_dag_run_status(self) -> None:
        """Monitor DAG run execution status"""
        if not self.dag_run_ids:
            return
        dag_id, dag_run_id = random.choice(self.dag_run_ids)
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
            catch_response=True,
            name="check_dag_status"
        ) as response:
            if response.status_code == 200:
                data = response.json()
                state = data.get("state")
                # Stop tracking runs that reached a terminal state so the
                # tracked list does not grow without bound.
                if state in ["success", "failed"]:
                    self.dag_run_ids.remove((dag_id, dag_run_id))
                response.success()
            else:
                response.failure(f"Failed to check DAG status: {response.status_code}")

    @task(2)
    def list_task_instances(self) -> None:
        """Query task instances for performance monitoring"""
        if not self.dag_run_ids:
            return
        dag_id, dag_run_id = random.choice(self.dag_run_ids)
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
            catch_response=True,
            name="list_tasks"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to list tasks: {response.status_code}")

    @task(1)
    def get_dag_details(self) -> None:
        """Retrieve DAG configuration and metadata"""
        dag_id = random.choice(self.dag_ids)
        with self.client.get(
            f"/api/v1/dags/{dag_id}/details",
            catch_response=True,
            name="get_dag_details"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to get DAG details: {response.status_code}")

    @task(1)
    def query_dag_runs_history(self) -> None:
        """Query historical DAG runs for performance analysis"""
        dag_id = random.choice(self.dag_ids)
        # Look back one week; newest runs first, capped at 50 results.
        start_date = (datetime.now() - timedelta(days=7)).isoformat()
        params = {
            "limit": 50,
            "start_date_gte": start_date,
            "order_by": "-start_date"
        }
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns",
            params=params,
            catch_response=True,
            name="query_history"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to query history: {response.status_code}")
Create performance metrics collection script
Build a monitoring script that collects detailed Airflow performance metrics during load testing for comprehensive analysis.
#!/usr/bin/env python3
import psycopg2
import json
import time
import pandas as pd
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
class AirflowMetricsCollector:
    """Collects DAG/task performance metrics from the Airflow metadata DB
    and renders a report (plots + JSON summary).

    Fix vs. original: generate_performance_report() raised ZeroDivisionError
    (and hist()/resample() errors) when the query window contained no runs;
    it now short-circuits with an all-zero summary.
    """

    def __init__(self, db_host="localhost", db_name="airflow", db_user="airflow", db_password="airflow"):
        # Connection parameters for the Airflow PostgreSQL metadata database.
        self.db_config = {
            "host": db_host,
            "database": db_name,
            "user": db_user,
            "password": db_password,
            "port": 5432
        }

    def get_db_connection(self):
        """Open a new psycopg2 connection with the stored parameters."""
        return psycopg2.connect(**self.db_config)

    def collect_dag_performance_metrics(self, start_time, end_time):
        """Collect DAG performance metrics from database.

        Returns a DataFrame with one row per DAG run in [start_time,
        end_time], including duration and per-run task success/failure counts.
        """
        query = """
        SELECT
        dr.dag_id,
        dr.dag_run_id,
        dr.state,
        dr.start_date,
        dr.end_date,
        EXTRACT(EPOCH FROM (dr.end_date - dr.start_date)) as duration_seconds,
        COUNT(ti.task_id) as total_tasks,
        COUNT(CASE WHEN ti.state = 'success' THEN 1 END) as successful_tasks,
        COUNT(CASE WHEN ti.state = 'failed' THEN 1 END) as failed_tasks,
        AVG(EXTRACT(EPOCH FROM (ti.end_date - ti.start_date))) as avg_task_duration
        FROM dag_run dr
        LEFT JOIN task_instance ti ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
        WHERE dr.start_date >= %s AND dr.start_date <= %s
        GROUP BY dr.dag_id, dr.dag_run_id, dr.state, dr.start_date, dr.end_date
        ORDER BY dr.start_date DESC
        """
        # NOTE(review): psycopg2's `with conn` commits/rolls back but does not
        # close the connection; acceptable for a short-lived script.
        with self.get_db_connection() as conn:
            df = pd.read_sql_query(query, conn, params=[start_time, end_time])
        return df

    def collect_system_metrics(self, start_time, end_time):
        """Collect system-level performance metrics.

        Returns {'scheduler_performance': df, 'task_queue_metrics': df}.
        """
        queries = {
            'scheduler_performance': """
            SELECT
            DATE_TRUNC('minute', start_date) as time_bucket,
            COUNT(*) as dag_runs_started,
            AVG(EXTRACT(EPOCH FROM (end_date - start_date))) as avg_duration
            FROM dag_run
            WHERE start_date >= %s AND start_date <= %s
            GROUP BY DATE_TRUNC('minute', start_date)
            ORDER BY time_bucket
            """,
            'task_queue_metrics': """
            SELECT
            state,
            COUNT(*) as task_count,
            AVG(EXTRACT(EPOCH FROM (end_date - start_date))) as avg_duration
            FROM task_instance
            WHERE start_date >= %s AND start_date <= %s
            GROUP BY state
            """
        }
        results = {}
        with self.get_db_connection() as conn:
            for metric_name, query in queries.items():
                results[metric_name] = pd.read_sql_query(query, conn, params=[start_time, end_time])
        return results

    def generate_performance_report(self, metrics_data, output_dir="/opt/airflow-testing/reports"):
        """Generate comprehensive performance analysis report.

        Writes performance_analysis.png and performance_summary.json into
        output_dir and returns the summary dict.
        """
        import os
        os.makedirs(output_dir, exist_ok=True)
        dag_metrics = metrics_data
        # Guard: no runs in the window -> the histogram/resample calls and the
        # len() division below would fail. Emit an all-zero summary instead.
        if dag_metrics.empty:
            summary = {
                'total_dag_runs': 0,
                'success_rate': 0.0,
                'avg_duration': 0.0,
                'p95_duration': 0.0,
                'p99_duration': 0.0,
                'failed_runs': 0
            }
            with open(f'{output_dir}/performance_summary.json', 'w') as f:
                json.dump(summary, f, indent=2, default=str)
            return summary
        # Create visualizations (2x2 grid)
        plt.figure(figsize=(15, 10))
        # DAG Duration Distribution
        plt.subplot(2, 2, 1)
        dag_metrics['duration_seconds'].hist(bins=30)
        plt.title('DAG Duration Distribution')
        plt.xlabel('Duration (seconds)')
        plt.ylabel('Frequency')
        # Success Rate by DAG
        plt.subplot(2, 2, 2)
        success_rate = dag_metrics.groupby('dag_id').agg({
            'state': lambda x: (x == 'success').sum() / len(x) * 100
        }).round(2)
        success_rate['state'].plot(kind='bar')
        plt.title('Success Rate by DAG')
        plt.ylabel('Success Rate (%)')
        plt.xticks(rotation=45)
        # Task Performance
        plt.subplot(2, 2, 3)
        dag_metrics['avg_task_duration'].hist(bins=20)
        plt.title('Average Task Duration Distribution')
        plt.xlabel('Duration (seconds)')
        plt.ylabel('Frequency')
        # Timeline Analysis: per-minute mean duration over the window.
        plt.subplot(2, 2, 4)
        dag_metrics['start_date'] = pd.to_datetime(dag_metrics['start_date'])
        timeline_data = dag_metrics.set_index('start_date')['duration_seconds'].resample('1min').mean()
        timeline_data.plot()
        plt.title('DAG Duration Over Time')
        plt.xlabel('Time')
        plt.ylabel('Average Duration (seconds)')
        plt.tight_layout()
        plt.savefig(f'{output_dir}/performance_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()
        # Generate summary statistics
        summary = {
            'total_dag_runs': len(dag_metrics),
            'success_rate': (dag_metrics['state'] == 'success').sum() / len(dag_metrics) * 100,
            'avg_duration': dag_metrics['duration_seconds'].mean(),
            'p95_duration': dag_metrics['duration_seconds'].quantile(0.95),
            'p99_duration': dag_metrics['duration_seconds'].quantile(0.99),
            'failed_runs': (dag_metrics['state'] == 'failed').sum()
        }
        with open(f'{output_dir}/performance_summary.json', 'w') as f:
            json.dump(summary, f, indent=2, default=str)
        return summary
if __name__ == "__main__":
    # Analyse the most recent hour of activity and print the JSON summary.
    window_end = datetime.now()
    window_start = window_end - timedelta(hours=1)
    collector = AirflowMetricsCollector()
    dag_metrics = collector.collect_dag_performance_metrics(window_start, window_end)
    system_metrics = collector.collect_system_metrics(window_start, window_end)
    report = collector.generate_performance_report(dag_metrics)
    print(json.dumps(report, indent=2, default=str))
Configure Locust for distributed testing
Create a configuration file for running Locust in distributed mode to simulate higher loads across multiple workers. Note that Locust's config-file parser treats `[section]` headers as decorative (all keys are read flat), so keep master-only and worker-only options in separate files (for example `master.conf` and `worker.conf`) and pass the appropriate one with `--config`.
[master]
master = true
master-bind-host = 0.0.0.0
master-bind-port = 5557
web-host = 0.0.0.0
web-port = 8089
[worker]
master-host = localhost
master-port = 5557
[runtime]
run-time = 300s
users = 50
spawn-rate = 2
host = http://localhost:8080
[logging]
loglevel = INFO
logfile = /opt/airflow-testing/logs/locust.log
Create test execution scripts
Build automated scripts to run comprehensive performance tests and collect metrics for analysis.
#!/bin/bash
# Run a headless Locust load test against Airflow and collect metrics.
# Usage: run_performance_test.sh [USERS] [SPAWN_RATE] [RUN_TIME_SECONDS]
# Fix vs. original: the narrative lines ("Configuration", "Create
# directories", ...) had lost their '#' markers and broke the script.
set -e

# Configuration (positional overrides with defaults)
TEST_NAME="airflow_dag_performance_$(date +%Y%m%d_%H%M%S)"
TEST_DIR="/opt/airflow-testing"
REPORTS_DIR="$TEST_DIR/reports/$TEST_NAME"
LOGS_DIR="$TEST_DIR/logs"
USERS=${1:-20}
SPAWN_RATE=${2:-2}
RUN_TIME=${3:-300}

# Create directories
mkdir -p "$REPORTS_DIR" "$LOGS_DIR"

echo "Starting Airflow DAG Performance Test: $TEST_NAME"
echo "Users: $USERS, Spawn Rate: $SPAWN_RATE, Duration: ${RUN_TIME}s"

# Activate virtual environment
source "$TEST_DIR/venv/bin/activate"

# Capture a pre-test metrics snapshot in the background.
# NOTE(review): metrics_collector.py is a one-shot script (it exits after
# printing), so the later `kill` is a harmless no-op kept as a safety net.
echo "Starting metrics collection..."
python3 "$TEST_DIR/metrics_collector.py" > "$REPORTS_DIR/pre_test_metrics.json" 2>&1 &
METRICS_PID=$!

# Run Locust test
echo "Starting Locust load test..."
locust \
    --locustfile="$TEST_DIR/locustfile.py" \
    --headless \
    --users="$USERS" \
    --spawn-rate="$SPAWN_RATE" \
    --run-time="${RUN_TIME}s" \
    --host="http://localhost:8080" \
    --html="$REPORTS_DIR/locust_report.html" \
    --csv="$REPORTS_DIR/locust_stats" \
    --logfile="$LOGS_DIR/locust_$TEST_NAME.log" \
    --loglevel=INFO

# Wait a moment for final metrics
sleep 30

# Stop metrics collection and generate final report
echo "Generating final performance report..."
kill $METRICS_PID 2>/dev/null || true

# Generate comprehensive report
python3 "$TEST_DIR/metrics_collector.py" > "$REPORTS_DIR/post_test_metrics.json" 2>&1

# Create summary report
cat > "$REPORTS_DIR/test_summary.txt" << EOF
Airflow DAG Performance Test Summary
===================================
Test Name: $TEST_NAME
Date: $(date)
Users: $USERS
Spawn Rate: $SPAWN_RATE
Duration: ${RUN_TIME}s
Files Generated:
- locust_report.html: Detailed Locust performance report
- locust_stats_*.csv: Raw performance data
- performance_analysis.png: Performance visualizations
- performance_summary.json: Key metrics summary
Logs Location: $LOGS_DIR/locust_$TEST_NAME.log
EOF

echo "Performance test completed. Reports available in: $REPORTS_DIR"
echo "View the HTML report: file://$REPORTS_DIR/locust_report.html"
Set up CI/CD integration
Create a GitLab CI pipeline configuration for automated performance testing in your deployment workflow.
stages:
  - performance-test
  - report

variables:
  AIRFLOW_HOME: "/opt/airflow"
  TEST_USERS: "10"
  TEST_DURATION: "180"
  PERFORMANCE_THRESHOLD_P95: "60"  # seconds; gate for the P95 check below

performance-test:
  stage: performance-test
  image: python:3.11-slim
  services:
    - postgres:13
  variables:
    POSTGRES_DB: airflow_test
    POSTGRES_USER: airflow
    POSTGRES_PASSWORD: airflow
    DATABASE_URL: postgresql://airflow:airflow@postgres:5432/airflow_test
  before_script:
    - apt-get update && apt-get install -y git
    - pip install locust apache-airflow-client psycopg2-binary pandas matplotlib seaborn
    - git clone https://gitlab.com/your-org/airflow-dags.git /tmp/dags
  script:
    - cd /opt/airflow-testing
    - source venv/bin/activate
    - chmod +x run_performance_test.sh
    - ./run_performance_test.sh $TEST_USERS 2 $TEST_DURATION
    # Threshold gate. The original inline `python -c "..."` was not valid
    # (-c cannot accept the indented `with` block written that way) and
    # open() does not expand the reports/*/ glob; use a heredoc + glob.
    - |
      python3 - <<'PY'
      import glob, json, os, sys
      path = sorted(glob.glob('reports/*/performance_summary.json'))[-1]
      with open(path) as f:
          data = json.load(f)
      p95 = float(data['p95_duration'])
      threshold = float(os.environ['PERFORMANCE_THRESHOLD_P95'])
      print(f'P95 Duration: {p95}s, Threshold: {threshold}s')
      sys.exit(1 if p95 > threshold else 0)
      PY
  artifacts:
    when: always
    # NOTE(review): nothing in this pipeline writes junit.xml yet; the junit
    # entry is aspirational until a producer exists -- confirm.
    reports:
      junit: reports/*/junit.xml
    paths:
      - reports/
    expire_in: 1 week
  only:
    - merge_requests
    - main

generate-report:
  stage: report
  dependencies:
    - performance-test
  script:
    - echo "Performance test completed successfully"
    - ls -la reports/
  artifacts:
    reports:
      junit: reports/*/junit.xml
    paths:
      - reports/
  only:
    - main
Configure monitoring and alerting
Set up Prometheus metrics collection for continuous performance monitoring of your Airflow instance.
#!/usr/bin/env python3
from prometheus_client import start_http_server, Gauge, Counter, Histogram
import psycopg2
import time
import threading
from datetime import datetime, timedelta
class AirflowPrometheusExporter:
    """Exports Airflow metadata-database statistics as Prometheus metrics.

    Fix vs. original: each 30 s poll re-counted the entire trailing hour of
    DAG runs and fed the totals into Counters, inflating
    airflow_dag_runs_total by roughly 120x. Each poll now only counts runs
    started since the previous poll.
    """

    def __init__(self, db_config, port=8000):
        self.db_config = db_config
        self.port = port
        # Lower bound for the first poll; advanced after each successful poll
        # so counters only ever see each DAG run once.
        self._last_poll = datetime.now() - timedelta(hours=1)
        # Define Prometheus metrics
        self.dag_runs_total = Counter('airflow_dag_runs_total', 'Total DAG runs', ['dag_id', 'state'])
        self.dag_run_duration = Histogram('airflow_dag_run_duration_seconds', 'DAG run duration', ['dag_id'])
        self.active_dag_runs = Gauge('airflow_active_dag_runs', 'Currently running DAGs', ['dag_id'])
        # NOTE(review): declared but never populated below -- wire up a task
        # instance query or remove.
        self.task_instances_total = Counter('airflow_task_instances_total', 'Total task instances', ['dag_id', 'task_id', 'state'])
        self.scheduler_heartbeat = Gauge('airflow_scheduler_heartbeat', 'Scheduler last heartbeat timestamp')

    def collect_metrics(self):
        """Run one collection pass against the Airflow database."""
        poll_started = datetime.now()
        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cur:
                    # DAG runs started since the previous poll only.
                    cur.execute("""
                        SELECT dag_id, state, COUNT(*),
                               AVG(EXTRACT(EPOCH FROM (end_date - start_date)))
                        FROM dag_run
                        WHERE start_date >= %s
                        GROUP BY dag_id, state
                    """, (self._last_poll,))
                    for dag_id, state, count, avg_duration in cur.fetchall():
                        self.dag_runs_total.labels(dag_id=dag_id, state=state).inc(count)
                        if avg_duration:
                            self.dag_run_duration.labels(dag_id=dag_id).observe(avg_duration)
                    # Active DAG runs
                    cur.execute("""
                        SELECT dag_id, COUNT(*)
                        FROM dag_run
                        WHERE state = 'running'
                        GROUP BY dag_id
                    """)
                    # NOTE(review): gauges for DAGs that are no longer running
                    # are not reset to 0; stale label values persist.
                    for dag_id, count in cur.fetchall():
                        self.active_dag_runs.labels(dag_id=dag_id).set(count)
                    # Scheduler heartbeat (epoch seconds of the latest beat)
                    cur.execute("SELECT EXTRACT(EPOCH FROM MAX(latest_heartbeat)) FROM job WHERE job_type = 'SchedulerJob'")
                    heartbeat = cur.fetchone()[0]
                    if heartbeat:
                        self.scheduler_heartbeat.set(heartbeat)
            # Only advance the window after a fully successful pass so a
            # failed poll's runs are retried next time.
            self._last_poll = poll_started
        except Exception as e:
            print(f"Error collecting metrics: {e}")

    def run(self):
        """Start the Prometheus metrics server"""
        start_http_server(self.port)
        print(f"Prometheus exporter started on port {self.port}")
        # Poll forever at a fixed 30 s cadence.
        while True:
            self.collect_metrics()
            time.sleep(30)
if __name__ == "__main__":
    # Connection settings for the Airflow metadata database.
    database_settings = {
        "host": "localhost",
        "database": "airflow",
        "user": "airflow",
        "password": "airflow",
        "port": 5432,
    }
    AirflowPrometheusExporter(database_settings).run()
Set correct permissions
Configure appropriate permissions for the testing environment and make scripts executable.
sudo chown -R airflow:airflow /opt/airflow-testing
chmod +x /opt/airflow-testing/run_performance_test.sh
chmod +x /opt/airflow-testing/metrics_collector.py
chmod +x /opt/airflow-testing/prometheus_exporter.py
mkdir -p /opt/airflow-testing/{logs,reports}
chmod 755 /opt/airflow-testing/{logs,reports}
Configure load testing metrics and monitoring
Set up Grafana dashboards
Create comprehensive Grafana dashboards to visualize Airflow performance metrics during load testing. This complements our existing Airflow monitoring setup.
{
"dashboard": {
"id": null,
"title": "Airflow DAG Performance Testing",
"tags": ["airflow", "performance", "testing"],
"panels": [
{
"id": 1,
"title": "DAG Run Success Rate",
"type": "stat",
"targets": [
{
"expr": "rate(airflow_dag_runs_total{state=\"success\"}[5m]) / rate(airflow_dag_runs_total[5m]) * 100",
"legendFormat": "Success Rate %"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
},
{
"id": 2,
"title": "DAG Run Duration (P95)",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, airflow_dag_run_duration_seconds)",
"legendFormat": "P95 Duration"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
},
{
"id": 3,
"title": "Active DAG Runs",
"type": "graph",
"targets": [
{
"expr": "sum(airflow_active_dag_runs) by (dag_id)",
"legendFormat": "{{dag_id}}"
}
],
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "10s"
}
}
Configure performance alerts
Set up automated alerts for performance degradation during load testing.
groups:
- name: airflow_performance
rules:
- alert: AirflowDAGHighFailureRate
expr: |
(
sum by (dag_id) (rate(airflow_dag_runs_total{state="failed"}[5m]))
/
sum by (dag_id) (rate(airflow_dag_runs_total[5m]))
) * 100 > 10
for: 2m
labels:
severity: warning
annotations:
summary: "High DAG failure rate detected"
description: "DAG failure rate is {{ $value }}% for {{ $labels.dag_id }}"
- alert: AirflowDAGSlowExecution
expr: |
histogram_quantile(0.95, rate(airflow_dag_run_duration_seconds_bucket[5m])) > 300
for: 5m
labels:
severity: warning
annotations:
summary: "DAG execution is slower than expected"
description: "P95 DAG duration is {{ $value }}s for {{ $labels.dag_id }}"
- alert: AirflowSchedulerDown
expr: |
time() - airflow_scheduler_heartbeat > 300
for: 1m
labels:
severity: critical
annotations:
summary: "Airflow scheduler is not responding"
description: "Scheduler last heartbeat was {{ $value }}s ago"
Verify your setup
Test your performance testing configuration with a basic load test scenario.
cd /opt/airflow-testing
source venv/bin/activate
Test Locust configuration
locust --version
Verify Airflow API connectivity
curl -u perf_test:SecureTestPass123! http://localhost:8080/api/v1/dags
Run a short performance test
./run_performance_test.sh 5 1 60
Check generated reports
ls -la reports/
Verify metrics collection
python3 metrics_collector.py
Run your first performance test
Execute a comprehensive performance test to baseline your Airflow DAG performance.
# Run performance test with 20 users for 5 minutes
./run_performance_test.sh 20 2 300
Start Prometheus metrics exporter
nohup python3 prometheus_exporter.py &
Monitor test progress with Locust web UI (if running interactively)
locust --locustfile=locustfile.py --host=http://localhost:8080
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Authentication failed (401) | Invalid Airflow credentials | Verify user exists: airflow users list |
| Connection refused to Airflow | Airflow webserver not running | Start webserver: airflow webserver -D |
| Database connection errors | PostgreSQL not accessible | Check connection: psql -h localhost -U airflow -d airflow |
| No DAGs found in tests | DAG IDs don't exist | Update dag_ids list in locustfile.py with actual DAG names |
| Permission denied on reports | Incorrect directory ownership | Fix ownership: sudo chown -R airflow:airflow /opt/airflow-testing |
| Metrics collection failing | Database query permissions | Grant read access: GRANT SELECT ON ALL TABLES IN SCHEMA public TO perf_test; |
| High test failure rates | Airflow overloaded | Reduce user count and spawn rate in test configuration |
Next steps
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
# Installer for the Airflow load-testing environment (Locust + helpers).
set -euo pipefail

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Default values (overridable via positional arguments; see usage)
AIRFLOW_HOME="${1:-/opt/airflow}"       # root of the existing Airflow install
TESTING_DIR="/opt/airflow-testing"      # everything this script creates lives here
LOCUST_USER="${2:-perf_test}"           # Airflow API user for load tests
LOCUST_PASS="${3:-SecureTestPass123!}"  # password for that user
# Print invocation help and abort.
usage() {
    cat <<EOF
Usage: $0 [AIRFLOW_HOME] [TEST_USER] [TEST_PASSWORD]
Example: $0 /opt/airflow perf_test MyPassword123
EOF
    exit 1
}
# On any command failure (armed via `trap ... ERR` below) remove the
# partially built testing directory so a re-run starts from scratch.
cleanup() {
    echo -e "${RED}[ERROR] Script failed. Cleaning up...${NC}"
    if [ -d "$TESTING_DIR" ]; then
        rm -rf "$TESTING_DIR"
    fi
}
trap cleanup ERR
# Abort unless we are running with effective root privileges.
check_privileges() {
    if [ "$EUID" -eq 0 ]; then
        return 0
    fi
    echo -e "${RED}This script must be run as root or with sudo${NC}"
    exit 1
}
# Detect distribution and package manager.
# Sets PKG_MGR / PKG_UPDATE / PKG_INSTALL / PKG_UPGRADE / PYTHON_VENV for the
# detected family. NOTE(review): the "|| true" embedded inside the dnf/yum
# update strings is only honored if callers expand these via `eval`; plain
# unquoted expansion would pass "||" and "true" as literal arguments to the
# package manager -- confirm how update_system invokes them.
detect_distro() {
    if [ -f /etc/os-release ]; then
        # /etc/os-release defines $ID (e.g. "ubuntu", "rocky", "amzn").
        . /etc/os-release
        case "$ID" in
            ubuntu|debian)
                PKG_MGR="apt"
                PKG_UPDATE="apt update"
                PKG_INSTALL="apt install -y"
                PKG_UPGRADE="apt upgrade -y"
                PYTHON_VENV="python3-venv"
                ;;
            almalinux|rocky|centos|rhel|ol|fedora)
                PKG_MGR="dnf"
                # dnf check-update exits non-zero when updates exist.
                PKG_UPDATE="dnf check-update || true"
                PKG_INSTALL="dnf install -y"
                PKG_UPGRADE="dnf upgrade -y"
                PYTHON_VENV="python3-virtualenv"
                ;;
            amzn)
                PKG_MGR="yum"
                PKG_UPDATE="yum check-update || true"
                PKG_INSTALL="yum install -y"
                PKG_UPGRADE="yum upgrade -y"
                PYTHON_VENV="python3-virtualenv"
                ;;
            *)
                echo -e "${RED}Unsupported distribution: $ID${NC}"
                exit 1
                ;;
        esac
    else
        echo -e "${RED}/etc/os-release not found. Cannot detect distribution.${NC}"
        exit 1
    fi
}
# Validate positional arguments and confirm Airflow is already installed.
validate_args() {
    [ $# -le 3 ] || usage
    if [ ! -d "$AIRFLOW_HOME" ]; then
        echo -e "${RED}Airflow home directory $AIRFLOW_HOME does not exist${NC}"
        echo "Please install Apache Airflow first or provide correct path"
        exit 1
    fi
}
# Update system packages and install build prerequisites.
# Fix vs. original: the PKG_* strings set by detect_distro may contain shell
# operators ("dnf check-update || true"); plain `$PKG_UPDATE` expansion would
# pass "||" and "true" as literal arguments to the package manager. Running
# the strings through `eval` makes the shell parse the operators.
update_system() {
    echo -e "${GREEN}[1/8] Updating system packages...${NC}"
    eval "$PKG_UPDATE"
    eval "$PKG_UPGRADE"
    echo -e "${GREEN}[2/8] Installing required system packages...${NC}"
    # Try the distro-specific libpq development package names in order.
    eval "$PKG_INSTALL python3-pip $PYTHON_VENV git postgresql-devel" 2>/dev/null || \
    eval "$PKG_INSTALL python3-pip $PYTHON_VENV git libpq-dev" 2>/dev/null || \
    eval "$PKG_INSTALL python3-pip $PYTHON_VENV git postgresql-libs"
}
# Create an isolated directory and virtualenv for the load-testing tooling.
create_testing_env() {
    echo -e "${GREEN}[3/8] Creating dedicated testing environment...${NC}"
    mkdir -p "$TESTING_DIR"
    cd "$TESTING_DIR"
    python3 -m venv venv
    # Prefer the airflow service account; fall back to the invoking user.
    chown -R airflow:airflow "$TESTING_DIR" 2>/dev/null || \
        chown -R "$(whoami)":"$(whoami)" "$TESTING_DIR"
    chmod 755 "$TESTING_DIR"
    source venv/bin/activate
    pip install --upgrade pip
}
# Install pinned Locust + Airflow client and the analysis libraries
# into the dedicated virtualenv.
install_dependencies() {
    echo -e "${GREEN}[4/8] Installing Locust and dependencies...${NC}"
    cd "$TESTING_DIR" && source venv/bin/activate
    pip install locust==2.17.0 apache-airflow-client==2.8.0 requests psycopg2-binary pandas matplotlib seaborn
}
# Create the dedicated Airflow API user used by the Locust scenarios.
configure_airflow_user() {
    echo -e "${GREEN}[5/8] Configuring Airflow API authentication...${NC}"
    export AIRFLOW_HOME="$AIRFLOW_HOME"
    # Airflow's own virtualenv may live elsewhere; continue with a warning.
    source "$AIRFLOW_HOME/venv/bin/activate" 2>/dev/null || echo -e "${YELLOW}Warning: Airflow venv not found at expected location${NC}"
    airflow users create \
        --username "$LOCUST_USER" \
        --firstname Performance \
        --lastname Tester \
        --role Admin \
        --email perf@example.com \
        --password "$LOCUST_PASS" 2>/dev/null || echo -e "${YELLOW}User may already exist${NC}"
}
# Create Locust test file.
# The quoted 'EOF' delimiter writes the Python below verbatim (no variable
# expansion); the hard-coded credentials in it must match LOCUST_USER/PASS.
create_locust_tests() {
    echo -e "${GREEN}[6/8] Creating Locust performance test scenarios...${NC}"
    cat > "$TESTING_DIR/locustfile.py" << 'EOF'
import json
import time
import random
from locust import HttpUser, task, between
from datetime import datetime, timedelta
import base64


class AirflowDAGUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        """Set up authentication and test data"""
        self.username = "perf_test"
        self.password = "SecureTestPass123!"
        self.dag_ids = ["example_dag_1", "example_dag_2", "data_pipeline_dag"]
        self.dag_run_ids = []
        # Create basic auth header
        credentials = base64.b64encode(f"{self.username}:{self.password}".encode()).decode()
        self.client.headers.update({"Authorization": f"Basic {credentials}"})
        # Verify connection
        response = self.client.get("/api/v1/dags")
        if response.status_code != 200:
            print(f"Authentication failed: {response.status_code}")

    @task(3)
    def trigger_dag_run(self):
        """Trigger DAG execution with configuration"""
        dag_id = random.choice(self.dag_ids)
        dag_run_id = f"perf_test_{int(time.time())}_{random.randint(1000, 9999)}"
        payload = {
            "dag_run_id": dag_run_id,
            "conf": {
                "test_mode": True,
                "batch_size": random.randint(100, 1000)
            }
        }
        with self.client.post(
            f"/api/v1/dags/{dag_id}/dagRuns",
            json=payload,
            catch_response=True,
            name="trigger_dag"
        ) as response:
            if response.status_code == 200:
                self.dag_run_ids.append((dag_id, dag_run_id))
                response.success()
            else:
                response.failure(f"Failed to trigger DAG: {response.status_code}")

    @task(2)
    def check_dag_run_status(self):
        """Monitor DAG run execution status"""
        if not self.dag_run_ids:
            return
        dag_id, dag_run_id = random.choice(self.dag_run_ids)
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
            catch_response=True,
            name="check_status"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to check status: {response.status_code}")

    @task(1)
    def list_dags(self):
        """List available DAGs"""
        with self.client.get("/api/v1/dags", catch_response=True, name="list_dags") as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to list DAGs: {response.status_code}")
EOF
    chmod 644 "$TESTING_DIR/locustfile.py"
}
# Create helper scripts: an interactive Locust launcher and a DB monitor.
create_helper_scripts() {
    echo -e "${GREEN}[7/8] Creating helper scripts...${NC}"
    # Start script -- unquoted EOF so $TESTING_DIR is expanded at write time.
    cat > "$TESTING_DIR/start_test.sh" << EOF
#!/bin/bash
cd $TESTING_DIR
source venv/bin/activate
locust --host=http://localhost:8080 --web-host=0.0.0.0 --web-port=8089
EOF
    # Performance monitoring script (quoted 'EOF': written verbatim).
    # NOTE(review): it filters on dag_run.execution_date, which is deprecated
    # in newer Airflow releases (logical_date) -- confirm for your version.
    cat > "$TESTING_DIR/monitor.py" << 'EOF'
#!/usr/bin/env python3
import psycopg2
import time
import json
from datetime import datetime


def monitor_airflow_performance():
    """Monitor Airflow database performance during testing"""
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="airflow",
            user="airflow",
            password="airflow"
        )
        while True:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT state, COUNT(*)
                    FROM dag_run
                    WHERE execution_date > NOW() - INTERVAL '1 hour'
                    GROUP BY state
                """)
                results = cur.fetchall()
                timestamp = datetime.now().isoformat()
                print(f"[{timestamp}] DAG Run States:")
                for state, count in results:
                    print(f"  {state}: {count}")
            time.sleep(30)
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    monitor_airflow_performance()
EOF
    chmod 755 "$TESTING_DIR/start_test.sh"
    chmod 755 "$TESTING_DIR/monitor.py"
    # Re-apply ownership after creating new files.
    chown -R airflow:airflow "$TESTING_DIR" 2>/dev/null || \
    chown -R $(whoami):$(whoami) "$TESTING_DIR"
}
# Sanity-check the install: locust binary, Python deps, generated files.
# Fix vs. original: under `set -e`, a failing `[ -f ... ] && echo ...` list
# aborted the whole script silently when a file was missing; the checks now
# report the missing file instead of exiting.
verify_installation() {
    echo -e "${GREEN}[8/8] Verifying installation...${NC}"
    cd "$TESTING_DIR"
    source venv/bin/activate
    # Check Locust installation
    if locust --version > /dev/null 2>&1; then
        echo -e "${GREEN}✓ Locust installed successfully${NC}"
    else
        echo -e "${RED}✗ Locust installation failed${NC}"
        exit 1
    fi
    # Check Python dependencies
    python3 -c "import locust, airflow_client.client, requests, psycopg2" 2>/dev/null && \
        echo -e "${GREEN}✓ Python dependencies installed${NC}" || \
        echo -e "${RED}✗ Some Python dependencies missing${NC}"
    # Check generated test files; warn (don't abort) when absent.
    if [ -f "$TESTING_DIR/locustfile.py" ]; then
        echo -e "${GREEN}✓ Locust test file created${NC}"
    else
        echo -e "${RED}✗ Locust test file missing${NC}"
    fi
    if [ -f "$TESTING_DIR/start_test.sh" ]; then
        echo -e "${GREEN}✓ Helper scripts created${NC}"
    else
        echo -e "${RED}✗ Helper scripts missing${NC}"
    fi
    echo -e "${GREEN}Installation completed successfully!${NC}"
    echo -e "${YELLOW}Next steps:${NC}"
    echo "1. cd $TESTING_DIR"
    echo "2. source venv/bin/activate"
    echo "3. ./start_test.sh"
    echo "4. Open http://localhost:8089 to access Locust web interface"
}
# Main execution: each step assumes the previous one succeeded; `set -e`
# plus the ERR trap abort (and clean up) on the first failure.
main() {
    check_privileges
    validate_args "$@"
    detect_distro
    update_system
    create_testing_env
    install_dependencies
    configure_airflow_user
    create_locust_tests
    create_helper_scripts
    verify_installation
}
main "$@"
Review the script before running. Execute with: bash install.sh