相关模块依赖如下
amqp==2.5.2
anyjson==0.3.3
billiard==3.6.1.0
celery==4.3.0
Django==2.0
dnspython==1.16.0
eventlet==0.25.1
greenlet==0.4.15
importlib-metadata==1.3.0
kombu==4.6.7
monotonic==1.5
more-itertools==8.0.2
pytz==2019.3
redis==3.2.0
six==1.13.0
vine==1.3.0
zipp==0.6.0
比如说:我有一个demo3的django项目。
配置celery
修改settings.py文件,内容如下:
# 增加如下信息
# Celery settings
"""
当使用redis做broker,redis连接需要密码时:
BROKER_URL='redis://:xxxxx@127.0.0.1:6379/0',
其中xxxxx是密码,密码前必须加冒号。
"""
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/6'
# BROKER_URL = 'redis://127.0.0.1:6379/6'
# CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/10'
# CELERY_TASK_SERIALIZER = 'json'
创建celery.py
文件,文件位于/demo3/demo3/celery.py(settings.py
同级),内容如下:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo3.settings')
app = Celery('demo3')
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动搜索任务
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
修改/demo3/demo3/__init__.py
文件,内容如下:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
django应用下使用celery
settings.py文件内容如下:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app', # 应用
]
urls.py文件(settings.py同级),内容如下:
from __future__ import absolute_import, unicode_literals
from django.contrib import admin
from django.urls import path,include
urlpatterns = [
path('admin/', admin.site.urls),
path("app/",include("app.urls")),
]
在app应用下创建urls.py,内容如下:
from django.urls import path
from . import views
# 命名空间
app_name = 'app'
urlpatterns = [
path("test/", views.test),
]
在app应用下再创建一个文件,文件名为tasks.py
,文件内容如下:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
# 第一个celery任务
@shared_task
def add(x, y):
return x + y
创建test视图,修改app/views.py文件,文件内容如下:
from django.shortcuts import HttpResponse
from .tasks import add
# Create your views here.
def test(request):
result = add.delay(2, 3)
print(result.ready())
return HttpResponse('第一次测试clery运行')
终端下运行如下命令:
celery -A demo3 worker -l info
如下图所示:
测试celery
运行django项目,再打开一个终端,输入如下:
python manage.py runserver
然后浏览器访问地扯栏如下:
http://127.0.0.1:8000/app/test/
再次查看执行celery命令下的终端的打印信息,如下图所示:
重新运行celery时加上-P eventlet
celery -A demo3 worker -l info -P eventlet
然后重新运行django项目,再次访问浏览器http://127.0.0.1:8000/app/test/
然后我们再打开celery命令下的终端查看,如下图所示:
注意: 要启动redis
celery定时任务
第一种
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from datetime import timedelta
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo3.settings')
app = Celery('demo3')
app.config_from_object('django.conf:settings', namespace='CELERY')
# 设置定时任务
app.conf.beat_schedule = {
'第一个定时任务名称': {
'task': 'app.tasks.add', # 执行的任务
'schedule': timedelta(seconds=5), # 每5秒执行一次
'args': (1, 10) # 参数
}
}
# 时区设置
app.conf.timezone = 'Asia/Shanghai'
# 自动搜索任务
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
任务命令
celery -A demo3 beat
执行任务命令
celery -A demo3 worker -l info -P eventlet
注意:如果加上-P eventlet
的话,要先下载pip install eventlet
为了防止内存泄漏
比如说开了10个worker跑定时任务,跑10多个定时任务,才跑两天就发现woker占的内存越来越大一直不释放,后来发现默认每个worker跑完100个任务后才会自我销毁程序重建来释放内存,所以需要增加一个配置定义每个worker执行多少个任务后才会自我销毁重建。
建议在settings.py中加入如下:
CELERYD_MAX_TASKS_PER_CHILD = 3 # 每个worker最多执行3个任务就会被销毁,可防止内存泄露