在任务分发系统设计(一)中我们实现了一个基本任务分发系统:通过调用 recreate_task_pool 方法实现创建任务池;调用 assign_tasks 方法分配任务;最后调用 tasks_done 方法完成任务。这三个方法使用过程中需要小心的一点是数据一致性问题,因为它们都是在处理同一份数据。至于创建任务池的时间可以是在线实时生成的,也可以是离线定时生成,只要不丢失数据即可。
为了防止不同方法之间的干扰,我们需要实现一个分布式锁来解决这个问题。下面我们就用 Redis 来实现一个简易的分布式锁。
class FunctionLock(object):
TIMEOUT = 60 # 锁过期超时
@classmethod
def gen_name(cls, key):
return "{prefix}:{key}".format(prefix=cls.__name__, key=key)
@classmethod
def get_lock(cls, name, timeout=0):
return cod.set(name, "1", ex=timeout or cls.TIMEOUT, nx=True)
@classmethod
def acquire(cls, key, timeout=0):
name = cls.gen_name(key)
return cls.get_lock(name, timeout=timeout)
@classmethod
def release(cls, key):
name = cls.gen_name(key)
cod.delete(name)
def synchronized(prefix, local=0, timeout=60):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = prefix + str(args[local])
if not FunctionLock.acquire(key, timeout=timeout):
raise RuntimeError("PROCESSING LOCKED")
try:
return func(*args, **kwargs)
finally:
FunctionLock.release(id)
return wrapper
return decorator
复制
简单介绍一下实现原理。借用 Redis 的 set 方法,加入了 nx(not exist)参数,可以保证如果已有 key 存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了 ex(expire)过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即 key 被删除),不会发生死锁。
使用的时候非常方便。
class TaskDispenser(object):
key = "_dispenser_"
def __str__(self):
return self.pool_key
def __init__(self, pool_key, router_key, bucket_prefix):
self.pool_key = pool_key
self.task_pool = TaskPool(pool_key)
self.task_router = TaskRouter(router_key, bucket_prefix)
@synchronized(key)
def recreate_task_pool(self, mapping):
"""构建任务池
:param mapping: dict, {value: score}
"""
buckets = self.task_router.buckets()
# 收集已经分配的任务
assigned_flow_ids = []
for name in buckets:
tb = TaskBucket(name)
assigned_flow_ids.extend(tb.peek_all())
# 移除已经分配的任务
for fid in assigned_flow_ids:
mapping.pop(fid, None)
# 重构任务池
self.task_pool.clear()
self.task_pool.add_many(mapping)
@synchronized(key)
def assign_tasks(self, limit, worker):
"""任务分配
:param limit: int, 任务数量
:param worker: str, 任务处理者
:return: list[int]
"""
bucket_name = self.task_router.get(worker)
if not bucket_name:
bucket_name = self.task_router.add(worker)
tb = TaskBucket(bucket_name)
tasks = tb.peek_all()
size = len(tasks)
if size == limit:
return tasks
# 分配任务时需要先检查自己的桶里是否有未处理的任务
# 并且确认数量是否满足要求
if size < limit:
# 少补
new_tasks = self.task_pool.assign(limit-size)
if new_tasks:
tb.add(new_tasks)
tasks.extend(new_tasks)
else:
# 多不退,通过下次重建任务池的方式重新分配
for _ in range(size-limit):
tb.pop(reverse=True)
tasks = tb.peek_all()
return tasks
def check_tasks(self, worker, tasks):
"""校验任务
:param worker: str, 任务处理者
:param tasks: list, 需要校验的任务
:return: bool
"""
bucket_name = self.task_router.get(worker)
if not bucket_name:
return False
tb = TaskBucket(bucket_name)
bucket_tasks = tb.peek_all()
return set(bucket_tasks) == set(tasks)
@synchronized(key)
def tasks_done(self, worker, tasks):
"""任务完成"""
bucket_name = self.task_router.get(worker)
if not bucket_name:
return False
tb = TaskBucket(bucket_name)
tb.clear()
return True
复制
通过装饰器的方式把以上三个方法全部加上同步处理,使得这三个方法可以在并发的情况下顺序执行,从而保证数据一致性。
文章转载自剽悍的派森先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
数据库国产化替代深化:DBA的机遇与挑战
代晓磊
1244次阅读
2025-04-27 16:53:22
2025年4月国产数据库中标情况一览:4个千万元级项目,GaussDB与OceanBase大放异彩!
通讯员
726次阅读
2025-04-30 15:24:06
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
588次阅读
2025-04-14 09:40:20
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
502次阅读
2025-04-17 17:02:24
一页概览:Oracle GoldenGate
甲骨文云技术
475次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
470次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
359次阅读
2025-04-18 10:01:22
给准备学习国产数据库的朋友几点建议
白鳝的洞穴
274次阅读
2025-05-07 10:06:14
国产数据库图谱又上新|82篇精选内容全览达梦数据库
墨天轮编辑部
273次阅读
2025-04-23 12:04:21
XCOPS广州站:从开源自研之争到AI驱动的下一代数据库架构探索
韩锋频道
270次阅读
2025-04-29 10:35:54