1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
from typing import List
import requests
import argparse
import base64
from pprint import pprint
def make_permissions(action, resources):
    """Build one permission entry per resource, all sharing the same action.

    Args:
        action: Action name applied to every resource (e.g. ``"can_read"``).
        resources: Iterable of resource names.

    Returns:
        A list of permission dicts, in the same order as ``resources``.
    """
    return [make_permission(action, resource) for resource in resources]
def make_permission(action, resource):
    """Return a single permission entry pairing an action with a resource.

    Args:
        action: Action name (e.g. ``"can_read"``).
        resource: Resource name (e.g. ``"Website"``).

    Returns:
        A dict of the form
        ``{"action": {"name": action}, "resource": {"name": resource}}``.
    """
    action_entry = {"name": action}
    resource_entry = {"name": resource}
    return {"action": action_entry, "resource": resource_entry}
def _response_detail(response):
    """Return the decoded JSON body of ``response``, or its raw text if it is not JSON."""
    try:
        return response.json()
    except ValueError:
        # Proxies and login pages often answer with HTML; do not let the
        # JSON decoding failure hide the real HTTP error.
        return response.text


def create_rbac_role_with_permissions(
    airflow_url: str,
    new_role_name: str,
    dag_names: List[str],
    google_access_token: str = None,
    airflow_username: str = None,
    airflow_password: str = None,
    request_timeout: float = 30.0,
):
    """Create an Airflow RBAC role restricted to the given DAGs, or update it if it exists.

    The role receives a fixed set of general read/edit/create/delete/menu
    permissions plus ``can_read``, ``can_edit`` and ``can_delete`` on each
    ``DAG:<name>`` resource. The role is POSTed to ``<airflow_url>/api/v1/roles``;
    when the server answers 409 (role already exists) the same payload is
    PATCHed to ``<airflow_url>/api/v1/roles/<new_role_name>``.

    Example::

        create_rbac_role_with_permissions(
            airflow_url="http://192.168.1.160:18080/",
            new_role_name="testingx",
            dag_names=["pretrading.check.init.pos"],
            airflow_username="william",
            airflow_password="...",
        )

    Args:
        airflow_url: Root URL of the Airflow web server; a trailing slash is tolerated.
        new_role_name: Name of the role to create or update.
        dag_names: DAG ids the role may access. A single string is accepted
            and treated as a one-element list.
        google_access_token: Bearer token; when given it takes precedence over
            the username/password pair.
        airflow_username: Username for HTTP Basic authentication.
        airflow_password: Password for HTTP Basic authentication.
        request_timeout: Seconds to wait for each HTTP request before giving up.

    Raises:
        PermissionError: The server answered 401 or 403.
        ConnectionError: Role creation or update returned any other
            unexpected status code.
    """
    # Imported locally so this function does not depend on edits to the
    # module-level import block.
    from urllib.parse import quote

    if isinstance(dag_names, str):
        dag_names = [dag_names]

    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6',
        'Cache-Control': 'max-age=0',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
    }

    # Bearer token wins over Basic auth; with neither, no Authorization
    # header is sent.
    if google_access_token:
        headers["Authorization"] = "Bearer " + google_access_token
    elif airflow_username and airflow_password:
        auth_str = f"{airflow_username}:{airflow_password}"
        base64_auth_str = base64.b64encode(auth_str.encode()).decode()
        headers["Authorization"] = "Basic " + base64_auth_str

    read = "can_read"
    edit = "can_edit"
    create = "can_create"
    delete = "can_delete"
    menu = "menu_access"

    # General (non DAG-specific) permissions granted to every role.
    permissions = []
    permissions += make_permissions(read, [
        "Website",
        "DAG Runs",
        "Task Instances",
        "Audit Logs",
        "ImportError",
        "XComs",
        "DAG Code",
        "Plugins",
        "My Password",
        "My Profile",
        "Jobs",
        "SLA Misses",
        "DAG Dependencies",
        "Task Logs",
    ])
    permissions += make_permissions(edit, [
        "Task Instances",
        "My Password",
        "My Profile",
        "DAG Runs",
    ])
    permissions += make_permissions(create, [
        "DAG Runs",
        "Task Instances",
    ])
    permissions += make_permissions(delete, [
        "DAG Runs",
        "Task Instances",
    ])
    permissions += make_permissions(menu, [
        "View Menus",
        "Browse",
        "Docs",
        "Documentation",
        "SLA Misses",
        "Jobs",
        "DAG Runs",
        "Task Instances",
        "Audit Logs",
        "DAG Dependencies",
    ])

    # DAG-specific permissions: read, edit and delete on each listed DAG.
    for dag in dag_names:
        dag_resource = "DAG:" + dag
        for action in (read, edit, delete):
            permissions += make_permissions(action, [dag_resource])

    data = {
        "actions": permissions,
        "name": new_role_name,
    }

    # Strip trailing slashes so "http://host/" does not become "http://host//api/v1/roles".
    roles_url = airflow_url.rstrip("/") + "/api/v1/roles"
    response = requests.post(roles_url, json=data, headers=headers, timeout=request_timeout)
    print(response.status_code)

    if response.status_code == 403:
        raise PermissionError(f"Error 403 returned, please check if your AirFlow account is Op/Admin or verify the dags exist. \n {_response_detail(response)}")
    elif response.status_code == 401:
        raise PermissionError("Error 401 returned, please check the access token if the page is protected by an authentication")
    elif response.status_code == 200:
        print(f"Role `{new_role_name}` successfully created.")
        return
    elif response.status_code == 409:  # Role already exists, update it
        print("Role already exists, updating...")
        # Percent-encode the role name so spaces or reserved characters
        # cannot corrupt the URL path.
        airflow_role_update_url = f"{roles_url}/{quote(new_role_name, safe='')}"
        update_response = requests.patch(
            airflow_role_update_url, json=data, headers=headers, timeout=request_timeout
        )
        if update_response.status_code == 200:
            print(f"Role `{new_role_name}` successfully updated.")
        else:
            raise ConnectionError(f"An error occurred during role update: {_response_detail(update_response)}")
    else:
        raise ConnectionError(f"An error occurred during role creation: {_response_detail(response)}")
if __name__ == "__main__":
    # Command-line entry point: parse the flags below and forward them to
    # create_rbac_role_with_permissions.
    cli = argparse.ArgumentParser()

    # (flag names, add_argument keyword arguments), registered in this order.
    flag_specs = [
        (("-u", "--airflow-url"),
         {"required": True, "help": "URL to the composer Airflow UI root page"}),
        (("-r", "--role-name"),
         {"required": True, "help": "Name of the new created role"}),
        (("-d", "--dags"),
         {"nargs": "+", "required": True, "help": "List of accessible dags for the role"}),
        (("-t", "--access-token"),
         {"required": False, "help": "Google access token used only if Airflow is managed by Cloud Composer"}),
        (("-afu", "--airflow-username"),
         {"required": False, "help": "Airflow username for Basic Auth"}),
        (("-afp", "--airflow-password"),
         {"required": False, "help": "Airflow password for Basic Auth"}),
    ]
    for flag_names, flag_options in flag_specs:
        cli.add_argument(*flag_names, **flag_options)

    options = cli.parse_args()

    call_kwargs = {
        "airflow_url": options.airflow_url,
        "new_role_name": options.role_name,
        "dag_names": options.dags,
        "google_access_token": options.access_token,
        "airflow_username": options.airflow_username,
        "airflow_password": options.airflow_password,
    }
    create_rbac_role_with_permissions(**call_kwargs)
|