Where/When shall we initialize queues #1962
-
Hi, we are going to adopt Redis queues in our company, and we couldn't find enough documentation on when and where to define the queues in our projects. Shall we instantiate singletons and reference them across our APIs, or initialize a new queue in each API?
Replies: 2 comments 1 reply
-
You don't really need to define queues for RQ to work: it creates them and does the bookkeeping on its own as soon as you add a job to a queue. If the queue doesn't exist yet on the RQ side, it will be created properly.
-
Thank you for replying, @dams. I use FastAPI, which works quite differently from Flask because of its asynchronous nature. Also, since requests run concurrently, I believe it is dangerous to use a global queue variable. Here is what I do now:

    from typing import Annotated

    import redis
    import rq
    from fastapi import Depends

    from app.cache.client import AsyncRedisClient
    from app.core.config import settings
    from app.schemas.enums import WorkerQueue

    # One pool per process; every client below borrows connections from it.
    sync_redis_pool = redis.ConnectionPool.from_url(str(settings.REDIS_URI))


    def get_sync_redis_conn() -> redis.Redis:
        return redis.Redis(connection_pool=sync_redis_pool)


    def get_mq_low() -> rq.Queue:
        return rq.Queue(WorkerQueue.LOW, connection=get_sync_redis_conn())


    def get_mq_default() -> rq.Queue:
        return rq.Queue(WorkerQueue.DEFAULT, connection=get_sync_redis_conn())


    def get_mq_high() -> rq.Queue:
        return rq.Queue(WorkerQueue.HIGH, connection=get_sync_redis_conn())


    # Annotated dependencies
    AsyncRedisClientDep = Annotated[AsyncRedisClient, Depends()]
    SyncRedisClientDep = Annotated[redis.Redis, Depends(get_sync_redis_conn)]
    MQLow = Annotated[rq.Queue, Depends(get_mq_low)]
    MQDefault = Annotated[rq.Queue, Depends(get_mq_default)]
    MQHigh = Annotated[rq.Queue, Depends(get_mq_high)]

Basically, I use dependency injection to initialize a queue object with its own Redis client, backed by the shared pool, within each API. Let me know what you think.
For future visitors:
In my opinion, the best option is to instantiate a new queue wherever your API needs one, giving it a Redis connection drawn from a connection pool. Global queues that reuse the same connection for the lifetime of the server are not a good idea. You also avoid the overhead of opening a new connection each time: the pool hands you a cached live connection, and if a connection fails, the pool handles that as well.