airflow: 使用 cli 进行操作

注意
本文最后更新于 2024-08-22,文中内容可能已过时。

ariflow 是一款优秀的开源任务管理架构,通过 DAG 的图形关系,指定各个子任务之间的依赖关系,并自动执行流水线。同时,airflow 还提供了美观的 UI,方便用户通过鼠标点击进行相关操作。

而本文要介绍的,则是 airflow 的命令行(CLI)操作模式。CLI 相比于 UI ,提供了更加灵活、可重现的运作方式,通过代码和配置,我们可以进行大规模的系统部署,避免鼠标操作存在的失误与不可重复性。

airflow UI 操作

在研究 CLI 之前,我尝试使用 UI 进行相关操作,如添加 roleuserDAGs 管理等。大体的经验是

  1. 最基础的用户组应该具备以下几组,权限才能获取相关页面、按钮、执行任务的权限

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    {'actions': [
        {'action': {'name': 'can_read'},    'resource': {'name':'Website'}},
        {'action': {'name': 'can_read'},    'resource': {'name': 'Triggers'}},
        {'action': {'name': 'can_create'},  'resource': {'name': 'DAG Runs'}},
        {'action': {'name': 'menu_access'}, 'resource': {'name': "User's Statistics"}},
        {'action': {'name': 'menu_access'}, 'resource': {'name': 'Actions'}},
        {'action': {'name': 'menu_access'}, 'resource': {'name': 'DAG Runs'}},
        {'action': {'name': 'menu_access'}, 'resource': {'name': 'Task Instances'}},
        {'action': {'name': 'menu_access'}, 'resource': {'name': 'Triggers'}},
        {'action': {'name': 'menu_access'}, 'resource': {'name': 'DAGs'}}
        ],
    'name': 'trader'}

    trader
    trader

  2. 对于的特定用户,需要有具体的 DAG 权限:read、edit、delete

    trader4
    trader4

  3. 通过 role 的组合,可以让 user 获取多个权限。如上面的 trader + trader4 的组合,可以允许用户获取两者的权限并集。

  4. airflow 提供了 DAG-specific 的权限控制,通过在 DAG 任务指定 role 权限,可以以更加细粒度的方式进行控制

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
        with DAG(
            dag_id         = "trading.hyena15.comm.c2s",
            default_args   = AirflowUtil.default_args(retry=0),
            schedule       = "50 08 * * 1-5",
            start_date     = datetime(2023, 1, 1, tzinfo=AirflowUtil.local_tz()),
            catchup        = False,
            dagrun_timeout = timedelta(seconds=60*30),
            tags           = ['trading', 'futures', 'hyena15', 'comm', 'c2s'],
            on_failure_callback = dag_failure_alert,
            on_success_callback = dag_success_alert,
            access_control={
                # "trader" : {"can_edit", "can_read", "can_delete"},
                "trader4": {"can_edit", "can_read", "can_delete"},
            },
        ) as dag:

启用 airflow cli 功能

airflow 提供的 cli 功能1,主要是通过 REST API 实现,默认是关闭的。因此,我们需要修改配置文件,使其生效。

打开配置文件 airflow.cfg,修改 [api] 下面的 auth_backends

1
2
3
4
5
6
7
8
9
[api]
# Comma separated list of auth backends to authenticate users of the API. See
# https://airflow.apache.org/docs/apache-airflow/stable/security/api.html for possible values.
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
#
# Variable: AIRFLOW__API__AUTH_BACKENDS
#
# auth_backends = airflow.api.auth.backend.session
auth_backends = airflow.api.auth.backend.basic_auth

然后重启 ariflow

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/bin/bash
## =============================================================================
killx () {
    list=$(ps aux | grep -i $1| grep -v grep |grep -v color)

    if [ -n "$list" ]; then
        dead=$(ps aux | grep -i $1| grep -v grep| grep -v color | awk '{print $2}')
        echo "Killing... $1"
        echo $dead |xargs kill -9
    else
        echo "Not running $1"
    fi
}
killx airflow

## ------------------------------------
export AIRFLOW_HOME=/app
export AIRFLOW__CORE__LOAD_EXAMPLES=False

ps -ef | egrep 'scheduler|airflow|webserver' | awk '{print $2}'| xargs kill -15

for var in scheduler webserver
do
   x=`cat $AIRFLOW_HOME/airflow-${var}.pid`
   if [ "$x" != "" ]
   then
       cat $AIRFLOW_HOME/airflow-${var}.pid | xargs kill -9
   fi

   cat /dev/null >  $AIRFLOW_HOME/airflow-${var}.pid
   rm $AIRFLOW_HOME/airflow-${var}.pid
