Skip to content

Commit d39dff2

Browse files
committed
feat: 定时任务新增支持调用异步函数
1 parent f7d8ca2 commit d39dff2

File tree

3 files changed

+50
-22
lines changed

3 files changed

+50
-22
lines changed

dash-fastapi-backend/config/get_scheduler.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import json
22
from apscheduler.events import EVENT_ALL
3+
from apscheduler.executors.asyncio import AsyncIOExecutor
34
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
4-
from apscheduler.schedulers.background import BackgroundScheduler
55
from apscheduler.jobstores.memory import MemoryJobStore
66
from apscheduler.jobstores.redis import RedisJobStore
77
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
8+
from apscheduler.schedulers.asyncio import AsyncIOScheduler
9+
from apscheduler.schedulers.background import BackgroundScheduler
810
from apscheduler.triggers.cron import CronTrigger
11+
from asyncio import iscoroutinefunction
912
from datetime import datetime, timedelta
1013
from sqlalchemy.engine import create_engine
1114
from sqlalchemy.orm import sessionmaker
@@ -109,9 +112,12 @@ def __find_recent_workday(cls, day: int):
109112
)
110113
),
111114
}
115+
async_executors = {'default': AsyncIOExecutor()}
112116
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
113117
job_defaults = {'coalesce': False, 'max_instance': 1}
118+
async_scheduler = AsyncIOScheduler()
114119
scheduler = BackgroundScheduler()
120+
async_scheduler.configure(jobstores=job_stores, executors=async_executors, job_defaults=job_defaults)
115121
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
116122

117123

@@ -129,14 +135,14 @@ async def init_system_scheduler(cls):
129135
"""
130136
logger.info('开始启动定时任务...')
131137
scheduler.start()
138+
async_scheduler.start()
132139
async with AsyncSessionLocal() as session:
133140
job_list = await JobDao.get_job_list_for_scheduler(session)
134141
for item in job_list:
135-
query_job = cls.get_scheduler_job(job_id=str(item.job_id))
136-
if query_job:
137-
cls.remove_scheduler_job(job_id=str(item.job_id))
142+
cls.remove_scheduler_job(job_id=str(item.job_id))
138143
cls.add_scheduler_job(item)
139144
scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
145+
async_scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
140146
logger.info('系统初始定时任务加载成功')
141147

142148
@classmethod
@@ -147,6 +153,7 @@ async def close_system_scheduler(cls):
147153
:return:
148154
"""
149155
scheduler.shutdown()
156+
async_scheduler.shutdown()
150157
logger.info('关闭定时任务成功')
151158

