Airflow Plugins

Airflow's schedule can be set with either a timetable or a cron expression. With a cron expression, finer-grained control is hard to achieve. For example, I have a task that needs to run at two times:

  • 45 08 * * 1-5
  • 01 09 * * 1-5

These two times cannot be written as a single cron expression, so they cannot be combined into one scheduled task.
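
The reason is that cron matches the minute and hour fields independently, so an expression listing both minutes and both hours fires at every combination of them. A minimal sketch in plain Python (the helper fire_times is hypothetical and only models the minute and hour fields, it is not a cron parser):

```python
from itertools import product


def fire_times(minutes, hours):
    """Model cron's behavior: one fire time per (hour, minute) combination."""
    return sorted(f"{h:02d}:{m:02d}" for h, m in product(hours, minutes))


wanted = ["08:45", "09:01"]

# What a merged expression such as "45,01 08,09 * * 1-5" would do on a weekday.
merged = fire_times(minutes=[45, 1], hours=[8, 9])
print(merged)  # ['08:01', '08:45', '09:01', '09:45']

# The two unwanted extra runs.
extra = sorted(set(merged) - set(wanted))
print(extra)  # ['08:01', '09:45']
```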

Splitting into multiple sub-tasks

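One approach is to split this into two DAGs, named task1 and task2, each scheduled on its own time. The problem is that every additional time slot needs another sub-task. This is not intuitive to manage: we are essentially still managing a single DAG, but it is now scattered across several sub-tasks, which clearly violates the DRY principle.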

Custom Timetable

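Under the hood, Airflow converts the expression passed to schedule into a timetable class and then works out the next run time according to that class's rules. We can therefore subclass Timetable and define the run times ourselves, for example by comparing the next fire times of several cron expressions, which gives us a multi-slot schedule.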
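
The core idea can be sketched without Airflow: compute the next fire time of each slot separately and take the earliest one. The sketch below is a simplified stand-in, not the implementation used later in this post. It assumes each slot is a plain (hour, minute) pair that fires Monday to Friday instead of a real cron expression, and the names SLOTS, next_fire, and next_run are hypothetical.

```python
from datetime import datetime, timedelta

# Assumed simplification: (hour, minute) pairs that fire Monday-Friday.
SLOTS = [(8, 45), (9, 1)]


def next_fire(slot, after):
    """Next weekday occurrence of one slot strictly after `after`."""
    hour, minute = slot
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # Step forward one day at a time until the candidate is in the future
    # and does not fall on Saturday (5) or Sunday (6).
    while candidate <= after or candidate.weekday() >= 5:
        candidate += timedelta(days=1)
    return candidate


def next_run(after):
    """The combined schedule fires at the earliest of the per-slot times."""
    return min(next_fire(slot, after) for slot in SLOTS)


# Monday 08:50: the 08:45 slot has passed, so 09:01 on the same day is next.
print(next_run(datetime(2023, 1, 2, 8, 50)))  # 2023-01-02 09:01:00
# Friday 09:30: both slots have passed, so Monday 08:45 is next.
print(next_run(datetime(2023, 1, 6, 9, 30)))  # 2023-01-09 08:45:00
```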

plugins

A custom timetable subclass has to be added as an Airflow plugin so that the Airflow scheduler, webserver, and other components can recognize it.

I followed this SO question: airflow 2.2 timetable for schedule, always with error: timetable not registered. The steps are:

  1. In $AIRFLOW_HOME/airflow.cfg, set lazy_load_plugins = False

    # By default Airflow plugins are lazily-loaded (only loaded when required). Set it to ``False``,
    # if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
    #
    # Variable: AIRFLOW__CORE__LAZY_LOAD_PLUGINS
    #
    lazy_load_plugins = False
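  2. Add __init__.py to the $AIRFLOW_HOME/plugins directory

  3. Add mycron.py to the $AIRFLOW_HOME/plugins directory; this file holds the custom subclass MultiCronTimetable

  4. Write the MultiCronTimetable class definition

  5. Restart Airflow

  6. Pass the timetable to the DAG's schedule. Note that because the plugin is loaded through airflow.plugins_manager and the plugins folder is on sys.path, the namespace is already available, so mycron can be imported directly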

    from datetime import datetime, timedelta

    from airflow import DAG
    # "mycron" is the file name of the plugin. Import it directly: there is
    # no need for something like "import airflow.plugins.example_plugin",
    # because Airflow's plugin manager already handles the namespace.
    from mycron import MultiCronTimetable

    # AirflowUtil, dag_failure_alert and dag_success_alert are my own
    # project-specific helpers, defined elsewhere and not shown here.

    # Define multiple cron schedules
    multi_cron = MultiCronTimetable(
        crons = [
            "50,59 08 * * 1-5",  # Original schedule
            "01,05 09 * * 1-5",  # more cron expression
        ],
        timezone = "Asia/Shanghai"
    )

    with DAG(
        dag_id         = "pretrading.pos",
        default_args   = AirflowUtil.default_args(retry=0),
        # schedule       = "01,05 09,15 * * 1-5",
        schedule       = multi_cron,
        start_date     = datetime(2023, 1, 1, tzinfo=AirflowUtil.local_tz()),
        catchup        = False,
        dagrun_timeout = timedelta(seconds=60*30),
        tags           = ['pretrading', 'pos', 'gui', 'stk', 'cv'],
        on_failure_callback = dag_failure_alert,
        on_success_callback = dag_success_alert,
    ) as dag:
        ...  # task definitions go here

MultiCronTimetable

Class implementation

# This file is <airflow plugins directory>/mycron.py

from typing import Any, Dict, List, Optional
import pendulum
from croniter import croniter
from pendulum import DateTime, Duration, timezone, instance as pendulum_instance
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.exceptions import AirflowTimetableInvalid


class MultiCronTimetable(Timetable):
    valid_units = ['minutes', 'hours', 'days']

    def __init__(self,
                 crons: List[str],
                 timezone: str = 'Europe/Berlin',
                 period_length: int = 0,
                 period_unit: str = 'hours'):

        self.crons = crons
        self.timezone = timezone
        self.period_length = period_length
        self.period_unit = period_unit

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """
        Determines date interval for manually triggered runs.
        This is simply (now - period) to now.
        """
        end = run_after
        if self.period_length == 0:
            start = end
        else:
            start = self.data_period_start(end)
        return DataInterval(start=start, end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction) -> Optional[DagRunInfo]:
        """
        Determines when the DAG should be scheduled.

        """

        if restriction.earliest is None:
            # No start_date. Don't schedule.
            return None

        is_first_run = last_automated_data_interval is None

        if is_first_run:
            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(restriction.earliest)

            else:
                scheduled_time = self.previous_scheduled_run_time()
                if scheduled_time is None:
                    # No previous cron time matched. Find one in the future.
                    scheduled_time = self.next_scheduled_run_time()
        else:
            last_scheduled_time = last_automated_data_interval.end

            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(last_scheduled_time)

            else:
                scheduled_time = self.previous_scheduled_run_time()

                if scheduled_time is None or scheduled_time == last_scheduled_time:
                    # No previous cron time matched,
                    # or the matched cron time was the last execution time,
                    scheduled_time = self.next_scheduled_run_time()

                elif scheduled_time > last_scheduled_time:
                    # Matched cron time was after last execution time, but before now.
                    # Use this cron time
                    pass

                else:
                    # The last execution time is after the most recent matching cron time.
                    # Next scheduled run will be in the future
                    scheduled_time = self.next_scheduled_run_time()

        if scheduled_time is None:
            return None

        if restriction.latest is not None and scheduled_time > restriction.latest:
            # Over the DAG's scheduled end; don't schedule.
            return None

        start = self.data_period_start(scheduled_time)
        return DagRunInfo(run_after=scheduled_time, data_interval=DataInterval(start=start, end=scheduled_time))

    def data_period_start(self, period_end: DateTime):
        return period_end - Duration(**{self.period_unit: self.period_length})

    def croniter_values(self, base_datetime=None):
        if not base_datetime:
            tz = timezone(self.timezone)
            base_datetime = pendulum.now(tz)

        return [croniter(expr, base_datetime) for expr in self.crons]

    def next_scheduled_run_time(self, base_datetime: Optional[DateTime] = None):
        min_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            next_date = cron.get_next(DateTime)
            if not min_date:
                min_date = next_date
            else:
                min_date = min(min_date, next_date)
        if min_date is None:
            return None
        return pendulum_instance(min_date)

    def previous_scheduled_run_time(self, base_datetime: Optional[DateTime] = None):
        """
        Get the most recent time in the past that matches one of the cron schedules
        """
        max_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            prev_date = cron.get_prev(DateTime)
            if not max_date:
                max_date = prev_date
            else:
                max_date = max(max_date, prev_date)
        if max_date is None:
            return None
        return pendulum_instance(max_date)


    def validate(self) -> None:
        if not self.crons:
            raise AirflowTimetableInvalid("At least one cron definition must be present")

        if self.period_unit not in self.valid_units:
            raise AirflowTimetableInvalid(f'period_unit must be one of {self.valid_units}')

        if self.period_length < 0:
            raise AirflowTimetableInvalid('period_length must not be less than zero')

        try:
            # Constructing the croniter objects validates every expression.
            self.croniter_values()
        except Exception as e:
            raise AirflowTimetableInvalid(str(e)) from e

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression.
        """
        return ' || '.join(self.crons) + f' [TZ: {self.timezone}]'

    def serialize(self) -> Dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized.
        """
        return dict(crons=self.crons,
                    timezone=self.timezone,
                    period_length=self.period_length,
                    period_unit=self.period_unit)

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable":
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization.
        """
        return cls(**data)


class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [MultiCronTimetable]
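
Airflow stores the dict returned by serialize as JSON and later feeds it back into deserialize, so the two methods have to round-trip. The sketch below checks that property with a stripped-down stand-in class. MiniTimetable is hypothetical: it copies only the constructor and the two serialization methods from the class above, does not inherit from Airflow's Timetable, and uses only the standard library.

```python
import json
from typing import Any, Dict, List


class MiniTimetable:
    """Stand-in with the same constructor arguments as MultiCronTimetable."""

    def __init__(self, crons: List[str], timezone: str = "Europe/Berlin",
                 period_length: int = 0, period_unit: str = "hours"):
        self.crons = crons
        self.timezone = timezone
        self.period_length = period_length
        self.period_unit = period_unit

    def serialize(self) -> Dict[str, Any]:
        # Keys must match the constructor's parameter names exactly,
        # because deserialize passes them back as keyword arguments.
        return dict(crons=self.crons,
                    timezone=self.timezone,
                    period_length=self.period_length,
                    period_unit=self.period_unit)

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "MiniTimetable":
        return cls(**data)


original = MiniTimetable(["50,59 08 * * 1-5", "01,05 09 * * 1-5"],
                         timezone="Asia/Shanghai")

# Simulate storage: encode to a JSON string, then decode and rebuild.
stored = json.dumps(original.serialize())
restored = MiniTimetable.deserialize(json.loads(stored))

print(restored.serialize() == original.serialize())  # True
```

This works because every constructor argument is a plain string, int, or list of strings. That is also why the class keeps the timezone as a name and only builds the timezone object when it is needed.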

Registering the plugin

Note that implementing the custom subclass MultiCronTimetable is not enough: it also has to be registered with Airflow plugins, otherwise Airflow reports an error

MultiCronTimetable plugins not registered

The registration is done by the last few lines of the code above

class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [MultiCronTimetable]

Now we can use the airflow CLI to check whether the plugin has been loaded

airflow plugins

name                      | source                     | timetables
==========================+============================+===========================
custom_timetable_plugin   | $PLUGINS_FOLDER/mycron.py  | mycron.MultiCronTimetable

Usage

Using the multi-slot schedule in a DAG
