暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

任务分发系统设计(二)

剽悍的派森先生 2021-06-24
1170

任务分发系统设计(一)中我们实现了一个基本任务分发系统:通过调用 recreate_task_pool 方法实现创建任务池;调用 assign_tasks 方法分配任务;最后调用 tasks_done 方法完成任务。这三个方法使用过程中需要小心的一点是数据一致性问题,因为它们都是在处理同一份数据。至于创建任务池的时间可以是在线实时生成的,也可以是离线定时生成,只要不丢失数据即可。

为了防止不同方法之间的干扰,我们需要实现一个分布式锁来解决这个问题。下面我们就用 Redis 来实现一个简易的分布式锁。

    class FunctionLock(object):
    TIMEOUT = 60 # 锁过期超时


    @classmethod
    def gen_name(cls, key):
    return "{prefix}:{key}".format(prefix=cls.__name__, key=key)


    @classmethod
    def get_lock(cls, name, timeout=0):
           return cod.set(name, "1", ex=timeout or cls.TIMEOUT, nx=True)


    @classmethod
    def acquire(cls, key, timeout=0):
    name = cls.gen_name(key)
    return cls.get_lock(name, timeout=timeout)


    @classmethod
    def release(cls, key):
    name = cls.gen_name(key)
    cod.delete(name)




    def synchronized(prefix, local=0, timeout=60):
    def decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
    key = prefix + str(args[local])
    if not FunctionLock.acquire(key, timeout=timeout):
                   raise RuntimeError("PROCESSING LOCKED")
    try:
    return func(*args, **kwargs)
    finally:
    FunctionLock.release(id)
    return wrapper
    return decorator
    复制

    简单介绍一下实现原理。借用 Redis 的 set 方法,加入了 nx(not exist)参数,可以保证如果已有 key 存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了 ex(expire)过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即 key 被删除),不会发生死锁。

    使用的时候非常方便。

      class TaskDispenser(object):
      key = "_dispenser_"


      def __str__(self):
      return self.pool_key

      def __init__(self, pool_key, router_key, bucket_prefix):
      self.pool_key = pool_key
      self.task_pool = TaskPool(pool_key)
      self.task_router = TaskRouter(router_key, bucket_prefix)
      @synchronized(key)
      def recreate_task_pool(self, mapping):
      """构建任务池
      :param mapping: dict, {value: score}
      """
      buckets = self.task_router.buckets()


      # 收集已经分配的任务
      assigned_flow_ids = []
      for name in buckets:
      tb = TaskBucket(name)
      assigned_flow_ids.extend(tb.peek_all())


      # 移除已经分配的任务
      for fid in assigned_flow_ids:
      mapping.pop(fid, None)


      # 重构任务池
      self.task_pool.clear()
      self.task_pool.add_many(mapping)

      @synchronized(key)
      def assign_tasks(self, limit, worker):
      """任务分配
      :param limit: int, 任务数量
      :param worker: str, 任务处理者
      :return: list[int]
            """
      bucket_name = self.task_router.get(worker)
      if not bucket_name:
      bucket_name = self.task_router.add(worker)


      tb = TaskBucket(bucket_name)


      tasks = tb.peek_all()
      size = len(tasks)
      if size == limit:
      return tasks


      # 分配任务时需要先检查自己的桶里是否有未处理的任务
      # 并且确认数量是否满足要求
      if size < limit:
      # 少补
      new_tasks = self.task_pool.assign(limit-size)
      if new_tasks:
      tb.add(new_tasks)
      tasks.extend(new_tasks)
      else:
      # 多不退,通过下次重建任务池的方式重新分配
      for _ in range(size-limit):
      tb.pop(reverse=True)
      tasks = tb.peek_all()


      return tasks


      def check_tasks(self, worker, tasks):
           """校验任务
      :param worker: str, 任务处理者
      :param tasks: list, 需要校验的任务
      :return: bool
          """
      bucket_name = self.task_router.get(worker)
      if not bucket_name:
      return False


      tb = TaskBucket(bucket_name)
      bucket_tasks = tb.peek_all()


      return set(bucket_tasks) == set(tasks)


      @synchronized(key)
      def tasks_done(self, worker, tasks):
      """任务完成"""
      bucket_name = self.task_router.get(worker)
      if not bucket_name:
      return False
      tb = TaskBucket(bucket_name)
      tb.clear()
      return True
      复制

      通过装饰器的方式把以上三个方法全部加上同步处理,使得这三个方法可以在并发的情况下顺序执行,从而保证数据一致性。

      文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论