Set up Apache Airflow DAG performance testing with Locust load testing framework

Advanced · 45 min · Apr 03, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Learn to configure Locust for comprehensive Apache Airflow DAG performance testing, including automated load testing scenarios, metrics collection, and CI/CD integration for production-grade workflow optimization.

Prerequisites

  • Existing Apache Airflow installation
  • PostgreSQL database
  • Python 3.8+
  • Root or sudo access
  • 4GB+ RAM recommended

What this solves

Apache Airflow DAG performance testing ensures your workflows can handle production workloads without failures or bottlenecks. This tutorial shows you how to set up Locust, a Python-based load testing framework, to simulate realistic DAG execution scenarios, measure performance metrics, and identify optimization opportunities before deploying to production.


You'll need an existing Apache Airflow installation with PostgreSQL backend. If you don't have Airflow set up yet, follow our Apache Airflow installation guide first.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you have the latest security patches and dependencies.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y
sudo apt install -y python3-pip python3-venv git

# AlmaLinux / Rocky Linux
sudo dnf update -y
sudo dnf install -y python3-pip python3-virtualenv git

Create dedicated testing environment

Create a separate directory and virtual environment for your Airflow performance testing setup to isolate dependencies.

sudo mkdir -p /opt/airflow-testing
sudo chown "$USER":"$USER" /opt/airflow-testing
cd /opt/airflow-testing
python3 -m venv venv
source venv/bin/activate

Install Locust and dependencies

Install Locust along with Apache Airflow client libraries and monitoring tools for comprehensive performance testing.

pip install --upgrade pip
pip install locust==2.17.0 apache-airflow-client==2.8.0 requests psycopg2-binary pandas matplotlib seaborn

Configure Airflow API authentication

Create a dedicated user for performance testing with appropriate permissions to trigger and monitor DAGs.

export AIRFLOW_HOME=/opt/airflow
source /opt/airflow/venv/bin/activate
airflow users create \
    --username perf_test \
    --firstname Performance \
    --lastname Tester \
    --role Admin \
    --email perf@example.com \
    --password SecureTestPass123!
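
The load test authenticates against the Airflow REST API with HTTP Basic auth. As a quick sanity check, the header the test will send for the user created above can be reproduced in a few lines:

```python
import base64

# Build the Basic auth header for the perf_test user created above
credentials = b"perf_test:SecureTestPass123!"
token = base64.b64encode(credentials).decode()
header = f"Basic {token}"
print(header)
```

You can paste the printed value into a curl -H "Authorization: ..." call to confirm the API accepts it before running any load.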

Create Locust performance test scenarios

Create a comprehensive Locust test file that simulates realistic DAG operations including triggering, monitoring, and querying execution status.

import json
import time
import random
from locust import HttpUser, task, between
from datetime import datetime, timedelta
import base64

class AirflowDAGUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        """Set up authentication and test data"""
        self.username = "perf_test"
        self.password = "SecureTestPass123!"
        self.dag_ids = ["example_dag_1", "example_dag_2", "data_pipeline_dag"]
        self.dag_run_ids = []
        
        # Create basic auth header
        credentials = base64.b64encode(f"{self.username}:{self.password}".encode()).decode()
        self.client.headers.update({"Authorization": f"Basic {credentials}"})
        
        # Verify connection
        response = self.client.get("/api/v1/dags")
        if response.status_code != 200:
            print(f"Authentication failed: {response.status_code}")
    
    @task(3)
    def trigger_dag_run(self):
        """Trigger DAG execution with configuration"""
        dag_id = random.choice(self.dag_ids)
        dag_run_id = f"perf_test_{int(time.time())}_{random.randint(1000, 9999)}"
        
        payload = {
            "dag_run_id": dag_run_id,
            "conf": {
                "test_mode": True,
                "batch_size": random.randint(100, 1000)
            }
        }
        
        with self.client.post(
            f"/api/v1/dags/{dag_id}/dagRuns",
            json=payload,
            catch_response=True,
            name="trigger_dag"
        ) as response:
            if response.status_code == 200:
                self.dag_run_ids.append((dag_id, dag_run_id))
                response.success()
            else:
                response.failure(f"Failed to trigger DAG: {response.status_code}")
    
    @task(2)
    def check_dag_run_status(self):
        """Monitor DAG run execution status"""
        if not self.dag_run_ids:
            return
        
        dag_id, dag_run_id = random.choice(self.dag_run_ids)
        
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
            catch_response=True,
            name="check_dag_status"
        ) as response:
            if response.status_code == 200:
                data = response.json()
                state = data.get("state")
                if state in ["success", "failed"]:
                    self.dag_run_ids.remove((dag_id, dag_run_id))
                response.success()
            else:
                response.failure(f"Failed to check DAG status: {response.status_code}")
    
    @task(2)
    def list_task_instances(self):
        """Query task instances for performance monitoring"""
        if not self.dag_run_ids:
            return
        
        dag_id, dag_run_id = random.choice(self.dag_run_ids)
        
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
            catch_response=True,
            name="list_tasks"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to list tasks: {response.status_code}")
    
    @task(1)
    def get_dag_details(self):
        """Retrieve DAG configuration and metadata"""
        dag_id = random.choice(self.dag_ids)
        
        with self.client.get(
            f"/api/v1/dags/{dag_id}/details",
            catch_response=True,
            name="get_dag_details"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to get DAG details: {response.status_code}")
    
    @task(1)
    def query_dag_runs_history(self):
        """Query historical DAG runs for performance analysis"""
        dag_id = random.choice(self.dag_ids)
        start_date = (datetime.now() - timedelta(days=7)).isoformat()
        
        params = {
            "limit": 50,
            "start_date_gte": start_date,
            "order_by": "-start_date"
        }
        
        with self.client.get(
            f"/api/v1/dags/{dag_id}/dagRuns",
            params=params,
            catch_response=True,
            name="query_history"
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Failed to query history: {response.status_code}")
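
A note on the @task weight arguments: Locust picks each user's next task in proportion to its weight, so the file above spends roughly a third of its requests triggering DAG runs. A quick sketch of the resulting request mix:

```python
# Relative @task weights from the locustfile above
weights = {
    "trigger_dag_run": 3,
    "check_dag_run_status": 2,
    "list_task_instances": 2,
    "get_dag_details": 1,
    "query_dag_runs_history": 1,
}

total = sum(weights.values())  # 9
# Approximate share of requests each task receives, in percent
mix = {name: round(w / total * 100, 1) for name, w in weights.items()}
print(mix)
```

Adjust the weights to match the read/write ratio your production traffic actually sees; a monitoring-heavy deployment would raise the status-check weights relative to trigger_dag_run.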

Create performance metrics collection script

Build a monitoring script that collects detailed Airflow performance metrics during load testing for comprehensive analysis.

#!/usr/bin/env python3
import psycopg2
import json
import time
import pandas as pd
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns

class AirflowMetricsCollector:
    def __init__(self, db_host="localhost", db_name="airflow", db_user="airflow", db_password="airflow"):
        self.db_config = {
            "host": db_host,
            "database": db_name,
            "user": db_user,
            "password": db_password,
            "port": 5432
        }
    
    def get_db_connection(self):
        return psycopg2.connect(**self.db_config)
    
    def collect_dag_performance_metrics(self, start_time, end_time):
        """Collect DAG performance metrics from database"""
        query = """
        SELECT 
            dr.dag_id,
            dr.dag_run_id,
            dr.state,
            dr.start_date,
            dr.end_date,
            EXTRACT(EPOCH FROM (dr.end_date - dr.start_date)) as duration_seconds,
            COUNT(ti.task_id) as total_tasks,
            COUNT(CASE WHEN ti.state = 'success' THEN 1 END) as successful_tasks,
            COUNT(CASE WHEN ti.state = 'failed' THEN 1 END) as failed_tasks,
            AVG(EXTRACT(EPOCH FROM (ti.end_date - ti.start_date))) as avg_task_duration
        FROM dag_run dr
        LEFT JOIN task_instance ti ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
        WHERE dr.start_date >= %s AND dr.start_date <= %s
        GROUP BY dr.dag_id, dr.dag_run_id, dr.state, dr.start_date, dr.end_date
        ORDER BY dr.start_date DESC
        """
        
        with self.get_db_connection() as conn:
            df = pd.read_sql_query(query, conn, params=[start_time, end_time])
        
        return df
    
    def collect_system_metrics(self, start_time, end_time):
        """Collect system-level performance metrics"""
        queries = {
            'scheduler_performance': """
                SELECT 
                    DATE_TRUNC('minute', start_date) as time_bucket,
                    COUNT(*) as dag_runs_started,
                    AVG(EXTRACT(EPOCH FROM (end_date - start_date))) as avg_duration
                FROM dag_run 
                WHERE start_date >= %s AND start_date <= %s
                GROUP BY DATE_TRUNC('minute', start_date)
                ORDER BY time_bucket
            """,
            'task_queue_metrics': """
                SELECT 
                    state,
                    COUNT(*) as task_count,
                    AVG(EXTRACT(EPOCH FROM (end_date - start_date))) as avg_duration
                FROM task_instance 
                WHERE start_date >= %s AND start_date <= %s
                GROUP BY state
            """
        }
        
        results = {}
        with self.get_db_connection() as conn:
            for metric_name, query in queries.items():
                results[metric_name] = pd.read_sql_query(query, conn, params=[start_time, end_time])
        
        return results
    
    def generate_performance_report(self, metrics_data, output_dir="/opt/airflow-testing/reports"):
        """Generate comprehensive performance analysis report"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        
        # DAG Performance Summary
        dag_metrics = metrics_data
        
        # Create visualizations
        plt.figure(figsize=(15, 10))
        
        # DAG Duration Distribution
        plt.subplot(2, 2, 1)
        dag_metrics['duration_seconds'].hist(bins=30)
        plt.title('DAG Duration Distribution')
        plt.xlabel('Duration (seconds)')
        plt.ylabel('Frequency')
        
        # Success Rate by DAG
        plt.subplot(2, 2, 2)
        success_rate = dag_metrics.groupby('dag_id').agg({
            'state': lambda x: (x == 'success').sum() / len(x) * 100
        }).round(2)
        success_rate['state'].plot(kind='bar')
        plt.title('Success Rate by DAG')
        plt.ylabel('Success Rate (%)')
        plt.xticks(rotation=45)
        
        # Task Performance
        plt.subplot(2, 2, 3)
        dag_metrics['avg_task_duration'].hist(bins=20)
        plt.title('Average Task Duration Distribution')
        plt.xlabel('Duration (seconds)')
        plt.ylabel('Frequency')
        
        # Timeline Analysis
        plt.subplot(2, 2, 4)
        dag_metrics['start_date'] = pd.to_datetime(dag_metrics['start_date'])
        timeline_data = dag_metrics.set_index('start_date')['duration_seconds'].resample('1min').mean()
        timeline_data.plot()
        plt.title('DAG Duration Over Time')
        plt.xlabel('Time')
        plt.ylabel('Average Duration (seconds)')
        
        plt.tight_layout()
        plt.savefig(f'{output_dir}/performance_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        # Generate summary statistics
        summary = {
            'total_dag_runs': len(dag_metrics),
            'success_rate': (dag_metrics['state'] == 'success').sum() / len(dag_metrics) * 100,
            'avg_duration': dag_metrics['duration_seconds'].mean(),
            'p95_duration': dag_metrics['duration_seconds'].quantile(0.95),
            'p99_duration': dag_metrics['duration_seconds'].quantile(0.99),
            'failed_runs': (dag_metrics['state'] == 'failed').sum()
        }
        
        with open(f'{output_dir}/performance_summary.json', 'w') as f:
            json.dump(summary, f, indent=2, default=str)
        
        return summary

if __name__ == "__main__":
    collector = AirflowMetricsCollector()
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=1)
    
    metrics = collector.collect_dag_performance_metrics(start_time, end_time)
    system_metrics = collector.collect_system_metrics(start_time, end_time)
    
    summary = collector.generate_performance_report(metrics)
    print(json.dumps(summary, indent=2, default=str))
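
The p95 and p99 figures in the summary come from pandas' quantile(), which linearly interpolates between sorted samples. A self-contained sketch of the same calculation, using hypothetical durations in place of the duration_seconds column:

```python
def percentile(values, q):
    """Linear-interpolation percentile, matching pandas' default method."""
    xs = sorted(values)
    pos = q * (len(xs) - 1)        # fractional rank into the sorted data
    lo = int(pos)
    frac = pos - lo
    if lo + 1 >= len(xs):
        return float(xs[-1])
    return xs[lo] + frac * (xs[lo + 1] - xs[lo])

# Hypothetical DAG run durations in seconds
durations = [12, 15, 18, 20, 25, 30, 45, 60, 90, 120]
print(percentile(durations, 0.95))  # ~106.5
```

The long tail is the point: the mean here is 43.5 s, but the p95 is more than double that, which is why the CI gate later in this guide checks p95 rather than the average.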

Configure Locust for distributed testing

Locust reads flat key = value configuration files rather than INI-style sections, so keep the master and worker settings in separate files. Save the master settings as /opt/airflow-testing/locust-master.conf:

locustfile = /opt/airflow-testing/locustfile.py
master = true
master-bind-host = 0.0.0.0
master-bind-port = 5557
web-port = 8089
host = http://localhost:8080
users = 50
spawn-rate = 2
run-time = 300s
loglevel = INFO
logfile = /opt/airflow-testing/logs/locust.log

Save the worker settings as /opt/airflow-testing/locust-worker.conf:

locustfile = /opt/airflow-testing/locustfile.py
worker = true
master-host = localhost
master-port = 5557

Start the master with locust --config /opt/airflow-testing/locust-master.conf, then start one worker per CPU core with locust --config /opt/airflow-testing/locust-worker.conf.

Create test execution scripts

Build automated scripts to run comprehensive performance tests and collect metrics for analysis.

#!/bin/bash
set -e

# Configuration
TEST_NAME="airflow_dag_performance_$(date +%Y%m%d_%H%M%S)"
TEST_DIR="/opt/airflow-testing"
REPORTS_DIR="$TEST_DIR/reports/$TEST_NAME"
LOGS_DIR="$TEST_DIR/logs"
USERS=${1:-20}
SPAWN_RATE=${2:-2}
RUN_TIME=${3:-300}

# Create directories
mkdir -p "$REPORTS_DIR" "$LOGS_DIR"
echo "Starting Airflow DAG Performance Test: $TEST_NAME"
echo "Users: $USERS, Spawn Rate: $SPAWN_RATE, Duration: ${RUN_TIME}s"

# Activate virtual environment
source "$TEST_DIR/venv/bin/activate"

# Collect a baseline before the test (the collector runs once and exits)
echo "Collecting pre-test metrics..."
python3 "$TEST_DIR/metrics_collector.py" > "$REPORTS_DIR/pre_test_metrics.json" 2>&1

# Run Locust test
echo "Starting Locust load test..."
locust \
    --locustfile="$TEST_DIR/locustfile.py" \
    --headless \
    --users="$USERS" \
    --spawn-rate="$SPAWN_RATE" \
    --run-time="${RUN_TIME}s" \
    --host="http://localhost:8080" \
    --html="$REPORTS_DIR/locust_report.html" \
    --csv="$REPORTS_DIR/locust_stats" \
    --logfile="$LOGS_DIR/locust_$TEST_NAME.log" \
    --loglevel=INFO

# Let in-flight DAG runs finish before collecting final metrics
sleep 30

# Generate comprehensive post-test report
echo "Generating final performance report..."
python3 "$TEST_DIR/metrics_collector.py" > "$REPORTS_DIR/post_test_metrics.json" 2>&1

# Create summary report
cat > "$REPORTS_DIR/test_summary.txt" << EOF
Airflow DAG Performance Test Summary
====================================
Test Name: $TEST_NAME
Date: $(date)
Users: $USERS
Spawn Rate: $SPAWN_RATE
Duration: ${RUN_TIME}s

Files Generated:
  - locust_report.html: Detailed Locust performance report
  - locust_stats_*.csv: Raw performance data
  - performance_analysis.png: Performance visualizations
  - performance_summary.json: Key metrics summary

Logs Location: $LOGS_DIR/locust_$TEST_NAME.log
EOF

echo "Performance test completed. Reports available in: $REPORTS_DIR"
echo "View the HTML report: file://$REPORTS_DIR/locust_report.html"
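
Beyond the HTML report, the locust_stats_*.csv files written by --csv are convenient for scripted checks. A small post-processing sketch, with a hypothetical sample standing in for the real file (the column names here are assumed from Locust 2.x output and worth verifying against your version):

```python
import csv
import io

# Hypothetical sample mirroring a Locust stats CSV; real files come
# from the --csv flag in run_performance_test.sh
sample = """Type,Name,Request Count,Failure Count
POST,trigger_dag,900,18
GET,check_dag_status,600,3
,Aggregated,1500,21
"""

def overall_failure_rate(stats_csv):
    """Return the failure percentage from the Aggregated summary row."""
    for row in csv.DictReader(io.StringIO(stats_csv)):
        if row["Name"] == "Aggregated":
            return int(row["Failure Count"]) / int(row["Request Count"]) * 100
    raise ValueError("no Aggregated row found")

print(round(overall_failure_rate(sample), 2))  # 1.4
```

Swap io.StringIO for open("reports/<run>/locust_stats_stats.csv") to run the same check against a real test run.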

Set up CI/CD integration

Create a GitLab CI pipeline configuration for automated performance testing in your deployment workflow.

stages:
  - performance-test
  - report

variables:
  AIRFLOW_HOME: "/opt/airflow"
  TEST_USERS: "10"
  TEST_DURATION: "180"
  PERFORMANCE_THRESHOLD_P95: "60"

performance-test:
  stage: performance-test
  image: python:3.11-slim
  services:
    - postgres:13
  variables:
    POSTGRES_DB: airflow_test
    POSTGRES_USER: airflow
    POSTGRES_PASSWORD: airflow
    DATABASE_URL: postgresql://airflow:airflow@postgres:5432/airflow_test
  before_script:
    - apt-get update && apt-get install -y git
    - pip install locust apache-airflow-client psycopg2-binary pandas matplotlib seaborn
    - git clone https://gitlab.com/your-org/airflow-dags.git /tmp/dags
  script:
    - cd /opt/airflow-testing
    - source venv/bin/activate
    - chmod +x run_performance_test.sh
    - ./run_performance_test.sh $TEST_USERS 2 $TEST_DURATION
    - |
      python3 - <<'PYEOF'
      import glob, json, os, sys
      # run_performance_test.sh writes one timestamped directory per run
      path = sorted(glob.glob('reports/*/performance_summary.json'))[-1]
      with open(path) as f:
          data = json.load(f)
      p95 = data['p95_duration']
      threshold = float(os.environ['PERFORMANCE_THRESHOLD_P95'])
      print(f'P95 Duration: {p95}s, Threshold: {threshold}s')
      sys.exit(1 if p95 > threshold else 0)
      PYEOF
  artifacts:
    when: always
    paths:
      - reports/
    expire_in: 1 week
  only:
    - merge_requests
    - main

generate-report:
  stage: report
  dependencies:
    - performance-test
  script:
    - echo "Performance test completed successfully"
    - ls -la reports/
  artifacts:
    paths:
      - reports/
  only:
    - main

Configure monitoring and alerting

Set up Prometheus metrics collection for continuous performance monitoring of your Airflow instance.

#!/usr/bin/env python3
from prometheus_client import start_http_server, Gauge, Counter, Histogram
import psycopg2
import time
import threading
from datetime import datetime, timedelta

class AirflowPrometheusExporter:
    def __init__(self, db_config, port=8000):
        self.db_config = db_config
        self.port = port
        
        # Define Prometheus metrics
        self.dag_runs_total = Counter('airflow_dag_runs_total', 'Total DAG runs', ['dag_id', 'state'])
        self.dag_run_duration = Histogram('airflow_dag_run_duration_seconds', 'DAG run duration', ['dag_id'])
        self.active_dag_runs = Gauge('airflow_active_dag_runs', 'Currently running DAGs', ['dag_id'])
        self.task_instances_total = Counter('airflow_task_instances_total', 'Total task instances', ['dag_id', 'task_id', 'state'])
        self.scheduler_heartbeat = Gauge('airflow_scheduler_heartbeat', 'Scheduler last heartbeat timestamp')
        
    def collect_metrics(self):
        """Collect metrics from Airflow database"""
        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cur:
                    # DAG run metrics for the last collection interval;
                    # keeping the window equal to the 30 s sleep below
                    # avoids double-counting runs in the counters
                    cur.execute("""
                        SELECT dag_id, state, COUNT(*),
                               AVG(EXTRACT(EPOCH FROM (end_date - start_date)))
                        FROM dag_run 
                        WHERE start_date >= NOW() - INTERVAL '30 seconds'
                        GROUP BY dag_id, state
                    """)
                    
                    for dag_id, state, count, avg_duration in cur.fetchall():
                        self.dag_runs_total.labels(dag_id=dag_id, state=state).inc(count)
                        if avg_duration:
                            self.dag_run_duration.labels(dag_id=dag_id).observe(avg_duration)
                    
                    # Active DAG runs
                    cur.execute("""
                        SELECT dag_id, COUNT(*)
                        FROM dag_run 
                        WHERE state = 'running'
                        GROUP BY dag_id
                    """)
                    
                    # Update per-DAG running counts (labels absent from
                    # this result keep their previously set value)
                    for dag_id, count in cur.fetchall():
                        self.active_dag_runs.labels(dag_id=dag_id).set(count)
                    
                    # Scheduler heartbeat
                    cur.execute("SELECT EXTRACT(EPOCH FROM MAX(latest_heartbeat)) FROM job WHERE job_type = 'SchedulerJob'")
                    heartbeat = cur.fetchone()[0]
                    if heartbeat:
                        self.scheduler_heartbeat.set(heartbeat)
                        
        except Exception as e:
            print(f"Error collecting metrics: {e}")
    
    def run(self):
        """Start the Prometheus metrics server"""
        start_http_server(self.port)
        print(f"Prometheus exporter started on port {self.port}")
        
        while True:
            self.collect_metrics()
            time.sleep(30)

if __name__ == "__main__":
    db_config = {
        "host": "localhost",
        "database": "airflow",
        "user": "airflow",
        "password": "airflow",
        "port": 5432
    }
    
    exporter = AirflowPrometheusExporter(db_config)
    exporter.run()
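
The scheduler_heartbeat gauge stores a plain epoch timestamp, so any consumer decides whether the scheduler is alive by comparing it to the current time. The check reduces to a one-line staleness test (the heartbeat value below is hypothetical, standing in for the exported gauge):

```python
import time

# Hypothetical heartbeat: scheduler last checked in 45 seconds ago
heartbeat_epoch = time.time() - 45

# Staleness check: over 300 s without a heartbeat means trouble
lag = time.time() - heartbeat_epoch
print(lag > 300)  # False: well under the 5-minute threshold
```

This is the same comparison the AirflowSchedulerDown alert rule performs in PromQL.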

Set correct permissions

Configure appropriate permissions for the testing environment and make scripts executable.

mkdir -p /opt/airflow-testing/{logs,reports}
sudo chown -R airflow:airflow /opt/airflow-testing
chmod +x /opt/airflow-testing/run_performance_test.sh
chmod +x /opt/airflow-testing/metrics_collector.py
chmod +x /opt/airflow-testing/prometheus_exporter.py
chmod 755 /opt/airflow-testing/{logs,reports}
Never use chmod 777. It gives every user on the system full access to your files. Instead, fix ownership with chown and use minimal permissions like 755 for directories and 644 for files.

Configure load testing metrics and monitoring

Set up Grafana dashboards

Create comprehensive Grafana dashboards to visualize Airflow performance metrics during load testing. This complements our existing Airflow monitoring setup.

{
  "dashboard": {
    "id": null,
    "title": "Airflow DAG Performance Testing",
    "tags": ["airflow", "performance", "testing"],
    "panels": [
      {
        "id": 1,
        "title": "DAG Run Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "rate(airflow_dag_runs_total{state=\"success\"}[5m]) / rate(airflow_dag_runs_total[5m]) * 100",
            "legendFormat": "Success Rate %"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
      },
      {
        "id": 2,
        "title": "DAG Run Duration (P95)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, airflow_dag_run_duration_seconds)",
            "legendFormat": "P95 Duration"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
      },
      {
        "id": 3,
        "title": "Active DAG Runs",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(airflow_active_dag_runs) by (dag_id)",
            "legendFormat": "{{dag_id}}"
          }
        ],
        "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "10s"
  }
}

Configure performance alerts

Set up automated alerts for performance degradation during load testing.

groups:
  - name: airflow_performance
    rules:
      - alert: AirflowDAGHighFailureRate
        expr: |
          (
            rate(airflow_dag_runs_total{state="failed"}[5m]) /
            rate(airflow_dag_runs_total[5m])
          ) * 100 > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High DAG failure rate detected"
          description: "DAG failure rate is {{ $value }}% for {{ $labels.dag_id }}"
          
      - alert: AirflowDAGSlowExecution
        expr: |
          histogram_quantile(0.95, airflow_dag_run_duration_seconds) > 300
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "DAG execution is slower than expected"
          description: "P95 DAG duration is {{ $value }}s for {{ $labels.dag_id }}"
          
      - alert: AirflowSchedulerDown
        expr: |
          time() - airflow_scheduler_heartbeat > 300
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Airflow scheduler is not responding"
          description: "Scheduler last heartbeat was {{ $value }}s ago"
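
To sanity-check the failure-rate rule, note that the PromQL expression reduces to a simple ratio of per-second rates over the 5-minute window. With hypothetical rates standing in for the two rate() terms:

```python
# Hypothetical per-second rates over a 5-minute window, standing in for
# rate(airflow_dag_runs_total{state="failed"}[5m]) and the total rate
failed_per_sec = 0.6
total_per_sec = 4.0

failure_pct = failed_per_sec / total_per_sec * 100
print(round(failure_pct, 1))   # 15.0
print(failure_pct > 10)        # True: the warning alert would fire
```

Tune the 10% threshold to your workload: batch-heavy DAGs with retries may tolerate a higher rate than latency-sensitive pipelines.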

Verify your setup

Test your performance testing configuration with a basic load test scenario.

cd /opt/airflow-testing
source venv/bin/activate

Test Locust configuration

locust --version

Verify Airflow API connectivity

curl -u perf_test:SecureTestPass123! http://localhost:8080/api/v1/dags

Run a short performance test

./run_performance_test.sh 5 1 60

Check generated reports

ls -la reports/

Verify metrics collection

python3 metrics_collector.py

Run your first performance test

Execute a comprehensive performance test to baseline your Airflow DAG performance.

# Run performance test with 20 users for 5 minutes
./run_performance_test.sh 20 2 300

Start Prometheus metrics exporter

nohup python3 prometheus_exporter.py > logs/prometheus_exporter.log 2>&1 &

Monitor test progress with Locust web UI (if running interactively)

locust --locustfile=locustfile.py --host=http://localhost:8080

Common issues

Symptom | Cause | Fix
Authentication failed (401) | Invalid Airflow credentials | Verify the user exists: airflow users list
Connection refused to Airflow | Airflow webserver not running | Start the webserver: airflow webserver -D
Database connection errors | PostgreSQL not accessible | Check the connection: psql -h localhost -U airflow -d airflow
No DAGs found in tests | DAG IDs don't exist | Update the dag_ids list in locustfile.py with your actual DAG names
Permission denied on reports | Incorrect directory ownership | Fix ownership: sudo chown -R airflow:airflow /opt/airflow-testing
Metrics collection failing | Missing database query permissions | Grant read access: GRANT SELECT ON ALL TABLES IN SCHEMA public TO perf_test;
High test failure rates | Airflow overloaded | Reduce the user count and spawn rate in the test configuration
