Move subtask queueing to workers #2631

Open
wjsi opened this issue Jan 13, 2022 · 0 comments · May be fixed by #2627
Motivations

Mars currently handles all subtask scheduling within a single supervisor. When the number of subtasks or workers is large, this puts a heavy load on the supervisor node. What's more, scheduling solely on the supervisor side introduces considerable latency between consecutive worker tasks. Moving subtask scheduling to workers alleviates these issues.

Overall design

This design enables workers to schedule the subtasks submitted to them, while supervisors only act as batch assigners and coordinators. Subtasks created by TaskService are assigned and pushed to workers. Inside workers, subtasks are queued and executed according to the priorities assigned to them. Results are then fed back to supervisors to activate successors.

Subtask submission

When subtasks are generated, the supervisor assigns and pushes all ready subtasks to the corresponding workers. Unlike the previous design, the supervisor no longer decides how many subtasks to submit to workers based on global slot information, nor does it maintain queues of subtasks. Workers decide which subtasks to run given their own local state, leading to faster reaction and narrower gaps between executions. A minimal sketch of this push-based submission path is shown below.
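For illustration, the push-based submission path could look roughly like the following sketch. The names assigner_ref, queue_refs, and assign_subtasks are hypothetical stand-ins for the assigner service and per-worker queue actor references, not Mars' actual handles.

import asyncio
from collections import defaultdict
from typing import Dict, List


async def push_ready_subtasks(subtasks: List, assigner_ref, queue_refs: Dict):
    # ask the assigner for a band (worker address, band name) for each subtask
    bands = await assigner_ref.assign_subtasks(subtasks)

    grouped = defaultdict(list)
    for subtask, band in zip(subtasks, bands):
        grouped[band].append(subtask)

    # push everything right away; worker-side queues decide when each subtask runs
    coros = []
    for (address, band_name), group in grouped.items():
        for st in group:
            coros.append(queue_refs[address].put(st.subtask_id, band_name, st.priority))
    await asyncio.gather(*coros)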

Subtask queueing

Inside workers, we use queues with latches to order and throttle subtasks. The queue can be seen as a combination of a priority queue that decides the order of subtasks and a semaphore that limits the number of subtasks released. The default value of the semaphore equals the number of slots of the given band. The basic API of the queue is shown below:

from typing import Tuple

import mars.oscar as mo  # assumed import path for the oscar actor framework


class SubtaskPriorityQueueActor(mo.StatelessActor):
    @mo.extensible
    def put(self, subtask_id: str, band_name: str, priority: Tuple):
        """
        Put a subtask ID into the queue.
        """

    @mo.extensible
    def update_priority(self, subtask_id: str, band_name: str, priority: Tuple):
        """
        Update the priority of the given subtask.
        """

    async def get(self, band_name: str):
        """
        Get an item from the queue and return the subtask ID
        and slot ID. Will wait when the queue is empty or
        the value of the semaphore is zero.
        """

    def release_slot(self, subtask_id: str, errors: str = "raise"):
        """
        Return the slot occupied by the given subtask and increase
        the value of the semaphore.
        """

    @mo.extensible
    def remove(self, subtask_id: str):
        """
        Remove a subtask from the queue. If the subtask is occupying
        a slot, the slot is also released.
        """

More APIs can be added to implement operations like yield_slot.
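For illustration, the priority queue plus semaphore combination for a single band could be realized with asyncio primitives roughly as below. This is an illustrative sketch, not the actual Mars implementation, and the BandQueue name is hypothetical.

import asyncio
import heapq
from typing import Tuple


class BandQueue:
    def __init__(self, n_slots: int):
        self._heap = []                      # entries of (negated priority, subtask_id)
        self._item_event = asyncio.Event()   # signals that the heap is non-empty
        self._semaphore = asyncio.Semaphore(n_slots)

    def put(self, subtask_id: str, priority: Tuple):
        # heapq is a min-heap, so negate the priority to pop the largest first
        heapq.heappush(self._heap, (tuple(-p for p in priority), subtask_id))
        self._item_event.set()

    async def get(self) -> str:
        # block until a slot is free AND an item is available
        await self._semaphore.acquire()
        while not self._heap:
            self._item_event.clear()
            await self._item_event.wait()
        _, subtask_id = heapq.heappop(self._heap)
        return subtask_id

    def release_slot(self):
        # called when the subtask leaves the stage guarded by this queue
        self._semaphore.release()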

To overlap IO and CPU costs, two queues are set up inside the worker, chained as sketched after the list below.

  • PrepareQueue: the queue of submitted subtasks. A prepare task consumes items from this queue and performs quota allocation as well as data moving. When a subtask starts execution, its prepare slot is released.
  • ExecutionQueue: the queue of prepared subtasks. An execution task consumes items from this queue and executes them. When a subtask finishes execution, its execution slot is released.
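The two stages could chain together roughly as follows. prepare_data, run_subtask, and priority_of are hypothetical placeholders for the real worker services; this is an illustrative sketch rather than the actual implementation.

async def prepare_data(subtask_id: str):
    ...  # hypothetical: quota allocation and moving dependent data into local storage

async def run_subtask(subtask_id: str, slot_id: int):
    ...  # hypothetical: execute the subtask in the given slot

def priority_of(subtask_id: str) -> tuple:
    ...  # hypothetical: look up the priority assigned by the supervisor


async def prepare_loop(prepare_queue, execution_queue, band_name: str):
    while True:
        subtask_id, _slot_id = await prepare_queue.get(band_name)
        await prepare_data(subtask_id)
        await execution_queue.put(subtask_id, band_name, priority_of(subtask_id))
        # the prepare slot is NOT released here; it is released once the
        # subtask actually starts execution (see execution_loop below)


async def execution_loop(prepare_queue, execution_queue, band_name: str):
    while True:
        subtask_id, slot_id = await execution_queue.get(band_name)
        # the subtask is starting execution, so its prepare slot can be freed
        await prepare_queue.release_slot(subtask_id)
        try:
            await run_subtask(subtask_id, slot_id)
        finally:
            # the execution slot is only freed after the subtask finishes
            await execution_queue.release_slot(subtask_id)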

Successor forwarding

When a subtask finishes execution and we need to choose another subtask to run, there are two kinds of candidates: subtasks already enqueued in ExecutionQueue, and subtasks whose predecessors were just satisfied by the execution that has just finished. The latter group often have higher priority but have not gone through data preparation, and may miss their chance to be scheduled because of the latencies introduced by the queues. We design a successor forwarding mechanism to resolve this.

When ready subtasks are pushed to the scheduling service, their successors are also pushed to be cached. The scheduling service then decides on assignments and pushes subtasks to the correct workers. A successor can be forwarded only when the conditions below are satisfied:

  1. The successor is already cached in the worker

  2. All dependent data of the successor are already stored in the worker, so we do not need to consult the Meta or Storage service for remote data retrieval

  3. There is enough quota for the successor

When all conditions are met, PrepareQueue is skipped and the subtask is inserted into ExecutionQueue directly. When a slot is released, the successor will be scheduled as soon as possible. A sketch of the forwarding check is shown below.
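For illustration, the forwarding check could look roughly like the following sketch. storage_api.get_infos, quota_ref.request_quota, and the successor fields used here are hypothetical stand-ins rather than Mars' actual service APIs, and the numbered comments refer to the conditions above.

async def try_forward_successor(successor, band_name: str,
                                storage_api, quota_ref, execution_queue) -> bool:
    # condition 1: the successor is cached in this worker, which is implied
    # by it having been pushed here together with its predecessor

    # condition 2: all dependent data already sit in local storage, so no
    # Meta / Storage lookup for remote data retrieval is needed
    infos = await storage_api.get_infos(successor.dependent_keys)
    if any(info is None for info in infos):
        return False

    # condition 3: there is enough quota to admit the successor
    if not await quota_ref.request_quota(successor.subtask_id, successor.required_quota):
        return False

    # all conditions met: skip PrepareQueue and enqueue for execution directly
    await execution_queue.put(successor.subtask_id, band_name, successor.priority)
    return True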

Possible impacts

Autoscale

The current autoscale implementation is based on the queueing mechanism on the supervisor side, and must be redesigned around worker-side scheduling.

Fault Injection

As the worker side of the scheduling service is rewritten, fault injection needs to adapt to that change.
