Django-celery-beat动态添加周期性任务实现过程解析
前期准备
1.beat插件安装
pip3 install django-celery-beat
2.注册APP
INSTALLED_APPS = [.... ’django_celery_beat’,]
3.数据库变更
python3 manage.py migrate django_celery_beat
配置工作
目录结构请参考://www.jb51.net/article/200659.htm
1.配置celerypro.py
from __future__ import absolute_importimport osfrom celery import Celeryfrom django.conf import settingsfrom django.utils import timezone# set the default Django settings module for the ’celery’ program.# 为celery设置环境变量os.environ.setdefault(’DJANGO_SETTINGS_MODULE’, ’voice_quality_assurance_configure.settings’)# 创建celery appapp = Celery(’voice_quality_assurance_configure’)# Using a string here means the worker will not have to# pickle the object when using Windows.# 从单独的配置模块中加载配置app.config_from_object(’voice_quality_assurance_configure.celeryconfig’)# 设置app自动加载任务app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)# 解决时区问题,定时任务启动就循环输出app.now = timezone.now
2.配置celeryconfig.py
from __future__ import absolute_importfrom kombu import Queuefrom django.conf import settings# 设置代理人brokerCELERY_BROKER_URL = ’redis://127.0.0.1:6379/2’# 指定 BackendCELERY_RESULT_BACKEND = ’redis://127.0.0.1:6379/1’# 指定时区,默认是 UTCCELERY_TIMEZONE=’Asia/Shanghai’# celery 序列化与反序列化配置CELERY_TASK_SERIALIZER = ’pickle’CELERY_RESULT_SERIALIZER = ’pickle’CELERY_ACCEPT_CONTENT = [’pickle’, ’json’]CELERY_IGNORE_RESULT = True# celery 的启动工作数量设置CELERY_WORKER_CONCURRENCY = 10# 任务预取功能,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。CELERYD_PREFETCH_MULTIPLIER = 20# 有些情况下可以防止死锁CELERYD_FORCE_EXECV = True# celery 的 worker 执行多少个任务后进行重启操作CELERY_WORKER_MAX_TASKS_PER_CHILD = 100# 禁用所有速度限制,如果网络资源有限,不建议开足马力。CELERY_DISABLE_RATE_LIMITS = True# celery beat配置(周期性任务设置)CELERY_ENABLE_UTC = FalseCELERY_TIMEZONE = settings.TIME_ZONEDJANGO_CELERY_BEAT_TZ_AWARE = FalseCELERY_BEAT_SCHEDULER = ’django_celery_beat.schedulers:DatabaseScheduler’
3.分别启动woker和beta
项目根目录终端执行(voice_quality_assurance_configure为项目名称,简单来说,和manage.py文件同级)
celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #
启动beta 调度器使用数据库
celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #启动celery worker
4.创建周期性任务
from datetime import datetime, timedeltaimport jsonimport os,djangoos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'voice_quality_assurance_configure.settings')# project_name 项目名称django.setup()from django_celery_beat.models import PeriodicTask, IntervalScheduleschedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)# 带参数的创建方法,如下:PeriodicTask.objects.create( interval=schedule, # 上面创建10秒的间隔 interval 对象 name=’test_task’, # 设置任务的name值 task=’mission.tasks.my_task’, # 指定需要周期性执行的任务 args=json.dumps([10, 2, 76]), expires=datetime.utcnow() + timedelta(seconds=30))
详解创建周期性任务的方法
创建基于interval的周期性任务
第一步创建间隔对象
schedule, created = IntervalSchedule.objects.get_or_create( every=10, period=IntervalSchedule.SECONDS,)
IntervalSchedule.DAYS 固定间隔天数IntervalSchedule.HOURS 固定间隔小时数IntervalSchedule.MINUTES 固定间隔分钟数IntervalSchedule.SECONDS 固定间隔秒数IntervalSchedule.MICROSECONDS 固定间隔微秒
第二步创建任务
无参数的创建方法:
PeriodicTask.objects.create( interval=schedule, # we created this above. name=’test_task’, # simply describes this periodic task. task=’app名.tasks.任务函数名’, # name of task.)
有参数的创建方法:
PeriodicTask.objects.create( interval=schedule, # we created this above. name=’test’_task’, # simply describes this periodic task. task=’app名.tasks.任务函数名’, # name of task. args=json.dumps([’arg1’, ’arg2’]), kwargs=json.dumps({ ’be_careful’: True, }), expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object): ''' 设备创建,增加周期性任务 ''' def __init__(self, device_obj): self.device_obj = device_obj self.periodic_task = PeriodicTask.objects.create( interval=schedule, name=’test_task’, task=’mission.tasks.my_task’, args=json.dumps([self.device_obj.ip]) ) def starttask(self): ''' 启动任务 ''' self.periodic_task.enabled = True self.periodic_task.save() def stoptask(self): ''' 停止任务 ''' self.periodic_task.enabled = False self.periodic_task.save() def deltask(self): ''' 删除任务 ''' self.periodic_task.delete() self.periodic_task.save()
创建基于 crontab 的周期性任务
from django_celery_beat.models import CrontabSchedule, PeriodicTaskschedule, _ = CrontabSchedule.objects.get_or_create( minute=’30’, hour=’*’, day_of_week=’*’, day_of_month=’*’, month_of_year=’*’, timezone=pytz.timezone(’Canada/Pacific’))
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持好吧啦网。
相关文章: