Integrate WireGuard VPN server with LDAP authentication for enterprise user management

Advanced | 45 min | Apr 25, 2026
Ubuntu 24.04, Debian 12, AlmaLinux 9, Rocky Linux 9

Configure a WireGuard VPN server to authenticate users against LDAP directory services such as Active Directory. Automate client key management and implement centralized user access control for enterprise environments.

Prerequisites

  • Root access to server
  • LDAP/Active Directory server with service account
  • Basic knowledge of Python and LDAP concepts
  • Firewall configuration access

What this solves

WireGuard provides fast, secure VPN connections but lacks built-in user management beyond manual key distribution. This tutorial integrates WireGuard with LDAP authentication, allowing you to manage VPN access through your existing Active Directory or OpenLDAP infrastructure. Because WireGuard identifies peers only by public key, the LDAP check happens at provisioning time: a user must authenticate against the directory before a key pair and client configuration are issued. You'll automate client key generation, implement group-based access policies, and centralize VPN user management.

Step-by-step configuration

Update system packages

Start by updating your package manager and installing essential dependencies for WireGuard and LDAP integration.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y
sudo apt install -y wireguard wireguard-tools python3-pip python3-venv ldap-utils libldap2-dev libsasl2-dev libssl-dev

# AlmaLinux / Rocky Linux
sudo dnf update -y
sudo dnf install -y wireguard-tools python3-pip python3-devel openldap-devel cyrus-sasl-devel openssl-devel gcc

Enable IP forwarding

Configure the kernel to forward packets between network interfaces, which is required for VPN routing.

echo 'net.ipv4.ip_forward = 1' | sudo tee -a /etc/sysctl.conf
echo 'net.ipv6.conf.all.forwarding = 1' | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
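
As a quick check that the settings took effect, the kernel exposes each sysctl as a file under /proc/sys. The sketch below reads those files; the helper name forwarding_enabled and its proc_root parameter are illustrative assumptions, not part of any WireGuard tooling.

```python
from pathlib import Path

def forwarding_enabled(key: str, proc_root: str = "/proc/sys") -> bool:
    """Return True if the sysctl `key` (e.g. net.ipv4.ip_forward) reads as 1."""
    # sysctl keys map to file paths by replacing dots with slashes
    path = Path(proc_root) / key.replace(".", "/")
    try:
        return path.read_text().strip() == "1"
    except OSError:
        # Missing or unreadable file: report forwarding as not enabled
        return False

for key in ("net.ipv4.ip_forward", "net.ipv6.conf.all.forwarding"):
    state = "enabled" if forwarding_enabled(key) else "disabled"
    print(f"{key}: {state}")
```

Both keys should report as enabled once sudo sysctl -p has run on the server.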

Generate WireGuard server keys

Create the cryptographic keys that WireGuard uses for secure communications.

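sudo mkdir -p /etc/wireguard
sudo chmod 700 /etc/wireguard
# Generate the keys as root with a restrictive umask so the private key is never
# world-readable or echoed to the terminal. Absolute paths are used because a
# non-root user cannot cd into the root-only directory.
sudo sh -c 'umask 077; wg genkey > /etc/wireguard/server_private.key'
sudo sh -c 'wg pubkey < /etc/wireguard/server_private.key > /etc/wireguard/server_public.key'
sudo cat /etc/wireguard/server_public.key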

Configure WireGuard server

Create the main WireGuard configuration file at /etc/wireguard/wg0.conf with network settings and firewall rules. WireGuard does not expand shell substitutions in this file, so paste the contents of /etc/wireguard/server_private.key as the PrivateKey value.

[Interface]
PrivateKey = <contents of /etc/wireguard/server_private.key>
Address = 10.66.66.1/24
ListenPort = 51820
SaveConfig = false

# NAT rules for client traffic
PostUp = iptables -A FORWARD -i %i -j ACCEPT; iptables -A FORWARD -o %i -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i %i -j ACCEPT; iptables -D FORWARD -o %i -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE

Note: Replace eth0 with your server's primary network interface name. Check with ip route show default.
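
WireGuard's configuration parser does not evaluate shell substitutions such as $(...), so the private key has to appear literally in wg0.conf. One way to avoid pasting it by hand is to render the file from a short script. The following is a minimal sketch under that assumption; render_server_config and its parameters are hypothetical names, and the defaults mirror the values used in this tutorial.

```python
def render_server_config(private_key: str,
                         address: str = "10.66.66.1/24",
                         listen_port: int = 51820,
                         out_iface: str = "eth0") -> str:
    """Return the text of a wg0.conf [Interface] section with the key inlined."""
    post_up = (
        "iptables -A FORWARD -i %i -j ACCEPT; "
        "iptables -A FORWARD -o %i -j ACCEPT; "
        f"iptables -t nat -A POSTROUTING -o {out_iface} -j MASQUERADE"
    )
    # PostDown mirrors PostUp, deleting (-D) each rule that PostUp appended (-A)
    post_down = post_up.replace(" -A ", " -D ")
    lines = [
        "[Interface]",
        f"PrivateKey = {private_key.strip()}",
        f"Address = {address}",
        f"ListenPort = {listen_port}",
        "SaveConfig = false",
        "",
        "# NAT rules for client traffic",
        f"PostUp = {post_up}",
        f"PostDown = {post_down}",
    ]
    return "\n".join(lines) + "\n"

# Placeholder key for illustration; a real run would read server_private.key as root
print(render_server_config("PLACEHOLDER_PRIVATE_KEY"))
```

Deriving PostDown from PostUp keeps the two rule sets symmetric, so every rule added when the interface comes up is removed when it goes down.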

Configure firewall for WireGuard

Open the WireGuard port and ensure proper traffic forwarding through the system firewall.

# Ubuntu / Debian (ufw)
sudo ufw allow 51820/udp
sudo ufw allow ssh
sudo ufw --force enable

# AlmaLinux / Rocky Linux (firewalld)
sudo firewall-cmd --permanent --add-port=51820/udp
sudo firewall-cmd --permanent --add-masquerade
sudo firewall-cmd --reload

Create LDAP authentication script

Build a Python script that handles LDAP authentication and user group validation for VPN access.

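sudo python3 -m venv /opt/wg-ldap
# configparser is part of the Python standard library, so only ldap3 is installed
sudo /opt/wg-ldap/bin/pip install ldap3

Save the following script as /opt/wg-ldap/wg_ldap_auth.py:
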
#!/opt/wg-ldap/bin/python3
import sys
import configparser
from ldap3 import Server, Connection, ALL
from ldap3.core.exceptions import LDAPException
from ldap3.utils.conv import escape_filter_chars

def authenticate_user(username, password):
    # Disable interpolation so a '%' in the bind password is read literally
    config = configparser.ConfigParser(interpolation=None)
    config.read('/etc/wireguard/ldap.conf')

    ldap_server = config.get('ldap', 'server')
    ldap_port = config.getint('ldap', 'port', fallback=389)
    use_ssl = config.getboolean('ldap', 'use_ssl', fallback=False)
    bind_dn = config.get('ldap', 'bind_dn')
    bind_password = config.get('ldap', 'bind_password')
    user_base = config.get('ldap', 'user_base')
    user_filter = config.get('ldap', 'user_filter')
    vpn_group = config.get('ldap', 'vpn_group', fallback=None)

    try:
        server = Server(ldap_server, port=ldap_port, use_ssl=use_ssl, get_info=ALL)

        # Bind with service account
        bind_conn = Connection(server, bind_dn, bind_password, auto_bind=True)

        # Search for user; escape the username so it cannot alter the filter
        search_filter = user_filter.format(username=escape_filter_chars(username))
        bind_conn.search(user_base, search_filter, attributes=['cn', 'memberOf'])
        
        if not bind_conn.entries:
            print(f"User {username} not found in LDAP")
            return False
            
        user_entry = bind_conn.entries[0]
        user_dn = user_entry.entry_dn
        
        # Check group membership if configured (exact, case-insensitive DN match;
        # a substring test would also accept groups whose DN merely contains it)
        if vpn_group:
            member_of = [str(group).lower() for group in user_entry.memberOf]
            if vpn_group.lower() not in member_of:
                print(f"User {username} not in VPN group {vpn_group}")
                return False
        
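        # Reject empty passwords: a simple bind without a password can be
        # treated by the server as an unauthenticated bind
        if not password:
            print("Empty password rejected")
            return False

        # Authenticate user with their credentials
        user_conn = Connection(server, user_dn, password, auto_bind=True)
        user_conn.unbind()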
        
        print(f"User {username} authenticated successfully")
        return True
        
    except LDAPException as e:
        print(f"LDAP error: {e}")
        return False
    except Exception as e:
        print(f"Authentication error: {e}")
        return False

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wg_ldap_auth.py <username> <password>")
        sys.exit(1)
    
    username = sys.argv[1]
    password = sys.argv[2]
    
    if authenticate_user(username, password):
        sys.exit(0)
    else:
        sys.exit(1)
sudo chmod +x /opt/wg-ldap/wg_ldap_auth.py

Configure LDAP connection settings

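Create the configuration file /etc/wireguard/ldap.conf with your LDAP server details and authentication parameters. With port = 389 and use_ssl = false, bind passwords cross the network unencrypted; prefer LDAPS (port = 636, use_ssl = true) when your directory supports it.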

[ldap]
server = ldap.example.com
port = 389
use_ssl = false
bind_dn = CN=wireguard-service,CN=Users,DC=example,DC=com
bind_password = your-service-account-password
user_base = CN=Users,DC=example,DC=com
user_filter = (&(objectClass=user)(sAMAccountName={username}))
vpn_group = CN=VPN Users,CN=Groups,DC=example,DC=com
sudo chmod 600 /etc/wireguard/ldap.conf
Note: For OpenLDAP, use filter (&(objectClass=inetOrgPerson)(uid={username})) and adjust base DN format like ou=users,dc=example,dc=com.
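
The user_filter value is a template: {username} is replaced with the login name before the search runs. Because that name comes from the user, characters that are special in LDAP filters should be escaped first. The sketch below shows the idea using only the standard library; escape_filter_value and build_user_filter are hypothetical helper names, and the ldap3 package ships its own equivalent, escape_filter_chars, in ldap3.utils.conv.

```python
def escape_filter_value(value: str) -> str:
    """Escape characters that are special in LDAP search filters (RFC 4515)."""
    special = {"\\": r"\5c", "*": r"\2a", "(": r"\28", ")": r"\29", "\x00": r"\00"}
    return "".join(special.get(ch, ch) for ch in value)

def build_user_filter(template: str, username: str) -> str:
    """Fill the {username} placeholder with an escaped login name."""
    return template.format(username=escape_filter_value(username))

AD_FILTER = "(&(objectClass=user)(sAMAccountName={username}))"
OPENLDAP_FILTER = "(&(objectClass=inetOrgPerson)(uid={username}))"

print(build_user_filter(AD_FILTER, "jdoe"))
# A hostile "username" is neutralised instead of widening the search
print(build_user_filter(OPENLDAP_FILTER, "*)(uid=*"))
```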

Create client management script

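Build a script for managing WireGuard client configurations with LDAP integration and save it as /opt/wg-ldap/wg_client_manager.py. It authenticates a user against LDAP, generates a key pair, assigns the next free VPN address, and adds the peer to the server configuration.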

#!/opt/wg-ldap/bin/python3
import os
import sys
import subprocess
import argparse
import ipaddress
from pathlib import Path

WG_DIR = Path("/etc/wireguard")
CLIENT_DIR = WG_DIR / "clients"
CONFIG_FILE = WG_DIR / "wg0.conf"
AUTH_SCRIPT = "/opt/wg-ldap/wg_ldap_auth.py"

def get_next_ip():
    """Find the next available IP address in the VPN subnet"""
    network = ipaddress.IPv4Network('10.66.66.0/24')
    used_ips = set(['10.66.66.1'])  # Server IP
    
    # Parse existing client IPs from config
    if CONFIG_FILE.exists():
        with open(CONFIG_FILE, 'r') as f:
            content = f.read()
            for line in content.split('\n'):
                if line.strip().startswith('AllowedIPs'):
                    ip = line.split('=')[1].strip().split('/')[0]
                    used_ips.add(ip)
    
    for ip in network.hosts():
        if str(ip) not in used_ips:
            return str(ip)
    
    raise Exception("No available IP addresses")

def generate_client_config(username, client_ip, server_public_key, client_private_key, client_public_key):
    """Generate client WireGuard configuration"""
    config = f"""[Interface]
PrivateKey = {client_private_key}
Address = {client_ip}/32
DNS = 8.8.8.8, 1.1.1.1

[Peer]
PublicKey = {server_public_key}
AllowedIPs = 0.0.0.0/0
Endpoint = YOUR_SERVER_IP:51820
PersistentKeepalive = 25
"""
    return config

def add_client(username, password):
    """Add a new VPN client after LDAP authentication"""
    # Authenticate with LDAP
    result = subprocess.run([AUTH_SCRIPT, username, password], 
                          capture_output=True, text=True)
    if result.returncode != 0:
        print(f"LDAP authentication failed for {username}")
        print(result.stdout)
        return False
    
    # Check if client already exists
    client_key_file = CLIENT_DIR / f"{username}_private.key"
    if client_key_file.exists():
        print(f"Client {username} already exists")
        return False
    
    # Create client directory
    CLIENT_DIR.mkdir(exist_ok=True)
    
    # Generate client keys
    client_private = subprocess.check_output(['wg', 'genkey'], text=True).strip()
    client_public = subprocess.check_output(['wg', 'pubkey'], 
                                          input=client_private, text=True).strip()
    
    # Get next available IP
    client_ip = get_next_ip()
    
    # Save client keys
    with open(client_key_file, 'w') as f:
        f.write(client_private)
    os.chmod(client_key_file, 0o600)
    
    with open(CLIENT_DIR / f"{username}_public.key", 'w') as f:
        f.write(client_public)
    
    # Get server public key
    with open(WG_DIR / "server_public.key", 'r') as f:
        server_public_key = f.read().strip()
    
    # Generate client config
    client_config = generate_client_config(username, client_ip, server_public_key, 
                                          client_private, client_public)
    
    # Save client config (it contains the private key, so restrict permissions)
    config_file = CLIENT_DIR / f"{username}.conf"
    with open(config_file, 'w') as f:
        f.write(client_config)
    os.chmod(config_file, 0o600)
    
    # Add peer to server config
    peer_config = f"""\n# Client: {username}
[Peer]
PublicKey = {client_public}
AllowedIPs = {client_ip}/32\n"""
    
    with open(CONFIG_FILE, 'a') as f:
        f.write(peer_config)
    
    # Reload WireGuard
    subprocess.run(['systemctl', 'reload', 'wg-quick@wg0'], check=True)
    
    print(f"Client {username} added successfully")
    print(f"Config file: {config_file}")
    print(f"Client IP: {client_ip}")
    return True

def remove_client(username):
    """Remove a VPN client"""
    client_key_file = CLIENT_DIR / f"{username}_private.key"
    if not client_key_file.exists():
        print(f"Client {username} does not exist")
        return False
    
    # Get client public key
    with open(CLIENT_DIR / f"{username}_public.key", 'r') as f:
        client_public = f.read().strip()
    
    # Remove from server config
    with open(CONFIG_FILE, 'r') as f:
        lines = f.readlines()
    
    new_lines = []
    skip_peer = False
    for line in lines:
        if line.strip() == f"# Client: {username}":
            skip_peer = True
            continue
        elif line.strip().startswith("[Peer]") and skip_peer:
            continue
        elif line.strip().startswith("PublicKey =") and skip_peer:
            continue
        elif line.strip().startswith("AllowedIPs =") and skip_peer:
            skip_peer = False
            continue
        else:
            new_lines.append(line)
    
    with open(CONFIG_FILE, 'w') as f:
        f.writelines(new_lines)
    
    # Remove client files
    for file_pattern in [f"{username}_private.key", f"{username}_public.key", f"{username}.conf"]:
        file_path = CLIENT_DIR / file_pattern
        if file_path.exists():
            file_path.unlink()
    
    # Reload WireGuard
    subprocess.run(['systemctl', 'reload', 'wg-quick@wg0'], check=True)
    
    print(f"Client {username} removed successfully")
    return True

def list_clients():
    """List all VPN clients"""
    if not CLIENT_DIR.exists():
        print("No clients configured")
        return
    
    print("Configured VPN clients:")
    for key_file in CLIENT_DIR.glob("*_private.key"):
        username = key_file.stem.replace("_private", "")
        print(f"  - {username}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='WireGuard LDAP Client Manager')
    subparsers = parser.add_subparsers(dest='action', help='Available actions')
    
    add_parser = subparsers.add_parser('add', help='Add a new client')
    add_parser.add_argument('username', help='LDAP username')
    add_parser.add_argument('password', help='LDAP password')
    
    remove_parser = subparsers.add_parser('remove', help='Remove a client')
    remove_parser.add_argument('username', help='Client username')
    
    subparsers.add_parser('list', help='List all clients')
    
    args = parser.parse_args()
    
    if args.action == 'add':
        add_client(args.username, args.password)
    elif args.action == 'remove':
        remove_client(args.username)
    elif args.action == 'list':
        list_clients()
    else:
        parser.print_help()
sudo chmod +x /opt/wg-ldap/wg_client_manager.py
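
The address allocation in the script can be looked at in isolation: collect every address already listed under AllowedIPs, then take the first host in the subnet that is not in that set. The sketch below separates parsing from allocation so each part can be checked on its own; the function names are illustrative, and unlike the script it also handles several comma-separated entries on one AllowedIPs line.

```python
import ipaddress

def used_ips_from_config(config_text: str) -> set:
    """Collect the addresses listed on AllowedIPs lines of a WireGuard config."""
    used = set()
    for line in config_text.splitlines():
        key, sep, value = line.partition("=")
        if sep and key.strip() == "AllowedIPs":
            for entry in value.split(","):
                used.add(entry.strip().split("/")[0])
    return used

def next_free_ip(cidr: str, used: set) -> str:
    """Return the first host address in `cidr` that is not in `used`."""
    for host in ipaddress.IPv4Network(cidr).hosts():
        if str(host) not in used:
            return str(host)
    raise RuntimeError(f"No free addresses left in {cidr}")

sample = (
    "[Peer]\nPublicKey = abc=\nAllowedIPs = 10.66.66.2/32\n\n"
    "[Peer]\nPublicKey = def=\nAllowedIPs = 10.66.66.3/32, 10.66.66.4/32\n"
)
used = used_ips_from_config(sample) | {"10.66.66.1"}  # reserve the server address
print(next_free_ip("10.66.66.0/24", used))
```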

Update server IP in client template

Replace the placeholder with your actual server's public IP address in the client manager script.

SERVER_IP=$(curl -s ipv4.icanhazip.com)
sudo sed -i "s/YOUR_SERVER_IP/$SERVER_IP/g" /opt/wg-ldap/wg_client_manager.py

Start WireGuard service

Enable and start the WireGuard VPN service with the configuration you created.

sudo systemctl enable wg-quick@wg0
sudo systemctl start wg-quick@wg0
sudo systemctl status wg-quick@wg0

Create user management aliases

Add convenient command aliases for managing VPN users with LDAP authentication.

sudo tee /usr/local/bin/wg-add-user << 'EOF'
#!/bin/bash
if [ $# -ne 2 ]; then
    echo "Usage: wg-add-user <username> <password>"
    exit 1
fi
/opt/wg-ldap/wg_client_manager.py add "$1" "$2"
EOF

sudo tee /usr/local/bin/wg-remove-user << 'EOF'
#!/bin/bash
if [ $# -ne 1 ]; then
    echo "Usage: wg-remove-user <username>"
    exit 1
fi
/opt/wg-ldap/wg_client_manager.py remove "$1"
EOF

sudo tee /usr/local/bin/wg-list-users << 'EOF'
#!/bin/bash
/opt/wg-ldap/wg_client_manager.py list
EOF

sudo chmod +x /usr/local/bin/wg-add-user /usr/local/bin/wg-remove-user /usr/local/bin/wg-list-users
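
Passing the password as a command-line argument leaves it visible in shell history and in the process list while the command runs. As a possible alternative, the sketch below reads it from standard input or an interactive prompt. read_password is a hypothetical helper, not something the scripts above define.

```python
import getpass
import sys

def read_password(stream=None) -> str:
    """Read a password without putting it on the command line.

    If `stream` is given, or stdin is not a terminal, one line is read from it;
    otherwise the user is prompted with echo disabled.
    """
    if stream is None and sys.stdin.isatty():
        return getpass.getpass("LDAP password: ")
    source = stream if stream is not None else sys.stdin
    return source.readline().rstrip("\n")
```

A manager script could call read_password() in place of its password argument, so the wrapper would need only the username.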

Configure monitoring and logging

Set up connection logging

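Route log messages that mention WireGuard to a dedicated file for monitoring and auditing. Add the following rules to an rsyslog drop-in file under /etc/rsyslog.d/ (for example /etc/rsyslog.d/wireguard.conf), then restart rsyslog.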

# WireGuard logging
:msg, contains, "wireguard" /var/log/wireguard.log
& stop
sudo systemctl restart rsyslog
sudo touch /var/log/wireguard.log
sudo chmod 644 /var/log/wireguard.log

Create monitoring script

Build a script to monitor VPN connections and generate usage reports. Save it as /opt/wg-ldap/wg_monitor.py.

#!/opt/wg-ldap/bin/python3
import subprocess
import json
import datetime
from pathlib import Path

def get_wg_status():
    """Get current WireGuard status and peer information"""
    try:
        result = subprocess.run(['wg', 'show', 'wg0', 'dump'], 
                              capture_output=True, text=True, check=True)
        
        lines = result.stdout.strip().split('\n')
        if not lines or not lines[0]:
            return {'server': {}, 'peers': []}
        
        # First line is server info
        server_line = lines[0].split('\t')
        server_info = {
            'private_key': server_line[0],
            'public_key': server_line[1],
            'listen_port': server_line[2] if len(server_line) > 2 else None,
            'fwmark': server_line[3] if len(server_line) > 3 else None
        }
        
        # Remaining lines are peers
        peers = []
        for line in lines[1:]:
            if not line.strip():
                continue
            parts = line.split('\t')
            if len(parts) >= 3:
                peer = {
                    'public_key': parts[0],
                    'preshared_key': parts[1] if parts[1] != '(none)' else None,
                    'endpoint': parts[2] if parts[2] != '(none)' else None,
                    'allowed_ips': parts[3] if len(parts) > 3 else None,
                    'latest_handshake': int(parts[4]) if len(parts) > 4 and parts[4] != '0' else None,
                    'transfer_rx': int(parts[5]) if len(parts) > 5 else 0,
                    'transfer_tx': int(parts[6]) if len(parts) > 6 else 0,
                    'persistent_keepalive': parts[7] if len(parts) > 7 and parts[7] != 'off' else None
                }
                peers.append(peer)
        
        return {'server': server_info, 'peers': peers}
    
    except subprocess.CalledProcessError:
        return {'server': {}, 'peers': []}

def get_client_names():
    """Map public keys to client usernames"""
    client_dir = Path('/etc/wireguard/clients')
    client_map = {}
    
    if not client_dir.exists():
        return client_map
    
    for pub_key_file in client_dir.glob('*_public.key'):
        username = pub_key_file.stem.replace('_public', '')
        try:
            with open(pub_key_file, 'r') as f:
                public_key = f.read().strip()
                client_map[public_key] = username
        except IOError:
            continue
    
    return client_map

def format_bytes(bytes_val):
    """Format bytes in human readable format"""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if bytes_val < 1024.0:
            return f"{bytes_val:.2f} {unit}"
        bytes_val /= 1024.0
    return f"{bytes_val:.2f} PB"

def format_timestamp(timestamp):
    """Format Unix timestamp to readable date"""
    if timestamp:
        return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    return 'Never'

def main():
    status = get_wg_status()
    client_names = get_client_names()
    
    print("WireGuard VPN Status Report")
    print("=" * 50)
    print(f"Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()
    
    print("Server Information:")
    server = status['server']
    print(f"  Public Key: {server.get('public_key', 'N/A')}")
    print(f"  Listen Port: {server.get('listen_port', 'N/A')}")
    print()
    
    # wg lists every configured peer, whether or not it is currently connected
    print(f"Configured Clients ({len(status['peers'])}):")
    print("-" * 50)

    if not status['peers']:
        print("  No clients configured")
    else:
        for peer in status['peers']:
            username = client_names.get(peer['public_key'], 'Unknown')
            print(f"  Client: {username}")
            print(f"    Public Key: {peer['public_key'][:16]}...")
            print(f"    Allowed IPs: {peer.get('allowed_ips', 'N/A')}")
            print(f"    Endpoint: {peer.get('endpoint', 'N/A')}")
            print(f"    Last Handshake: {format_timestamp(peer.get('latest_handshake'))}")
            print(f"    Transfer RX: {format_bytes(peer.get('transfer_rx', 0))}")
            print(f"    Transfer TX: {format_bytes(peer.get('transfer_tx', 0))}")
            print()

if __name__ == "__main__":
    main()
sudo chmod +x /opt/wg-ldap/wg_monitor.py
sudo ln -s /opt/wg-ldap/wg_monitor.py /usr/local/bin/wg-status
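
The monitor relies on the tab-separated output of wg show wg0 dump, where each peer line carries eight fields: public key, preshared key, endpoint, allowed IPs, latest handshake, bytes received, bytes sent, and persistent keepalive. A minimal sketch of parsing one such line follows; parse_peer_line is an illustrative name and the sample values are made up.

```python
def parse_peer_line(line: str) -> dict:
    """Parse one peer line from `wg show <interface> dump` into a dict."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 8:
        raise ValueError(f"expected 8 tab-separated fields, got {len(fields)}")
    public_key, psk, endpoint, allowed_ips, handshake, rx, tx, keepalive = fields
    return {
        "public_key": public_key,
        "preshared_key": None if psk == "(none)" else psk,
        "endpoint": None if endpoint == "(none)" else endpoint,
        "allowed_ips": [] if allowed_ips == "(none)" else allowed_ips.split(","),
        # A handshake value of 0 means the peer has never completed a handshake
        "latest_handshake": int(handshake) or None,
        "transfer_rx": int(rx),
        "transfer_tx": int(tx),
        "persistent_keepalive": None if keepalive == "off" else int(keepalive),
    }

# Made-up sample line for illustration
sample_line = "PUBKEY=\t(none)\t203.0.113.7:51820\t10.66.66.2/32\t1700000000\t1024\t2048\t25"
print(parse_peer_line(sample_line))
```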

Automate client certificate management

Create certificate renewal script

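Set up automated key lifecycle management for enhanced security. WireGuard uses static key pairs rather than X.509 certificates, so "certificate" in this section means a client's key pair and "renewal" means rotating it. Rotation invalidates the client's existing configuration, so the regenerated file in /etc/wireguard/clients must be redistributed to the user. Save the following script as /opt/wg-ldap/wg_cert_manager.py.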

#!/opt/wg-ldap/bin/python3
import os
import sys
import subprocess
import datetime
import json
from pathlib import Path

CLIENT_DIR = Path("/etc/wireguard/clients")
CONFIG_FILE = Path("/etc/wireguard/wg0.conf")
METADATA_FILE = CLIENT_DIR / "client_metadata.json"

def load_metadata():
    """Load client metadata including creation dates"""
    if METADATA_FILE.exists():
        with open(METADATA_FILE, 'r') as f:
            return json.load(f)
    return {}

def save_metadata(metadata):
    """Save client metadata"""
    CLIENT_DIR.mkdir(exist_ok=True)
    with open(METADATA_FILE, 'w') as f:
        json.dump(metadata, f, indent=2)

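def get_client_age(username, metadata):
    """Get the age of a client certificate in days"""
    if username in metadata:
        created = datetime.datetime.fromisoformat(metadata[username]['created'])
    else:
        # No metadata entry (e.g. client created by wg_client_manager.py):
        # fall back to the private key file's modification time
        key_file = CLIENT_DIR / f"{username}_private.key"
        created = datetime.datetime.fromtimestamp(key_file.stat().st_mtime)

    age = (datetime.datetime.now() - created).days
    return age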

def regenerate_client_keys(username):
    """Regenerate keys for an existing client"""
    print(f"Regenerating keys for client: {username}")
    
    # Generate new keys
    client_private = subprocess.check_output(['wg', 'genkey'], text=True).strip()
    client_public = subprocess.check_output(['wg', 'pubkey'], 
                                          input=client_private, text=True).strip()
    
    # Update key files
    private_key_file = CLIENT_DIR / f"{username}_private.key"
    public_key_file = CLIENT_DIR / f"{username}_public.key"
    
    with open(private_key_file, 'w') as f:
        f.write(client_private)
    os.chmod(private_key_file, 0o600)
    
    with open(public_key_file, 'w') as f:
        f.write(client_public)
    
    # Update server config
    update_server_config(username, client_public)
    
    # Update client config
    update_client_config(username, client_private)
    
    # Update metadata
    metadata = load_metadata()
    metadata[username] = {
        'created': datetime.datetime.now().isoformat(),
        'last_renewed': datetime.datetime.now().isoformat()
    }
    save_metadata(metadata)
    
    print(f"Keys regenerated for {username}")

def update_server_config(username, new_public_key):
    """Update server config with new client public key"""
    with open(CONFIG_FILE, 'r') as f:
        lines = f.readlines()
    
    new_lines = []
    in_client_section = False
    
    for line in lines:
        if line.strip() == f"# Client: {username}":
            in_client_section = True
            new_lines.append(line)
        elif in_client_section and line.strip().startswith("PublicKey ="):
            new_lines.append(f"PublicKey = {new_public_key}\n")
            in_client_section = False
        else:
            new_lines.append(line)
    
    with open(CONFIG_FILE, 'w') as f:
        f.writelines(new_lines)

def update_client_config(username, new_private_key):
    """Update client config file with new private key"""
    config_file = CLIENT_DIR / f"{username}.conf"
    
    with open(config_file, 'r') as f:
        lines = f.readlines()
    
    new_lines = []
    for line in lines:
        if line.strip().startswith("PrivateKey ="):
            new_lines.append(f"PrivateKey = {new_private_key}\n")
        else:
            new_lines.append(line)
    
    with open(config_file, 'w') as f:
        f.writelines(new_lines)

def check_certificate_expiry(max_age_days=90):
    """Check for certificates approaching expiry"""
    metadata = load_metadata()
    expiring_clients = []
    
    if not CLIENT_DIR.exists():
        return expiring_clients
    
    for key_file in CLIENT_DIR.glob("*_private.key"):
        username = key_file.stem.replace("_private", "")
        age = get_client_age(username, metadata)
        
        if age >= max_age_days:
            expiring_clients.append((username, age))
    
    return expiring_clients

def renew_expiring_certificates(max_age_days=90, dry_run=False):
    """Renew certificates that are approaching expiry"""
    expiring = check_certificate_expiry(max_age_days)
    
    if not expiring:
        print("No certificates need renewal")
        return
    
    print(f"Found {len(expiring)} certificates needing renewal:")
    for username, age in expiring:
        print(f"  - {username} (age: {age} days)")
    
    if dry_run:
        print("Dry run mode - no changes made")
        return
    
    for username, age in expiring:
        try:
            regenerate_client_keys(username)
        except Exception as e:
            print(f"Error renewing {username}: {e}")
    
    # Reload WireGuard
    subprocess.run(['systemctl', 'reload', 'wg-quick@wg0'], check=True)
    print("WireGuard reloaded with new certificates")

if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description='WireGuard Certificate Manager')
    parser.add_argument('--check', action='store_true', help='Check certificate expiry')
    parser.add_argument('--renew', action='store_true', help='Renew expiring certificates')
    parser.add_argument('--max-age', type=int, default=90, help='Maximum certificate age in days')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be renewed')
    parser.add_argument('--regenerate', help='Regenerate keys for specific client')
    
    args = parser.parse_args()
    
    if args.check:
        expiring = check_certificate_expiry(args.max_age)
        if expiring:
            print("Certificates needing renewal:")
            for username, age in expiring:
                print(f"  - {username} (age: {age} days)")
        else:
            print("All certificates are current")
    elif args.renew:
        renew_expiring_certificates(args.max_age, args.dry_run)
    elif args.regenerate:
        regenerate_client_keys(args.regenerate)
        subprocess.run(['systemctl', 'reload', 'wg-quick@wg0'], check=True)
    else:
        parser.print_help()
sudo chmod +x /opt/wg-ldap/wg_cert_manager.py
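
The renewal decision reduces to date arithmetic: a key pair is due for rotation once the whole days elapsed since its creation reach the maximum age. The sketch below isolates that rule with an explicit "now" so it can be checked deterministically; rotation_due is an illustrative name.

```python
import datetime

def rotation_due(created_iso: str, now: datetime.datetime, max_age_days: int = 90) -> bool:
    """Return True once at least `max_age_days` whole days have passed since creation."""
    created = datetime.datetime.fromisoformat(created_iso)
    return (now - created).days >= max_age_days

now = datetime.datetime(2026, 4, 25)
print(rotation_due("2026-01-25T00:00:00", now))  # created exactly 90 days earlier
print(rotation_due("2026-01-26T00:00:00", now))  # created 89 days earlier
```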

Set up automated certificate renewal

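Create a systemd service and timer that automatically check for and renew expiring certificates. Save the service unit as /etc/systemd/system/wg-cert-renewal.service and the timer unit as /etc/systemd/system/wg-cert-renewal.timer.

# /etc/systemd/system/wg-cert-renewal.service
[Unit]
Description=WireGuard Certificate Renewal
After=network.target

[Service]
Type=oneshot
ExecStart=/opt/wg-ldap/wg_cert_manager.py --renew --max-age 90
User=root

# /etc/systemd/system/wg-cert-renewal.timer
[Unit]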
Description=Run WireGuard certificate renewal weekly
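# No Requires= is needed: a timer activates the service of the same name,
# and Requires= would also run the renewal every time the timer itself starts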

[Timer]
OnCalendar=weekly
Persistent=true

[Install]
WantedBy=timers.target
sudo systemctl daemon-reload
sudo systemctl enable wg-cert-renewal.timer
sudo systemctl start wg-cert-renewal.timer
sudo systemctl status wg-cert-renewal.timer

Monitor LDAP-authenticated VPN connections

You can integrate WireGuard with monitoring systems to track LDAP-authenticated connections. Here's how to set up monitoring with Prometheus metrics, which you can later visualize in Grafana dashboards as covered in our WireGuard monitoring tutorial.

Create Prometheus metrics exporter

Build a script that exports WireGuard connection metrics for monitoring systems. Save it as /opt/wg-ldap/wg_prometheus_exporter.py.

#!/opt/wg-ldap/bin/python3
import subprocess
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
import json

class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/metrics':
            metrics = generate_metrics()
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain')
            self.end_headers()
            self.wfile.write(metrics.encode())
        else:
            self.send_response(404)
            self.end_headers()

def get_wg_peers():
    """Get WireGuard peer information"""
    try:
        result = subprocess.run(['wg', 'show', 'wg0', 'dump'], 
                              capture_output=True, text=True, check=True)
        peers = []
        for line in result.stdout.strip().split('\n')[1:]:  # Skip server line
            if line.strip():
                parts = line.split('\t')
                if len(parts) >= 7:
                    peers.append({
                        'public_key': parts[0],
                        'endpoint': parts[2] if parts[2] != '(none)' else None,
                        'allowed_ips': parts[3],
                        'latest_handshake': int(parts[4]) if parts[4] != '0' else None,
                        'transfer_rx': int(parts[5]),
                        'transfer_tx': int(parts[6])
                    })
        return peers
    except subprocess.CalledProcessError:
        return []

def get_client_names():
    """Map public keys to usernames"""
    client_dir = Path('/etc/wireguard/clients')
    client_map = {}
    
    if client_dir.exists():
        for pub_key_file in client_dir.glob('*_public.key'):
            username = pub_key_file.stem.replace('_public', '')
            try:
                with open(pub_key_file, 'r') as f:
                    public_key = f.read().strip()
                    client_map[public_key] = username
            except IOError:
                continue
    
    return client_map

def generate_metrics():
    """Generate Prometheus metrics"""
    peers = get_wg_peers()
    client_names = get_client_names()
    current_time = int(time.time())
    
    metrics = []
    
    # WireGuard server info
    metrics.append('# HELP wireguard_server_info WireGuard server information')
    metrics.append('# TYPE wireguard_server_info gauge')
    metrics.append('wireguard_server_info{interface="wg0"} 1')
    
    # Connected clients count
    active_clients = sum(1 for peer in peers if peer['latest_handshake'] and 
                        (current_time - peer['latest_handshake']) < 300)  # 5 min threshold
    
    metrics.append('# HELP wireguard_clients_connected Number of connected clients')
    metrics.append('# TYPE wireguard_clients_connected gauge')
    metrics.append(f'wireguard_clients_connected {active_clients}')
    
    metrics.append('# HELP wireguard_clients_total Total configured clients')
    metrics.append('# TYPE wireguard_clients_total gauge')
    metrics.append(f'wireguard_clients_total {len(peers)}')
    
    # Per-client metrics
    metrics.append('# HELP wireguard_client_last_handshake Last handshake time')
    metrics.append('# TYPE wireguard_client_last_handshake gauge')
    
    metrics.append('# HELP wireguard_client_bytes_received Bytes received from client')
    metrics.append('# TYPE wireguard_client_bytes_received counter')
    
    metrics.append('# HELP wireguard_client_bytes_sent Bytes sent to client')
    metrics.append('# TYPE wireguard_client_bytes_sent counter')
    
    for peer in peers:
        username = client_names.get(peer['public_key'], 'unknown')
        public_key_short = peer['public_key'][:16]
        
        if peer['latest_handshake']:
            metrics.append(f'wireguard_client_last_handshake{{username="{username}",public_key="{public_key_short}"}} {peer["latest_handshake"]}')
        
        metrics.append(f'wireguard_client_bytes_received{{username="{username}",public_key="{public_key_short}"}} {peer["transfer_rx"]}')
        metrics.append(f'wireguard_client_bytes_sent{{username="{username}",public_key="{public_key_short}"}} {peer["transfer_tx"]}')
    
    return '\n'.join(metrics) + '\n'

if __name__ == '__main__':
    server = HTTPServer(('0.0.0.0', 9586), MetricsHandler)
    print("WireGuard metrics exporter started on port 9586")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
sudo chmod +x /opt/wg-ldap/wg_prometheus_exporter.py
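
WireGuard keeps no session state, so the exporter infers "connected" from handshake age: a peer counts as active if its latest handshake falls within a recent window (300 seconds in the script). A minimal sketch of that rule and of rendering one gauge in the Prometheus text format follows; count_active and render_gauge are illustrative names.

```python
def count_active(handshakes, now: int, window: int = 300) -> int:
    """Count peers whose latest handshake is less than `window` seconds old."""
    # None or 0 means the peer has never completed a handshake
    return sum(1 for ts in handshakes if ts and now - ts < window)

def render_gauge(name: str, help_text: str, value) -> str:
    """Render a single gauge in the Prometheus text exposition format."""
    return f"# HELP {name} {help_text}\n# TYPE {name} gauge\n{name} {value}\n"

handshakes = [1000, 700, None, 0]
active = count_active(handshakes, now=1000)
print(render_gauge("wireguard_clients_connected", "Number of connected clients", active))
```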

Create systemd service for metrics exporter

Set up the Prometheus exporter as a systemd service for reliable operation. Save the unit as /etc/systemd/system/wg-prometheus-exporter.service.

[Unit]
Description=WireGuard Prometheus Exporter
After=network.target wg-quick@wg0.service
Requires=wg-quick@wg0.service

[Service]
Type=simple
User=root
ExecStart=/opt/wg-ldap/wg_prometheus_exporter.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable wg-prometheus-exporter
sudo systemctl start wg-prometheus-exporter
sudo systemctl status wg-prometheus-exporter

Verify your setup

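Test the LDAP integration and verify that WireGuard is working correctly with your directory service authentication. The scripts read files under /etc/wireguard, which only root can access, so run them with sudo. Full paths are used because sudo's secure_path may not include /usr/local/bin.

# Check WireGuard service status
sudo systemctl status wg-quick@wg0

# Verify LDAP authentication script
sudo /opt/wg-ldap/wg_ldap_auth.py testuser testpassword

# Test client management
sudo /usr/local/bin/wg-list-users

# Check monitoring
sudo /usr/local/bin/wg-status

# Test metrics endpoint
curl -s http://localhost:9586/metrics | head -20

# View WireGuard interface status
sudo wg show wg0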

Add a test client

Create a test VPN client to verify LDAP authentication and configuration generation.

# Replace with actual LDAP credentials
sudo /usr/local/bin/wg-add-user johndoe "password123"

# Verify the client was added
sudo /usr/local/bin/wg-list-users
sudo wg show wg0

# Check client configuration file
sudo cat /etc/wireguard/clients/johndoe.conf

Common issues

Symptom | Cause | Fix
LDAP authentication fails | Incorrect bind credentials or search base | Test with ldapsearch -x -H ldap://server -D "bind_dn" -W -b "search_base"
Client can't connect | Firewall blocking UDP 51820 | Check firewall rules: sudo ufw status or sudo firewall-cmd --list-ports
No internet access through VPN | IP forwarding not enabled or NAT rules missing | Verify: cat /proc/sys/net/ipv4/ip_forward should return 1
Permission denied errors | Incorrect file ownership or permissions | Fix ownership: sudo chown -R root:root /etc/wireguard and sudo chmod 600 /etc/wireguard/*.key
SSL/TLS errors with LDAP | Certificate validation issues | Set use_ssl = true and port = 636 for LDAPS, or verify CA certificates
Group membership check fails | Incorrect group DN or attribute name | Use LDAP browser tool or ldapsearch to verify group structure
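
A frequent reason for the last row is a group DN that differs from the directory's value only in case or spacing. The sketch below shows one way to compare the configured vpn_group against a user's memberOf values after a simple normalization. The function names are hypothetical and are not taken from the tutorial's scripts, and the normalization is deliberately simplified: it does not handle escaped commas inside attribute values. On OpenLDAP, memberOf is only populated when the memberof overlay is enabled.

```python
def normalize_dn(dn):
    """Lowercase a DN and trim spaces around each comma-separated component.

    Simplified on purpose: escaped commas inside values are not handled.
    """
    return ",".join(part.strip().lower() for part in dn.split(","))


def is_member(member_of, vpn_group):
    """Return True if vpn_group matches one of the user's memberOf DNs."""
    target = normalize_dn(vpn_group)
    return any(normalize_dn(dn) == target for dn in member_of)


# Example values modeled on the Active Directory configuration in this guide.
user_groups = [
    "CN=Domain Users,CN=Users,DC=company,DC=local",
    "cn=vpn-users, ou=Security Groups, dc=company, dc=local",
]
configured = "CN=VPN-Users,OU=Security Groups,DC=company,DC=local"

print(is_member(user_groups, configured))
```

If this comparison succeeds while the real check fails, the difference is likely in the attribute name or in nested group membership, which memberOf does not expand.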

Troubleshoot LDAP authentication issues

In larger environments, authentication failures are easier to diagnose when logs from all VPN servers are collected in one place. Consider setting up centralized log aggregation as described in our Loki and Promtail tutorial to collect and analyze VPN authentication logs across multiple servers.

Enable debug logging

Add detailed logging to troubleshoot LDAP connection and authentication problems.

#!/opt/wg-ldap/bin/python3
import configparser
import logging
import sys

from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3.core.exceptions import LDAPException

# Enable debug logging
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')


def debug_ldap_connection():
    # Load the same configuration file the authentication script uses
    config = configparser.ConfigParser()
    config.read('/etc/wireguard/ldap.conf')

    ldap_server = config.get('ldap', 'server')
    ldap_port = config.getint('ldap', 'port', fallback=389)
    use_ssl = config.getboolean('ldap', 'use_ssl', fallback=False)
    bind_dn = config.get('ldap', 'bind_dn')
    bind_password = config.get('ldap', 'bind_password')
    user_base = config.get('ldap', 'user_base')

    print(f"Testing LDAP connection to {ldap_server}:{ldap_port}")
    print(f"SSL enabled: {use_ssl}")
    print(f"Bind DN: {bind_dn}")
    print(f"Search base: {user_base}")
    print("-" * 50)

    try:
        # Test server connection
        server = Server(ldap_server, port=ldap_port, use_ssl=use_ssl, get_info=ALL)
        print(f"Server info: {server.info}")

        # Test bind
        conn = Connection(server, bind_dn, bind_password, auto_bind=True)
        print("✓ Bind successful")

        # Test user search (only when a username is passed as the first argument)
        if len(sys.argv) > 1:
            test_user = sys.argv[1]
            user_filter = config.get('ldap', 'user_filter').format(username=test_user)
            print(f"Testing search for user: {test_user}")
            print(f"Filter: {user_filter}")

            conn.search(user_base, user_filter, SUBTREE,
                        attributes=['cn', 'memberOf', 'mail'])
            if conn.entries:
                print(f"✓ User found: {conn.entries[0].entry_dn}")
                print(f"Attributes: {conn.entries[0]}")
            else:
                print("✗ User not found")

    except LDAPException as e:
        print(f"✗ LDAP error: {e}")
    except Exception as e:
        print(f"✗ Connection error: {e}")


if __name__ == "__main__":
    debug_ldap_connection()
sudo chmod +x /opt/wg-ldap/debug_ldap.py

# Test LDAP connection
sudo /opt/wg-ldap/debug_ldap.py

# Test specific user lookup
sudo /opt/wg-ldap/debug_ldap.py johndoe
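
The debug script substitutes the username into user_filter as-is, which is fine for a manual test but means characters such as * or ( change the meaning of the search. A minimal sketch of escaping the value first, following the RFC 4515 rules for filter strings, is shown below. The helper names are hypothetical; ldap3 also provides escape_filter_chars in ldap3.utils.conv, which could be used instead of the hand-written function.

```python
def escape_filter_value(value):
    """Escape characters that are special in LDAP search filters (RFC 4515)."""
    replacements = {
        "\\": r"\5c",
        "*": r"\2a",
        "(": r"\28",
        ")": r"\29",
        "\x00": r"\00",
    }
    return "".join(replacements.get(ch, ch) for ch in value)


def build_filter(template, username):
    """Fill the {username} placeholder with an escaped username."""
    return template.format(username=escape_filter_value(username))


# Filter template taken from the OpenLDAP example configuration.
USER_FILTER = "(&(objectClass=inetOrgPerson)(uid={username}))"

print(build_filter(USER_FILTER, "johndoe"))
print(build_filter(USER_FILTER, "*)(uid=*"))
```

The first call leaves an ordinary username untouched; the second shows that an attempted wildcard injection is turned into literal characters.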

Test different LDAP configurations

Create configuration examples for common LDAP directory types.

# Active Directory example
[ldap]
server = dc1.company.local
port = 389
use_ssl = false
bind_dn = CN=svc-wireguard,OU=Service Accounts,DC=company,DC=local
bind_password = ServiceAccountPassword123!
user_base = CN=Users,DC=company,DC=local
user_filter = (&(objectClass=user)(sAMAccountName={username})(!(userAccountControl:1.2.840.113556.1.4.803:=2)))
vpn_group = CN=VPN-Users,OU=Security Groups,DC=company,DC=local

# OpenLDAP example (an alternative to the Active Directory block above; keep only one [ldap] section per file)
[ldap]
server = ldap.company.com
port = 389
use_ssl = false
bind_dn = cn=wireguard-bind,ou=services,dc=company,dc=com
bind_password = BindAccountPassword456!
user_base = ou=users,dc=company,dc=com
user_filter = (&(objectClass=inetOrgPerson)(uid={username}))
vpn_group = cn=vpn-users,ou=groups,dc=company,dc=com
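
Before testing against a live directory, it can help to check the configuration file itself. The following sketch validates an ldap.conf-style string with configparser. The list of required keys is an assumption based on the examples above, not a definition taken from the authentication script, and check_ldap_config is a hypothetical helper name. It disables interpolation so that a % in a password is read literally; the tutorial's scripts use the default parser, which would treat % as an interpolation marker.

```python
import configparser

# Assumption: these keys mirror the example configurations in this guide.
REQUIRED_KEYS = ("server", "bind_dn", "bind_password",
                 "user_base", "user_filter", "vpn_group")


def check_ldap_config(text):
    """Return a list of problems found in an ldap.conf-style string."""
    parser = configparser.ConfigParser(interpolation=None)
    try:
        parser.read_string(text)
    except configparser.Error as exc:
        # Covers duplicate [ldap] sections and malformed lines.
        return [f"parse error: {exc}"]
    if not parser.has_section("ldap"):
        return ["missing [ldap] section"]

    section = parser["ldap"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS
                if not section.get(key)]
    if "{username}" not in section.get("user_filter", ""):
        problems.append("user_filter has no {username} placeholder")
    use_ssl = section.getboolean("use_ssl", fallback=False)
    port = section.getint("port", fallback=389)
    if use_ssl and port == 389:
        problems.append("use_ssl is true but port is 389 (LDAPS normally uses 636)")
    return problems


SAMPLE = """\
[ldap]
server = ldap.company.com
port = 389
use_ssl = false
bind_dn = cn=wireguard-bind,ou=services,dc=company,dc=com
bind_password = BindAccountPassword456!
user_base = ou=users,dc=company,dc=com
user_filter = (&(objectClass=inetOrgPerson)(uid={username}))
vpn_group = cn=vpn-users,ou=groups,dc=company,dc=com
"""

print(check_ldap_config(SAMPLE))
```

Pasting both example blocks into one file would be reported as a parse error, because configparser rejects a second [ldap] section by default.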
