Set up comprehensive monitoring for Apache Airflow DAGs using DataDog integration. This tutorial covers DataDog agent installation, metrics collection configuration, custom dashboard creation, and alerting rules for production workflow observability.
Prerequisites
- Apache Airflow 2.0+ installed
- DataDog account and API key
- Root or sudo access
- Python 3.8+ environment
What this solves
Apache Airflow DAG monitoring becomes critical in production environments where workflow failures can impact business operations. DataDog integration provides comprehensive observability into DAG execution metrics, task duration, failure rates, and resource utilization. This monitoring setup helps you identify bottlenecks, catch failures quickly, and track SLA compliance for your data pipelines.
Step-by-step installation
Update system packages
Start by refreshing the package index and upgrading installed packages so the dependencies install cleanly.
sudo apt update && sudo apt upgrade -y
sudo apt install -y curl wget gnupg2 software-properties-common
Install DataDog agent
Download and install the DataDog Agent (version 7) using the official installation script. Replace YOUR_API_KEY with your actual DataDog API key from your DataDog dashboard.
DD_API_KEY=YOUR_API_KEY DD_SITE="datadoghq.com" bash -c "$(curl -L https://install.datadoghq.com/scripts/install_script_agent7.sh)"
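The installer normally starts the agent automatically on systemd-based hosts; before continuing, it is worth a quick check that the service is running and reporting its version:
sudo systemctl status datadog-agent --no-pager
sudo datadog-agent version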
Configure DataDog agent for Airflow
Create the Airflow integration configuration file (the agent reads it from /etc/datadog-agent/conf.d/airflow.d/conf.yaml or /etc/datadog-agent/conf.d/airflow.yaml) to enable metrics collection from your Airflow instance. Replace the admin/admin values with your actual webserver credentials.
init_config:

instances:
  - url: http://localhost:8080
    username: admin
    password: admin
    tags:
      - environment:production
      - service:airflow
    collect_default_metrics: true
    dag_bag_timeout: 300
    dag_run_timeout: 300
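Before moving on, you can confirm the agent actually loads the new integration file with its built-in config check (the grep just trims the output to the Airflow entry):
sudo datadog-agent configcheck | grep -A 5 -i airflow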
Enable Airflow StatsD metrics
Configure Airflow to send StatsD metrics to DataDog by updating the airflow.cfg configuration file.
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
stat_name_handler =
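If StatsD volume becomes a concern, statsd_allow_list accepts a comma-separated list of metric-name prefixes and Airflow drops everything else before sending. The prefixes below are illustrative; keep only the ones you actually chart:
statsd_allow_list = scheduler,executor,dagrun,dag_processing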
Configure DataDog DogStatsD
Enable DogStatsD in the DataDog agent's main configuration file, /etc/datadog-agent/datadog.yaml, so it can receive metrics from Airflow. This allows real-time metric collection.
# DogStatsD configuration
use_dogstatsd: true
dogstatsd_port: 8125
dogstatsd_non_local_traffic: false
dogstatsd_stats_enable: true
dogstatsd_queue_size: 1024
dogstatsd_buffer_size: 8192

# Tags for all metrics
tags:
  - env:production
  - service:airflow
  - datacenter:us-east-1
Install Python DataDog library
Install the DataDog Python library in the environment Airflow runs in to enable custom metrics and enhanced monitoring. If Airflow lives in a virtualenv, use that environment's pip, and consider pinning your installed Airflow version so the statsd extra does not trigger an upgrade.
sudo -u airflow pip install datadog
sudo -u airflow pip install 'apache-airflow[statsd]'
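To confirm the library and the local DogStatsD endpoint work together, push a throwaway metric from a Python shell; the metric name airflow.smoke_test is arbitrary and can be ignored in DataDog afterwards:
from datadog import initialize, statsd

# Point the client at the local agent's DogStatsD port
initialize(statsd_host='127.0.0.1', statsd_port=8125)
statsd.increment('airflow.smoke_test', tags=['env:production'])
print('Metric sent; it should appear in the Metrics Explorer within a minute or two.')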
Create custom DAG monitoring script
Create a monitoring script that collects custom metrics about DAG performance and sends them to DataDog.
from datadog import initialize, statsd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagRun
from airflow.utils import timezone
from datetime import datetime, timedelta
import logging

# Initialize the DogStatsD client (points at the local agent)
options = {
    'statsd_host': '127.0.0.1',
    'statsd_port': 8125,
}
initialize(**options)

def collect_dag_metrics():
    """Collect custom DAG metrics and send them to DataDog."""
    try:
        # Get DAG runs that started executing within the last hour
        recent_runs = DagRun.find(
            execution_start_date=timezone.utcnow() - timedelta(hours=1)
        )
        success_count = len([r for r in recent_runs if r.state == 'success'])
        failed_count = len([r for r in recent_runs if r.state == 'failed'])

        # Send metrics to DataDog
        statsd.gauge('airflow.dag_runs.success', success_count, tags=['env:production'])
        statsd.gauge('airflow.dag_runs.failed', failed_count, tags=['env:production'])
        logging.info(f"Sent metrics: {success_count} success, {failed_count} failed")
    except Exception as e:
        logging.error(f"Error collecting metrics: {e}")
        statsd.increment('airflow.metrics.collection.error', tags=['env:production'])
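If you also want per-DAG breakdowns on the dashboard side, the same list of DAG runs can be turned into one gauge per dag_id. This is a sketch rather than a built-in integration metric: the metric name airflow.dag_runs.failed_by_dag and the helper emit_per_dag_failures are made up for illustration, and the function expects the recent_runs list collected above.
from collections import Counter
from datadog import statsd

def emit_per_dag_failures(dag_runs):
    """Send one failed-run gauge per dag_id so dashboards can group by DAG."""
    failed_by_dag = Counter(r.dag_id for r in dag_runs if r.state == 'failed')
    for dag_id, count in failed_by_dag.items():
        statsd.gauge(
            'airflow.dag_runs.failed_by_dag',
            count,
            tags=['env:production', f'dag_id:{dag_id}'],
        )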
Configure log collection
Enable DataDog log collection for Airflow by adding a logs section to the Airflow integration configuration file. This captures DAG processor, scheduler, and task logs.
logs:
  - type: file
    path: /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
    service: airflow
    source: airflow
    sourcecategory: airflow
    tags:
      - env:production
      - component:dag_processor
  - type: file
    path: /opt/airflow/logs/scheduler/latest/*.log
    service: airflow
    source: airflow
    sourcecategory: airflow
    tags:
      - env:production
      - component:scheduler
  - type: file
    path: /opt/airflow/logs/*/*/*/*.log
    service: airflow
    source: airflow
    sourcecategory: airflow
    tags:
      - env:production
      - component:task
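The agent runs as the dd-agent user, which typically cannot read files under /opt/airflow/logs. One low-impact way to grant access, assuming your filesystem supports POSIX ACLs and the acl tools are installed, is:
# Give the dd-agent user read access to existing logs
sudo setfacl -R -m u:dd-agent:rX /opt/airflow/logs
# Set a default ACL on the directories so newly created log files inherit it
sudo find /opt/airflow/logs -type d -exec setfacl -d -m u:dd-agent:rX {} +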
Enable log collection in DataDog agent
Update the main DataDog agent configuration (/etc/datadog-agent/datadog.yaml) to enable log collection and processing.
logs_enabled: true
log_level: INFO

# Log processing configuration
logs_config:
  container_collect_all: false
  processing_rules:
    - type: exclude_at_match
      name: exclude_debug
      pattern: "DEBUG"
    - type: mask_sequences
      name: mask_passwords
      pattern: "password=\\S+"
      replace_placeholder: "password=***"
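Airflow task logs frequently contain multi-line Python tracebacks. To keep a traceback attached to the log entry that produced it, add a multi_line rule to the relevant log source in the Airflow integration's conf.yaml (per-source rules use the log_processing_rules key, not the global processing_rules above); the pattern below assumes each new log line starts with a [YYYY-MM-DD timestamp:
log_processing_rules:
  - type: multi_line
    name: new_log_start_with_date
    pattern: \[\d{4}\-\d{2}\-\d{2}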
Restart services
Restart both the DataDog agent and Airflow services to apply the new configuration.
sudo systemctl restart datadog-agent
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
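Once both services are back up, you can confirm Airflow is actually emitting StatsD packets by watching UDP traffic on port 8125. This assumes tcpdump is installed and that Airflow and the agent share the host:
sudo tcpdump -i lo -A -c 10 udp port 8125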
Configure custom dashboards
Create DAG performance dashboard
Use the DataDog web interface to create a comprehensive dashboard. Here's the JSON configuration for a production-ready dashboard.
{
"title": "Airflow DAG Monitoring",
"description": "Production Airflow DAG monitoring and performance metrics",
"widgets": [
{
"definition": {
"type": "timeseries",
"requests": [
{
"q": "avg:airflow.dag_run.duration{*} by {dag_id}",
"display_type": "line"
}
],
"title": "DAG Execution Duration"
}
},
{
"definition": {
"type": "query_value",
"requests": [
{
"q": "sum:airflow.dag_runs.failed{*}",
"aggregator": "last"
}
],
"title": "Failed DAG Runs (Last Hour)"
}
},
{
"definition": {
"type": "toplist",
"requests": [
{
"q": "top(avg:airflow.task_instance.duration{*} by {task_id}, 10, 'mean', 'desc')"
}
],
"title": "Slowest Tasks"
}
}
]
}
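If you prefer dashboards as code, the same JSON can be pushed through the Dashboards API instead of the web interface. Note the API also requires a top-level "layout_type" field (for example "ordered"). The example assumes the JSON is saved as airflow-dashboard.json and that DD_API_KEY and DD_APP_KEY are exported in your shell:
curl -X POST "https://api.datadoghq.com/api/v1/dashboard" \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: ${DD_API_KEY}" \
  -H "DD-APPLICATION-KEY: ${DD_APP_KEY}" \
  -d @airflow-dashboard.json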
Configure alerting rules
Create DAG failure alerts
Set up alerts for DAG failures, long-running tasks, and scheduler health issues.
# DAG Failure Alert
name: "Airflow DAG Failures"
type: "metric alert"
query: "sum(last_5m):sum:airflow.dag_runs.failed{env:production} > 0"
message: |
  Airflow DAG Failure Detected
  One or more DAGs have failed in the last 5 minutes.
  Check the Airflow web interface: http://localhost:8080
  @slack-airflow-alerts @pagerduty

# Long Running Task Alert
name: "Airflow Long Running Tasks"
type: "metric alert"
query: "avg(last_15m):avg:airflow.task_instance.duration{env:production} > 3600"
message: |
  Long Running Task Detected
  Tasks are taking longer than 1 hour to complete.
  This may indicate performance issues or stuck processes.
  @slack-airflow-alerts

# Scheduler Health Alert
name: "Airflow Scheduler Down"
type: "service check"
query: '"airflow.scheduler_heartbeat".over("env:production").last(2).count_by_status()'
message: |
  Airflow Scheduler is Down
  The Airflow scheduler has stopped responding.
  Immediate action required to restore workflow processing.
  @pagerduty @slack-airflow-alerts
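Monitors can also be created programmatically. A minimal sketch for the DAG failure alert above via the Monitors API, again assuming DD_API_KEY and DD_APP_KEY are exported; the notification handles are taken from the message block:
curl -X POST "https://api.datadoghq.com/api/v1/monitor" \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: ${DD_API_KEY}" \
  -H "DD-APPLICATION-KEY: ${DD_APP_KEY}" \
  -d '{
    "name": "Airflow DAG Failures",
    "type": "metric alert",
    "query": "sum(last_5m):sum:airflow.dag_runs.failed{env:production} > 0",
    "message": "One or more DAGs failed in the last 5 minutes. @slack-airflow-alerts @pagerduty",
    "options": {"thresholds": {"critical": 0}}
  }'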
Verify your setup
# Check DataDog agent status
sudo datadog-agent status
# Run the Airflow integration check
sudo datadog-agent check airflow

# Generate a diagnostic flare to share with DataDog support if metrics are missing
sudo datadog-agent flare

# Test StatsD connectivity
echo "custom.metric:1|c" | nc -u -w1 127.0.0.1 8125

# Check the scheduler logs for StatsD activity
tail -f /opt/airflow/logs/scheduler/latest/*.log | grep -i statsd
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| No metrics in DataDog | StatsD not configured | Verify statsd_on=True in airflow.cfg and restart services |
| Agent check fails | Wrong Airflow URL/credentials | Update conf.yaml with correct webserver URL and credentials |
| Permission denied on logs | DataDog agent can't read log files | sudo chown -R dd-agent:dd-agent /opt/airflow/logs |
| High memory usage | Too many metrics collected | Limit emitted metrics with statsd_allow_list in airflow.cfg (see above) |
| Missing task logs | Log path pattern incorrect | Verify log path matches your Airflow log structure |
Next steps
- Configure Apache Airflow monitoring with Prometheus alerts and Grafana dashboards
- Set up centralized log aggregation with Elasticsearch 8, Logstash 8, and Kibana 8 (ELK Stack)
- Implement comprehensive Apache Airflow DAG testing and validation strategies with pytest and best practices
- Set up advanced Airflow alerting with Slack integration and custom notification rules
- Configure Apache Airflow performance optimization with connection pooling and resource tuning
Automated install script
Run the script below to automate the entire setup:
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
# Script configuration
SCRIPT_NAME="$(basename "$0")"
DD_API_KEY=""
AIRFLOW_USER="airflow"
AIRFLOW_HOME="/opt/airflow"
# Usage message
usage() {
echo "Usage: $SCRIPT_NAME --api-key=YOUR_DD_API_KEY [--airflow-user=airflow] [--airflow-home=/opt/airflow]"
echo ""
echo "Required arguments:"
echo " --api-key=KEY DataDog API key"
echo ""
echo "Optional arguments:"
echo " --airflow-user=USER Airflow system user (default: airflow)"
echo " --airflow-home=PATH Airflow home directory (default: /opt/airflow)"
echo " --help Show this help message"
exit 1
}
# Parse arguments
for arg in "$@"; do
  case $arg in
    --api-key=*)
      DD_API_KEY="${arg#*=}"
      ;;
    --airflow-user=*)
      AIRFLOW_USER="${arg#*=}"
      ;;
    --airflow-home=*)
      AIRFLOW_HOME="${arg#*=}"
      ;;
    --help)
      usage
      ;;
    *)
      echo -e "${RED}Unknown argument: $arg${NC}"
      usage
      ;;
  esac
done
# Validate required arguments
if [[ -z "$DD_API_KEY" ]]; then
echo -e "${RED}Error: DataDog API key is required${NC}"
usage
fi
# Cleanup function for rollback on failure
cleanup() {
echo -e "${YELLOW}Script failed. Cleaning up...${NC}"
systemctl stop datadog-agent 2>/dev/null || true
rm -f /etc/datadog-agent/conf.d/airflow.yaml
rm -f "$AIRFLOW_HOME/dags/dag_monitoring.py"
}
trap cleanup ERR
# Log function
log() {
echo -e "${GREEN}$1${NC}"
}
warn() {
echo -e "${YELLOW}$1${NC}"
}
error() {
echo -e "${RED}$1${NC}"
}
# Check prerequisites
check_prerequisites() {
echo -e "${BLUE}[1/8] Checking prerequisites...${NC}"
if [[ $EUID -ne 0 ]]; then
error "This script must be run as root or with sudo"
exit 1
fi
if ! command -v curl &> /dev/null; then
error "curl is required but not installed"
exit 1
fi
if ! id "$AIRFLOW_USER" &>/dev/null; then
error "User $AIRFLOW_USER does not exist. Please create it first or specify correct user with --airflow-user"
exit 1
fi
if [[ ! -d "$AIRFLOW_HOME" ]]; then
error "Airflow home directory $AIRFLOW_HOME does not exist"
exit 1
fi
log "Prerequisites check passed"
}
# Auto-detect distribution
detect_distro() {
echo -e "${BLUE}[2/8] Detecting distribution...${NC}"
if [[ ! -f /etc/os-release ]]; then
error "Cannot detect distribution - /etc/os-release not found"
exit 1
fi
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update && apt upgrade -y"
PKG_INSTALL="apt install -y"
PYTHON_PIP="pip3"
;;
almalinux|rocky|centos|rhel|ol)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
PYTHON_PIP="pip3"
;;
fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
PYTHON_PIP="pip3"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
PYTHON_PIP="pip3"
;;
*)
error "Unsupported distribution: $ID"
exit 1
;;
esac
log "Detected $PRETTY_NAME using $PKG_MGR"
}
# Update system packages
update_system() {
echo -e "${BLUE}[3/8] Updating system packages...${NC}"
$PKG_UPDATE
$PKG_INSTALL curl wget gnupg2
if [[ "$PKG_MGR" == "apt" ]]; then
$PKG_INSTALL software-properties-common
fi
log "System packages updated"
}
# Install DataDog agent
install_datadog_agent() {
echo -e "${BLUE}[4/8] Installing DataDog agent...${NC}"
DD_API_KEY="$DD_API_KEY" DD_SITE="datadoghq.com" bash -c "$(curl -L https://install.datadoghq.com/scripts/install_script_agent7.sh)"
systemctl enable datadog-agent
systemctl start datadog-agent
log "DataDog agent installed and started"
}
# Configure DataDog for Airflow
configure_datadog_airflow() {
echo -e "${BLUE}[5/8] Configuring DataDog for Airflow...${NC}"
# Create Airflow integration config
cat > /etc/datadog-agent/conf.d/airflow.yaml << EOF
init_config:

instances:
  - url: http://localhost:8080
    username: admin
    password: admin
    tags:
      - environment:production
      - service:airflow
    collect_default_metrics: true
    dag_bag_timeout: 300
    dag_run_timeout: 300
EOF
chown dd-agent:dd-agent /etc/datadog-agent/conf.d/airflow.yaml
chmod 644 /etc/datadog-agent/conf.d/airflow.yaml
# Configure DogStatsD
cat >> /etc/datadog-agent/datadog.yaml << EOF
# DogStatsD configuration
use_dogstatsd: true
dogstatsd_port: 8125
dogstatsd_non_local_traffic: false
dogstatsd_stats_enable: true
dogstatsd_queue_size: 1024
dogstatsd_buffer_size: 8192
# Tags for all metrics
tags:
- env:production
- service:airflow
- datacenter:us-east-1
EOF
log "DataDog configuration completed"
}
# Configure Airflow for StatsD
configure_airflow_statsd() {
echo -e "${BLUE}[6/8] Configuring Airflow StatsD metrics...${NC}"
AIRFLOW_CFG="$AIRFLOW_HOME/airflow.cfg"
if [[ ! -f "$AIRFLOW_CFG" ]]; then
warn "airflow.cfg not found at $AIRFLOW_CFG"
return 0
fi
# Configure StatsD in airflow.cfg
sed -i 's/^statsd_on = .*/statsd_on = True/' "$AIRFLOW_CFG"
sed -i 's/^statsd_host = .*/statsd_host = localhost/' "$AIRFLOW_CFG"
sed -i 's/^statsd_port = .*/statsd_port = 8125/' "$AIRFLOW_CFG"
sed -i 's/^statsd_prefix = .*/statsd_prefix = airflow/' "$AIRFLOW_CFG"
chown "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_CFG"
log "Airflow StatsD configuration completed"
}
# Install Python dependencies
install_python_deps() {
echo -e "${BLUE}[7/8] Installing Python dependencies...${NC}"
sudo -u "$AIRFLOW_USER" $PYTHON_PIP install datadog apache-airflow[statsd]
log "Python dependencies installed"
}
# Create monitoring DAG
create_monitoring_dag() {
echo -e "${BLUE}[8/8] Creating monitoring DAG...${NC}"
DAG_DIR="$AIRFLOW_HOME/dags"
mkdir -p "$DAG_DIR"
cat > "$DAG_DIR/dag_monitoring.py" << 'EOF'
from datadog import initialize, statsd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagRun
from airflow.utils import timezone
from datetime import datetime, timedelta
import logging

# Initialize the DogStatsD client (points at the local agent)
options = {
    'statsd_host': '127.0.0.1',
    'statsd_port': 8125,
}
initialize(**options)

def collect_dag_metrics():
    """Collect custom DAG metrics and send them to DataDog."""
    try:
        # Get DAG runs that started executing within the last hour
        recent_runs = DagRun.find(
            execution_start_date=timezone.utcnow() - timedelta(hours=1)
        )
        success_count = len([r for r in recent_runs if r.state == 'success'])
        failed_count = len([r for r in recent_runs if r.state == 'failed'])

        # Send metrics to DataDog
        statsd.gauge('airflow.dag_runs.success', success_count, tags=['env:production'])
        statsd.gauge('airflow.dag_runs.failed', failed_count, tags=['env:production'])
        logging.info(f"Sent metrics: {success_count} success, {failed_count} failed")
    except Exception as e:
        logging.error(f"Error collecting metrics: {e}")
        statsd.increment('airflow.metrics.collection.error', tags=['env:production'])

# Define the monitoring DAG
default_args = {
    'owner': 'datadog-monitoring',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'datadog_monitoring',
    default_args=default_args,
    description='DataDog monitoring DAG',
    schedule_interval=timedelta(minutes=15),
    catchup=False,
)

monitoring_task = PythonOperator(
    task_id='collect_metrics',
    python_callable=collect_dag_metrics,
    dag=dag,
)
EOF
chown -R "$AIRFLOW_USER:$AIRFLOW_USER" "$DAG_DIR"
chmod 644 "$DAG_DIR/dag_monitoring.py"
log "Monitoring DAG created"
}
# Verification
verify_installation() {
echo -e "${BLUE}Verifying installation...${NC}"
if systemctl is-active --quiet datadog-agent; then
log "✓ DataDog agent is running"
else
error "✗ DataDog agent is not running"
fi
if [[ -f /etc/datadog-agent/conf.d/airflow.yaml ]]; then
log "✓ Airflow integration configured"
else
error "✗ Airflow integration not configured"
fi
if [[ -f "$AIRFLOW_HOME/dags/dag_monitoring.py" ]]; then
log "✓ Monitoring DAG created"
else
error "✗ Monitoring DAG not created"
fi
log "Installation completed successfully!"
log "Please restart Airflow to apply StatsD configuration changes"
log "The monitoring DAG will start collecting metrics automatically"
}
# Main execution
main() {
check_prerequisites
detect_distro
update_system
install_datadog_agent
configure_datadog_airflow
configure_airflow_statsd
install_python_deps
create_monitoring_dag
systemctl restart datadog-agent
verify_installation
}
main "$@"
Review the script before running. Execute with: sudo bash install.sh --api-key=YOUR_DD_API_KEY