By Florian Maas on August 20, 2023
Estimated Reading Time: 15 minutes
Running Apache Airflow on Kubernetes is a powerful way to manage and orchestrate your workloads, but it can also be a bit daunting to get started with. In this tutorial, we will deploy an AKS cluster using Terraform and install Airflow on the AKS cluster using Helm.
All code used in this tutorial can be found on GitHub in the fpgmaas/azure-airflow-kubernetes repository.
Before we start this tutorial, make sure the following prerequisites have been met: the Azure CLI, Terraform, Helm and Docker are installed, and kubectl is available. kubectl can be installed through the Azure CLI with:
az aks install-cli
Let's get started!
Before we can install Airflow on a Kubernetes cluster, we should first create the cluster. We will do this using Terraform. In a previous blog post I presented a method to deploy infrastructure with Terraform by using GitHub Actions and a remote backend, but to keep this tutorial simple we will simply use a local backend here.
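For reference, a remote backend on Azure would look roughly like the sketch below; the resource group, storage account and container holding the state are placeholders that would need to exist already, they are not part of this tutorial's code:
terraform {
  # Hypothetical remote backend for illustration only
  backend "azurerm" {
    resource_group_name  = "tfstate-rg"
    storage_account_name = "tfstatestorageaccount"
    container_name       = "tfstate"
    key                  = "airflow-aks.terraform.tfstate"
  }
}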
Let's start by defining some variables that we are going to need for our infrastructure.
Create a file called terraform/variables.tf
with the following contents:
variable "app_name" {
type = string
description = "name of the application"
}
variable "location" {
type = string
description = "location for the resources"
}
and then create a file called terraform/main.auto.tfvars
with the values for these variables:
app_name = "example"
location = "westeurope"
Now, we can define our AKS cluster in a file called terraform/main.tf
:
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "=3.70.0"
    }
  }
}

provider "azurerm" {
  features {}
}

resource "azurerm_resource_group" "rg" {
  name     = "${var.app_name}rg"
  location = var.location
}

# Azure Kubernetes Cluster
resource "azurerm_kubernetes_cluster" "main" {
  name                = "${var.app_name}aks"
  location            = var.location
  resource_group_name = azurerm_resource_group.rg.name
  dns_prefix          = "${var.app_name}-aks"

  default_node_pool {
    name       = "default"
    node_count = 1
    vm_size    = "Standard_DS2_v2"
  }

  identity {
    type = "SystemAssigned"
  }

  storage_profile {
    blob_driver_enabled = true
  }
}
Here, we have defined a resource group which will get the name examplerg
(because our app_name
variable has the value example
).
We also defined a Kubernetes cluster with a basic node pool containing a single node. Additional node pools can easily be added later with azurerm_kubernetes_cluster_node_pool resources, as sketched below. We also enabled the blob driver for the cluster, which will turn out to be useful later
when we want to use Blob storage for our Airflow logs.
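For example, an additional node pool that can scale down to zero could look roughly like this (a sketch; the pool name, VM size and autoscaling bounds are arbitrary choices, not part of this tutorial's code):
# Hypothetical extra node pool, for illustration only
resource "azurerm_kubernetes_cluster_node_pool" "workers" {
  name                  = "workers"
  kubernetes_cluster_id = azurerm_kubernetes_cluster.main.id
  vm_size               = "Standard_DS2_v2"
  enable_auto_scaling   = true
  min_count             = 0
  max_count             = 3
}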
One useful addition to our AKS cluster would be a Container Registry. This can serve as a place to store our
Docker container images which can then be run on our cluster. To do so, let's add the following to terraform/main.tf
:
# Azure Container Registry
resource "azurerm_container_registry" "acr" {
name = "${var.app_name}acr"
resource_group_name = azurerm_resource_group.rg.name
location = var.location
sku = "Standard"
admin_enabled = true
}
resource "azurerm_role_assignment" "main" {
principal_id = azurerm_kubernetes_cluster.main.kubelet_identity[0].object_id
role_definition_name = "AcrPull"
scope = azurerm_container_registry.acr.id
}
This provisions the container registry, and it grants our Kubernetes cluster permission to pull images from it.
Note that the Container Registry must have a globally unique name, and since exampleacr
is likely already taken, you might need to add some random digits as a suffix (see the sketch below). The name of the Container Registry also occurs in some later steps, so keep in
mind that those should be updated as well if you modify the name here.
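One way to get a unique name without hard-coding digits is to let Terraform generate them; a sketch using the hashicorp/random provider (which would also have to be added to required_providers, and the az acr commands later in the tutorial would then need the generated name):
# Hypothetical random suffix to make the registry name globally unique
resource "random_integer" "acr_suffix" {
  min = 10000
  max = 99999
}

resource "azurerm_container_registry" "acr" {
  name                = "${var.app_name}acr${random_integer.acr_suffix.result}"
  resource_group_name = azurerm_resource_group.rg.name
  location            = var.location
  sku                 = "Standard"
  admin_enabled       = true
}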
Lastly, we will add some code to terraform/main.tf
that provisions a Blob container to store our Airflow logs:
# Blob storage for Airflow logs
resource "azurerm_storage_account" "airflow" {
name = "${var.app_name}airflowsa"
resource_group_name = azurerm_resource_group.rg.name
location = var.location
account_tier = "Standard"
account_replication_type = "LRS"
}
resource "azurerm_storage_container" "airflow_logs" {
name = "airflow-logs"
storage_account_name = azurerm_storage_account.airflow.name
container_access_type = "private"
}
resource "azurerm_storage_management_policy" "prune_logs" {
storage_account_id = azurerm_storage_account.airflow.id
rule {
name = "prune-logs"
enabled = true
filters {
prefix_match = ["airflow-logs"]
blob_types = ["blockBlob"]
}
actions {
base_blob {
delete_after_days_since_modification_greater_than = 7
}
}
}
}
This provisions a blob container called airflow-logs
, with a policy that prunes any log files that have not been modified
for more than seven days.
Now that we have defined our infrastructure, let's deploy it to Azure. First, let's create a Service Principal with the permissions to deploy infrastructure:
az login
export SUBSCRIPTION_ID=$(az account show --query id -o tsv)
export SERVICE_PRINCIPAL_NAME="InfrastructureAccount"
az ad sp create-for-rbac \
--name $SERVICE_PRINCIPAL_NAME \
--role "Owner" \
--scopes "/subscriptions/$SUBSCRIPTION_ID" > credentials.json
Add credentials.json to your .gitignore now, and delete the file completely after finishing the tutorial. Consider storing the credentials in a secure location instead of keeping them stored locally.
For Terraform to authenticate as the Service Principal, we should set some environment variables. This can be done by running the code below:
export ARM_CLIENT_ID=`cat credentials.json | python -c 'import json,sys;obj=json.load(sys.stdin);print(obj["appId"])'`
export ARM_CLIENT_SECRET=`cat credentials.json | python -c 'import json,sys;obj=json.load(sys.stdin);print(obj["password"])'`
export ARM_TENANT_ID=`cat credentials.json | python -c 'import json,sys;obj=json.load(sys.stdin);print(obj["tenant"])'`
export ARM_SUBSCRIPTION_ID=`az account show --query id -o tsv`
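If you have jq installed, the same variables can be set a bit more concisely (an equivalent alternative, not required for this tutorial):
export ARM_CLIENT_ID=$(jq -r .appId credentials.json)
export ARM_CLIENT_SECRET=$(jq -r .password credentials.json)
export ARM_TENANT_ID=$(jq -r .tenant credentials.json)
export ARM_SUBSCRIPTION_ID=$(az account show --query id -o tsv)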
Now, we should be able to deploy our infrastructure by running the following Terraform commands within the terraform
directory:
terraform init
terraform fmt
terraform validate
terraform plan
terraform apply
Eventually, we should see something like:
Apply complete! Resources: 7 added, 0 changed, 0 destroyed.
Let's verify that we can connect to our AKS cluster:
az aks get-credentials --resource-group examplerg --name exampleaks
and if that is successful, let's list all pods:
kubectl get pods -A
NAMESPACE NAME READY STATUS RESTARTS AGE
kube-system azure-ip-masq-agent-58pln 1/1 Running 0 20h
kube-system cloud-node-manager-glrh9 1/1 Running 0 20h
kube-system coredns-76b9877f49-8h6cz 1/1 Running 0 20h
kube-system coredns-76b9877f49-pjjdb 1/1 Running 0 20h
kube-system coredns-autoscaler-85f7d6b75d-wdbwd 1/1 Running 0 20h
kube-system csi-azuredisk-node-mmpxj 3/3 Running 0 20h
kube-system csi-azurefile-node-sbw7j 3/3 Running 0 20h
kube-system csi-blob-node-jhtnr 3/3 Running 0 17h
kube-system konnectivity-agent-659478b8b7-gpkwv 1/1 Running 0 20h
kube-system konnectivity-agent-659478b8b7-zqksv 1/1 Running 0 20h
kube-system kube-proxy-c48c8 1/1 Running 0 20h
kube-system metrics-server-c456c67cb-dqv8f 2/2 Running 0 20h
kube-system metrics-server-c456c67cb-r9f82 2/2 Running 0 20h
Great, we have provisioned our AKS cluster! Now let's continue by installing Airflow.
We will install Airflow using the Airflow Helm chart. For that purpose, we will start by creating
a directory airflow
, and in that directory we add a file called values.yaml
:
# Full template: https://github.com/apache/airflow/blob/main/chart/values.yaml
# Select certain nodes for airflow pods.
nodeSelector:
  agentpool: default

# Airflow executor
executor: 'KubernetesExecutor'

# Environment variables for all airflow containers
env:
  - name: ENVIRONMENT
    value: dev

extraEnv: |
  - name: AIRFLOW__CORE__DEFAULT_TIMEZONE
    value: 'Europe/Amsterdam'

# Configuration for postgresql subchart
# Not recommended for production! Instead, spin up your own Postgresql server and use the `data`
# attribute in this yaml file.
postgresql:
  enabled: true

# Enable pgbouncer.
# See https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#pgbouncer
pgbouncer:
  enabled: true
Since we only have one node pool, the nodeSelector is a bit superfluous. But if we add more node pools in the future, it ensures that Airflow keeps running on our default node pool and does not prevent the other node pools from scaling down to 0. We set the executor to the KubernetesExecutor, and we add an environment variable ENVIRONMENT with the value dev to all our Airflow containers. We might not use that variable in this tutorial, but it can be useful later on. To keep this tutorial simple, we enable the built-in postgresql subchart.
However, if we were to use this in production, we should spin up our own PostgreSQL server, add the connection string as a Kubernetes secret, and reference that secret in the data attribute.
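For reference, that production setup could look roughly like the sketch below; the secret name and connection string are placeholders and not part of this tutorial:
kubectl create secret generic -n airflow airflow-metadata-secret \
  --from-literal connection=postgresql://user:password@your-postgres-host:5432/airflow
and then, in values.yaml:
postgresql:
  enabled: false

data:
  metadataSecretName: airflow-metadata-secret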
There are still a few things missing from the values.yaml file; we did not specify where Airflow should load our DAGs from, and we are not yet using our blob storage for the Airflow logs. Let's start filling in the gaps.
Airflow uses DAGs (Directed Acyclic Graphs) to define the tasks to be run and the relationships between them. In this case, we will assume our DAGs are stored in a Git repository (as they should be) on GitHub, so we will use GitSync to synchronize the DAGs from Git to our Airflow instance.
To enable synchronization between GitHub and Airflow using GitSync, we use a deploy key.
To create a new deploy key, navigate to ~/.ssh
and run:
ssh-keygen -t rsa -b 4096 -C "your@email.com"
As the name, choose airflowsshkey
, and do not set a password. Now, print the public key to the console:
cat ~/.ssh/airflowsshkey.pub
Add the public key as a deploy key to your GitHub repository (Settings > Deploy Keys > Add deploy key
).
Now we need to add the private key as a secret to our Kubernetes cluster. Before we do so, let's create a namespace for all our Airflow resources:
kubectl create namespace airflow
Then, let's create a secret called airflow-git-ssh-secret
in the airflow
namespace in kubernetes:
kubectl create secret generic -n airflow airflow-git-ssh-secret \
--from-file=gitSshKey=$HOME/.ssh/airflowsshkey
Now we can add the following configuration to our airflow/values.yaml
file:
dags:
  gitSync:
    enabled: true
    repo: git@github.com:fpgmaas/azure-airflow-kubernetes.git
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: 'dags'
    sshKeySecret: airflow-git-ssh-secret
    knownHosts: |
      github.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDPgVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyRkQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWOWRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZyaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=
Here we specify that our DAGs are stored in the git@github.com:fpgmaas/azure-airflow-kubernetes.git repo, and that they should be taken from the dags directory on the main branch. This directory does not exist yet, but we will create it later in this tutorial.
The next thing we need to set up is our log storage. For this, we create two files in the airflow
directory:
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-airflow-logs
  labels:
    type: local
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain # If set as "Delete" container would be removed after pvc deletion
  storageClassName: azureblob-fuse-premium
  mountOptions:
    - -o allow_other
    - --file-cache-timeout-in-seconds=120
  csi:
    driver: blob.csi.azure.com
    readOnly: false
    volumeHandle: airflow-logs-1
    volumeAttributes:
      resourceGroup: examplerg
      storageAccount: exampleairflowsa
      containerName: airflow-logs
    nodeStageSecretRef:
      name: storage-account-credentials
      namespace: airflow
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-airflow-logs
spec:
  storageClassName: azureblob-fuse-premium
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: pv-airflow-logs
These files are used to provision a PersistentVolume and a PersistentVolumeClaim
respectively. Before we can create the specified resources, note that we have not defined the secret storage-account-credentials
yet,
which is referenced in pv-logs.yaml
. We can create it by running the following commands in the terminal:
export STORAGE_ACCOUNT_KEY=$(az storage account keys list \
-g examplerg \
-n exampleairflowsa \
--query '[0]'.value \
-o tsv)
kubectl create secret generic -n airflow storage-account-credentials \
--from-literal azurestorageaccountname=exampleairflowsa \
--from-literal azurestorageaccountkey=$STORAGE_ACCOUNT_KEY \
--type=Opaque
Now, we can create the PersistentVolume and the PersistentVolumeClaim:
kubectl apply -n airflow -f airflow/pv-logs.yaml
kubectl apply -n airflow -f airflow/pvc-logs.yaml
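To verify that the claim has been matched to the volume, you can check their status; after a short while the claim should be reported as Bound:
kubectl get pv pv-airflow-logs
kubectl get pvc -n airflow pvc-airflow-logs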
To have Airflow use the blob container for its logs, we add the following to our airflow/values.yaml
file:
logs:
  persistence:
    enabled: true
    existingClaim: pvc-airflow-logs
    storageClassName: azureblob-fuse-premium

# We disable the log groomer sidecar because we use Azure Blob Storage for logs,
# where a lifecycle policy is already set
triggerer:
  logGroomerSidecar:
    enabled: false
scheduler:
  logGroomerSidecar:
    enabled: false
workers:
  logGroomerSidecar:
    enabled: false
Here, we have disabled all logGroomerSidecars, since we already defined a log pruning policy on the blob container in our Terraform code.
Now we are ready to actually install Airflow! Let's start by adding the helm repo:
helm repo add apache-airflow https://airflow.apache.org
Then, install Airflow in the airflow
namespace using our values.yaml
file:
helm install airflow apache-airflow/airflow -n airflow -f airflow/values.yaml --debug
When that is finished, we can test our Airflow instance by port forwarding the service:
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow
And when we visit localhost:8080
in our browser, we find that Airflow is up and running. Great! The default username/password combination is admin/admin, which we should change outside of this tutorial (a sketch is shown below), but for now we will just continue with the default settings.
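As a rough sketch of what changing the default user could look like, you could create a new admin account with the Airflow CLI inside the webserver pod and then remove the default one; the username, password and email below are placeholders:
kubectl exec -n airflow deployment/airflow-webserver -- \
  airflow users create --username myadmin --password 'a-strong-password' \
  --firstname Admin --lastname User --role Admin --email admin@example.com
kubectl exec -n airflow deployment/airflow-webserver -- \
  airflow users delete --username admin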
At this moment, we do not have any DAGs in our Airflow instance yet. Let's change that!
First, let's create a very simple Docker container that only prints Hello World!
when it's run and upload it to our
Container Registry. Here is a very simple Dockerfile
:
FROM alpine:latest
# The command to run when the container starts
CMD ["echo", "hello world"]
Let's test it by building the image and running the container:
docker build -t example .
docker run example
Note: if you are building the image on a machine with an ARM-based CPU (such as Apple Silicon), use
docker buildx build -t example --platform linux/amd64 .
to build the image instead, so that the image architecture matches the VMs in our Kubernetes cluster. This prevents exec format errors later.
Great! The next step is to upload this image to our Container Registry. First, we log in to our ACR instance:
export ACR_PASSWORD=$(az acr credential show \
--name exampleacr \
--query passwords'[0]'.value \
-o tsv)
export ACR_USERNAME=$(az acr credential show \
--name exampleacr \
--query username \
-o tsv)
az acr login \
--name exampleacr \
--username $ACR_USERNAME \
--password $ACR_PASSWORD
Then, we push the image by running:
docker tag example exampleacr.azurecr.io/example:latest
docker push -a exampleacr.azurecr.io/example
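To confirm that the push succeeded, you can list the repositories and tags in the registry (assuming it is still called exampleacr):
az acr repository list --name exampleacr --output table
az acr repository show-tags --name exampleacr --repository example --output table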
Now, let's create a DAG that actually runs this image in the dags
directory:
dags/example.py
from datetime import datetime

from airflow import DAG

# In Airflow 2, the KubernetesPodOperator lives in the cncf.kubernetes provider package
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    "retries": 1,
    "start_date": datetime(2022, 1, 1),
    "image_pull_policy": "Always",
}

with DAG(
    dag_id="simple_kubernetes_dag",
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    tags=["example"],
    max_active_runs=1,
) as dag:
    simple_task = KubernetesPodOperator(
        task_id="simple_echo_task",
        image="exampleacr.azurecr.io/example:latest",
        name="simple-airflow-task",
    )
An important setting here is "image_pull_policy": "Always". Without it, our Kubernetes cluster would keep running the cached version of our image, even after we push a new latest version. With image_pull_policy set to Always, the cluster checks whether the digest of the cached image matches the digest of the latest image in the registry: if it does, the cached image is used, and if not, the latest image is pulled from the registry. A more robust alternative is to push explicitly versioned tags, as sketched below.
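For example (the version number is an arbitrary placeholder), you could tag and push a versioned image and point the image argument of the KubernetesPodOperator at that tag instead of latest:
docker tag example exampleacr.azurecr.io/example:0.1.0
docker push exampleacr.azurecr.io/example:0.1.0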
Now we commit and push our changes to our Git repository so our DAG can be found on GitHub. If we configured everything correctly, our DAG should now appear in the Airflow UI. Since we did not add a schedule for the DAG, let's trigger it manually and check the logs:
[2023-08-19, 08:36:51 UTC] {pod_manager.py:235} INFO - hello world
Awesome, our DAG is working! 🎉
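As a side note, the DAG can also be triggered from the Airflow CLI inside the scheduler pod instead of through the UI (assuming the Helm release is named airflow, as above):
kubectl exec -n airflow deployment/airflow-scheduler -- \
  airflow dags trigger simple_kubernetes_dag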
We successfully deployed a Kubernetes cluster on Azure and installed Airflow on it. I hope you found this tutorial useful. If you have any questions or feedback, feel free to reach out!
Florian