Configure Airflow DAG security and isolation with RBAC policies

Advanced 45 min Apr 15, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up comprehensive security for Apache Airflow with role-based access control, DAG-level permissions, and resource isolation. Configure user authentication, implement fine-grained security policies, and establish monitoring for production-grade workflow orchestration.

Prerequisites

  • Apache Airflow installed with PostgreSQL backend
  • LDAP server for authentication (optional)
  • SSL certificates for secure communication
  • Administrative access to the server

What this solves

Apache Airflow's default configuration allows all users to view, edit, and execute any DAG, creating significant security risks in production environments. This tutorial implements comprehensive RBAC policies, DAG-level security controls, and resource isolation to ensure workflows run securely with appropriate access restrictions and audit trails.

Step-by-step configuration

Enable RBAC and authentication

Configure Airflow to use role-based access control with the password authentication backend. Note: the rbac and authenticate options below apply to Airflow 1.10; in Airflow 2.x the RBAC UI is the only UI, these flags have been removed, and webserver authentication is configured through Flask AppBuilder's webserver_config.py instead.

[webserver]
rbac = True
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

[core]
security = rbac
default_timezone = UTC
store_serialized_dags = True

[api]
auth_backend = airflow.api.auth.backend.basic_auth

Create admin user and configure database

Initialize the Airflow database with RBAC tables and create the first administrative user.

cd /opt/airflow
sudo -u airflow airflow db init
sudo -u airflow airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password SecurePassword123!

Configure LDAP authentication

Set up LDAP integration for enterprise authentication systems.

[webserver]
auth_backend = airflow.contrib.auth.backends.ldap_auth

[ldap]
uri = ldap://ldap.example.com:389
user_filter = objectClass=*
user_name_attr = uid
group_member_attr = memberOf
superuser_filter = memberOf=cn=airflow-admins,ou=groups,dc=example,dc=com
data_profiler_filter = memberOf=cn=airflow-profilers,ou=groups,dc=example,dc=com
bind_user = cn=airflow,ou=service,dc=example,dc=com
bind_password = LdapBindPassword123!
basedn = ou=users,dc=example,dc=com
cacert = /etc/ssl/certs/ldap-ca.pem
search_scope = SUBTREE
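The superuser_filter and data_profiler_filter above are plain LDAP memberOf equality filters. If you map several directory groups to Airflow roles, a small helper keeps the filter strings consistent. This is an illustrative sketch only, not an Airflow or LDAP library API; the group base DN is the example value from the config above.

```python
# Compose memberOf filter strings for the [ldap] section.
# The default base DN matches the example directory layout above.
def member_of_filter(group_cn, base="ou=groups,dc=example,dc=com"):
    return f"memberOf=cn={group_cn},{base}"

print(member_of_filter("airflow-admins"))
# memberOf=cn=airflow-admins,ou=groups,dc=example,dc=com
```

Paste the resulting strings into superuser_filter and data_profiler_filter so both always reference the same group subtree.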

Install RBAC dependencies

Install required Python packages for RBAC functionality and LDAP integration.

# Debian/Ubuntu
sudo apt update
sudo apt install -y python3-ldap libldap2-dev libsasl2-dev libssl-dev
sudo -u airflow pip install flask-appbuilder[ldap]
sudo -u airflow pip install apache-airflow[ldap]

# AlmaLinux/Rocky Linux
sudo dnf update -y
sudo dnf install -y python3-ldap openldap-devel cyrus-sasl-devel openssl-devel
sudo -u airflow pip install flask-appbuilder[ldap]
sudo -u airflow pip install apache-airflow[ldap]

Create custom roles and permissions

Define specific roles with granular permissions for different user groups.

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.www.app import create_app
from datetime import datetime

def create_custom_roles():
    app = create_app()
    with app.app_context():
        security_manager = app.appbuilder.sm
        
        # Create DAG Developer role
        dag_developer_role = security_manager.find_role("DAG_Developer")
        if not dag_developer_role:
            dag_developer_role = security_manager.add_role("DAG_Developer")
            
        # Add permissions for DAG developers
        permissions = [
            ("can_read", "DAGs"),
            ("can_edit", "DAGs"),
            ("can_create", "DAGs"),
            ("can_read", "Task Instances"),
            ("can_read", "Log"),
            ("can_read", "XComs"),
        ]
        
        for perm_name, view_name in permissions:
            perm = security_manager.find_permission_view_menu(perm_name, view_name)
            if perm and perm not in dag_developer_role.permissions:
                dag_developer_role.permissions.append(perm)
                
        # Create DAG Viewer role
        dag_viewer_role = security_manager.find_role("DAG_Viewer")
        if not dag_viewer_role:
            dag_viewer_role = security_manager.add_role("DAG_Viewer")
            
        viewer_permissions = [
            ("can_read", "DAGs"),
            ("can_read", "Task Instances"),
            ("can_read", "Log"),
        ]
        
        for perm_name, view_name in viewer_permissions:
            perm = security_manager.find_permission_view_menu(perm_name, view_name)
            if perm and perm not in dag_viewer_role.permissions:
                dag_viewer_role.permissions.append(perm)
                
        security_manager.get_session.commit()

dag = DAG(
    'create_roles_dag',
    default_args={
        'owner': 'admin',
        'start_date': datetime(2024, 1, 1),
        'retries': 0,
    },
    schedule_interval=None,
    catchup=False
)

create_roles_task = PythonOperator(
    task_id='create_custom_roles',
    python_callable=create_custom_roles,
    dag=dag
)

Implement DAG-level access control

Configure DAG-specific permissions using access control decorators and filters.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# DAG access control configuration
DAG_OWNER = 'finance_team'
ALLOWED_ROLES = ['Admin', 'Finance_Analyst', 'DAG_Developer']
REQUIRED_GROUPS = ['finance', 'data_analysts']

def check_dag_access():
    """Verify user has appropriate access to this DAG"""
    from flask_login import current_user
    if not current_user.is_authenticated:
        raise Exception("User not authenticated")
    user_roles = [role.name for role in current_user.roles]
    if not any(role in ALLOWED_ROLES for role in user_roles):
        raise Exception(f"User {current_user.username} lacks required role for this DAG")
    return True

default_args = {
    'owner': DAG_OWNER,
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'finance_queue',  # Dedicated queue for isolation
}

dag = DAG(
    'secure_finance_dag',
    default_args=default_args,
    description='Finance DAG with RBAC security',
    schedule_interval=timedelta(hours=6),
    catchup=False,
    tags=['finance', 'secure', 'rbac'],
    access_control={
        'Finance_Analyst': {'can_dag_read', 'can_dag_edit'},
        'DAG_Viewer': {'can_dag_read'},
        'Admin': {'can_dag_read', 'can_dag_edit', 'can_dag_delete'},
    },
)

# Security check task
security_check = PythonOperator(
    task_id='verify_access',
    python_callable=check_dag_access,
    dag=dag,
)

# Secure data processing task
process_data = BashOperator(
    task_id='process_financial_data',
    bash_command='''#!/bin/bash
# Set secure environment
export PATH=/usr/local/bin:/usr/bin:/bin
export PYTHONPATH=/opt/airflow/dags

# Validate input parameters
if [[ -z "$AIRFLOW_CTX_DAG_ID" ]]; then
    echo "ERROR: DAG context not available"
    exit 1
fi

# Process with restricted permissions
cd /opt/airflow/secure_data
python3 finance_processor.py --dag-id="$AIRFLOW_CTX_DAG_ID" --user="$AIRFLOW_CTX_TASK_OWNER"
''',
    dag=dag,
)

security_check >> process_data
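The role check inside check_dag_access is just a membership test between the caller's roles and ALLOWED_ROLES. Isolated from the Flask login context, the logic reduces to the following standalone sketch (role names are the example values from this tutorial):

```python
# Standalone version of the role check used in check_dag_access:
# grant access when the user holds at least one allowed role.
ALLOWED_ROLES = ['Admin', 'Finance_Analyst', 'DAG_Developer']

def has_required_role(user_roles):
    return any(role in ALLOWED_ROLES for role in user_roles)

print(has_required_role(['DAG_Viewer', 'Finance_Analyst']))  # True
print(has_required_role(['DAG_Viewer']))                     # False
```

Because any() short-circuits, a user with many roles is accepted as soon as the first allowed role is found.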

Configure resource isolation

Set up Celery queues and worker pools for workload isolation and resource limits.

[celery]
worker_concurrency = 4
worker_prefetch_multiplier = 1
task_track_started = True
task_time_limit = 7200
task_soft_time_limit = 3600

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

# Queue configuration for isolation
[celery_worker_queues]
default = default
finance_queue = finance_queue
hr_queue = hr_queue
data_science_queue = data_science_queue
high_memory_queue = high_memory_queue

# Resource limits per queue
[celery_worker_limits]
finance_queue_memory_limit = 4G
finance_queue_cpu_limit = 2
hr_queue_memory_limit = 2G
hr_queue_cpu_limit = 1
data_science_queue_memory_limit = 16G
data_science_queue_cpu_limit = 8

Create systemd service for isolated workers

Set up dedicated worker services with resource constraints and security isolation.

[Unit]
Description=Airflow Finance Queue Worker
After=network.target
Requires=postgresql.service
After=postgresql.service

[Service]
Type=notify
User=airflow
Group=airflow-finance
WorkingDirectory=/opt/airflow
Environment=PATH=/opt/airflow/venv/bin:/usr/local/bin:/usr/bin:/bin
Environment=AIRFLOW_HOME=/opt/airflow
Environment=PYTHONPATH=/opt/airflow/dags
ExecStart=/opt/airflow/venv/bin/airflow celery worker --queues=finance_queue
Restart=always
RestartSec=30
KillMode=mixed
TimeoutStopSec=60

# Security and resource limits
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/opt/airflow/logs /opt/airflow/dags
MemoryMax=4G
CPUQuota=200%
TasksMax=100

# Network isolation
PrivateNetwork=false
RestrictAddressFamilies=AF_INET AF_INET6 AF_UNIX

[Install]
WantedBy=multi-user.target

Enable and start isolated workers

Start the finance queue worker with resource isolation and security constraints.

sudo systemctl daemon-reload
sudo systemctl enable airflow-worker-finance.service
sudo systemctl start airflow-worker-finance.service
sudo systemctl status airflow-worker-finance.service

Configure audit logging

Set up comprehensive audit logging for security monitoring and compliance.

[logging]
logging_level = INFO
fab_logging_level = WARN
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log

# Audit logging configuration
[audit]
enable_audit_log = True
audit_log_file = /opt/airflow/logs/audit/airflow_audit.log
audit_log_level = INFO
audit_events = login,logout,dag_create,dag_edit,dag_delete,task_run,task_fail,variable_create,variable_edit,variable_delete,connection_create,connection_edit,connection_delete

# Security logging
[security]
log_authentication_events = True
log_authorization_failures = True
security_log_file = /opt/airflow/logs/security/airflow_security.log
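The monitoring script later in this tutorial greps these logs for event markers such as ACCESS_DENIED and FAILED_LOGIN followed by key:value pairs. That line shape is this tutorial's assumption, not a fixed Airflow format; if you emit audit events from your own hooks, a minimal formatter that matches the monitor's regexes could look like this (audit_line is a hypothetical helper):

```python
from datetime import datetime, timezone

def audit_line(event, **fields):
    """Format one audit-log line as '<timestamp> EVENT key:value ...'."""
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    kv = " ".join(f"{k}:{v}" for k, v in fields.items())
    return f"{ts} {event} {kv}"

print(audit_line("ACCESS_DENIED", dag="secure_finance_dag", user="jdoe"))
```

Keeping the marker and key:value layout stable is what lets the regexes in the security monitor below stay simple.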

Set up log rotation and monitoring

Configure logrotate for audit logs and set up monitoring alerts for security events.

/opt/airflow/logs/audit/*.log {
    daily
    missingok
    rotate 90
    compress
    delaycompress
    notifempty
    copytruncate
    create 0640 airflow airflow
    postrotate
        systemctl reload airflow-webserver
    endscript
}

/opt/airflow/logs/security/*.log {
    daily
    missingok
    rotate 90
    compress
    delaycompress
    notifempty
    copytruncate
    create 0640 airflow airflow
}

Create security monitoring script

Implement automated monitoring for security events and policy violations.

#!/usr/bin/env python3
import re
import smtplib
import logging
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AirflowSecurityMonitor:
    def __init__(self):
        self.audit_log = "/opt/airflow/logs/audit/airflow_audit.log"
        self.security_log = "/opt/airflow/logs/security/airflow_security.log"
        self.alert_email = "security@example.com"
        self.smtp_server = "smtp.example.com"
        
    def check_failed_logins(self, threshold=5, window_minutes=15):
        """Monitor for brute force attempts"""
        pattern = r'FAILED_LOGIN.user:(\S+).ip:(\S+)'
        failed_attempts = {}
        
        if not Path(self.security_log).exists():
            return []
        with open(self.security_log, 'r') as f:
            for line in f:
                match = re.search(pattern, line)
                if match:
                    user, ip = match.groups()
                    key = f"{user}@{ip}"
                    failed_attempts[key] = failed_attempts.get(key, 0) + 1
                    
        alerts = []
        for key, count in failed_attempts.items():
            if count >= threshold:
                alerts.append(f"Brute force detected: {count} failed attempts for {key}")
                
        return alerts
        
    def check_privilege_escalation(self):
        """Monitor for unauthorized privilege escalation"""
        pattern = r'PRIVILEGE_ESCALATION.user:(\S+).role:(\S+)'
        alerts = []
        
        if not Path(self.audit_log).exists():
            return []
        with open(self.audit_log, 'r') as f:
            for line in f:
                if 'PRIVILEGE_ESCALATION' in line:
                    match = re.search(pattern, line)
                    if match:
                        user, role = match.groups()
                        alerts.append(f"Privilege escalation attempt: {user} -> {role}")
                        
        return alerts
        
    def check_dag_access_violations(self):
        """Monitor for DAG access violations"""
        pattern = r'ACCESS_DENIED.dag:(\S+).user:(\S+)'
        violations = []
        
        if not Path(self.audit_log).exists():
            return []
        with open(self.audit_log, 'r') as f:
            for line in f:
                if 'ACCESS_DENIED' in line:
                    match = re.search(pattern, line)
                    if match:
                        dag, user = match.groups()
                        violations.append(f"Access denied: {user} attempted to access {dag}")
                        
        return violations
        
    def send_alert(self, alerts):
        """Send security alerts via email"""
        if not alerts:
            return
            
        subject = f"Airflow Security Alert - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        body = "\n".join(alerts)
        
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = 'airflow-security@example.com'
        msg['To'] = self.alert_email
        
        try:
            with smtplib.SMTP(self.smtp_server) as server:
                server.send_message(msg)
            logger.info(f"Security alert sent: {len(alerts)} events")
        except Exception as e:
            logger.error(f"Failed to send alert: {e}")
            
    def run_checks(self):
        """Run all security checks"""
        all_alerts = []
        
        # Check for brute force attempts
        failed_login_alerts = self.check_failed_logins()
        all_alerts.extend(failed_login_alerts)
        
        # Check for privilege escalation
        privilege_alerts = self.check_privilege_escalation()
        all_alerts.extend(privilege_alerts)
        
        # Check DAG access violations
        access_alerts = self.check_dag_access_violations()
        all_alerts.extend(access_alerts)
        
        if all_alerts:
            self.send_alert(all_alerts)
            logger.warning(f"Security events detected: {len(all_alerts)}")
        else:
            logger.info("No security events detected")
            
if __name__ == "__main__":
    monitor = AirflowSecurityMonitor()
    monitor.run_checks()
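Before pointing the monitor at real log files, you can exercise its parsing and threshold logic against a synthetic log. This sketch inlines the same regex and counting logic as check_failed_logins, using a temporary file; the log-line format is the one this tutorial assumes throughout.

```python
import re
import tempfile

# Same pattern and threshold logic as check_failed_logins above,
# run against a synthetic security log with six failures for one user.
pattern = r'FAILED_LOGIN.user:(\S+).ip:(\S+)'
lines = ["2026-04-15T10:00:00Z FAILED_LOGIN user:alice ip:10.0.0.5\n"] * 6

with tempfile.NamedTemporaryFile("w+", suffix=".log") as f:
    f.writelines(lines)
    f.seek(0)
    failed = {}
    for line in f:
        m = re.search(pattern, line)
        if m:
            user, ip = m.groups()
            key = f"{user}@{ip}"
            failed[key] = failed.get(key, 0) + 1

alerts = [f"Brute force detected: {n} failed attempts for {k}"
          for k, n in failed.items() if n >= 5]
print(alerts)  # one alert for alice@10.0.0.5
```

Six matching lines exceed the threshold of five, so exactly one alert is produced; four lines would produce none.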

Schedule security monitoring

Set up automated security monitoring with systemd timer for continuous oversight.

# airflow-security-monitor.service
[Unit]
Description=Airflow Security Monitor

[Service]
Type=oneshot
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow
ExecStart=/usr/bin/python3 /opt/airflow/scripts/security_monitor.py
StandardOutput=journal
StandardError=journal

# airflow-security-monitor.timer
[Unit]
Description=Run Airflow Security Monitor every 15 minutes
Requires=airflow-security-monitor.service

[Timer]
OnCalendar=*:0/15
Persistent=true

[Install]
WantedBy=timers.target

sudo chmod +x /opt/airflow/scripts/security_monitor.py
sudo systemctl daemon-reload
sudo systemctl enable airflow-security-monitor.timer
sudo systemctl start airflow-security-monitor.timer
sudo systemctl status airflow-security-monitor.timer

Configure connection and variable security

Implement encryption and access controls for sensitive Airflow connections and variables.

[core]
# Enable Fernet encryption for connections and variables
fernet_key = YOUR_GENERATED_FERNET_KEY_HERE
encrypt_s3_logs = True

[secrets]
# Backend for secrets management
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "url": "https://vault.example.com:8200"}

[connections]
# Restrict connection access
connections_check_enabled = True
allow_insecure_connections = False

Generate Fernet key for encryption

Create a secure Fernet key for encrypting sensitive data in the Airflow database.

# Generate a key and keep it in a variable, so the key you see is the
# same key written to airflow.cfg (running the generator twice would
# produce two different keys)
FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
echo "$FERNET_KEY"

# Update the fernet_key in airflow.cfg with the generated key
sudo sed -i "s/YOUR_GENERATED_FERNET_KEY_HERE/$FERNET_KEY/" /opt/airflow/airflow.cfg
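Fernet is symmetric: anything encrypted with the key in airflow.cfg can only be decrypted with that same key, which is why every scheduler, webserver, and worker must share it. A quick round-trip sanity check, using the cryptography package Airflow already depends on (the connection string below is a made-up example):

```python
from cryptography.fernet import Fernet

key = Fernet.generate_key()  # same call used to seed airflow.cfg
f = Fernet(key)
token = f.encrypt(b"postgres://user:secret@db/airflow")
assert f.decrypt(token) == b"postgres://user:secret@db/airflow"
print("round-trip ok")
```

If decryption raises InvalidToken after a key change, existing encrypted connections and variables must be re-saved under the new key.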

Restart Airflow services

Restart all Airflow services to apply the security configuration changes.

sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
sudo systemctl restart airflow-worker-finance
sudo systemctl status airflow-webserver airflow-scheduler

Verify your setup

Test the RBAC configuration and security policies to ensure proper access control.

# Check RBAC is enabled
curl -u admin:SecurePassword123! http://localhost:8080/api/v1/config | grep -i rbac

# Verify user roles
sudo -u airflow airflow users list

# Check worker queues
sudo -u airflow airflow celery inspect active_queues

# Test DAG access control
curl -u admin:SecurePassword123! http://localhost:8080/api/v1/dags/secure_finance_dag

# Verify audit logging
tail -f /opt/airflow/logs/audit/airflow_audit.log

# Check security monitoring
sudo systemctl status airflow-security-monitor.timer
journalctl -u airflow-security-monitor.service --since "1 hour ago"

Common issues

  • RBAC interface not loading: missing Flask-AppBuilder dependencies. Fix: pip install flask-appbuilder[oauth,openid,ldap]
  • LDAP authentication failing: incorrect LDAP configuration. Fix: test with ldapsearch -x -H ldap://server -D "bind_user" -W
  • DAG access denied errors: role permissions not properly set. Fix: check role assignments in the Airflow UI under Security > List Roles
  • Workers not processing specific queues: queue configuration mismatch. Fix: verify queue names match in airflow.cfg and the worker startup commands
  • Audit logs not generating: log directory permissions. Fix: sudo chown -R airflow:airflow /opt/airflow/logs && chmod 755 /opt/airflow/logs/audit
  • Fernet encryption errors: invalid or missing Fernet key. Fix: generate a new key with python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
Never use chmod 777. It gives every user on the system full access to your files. Instead, fix ownership with chown and use minimal permissions like 755 for directories and 644 for files.
