11import json
22from apscheduler.events import EVENT_ALL
33from apscheduler.executors.asyncio import AsyncIOExecutor
4- from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
4+ from apscheduler.executors.pool import ProcessPoolExecutor
55from apscheduler.jobstores.memory import MemoryJobStore
66from apscheduler.jobstores.redis import RedisJobStore
77from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
88from apscheduler.schedulers.asyncio import AsyncIOScheduler
9- from apscheduler.schedulers.background import BackgroundScheduler
109from apscheduler.triggers.cron import CronTrigger
1110from asyncio import iscoroutinefunction
1211from datetime import datetime, timedelta
@@ -112,12 +111,9 @@ def __find_recent_workday(cls, day: int):
112111 )
113112 ),
114113}
115- async_executors = {'default': AsyncIOExecutor()}
116- executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
114+ executors = {'default': AsyncIOExecutor(), 'processpool': ProcessPoolExecutor(5)}
117115job_defaults = {'coalesce': False, 'max_instance': 1}
118- async_scheduler = AsyncIOScheduler()
119- scheduler = BackgroundScheduler()
120- async_scheduler.configure(jobstores=job_stores, executors=async_executors, job_defaults=job_defaults)
116+ scheduler = AsyncIOScheduler()
121117scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
122118
123119
@@ -135,14 +131,12 @@ async def init_system_scheduler(cls):
135131 """
136132 logger.info('开始启动定时任务...')
137133 scheduler.start()
138- async_scheduler.start()
139134 async with AsyncSessionLocal() as session:
140135 job_list = await JobDao.get_job_list_for_scheduler(session)
141136 for item in job_list:
142137 cls.remove_scheduler_job(job_id=str(item.job_id))
143138 cls.add_scheduler_job(item)
144139 scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
145- async_scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
146140 logger.info('系统初始定时任务加载成功')
147141
148142 @classmethod
@@ -153,7 +147,6 @@ async def close_system_scheduler(cls):
153147 :return:
154148 """
155149 scheduler.shutdown()
156- async_scheduler.shutdown()
157150 logger.info('关闭定时任务成功')
158151
159152 @classmethod
@@ -164,7 +157,7 @@ def get_scheduler_job(cls, job_id: Union[str, int]):
164157 :param job_id: 任务id
165158 :return: 任务对象
166159 """
167- query_job = scheduler.get_job(job_id=str(job_id)) or async_scheduler.get_job(job_id=str(job_id))
160+ query_job = scheduler.get_job(job_id=str(job_id))
168161
169162 return query_job
170163
@@ -177,8 +170,11 @@ def add_scheduler_job(cls, job_info: JobModel):
177170 :return:
178171 """
179172 job_func = eval(job_info.invoke_target)
180- job_param = dict(
181- func=job_func,
173+ job_executor = job_info.job_executor
174+ if iscoroutinefunction(job_func):
175+ job_executor = 'default'
176+ scheduler.add_job(
177+ func=eval(job_info.invoke_target),
182178 trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
183179 args=job_info.job_args.split(',') if job_info.job_args else None,
184180 kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
@@ -188,11 +184,8 @@ def add_scheduler_job(cls, job_info: JobModel):
188184 coalesce=True if job_info.misfire_policy == '2' else False,
189185 max_instances=3 if job_info.concurrent == '0' else 1,
190186 jobstore=job_info.job_group,
187+ executor=job_executor,
191188 )
192- if iscoroutinefunction(job_func):
193- async_scheduler.add_job(**job_param)
194- else:
195- scheduler.add_job(executor=job_info.job_executor, **job_param)
196189
197190 @classmethod
198191 def execute_scheduler_job_once(cls, job_info: JobModel):
@@ -203,8 +196,11 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
203196 :return:
204197 """
205198 job_func = eval(job_info.invoke_target)
206- job_param = dict(
207- func=job_func,
199+ job_executor = job_info.job_executor
200+ if iscoroutinefunction(job_func):
201+ job_executor = 'default'
202+ scheduler.add_job(
203+ func=eval(job_info.invoke_target),
208204 trigger='date',
209205 run_date=datetime.now() + timedelta(seconds=1),
210206 args=job_info.job_args.split(',') if job_info.job_args else None,
@@ -215,11 +211,8 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
215211 coalesce=True if job_info.misfire_policy == '2' else False,
216212 max_instances=3 if job_info.concurrent == '0' else 1,
217213 jobstore=job_info.job_group,
214+ executor=job_executor,
218215 )
219- if iscoroutinefunction(job_func):
220- async_scheduler.add_job(**job_param)
221- else:
222- scheduler.add_job(executor=job_info.job_executor, **job_param)
223216
224217 @classmethod
225218 def remove_scheduler_job(cls, job_id: Union[str, int]):
@@ -231,12 +224,7 @@ def remove_scheduler_job(cls, job_id: Union[str, int]):
231224 """
232225 query_job = cls.get_scheduler_job(job_id=job_id)
233226 if query_job:
234- query_job_info = query_job.__getstate__()
235- job_func = eval(query_job_info.get('func').replace(':', '.'))
236- if iscoroutinefunction(job_func):
237- async_scheduler.remove_job(job_id=str(job_id))
238- else:
239- scheduler.remove_job(job_id=str(job_id))
227+ scheduler.remove_job(job_id=str(job_id))
240228
241229 @classmethod
242230 def scheduler_event_listener(cls, event):
0 commit comments