场景:在一个系统中有持续不断的任务产生,并且需要尽快处理。
解析:由于任务量巨大,对于下游处理者就有很多要求:
分配的任务不能重复;
处理者可以动态扩缩容;
任务处理超时重新分配。
方案:一个任务分发系统可以看成一个复杂一点的秒杀系统。拆解来看,主要有三部分组成:任务分发、延时队列以及分布式锁。
本文主要介绍任务分发模块。
通常来说,对于任务处理都会有一定的优先级规则,比如时间维度。我们知道 Redis 中的有序集合 zset 是可以根据 score 进行排序。我们刚好可以借助 zset 来实现一个任务池。
def dict_slice(mapping, start, end):
"""切分字典
:param mapping: 初始字典
:param start: 其实位置,包含
:param end: 结束位置,不包含
:return: dict
"""
keys = mapping.keys()
return {
k: mapping[k]
for k in keys[start:end]
}
class TaskPool(object):
"""任务池"""
BIG_TASK_SIZE = 500
def __init__(self, key):
self.key = key
def add(self, task, priority):
"""添加一个任务到任务池
:param task: str/int, 任务名称
:param priority: float/int, 任务优先级
"""
client.zadd(self.key, {str(task): priority})
def clear(self):
return client.delete(self.key)
def add_many(self, mapping):
"""一次添加多个任务
:param mapping: dict, {task1: priority1, task2: priority2}
"""
length = len(mapping)
if length > self.BIG_TASK_SIZE:
for start in range(0, length, self.BIG_TASK_SIZE):
piece = dict_slice(mapping, start, start+self.BIG_TASK_SIZE)
client.zadd(self.key, piece)
else:
client.zadd(self.key, mapping)
def assign(self, size=20, factory=int):
"""分配任务,默认一次分配 20 个
:param size: int, 任务数量
:param factory: callable, 格式化
:return: list, ['4', '3', '2']
"""
pipe = client.pipeline()
pipe.zrange(self.key, 0, size - 1)
pipe.zremrangebyrank(self.key, 0, size - 1)
tasks = pipe.execute()[0]
return map(factory, tasks)
复制
当有了一个任务池之后,我们就可以每次从这个任务池中申请一定数量的任务进行处理。
class TaskBucket(object):
"""分配到个人的任务桶"""
def __init__(self, name, exp):
self.name = name
self.exp = exp
def add(self, tasks):
"""添加任务
:param tasks: list, [1,2,3]
"""
if not tasks:
return
client.rpush(self.name, *tasks)
self.expire(self.exp)
def expire(self, time):
"""设置过期时间"""
return client.expire(self.name, time)
def clear(self):
"""清空"""
return client.delete(self.name)
def pop(self, reverse=False):
"""从自己的列表中获取一个任务"""
if not client.exists(self.name):
return
self.expire(self.exp)
if reverse:
return client.rpop(self.name)
return client.lpop(self.name)
def peek_all(self, factory=int):
"""查看所有任务
:param factory: callable,类型转换方法
:return: list[factory(val)]
"""
if not client.exists(self.name) or client.ttl(self.name) <= 0:
logging.info('key {} not exists or ttl <= 0'.format(self.name))
return []
return map(factory, client.lrange(self.name, 0, -1))
复制
我们还需要一个中间关联者,关联起池子跟桶的关系。
class TaskRouter(object):
"""任务路由器"""
def __init__(self, key, prefix):
self.key = key
self.prefix = prefix
def make_task_name(self, val):
return self.prefix.format(val)
def add(self, name, task_name=None):
"""添加处理人
:param name: str
:param task_name: str
:return: str, 任务桶名称
"""
if task_name is None:
task_name = self.make_task_name(name)
client.hset(self.key, name, task_name)
return task_name
def clear(self):
"""清空"""
return client.delete(self.key)
def get(self, name):
"""根据名称获取任务桶名称
:param name: str, 处理人名称
:return: str/None, 任务桶名称
"""
return client.hget(self.key, name)
def buckets(self):
"""查看所有的任务桶"""
return client.hvals(self.key)
复制
当我们有了任务池 TaskPool、任务桶 TaskBucket 以及任务路由器 TaskRouter,我们就可以实现最终的任务分发器。
class TaskDispenser(object):
def __init__(self, pool_key, router_key, bucket_prefix):
self.task_pool = TaskPool(pool_key)
self.task_router = TaskRouter(router_key, bucket_prefix)
def recreate_task_pool(self, mapping):
"""构建任务池
:param mapping: dict, {value: score}
"""
buckets = self.task_router.buckets()
# 收集已经分配的任务
assigned_flow_ids = []
for name in buckets:
tb = TaskBucket(name)
assigned_flow_ids.extend(tb.peek_all())
# 移除已经分配的任务
for fid in assigned_flow_ids:
mapping.pop(fid, None)
# 重构任务池
self.task_pool.clear()
self.task_pool.add_many(mapping)
def assign_tasks(self, limit, worker):
"""任务分配
:param limit: int, 任务数量
:param worker: str, 任务处理者
:return: list[int]
"""
bucket_name = self.task_router.get(worker)
if not bucket_name:
bucket_name = self.task_router.add(worker)
tb = TaskBucket(bucket_name)
tasks = tb.peek_all()
size = len(tasks)
if size == limit:
return tasks
# 分配任务时需要先检查自己的桶里是否有未处理的任务
# 并且确认数量是否满足要求
if size < limit:
# 少补
new_tasks = self.task_pool.assign(limit-size)
if new_tasks:
tb.add(new_tasks)
tasks.extend(new_tasks)
else:
# 多不退,通过下次重建任务池的方式重新分配
for _ in range(size-limit):
tb.pop(reverse=True)
tasks = tb.peek_all()
return tasks
def check_tasks(self, worker, tasks):
"""校验任务
:param worker: str, 任务处理者
:param tasks: list, 需要校验的任务
:return: bool
"""
bucket_name = self.task_router.get(worker)
if not bucket_name:
return False
tb = TaskBucket(bucket_name)
bucket_tasks = tb.peek_all()
return set(bucket_tasks) == set(tasks)
def tasks_done(self, worker, tasks):
"""任务完成"""
bucket_name = self.task_router.get(worker)
if not bucket_name:
return False
tb = TaskBucket(bucket_name)
tb.clear()
return True
复制
我们通过调用 recreate_task_pool 方法实现创建任务池,调用 assign_tasks 方法分配任务,最后调用 tasks_done 方法完成任务。
整个过程看起来似乎没有什么问题。但是试想一下,在并发场景下会不会有问题呢。由于 recreate_task_pool、assign_tasks 以及 tasks_done 方法同时在操作相同的数据源,是必然会有并发问题的,也就是数据不一致问题。
那么该如何解决呢,且听下回分解。
文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。