Python定时任务框架APScheduler

 python  Python定时任务框架APScheduler已关闭评论
3月 242017
 

转自:http://blog.csdn.net/chosen0ne/article/details/7842421

        APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比Java舒服多了。

1. 安装

        安装过程很简单,可以基于easy_install和源码。

[plain] view plain copy

 print?

  1. easy_install apscheduler  

        或者下载源码,运行命令:

[plain] view plain copy

 print?

  1. python setup.py install  

2. cron job例子

        APScheduler是进程内的调度器,可以定时触发具体的函数,并且可以访问应用的所有变量和函数。在web应用中通过APScheduler实现定时任务是很方便的。下面看例子:

[python] view plain copy

 print?

  1. from apscheduler.scheduler import Scheduler  
  2.   
  3. schedudler = Scheduler(daemonic = False)  
  4.  
  5. @schedudler.cron_schedule(second=‘*’, day_of_week=‘0-4’, hour=‘9-12,13-15’)  
  6. def quote_send_sh_job():  
  7.     print ‘a simple cron job start at’, datetime.datetime.now()  
  8.   
  9. schedudler.start()  

        上面通过装饰器定义了cron job,可以通过函数scheduler.add_cron_job添加,用装饰器更方便。Scheduler构造函数中传入daemonic参数,表示执行线程是非守护的,在Schduler的文档中推荐使用非守护线程:

[plain] view plain copy

 print?

  1. Jobs are always executed in non-daemonic threads.  

        具体cron job的配置参看doc,基本上与Quartz一致。

        在添加job时还有一个比较重要的参数max_instances,指定一个job的并发实例数,默认值是1。默认情况下,如果一个job准备执行,但是该job的前一个实例尚未执行完,则后一个job会失败,可以通过这个参数来改变这种情况。

3. Store

        APScheduler提供了jobstore用于存储job的执行信息,默认使用的是RAMJobStore,还提供了SQLAlchemyJobStore、ShelveJobStore和MongoDBJobStore。APScheduler允许同时使用多个jobstore,通过别名(alias)区分,在添加job时需要指定具体的jobstore的别名,否则使用的是别名是default的jobstore,即RAMJobStore。下面以MongoDBJobStore举例说明。

[python] view plain copy

 print?

  1. import pymongo  
  2. from apscheduler.scheduler import Scheduler  
  3. from apscheduler.jobstores.mongodb_store import MongoDBJobStore  
  4. import time  
  5.   
  6. sched = Scheduler(daemonic = False)  
  7.   
  8. mongo = pymongo.Connection(host=‘127.0.0.1’, port=27017)  
  9. store = MongoDBJobStore(connection=mongo)  
  10. sched.add_jobstore(store, ‘mongo’)        # 别名是mongo  
  11.  
  12. @sched.cron_schedule(second=‘*’, day_of_week=‘0-4’, hour=‘9-12,13-15’, jobstore=‘mongo’)        # 向别名为mongo的jobstore添加job  
  13. def job():  
  14.         print ‘a job’  
  15.         time.sleep(1)  
  16.   
  17. sched.start()  

        注意start必须在添加job动作之后调用,否则会抛错。默认会把job信息保存在apscheduler数据库下的jobs表:

