AberSheeran
Aber Sheeran

Django定时任务

起笔自
所属文集: Python-Package
共计 2145 个字符
落笔于

定时任务无论是个人开发还是企业业务都是需要的。但个人开发的时候使用celery,未免有点杀鸡用牛刀的感觉。Celery性能不错,但配置起来并没有那么简单。非密集型的定时任务,我们完全可以使用django-background-tasks来替代Celery。

How to use

按例,pip install django-background-tasks来安装。

然后加入INSTALLED_APPS:

INSTALLED_APPS = (
    # ...
    'background_task',
    # ...
)

创建对应的数据库

python manage.py makemigrations background_task
python manage.py migrate

接下来就可以使用了,例如我有一个函数需要在被调用之后60秒才运行

from background_task import background

@background(schedule=60)
def task(user_id):
    do.somethings

这样定义task,在我们使用的时候,直接调用它就行了。调用这个函数,不会被马上执行,也不会阻塞原本所在的位置的程序执行,并且会被加入background_task对应的数据库里,我们可以在Django后台看到任务完成情况,并且进行管理。

最后,我们需要另开一个进程来处理这些定时任务

python manage.py process_tasks

如果想后台运行此进程,nohup python manage.py process_tasks > tasks.log & echo $! > tasks.pid &即可。运行的错误日志会被保存到tasks.log中,进程号会被存在tasks.pid中。

  • 如果需要函数被调用之后,每隔一段时间就运行一次,只需要在调用时加上一个关键字参数repeat即可

    # some statements
    task(2, repeat=60*60*6)
    

    例如此处,函数被调用之后,在60s之后运行,然后每隔60*60*6秒就会运行一次。

Extend

Django-background-tasks这个库是作为Django的一个app而存在的,所以Django的ORM对它依旧可以使用。

只需要导入对应的model-class就可以控制、访问任务完成的情况。

from background_task.models import Task

只执行一次定时任务

有些时候,我们只期望一个函数每隔一段时间运行一次。并且不期望这个函数任务被重复调用塞进任务数据库。那么可以通过查询任务数据库来做到这一点

可以看到DBT这个库关于任务的一部分定义如下,

@python_2_unicode_compatible
class Task(models.Model):
    # the "name" of the task/function to be run
    task_name = models.CharField(max_length=255, db_index=True)
    # the json encoded parameters to pass to the task
    task_params = models.TextField()
    # a sha1 hash of the name and params, to lookup already scheduled tasks
    task_hash = models.CharField(max_length=40, db_index=True)

    verbose_name = models.CharField(max_length=255, null=True, blank=True)

    ...

而每一个任务的task_name的值都是由该函数所在的App名.文件名.函数名组成,三者之间由.连接。

那么只需要通过Django自带的ORM进行查询Task,例如:

from background_task import background
from background_task.models import Task

@background(schedule=60)
def task(user_id):
    do.somethings


def index(request):
    if Task.objects.filter(task_name = "Clrawer.views.task").exists():
        pass
    else:
        task(2, repeat=60*60*6)

就可以在index这个视图函数被调用的时候,通过查询任务是否已经被加进了任务队列,来选择是否调用task函数。

如果你觉得本文值得,不妨赏杯茶
没有上一篇
监听文件事件