done

rm -rf airflow-scheduler.pid
rm -rf airflow-webserver-monitor.pid
rm -rf airflow-webserver.pid
#airflow db init
## ------------------------------------
airflow webserver -p 8080 -D
airflow scheduler -D
## ------------------------------------

python 调用相关接口

airflow 提供 REST API,通过 GETPOST 方法,我们可以很方便的与服务端进行交互。

登录 API

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
airflow_url = "http://192.168.1.160:18080/"
new_role_name = 'testingx'
dag_names = ['pretrading.check.init.pos']
airflow_username = 'william'
airflow_password = 'xxxxxx'
headers = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6',
    'Cache-Control': 'max-age=0',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
}
auth_str = f"{airflow_username}:{airflow_password}"
base64_auth_str = base64.b64encode(auth_str.encode()).decode()
headers["Authorization"] = "Basic " + base64_auth_str

## REST API -------------------------------------------------------------------
airflow_url += "/api/v1/roles"

GET

1
2
3
4
## GET ------------------------------------------------------------------------
info = requests.get(airflow_url, headers=headers).json()
tmp = [k for k in info['roles'] if k['name'] == 'trader'][0]
pprint(tmp)

Post

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
## POST -----------------------------------------------------------------------
read = "can_read"
edit = "can_edit"
create = "can_create"
delete = "can_delete"
menu = "menu_access"

# add general permissions
permissions = []
read_permissions = make_permissions(read, [
    "Website",
    "DAG Runs",
    "Task Instances",
    "Audit Logs",
    "ImportError",
    "XComs",
    "DAG Code",
    "Plugins",
    "My Password",
    "My Profile",
    "Jobs",
    "SLA Misses",
    "DAG Dependencies",
    "Task Logs"])
edit_permissions = make_permissions(edit, [
    "Task Instances",
    "My Password",
    "My Profile",
    "DAG Runs"])
create_permissions = make_permissions(create, [
    "DAG Runs",
    "Task Instances"])
delete_permissions = make_permissions(delete, [
    "DAG Runs",
    "Task Instances"])
menu_permissions = make_permissions(menu, [
    "View Menus",
    "Browse",
    "Docs",
    "Documentation",
    "SLA Misses",
    "Jobs",
    "DAG Runs",
    "Task Instances",
    "Audit Logs",
    "DAG Dependencies"])

## all in one
permissions += read_permissions + edit_permissions + create_permissions + delete_permissions + menu_permissions

# add dag-specific permissions
for dag in dag_names:
    dag = "DAG:" + dag
    read_permissions = make_permissions(read,[dag])
    edit_permissions = make_permissions(edit, [dag])
    delete_permissions = make_permissions(delete, [dag])
    permissions += read_permissions + edit_permissions + delete_permissions

data = {
    "actions": [
        *permissions
    ],
    "name": new_role_name
}

airflow_url += "/api/v1/roles"
response = requests.post(airflow_url, json=data, headers=headers)
print(response.status_code)

if response.status_code == 403:
    raise PermissionError(f"Error 403 returned, please check if your AirFlow account is Op/Admin or verify the dags exist. \n {response.json()}")
elif response.status_code == 401:
    raise PermissionError(f"Error 401 returned, please check the access token if the page is protected by an authentication")
elif response.status_code == 200:
    print(f"Role `{new_role_name}` successfully created.")
    return
elif response.status_code == 409:  # Role already exists, update it
    print("Role already exists, updating...")
    airflow_role_update_url = f"{airflow_url}/{new_role_name}"
    update_response = requests.patch(airflow_role_update_url, json=data, headers=headers)
    if update_response.status_code == 200:
        print(f"Role `{new_role_name}` successfully updated.")
    else:
        raise ConnectionError(f"An error occurred during role update: {update_response.json()}")
else:
    raise ConnectionError(f"An error occurred during role creation: {response.json()}")

整理以上的脚本

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from typing import List
import requests
import argparse
import base64
from pprint import pprint

def make_permissions(action, resources):
    permissions = []
    for perm in resources:
        permissions.append(make_permission(action, perm))
    return permissions

def make_permission(action, resource):
    return {
        "action": {"name": action},
        "resource": {"name": resource}
    }

def create_rbac_role_with_permissions(
    airflow_url: str,
    new_role_name: str,
    dag_names: List[str],
    google_access_token: str=None,
    airflow_username: str=None,
    airflow_password: str=None
):
    """
    airflow_url = "http://192.168.1.160:18080/"
    new_role_name = 'testingx'
    dag_names = ['pretrading.check.init.pos']
    airflow_username = 'william'
    airflow_password = 'xxxxxx'
    google_access_token = None
    """
    if isinstance(dag_names, str):
        dag_names = [dag_names]

    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6',
        'Cache-Control': 'max-age=0',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
    }

    if google_access_token:
        headers["Authorization"] = "Bearer " + google_access_token
    elif airflow_username and airflow_password:
        auth_str = f"{airflow_username}:{airflow_password}"
        base64_auth_str = base64.b64encode(auth_str.encode()).decode()
        headers["Authorization"] = "Basic " + base64_auth_str

    read = "can_read"
    edit = "can_edit"
    create = "can_create"
    delete = "can_delete"
    menu = "menu_access"

    # add general permissions
    permissions = []
    read_permissions = make_permissions(read, [
        "Website",
        "DAG Runs",
        "Task Instances",
        "Audit Logs",
        "ImportError",
        "XComs",
        "DAG Code",
        "Plugins",
        "My Password",
        "My Profile",
        "Jobs",
        "SLA Misses",
        "DAG Dependencies",
        "Task Logs"])
    edit_permissions = make_permissions(edit, [
        "Task Instances",
        "My Password",
        "My Profile",
        "DAG Runs"])
    create_permissions = make_permissions(create, [
        "DAG Runs",
        "Task Instances"])
    delete_permissions = make_permissions(delete, [
        "DAG Runs",
        "Task Instances"])
    menu_permissions = make_permissions(menu, [
        "View Menus",
        "Browse",
        "Docs",
        "Documentation",
        "SLA Misses",
        "Jobs",
        "DAG Runs",
        "Task Instances",
        "Audit Logs",
        "DAG Dependencies"])

    ## all in one
    permissions += read_permissions + edit_permissions + create_permissions + delete_permissions + menu_permissions

    # add dag-specific permissions
    for dag in dag_names:
        dag = "DAG:" + dag
        read_permissions = make_permissions(read,[dag])
        edit_permissions = make_permissions(edit, [dag])
        delete_permissions = make_permissions(delete, [dag])
        permissions += read_permissions + edit_permissions + delete_permissions

    data = {
        "actions": [
            *permissions
        ],
        "name": new_role_name
    }

    airflow_url += "/api/v1/roles"
    response = requests.post(airflow_url, json=data, headers=headers)
    print(response.status_code)

    if response.status_code == 403:
        raise PermissionError(f"Error 403 returned, please check if your AirFlow account is Op/Admin or verify the dags exist. \n {response.json()}")
    elif response.status_code == 401:
        raise PermissionError("Error 401 returned, please check the access token if the page is protected by an authentication")
    elif response.status_code == 200:
        print(f"Role `{new_role_name}` successfully created.")
        return
    elif response.status_code == 409:  # Role already exists, update it
        print("Role already exists, updating...")
        airflow_role_update_url = f"{airflow_url}/{new_role_name}"
        update_response = requests.patch(airflow_role_update_url, json=data, headers=headers)
        if update_response.status_code == 200:
            print(f"Role `{new_role_name}` successfully updated.")
        else:
            raise ConnectionError(f"An error occurred during role update: {update_response.json()}")
    else:
        raise ConnectionError(f"An error occurred during role creation: {response.json()}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--airflow-url", required=True, help="URL to the composer Airflow UI root page")
    parser.add_argument("-r", "--role-name", required=True, help="Name of the new created role")
    parser.add_argument("-d", "--dags", nargs="+", required=True, help="List of accessible dags for the role")
    parser.add_argument("-t", "--access-token", required=False, help="Google access token used only if Airflow is managed by Cloud Composer")
    parser.add_argument("-afu", "--airflow-username", required=False, help="Airflow username for Basic Auth")
    parser.add_argument("-afp", "--airflow-password", required=False, help="Airflow password for Basic Auth")

    args = parser.parse_args()
    create_rbac_role_with_permissions(
        airflow_url=args.airflow_url,
        new_role_name=args.role_name,
        dag_names=args.dags,
        google_access_token=args.access_token,
        airflow_username=args.airflow_username,
        airflow_password=args.airflow_password
    )

DAGs 权限管理

william 支付宝支付宝
william 微信微信
0%