Set up advanced Airflow alerting with Slack integration and custom notification rules

Intermediate 45 min May 22, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure Apache Airflow to send intelligent alerts to Slack channels when DAGs fail, with custom notification rules based on task importance and execution context. Includes automated retry logic and escalation workflows.

Prerequisites

  • Apache Airflow 2.7+ installed and running
  • Slack workspace with admin permissions
  • Python 3.8+ with pip access
  • sudo access for service management

What this solves

Apache Airflow's default alerting sends basic email notifications that often get lost in inbox clutter. This tutorial sets up intelligent Slack integration that sends contextual alerts with DAG run details, failure reasons, and actionable links back to the Airflow UI. You'll configure custom notification rules that distinguish between critical production failures and expected development issues.

Prerequisites and setup

Before configuring Airflow alerting, you need a running Airflow instance (2.7 or later) and a Slack workspace where you have admin permissions.

Verify Airflow installation

Check your current Airflow version and ensure the webserver is running.

airflow version
sudo systemctl status airflow-webserver
sudo systemctl status airflow-scheduler

Install Slack provider package

Install the official Airflow Slack provider to enable webhook and API integrations.

pip install apache-airflow-providers-slack==8.6.0
airflow db migrate  # 'airflow db upgrade' is deprecated from Airflow 2.7 onward

Configure Slack application and webhooks

Create Slack application

Navigate to api.slack.com/apps and create a new Slack app for your workspace. This app will handle incoming webhooks and API calls from Airflow.

In your Slack app configuration:

  • Enable "Incoming Webhooks" under Features
  • Click "Add New Webhook to Workspace"
  • Select the channel for Airflow alerts (e.g., #data-ops)
  • Copy the webhook URL (starts with https://hooks.slack.com/services/)

Configure OAuth permissions

For advanced features like user mentions and channel management, add OAuth scopes to your Slack app.

Under "OAuth & Permissions", add these Bot Token Scopes:

  • chat:write - Send messages as the bot
  • chat:write.public - Send messages to channels the bot isn't in
  • users:read - Look up user information for mentions
  • channels:read - Access channel information

Install the app to your workspace and copy the "Bot User OAuth Token" (starts with xoxb-).

Test webhook connectivity

Verify the webhook works before integrating with Airflow.

curl -X POST -H 'Content-type: application/json' \
  --data "{\"text\":\"Airflow webhook test from $(hostname)\"}" \
  https://hooks.slack.com/services/YOUR/WEBHOOK/PATH
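The same smoke test can be scripted in Python. This is a minimal sketch using only the standard library; build_test_payload and post_to_webhook are hypothetical helper names, and the URL in the usage comment is the same placeholder as in the curl command.

```python
import json
import socket
import urllib.request


def build_test_payload(hostname=None):
    """Return the JSON body for a webhook smoke test, encoded as bytes."""
    hostname = hostname or socket.gethostname()
    return json.dumps({"text": f"Airflow webhook test from {hostname}"}).encode("utf-8")


def post_to_webhook(url, payload, timeout=10):
    """POST the payload to a Slack incoming webhook and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Usage (placeholder URL, replace with the webhook URL copied from your Slack app):
# status = post_to_webhook("https://hooks.slack.com/services/YOUR/WEBHOOK/PATH", build_test_payload())
```

Building the payload with json.dumps avoids the shell-quoting problems that hand-written JSON strings tend to cause.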

Configure Airflow connection settings

Create Slack webhook connection

Add the Slack webhook as an Airflow connection through the web UI or CLI. This connection will be referenced in your DAG alert configurations.

airflow connections add slack_webhook \
  --conn-type slackwebhook \
  --conn-host https://hooks.slack.com \
  --conn-password /services/YOUR/WEBHOOK/PATH
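The connection stores the webhook URL in two pieces: the host and the secret path. As a rough illustration of that split, the sketch below separates a full webhook URL with the standard library; split_webhook_url is a hypothetical helper, not part of Airflow or the Slack provider.

```python
from urllib.parse import urlsplit


def split_webhook_url(webhook_url):
    """Split a Slack webhook URL into the host and path parts used by the connection."""
    parts = urlsplit(webhook_url)
    if parts.scheme != "https" or parts.netloc != "hooks.slack.com":
        raise ValueError("expected a URL starting with https://hooks.slack.com/")
    if not parts.path.startswith("/services/"):
        raise ValueError("expected a path starting with /services/")
    host = f"{parts.scheme}://{parts.netloc}"
    return host, parts.path


host, token_path = split_webhook_url("https://hooks.slack.com/services/YOUR/WEBHOOK/PATH")
print(host)        # https://hooks.slack.com
print(token_path)  # /services/YOUR/WEBHOOK/PATH
```

The path is the secret part, which is why it goes into the connection's password field rather than the host.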

Create Slack API connection

For advanced messaging features, create a separate connection using the OAuth token.

airflow connections add slack_api \
  --conn-type slack \
  --conn-password xoxb-YOUR-BOT-TOKEN

Configure environment variables

Set environment variables for the Slack channels and users that your notification rules will reference. This guide keeps them in /etc/environment:

AIRFLOW_SLACK_ALERTS_CHANNEL=#data-ops
AIRFLOW_SLACK_CRITICAL_CHANNEL=#incidents
AIRFLOW_SLACK_DEV_CHANNEL=#data-dev
AIRFLOW_ON_CALL_USER=@jane.doe

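Reload the environment variables. Note that source only updates the current shell; the Airflow systemd services pick the values up only if their unit files load this file (for example with EnvironmentFile=/etc/environment):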

source /etc/environment
sudo systemctl restart airflow-webserver airflow-scheduler
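On the Python side, these variables can be read once and passed around as a single object. The following is a sketch under the assumption that the variable names and fallback channels match the ones used in this guide; SlackSettings and load_slack_settings are hypothetical names.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SlackSettings:
    alerts_channel: str
    critical_channel: str
    dev_channel: str
    on_call_user: str


def load_slack_settings(environ=None):
    """Read the Slack routing variables, falling back to this guide's default channels."""
    env = os.environ if environ is None else environ
    return SlackSettings(
        alerts_channel=env.get("AIRFLOW_SLACK_ALERTS_CHANNEL", "#data-ops"),
        critical_channel=env.get("AIRFLOW_SLACK_CRITICAL_CHANNEL", "#incidents"),
        dev_channel=env.get("AIRFLOW_SLACK_DEV_CHANNEL", "#data-dev"),
        on_call_user=env.get("AIRFLOW_ON_CALL_USER", ""),
    )


# Passing a dict instead of os.environ makes the loader easy to test.
settings = load_slack_settings({"AIRFLOW_SLACK_CRITICAL_CHANNEL": "#pager"})
print(settings.critical_channel)  # #pager
print(settings.alerts_channel)    # #data-ops
```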

Implement custom notification rules

Create notification utility module

Create a reusable module for Slack notifications that can be imported across DAGs.

import os
from datetime import datetime
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.configuration import conf

def get_slack_channel(context):
    """Route alerts to appropriate Slack channel based on DAG tags and environment"""
    dag = context['dag']
    env = 'prod' if 'prod' in dag.tags else 'dev'  # DAG tags are a list of strings, not a dict
    
    if 'critical' in dag.tags:
        return os.environ.get('AIRFLOW_SLACK_CRITICAL_CHANNEL', '#incidents')
    elif env == 'prod':
        return os.environ.get('AIRFLOW_SLACK_ALERTS_CHANNEL', '#data-ops')
    else:
        return os.environ.get('AIRFLOW_SLACK_DEV_CHANNEL', '#data-dev')

def format_failure_message(context):
    """Generate rich failure message with context and actionable information"""
    task_instance = context['task_instance']
    dag = context['dag']
    execution_date = context['execution_date']
    
    webserver_url = conf.get('webserver', 'base_url', fallback='http://localhost:8080')
    log_url = task_instance.log_url  # Built by Airflow with the run date already URL-encoded
    
    message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"🚨 Airflow Task Failed: {task_instance.task_id}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"DAG: {dag.dag_id}"},
                    {"type": "mrkdwn", "text": f"Task: {task_instance.task_id}"},
                    {"type": "mrkdwn", "text": f"Execution Date: {execution_date}"},
                    {"type": "mrkdwn", "text": f"Try: {task_instance.try_number}/{task_instance.max_tries + 1}"}  # max_tries counts retries only
                ]
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"Error: ``{str(context.get('exception', 'Unknown error'))[:500]}``"
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View Logs"},
                        "url": log_url,
                        "style": "primary"
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View DAG"},
                        "url": f"{webserver_url}/dags/{dag.dag_id}/grid"  # Grid view replaced the old tree view
                    }
                ]
            }
        ]
    }
    
    return message

def create_failure_task(task_id="slack_failure_alert", **kwargs):
    """Create a Slack notification task for failure alerts"""
    return SlackWebhookOperator(
        task_id=task_id,
        slack_webhook_conn_id="slack_webhook",  # Current parameter name; http_conn_id is the legacy spelling
        message="{{ ti.xcom_pull(task_ids='format_message') }}",
        **kwargs
    )
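Because Airflow keeps DAG tags as a plain collection of strings, channel routing reduces to membership checks. The sketch below isolates that decision so it can be tested without an Airflow context; route_channel and CHANNELS are hypothetical names, and the channel values are the defaults used in this guide.

```python
def route_channel(tags, channels):
    """Pick a channel from a list of DAG tags; 'critical' wins over 'prod'."""
    tag_set = set(tags or [])
    if "critical" in tag_set:
        return channels["critical"]
    if "prod" in tag_set:
        return channels["prod"]
    return channels["dev"]


CHANNELS = {"critical": "#incidents", "prod": "#data-ops", "dev": "#data-dev"}

print(route_channel(["prod", "critical", "customer-facing"], CHANNELS))  # #incidents
print(route_channel(["prod"], CHANNELS))                                 # #data-ops
print(route_channel([], CHANNELS))                                       # #data-dev
```

Keeping the routing rule in a pure function like this makes it straightforward to unit test before wiring it into a callback.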

Create DAG failure callback

Implement a callback function that triggers on DAG failures with intelligent routing logic.

import os
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from .slack_notifications import get_slack_channel, format_failure_message

def task_failure_alert(context):
    """Send Slack alert when any task fails"""
    slack_webhook = SlackWebhookHook(slack_webhook_conn_id="slack_webhook")
    
    channel = get_slack_channel(context)
    message = format_failure_message(context)
    
    # Add mention for critical failures
    if 'critical' in context['dag'].tags:
        on_call_user = os.environ.get('AIRFLOW_ON_CALL_USER', '')
        if on_call_user:
            message['blocks'].insert(1, {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f":rotating_light: Critical failure - {on_call_user} please investigate"
                }
            })
    
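    # send_dict takes the whole payload as one dict; app-based incoming webhooks
    # always post to the channel chosen at install time and ignore this override
    slack_webhook.send_dict({**message, "channel": channel})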

def dag_success_alert(context):
    """Send success notification for critical DAGs"""
    dag = context['dag']
    
    # Only send success alerts for critical or production DAGs
    if not ('critical' in dag.tags or 'prod' in dag.tags):
        return
    
    slack_webhook = SlackWebhookHook(slack_webhook_conn_id="slack_webhook")
    
    message = {
        "text": f":white_check_mark: DAG {dag.dag_id} completed successfully",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f":white_check_mark: {dag.dag_id} completed successfully at {context['execution_date']}"
                }
            }
        ]
    }
    
    channel = get_slack_channel(context)
    slack_webhook.send_dict({**message, "channel": channel})  # send_dict has no channel keyword
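One detail worth illustrating is the mention itself: Slack notifies a user from message text only when the mention is written as <@MEMBER_ID>, while a plain @jane.doe string is shown as literal text. The following is a minimal sketch that assumes the on-call value is a Slack member ID; the ID shown is a made-up placeholder, and format_mention and with_mention_block are hypothetical helpers.

```python
import copy


def format_mention(member_id):
    """Wrap a Slack member ID (for example 'U012AB3CD') in mention syntax."""
    return f"<@{member_id}>"


def with_mention_block(message, member_id, position=1):
    """Return a copy of a Block Kit message with a mention section inserted."""
    updated = copy.deepcopy(message)
    updated["blocks"].insert(position, {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": f":rotating_light: Critical failure - {format_mention(member_id)} please investigate",
        },
    })
    return updated


original = {"blocks": [{"type": "header", "text": {"type": "plain_text", "text": "Airflow Task Failed"}}]}
alert = with_mention_block(original, "U012AB3CD")
print(len(original["blocks"]), len(alert["blocks"]))  # 1 2
```

Returning a copy rather than mutating the input keeps the base message reusable if the same failure is reported to more than one destination.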

Configure DAG-level alerting policies

Apply callbacks to existing DAGs

Modify your DAGs to use the new notification system with appropriate tags and callbacks.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from utils.callbacks import task_failure_alert, dag_success_alert

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,  # Disable email, use Slack instead
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': task_failure_alert,
}

dag = DAG(
    'customer_data_etl',
    default_args=default_args,
    description='Extract and transform customer data',
    schedule_interval=timedelta(hours=6),
    catchup=False,
    tags=['prod', 'critical', 'customer-facing'],  # Tags control notification routing
    on_success_callback=dag_success_alert,
    on_failure_callback=task_failure_alert,
)

# Your existing tasks here
def extract_data(**context):
    # Simulate potential failure for testing
    if datetime.now().hour == 14:  # Fail at 2 PM for testing
        raise Exception("Database connection timeout during peak hours")
    return "data extracted"

extract_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_data,
    dag=dag,
)

Create notification configuration file

Define notification rules in a configuration file for easy management across environments.

notification_rules:
  channels:
    critical: "#incidents"
    prod: "#data-ops"
    dev: "#data-dev"
    default: "#airflow-alerts"
  
  escalation:
    critical:
      immediate: ["@jane.doe", "@john.smith"]
      after_retry: ["@data-team-lead"]
    prod:
      after_failure: ["@data-team"]
  
  dag_patterns:
    customer_*:
      priority: critical
      channel: "#incidents"
      mentions: ["@customer-success"]
    
    reporting_*:
      priority: high
      channel: "#data-ops"
      business_hours_only: true
    
    test_*:
      priority: low
      channel: "#data-dev"
      suppress_retries: true

suppress_notifications:
  - dag_pattern: "test_*"
    time_range: "22:00-06:00"  # Suppress overnight test failures
  - dag_pattern: "*_backfill"
    always: true  # Never alert on backfill DAGs
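The dag_patterns keys above are shell-style globs. Here is a minimal sketch of how such a lookup might work, using the standard library's fnmatch module instead of regular expressions. DAG_PATTERNS, DEFAULT_RULE, and rule_for_dag are hypothetical names, and the dictionary is a hand-copied subset of the YAML rather than a parsed file.

```python
from fnmatch import fnmatchcase

# Mirrors part of the dag_patterns section of the YAML configuration.
DAG_PATTERNS = {
    "customer_*": {"priority": "critical", "channel": "#incidents"},
    "reporting_*": {"priority": "high", "channel": "#data-ops"},
    "test_*": {"priority": "low", "channel": "#data-dev"},
}
DEFAULT_RULE = {"priority": "normal", "channel": "#airflow-alerts"}


def rule_for_dag(dag_id, patterns=DAG_PATTERNS, default=DEFAULT_RULE):
    """Return the first rule whose glob pattern matches dag_id, else the default."""
    for pattern, rule in patterns.items():
        if fnmatchcase(dag_id, pattern):
            return rule
    return default


print(rule_for_dag("customer_data_etl")["channel"])  # #incidents
print(rule_for_dag("nightly_export")["priority"])    # normal
```

Patterns are checked in file order, so more specific patterns should be listed before broader ones.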

Implement advanced notification logic

Create an enhanced notification system that reads the configuration file and applies complex rules.

import yaml
import re
from datetime import datetime, time
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

class AirflowNotificationManager:
    def __init__(self, config_path='/opt/airflow/config/notification_rules.yaml'):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
    
    def should_suppress_notification(self, context):
        """Check if notification should be suppressed based on rules"""
        dag = context['dag']
        current_time = datetime.now().time()
        
        for rule in self.config.get('suppress_notifications', []):
            # Check DAG pattern match
            if 'dag_pattern' in rule:
                pattern = rule['dag_pattern'].replace('*', '.*')  # Turn the glob wildcard into a regex
                if re.fullmatch(pattern, dag.dag_id):
                    # Always suppress
                    if rule.get('always', False):
                        return True
                    
                    # Time-based suppression
                    if 'time_range' in rule:
                        start_time, end_time = rule['time_range'].split('-')
                        start = datetime.strptime(start_time, '%H:%M').time()
                        end = datetime.strptime(end_time, '%H:%M').time()
                        
                        if start <= end:  # Same day range
                            if start <= current_time <= end:
                                return True
                        else:  # Overnight range
                            if current_time >= start or current_time <= end:
                                return True
        
        return False
    
    def get_notification_config(self, context):
        """Get notification configuration for a specific DAG"""
        dag = context['dag']
        
        # Check for specific DAG pattern matches
        for pattern, config in self.config['notification_rules'].get('dag_patterns', {}).items():
            regex_pattern = pattern.replace('*', '.*')  # Turn the glob wildcard into a regex
            if re.fullmatch(regex_pattern, dag.dag_id):
                return config
        
        # Return default configuration
        return {
            'priority': 'normal',
            'channel': self.config['notification_rules']['channels']['default']
        }
    
    def send_enhanced_notification(self, context, notification_type='failure'):
        """Send notification with advanced logic and formatting"""
        if self.should_suppress_notification(context):
            return
        
        config = self.get_notification_config(context)
        
        slack_webhook = SlackWebhookHook(slack_webhook_conn_id="slack_webhook")
        
        # Build enhanced message
        message = self.build_notification_message(context, config, notification_type)
        
        # Send to appropriate channel
        channel = config.get('channel', self.config['notification_rules']['channels']['default'])
        slack_webhook.send_dict({**message, "channel": channel})  # send_dict has no channel keyword
    
    def build_notification_message(self, context, config, notification_type):
        """Build rich notification message with context"""
        task_instance = context['task_instance']
        dag = context['dag']
        priority = config.get('priority', 'normal')
        
        # Priority-based emoji and styling
        priority_config = {
            'critical': {'emoji': ':rotating_light:', 'color': '#FF0000'},
            'high': {'emoji': ':warning:', 'color': '#FFA500'},
            'normal': {'emoji': ':x:', 'color': '#FF6B6B'},
            'low': {'emoji': ':grey_exclamation:', 'color': '#808080'}
        }
        
        priority_style = priority_config.get(priority, priority_config['normal'])
        
        message = {
            "attachments": [
                {
                    "color": priority_style['color'],
                    "blocks": [
                        {
                            "type": "header",
                            "text": {
                                "type": "plain_text",
                                "text": f"{priority_style['emoji']} Airflow {notification_type.title()}: {dag.dag_id}"
                            }
                        },
                        {
                            "type": "section",
                            "fields": [
                                {"type": "mrkdwn", "text": f"Priority: {priority.title()}"},
                                {"type": "mrkdwn", "text": f"Environment: {'prod' if 'prod' in dag.tags else 'dev'}"},
                                {"type": "mrkdwn", "text": f"Owner: {dag.owner}"},
                                {"type": "mrkdwn", "text": f"Execution: {context['execution_date']}"}
                            ]
                        }
                    ]
                }
            ]
        }
        
        # Add mentions for high priority failures
        if priority in ['critical', 'high'] and config.get('mentions'):
            mention_text = ' '.join(config['mentions'])
            message['attachments'][0]['blocks'].insert(1, {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"Attention: {mention_text}"}
            })
        
        return message

# Global notification manager instance
notification_manager = AirflowNotificationManager()

def enhanced_failure_callback(context):
    """Enhanced failure callback using notification manager"""
    notification_manager.send_enhanced_notification(context, 'failure')

def enhanced_success_callback(context):
    """Enhanced success callback using notification manager"""
    notification_manager.send_enhanced_notification(context, 'success')
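The time-based suppression rule is the easiest part of this module to get subtly wrong, because a window such as 22:00-06:00 crosses midnight. The sketch below isolates that check as a standalone function; in_time_range is a hypothetical name and is not imported by the module above.

```python
from datetime import datetime, time


def in_time_range(current, time_range):
    """Return True if `current` falls inside an 'HH:MM-HH:MM' window.

    A window whose start is later than its end is treated as crossing midnight.
    """
    start_text, end_text = time_range.split("-")
    start = datetime.strptime(start_text, "%H:%M").time()
    end = datetime.strptime(end_text, "%H:%M").time()
    if start <= end:
        return start <= current <= end
    return current >= start or current <= end


print(in_time_range(time(23, 30), "22:00-06:00"))  # True
print(in_time_range(time(12, 0), "22:00-06:00"))   # False
print(in_time_range(time(9, 15), "09:00-17:00"))   # True
```

Passing the current time in as an argument, rather than calling datetime.now() inside the function, keeps the rule deterministic under test.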

Set up monitoring and troubleshooting

Configure notification logging

Enable detailed logging for notification debugging and monitoring.

[logging]
# Add Slack notifications to logging configuration
logging_level = INFO

[slack]
# Custom section for Slack notification settings
enable_logging = True
log_level = DEBUG
max_retry_attempts = 3
retry_delay_seconds = 30
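Airflow does not act on a custom section by itself; the notification code has to read those values and apply them. As one possible shape for the logging side, the sketch below records each delivery attempt with the standard logging module. The logger name and the log_delivery helper are assumptions for illustration, not part of Airflow or the Slack provider.

```python
import logging

logger = logging.getLogger("slack_notifications")  # hypothetical logger name


def log_delivery(dag_id, channel, success, error=None):
    """Log one notification attempt and return the logging level that was used."""
    if success:
        logger.info("Slack alert for %s delivered to %s", dag_id, channel)
        return logging.INFO
    logger.error("Slack alert for %s to %s failed: %s", dag_id, channel, error)
    return logging.ERROR


# Example: record a failed delivery
level = log_delivery("customer_data_etl", "#incidents", success=False, error="timeout")
print(logging.getLevelName(level))  # ERROR
```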

Create notification health check DAG

Build a monitoring DAG that tests notification delivery and Slack connectivity.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from utils.advanced_notifications import notification_manager

def test_slack_connectivity(**context):
    """Test basic Slack webhook connectivity"""
    try:
        slack_webhook = SlackWebhookHook(slack_webhook_conn_id="slack_webhook")
        
        test_message = {
            "text": ":white_check_mark: Airflow notification system health check - all systems operational",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f":white_check_mark: Notification Health Check - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": "Webhook Status: :white_check_mark: Connected"},
                        {"type": "mrkdwn", "text": "Config Status: :white_check_mark: Loaded"},
                        {"type": "mrkdwn", "text": "Notification Rules: :white_check_mark: Active"}
                    ]
                }
            ]
        }
        
        slack_webhook.send_dict({**test_message, "channel": "#data-ops"})  # send_dict has no channel keyword
        return "Health check completed successfully"
    
    except Exception as e:
        # This failure will trigger the notification system
        raise Exception(f"Slack connectivity test failed: {str(e)}")

default_args = {
    'owner': 'platform-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
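    # The manager has no enhanced_failure_callback method, so call send_enhanced_notification directly
    'on_failure_callback': lambda context: notification_manager.send_enhanced_notification(context, 'failure'),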
}

dag = DAG(
    'notification_health_check',
    default_args=default_args,
    description='Monitor Airflow notification system health',
    schedule_interval=timedelta(hours=12),  # Run twice daily
    catchup=False,
    tags=['monitoring', 'notifications', 'health-check'],
)

health_check_task = PythonOperator(
    task_id='test_slack_connectivity',
    python_callable=test_slack_connectivity,
    dag=dag,
)

Monitor notification delivery metrics

Add custom metrics to track notification success rates and response times.

import time
from datetime import datetime
from airflow.models import Variable
import json

class NotificationMetrics:
    def __init__(self):
        self.metrics_key = "notification_metrics"
    
    def record_notification_attempt(self, dag_id, notification_type, success, response_time_ms):
        """Record notification delivery metrics"""
        try:
            # Get existing metrics
            existing_metrics = Variable.get(self.metrics_key, default_var="{}")
            metrics = json.loads(existing_metrics)
            
            # Initialize structure if needed
            today = datetime.now().strftime('%Y-%m-%d')
            if today not in metrics:
                metrics[today] = {
                    'total_attempts': 0,
                    'successful': 0,
                    'failed': 0,
                    'average_response_time': 0,
                    'by_dag': {}
                }
            
            # Update metrics
            daily_metrics = metrics[today]
            daily_metrics['total_attempts'] += 1
            
            if success:
                daily_metrics['successful'] += 1
            else:
                daily_metrics['failed'] += 1
            
            # Update response time average
            current_avg = daily_metrics['average_response_time']
            total_attempts = daily_metrics['total_attempts']
            daily_metrics['average_response_time'] = ((current_avg * (total_attempts - 1)) + response_time_ms) / total_attempts
            
            # Track by DAG
            if dag_id not in daily_metrics['by_dag']:
                daily_metrics['by_dag'][dag_id] = {'attempts': 0, 'successful': 0}
            
            daily_metrics['by_dag'][dag_id]['attempts'] += 1
            if success:
                daily_metrics['by_dag'][dag_id]['successful'] += 1
            
            # Store updated metrics
            Variable.set(self.metrics_key, json.dumps(metrics))
            
        except Exception as e:
            print(f"Failed to record notification metrics: {e}")
    
    def get_daily_summary(self, date_str=None):
        """Get notification metrics summary for a specific date"""
        if not date_str:
            date_str = datetime.now().strftime('%Y-%m-%d')
        
        try:
            metrics = json.loads(Variable.get(self.metrics_key, default_var="{}"))
            return metrics.get(date_str, {})
        except Exception:  # Avoid a bare except, which would also swallow KeyboardInterrupt
            return {}

# Global metrics instance
notification_metrics = NotificationMetrics()
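The response-time figure is maintained as an incremental mean, so no individual samples need to be stored. A small sketch of that update step in isolation follows; update_running_average is a hypothetical helper that mirrors the arithmetic in record_notification_attempt.

```python
def update_running_average(current_avg, previous_count, new_value):
    """Fold one new sample into a running mean without storing all samples."""
    new_count = previous_count + 1
    return (current_avg * previous_count + new_value) / new_count


avg = 0.0
for count, sample_ms in enumerate([120, 80, 100]):
    avg = update_running_average(avg, count, sample_ms)
print(avg)  # 100.0
```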

Verify your setup

Test your notification system with a controlled failure and verify all components work correctly.

# Test Slack webhook connectivity
curl -X POST -H 'Content-type: application/json' \
  --data '{"text":"Airflow notification test - setup verification"}' \
  https://hooks.slack.com/services/YOUR/WEBHOOK/PATH

# Check Airflow connections
airflow connections list | grep slack

# Verify notification module can be imported
python3 -c "from dags.utils.advanced_notifications import notification_manager; print('Notifications module loaded successfully')"

# Test the health check DAG
airflow dags trigger notification_health_check
airflow dags list-runs -d notification_health_check

Testing failure alerts: Temporarily modify a DAG to fail (for example, by adding raise Exception("Test notification") in a task) and verify that the alert appears in your designated Slack channel with proper formatting and action buttons.

Common issues

Symptom | Cause | Fix
Slack messages not appearing | Incorrect webhook URL or connection | Verify connection with airflow connections test slack_webhook
Messages sent to wrong channel | Channel routing logic error | Check DAG tags and notification rules in notification_rules.yaml
Mentions not working | Bot lacks required OAuth permissions | Add users:read and chat:write.public scopes to Slack app
Notifications for every retry | Callback configured on task vs DAG level | Set on_failure_callback in DAG default_args, not individual tasks
Missing notification config | YAML file not found or malformed | Check file path and validate YAML syntax with python -c "import yaml; yaml.safe_load(open('config.yaml'))"
Action buttons not working | Incorrect Airflow webserver URL | Verify base_url in [webserver] section of airflow.cfg
