Configure Apache Airflow to send intelligent alerts to Slack channels when DAGs fail, with custom notification rules based on task importance and execution context. Includes automated retry logic and escalation workflows.
Prerequisites
- Apache Airflow 2.7+ installed and running
- Slack workspace with admin permissions
- Python 3.8+ with pip access
- sudo access for service management
What this solves
Apache Airflow's default alerting sends basic email notifications that often get lost in inbox clutter. This tutorial sets up intelligent Slack integration that sends contextual alerts with DAG run details, failure reasons, and actionable links back to the Airflow UI. You'll configure custom notification rules that distinguish between critical production failures and expected development issues.
Prerequisites and setup
Before configuring Airflow alerting, you need a running Airflow instance and Slack workspace with admin permissions. This guide assumes you have Airflow 2.7+ installed and operational.
Verify Airflow installation
Check your current Airflow version and make sure the webserver and scheduler are running.
airflow version
sudo systemctl status airflow-webserver
sudo systemctl status airflow-scheduler
Install Slack provider package
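Install the official Airflow Slack provider into the same Python environment that runs Airflow. It supplies the webhook and Web API hooks used in this guide.
```bash
pip install apache-airflow-providers-slack==8.6.0
airflow db migrate
```
On Airflow 2.7+, `airflow db migrate` replaces the deprecated `airflow db upgrade`.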
Configure Slack application and webhooks
Create Slack application
Navigate to api.slack.com/apps and create a new Slack app for your workspace. This app will handle incoming webhooks and API calls from Airflow.
In your Slack app configuration:
- Enable "Incoming Webhooks" under Features
- Click "Add New Webhook to Workspace"
- Select the channel for Airflow alerts (e.g., #data-ops). A webhook created this way always posts to this channel.
- Copy the webhook URL (starts with https://hooks.slack.com/services/)
Configure OAuth permissions
For advanced features like user mentions and channel management, add OAuth scopes to your Slack app.
Under "OAuth & Permissions", add these Bot Token Scopes:
- `chat:write` - Send messages as the bot
- `chat:write.public` - Send messages to public channels the bot isn't in
- `users:read` - Look up user information for mentions
- `channels:read` - Access channel information
Install the app to your workspace and copy the "Bot User OAuth Token" (starts with xoxb-).
Test webhook connectivity
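Verify the webhook works before integrating with Airflow. The JSON body must be in double quotes so the shell expands `$(hostname)`. Slack replies with `ok` on success.
```bash
curl -X POST -H 'Content-type: application/json' \
  --data "{\"text\":\"Airflow webhook test from $(hostname)\"}" \
  https://hooks.slack.com/services/YOUR/WEBHOOK/PATH
```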
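If `curl` is not available on the Airflow host, you can run the same check from Python using only the standard library. This is a minimal sketch. The names `build_test_payload` and `post_to_webhook` are illustrative, and the URL is the placeholder from the `curl` command. The sketch treats an HTTP 200 response with the body `ok` as success.

```python
import json
import urllib.request


def build_test_payload(hostname: str) -> bytes:
    """Encode the same test message the curl command sends, as JSON bytes."""
    return json.dumps({"text": f"Airflow webhook test from {hostname}"}).encode("utf-8")


def post_to_webhook(url: str, payload: bytes, timeout: float = 10.0) -> bool:
    """POST a JSON payload to a Slack incoming webhook; True if Slack answered 'ok'."""
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx answers (e.g. a revoked webhook)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read().decode("utf-8").strip()
        return response.status == 200 and body == "ok"
```

For example, `post_to_webhook(url, build_test_payload(socket.gethostname()))` with your real webhook URL should return `True`.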
Configure Airflow connection settings
Create Slack webhook connection
Add the Slack webhook as an Airflow connection through the web UI or CLI. With Slack provider 8.x, use the `slackwebhook` connection type and store the full webhook URL as the password. Your DAG alert configurations reference this connection as `slack_webhook`.
```bash
airflow connections add slack_webhook \
  --conn-type slackwebhook \
  --conn-password 'https://hooks.slack.com/services/YOUR/WEBHOOK/PATH'
```
Create Slack API connection
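Create a second connection for the bot token. Any callback that routes alerts to different channels needs this connection. The Web API accepts a channel for each message, but an app-based incoming webhook always posts to the channel it was created for.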
airflow connections add slack_api \
--conn-type slack \
--conn-password xoxb-YOUR-BOT-TOKEN
Configure environment variables
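Set environment variables for the Slack channels and on-call user that your notification rules reference by adding these lines to /etc/environment. Write the on-call user in Slack mention syntax with their member ID, which is shown in their Slack profile. A plain `@jane.doe` is posted as text and notifies no one. Replace `U0123456789` with the real ID.
```bash
AIRFLOW_SLACK_ALERTS_CHANNEL=#data-ops
AIRFLOW_SLACK_CRITICAL_CHANNEL=#incidents
AIRFLOW_SLACK_DEV_CHANNEL=#data-dev
AIRFLOW_ON_CALL_USER=<@U0123456789>
```
Running `source /etc/environment` only affects your current shell, and systemd services do not read this file by default. Add `EnvironmentFile=/etc/environment` to the `[Service]` section of each Airflow unit that runs callbacks (the scheduler and any workers). You can do this with, for example, `sudo systemctl edit airflow-scheduler`. Then restart the services:
```bash
sudo systemctl daemon-reload
sudo systemctl restart airflow-webserver airflow-scheduler
```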
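To confirm which values the callbacks actually receive, resolve the variables the same way the routing code does: read each one and fall back to the default channel when it is missing. The sketch below does this and also checks that the on-call user is written in Slack's `<@MEMBER_ID>` form. The helper names `resolve_slack_settings` and `on_call_mention_ok` are hypothetical. The defaults mirror those used by `get_slack_channel` later in this guide.

```python
import os
from typing import Dict, Mapping, Optional

# Variable names and fallback channels used by get_slack_channel() in this guide
DEFAULTS = {
    "AIRFLOW_SLACK_ALERTS_CHANNEL": "#data-ops",
    "AIRFLOW_SLACK_CRITICAL_CHANNEL": "#incidents",
    "AIRFLOW_SLACK_DEV_CHANNEL": "#data-dev",
}


def resolve_slack_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Dict[str, str]]:
    """Report each channel setting's effective value and where it came from."""
    env = os.environ if env is None else env
    resolved = {}
    for name, default in DEFAULTS.items():
        value = env.get(name)
        resolved[name] = {
            "value": value or default,
            "source": "environment" if value else "default",
        }
    return resolved


def on_call_mention_ok(env: Optional[Mapping[str, str]] = None) -> bool:
    """True if AIRFLOW_ON_CALL_USER is written as a Slack user mention, e.g. <@U0123456789>."""
    env = os.environ if env is None else env
    value = env.get("AIRFLOW_ON_CALL_USER", "")
    return value.startswith("<@") and value.endswith(">")
```

Call these from a throwaway task (for example, in a PythonOperator) to see what the worker process actually receives. Any entry reporting `default` means that variable never reached the process.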
Implement custom notification rules
Create notification utility module
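Create a reusable module for Slack notifications and save it as `dags/utils/slack_notifications.py`. Add an empty `dags/utils/__init__.py` so every DAG can import the module as `utils.slack_notifications`.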
```python
import os

from airflow.configuration import conf
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def get_slack_channel(context):
    """Route alerts to a Slack channel based on the DAG's tags (a list of strings)"""
    tags = context['dag'].tags or []
    if 'critical' in tags:
        return os.environ.get('AIRFLOW_SLACK_CRITICAL_CHANNEL', '#incidents')
    if 'prod' in tags:
        return os.environ.get('AIRFLOW_SLACK_ALERTS_CHANNEL', '#data-ops')
    return os.environ.get('AIRFLOW_SLACK_DEV_CHANNEL', '#data-dev')


def format_failure_message(context):
    """Generate a Block Kit failure message with run details and links to the Airflow UI"""
    task_instance = context['task_instance']
    dag = context['dag']
    logical_date = context['logical_date']  # 'execution_date' is deprecated in Airflow 2.x
    webserver_url = conf.get('webserver', 'base_url', fallback='http://localhost:8080')
    # Truncate long errors so the message stays readable
    error = str(context.get('exception', 'Unknown error'))[:500]

    message = {
        # Plain-text fallback for notifications and clients without Block Kit
        "text": f"Airflow task failed: {dag.dag_id}.{task_instance.task_id}",
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"🚨 Airflow Task Failed: {task_instance.task_id}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*DAG:* {dag.dag_id}"},
                    {"type": "mrkdwn", "text": f"*Task:* {task_instance.task_id}"},
                    {"type": "mrkdwn", "text": f"*Logical Date:* {logical_date}"},
                    {"type": "mrkdwn", "text": f"*Attempt:* {task_instance.try_number} (retries allowed: {task_instance.max_tries})"}
                ]
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    # Triple backticks render the error as a code block in Slack mrkdwn
                    "text": f"*Error:* ```{error}```"
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View Logs"},
                        # TaskInstance.log_url is built from [webserver] base_url
                        "url": task_instance.log_url,
                        "style": "primary"
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View DAG"},
                        # Grid view, which replaced the Tree view in Airflow 2.3
                        "url": f"{webserver_url}/dags/{dag.dag_id}/grid"
                    }
                ]
            }
        ]
    }
    return message


def create_failure_task(task_id="slack_failure_alert", **kwargs):
    """Create a Slack task that posts a message pushed to XCom by an upstream
    task with task_id 'format_message' (that task is not defined here)"""
    return SlackWebhookOperator(
        task_id=task_id,
        slack_webhook_conn_id="slack_webhook",
        message="{{ ti.xcom_pull(task_ids='format_message') }}",
        **kwargs,
    )
```
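`get_slack_channel` depends only on the DAG's tags and a few environment variables, so you can unit-test the routing rule without a running Airflow. The sketch below is a standalone copy of that rule that takes the tag list and an environment mapping directly. `route_channel` is a hypothetical name and is not part of the module above.

```python
from typing import Iterable, Mapping, Optional


def route_channel(tags: Optional[Iterable[str]], env: Mapping[str, str]) -> str:
    """Pick a channel from DAG tags: 'critical' wins over 'prod'; anything else goes to dev."""
    tag_set = set(tags or [])
    if "critical" in tag_set:
        return env.get("AIRFLOW_SLACK_CRITICAL_CHANNEL", "#incidents")
    if "prod" in tag_set:
        return env.get("AIRFLOW_SLACK_ALERTS_CHANNEL", "#data-ops")
    return env.get("AIRFLOW_SLACK_DEV_CHANNEL", "#data-dev")
```

A check such as `assert route_channel(['prod', 'critical'], {}) == '#incidents'` in your test suite catches routing regressions before a real failure does.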
Create DAG failure callback
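Implement the failure and success callbacks in `dags/utils/callbacks.py` with channel routing. They post through the bot-token connection (`slack_api`) because an app-based incoming webhook always posts to the channel it was created for and ignores per-message channel overrides.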
```python
import os

from airflow.providers.slack.hooks.slack import SlackHook

from utils.slack_notifications import format_failure_message, get_slack_channel


def _post_to_slack(channel, message):
    """Post a message dict (text/blocks/attachments) to a channel via chat.postMessage"""
    payload = {key: message[key] for key in ('text', 'blocks', 'attachments') if key in message}
    SlackHook(slack_conn_id="slack_api").client.chat_postMessage(channel=channel, **payload)


def task_failure_alert(context):
    """Send a Slack alert when a task fails after its final retry"""
    message = format_failure_message(context)

    # Add a mention for critical failures; the value must use Slack mention
    # syntax, e.g. <@U0123456789>
    if 'critical' in (context['dag'].tags or []):
        on_call_user = os.environ.get('AIRFLOW_ON_CALL_USER', '')
        if on_call_user:
            message['blocks'].insert(1, {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f":rotating_light: Critical failure - {on_call_user} please investigate"
                }
            })

    _post_to_slack(get_slack_channel(context), message)


def dag_success_alert(context):
    """Send a success notification for critical or production DAGs"""
    dag = context['dag']
    tags = dag.tags or []

    # Only send success alerts for critical or production DAGs
    if not ('critical' in tags or 'prod' in tags):
        return

    message = {
        "text": f":white_check_mark: DAG {dag.dag_id} completed successfully",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f":white_check_mark: {dag.dag_id} completed successfully for run {context['logical_date']}"
                }
            }
        ]
    }
    _post_to_slack(get_slack_channel(context), message)
```
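Slack only notifies people when the text uses escaped mention syntax: `<@MEMBER_ID>` for a user and `<!subteam^GROUP_ID>` for a user group. A plain `@name` is posted as text. Below is a sketch of a helper that formats IDs accordingly. It assumes that user-group IDs begin with `S`, which is the usual convention for Slack IDs. `format_mention` and `mention_line` are hypothetical names.

```python
from typing import Iterable


def format_mention(slack_id: str) -> str:
    """Wrap a Slack ID in mention syntax; values already in <...> form pass through."""
    slack_id = slack_id.strip()
    if slack_id.startswith("<") and slack_id.endswith(">"):
        return slack_id  # already escaped, e.g. "<@U0123456789>"
    if slack_id.startswith("S"):
        return f"<!subteam^{slack_id}>"  # assumption: user-group IDs start with "S"
    return f"<@{slack_id}>"  # user IDs, typically starting with "U" or "W"


def mention_line(ids: Iterable[str], prefix: str = ":rotating_light: Critical failure -") -> str:
    """Join several mentions into one mrkdwn line; blank IDs are skipped."""
    mentions = " ".join(format_mention(i) for i in ids if i and i.strip())
    return f"{prefix} {mentions} please investigate" if mentions else ""
```

In `task_failure_alert`, `mention_line([os.environ.get('AIRFLOW_ON_CALL_USER', '')])` would produce the same text as the inline f-string. It would also accept bare member IDs and user-group IDs.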
Configure DAG-level alerting policies
Apply callbacks to existing DAGs
Modify your DAGs to use the new notification system with appropriate tags and callbacks.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from utils.callbacks import dag_success_alert, task_failure_alert

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,  # Disable email, use Slack instead
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    # Applied to every task; fires once a task has failed its final retry
    'on_failure_callback': task_failure_alert,
}

dag = DAG(
    'customer_data_etl',
    default_args=default_args,
    description='Extract and transform customer data',
    schedule=timedelta(hours=6),  # 'schedule_interval' is deprecated since Airflow 2.4
    catchup=False,
    tags=['prod', 'critical', 'customer-facing'],  # Tags control notification routing
    on_success_callback=dag_success_alert,
    # No DAG-level on_failure_callback: combined with the task-level callback
    # above, it would send a second alert for the same failure.
)


# Your existing tasks here
def extract_data(**context):
    # Simulate a failure for testing: any run that starts between 14:00 and 14:59 fails
    if datetime.now().hour == 14:
        raise Exception("Database connection timeout during peak hours")
    return "data extracted"


extract_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_data,
    dag=dag,
)
```
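You can use the retry settings above to drive escalation. One option is to combine the DAG's tags with the attempt count. The sketch below makes three assumptions: `escalation_level` is a hypothetical helper, `try_number` counts attempts starting at 1, and the last attempt is `retries + 1`. With `'retries': 2`, the third attempt is final.

```python
from typing import Iterable


def escalation_level(tags: Iterable[str], try_number: int, retries: int) -> str:
    """Return 'page', 'notify', or 'quiet' for a failed attempt.

    Assumes try_number starts at 1 and the final attempt is retries + 1.
    """
    tag_set = set(tags)
    final_attempt = try_number >= retries + 1
    if "critical" in tag_set:
        # Critical DAGs page on-call once retries are exhausted, and notify before that
        return "page" if final_attempt else "notify"
    if "prod" in tag_set and final_attempt:
        return "notify"
    return "quiet"
```

`on_failure_callback` only runs after the final attempt, so the `notify` result for earlier attempts matters only if you also wire a similar function into `on_retry_callback`.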
Create notification configuration file
Define notification rules in `/opt/airflow/config/notification_rules.yaml`, the path the notification manager below loads by default, so they can be managed separately for each environment.
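```yaml
notification_rules:
  channels:
    critical: "#incidents"
    prod: "#data-ops"
    dev: "#data-dev"
    default: "#airflow-alerts"

  # Mentions must use Slack mention syntax: <@MEMBER_ID> for users,
  # <!subteam^GROUP_ID> for user groups. Replace these placeholders.
  escalation:
    critical:
      immediate: ["<@JANE_DOE_ID>", "<@JOHN_SMITH_ID>"]
      after_retry: ["<@DATA_TEAM_LEAD_ID>"]
    prod:
      after_failure: ["<!subteam^DATA_TEAM_ID>"]

  # Shell-style wildcards; the first matching pattern wins.
  # business_hours_only and suppress_retries are not yet read by the manager below.
  dag_patterns:
    "customer_*":
      priority: critical
      channel: "#incidents"
      mentions: ["<!subteam^CUSTOMER_SUCCESS_ID>"]
    "reporting_*":
      priority: high
      channel: "#data-ops"
      business_hours_only: true
    "test_*":
      priority: low
      channel: "#data-dev"
      suppress_retries: true

  suppress_notifications:
    - dag_pattern: "test_*"
      time_range: "22:00-06:00"  # Suppress overnight test failures (server local time)
    - dag_pattern: "*_backfill"
      always: true  # Never alert on backfill DAGs
```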
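A malformed rules file only surfaces when the manager is imported, and that breaks every DAG that imports it. The validation sketch below assumes the file has already been parsed into a dict, for example with `yaml.safe_load`. It checks only the keys the manager below reads. `validate_rules` is a hypothetical helper.

```python
from typing import Any, Dict, List

KNOWN_PRIORITIES = (None, "critical", "high", "normal", "low")


def validate_rules(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a parsed notification_rules.yaml (empty if OK)."""
    rules = config.get("notification_rules")
    if not isinstance(rules, dict):
        return ["missing top-level 'notification_rules' mapping"]
    problems = []
    if "default" not in (rules.get("channels") or {}):
        problems.append("channels.default is required (used when no pattern matches)")
    for pattern, entry in (rules.get("dag_patterns") or {}).items():
        if not isinstance(entry, dict):
            problems.append(f"dag_patterns.{pattern} must be a mapping")
        elif entry.get("priority") not in KNOWN_PRIORITIES:
            problems.append(f"dag_patterns.{pattern}.priority has unknown value {entry['priority']!r}")
    for i, rule in enumerate(rules.get("suppress_notifications") or []):
        if not isinstance(rule, dict) or "dag_pattern" not in rule:
            problems.append(f"suppress_notifications[{i}] has no dag_pattern")
            continue
        time_range = rule.get("time_range")
        if time_range is not None and len(str(time_range).split("-")) != 2:
            problems.append(f"suppress_notifications[{i}].time_range must look like 'HH:MM-HH:MM'")
    return problems
```

Running this in CI against the committed YAML file catches mistakes before the scheduler has to parse them.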
Implement advanced notification logic
Create an enhanced notification system in `dags/utils/advanced_notifications.py` that reads the configuration file and applies its pattern, priority, and suppression rules.
```python
import fnmatch
from datetime import datetime

import yaml
from airflow.providers.slack.hooks.slack import SlackHook


class AirflowNotificationManager:
    def __init__(self, config_path='/opt/airflow/config/notification_rules.yaml'):
        with open(config_path, 'r') as f:
            # Every rule lives under the top-level 'notification_rules' key
            self.config = yaml.safe_load(f)['notification_rules']

    def should_suppress_notification(self, context):
        """Check if notification should be suppressed based on rules"""
        dag_id = context['dag'].dag_id
        current_time = datetime.now().time()  # Server local time

        for rule in self.config.get('suppress_notifications', []):
            # Shell-style wildcard match, e.g. "test_*" or "*_backfill"
            if not fnmatch.fnmatchcase(dag_id, rule.get('dag_pattern', '')):
                continue
            # Always suppress
            if rule.get('always', False):
                return True
            # Time-based suppression
            if 'time_range' in rule:
                start_time, end_time = rule['time_range'].split('-')
                start = datetime.strptime(start_time, '%H:%M').time()
                end = datetime.strptime(end_time, '%H:%M').time()
                if start <= end:  # Same-day range
                    if start <= current_time <= end:
                        return True
                elif current_time >= start or current_time <= end:  # Overnight range
                    return True
        return False

    def get_notification_config(self, context):
        """Get notification configuration for a specific DAG"""
        dag_id = context['dag'].dag_id
        # First matching pattern wins (YAML mapping order is preserved)
        for pattern, config in self.config.get('dag_patterns', {}).items():
            if fnmatch.fnmatchcase(dag_id, pattern):
                return config
        # Default configuration
        return {
            'priority': 'normal',
            'channel': self.config['channels']['default']
        }

    def send_enhanced_notification(self, context, notification_type='failure'):
        """Send notification with advanced logic and formatting"""
        if self.should_suppress_notification(context):
            return

        config = self.get_notification_config(context)
        message = self.build_notification_message(context, config, notification_type)
        channel = config.get('channel', self.config['channels']['default'])

        # Post with the bot token: unlike an app webhook, the Web API can target any channel
        SlackHook(slack_conn_id="slack_api").client.chat_postMessage(channel=channel, **message)

    def build_notification_message(self, context, config, notification_type):
        """Build rich notification message with context"""
        dag = context['dag']
        tags = dag.tags or []
        priority = config.get('priority', 'normal')
        # DAG tags are a list, so derive the environment from tag membership
        env = next((tag for tag in ('prod', 'dev') if tag in tags), 'unknown')

        # Priority-based emoji and styling
        priority_config = {
            'critical': {'emoji': ':rotating_light:', 'color': '#FF0000'},
            'high': {'emoji': ':warning:', 'color': '#FFA500'},
            'normal': {'emoji': ':x:', 'color': '#FF6B6B'},
            'low': {'emoji': ':grey_exclamation:', 'color': '#808080'}
        }
        priority_style = priority_config.get(priority, priority_config['normal'])
        title = f"{priority_style['emoji']} Airflow {notification_type.title()}: {dag.dag_id}"

        message = {
            "text": title,  # Fallback text for notifications
            "attachments": [
                {
                    "color": priority_style['color'],
                    "blocks": [
                        {"type": "header", "text": {"type": "plain_text", "text": title}},
                        {
                            "type": "section",
                            "fields": [
                                {"type": "mrkdwn", "text": f"*Priority:* {priority.title()}"},
                                {"type": "mrkdwn", "text": f"*Environment:* {env}"},
                                {"type": "mrkdwn", "text": f"*Owner:* {dag.owner}"},
                                {"type": "mrkdwn", "text": f"*Logical Date:* {context['logical_date']}"}
                            ]
                        }
                    ]
                }
            ]
        }

        # Add mentions for high priority failures
        if priority in ['critical', 'high'] and config.get('mentions'):
            mention_text = ' '.join(config['mentions'])
            message['attachments'][0]['blocks'].insert(1, {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"Attention: {mention_text}"}
            })
        return message


# Global notification manager instance (reads the YAML file at import time)
notification_manager = AirflowNotificationManager()


def enhanced_failure_callback(context):
    """Enhanced failure callback using notification manager"""
    notification_manager.send_enhanced_notification(context, 'failure')


def enhanced_success_callback(context):
    """Enhanced success callback using notification manager"""
    notification_manager.send_enhanced_notification(context, 'success')
```
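The suppression rules combine two checks: a shell-style wildcard match on the DAG ID, and a time window that may wrap past midnight. The sketch below pulls them out into pure functions with hypothetical names, following the same logic as `should_suppress_notification`. That makes it easy to test edge cases such as the exact window boundaries. It uses `fnmatchcase` so matching does not depend on the operating system's case rules.

```python
import fnmatch
from datetime import datetime, time
from typing import Any, Dict, List


def in_time_range(now: time, time_range: str) -> bool:
    """True if `now` falls inside an inclusive 'HH:MM-HH:MM' window; windows may wrap midnight."""
    start_str, end_str = time_range.split("-")
    start = datetime.strptime(start_str, "%H:%M").time()
    end = datetime.strptime(end_str, "%H:%M").time()
    if start <= end:  # same-day window, e.g. 09:00-17:00
        return start <= now <= end
    return now >= start or now <= end  # overnight window, e.g. 22:00-06:00


def is_suppressed(dag_id: str, now: time, rules: List[Dict[str, Any]]) -> bool:
    """Apply suppress_notifications rules to one DAG at one moment."""
    for rule in rules:
        if not fnmatch.fnmatchcase(dag_id, rule.get("dag_pattern", "")):
            continue
        if rule.get("always", False):
            return True
        if "time_range" in rule and in_time_range(now, rule["time_range"]):
            return True
    return False
```

The manager calls `datetime.now()`, which uses the server's local time zone. Define the windows in that zone.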
Setup monitoring and troubleshooting
Configure notification logging
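Enable detailed logging for notification debugging and monitoring by adding the following to `airflow.cfg`.
```ini
[logging]
# Airflow's logging level; switch to DEBUG temporarily when troubleshooting notifications
logging_level = INFO

[slack]
# Custom section for Slack notification settings. Airflow itself does not read it;
# these keys only take effect if your notification code reads them with conf.get*().
enable_logging = True
log_level = DEBUG
max_retry_attempts = 3
retry_delay_seconds = 30
```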
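Airflow ignores the custom `[slack]` keys, so your own code has to read them. The sketch below shows a retrying send that takes the values as explicit arguments. Inside Airflow, they could come from `conf.getint('slack', 'max_retry_attempts', fallback=3)` and `conf.getint('slack', 'retry_delay_seconds', fallback=30)`. `send_with_retry` is a hypothetical helper. `send` stands for any callable that posts a message and raises on failure.

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger("slack_notifications")


def send_with_retry(send: Callable[[], T], max_attempts: int = 3, delay_seconds: float = 30.0) -> T:
    """Call `send` up to max_attempts times, sleeping delay_seconds after each failure.

    The last exception is re-raised if every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return send()
        except Exception as exc:  # network and Slack API errors use different exception types
            if attempt >= max_attempts:
                raise
            log.warning("Slack send failed (attempt %d/%d): %s", attempt, max_attempts, exc)
            time.sleep(delay_seconds)
            attempt += 1
```

Callbacks block while they sleep, so keep `retry_delay_seconds` short relative to how quickly you need the alert.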
Create notification health check DAG
Build a monitoring DAG that tests notification delivery and Slack connectivity.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

from utils.advanced_notifications import enhanced_failure_callback


def test_slack_connectivity(**context):
    """Test basic Slack webhook connectivity"""
    try:
        slack_webhook = SlackWebhookHook(slack_webhook_conn_id="slack_webhook")
        test_message = {
            "text": ":white_check_mark: Airflow notification system health check - all systems operational",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f":white_check_mark: Notification Health Check - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": "Webhook Status: :white_check_mark: Connected"},
                        {"type": "mrkdwn", "text": "Config Status: :white_check_mark: Loaded"},
                        {"type": "mrkdwn", "text": "Notification Rules: :white_check_mark: Active"}
                    ]
                }
            ]
        }
        # The webhook posts to the channel it was created for (e.g. #data-ops)
        slack_webhook.send_dict(test_message)
        return "Health check completed successfully"
    except Exception as e:
        # Failing the task triggers the failure callback configured below
        raise Exception(f"Slack connectivity test failed: {e}") from e


default_args = {
    'owner': 'platform-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    # Module-level function, not a method of notification_manager
    'on_failure_callback': enhanced_failure_callback,
}

dag = DAG(
    'notification_health_check',
    default_args=default_args,
    description='Monitor Airflow notification system health',
    schedule=timedelta(hours=12),  # Run twice daily
    catchup=False,
    tags=['monitoring', 'notifications', 'health-check'],
)

health_check_task = PythonOperator(
    task_id='test_slack_connectivity',
    python_callable=test_slack_connectivity,
    dag=dag,
)
```
Monitor notification delivery metrics
Add custom metrics to track notification success rates and response times.
```python
import json
from datetime import datetime

from airflow.models import Variable


class NotificationMetrics:
    """Daily notification counters stored as JSON in an Airflow Variable.

    The read-modify-write below is not atomic, so concurrent callbacks can
    overwrite each other's updates; treat the counts as approximate.
    """

    def __init__(self):
        self.metrics_key = "notification_metrics"

    def record_notification_attempt(self, dag_id, notification_type, success, response_time_ms):
        """Record notification delivery metrics"""
        try:
            # Get existing metrics
            metrics = json.loads(Variable.get(self.metrics_key, default_var="{}"))

            # Initialize today's structure if needed
            today = datetime.now().strftime('%Y-%m-%d')
            if today not in metrics:
                metrics[today] = {
                    'total_attempts': 0,
                    'successful': 0,
                    'failed': 0,
                    'average_response_time': 0,
                    'by_dag': {}
                }

            # Update counters
            daily_metrics = metrics[today]
            daily_metrics['total_attempts'] += 1
            if success:
                daily_metrics['successful'] += 1
            else:
                daily_metrics['failed'] += 1

            # Update the running average response time (milliseconds)
            current_avg = daily_metrics['average_response_time']
            total_attempts = daily_metrics['total_attempts']
            daily_metrics['average_response_time'] = (
                (current_avg * (total_attempts - 1)) + response_time_ms
            ) / total_attempts

            # Track by DAG
            by_dag = daily_metrics['by_dag'].setdefault(dag_id, {'attempts': 0, 'successful': 0})
            by_dag['attempts'] += 1
            if success:
                by_dag['successful'] += 1

            # Store updated metrics
            Variable.set(self.metrics_key, json.dumps(metrics))
        except Exception as e:
            # Never let metrics bookkeeping break the callback itself
            print(f"Failed to record notification metrics: {e}")

    def get_daily_summary(self, date_str=None):
        """Get notification metrics summary for a specific date"""
        if not date_str:
            date_str = datetime.now().strftime('%Y-%m-%d')
        try:
            metrics = json.loads(Variable.get(self.metrics_key, default_var="{}"))
            return metrics.get(date_str, {})
        except Exception:
            return {}


# Global metrics instance
notification_metrics = NotificationMetrics()
```
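Nothing above calls `record_notification_attempt` yet. One way to wire it in is to time each send and record the outcome whether or not the send raised. In the sketch below, `timed_send` is a hypothetical name and `record` stands for `notification_metrics.record_notification_attempt`. The example uses a small in-memory recorder so it runs without Airflow.

```python
import time
from typing import Callable, List, Tuple


def timed_send(send: Callable[[], object], record: Callable[[str, str, bool, float], None],
               dag_id: str, notification_type: str):
    """Run `send`, then report (dag_id, type, success, elapsed ms) to `record`.

    Any exception from `send` is re-raised after recording, so callers still see failures.
    """
    start = time.perf_counter()
    success = False
    try:
        result = send()
        success = True
        return result
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        record(dag_id, notification_type, success, elapsed_ms)


class InMemoryRecorder:
    """Stand-in for NotificationMetrics in tests: keeps each call in a list."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, bool, float]] = []

    def record_notification_attempt(self, dag_id, notification_type, success, response_time_ms):
        self.calls.append((dag_id, notification_type, success, response_time_ms))
```

In the notification manager, the `chat_postMessage(...)` call could be wrapped as `timed_send(lambda: ..., notification_metrics.record_notification_attempt, dag.dag_id, notification_type)`.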
Verify your setup
Test your notification system with a controlled failure and verify all components work correctly.
```bash
# Test Slack webhook connectivity
curl -X POST -H 'Content-type: application/json' \
  --data '{"text":"Airflow notification test - setup verification"}' \
  https://hooks.slack.com/services/YOUR/WEBHOOK/PATH

# Check Airflow connections
airflow connections list | grep slack

# Verify the notification module can be imported (DAGs import it as utils.*)
cd "${AIRFLOW_HOME:-/opt/airflow}/dags" && python3 -c "from utils.advanced_notifications import notification_manager; print('Notifications module loaded successfully')"

# Unpause and trigger the health check DAG, then check its runs
airflow dags unpause notification_health_check
airflow dags trigger notification_health_check
airflow dags list-runs -d notification_health_check
```
Finally, trigger a controlled failure (for example, add `raise Exception("Test notification")` in a task) and verify the alert appears in your designated Slack channel with proper formatting and action buttons.
Common issues
| Symptom | Cause | Fix |
|---|---|---|
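| Slack messages not appearing | Incorrect webhook URL or connection type, or the bot is not in a private channel | Re-run the curl test, compare with `airflow connections get slack_webhook`, and invite the bot to private channels |
| Messages sent to wrong channel | Channel routing logic error, or posting through an app webhook (fixed to one channel) | Check DAG tags and notification rules in notification_rules.yaml; post through the `slack_api` bot connection |
| Mentions not working | Mention written as plain `@name` instead of Slack mention syntax, or missing scopes | Use `<@MEMBER_ID>` for users and `<!subteam^GROUP_ID>` for user groups; confirm the `chat:write` and `users:read` scopes |
| Notifications for every retry | Alert function also registered as `on_retry_callback` | `on_failure_callback` fires only after the last retry fails; keep the failure alert out of `on_retry_callback` |
| Missing notification config | YAML file not found or malformed | Check the file path and validate the syntax with `python -c "import yaml; yaml.safe_load(open('/opt/airflow/config/notification_rules.yaml'))"` |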
| Action buttons not working | Incorrect Airflow webserver URL | Verify base_url in [webserver] section of airflow.cfg |
Next steps
- Optimize DAG performance to reduce failure rates and improve notification accuracy
- Set up comprehensive monitoring with DataDog integration for production observability
- Configure high availability with CeleryExecutor and Redis clustering
- Implement DAG security scanning to prevent notification system compromise
- Set up comprehensive testing for notification reliability
Automated install script
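Run this script to automate the core setup. It installs the Slack provider, creates the Airflow connections, writes a basic `utils/slack_notifications.py` module, and sets the channel environment variables.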
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Global variables
SLACK_WEBHOOK_URL=""
SLACK_BOT_TOKEN=""
AIRFLOW_HOME="/opt/airflow"
AIRFLOW_USER="airflow"
# Cleanup function
cleanup() {
if [[ $? -ne 0 ]]; then
echo -e "${RED}Installation failed. Check logs above for errors.${NC}"
fi
}
trap cleanup ERR
usage() {
echo "Usage: $0 --webhook-url <slack_webhook_url> [--bot-token <slack_bot_token>] [--airflow-home <path>]"
echo ""
echo "Required:"
echo " --webhook-url Slack webhook URL (https://hooks.slack.com/services/...)"
echo ""
echo "Optional:"
echo " --bot-token Slack bot OAuth token (xoxb-...) for advanced features"
echo " --airflow-home Airflow installation directory (default: /opt/airflow)"
exit 1
}
log_step() {
echo -e "${BLUE}[$1] $2${NC}"
}
log_success() {
echo -e "${GREEN}✓ $1${NC}"
}
log_warning() {
echo -e "${YELLOW}⚠ $1${NC}"
}
log_error() {
echo -e "${RED}✗ $1${NC}"
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--webhook-url)
SLACK_WEBHOOK_URL="$2"
shift 2
;;
--bot-token)
SLACK_BOT_TOKEN="$2"
shift 2
;;
--airflow-home)
AIRFLOW_HOME="$2"
shift 2
;;
-h|--help)
usage
;;
*)
echo "Unknown option: $1"
usage
;;
esac
done
# Validate required arguments
if [[ -z "$SLACK_WEBHOOK_URL" ]]; then
log_error "Slack webhook URL is required"
usage
fi
# Check if running as root or with sudo
if [[ $EUID -eq 0 ]]; then
SUDO=""
else
if ! command -v sudo &> /dev/null; then
log_error "This script requires sudo privileges"
exit 1
fi
SUDO="sudo"
fi
log_step "1/8" "Detecting operating system..."
# Auto-detect distribution
if [[ -f /etc/os-release ]]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update"
PYTHON_PKG="python3-pip"
;;
almalinux|rocky|centos|rhel|ol)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf makecache"  # Refresh metadata only; "dnf update -y" would upgrade every package
PYTHON_PKG="python3-pip"
;;
fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf makecache"
PYTHON_PKG="python3-pip"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum makecache"
PYTHON_PKG="python3-pip"
;;
*)
log_error "Unsupported distribution: $ID"
exit 1
;;
esac
log_success "Detected $PRETTY_NAME"
else
log_error "Cannot detect operating system"
exit 1
fi
log_step "2/8" "Checking prerequisites..."
# Check if Airflow is installed and get version
if ! command -v airflow &> /dev/null; then
log_error "Apache Airflow is not installed or not in PATH"
exit 1
fi
AIRFLOW_VERSION=$(airflow version 2>/dev/null | head -n1 || echo "unknown")
log_success "Found Airflow: $AIRFLOW_VERSION"
# Check Airflow services
if systemctl is-active --quiet airflow-webserver 2>/dev/null; then
log_success "Airflow webserver is running"
else
log_warning "Airflow webserver is not running"
fi
if systemctl is-active --quiet airflow-scheduler 2>/dev/null; then
log_success "Airflow scheduler is running"
else
log_warning "Airflow scheduler is not running"
fi
log_step "3/8" "Installing system dependencies..."
$SUDO $PKG_UPDATE
$SUDO $PKG_INSTALL curl $PYTHON_PKG
log_step "4/8" "Installing Airflow Slack provider..."
# Run a command as the Airflow user if it exists, otherwise as the current user.
# As root, "$SUDO -u" would expand to a bare "-u", so runuser is used instead.
run_airflow() {
    if id "$AIRFLOW_USER" &>/dev/null; then
        if [[ $EUID -eq 0 ]]; then
            runuser -u "$AIRFLOW_USER" -- "$@"
        else
            sudo -u "$AIRFLOW_USER" -- "$@"
        fi
    else
        "$@"
    fi
}
run_airflow pip3 install apache-airflow-providers-slack==8.6.0
# Apply pending metadata migrations ("airflow db upgrade" is deprecated since Airflow 2.7)
run_airflow airflow db migrate
log_success "Slack provider installed"
log_step "5/8" "Testing Slack webhook connectivity..."
# Test webhook (Slack answers "ok" on success)
HOSTNAME=$(hostname)
if curl -s -X POST -H 'Content-type: application/json' \
    --data "{\"text\":\"Airflow webhook test from $HOSTNAME\"}" \
    "$SLACK_WEBHOOK_URL" | grep -q "ok"; then
    log_success "Slack webhook test successful"
else
    log_error "Slack webhook test failed"
    exit 1
fi
log_step "6/8" "Configuring Airflow connections..."
# Slack provider 8.x reads the full webhook URL from a "slackwebhook" connection
run_airflow airflow connections delete slack_webhook 2>/dev/null || true
run_airflow airflow connections add slack_webhook \
    --conn-type slackwebhook \
    --conn-password "$SLACK_WEBHOOK_URL"
log_success "Slack webhook connection configured"
# Create Slack API connection if bot token provided
if [[ -n "$SLACK_BOT_TOKEN" ]]; then
    run_airflow airflow connections delete slack_api 2>/dev/null || true
    run_airflow airflow connections add slack_api \
        --conn-type slack \
        --conn-password "$SLACK_BOT_TOKEN"
    log_success "Slack API connection configured"
fi
log_step "7/8" "Creating notification utility module..."
# Create dags directory if it doesn't exist
DAGS_DIR="$AIRFLOW_HOME/dags"
UTILS_DIR="$DAGS_DIR/utils"
$SUDO mkdir -p "$UTILS_DIR"
# Create notification utility module
cat > /tmp/slack_notifications.py << 'EOF'
import os

from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def get_slack_channel(context):
    """Route alerts to appropriate Slack channel based on DAG tags"""
    dag_tags = getattr(context['dag'], 'tags', []) or []
    if 'critical' in dag_tags:
        return os.environ.get('AIRFLOW_SLACK_CRITICAL_CHANNEL', '#incidents')
    elif 'prod' in dag_tags or 'production' in dag_tags:
        return os.environ.get('AIRFLOW_SLACK_ALERTS_CHANNEL', '#data-ops')
    return os.environ.get('AIRFLOW_SLACK_DEV_CHANNEL', '#data-dev')


def _send(context, text):
    """Post with the bot token if a slack_api connection exists, else via the webhook.

    Only the Web API can choose the channel per message; an app webhook always
    posts to the channel it was created for.
    """
    try:
        BaseHook.get_connection('slack_api')
    except AirflowNotFoundException:
        SlackWebhookHook(slack_webhook_conn_id='slack_webhook').send_text(text)
        return
    SlackHook(slack_conn_id='slack_api').client.chat_postMessage(
        channel=get_slack_channel(context), text=text
    )


def create_failure_alert(context):
    """on_failure_callback: send a detailed failure notification to Slack"""
    ti = context['task_instance']
    slack_msg = f"""
:rotating_light: *Airflow Task Failure*
*DAG:* {ti.dag_id}
*Task:* {ti.task_id}
*Logical Date:* {context['logical_date']}
*Log URL:* <{ti.log_url}|View Logs>
*Error:* {str(context.get('exception', 'Unknown error'))[:500]}
""".strip()
    _send(context, slack_msg)


def create_success_alert(context):
    """on_success_callback: notify Slack when a critical DAG run succeeds"""
    dag_run = context['dag_run']
    dag_tags = getattr(context['dag'], 'tags', []) or []
    # Only send success alerts for critical DAGs
    if 'critical' not in dag_tags:
        return
    duration = dag_run.end_date - dag_run.start_date if dag_run.end_date else 'N/A'
    slack_msg = f"""
:white_check_mark: *Airflow DAG Success*
*DAG:* {dag_run.dag_id}
*Logical Date:* {context['logical_date']}
*Duration:* {duration}
""".strip()
    _send(context, slack_msg)
EOF
$SUDO mv /tmp/slack_notifications.py "$UTILS_DIR/"
$SUDO touch "$UTILS_DIR/__init__.py"
# Set proper ownership and permissions
if id "$AIRFLOW_USER" &>/dev/null; then
$SUDO chown -R $AIRFLOW_USER:$AIRFLOW_USER "$UTILS_DIR"
fi
$SUDO chmod 755 "$UTILS_DIR"
$SUDO chmod 644 "$UTILS_DIR"/*.py
log_success "Notification utility module created"
log_step "8/8" "Setting up environment variables..."
# Create environment file
ENV_FILE="/etc/environment"
if [[ ! -f "$ENV_FILE" ]]; then
$SUDO touch "$ENV_FILE"
fi
# Add Slack environment variables if they don't exist
if ! grep -q "AIRFLOW_SLACK_ALERTS_CHANNEL" "$ENV_FILE"; then
echo "AIRFLOW_SLACK_ALERTS_CHANNEL=#data-ops" | $SUDO tee -a "$ENV_FILE"
fi
if ! grep -q "AIRFLOW_SLACK_CRITICAL_CHANNEL" "$ENV_FILE"; then
echo "AIRFLOW_SLACK_CRITICAL_CHANNEL=#incidents" | $SUDO tee -a "$ENV_FILE"
fi
if ! grep -q "AIRFLOW_SLACK_DEV_CHANNEL" "$ENV_FILE"; then
echo "AIRFLOW_SLACK_DEV_CHANNEL=#data-dev" | $SUDO tee -a "$ENV_FILE"
fi
# Restart Airflow services to pick up new connections and environment
if systemctl is-active --quiet airflow-webserver 2>/dev/null; then
$SUDO systemctl restart airflow-webserver
fi
if systemctl is-active --quiet airflow-scheduler 2>/dev/null; then
$SUDO systemctl restart airflow-scheduler
fi
log_success "Environment variables configured"
echo ""
echo -e "${GREEN}✓ Airflow Slack alerting setup complete!${NC}"
echo ""
echo "Next steps:"
echo "1. Update your Slack channel names in /etc/environment if needed, and add EnvironmentFile=/etc/environment to the Airflow systemd units"
echo "2. Add 'from utils.slack_notifications import create_failure_alert' to your DAGs"
echo "3. Set on_failure_callback=create_failure_alert in your DAG definitions"
echo "4. Add tags like ['prod', 'critical'] to your DAGs for proper routing"
echo ""
echo "Example DAG configuration:"
echo " dag = DAG("
echo " 'my_dag',"
echo " tags=['prod', 'critical'],"
echo " on_failure_callback=create_failure_alert"
echo " )"
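Review the script before running it. Save it as install.sh and run it with your webhook URL (the bot token is optional): bash install.sh --webhook-url https://hooks.slack.com/services/YOUR/WEBHOOK/PATH --bot-token xoxb-YOUR-BOT-TOKEN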