Python网络编程之ZeroMQ知识总结
总结以下:ØMQ (ZeroMQ) 是一个基于消息队列的多线程网络库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。
看起来有些抽象,下面我们结合ZeroMQ 的 Python 封装———— pyzmp,用实例看一下ZeroMQ的三种最基本的工作模式。
二、安装安装方法
pip install pyzmq
查看是否安装成功
>>> import zmq>>> print(zmq.__version__)22.0.3三、Request-Reply (请求响应模式)3.1 Request-Reply模式概述: 消息双向的,有来有往。 Client请求的消息,Server必须答复给Client。 Client在请求后,Server必须回响应,注意:Server不返回响应会报错。 Server和Client都可以是1:N的模型。通常把1认为是Server,N认为是Client。 更底层的端点地址是对上层隐藏的,每个请求都隐含回应地址,而应用则不关心它。 ZMQ 可以很好的支持路由功能(实现路由功能的组件叫做 Device),把 1:N 扩展为 N:M(只需要加入若干路由节点)。
#client.pyimport zmqcontext = zmq.Context()# Socket to talk to serverprint('Connecting to hello world server…')socket = context.socket(zmq.REQ)socket.connect('tcp://localhost:5555')socket.send(b'Hello')# Get the reply.message = socket.recv()print(f'Received reply [ {message} ]')3.3 Server端python实现
#server.pyimport timeimport zmqcontext = zmq.Context()socket = context.socket(zmq.REP)socket.bind('tcp://*:5555')while True: # Wait for next request from client message = socket.recv() print('Received request: %s' % message) # Do some ’work’ time.sleep(1) # Send reply back to client socket.send(b'World') 启动client.py 首先会打印Connecting to hello world server… 但不会受到任何消息。 然后启动server.py ,客户端收到来自客户端的request: b’Hello’ 此时client端收到来自server端的 reply: [ b’World’ ]
python client.py Connecting to hello world server…Received reply [ b’World’ ]
python server.py Received request: b’Hello’
可以试一下,多运行几个client.py,看看情况是什么样的。
这里直接引用官方文档的例子:
发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息
#Publisher.pyimport zmqfrom random import randrangecontext = zmq.Context()socket = context.socket(zmq.PUB)socket.bind('tcp://*:5556')while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string('%i %i %i' % (zipcode, temperature, relhumidity))
订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据
#Subscribe.py import sysimport zmq# Socket to talk to servercontext = zmq.Context()socket = context.socket(zmq.SUB)print('Collecting updates from weather server...')socket.connect('tcp://localhost:5556')# Subscribe to zipcode, default is NYC, 10001zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001'# Python 2 - ascii bytes to unicode strif isinstance(zip_filter, bytes): zip_filter = zip_filter.decode(’ascii’)socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)# Process 5 updatestotal_temp = 0for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature)print( 'Average temperature for zipcode ’%s’ was %dF' % (zip_filter, total_temp / (update_nbr + 1)))
ventilator 使用的是 SOCKET_PUSH,将任务分发到 Worker 节点上。Worker 节点上,使用 SOCKET_PULL 从上游接受任务,并使用 SOCKET_PUSH 将结果汇集到 Sink。值得注意的是,任务的分发的时候也同样有一个负载均衡的路由功能,worker 可以随时自由加入,ventilator 可以均衡将任务分发出去。
Push/Pull模式还是蛮常用的,这里我们主要测试一下它的负载均衡。
5.2 Ventilator# ventilator.pyimport zmqimport timecontext = zmq.Context()socket = context.socket(zmq.PUSH)socket.bind('tcp://*:5557')while True: socket.send(b'test') print('已发送') time.sleep(1)5.3 worker
# worker.pyimport zmqcontext = zmq.Context()recive = context.socket(zmq.PULL)recive.connect(’tcp://127.0.0.1:5557’)sender = context.socket(zmq.PUSH)sender.connect(’tcp://127.0.0.1:5558’)while True: data = recive.recv() print('work1 正在转发...') sender.send(data)5.4 sink
# sink.pyimport zmqimport syscontext = zmq.Context()socket = context.socket(zmq.PULL)socket.bind('tcp://*:5558')while True: response = socket.recv() print('response: %s' % response)
打开4个Terminal,分别运行
python sink.pypython worker.pypython worker.pypython ventilator.py
消息模型可以根据需要组合使用,后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。
到此这篇关于Python网络编程之ZeroMQ知识总结的文章就介绍到这了,更多相关Python ZeroMQ知识总结内容请搜索好吧啦网以前的文章或继续浏览下面的相关文章希望大家以后多多支持好吧啦网!
相关文章: