# By default Airflow plugins are lazily loaded (only loaded when required). Set it to ``False``
# if you want plugins to be loaded whenever 'airflow' is invoked via the CLI or loaded from a module.
#
# Variable: AIRFLOW__CORE__LAZY_LOAD_PLUGINS
#
lazy_load_plugins=False
from airflow.plugins_manager import AirflowPlugin

# 'mycron' is the file name of your plugin.
# Do NOT import airflow.plugins.example_plugin,
# since the plugins namespace has already been managed by airflow.plugins_manager.
# Import the plugin module directly - Airflow's plugin manager handles the namespace.
from mycron import MultiCronTimetable

# Define multiple cron schedules; the DAG fires on the union of these expressions.
multi_cron = MultiCronTimetable(
    crons=[
        "50,59 08 * * 1-5",  # Original schedule
        "01,05 09 * * 1-5",  # more cron expression
    ],
    timezone="Asia/Shanghai",
)

# NOTE(review): DAG, AirflowUtil, dag_failure_alert, dag_success_alert and
# datetime/timedelta are imported elsewhere in this file — confirm against the
# full module.
with DAG(
    dag_id="pretrading.pos",
    default_args=AirflowUtil.default_args(retry=0),
    # schedule = "01,05 09,15 * * 1-5",
    schedule=multi_cron,
    start_date=datetime(2023, 1, 1, tzinfo=AirflowUtil.local_tz()),
    catchup=False,
    dagrun_timeout=timedelta(seconds=60 * 30),
    tags=['pretrading', 'pos', 'gui', 'stk', 'cv'],
    on_failure_callback=dag_failure_alert,
    on_success_callback=dag_success_alert,
) as dag:
    # Task definitions belong here; they are not part of this snippet.
    pass
# This file is <airflow plugins directory>/timetable.py
from typing import Any, Dict, List, Optional

import pendulum
from croniter import croniter
from pendulum import DateTime, Duration, timezone, instance as pendulum_instance

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.exceptions import AirflowTimetableInvalid


class MultiCronTimetable(Timetable):
    """Timetable that schedules a DAG on the union of several cron expressions.

    Each run's data interval is ``(scheduled_time - period, scheduled_time)``,
    where the period is ``period_length`` units of ``period_unit`` (0 means a
    zero-length interval).
    """

    # Units accepted for period_unit; must be valid pendulum.Duration kwargs.
    valid_units = ['minutes', 'hours', 'days']

    def __init__(self,
                 crons: List[str],
                 timezone: str = 'Europe/Berlin',
                 period_length: int = 0,
                 period_unit: str = 'hours'):
        """
        :param crons: one or more cron expressions; the DAG fires at every
            time matched by any of them.
        :param timezone: IANA timezone name the cron expressions are evaluated in.
        :param period_length: length of the data interval preceding each run
            (0 means start == end).
        :param period_unit: unit for period_length; one of ``valid_units``.
        """
        self.crons = crons
        self.timezone = timezone
        self.period_length = period_length
        self.period_unit = period_unit

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """
        Determines date interval for manually triggered runs.
        This is simply (now - period) to now.
        """
        end = run_after
        if self.period_length == 0:
            start = end
        else:
            start = self.data_period_start(end)
        return DataInterval(start=start, end=end)

    def next_dagrun_info(self, *,
                         last_automated_data_interval: Optional[DataInterval],
                         restriction: TimeRestriction) -> Optional[DagRunInfo]:
        """
        Determines when the DAG should be scheduled.

        Returns None when no run should be scheduled (no start_date, or the
        next matching cron time falls after the DAG's end_date).
        """
        if restriction.earliest is None:
            # No start_date. Don't schedule.
            return None

        is_first_run = last_automated_data_interval is None

        if is_first_run:
            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(restriction.earliest)
            else:
                scheduled_time = self.previous_scheduled_run_time()
                if scheduled_time is None:
                    # No previous cron time matched. Find one in the future.
                    scheduled_time = self.next_scheduled_run_time()
        else:
            last_scheduled_time = last_automated_data_interval.end
            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(last_scheduled_time)
            else:
                scheduled_time = self.previous_scheduled_run_time()
                if scheduled_time is None or scheduled_time == last_scheduled_time:
                    # No previous cron time matched,
                    # or the matched cron time was the last execution time.
                    scheduled_time = self.next_scheduled_run_time()
                elif scheduled_time > last_scheduled_time:
                    # Matched cron time was after last execution time, but before now.
                    # Use this cron time.
                    pass
                else:
                    # The last execution time is after the most recent matching cron time.
                    # Next scheduled run will be in the future.
                    scheduled_time = self.next_scheduled_run_time()

        if scheduled_time is None:
            return None

        if restriction.latest is not None and scheduled_time > restriction.latest:
            # Over the DAG's scheduled end; don't schedule.
            return None

        start = self.data_period_start(scheduled_time)
        return DagRunInfo(run_after=scheduled_time,
                          data_interval=DataInterval(start=start, end=scheduled_time))

    def data_period_start(self, period_end: DateTime) -> DateTime:
        """Return the start of the data interval that ends at ``period_end``."""
        return period_end - Duration(**{self.period_unit: self.period_length})

    def croniter_values(self, base_datetime: Optional[DateTime] = None) -> List[croniter]:
        """Return one croniter per cron expression, anchored at ``base_datetime``
        (defaults to "now" in the configured timezone)."""
        if not base_datetime:
            tz = timezone(self.timezone)
            base_datetime = pendulum.now(tz)
        return [croniter(expr, base_datetime) for expr in self.crons]

    def next_scheduled_run_time(self, base_datetime: Optional[DateTime] = None) -> Optional[DateTime]:
        """Return the earliest future time (strictly after ``base_datetime``,
        default now) matching any of the cron schedules."""
        min_date = None
        tz = timezone(self.timezone)

        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            next_date = cron.get_next(DateTime)
            if not min_date:
                min_date = next_date
            else:
                min_date = min(min_date, next_date)
        if min_date is None:
            return None
        return pendulum_instance(min_date)

    def previous_scheduled_run_time(self, base_datetime: Optional[DateTime] = None) -> Optional[DateTime]:
        """
        Get the most recent time in the past that matches one of the cron schedules
        """
        max_date = None
        tz = timezone(self.timezone)

        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            prev_date = cron.get_prev(DateTime)
            if not max_date:
                max_date = prev_date
            else:
                max_date = max(max_date, prev_date)
        if max_date is None:
            return None
        return pendulum_instance(max_date)

    def validate(self) -> None:
        """Raise AirflowTimetableInvalid if the timetable configuration is unusable."""
        if not self.crons:
            raise AirflowTimetableInvalid("At least one cron definition must be present")

        if self.period_unit not in self.valid_units:
            raise AirflowTimetableInvalid(f'period_unit must be one of {self.valid_units}')

        if self.period_length < 0:
            raise AirflowTimetableInvalid('period_length must not be less than zero')

        try:
            # Building the croniters validates every cron expression.
            self.croniter_values()
        except Exception as e:
            raise AirflowTimetableInvalid(str(e)) from e

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression.
        """
        return ' || '.join(self.crons) + f' [TZ: {self.timezone}]'

    def serialize(self) -> Dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized.
        """
        return dict(crons=self.crons,
                    timezone=self.timezone,
                    period_length=self.period_length,
                    period_unit=self.period_unit)

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable":
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization.
        """
        return cls(**data)


class CustomTimetablePlugin(AirflowPlugin):
    # Registers MultiCronTimetable with Airflow's plugin manager.
    name = "custom_timetable_plugin"
    timetables = [MultiCronTimetable]