152159
@classmethod
@@ -157,7 +164,7 @@ def get_scheduler_job(cls, job_id: Union[str, int]):
157164
:param job_id: 任务id
158165
:return: 任务对象
159166
"""
160-
query_job = scheduler.get_job(job_id=str(job_id))
167+
query_job = scheduler.get_job(job_id=str(job_id)) or async_scheduler.get_job(job_id=str(job_id))
161168

162169
return query_job
163170

@@ -169,8 +176,9 @@ def add_scheduler_job(cls, job_info: JobModel):
169176
:param job_info: 任务对象信息
170177
:return:
171178
"""
172-
scheduler.add_job(
173-
func=eval(job_info.invoke_target),
179+
job_func = eval(job_info.invoke_target)
180+
job_param = dict(
181+
func=job_func,
174182
trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
175183
args=job_info.job_args.split(',') if job_info.job_args else None,
176184
kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
@@ -180,8 +188,11 @@ def add_scheduler_job(cls, job_info: JobModel):
180188
coalesce=True if job_info.misfire_policy == '2' else False,
181189
max_instances=3 if job_info.concurrent == '0' else 1,
182190
jobstore=job_info.job_group,
183-
executor=job_info.job_executor,
184191
)
192+
if iscoroutinefunction(job_func):
193+
async_scheduler.add_job(**job_param)
194+
else:
195+
scheduler.add_job(executor=job_info.job_executor, **job_param)
185196

186197
@classmethod
187198
def execute_scheduler_job_once(cls, job_info: JobModel):
@@ -191,8 +202,9 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
191202
:param job_info: 任务对象信息
192203
:return:
193204
"""
194-
scheduler.add_job(
195-
func=eval(job_info.invoke_target),
205+
job_func = eval(job_info.invoke_target)
206+
job_param = dict(
207+
func=job_func,
196208
trigger='date',
197209
run_date=datetime.now() + timedelta(seconds=1),
198210
args=job_info.job_args.split(',') if job_info.job_args else None,
@@ -203,8 +215,11 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
203215
coalesce=True if job_info.misfire_policy == '2' else False,
204216
max_instances=3 if job_info.concurrent == '0' else 1,
205217
jobstore=job_info.job_group,
206-
executor=job_info.job_executor,
207218
)
219+
if iscoroutinefunction(job_func):
220+
async_scheduler.add_job(**job_param)
221+
else:
222+
scheduler.add_job(executor=job_info.job_executor, **job_param)
208223

209224
@classmethod
210225
def remove_scheduler_job(cls, job_id: Union[str, int]):
@@ -214,7 +229,14 @@ def remove_scheduler_job(cls, job_id: Union[str, int]):
214229
:param job_id: 任务id
215230
:return:
216231
"""
217-
scheduler.remove_job(job_id=str(job_id))
232+
query_job = cls.get_scheduler_job(job_id=job_id)
233+
if query_job:
234+
query_job_info = query_job.__getstate__()
235+
job_func = eval(query_job_info.get('func').replace(':', '.'))
236+
if iscoroutinefunction(job_func):
237+
async_scheduler.remove_job(job_id=str(job_id))
238+
else:
239+
scheduler.remove_job(job_id=str(job_id))
218240

219241
@classmethod
220242
def scheduler_event_listener(cls, event):

dash-fastapi-backend/module_admin/service/job_service.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,7 @@ async def edit_job_services(cls, query_db: AsyncSession, page_object: EditJobMod
129129
raise ServiceException(message=f'修改定时任务{page_object.job_name}失败,定时任务已存在')
130130
try:
131131
await JobDao.edit_job_dao(query_db, edit_job)
132-
query_job = SchedulerUtil.get_scheduler_job(job_id=edit_job.get('job_id'))
133-
if query_job:
134-
SchedulerUtil.remove_scheduler_job(job_id=edit_job.get('job_id'))
132+
SchedulerUtil.remove_scheduler_job(job_id=edit_job.get('job_id'))
135133
if edit_job.get('status') == '0':
136134
job_info = await cls.job_detail_services(query_db, edit_job.get('job_id'))
137135
SchedulerUtil.add_scheduler_job(job_info=job_info)
@@ -152,9 +150,7 @@ async def execute_job_once_services(cls, query_db: AsyncSession, page_object: Jo
152150
:param page_object: 定时任务对象
153151
:return: 执行一次定时任务结果
154152
"""
155-
query_job = SchedulerUtil.get_scheduler_job(job_id=page_object.job_id)
156-
if query_job:
157-
SchedulerUtil.remove_scheduler_job(job_id=page_object.job_id)
153+
SchedulerUtil.remove_scheduler_job(job_id=page_object.job_id)
158154
job_info = await cls.job_detail_services(query_db, page_object.job_id)
159155
if job_info:
160156
SchedulerUtil.execute_scheduler_job_once(job_info=job_info)
@@ -176,9 +172,7 @@ async def delete_job_services(cls, query_db: AsyncSession, page_object: DeleteJo
176172
try:
177173
for job_id in job_id_list:
178174
await JobDao.delete_job_dao(query_db, JobModel(job_id=job_id))
179-
query_job = SchedulerUtil.get_scheduler_job(job_id=job_id)
180-
if query_job:
181-
SchedulerUtil.remove_scheduler_job(job_id=job_id)
175+
SchedulerUtil.remove_scheduler_job(job_id=job_id)
182176
await query_db.commit()
183177
return CrudResponseModel(is_success=True, message='删除成功')
184178
except Exception as e:

dash-fastapi-backend/module_task/scheduler_test.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,18 @@
22

33

44
def job(*args, **kwargs):
5+
"""
6+
定时任务执行同步函数示例
7+
"""
58
print(args)
69
print(kwargs)
7-
print(f'{datetime.now()}执行了')
10+
print(f'{datetime.now()}同步函数执行了')
11+
12+
13+
async def async_job(*args, **kwargs):
14+
"""
15+
定时任务执行异步函数示例
16+
"""
17+
print(args)
18+
print(kwargs)
19+
print(f'{datetime.now()}异步函数执行了')

0 commit comments

Comments
 (0)