暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

任务分发系统设计(一)

剽悍的派森先生 2021-06-24
1561

场景:在一个系统中有持续不断的任务产生,并且需要尽快处理。

解析:由于任务量巨大,对于下游处理者就有很多要求:

  1. 分配的任务不能重复;

  1. 处理者可以动态扩缩容;

  1. 任务处理超时重新分配。

方案:一个任务分发系统可以看成一个复杂一点的秒杀系统。拆解来看,主要有三部分组成:任务分发、延时队列以及分布式锁。

本文主要介绍任务分发模块。

通常来说,对于任务处理都会有一定的优先级规则,比如时间维度。我们知道 Redis 中的有序集合 zset 是可以根据 score 进行排序。我们刚好可以借助 zset 来实现一个任务池。

    def dict_slice(mapping, start, end):
    """切分字典
    :param mapping: 初始字典
    :param start: 其实位置,包含
    :param end: 结束位置,不包含
    :return: dict
    """
    keys = mapping.keys()
    return {
    k: mapping[k]
    for k in keys[start:end]
    }



    class TaskPool(object):
    """任务池"""
    BIG_TASK_SIZE = 500


    def __init__(self, key):
    self.key = key


    def add(self, task, priority):
    """添加一个任务到任务池
    :param task: str/int, 任务名称
    :param priority: float/int, 任务优先级
    """
    client.zadd(self.key, {str(task): priority})


    def clear(self):
    return client.delete(self.key)


    def add_many(self, mapping):
    """一次添加多个任务
    :param mapping: dict, {task1: priority1, task2: priority2}
    """
    length = len(mapping)
    if length > self.BIG_TASK_SIZE:
    for start in range(0, length, self.BIG_TASK_SIZE):
    piece = dict_slice(mapping, start, start+self.BIG_TASK_SIZE)
    client.zadd(self.key, piece)
    else:
    client.zadd(self.key, mapping)


    def assign(self, size=20, factory=int):
    """分配任务,默认一次分配 20 个
    :param size: int, 任务数量
    :param factory: callable, 格式化
    :return: list, ['4', '3', '2']
    """
    pipe = client.pipeline()
    pipe.zrange(self.key, 0, size - 1)
    pipe.zremrangebyrank(self.key, 0, size - 1)
    tasks = pipe.execute()[0]
           return map(factory, tasks)
    复制

    当有了一个任务池之后,我们就可以每次从这个任务池中申请一定数量的任务进行处理。

      class TaskBucket(object):
      """分配到个人的任务桶"""


      def __init__(self, name, exp):
      self.name = name
      self.exp = exp


      def add(self, tasks):
      """添加任务
      :param tasks: list, [1,2,3]
      """
      if not tasks:
      return
      client.rpush(self.name, *tasks)
      self.expire(self.exp)


      def expire(self, time):
      """设置过期时间"""
      return client.expire(self.name, time)


      def clear(self):
      """清空"""
      return client.delete(self.name)


      def pop(self, reverse=False):
      """从自己的列表中获取一个任务"""
      if not client.exists(self.name):
      return
      self.expire(self.exp)
      if reverse:
      return client.rpop(self.name)
      return client.lpop(self.name)


      def peek_all(self, factory=int):
      """查看所有任务
      :param factory: callable,类型转换方法
      :return: list[factory(val)]
      """
      if not client.exists(self.name) or client.ttl(self.name) <= 0:
      logging.info('key {} not exists or ttl <= 0'.format(self.name))
      return []
      return map(factory, client.lrange(self.name, 0, -1))
      复制

      我们还需要一个中间关联者,关联起池子跟桶的关系。

        class TaskRouter(object):
        """任务路由器"""


        def __init__(self, key, prefix):
        self.key = key
        self.prefix = prefix


        def make_task_name(self, val):
        return self.prefix.format(val)


        def add(self, name, task_name=None):
        """添加处理人
        :param name: str
        :param task_name: str
        :return: str, 任务桶名称
        """
        if task_name is None:
        task_name = self.make_task_name(name)
        client.hset(self.key, name, task_name)
        return task_name


        def clear(self):
        """清空"""
        return client.delete(self.key)


        def get(self, name):
        """根据名称获取任务桶名称
        :param name: str, 处理人名称
        :return: str/None, 任务桶名称
        """
        return client.hget(self.key, name)


        def buckets(self):
        """查看所有的任务桶"""
        return client.hvals(self.key)
        复制

        当我们有了任务池 TaskPool、任务桶 TaskBucket 以及任务路由器 TaskRouter,我们就可以实现最终的任务分发器。

          class TaskDispenser(object):


          def __init__(self, pool_key, router_key, bucket_prefix):
          self.task_pool = TaskPool(pool_key)
          self.task_router = TaskRouter(router_key, bucket_prefix)


          def recreate_task_pool(self, mapping):
          """构建任务池
          :param mapping: dict, {value: score}
          """
          buckets = self.task_router.buckets()


          # 收集已经分配的任务
          assigned_flow_ids = []
          for name in buckets:
          tb = TaskBucket(name)
          assigned_flow_ids.extend(tb.peek_all())


          # 移除已经分配的任务
          for fid in assigned_flow_ids:
          mapping.pop(fid, None)


          # 重构任务池
          self.task_pool.clear()
          self.task_pool.add_many(mapping)


          def assign_tasks(self, limit, worker):
          """任务分配
          :param limit: int, 任务数量
          :param worker: str, 任务处理者
          :return: list[int]
          """
          bucket_name = self.task_router.get(worker)
          if not bucket_name:
          bucket_name = self.task_router.add(worker)


          tb = TaskBucket(bucket_name)


          tasks = tb.peek_all()
          size = len(tasks)
          if size == limit:
          return tasks


          # 分配任务时需要先检查自己的桶里是否有未处理的任务
          # 并且确认数量是否满足要求
          if size < limit:
          # 少补
          new_tasks = self.task_pool.assign(limit-size)
          if new_tasks:
          tb.add(new_tasks)
          tasks.extend(new_tasks)
          else:
          # 多不退,通过下次重建任务池的方式重新分配
          for _ in range(size-limit):
          tb.pop(reverse=True)
          tasks = tb.peek_all()


          return tasks


          def check_tasks(self, worker, tasks):
          """校验任务
          :param worker: str, 任务处理者
          :param tasks: list, 需要校验的任务
          :return: bool
          """
          bucket_name = self.task_router.get(worker)
          if not bucket_name:
          return False


          tb = TaskBucket(bucket_name)
          bucket_tasks = tb.peek_all()


          return set(bucket_tasks) == set(tasks)


          def tasks_done(self, worker, tasks):
          """任务完成"""
          bucket_name = self.task_router.get(worker)
          if not bucket_name:
          return False
          tb = TaskBucket(bucket_name)
          tb.clear()
          return True
          复制

          我们通过调用 recreate_task_pool 方法实现创建任务池,调用 assign_tasks 方法分配任务,最后调用 tasks_done 方法完成任务。

          整个过程看起来似乎没有什么问题。但是试想一下,在并发场景下会不会有问题呢。由于 recreate_task_pool、assign_tasks 以及 tasks_done 方法同时在操作相同的数据源,是必然会有并发问题的,也就是数据不一致问题。

          那么该如何解决呢,且听下回分解。

          文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论