中间件(middleware)允许您在一个浏览器的请求在到达Django视图之前处理它,以及在视图返回的响应到达浏览器之前处理这个响应。中间件(middleware)是一个镶嵌到Django的request(请求)/response(响应)处理机制中的一个钩子(hooks) 框架。它是一个可以修改Django全局输入或输出的一个底层插件系统。
中间件(Middleware)在整个Django的request/response处理机制中的角色如下所示:
HttpRequest -> Middleware -> View -> Middleware -> HttpResponse
中间件常用于权限校验、限制用户请求、打印日志、改变输出内容等场景,比如:
禁止特定IP地址的用户或未登录的用户访问我们的View视图函数;
对同一IP地址单位时间内发送的请求数量做出限制;
在View视图函数执行前传递额外的变量或参数;
在View视图函数执行前或执行后把特定信息打印到log日志;
在View视图函数执行后对response数据进行修改后返回给用。
装饰器也经常用于用户权限校验。但与装饰器不同,中间件对Django的输入或输出的改变是全局的。比如@login_required装饰器仅作用于单个视图函数。如果你希望实现全站只有登录用户才能访问,编写一个中间件是一个更好的解决方案。
Django自带中间件
MIDDLEWARE = [
#为request/response提供了几种安全改进;
'django.middleware.security.SecurityMiddleware',
#开启session会话支持;
'django.contrib.sessions.middleware.SessionMiddleware',
#基于APPEND_SLASH和PREPEND_WWW的设置来重写URL,如果APPEND_SLASH设为True,并且初始URL 没有以斜线结尾以及在URLconf 中没找到对应定义,这时形成一个斜线结尾的新URL;
'django.middleware.common.CommonMiddleware',
#添加跨站点请求伪造的保护,通过向POST表单添加一个隐藏的表单字段,并检查请求中是否有正确的值;
'django.middleware.csrf.CsrfViewMiddleware',
#在视图函数执行前向每个接收到的user对象添加HttpRequest属性,表示当前登录的用户,无它用不了request.user。
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',#开启基于Cookie和会话的消息支持
'django.middleware.clickjacking.XFrameOptionsMiddleware',#对点击劫持的保护
]
Django还提供了压缩网站内容的GZipMiddleware,根据用户请求语言返回不同内容的LocaleMiddleware和给GET请求附加条件的ConditionalGetMiddleware。
Django的中间件执行顺序
当你在settings.py注册中间件时一定要要考虑中间件的执行顺序,中间件在request到达view之前是从上向下执行的,在view执行完后返回response过程中是从下向上执行的。
自定义中间件
自定义中间件你首先要在app所属目录下新建一个文件middleware.py,添加好编写的中间件代码,然后在项目settings.py中把它添加到MIDDLEWARE列表进行注册,添加时一定要注意顺序。Django提供了两种编写自定义中间件的方式:函数和类。
函数
def simple_middleware(get_response):
# 一次性设置和初始化
def middleware(request):
# 请求在到达视图前执行的代码
response = get_response(request)
# 响应在返回给客户端前执行的代码
return response
return middleware
当请求从浏览器发送到服务器视图时,将执行response = get_response(request)该行之前的所有代码。当响应从服务器返回到浏览器时,将执行response = get_response(request)此行之后的所有内容。
respone = get_response(request)将调用列表中的下一个中间件。如果这是最后一个中间件,则将调用该视图。
示例:以函数编写一个名为timeit_middleware的中间件,打印出执行每个请求所花费的时间:
import time
def timeit_middleware(get_response):
def middleware(request):
start = time.time()
response = get_response(request)
end = time.time()
print("请求花费时间: {}秒".format(end - start))
return response
return middleware
注册中间件
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'blog.middleware.timeit_middleware', # 新增
]
执行效果
每当Django处理一个请求时,终端(terminal)就会打印出请求花费时间。
使用类
class SimpleMiddleware:
def __init__(self, get_response):
# 一次性设置和初始化
self.get_response = get_response
def __call__(self, request):
# 视图函数执行前的代码
response = self.get_response(request)
# 视图函数执行后的代码
return response
示例:以类来编写一个名为LoginRequiredMiddleware的中间件,实现全站要求登录,但是登录页面和开放白名单上的urls除外。
from django.shortcuts import redirect
from django.conf import settings
class LoginRequiredMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.login_url = settings.LOGIN_URL
# 开放白名单,比如['/login/', '/admin/']
self.open_urls = [self.login_url] + getattr(settings, 'OPEN_URLS', [])
def __call__(self, request):
if not request.user.is_authenticated and request.path_info not in self.open_urls:
return redirect(self.login_url + '?next=' + request.get_full_path())
response = self.get_response(request)
return response
request.path_info用于获取当前请求的相对路径,如/articles/,而request.get_full_path()用于获取当前请求完整的相对路径,包括请求参数,如/articles/?page=2。使用request.get_full_path()时别忘了加括号哦,否则返回的是uwsgi请求对象,不是字符串。
注册中间件:修改settings.py, 注册中间件,并添加LOGIN_URL和OPEN_URLS。
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'blog.middleware.timeit_middleware',
'blog.middleware.LoginRequiredMiddleware',
]
LOGIN_URL = "/admin/login/"
OPEN_URLS = ["/admin/"]
查看效果:添加完中间件后,你访问任何非LOGIN_URL和OPEN_URLS里的urls,都需要你先进行登录。
其它中间件钩子函数
其它三个中间件钩子函数,分别在执行视图函数,处理异常和进行模板渲染时调用。
process_view(request, view_func, view_args, view_kwargs)该方法有四个参数
request是HttpRequest对象。
view_func是Django即将使用的视图函数。它是实际的函数对象,而不是函数的名称作为字符串。
view_args是将传递给视图的位置参数的列表。
view_kwargs是将传递给视图的关键字参数的字典。view_args和view_kwargs都不包含第一个视图参数(request)。
Django会在调用视图函数之前调用process_view方法。它应该返回None或一个HttpResponse对象。如果返回None,Django将继续处理这个请求,执行任何其他中间件的process_view方法,然后在执行相应的视图。如果它返回一个HttpResponse对象,Django不会调用适当的视图函数。它将执行中间件的process_response方法并将应用到该HttpResponse并返回结果。
process_exception(self, request, exception)该方法两个参数:
一个HttpRequest对象
一个exception是视图函数异常产生的Exception对象。
这个方法只有在视图函数中出现异常了才执行,它返回的值可以是一个None也可以是一个HttpResponse对象。如果是HttpResponse对象,Django将调用模板和中间件中的process_response方法,并返回给浏览器,否则将默认处理异常。如果返回一个None,则交给下一个中间件的process_exception方法来处理异常。该方法常用于发生异常时通知管理员或将其日志的形式记录下来。
process_template_response(self, request, response)该方法两个参数:
一个HttpRequest对象
一个response是TemplateResponse对象(由视图函数或者中间件产生)。
该方法是在视图函数执行完成后立即执行,但是它有一个前提条件,那就是视图函数返回的对象有一个render()方法(或者表明该对象是一个TemplateResponse对象)。该方法常用于向模板注入变量或则直接改变模板。
实现方式
函数
from django.http import HttpResponse
def timeit_middleware(get_response):
def middleware(request):
response = get_response(request)
return response
def process_view(request, view_func, view_args, view_kwargs)
return None or HttpResponse(xx)
def process_exception(self, request, exception):
return None or HttpResponse(xx)
def process_template_response(self, request, response)
return ...
middleware.process_view = process_view
middleware.process_exception = process_exception
middleware.process_template_response = process_template_response
return middleware
类
class MyClassMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
return self.get_response(request)
def process_view(request, view_func, view_args, view_kwargs)
return None or HttpResponse(xx)
def process_exception(self, request, exception):
return None or HttpResponse(xx)
# 例子: 打印出异常
return HttpResponse(<h1>str(exception)</h1)
# 该方法仅对TemplateResponse输入有用,对render方法失效
def process_template_response(self, request, response)
response.context_data['title'] = 'New title'
return response
每次在启动Django服务之前,我们都会在终端运行pythonmanage.py xxx的管理命令。自定义Django-admin命令一共分三步:创建文件夹布局、编写命令代码和测试使用。
创建文件夹布局
自定义的Django-admin管理命令本质上是一个python脚本文件,它的存放路径必须遵循一定的规范,一般位于app/management/commands目录。
app01/
__init__.py
models.py
management/
__init__.py
commands/
__init__.py
_private.py # 以下划线开头文件不能用作管理命令
my_commands.py # 这个就是自定义的管理命令脚本,文件名即为命令名
tests.py
views.py
management和commands每个目录下都必须有个__init__.py空文件,表明这是一个python包。另外以下划线开头的文件名不能用作管理命令脚本。
management/commands目录可以位于任何一个app的目录下,Django都能找到它。
一般建议每个python脚本文件对应一条管理命令。
编写命令代码
每一个自定义的管理命令本质是一个Command类, 它继承了Django的Basecommand或其子类, 主要通过重写handle()方法实现自己的业务逻辑代码,而add_arguments()则用于帮助处理命令行的参数,如果运行命令时不需要额外参数,可以不写这个方法。
from django.core.management.base import BaseCommand
class Command(BaseCommand):
# 帮助文本, 一般备注命令的用途及如何使用。
help = 'Some help texts'
# 处理命令行参数,可选
def add_arguments(self, parser):
pass
# 核心业务逻辑
def handle(self, *args, **options):
pass
定义一个名为hello_world的命令。这样当我们运行pythonmanage.py hello_world命令时,控制台会打印出Hello World!字样。在app/management/commands目录下新建hello_world.py。
from django.core.management.base import BaseCommand
class Command(BaseCommand):
# 帮助文本, 一般备注命令的用途及如何使用。
help = "Print Hello World!"
# 核心业务逻辑
def handle(self, *args, **options):
self.stdout.write('Hello World!')
# 当你使用管理命令并希望在控制台输出指定信息时,
# 你应该使用`self.stdout`和`self.stderr`方法,
# 而不能直接使用python的`print`方法。
# 另外,你不需要在消息的末尾加上换行符,它将被自动添加。
进入项目文件夹运行python manage.py hello_world命令时,将得到输出结果。
通过命令行给hello_world命令传递参数,以实现运行pythonmanage.py helloworld John命令时 打印出Hello World! John。修改hello_world.py, 添加add_arguments方法,该方法的作用是给自定义的handle方法添加1个或多个参数。
from django.core.management.base import BaseCommand
class Command(BaseCommand):
# 帮助文本, 一般备注命令的用途及如何使用。
help = "Print Hello World!"
# 给命令添加一个名为name的参数
def add_arguments(self, parser):
parser.add_argument('name')
# 核心业务逻辑,通过options字典接收name参数值,拼接字符串后输出
def handle(self, *args, **options):
msg = 'Hello World ! '+ options['name']
self.stdout.write(msg)
#运行`python manage.py hello_world John`命令时,得到输出结果。
#如果你直接运行命令而不携带参数,将会报错。
实际应用场景
案例1:检查数据库连接是否已就绪
# app/management/commands/wait_for_db.py
import time
from django.db import connections
from django.db.utils import OperationalError
from django.core.management import BaseCommand
class Command(BaseCommand):
help = 'Run data migrations until db is available.'
def handle(self, *args, **options):
self.stdout.write('Waiting for database...')
db_conn = None
while not db_conn:
try:
# 尝试连接
db_conn = connections['default']
except OperationalError:
# 连接失败,就等待1秒钟
self.stdout.write('Database unavailable, waiting 1 second...')
time.sleep(1)
self.stdout.write(self.style.SUCCESS('Database available!'))
每次在运行python manage.py migrate 命令前先运行python manage.py wait_for_db。
案例2:周期性发送邮件
# app/management/commands/mail_admin.py
#-*- coding:utf-8 -*-
from datetime import timedelta, time, datetime
from django.core.mail import mail_admins
from django.core.management import BaseCommand
from django.utils import timezone
from django.contrib.auth import get_user_model
User = get_user_model()
today = timezone.now()
yesterday = today - timedelta(1)
class Command(BaseCommand):
help = "Send The Daily Count of New Users to Admins"
def handle(self, *args, **options):
# 获取过去一天注册用户数量
user_count =User.objects.filter(date_joined__range=(yesterday, today)).count()
# 当注册用户数量多余1个,才发送邮件给管理员
if user_count >= 1:
message = "You have got {} user(s) in the past 24 hours".format(user_count)
subject = (
f"New user count for {today.strftime('%Y-%m-%d')}: {user_count}"
)
mail_admins(subject=subject, message=message, html_message=None)
self.stdout.write("E-mail was sent.")
else:
self.stdout.write("No new users today.")
真正发送邮件成功需要设置Email后台及管理员,测试环境下可以使用如下简单配置
EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"
DEFAULT_FROM_EMAIL = "admin@example.com"
ADMINS = [("admin", "admin@example.com"), ]
完全可以使用Linux的crontab服务或Celery-Beat将其设成周期性定时任务task,这时只需要调用Django的call_command方法即可。
# app/tasks.py, 可以任一app目录下新建task
from celery import shared_task
from django.core.management import call_command
@shared_task
def mail_admin():
call_command("mail_admin", )
安装项目依赖文件
# pip安装必选
Django==3.2
celery==5.0.5
redis==3.5.3
# 可选,windows下运行celery 4以后版本,还需额外安装eventlet库
eventlet
# 推荐安装, 需要设置定时或周期任务时安装,推荐安装
django-celery-beat==2.2.0
# 视情况需要,需要存储任务结果时安装,视情况需要
django-celery-results==2.0.1
# 视情况需要,需要监控celery运行任务状态时安装
folower==0.9.7
Celery配置
首先需要在myproject/myproject目录下新增celery.py并修改__init__.py
- myproject/
- manage.py
- project/
- __init__.py # 修改这个文件
- celery.py # 新增这个文件
- asgi.py
- settings.py
- urls.py
- wsgi.py
新建celery.py
import os
from celery import Celery
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 实例化
app = Celery('myproject')
# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动从Django的已注册app中发现任务
app.autodiscover_tasks()
# 一个测试任务
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
修改__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
修改Django项目的settings.py,添加Celery有关配置选项
# 最重要的配置,设置消息broker,格式为:db://user:password@host:port/dbname
# 如果redis安装在本机,使用localhost
# 如果docker部署的redis,使用redis://redis:6379
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
# celery时区设置,建议与Django settings中TIME_ZONE同样时区,防止时差
# Django设置时区需同时设置USE_TZ=True和TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE
其它Celery常用配置选项
# 为django_celery_results存储Celery任务执行结果设置后台
# 格式为:db+scheme://user:password@host:port/dbname
# 支持数据库django-db和缓存django-cache存储任务状态及结果
CELERY_RESULT_BACKEND = "django-db"
# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
CELERY_RESULT_EXPIRES = xx
# 任务限流
CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
# Worker并发数量,一般默认CPU核数,可以不设置
CELERY_WORKER_CONCURRENCY = 2
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERY_WORKER_MAX_TASKS_PER_CHILD = 200
https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_expires
在Django中正式编写和执行自己的异步任务前,一定要先测试redis和celery是否安装好并配置成功。
一个无限期阻塞的任务会使得工作单元无法再做其他事情,建议给任务设置超时时间。
测试Celery是否工作正常
首先你要启动redis服务。windows进入redis所在目录(比如C:\redis),使用redis-server.exe启动redis。Linux下使用./redis-server redis.conf启动,也可修改redis.conf将daemonize设置为yes, 确保守护进程开启。
启动redis服务后,你要先进入项目所在文件夹运行pythonmanage.py runserver命令启动Django服务器(无需创建任何app),然后再打开一个终端terminal窗口输入celery命令,启动worker。
# Linux下测试,启动Celery
Celery -A myproject worker -l info
# Windows下测试,启动Celery
Celery -A myproject worker -l info -P eventlet
# 如果Windows下Celery不工作,输入如下命令
Celery -A myproject worker -l info --pool=solo
如果你能看到[tasks]下所列异步任务清单如debug_task,以及最后一句celery@xxxx ready, 说明你的redis和celery都配置好了,可以开始正式工作了。
编写任务
Django项目中所有需要Celery执行的异步或周期性任务都放在tasks.py文件里,该文件可以位于project目录下,也可以位于各个app的目录下。专属于某个Celery实例化项目的task可以使用@app.task装饰器定义,各个app目录下可以复用的task建议使用@shared_task定义。
# myproject/tasks.py
# 专属于myproject项目的任务
app = Celery('myproject')
@ app.task
def test():
pass
# app/tasks.py, 可以复用的task
from celery import shared_task
import time
@shared_task
def add(x, y):
time.sleep(2)
return x + y
使用celery定义任务时,避免在一个任务中调用另一个异步任务,容易造成阻塞。
当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject生成的Celery实例。然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task 装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。
异步调用任务
2种以异步方式调用任务的方法,delay和apply_async方法
# 方法一:delay方法
task_name.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
# 方法二: apply_async方法,与delay类似,但支持更多参数
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
编写了一个Django视图函数,使用delay方法调用add任务
# app/views.py
from .tasks import add
def test_celery(request):
add.delay(3, 5)
return HttpResponse("Celery works")
# app/urls.py
urlpatterns = [
re_path(r'^test/$', views.test_celery, name="test_celery")
]
当你通过浏览器访问/test/链接时,你根本感受不到2s的延迟,页面可以秒开,同时你会发现终端的输出,显示任务执行成功。
使用apply_async方法调用add任务,还要打印初任务的id (task.id)和状态status。Celery会为每个加入到队列的任务分配一个独一无二的uuid,通过task.status获取状态和task.result获取结果。注意:apply_async传递参数的方式与delay方法不同。
# app/views.py
from .tasks import add
def test_celery(request):
result = add.apply_async(args=[3, 5])
return HttpResponse(result.task_id + ' : ' + result.status)
这个异步任务执行了,返回了个计算结果(8),django-celery-results系统性地了解任务状态并获取这个执行结果。
查看任务执行状态及结果
pip安装django-celery-results后,需要将其加入到INSTALLED_APPS并使用migrate命令迁移创建数据表。
# 支持数据库django-db和缓存django-cache存储任务状态及结果
# 建议选django-db
CELERY_RESULT_BACKEND = "django-db"
# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
安装配置完成后,进入Django admin后台,详细看到每个任务的id、名称及状态。
点击单个任务id,你可以看到有关这个任务的更多信息,比如传递的参数和返回结果。
除了在Django admin后台中查看任务状态和结果,你还可以在视图中通过AsyncResult方法获取任务执行状态和结果,它需要接收一个任务的task_id(通常为uuid格式)。
from celery.result import AsyncResult
# 调用异步任务
async_task = add.apply_async(args=[3, 5])
# 获取任务状态和结果
AsyncResult(async_task.task_id).status
AsyncResult(async_task.task_id).result
设置定时和周期性任务
借助于装django-celery-beat后, 你可以将任一Celery任务设置为定时任务或周期性任务。使用它你只需要通过pip安装它,并加入INSTALLED_APPS里去。
django-celery-beat提供了两种添加定时或周期性任务的方式,一是直接在settings.py中添加,二是通过Django admin后台添加。
配置文件添加任务
同一任务可以设置成不同的调用周期,给它们不同的任务名就好了。
from datetime import timedelta
CELERY_BEAT_SCHEDULE = {
"add-every-30s": {
"task": "app.tasks.add",
'schedule': 30.0, # 每30秒执行1次
'args': (3, 8) # 传递参数-
},
"add-every-day": {
"task": "app.tasks.add",
'schedule': timedelta(hours=1), # 每小时执行1次
'args': (3, 8) # 传递参数-
},
}
Django Admin添加周期性任务
先在settings.py中将任务调度器设为DatabaseScheduler。
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
然后进入Periodic Task表添加和修改周期性任务即可。
通过Crontab设置定时任务
如果你希望在特定的时间(某月某周或某天)执行一个任务,你可以通过crontab设置定时任务
CELERY_BEAT_SCHEDULE = {
# 每周一早上7点半执行
'add-every-monday-morning': {
'task': 'app.tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (7, 8),
},
}
例子 | 含义 |
crontab() | 每分 |
crontab(minute=0, hour=0) | 每天午夜 |
crontab(minute=0, hour='*/3') | 能被3整除的小时数,3,6,9点等等 |
crontab(minute=0,hour='0,3,6,9,12,15,18,21') | 与前面相同,指定小时 |
crontab(minute='*/15') | 每15分钟 |
crontab(day_of_week='sunday') | 星期日每分钟 |
crontab(minute='*',hour='*', day_of_week='sun') | 同上 |
crontab(minute='*/10',hour='3,17,22', day_of_week='thu,fri') | 每10分钟运行一次, 但仅限于周四或周五的 3-4 am, 5-6 pm, 和10-11 pm. |
crontab(minute=0, hour='*/2,*/3') | 可以被2或3整除的小时数,除了 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5') | 可以被5整除的小时 |
crontab(minute=0, hour='*/3,8-17') | 8am-5pm之间可以被3整除的小时 |
crontab(0, 0, day_of_month='2') | 每个月的第2天 |
crontab(0, 0,day_of_month='2-30/2') | 每月的偶数日 |
crontab(0, 0,day_of_month='1-7,15-21') | 每月的第一和第三周 |
crontab(0, 0, day_of_month='11',month_of_year='5') | 每年的5月11日 |
crontab(0, 0,month_of_year='*/3') | 每个季度首个月份每天 |
Crontab也可以通过Django Admin添加,然后与任务进行绑定。
如果你变换了时区timezone,比如从’UTC’变成了’Asia/Shanghai’,需重置周期性任务,这非常重要。
# 调整timezone后重置任务
$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)
前面我们只是添加了定时或周期性任务,我们还需要启动任务调度器beat分发定时和周期任务给Celery的worker。
启动任务调度器beat
多开几个终端,一个用来启动任务调度器beat,另一个启动celeryworker,你的任务就可以在后台执行。
# 开启任务调度器
Celery -A myproject beat
# Linux下开启Celery worker
Celery -A myproject worker -l info
# windows下开启Celery worker
Celery -A myproject worker -l info -P eventlet
# windows下如果报Pid错误
Celery -A myproject worker -l info --pool=solo
Flower监控任务执行状态
除了django_celery_results, 你可以使用flower监控后台任务执行状态。它提供了一个可视化的界面,在测试环境中非常有用。
pip install flower
两种方式启动服务器。启动服务器后,打开http://localhost:5555即可查看监控情况。
# 从terminal终端启动, proj为项目名
$ flower -A proj --port=5555
# 从celery启动
$ celery flower -A proj --address=127.0.0.1 --port=5555
Celery高级用法与注意事项
给任务设置最大重试次数
定义任务时可以通过max_retries设置最大重试次数,并调用self.retry方法调用。因为要调用self这个参数,定义任务时必须设置bind=True。
@shared_task(bind=True, max_retries=3)
def send_batch_notifications(self):
try:
something_raising()
raise Exception('Can\'t send email.')
except Exception as exc:
self.retry(exc=exc, countdown=5)
send_mail(
subject='Batch email notifications',
message='Test email',
from_email='sour@example.com',
recipient_list=['dest@example.com']
)
不同任务交由不同Queue处理
为了防止一些非常占用资源或耗时的任务阻塞任务队列导致一些简单任务也无法执行,可以将不同任务交由不同的Queue处理。定义了两个Queue队列,default执行普通任务,heavy_tasks执行重型任务。
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('heavy_tasks', Exchange('heavy_tasks'), routing_key='heavy_tasks'),
)
CELERY_TASK_ROUTES = {
'myapp.tasks.heave_tasks': 'heavy_tasks'
}
忽略不想要的结果
如果你不在意任务的返回结果,可以设置 ignore_result 选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result 设置全局忽略任务结果。
@app.task(ignore_result=True)
def my_task():
something()
避免启动同步子任务
让一个任务等待另外一个任务的返回结果是很低效的,并且如果工作单元池被耗尽的话这将会导致死锁。
# 坏例子
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
我们将不同的任务签名链接起来创建一个任务链,三个子任务按顺序执行。
# 好例子
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
Django的模型对象不应该作为参数传递
几乎总是在任务运行时从数据库获取对象是最好的,因为老的数据会导致竞态条件。假象有这样一个场景,你有一篇文章,以及自动展开文章中缩写的任务:
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('Old text', 'New text')
article.save()
首先,作者创建一篇文章并保存,这时作者点击一个按钮初始化一个缩写展开任务:
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
现在,队列非常忙,所以任务在2分钟内都不会运行。与此同时,另一个作者修改了这篇文章,当这个任务最终运行,因为老版本的文章作为参数传递给了这个任务,所以这篇文章会回滚到老的版本。修复这个竞态条件很简单,只要参数传递文章的 id 即可,此时可以在任务中重新获取这篇文章:
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
article.body.replace('MyCorp', 'My Corporation')
article.save()
使用on_commit函数处理事务
这是在数据库中创建一个文章对象的 Django 视图,此时传递主键给任务。它使用 commit_on_success 装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。
from django.db import transaction
@transaction.commit_on_success
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
如果在事务提交之前任务已经开始执行会产生一个竞态条件;数据库对象还不存在。解决方案是使用 on_commit 回调函数来在所有事务提交成功后启动任务。
from django.db.transaction import on_commit
def create_article(request):
article = Article.objects.create()
on_commit(lambda: expand_abbreviations.delay(article.pk))