上一个示例简单展示了关于分布式进程一个示例,但是部分的一些细节没梳理好,这里再重新梳理一次。
什么是分布式系统?
定义(来自百度百科):
分布式系统(distributed system)是建立在网络之上的软件系统。正是因为软件的特性,所以分布式系统具有高度的内聚性和透明性。因此,网络和分布式系统之间的区别更多的在于高层软件(特别是操作系统),而不是硬件。
分布式系统有两大特点:
内聚性:每一个数据库节点高度自治,有本地的数据库管理系统。 透明性:每一个数据库分布节点对于用户的应用来说都是透明的,是无法区分本地还是远程的。
python分布式进程模式
上上一个示例其实可以总结出来下面的这个模式是:
MasterWork(主节点/服务节点)
SlaveWork(从节点/计算节点)
其中:
MasterWork主要负责处理结果回调处理(定义相关的队列并注册到网络上) TaskSendWork主要负责任务的委派发送,把需要处理任务丢到【发送任务Queue】通信队列 TaskHandleWork主要负责任务的消费处理,并且把处理结果丢回到我们【收结果的Queue】通信队列 MasterWork 和 TaskHandleWork 和TaskHandleWork之间的交互是通过Queue实现任务和结果共享。
python分布式进程模式中的QueueManager
QueueManager内部实现了Queue网络通信的处理。 QueueManager可以管理多个的Queue,所以相关队列需要做对应的名称和注册才可以使用。
BaseManager.register("tasks_queue", callable=self.get_task_queue)
BaseManager.register("results_queue", callable=self.get_result_queue)
分布式进程的流程:
MasterWork 端:
1:定义需要处理的任务对象(通常通过类的形式包装待处理的任务,示例中已省略) 2:定义MasterWork主类,内部用于实例化发送任务、接收结果的Queue消息队列对象。 3:把发送任务、接收结果的Queue两个队列注册到网络服务上。 4:设置MasterWork节点队列使用的IP、端口、密钥信息并启动服务进程。 5:循环的判断(可以阻塞形式,或限定超时的方式)等待接收结果的Queue的消息,如果队列有消息则进行获取结果信息。 TaskSendWork端:
1:连接到我们的服务节点上。 2:获取我们的发送任务队列对象,然后把待出的任务丢到【发送任务Queue】通信队列 TaskHandleWork端:
1:连接到我们的服务节点上。 2:获取我们发送任务队列对象,判断是否有任务委派,有则消费处理,然后把处理结果丢回到我们【接收结果的Queue】通信队列
完整三端示例:
MasterWork.py
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : master
文件功能描述 : 功能描述
创建人 : 小钟同学
-------------------------------------------------
修改描述-2021/5/10:
-------------------------------------------------
"""
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
class MasterWork(object):
def __init__(self):
# 定义【发送任务Queue】通信队列
self.my_task_queue = Queue()
# 定义【接收结果的Queue】通信队列
self.my_result_queue = Queue()
# 定义获取队列-用户注册回调方法传入
def get_my_task(self):
return self.my_task_queue
# 定义获取队列-用户注册回调方法传入
def get_my_result(self):
return self.my_result_queue
def run(self):
# 队列注册到网上上
BaseManager.register("xiao_tasks_queue", callable=self.get_my_task)
BaseManager.register("xiao_results_queue", callable=self.get_my_result)
# 定义网络服务的对应的IP和端口和秘钥,并启动
mgr = BaseManager(address=('127.0.0.1', 8888), authkey=b'xiaozhong123456')
mgr.start()
# 获取相关注册到网络上的对象
# 获得通过网络访问的Queue对象:
# task = mgr.get_task_queue()
# result = mgr.get_result_queue()
tasks_list = mgr.xiao_tasks_queue()
results_list = mgr.xiao_results_queue()
# 等待我们的将结果消费完成之后的通知回调
while True:
while not tasks_list.empty():
job_result = results_list.get()
# job_result = results_list.get(timeout=10)
print("master get result: {}".format(job_result))
mgr.shutdown()
if __name__ == "__main__":
my_master = MasterWork()
my_master.run()
TaskSendWork.py
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : worker
文件功能描述 : 功能描述
创建人 : 小钟同学
-------------------------------------------------
修改描述-2021/5/10:
-------------------------------------------------
"""
import time
from multiprocessing.managers import BaseManager
class TaskSendWork(object):
def run(self):
BaseManager.register("xiao_tasks_queue")
BaseManager.register("xiao_results_queue")
mgr = BaseManager(address=('127.0.0.1', 8888), authkey=b'xiaozhong123456')
mgr.connect()
# 获取相关注册到网络上的对象
tasks_list = mgr.xiao_tasks_queue()
while True:
msg = input('请输入需要处理的任务信息:>>>: ').strip()
if not msg: continue
# 远程的链接的关闭了话,则会提示异常
if msg.upper() == 'EXIT':
break
# 丢一个任务进入
tasks_list.put(msg)
if __name__ == "__main__":
worker = TaskSendWork()
worker.run()
TaskHandleWork.py
import time
from multiprocessing.managers import BaseManager
class TaskHandleWork(object):
def run(self):
BaseManager.register("xiao_tasks_queue")
BaseManager.register("xiao_results_queue")
mgr = BaseManager(address=('127.0.0.1', 8888), authkey=b'xiaozhong123456')
mgr.connect()
# 获取相关注册到网络上的对象
tasks_list = mgr.xiao_tasks_queue()
results_list = mgr.xiao_results_queue()
while True:
time.sleep(0.5)
while not tasks_list.empty():
task = tasks_list.get(timeout=10)
print("接收到了任务",task)
result = "【ECHO】--%s"%(task)
print("处理完成任务输出任务要结果", result)
results_list.put(result)
if __name__ == "__main__":
worker = TaskHandleWork()
worker.run()
运行
首先运行我们的主节点:把服务开启!
然后分别再运行其他工作节点:
个人其他博客地址
简书:https://www.jianshu.com/u/d6960089b087
掘金:https://juejin.cn/user/2963939079225608
小钟同学 | 文 【原创】| QQ:308711822
1:本文相关描述主要是个人的认知和见解,如有不当之处,还望各位大佬指正。 2:关于文章内容,有部分内容参考自互联网整理,如有链接会声明标注;如没有及时标注备注的链接的,如有侵权请联系,我会立即删除处理哟。
文章转载自小儿来一壶枸杞酒泡茶,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。