定时任务无论是个人开发还是企业业务都是需要的。但个人开发的时候使用celery,未免有点杀鸡用牛刀的感觉。Celery性能不错,但配置起来并没有那么简单。非密集型的定时任务,我们完全可以使用django-background-tasks来替代Celery。
How to use
按例,pip install django-background-tasks
来安装。
然后加入INSTALLED_APPS:
INSTALLED_APPS = (
# ...
'background_task',
# ...
)
创建对应的数据库
python manage.py makemigrations background_task
python manage.py migrate
接下来就可以使用了,例如我有一个函数需要在被调用之后60秒才运行
from background_task import background
@background(schedule=60)
def task(user_id):
do.somethings
这样定义task
,在我们使用的时候,直接调用它就行了。调用这个函数,不会被马上执行,也不会阻塞原本所在的位置的程序执行,并且会被加入background_task对应的数据库里,我们可以在Django后台看到任务完成情况,并且进行管理。
最后,我们需要另开一个进程来处理这些定时任务
python manage.py process_tasks
如果想后台运行此进程,nohup python manage.py process_tasks > tasks.log & echo $! > tasks.pid &
即可。运行的错误日志会被保存到tasks.log
中,进程号会被存在tasks.pid
中。
-
如果需要函数被调用之后,每隔一段时间就运行一次,只需要在调用时加上一个关键字参数
repeat
即可# some statements task(2, repeat=60*60*6)
例如此处,函数被调用之后,在60s之后运行,然后每隔60*60*6秒就会运行一次。
Extend
Django-background-tasks这个库是作为Django的一个app而存在的,所以Django的ORM对它依旧可以使用。
只需要导入对应的model-class就可以控制、访问任务完成的情况。
from background_task.models import Task
只执行一次定时任务
有些时候,我们只期望一个函数每隔一段时间运行一次。并且不期望这个函数任务被重复调用塞进任务数据库。那么可以通过查询任务数据库来做到这一点
可以看到DBT这个库关于任务的一部分定义如下,
@python_2_unicode_compatible
class Task(models.Model):
# the "name" of the task/function to be run
task_name = models.CharField(max_length=255, db_index=True)
# the json encoded parameters to pass to the task
task_params = models.TextField()
# a sha1 hash of the name and params, to lookup already scheduled tasks
task_hash = models.CharField(max_length=40, db_index=True)
verbose_name = models.CharField(max_length=255, null=True, blank=True)
...
而每一个任务的task_name
的值都是由该函数所在的App名.文件名.函数名
组成,三者之间由.
连接。
那么只需要通过Django自带的ORM进行查询Task,例如:
from background_task import background
from background_task.models import Task
@background(schedule=60)
def task(user_id):
do.somethings
def index(request):
if Task.objects.filter(task_name = "Clrawer.views.task").exists():
pass
else:
task(2, repeat=60*60*6)
就可以在index这个视图函数被调用的时候,通过查询任务是否已经被加进了任务队列,来选择是否调用task
函数。