Creating a pool with workers #173

Open
ebeyrent opened this issue Mar 28, 2023 · 3 comments

@ebeyrent

ebeyrent commented Mar 28, 2023

I'm slightly baffled by what's happening with my code. I have a number of message queues, and each queue specifies how many workers should be created to process it. Based on that configuration value, I create n instances of my queue worker and add them to the pool. However, when I call getWorkerCount(), I get the wrong value.

// Create a worker pool.
$pool = new ContextWorkerPool(MessageQueueConfig::MAX_WORKERS);
$worker_count = 6;
if ($worker_count < 1) {
  $worker_count = 1;
}
for ($i = 0; $i < $worker_count; $i++) {
  // Create an instance of the queue-specific worker plugin.
  $instance = $this->queueWorkerManager->createInstance('message_queue:' . $queue_config->id());
  dump('Instance created');
  $pool->submit($instance);
}
$this->processes[$queue_config->id()] = $pool;
dump('Pool id = ' . $queue_config->id(), 'Limit = ' . MessageQueueConfig::MAX_WORKERS, 'Worker count = ' . $pool->getWorkerCount(), 'Idle count = ' . $pool->getIdleWorkerCount());

This gives me:

^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Pool id = immediate"
^ "Limit = 5"
^ "Worker count = 3"
^ "Idle count = 0"

So I'm definitely creating 6 instances, and my limit is 5, so I expected getWorkerCount() to return 5, but it doesn't. If I create one or two instances, the count is correct, but anything above that makes the method return 3.

I'd be grateful for any insight into what's happening!

@kelunik
Member

kelunik commented Mar 28, 2023

submit() submits tasks to the worker pool, but it doesn't create workers itself. If one of the existing workers is ready to accept a new task by the time submit() is called again, that worker is reused instead of a new worker being created.
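To make the reuse behaviour concrete, here is a deliberately simplified, synchronous model of a pool that creates workers lazily. ToyWorkerPool and its complete() method are hypothetical and are not amphp's ContextWorkerPool; the sketch only assumes the rule described above (reuse an idle worker if there is one, otherwise create a new one while below the limit). With a limit of 5, six submissions end up using only three workers, because some earlier tasks finish between submissions.

```php
<?php
// Hypothetical, simplified model; NOT amphp's ContextWorkerPool.
// It only illustrates: reuse an idle worker, else spawn one up to the limit.
final class ToyWorkerPool
{
    /** @var array<int, bool> worker id => busy flag */
    private array $busy = [];

    public function __construct(private int $limit)
    {
    }

    /** Assigns a task to a worker and returns that worker's id. */
    public function submit(string $task): int
    {
        // 1. Prefer an existing worker that is currently idle.
        foreach ($this->busy as $id => $isBusy) {
            if (!$isBusy) {
                $this->busy[$id] = true;
                return $id;
            }
        }
        // 2. Otherwise create a new worker, but only while below the limit.
        if (count($this->busy) < $this->limit) {
            $id = count($this->busy);
            $this->busy[$id] = true;
            return $id;
        }
        // 3. This toy model does not implement waiting, so it just refuses.
        throw new RuntimeException("No idle worker for task '$task'");
    }

    /** Marks a worker idle again once its task has finished. */
    public function complete(int $id): void
    {
        $this->busy[$id] = false;
    }

    public function getWorkerCount(): int
    {
        return count($this->busy);
    }

    public function getIdleWorkerCount(): int
    {
        return count(array_filter($this->busy, fn (bool $b): bool => !$b));
    }
}

$pool = new ToyWorkerPool(5);
$a = $pool->submit('t1'); // no workers yet -> creates worker 0
$b = $pool->submit('t2'); // worker 0 busy -> creates worker 1
$pool->complete($a);      // t1 finishes before the next submit
$c = $pool->submit('t3'); // reuses worker 0
$d = $pool->submit('t4'); // workers 0 and 1 busy -> creates worker 2
$pool->complete($b);
$pool->complete($d);
$e = $pool->submit('t5'); // reuses worker 1
$f = $pool->submit('t6'); // reuses worker 2

printf("Worker count = %d, idle count = %d\n", $pool->getWorkerCount(), $pool->getIdleWorkerCount());
// prints "Worker count = 3, idle count = 0"
```

The exact count in a real pool depends on how quickly tasks complete relative to the submit() calls, which is why it can vary between runs.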

@ebeyrent
Author

ebeyrent commented Mar 29, 2023

I may be struggling conceptually with how this works, and I'm not finding the documentation helpful.

If I understand correctly, when I create a new ContextWorkerPool, it doesn't create workers. How then do workers get created?

I think I was also misunderstanding what the submit() method actually does: I think the task has the code that does the work, and the worker performs that work by calling the run() method. Is that correct?

Conceptually, I have three queues (high_priority, medium_priority, low). For each queue, I want a single process (with all three running concurrently) that connects to the queue, claims an item, and then submits that item to a worker pool running multiple workers. To me, that suggests one worker pool for the tasks that connect to the queues, with each of those tasks creating its own pool to process the items from the queue it's connected to.

Do I have that right?

@kelunik
Member

kelunik commented Mar 29, 2023

Improving the documentation is on our list. If you have specific suggestions, please open a separate issue or, even better, a pull request with concrete changes.

You're right: creating the pool does not immediately create any workers. Workers are created as needed when tasks are submitted to the pool. If none of the existing workers can be used and the pool is below its limit, the pool creates a new worker.

> I think I was also misunderstanding what the submit() method actually does: I think the task has the code that does the work, and the worker performs that work by calling the run() method. Is that correct?

Yes, that's correct.
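A minimal sketch of that division of labour, using hypothetical names (ToyTask, ProcessQueueItem, ToyWorker): the task object carries the code, and the worker only calls its run() method. In amphp/parallel the real interface is Amp\Parallel\Worker\Task, whose run() takes additional arguments that this sketch omits. The serialize()/unserialize() round trip is an assumption meant to mimic the task being copied to a worker that runs in another process or thread.

```php
<?php
// Hypothetical names: ToyTask / ProcessQueueItem / ToyWorker are not amphp classes.
interface ToyTask
{
    /** The code that does the actual work lives here. */
    public function run(): mixed;
}

/** Hypothetical task for one queue item (it just uppercases the item). */
final class ProcessQueueItem implements ToyTask
{
    public function __construct(private string $item)
    {
    }

    public function run(): mixed
    {
        return strtoupper($this->item);
    }
}

/** The worker knows nothing about queues; it executes whatever task it is given. */
final class ToyWorker
{
    public function execute(ToyTask $task): mixed
    {
        // Mimic handing the task to a separate process by copying it.
        /** @var ToyTask $copy */
        $copy = unserialize(serialize($task));
        return $copy->run();
    }
}

$worker = new ToyWorker();
$result = $worker->execute(new ProcessQueueItem('order-42'));
echo $result, PHP_EOL; // prints "ORDER-42"
```

Keeping the work inside the task means the same worker can run any kind of task, which is what allows a pool to reuse workers freely.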

If you have three queues, I wouldn't use a worker pool for watching the queues. If the queue system requires blocking I/O, you can use the context API directly and create three separate contexts. If the queue I/O is non-blocking, you can do it all in the parent process and just keep three pools there, submitting each task to the pool that matches its priority.
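For the non-blocking case, the layout might look roughly like this: the parent process keeps one pool per queue and routes each claimed item to the pool for its priority. RecordingPool and dispatch() are hypothetical stand-ins (a real version would hold one ContextWorkerPool per queue and submit Task objects to it), and the per-queue limits are made-up example values.

```php
<?php
// Hypothetical stand-in for a real worker pool; it only records submissions.
final class RecordingPool
{
    /** @var list<string> */
    public array $submitted = [];

    // A real pool would cap its number of workers at $limit.
    public function __construct(public int $limit)
    {
    }

    public function submit(string $item): void
    {
        $this->submitted[] = $item;
    }
}

/**
 * Routes a claimed queue item to the pool for that queue's priority.
 *
 * @param array<string, RecordingPool> $pools
 */
function dispatch(array $pools, string $queue, string $item): void
{
    if (!isset($pools[$queue])) {
        throw new InvalidArgumentException("Unknown queue '$queue'");
    }
    $pools[$queue]->submit($item);
}

// One pool per queue; the limits are example values only.
$pools = [
    'high_priority'   => new RecordingPool(4),
    'medium_priority' => new RecordingPool(2),
    'low'             => new RecordingPool(1),
];

// Items as a non-blocking queue client might hand them to the parent process.
$claimed = [
    ['high_priority', 'a'],
    ['low', 'b'],
    ['high_priority', 'c'],
    ['medium_priority', 'd'],
];
foreach ($claimed as [$queue, $item]) {
    dispatch($pools, $queue, $item);
}
```

Giving each priority its own pool means a burst of low-priority items can never occupy the workers reserved for high-priority ones.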
