Celery:在特定时间间隔后执行任务

django

1个回答

写回答

gnwn

2025-07-09 19:47

+ 关注

Python
Python

使用 Celery 可以很方便地在特定时间间隔后执行任务。Celery 是一个强大的分布式任务队列,它基于消息中间件实现任务的异步处理。在实际的开发中,我们经常会遇到需要延迟执行任务的场景,比如发送邮件、处理数据等。使用 Celery,我们可以轻松地实现这些功能。

延迟任务执行

使用 Celery 延迟执行任务非常简单。首先,我们需要安装 Celery 并配置消息中间件,例如 RabbitMQ 或 Redis。然后,我们可以定义一个任务函数,并使用 apply_async 方法指定任务的执行时间。下面是一个发送邮件的示例:

Python

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task

def send_emAIl(to, subject, body):

# 发送邮件代码

pass

send_emAIl.apply_async(args=('example@example.com', 'Hello', 'This is a test emAIl'), countdown=3600)

在上面的示例中,send_emAIl 函数被装饰为 Celery 的任务,并使用 apply_async 方法指定了任务的执行时间为当前时间加上 3600 秒(即延迟一小时执行)。这样,我们就可以在特定时间后执行发送邮件的任务了。

定时任务调度

除了延迟执行任务,Celery 还支持定时任务调度。我们可以使用 Celery 自带的 beat 功能,通过配置定时任务的调度规则来实现定时执行任务的功能。下面是一个定时清理缓存的示例:

Python

from celery import Celery

from datetime import timedelta

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {

'clear_cache': {

'task': 'tasks.clear_cache',

'schedule': timedelta(hours=1),

},

}

@app.task

def clear_cache():

# 清理缓存代码

pass

在上面的示例中,我们定义了一个名为 clear_cache 的定时任务,它将会每隔一小时执行一次。我们只需要在 Celery 启动时运行 celery -A tasks beat 命令,就可以启动定时任务调度器了。

通过使用 Celery,我们可以轻松地在特定时间间隔后执行任务。无论是延迟执行任务还是定时任务调度,Celery 都提供了简洁而强大的功能。在实际的开发中,我们可以根据具体的需求灵活运用 Celery,提高系统的性能和可靠性。

案例代码

Python

from celery import Celery

from datetime import timedelta

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {

'clear_cache': {

'task': 'tasks.clear_cache',

'schedule': timedelta(hours=1),

},

}

@app.task

def clear_cache():

# 清理缓存代码

pass

以上是使用 Celery 实现定时任务调度的案例代码。在这个例子中,我们定义了一个名为 clear_cache 的任务,它将会每隔一小时执行一次清理缓存的操作。通过配置 Celery 的定时任务调度器,我们可以轻松地实现这个功能。

参考资料

- Celery 官方文档:https://docs.celeryproject.org/

- Celery GitHub 仓库:https://github.com/celery/celery

举报有用(4分享收藏

Copyright © 2025 IZhiDa.com All Rights Reserved.

知答 版权所有 粤ICP备2023042255号