[plain] view plain copy

 print?

  1. > db.jobs.findOne()  
  2. {  
  3.         “_id” : ObjectId(“502202d1443c1557fa8b8d66”),  
  4.         “runs” : 20,  
  5.         “name” : “job”,  
  6.         “misfire_grace_time” : 1,  
  7.         “coalesce” : true,  
  8.         “args” : BinData(0,”gAJdcQEu”),  
  9.         “next_run_time” : ISODate(“2012-08-08T14:10:46Z”),  
  10.         “max_instances” : 1,  
  11.         “max_runs” : null,  
  12.         “trigger” : BinData(0,”gAJjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3JvbgpDcm9uVHJpZ2dlcgpxASmBcQJ9cQMoVQZmaWVsZHNxBF1xBShjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5maWVsZHMKQmFzZUZpZWxkCnEGKYFxB31xCChVCmlzX2RlZmF1bHRxCYhVC2V4cHJlc3Npb25zcQpdcQtjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5leHByZXNzaW9ucwpBbGxFeHByZXNzaW9uCnEMKYFxDX1xDlUEc3RlcHEPTnNiYVUEbmFtZXEQVQR5ZWFycRF1YmgGKYFxEn1xEyhoCYhoCl1xFGgMKYFxFX1xFmgPTnNiYWgQVQVtb250aHEXdWJjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5maWVsZHMKRGF5T2ZNb250aEZpZWxkCnEYKYFxGX1xGihoCYhoCl1xG2gMKYFxHH1xHWgPTnNiYWgQVQNkYXlxHnViY2Fwc2NoZWR1bGVyLnRyaWdnZXJzLmNyb24uZmllbGRzCldlZWtGaWVsZApxHymBcSB9cSEoaAmIaApdcSJoDCmBcSN9cSRoD05zYmFoEFUEd2Vla3EldWJjYXBzY2hlZHVsZXIudHJpZ2dlcnMuY3Jvbi5maWVsZHMKRGF5T2ZXZWVrRmllbGQKcSYpgXEnfXEoKGgJiWgKXXEpY2Fwc2NoZWR1bGVyLnRyaWdnZXJzLmNyb24uZXhwcmVzc2lvbnMKUmFuZ2VFeHByZXNzaW9uCnEqKYFxK31xLChoD05VBGxhc3RxLUsEVQVmaXJzdHEuSwB1YmFoEFULZGF5X29mX3dlZWtxL3ViaAYpgXEwfXExKGgJiWgKXXEyKGgqKYFxM31xNChoD05oLUsMaC5LCXViaCopgXE1fXE2KGgPTmgtSw9oLksNdWJlaBBVBGhvdXJxN3ViaAYpgXE4fXE5KGgJiGgKXXE6aAwpgXE7fXE8aA9Oc2JhaBBVBm1pbnV0ZXE9dWJoBimBcT59cT8oaAmJaApdcUBoDCmBcUF9cUJoD05zYmFoEFUGc2Vjb25kcUN1YmVVCnN0YXJ0X2RhdGVxRE51Yi4=”),  
  13.         “func_ref” : “__main__:job”,  
  14.         “kwargs” : BinData(0,”gAJ9cQEu”)  
  15. }  

        上面就是存储的具体信息。

4.异常处理

        当job抛出异常时,APScheduler会默默的把他吞掉,不提供任何提示,这不是一种好的实践,我们必须知晓程序的任何差错。APScheduler提供注册listener,可以监听一些事件,包括:job抛出异常、job没有来得及执行等。

Constant Event class Triggered when…
EVENT_SCHEDULER_START SchedulerEvent The scheduler is started
EVENT_SCHEDULER_SHUTDOWN SchedulerEvent The scheduler is shut down
EVENT_JOBSTORE_ADDED JobStoreEvent A job store is added to the scheduler
EVENT_JOBSTORE_REMOVED JobStoreEvent A job store is removed from the scheduler
EVENT_JOBSTORE_JOB_ADDED JobStoreEvent A job is added to a job store
EVENT_JOBSTORE_JOB_REMOVED JobStoreEvent A job is removed from a job store
EVENT_JOB_EXECUTED JobEvent A job is executed successfully
EVENT_JOB_ERROR JobEvent A job raised an exception during execution
EVENT_JOB_MISSED JobEvent A job’s execution time is missed

        看下面的例子,监听异常和miss事件,这里用logging模块打印日志,logger.exception()可以打印出异常堆栈信息。

[python] view plain copy

 print?

  1. def err_listener(ev):  
  2.     err_logger = logging.getLogger(‘schedErrJob’)  
  3.     if ev.exception:  
  4.         err_logger.exception(‘%s error.’, str(ev.job))  
  5.     else:  
  6.         err_logger.info(‘%s miss’, str(ev.job))  
  7.   
  8. schedudler.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)  

        事件的属性包括:

  • job – the job instance in question
  • scheduled_run_time – the time when the job was scheduled to be run
  • retval – the return value of the successfully executed job
  • exception – the exception raised by the job
  • traceback – the traceback object associated with the exception

        最后,需要注意一点当job不以daemon模式运行时,并且APScheduler也不是daemon的,那么在关闭脚本时,Ctrl + C是不奏效的,必须kill才可以。可以通过命令实现关闭脚本:

[plain] view plain copy

 print?

  1. ps axu | grep {脚本名} | grep -v grep | awk ‘{print $2;}’ | xargs kill