Warning
This article was last updated on 2023-04-25; its content may be out of date.
Use Airflow to manage workflows.
Install
## Set the Airflow home directory (by default, DAGs are read from $AIRFLOW_HOME/dags)
export AIRFLOW_HOME=/root/app
## Do not load the bundled example DAGs
export AIRFLOW__CORE__LOAD_EXAMPLES=False
airflow db init
Add a user
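## Make sure $AIRFLOW_HOME is set and exists (the same value used for airflow db init).
## Otherwise Airflow uses a different metadata database, the user is created in the wrong
## database, and login fails with a username/password error.
## No --password is given here, so the command prompts for one.
airflow users create \
--username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.org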
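Because a missing or different $AIRFLOW_HOME silently points Airflow at another metadata database, it can help to guard user-management commands with a small check. The following is a minimal sketch and not part of Airflow; the function name `require_airflow_home` is hypothetical.

```shell
# Hypothetical guard (not an Airflow command): fail early when AIRFLOW_HOME
# is unset or does not point at an existing directory.
require_airflow_home() {
    if [ -z "${AIRFLOW_HOME:-}" ]; then
        echo "AIRFLOW_HOME is not set" >&2
        return 1
    fi
    if [ ! -d "$AIRFLOW_HOME" ]; then
        echo "AIRFLOW_HOME does not exist: $AIRFLOW_HOME" >&2
        return 1
    fi
    # Report which home directory (and therefore which database) is in use.
    echo "Using AIRFLOW_HOME=$AIRFLOW_HOME"
}
```

It could be chained in front of the real command, for example `require_airflow_home && airflow users create ...`, so the user is only created when the expected home directory is in effect.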
Start
## Start the webserver
airflow webserver -p 8080 -D
## Start the scheduler
airflow scheduler -D
## Set the time zone: edit airflow.cfg
vim airflow.cfg
## change default_timezone = utc to default_timezone = Asia/Shanghai
## change default_ui_timezone = UTC to default_ui_timezone = Asia/Shanghai
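The same two time zone settings can also be supplied through environment variables instead of editing airflow.cfg, since Airflow reads overrides named AIRFLOW__{SECTION}__{KEY} (the same mechanism as AIRFLOW__CORE__LOAD_EXAMPLES). A sketch, assuming default_timezone sits in the [core] section and default_ui_timezone in the [webserver] section, as in Airflow 2.x:

```shell
# Assumed sections: [core] default_timezone, [webserver] default_ui_timezone.
# Export these in the shell that starts the webserver and the scheduler.
export AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai
export AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Asia/Shanghai
```

Environment variables take precedence over airflow.cfg, so both processes need to be started (or restarted) from a shell where these are set.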
Test
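## First run the DAG file with plain Python to catch syntax and import errors
python hello.py
## List the tasks of the hello DAG, then test individual tasks
airflow tasks list hello
airflow tasks test hello print_date
airflow tasks test hello py_say_hello 20230514
## Trigger a DAG
airflow dags list
airflow dags trigger pretrading.all.csv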
Switch the database
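Airflow's built-in default database is SQLite. It exists mainly for ease of testing: Airflow starts without any extra configuration. With SQLite, however, only the SequentialExecutor can be used, so tasks cannot run in parallel, and the official documentation does not recommend it.
To run tasks in parallel, switch the database to PostgreSQL or MySQL.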
MySQL
MariaDB
## This shared library must be placed in /app
libmysqlclient.so.18
## Switch SELinux to permissive mode for the current boot, then confirm the mode
setenforce 0
getenforce
## Disable it permanently: set SELINUX=disabled in /etc/selinux/config
vim /etc/selinux/config
## The file should then read:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of three values:
# targeted - Targeted processes are protected,
# minimum - Modification of targeted policy. Only selected processes are protected.
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
sudo rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
sudo yum localinstall -y https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
sudo yum install -y mysql-community-server
sudo systemctl start mysqld
## Find the temporary root password generated on first install
grep "temporary password" /var/log/mysqld.log
## If you cannot log in, use the following method;
## it lets you run mysql -u root -p without entering a password
1. Open and edit /etc/my.cnf or /etc/mysql/my.cnf, depending on your distribution.
2. Add skip-grant-tables under [mysqld].
3. Restart MySQL.
4. Log in to MySQL with: mysql -u root -p
5. Run: mysql> flush privileges;
6. Set a new password: ALTER USER 'root'@'localhost' IDENTIFIED BY 'NewPassword';
7. Go back to /etc/my.cnf and remove or comment out skip-grant-tables.
8. Restart MySQL.
9. Log in with the new password: mysql -u root -p
## Error: Your password does not satisfy the current policy requirements
## Inspect the password policy, then relax it
mysql> SHOW VARIABLES LIKE 'validate_password%';
+--------------------------------------+--------+
| Variable_name | Value |
+--------------------------------------+--------+
| validate_password_check_user_name | OFF |
| validate_password_dictionary_file | |
| validate_password_length | 8 |
| validate_password_mixed_case_count | 1 |
| validate_password_number_count | 1 |
| validate_password_policy | MEDIUM |
| validate_password_special_char_count | 1 |
+--------------------------------------+--------+
7 rows in set (0.00 sec)
mysql> SET GLOBAL validate_password_length = 6;
mysql> SET GLOBAL validate_password_policy = LOW;
mysql -uroot -p
CREATE DATABASE IF NOT EXISTS airflow DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
create user 'airflow'@'%' identified by 'xxxxxxxx';
grant all privileges on airflow.* to airflow@localhost identified by 'xxxxxxxx';
grant all privileges on airflow.* to 'airflow'@'%' identified by 'xxxxxxxx';
flush privileges;
select user,authentication_string,host from mysql.user;
+---------------+-------------------------------------------+-----------+
| user | authentication_string | host |
+---------------+-------------------------------------------+-----------+
| root | *51F9815BB277B91759503A29D46EC9364D361F1C | localhost |
| mysql.session | *THISISNOTAVALIDPASSWORDTHATCANBEUSEDHERE | localhost |
| mysql.sys | *THISISNOTAVALIDPASSWORDTHATCANBEUSEDHERE | localhost |
| airflow | *51F9815BB277B91759503A29D46EC9364D361F1C | % |
+---------------+-------------------------------------------+-----------+
4 rows in set (0.00 sec)
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow:xxxxxxxx@localhost:3306/airflow?charset=utf8
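# Initialize the database
# If the two utf8 settings above are not written correctly, an error in /airflow/lib/python3.7/encodings/cp1252.py may appear
# If the database was previously initialized with SQLite, it needs to be reset first
# Reset the database
airflow db reset
# Initialize the database
airflow db init
## Error 1067: Invalid default value for 'updated_at'
## MySQLdb.OperationalError: (1067, "Invalid default value for 'updated_at'")
## Fix: in the mysql client, set a sql_mode that omits NO_ZERO_IN_DATE and NO_ZERO_DATE
set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';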
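The executor and sql_alchemy_conn settings shown above can also be set through environment variables rather than airflow.cfg. The following is a sketch under stated assumptions: the helper `mysql_conn_uri` is hypothetical, the credentials are the same placeholders used above, and the helper does not URL-encode its arguments, so a password containing characters such as `@` or `/` would need encoding first.

```shell
# Hypothetical helper: assemble a SQLAlchemy URI in the same shape as the
# sql_alchemy_conn line above. Arguments are inserted as-is (no URL-encoding).
mysql_conn_uri() {
    # $1=user $2=password $3=host $4=port $5=database
    printf 'mysql://%s:%s@%s:%s/%s?charset=utf8\n' "$1" "$2" "$3" "$4" "$5"
}

export AIRFLOW__CORE__EXECUTOR=LocalExecutor
# Airflow 2.3+ reads the connection from the [database] section; older 2.x
# releases use AIRFLOW__CORE__SQL_ALCHEMY_CONN instead.
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="$(mysql_conn_uri airflow xxxxxxxx localhost 3306 airflow)"
```

With these exported, `airflow db init` runs against the MySQL database instead of the default SQLite file.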
Ref