Inhzus

Tech blog of inhzus.

Celery 集成

2019-10-15


这篇聊聊在微信服务号后端项目中集成 Celery。

起因

某天逛 V2ex 看到一个关于微信 access_token 的讨论。首先官方文档的介绍如下:

建议公众号开发者使用中控服务器统一获取和刷新access_token,其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则容易造成冲突,导致access_token覆盖而影响业务。

之前,没有考虑到多进程的问题,AccessToken 如下:

class AccessToken:
    token = ''
    last_time = 0
    
    @static_method
    def get():
        if time.time() - last_time < 7200:
            return AccessToken.token
        t = request_api_and_get_access_token()
        AccessToken.token = t
        return t

由于在生产环境下使用 gunicorn 启动了多个进程,上述逻辑出现问题。

基于原有形式考虑到的合理的方法是:将 token 与过期时间存储于 redis,进程每次尝试获取 token 时,首先阻塞式获取 redis 锁,若发现过期,即进行更新。

但为了尽量避免锁,且方便之后的业务扩展(即之前构想在 Flask 项目中也可以进行手动调用),当然同时也是为了了解新知识,决定使用官方推荐的方法。

在我们的单机环境下,中控服务器显然不太现实。一开始打算使用 Flask 周边的一些任务调度插件进行处理,常用的如 APScheduler,但 APScheduler 的配置方式我不是很喜欢,且它只能支持定时任务,灵活性太小,而 Celery 的功能较为全面,支持消息队列的形式,因此决定使用 Celery。

使用

Demo

首先有一个定时任务,就是定时刷新 access_token 至 redis,这一时间间隔应略小于 2 小时。

token_cache = MemoryCache('access_token', 7200)

cron = Celery(
    'tasks',
    broker=getenv('CELERY_BROKER_URL', 'redis://redis:6379/1'),
    backend=getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/1'))

@cron.on_after_configure.connect
def setup_periodic_tasks(**_):
    cron.add_periodic_task(2 * 60 * 60 - 5 * 60, refresh_access_token.s())

@cron.task
def refresh_access_token():
    token = get_access_token_directly(getenv('API_KEY'), getenv('API_SECRET'))
    print(f'token: {token}')
    if token:
        token_cache.set('', token=token)
    return token

需要注意的地方有:

同时,这一文件虽然在项目文件夹下,但 Celery 并不在 Flask 中启动(虽然有 flask_celery 库,但他的抽象功能比较弱,官方也并没有推荐这一插件,不进行考虑)。 相反,是在命令行中进行,具有比较高的独立性,在 docker-compose 中也将它独立出来,比较方便单独重启任务和查看日志。

调用

Flask 可以将该文件作为模块 import,celery.task reference 中介绍了具体的调用方式和参数。

部署

目前 celery==4.3.0, kombu==4.6.5 版本会遇到 InconsistencyError,将 kombu 降级至 4.5.0 后解决。在 supervisord 配置文件中配置执行 command:

celery -A berater.misc.tasks.cron worker -B -E

monitor 决定使用 celery-flower,进行配置后可以有 dashboard 进行监控。

需要注意: