暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

python中分布式进程示例补充(分三端演示)

小儿来一壶枸杞酒泡茶 2021-05-10
586

上一个示例简单展示了关于分布式进程一个示例,但是部分的一些细节没梳理好,这里再重新梳理一次。

什么是分布式系统?

定义(来自百度百科):

分布式系统(distributed system)是建立在网络之上的软件系统。正是因为软件的特性,所以分布式系统具有高度的内聚性和透明性。因此,网络和分布式系统之间的区别更多的在于高层软件(特别是操作系统),而不是硬件。

分布式系统有两大特点:

  • 内聚性:每一个数据库节点高度自治,有本地的数据库管理系统。
  • 透明性:每一个数据库分布节点对于用户的应用来说都是透明的,是无法区分本地还是远程的。

python分布式进程模式

上上一个示例其实可以总结出来下面的这个模式是:

  • MasterWork(主节点/服务节点)

  • SlaveWork(从节点/计算节点)

其中:

  • MasterWork主要负责处理结果回调处理(定义相关的队列并注册到网络上)
  • TaskSendWork主要负责任务的委派发送,把需要处理任务丢到【发送任务Queue】通信队列
  • TaskHandleWork主要负责任务的消费处理,并且把处理结果丢回到我们【收结果的Queue】通信队列
  • MasterWork 和 TaskHandleWork 和TaskHandleWork之间的交互是通过Queue实现任务和结果共享。

python分布式进程模式中的QueueManager

  • QueueManager内部实现了Queue网络通信的处理。
  • QueueManager可以管理多个的Queue,所以相关队列需要做对应的名称和注册才可以使用。
BaseManager.register("tasks_queue", callable=self.get_task_queue)
BaseManager.register("results_queue", callable=self.get_result_queue)

分布式进程的流程:

  • MasterWork 端:

    • 1:定义需要处理的任务对象(通常通过类的形式包装待处理的任务,示例中已省略)
    • 2:定义MasterWork主类,内部用于实例化发送任务、接收结果的Queue消息队列对象。
    • 3:把发送任务、接收结果的Queue两个队列注册到网络服务上。
    • 4:设置MasterWork节点队列使用的IP、端口、密钥信息并启动服务进程。
    • 5:循环的判断(可以阻塞形式,或限定超时的方式)等待接收结果的Queue的消息,如果队列有消息则进行获取结果信息。
  • TaskSendWork端:

    • 1:连接到我们的服务节点上。
    • 2:获取我们的发送任务队列对象,然后把待出的任务丢到【发送任务Queue】通信队列
  • TaskHandleWork端:

    • 1:连接到我们的服务节点上。
    • 2:获取我们发送任务队列对象,判断是否有任务委派,有则消费处理,然后把处理结果丢回到我们【接收结果的Queue】通信队列

完整三端示例:

MasterWork.py

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     master
   文件功能描述 :   功能描述
   创建人 :       小钟同学
-------------------------------------------------
   修改描述-2021/5/10:         
-------------------------------------------------
"
""

from multiprocessing import Queue
from multiprocessing.managers import BaseManager





class MasterWork(object):
    def __init__(self):
        # 定义【发送任务Queue】通信队列
        self.my_task_queue = Queue()
        # 定义【接收结果的Queue】通信队列
        self.my_result_queue = Queue()

    # 定义获取队列-用户注册回调方法传入
    def get_my_task(self):
        return self.my_task_queue

    # 定义获取队列-用户注册回调方法传入
    def get_my_result(self):
        return self.my_result_queue

    def run(self):
        # 队列注册到网上上
        BaseManager.register("xiao_tasks_queue", callable=self.get_my_task)
        BaseManager.register("xiao_results_queue", callable=self.get_my_result)
        # 定义网络服务的对应的IP和端口和秘钥,并启动
        mgr = BaseManager(address=('127.0.0.1', 8888), authkey=b'xiaozhong123456')
        mgr.start()

        # 获取相关注册到网络上的对象
        # 获得通过网络访问的Queue对象:
        # task = mgr.get_task_queue()
        # result = mgr.get_result_queue()

        tasks_list = mgr.xiao_tasks_queue()
        results_list = mgr.xiao_results_queue()

        # 等待我们的将结果消费完成之后的通知回调
        while True:
            while not tasks_list.empty():
                job_result = results_list.get()
                # job_result = results_list.get(timeout=10)
                print("master get result: {}".format(job_result))

        mgr.shutdown()


if __name__ == "__main__":
    my_master = MasterWork()
    my_master.run()

TaskSendWork.py

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     worker
   文件功能描述 :   功能描述
   创建人 :       小钟同学
  
-------------------------------------------------
   修改描述-2021/5/10:
-------------------------------------------------
"
""
import time
from multiprocessing.managers import BaseManager



class TaskSendWork(object):
    def run(self):
        BaseManager.register("xiao_tasks_queue")
        BaseManager.register("xiao_results_queue")

        mgr = BaseManager(address=('127.0.0.1', 8888), authkey=b'xiaozhong123456')
        mgr.connect()

        # 获取相关注册到网络上的对象

        tasks_list = mgr.xiao_tasks_queue()


        while True:
            msg = input('请输入需要处理的任务信息:>>>: ').strip()
            if not msg: continue
            # 远程的链接的关闭了话,则会提示异常
            if msg.upper() == 'EXIT':
                break
            # 丢一个任务进入
            tasks_list.put(msg)


if __name__ == "__main__":
    worker = TaskSendWork()
    worker.run()


TaskHandleWork.py

import time
from multiprocessing.managers import BaseManager



class TaskHandleWork(object):
    def run(self):

        BaseManager.register("xiao_tasks_queue")
        BaseManager.register("xiao_results_queue")

        mgr = BaseManager(address=('127.0.0.1', 8888), authkey=b'xiaozhong123456')
        mgr.connect()

        # 获取相关注册到网络上的对象

        tasks_list = mgr.xiao_tasks_queue()
        results_list = mgr.xiao_results_queue()

        while True:
            time.sleep(0.5)
            while not tasks_list.empty():
                task = tasks_list.get(timeout=10)
                print("接收到了任务",task)
                result = "【ECHO】--%s"%(task)
                print("处理完成任务输出任务要结果", result)
                results_list.put(result)



if __name__ == "__main__":
    worker = TaskHandleWork()
    worker.run()


运行

首先运行我们的主节点:把服务开启!

然后分别再运行其他工作节点:

个人其他博客地址

简书:https://www.jianshu.com/u/d6960089b087

掘金:https://juejin.cn/user/2963939079225608

小钟同学 | 文 【原创】| QQ:308711822

  • 1:本文相关描述主要是个人的认知和见解,如有不当之处,还望各位大佬指正。
  • 2:关于文章内容,有部分内容参考自互联网整理,如有链接会声明标注;如没有及时标注备注的链接的,如有侵权请联系,我会立即删除处理哟。


文章转载自小儿来一壶枸杞酒泡茶,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论