ZeroMQ 异步消息队列通信
ZeroMQ 是一个很有个性的项目,它原来是定位为“史上最快消息队列”,所以名字里面有“MQ”两个字母,但是后来逐渐演变发展,慢慢淡化了消息队列的身影,改称为消息内核,或者消息层了。从网络通信的角度看,它处于会话层之上,应用层之下,有了它,你甚至不需要自己写一行的socket函数调用就能完成复杂的网络通信工作。
服务端 :
import zeromq import console; var context = zeromq.context() var responder = context.zmq_socket_reply() //创建套接字 responder.bind( "tcp://*:5559") console.log("服务端已启动") do { console.log("服务端收到消息",responder.recv() ); responder.send("World") }while( sleep (1) )
客户端:
import zeromq import console; var context = zeromq.context() var requester = context.zmq_socket_request(); requester.connect( "tcp://localhost:5559" ) requester.send("Hello"); //发送消息 var str = requester.recv(); //接收字符串 console.log ("客户端收到消息 ", str ); context.term(); //关闭
三种基本模式(它有很多种)
1. 请求应答模式(req 和 rep)
消息双向的,有来有往,req端请求的消息,rep端必须答复给req端
2. 订阅发布模式 (sub 和 pub)
消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。
3. push pull模式
消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.
后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。
阻塞 和 非阻塞
以上三种基本模式都支持阻塞模式和非阻塞模式。
req 和 rep的阻塞模式是这样的(其实跟原生的socket实现也非常像)
大家用过socket的,客户端要是先启动的话,会连接失败,或者是短时间内有超时问题。
Python socket 通信示例
服务端
#!/usr/bin/env python # -*- coding: utf-8 -*- ''' @author: homer @see: ithomer.net ''' import socket HOST = '' # Symbolic name meaning all available interfaces PORT = 50007 # Arbitrary non-privileged port s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((HOST, PORT)) s.listen(1) conn, addr = s.accept() print("conn = %s, addr = %s" % (conn, addr)) while 1: data = conn.recv(1024) if not data: break conn.send(data) print('server close...') conn.close()
客户端:
#!/usr/bin/env python # -*- coding: utf-8 -*- ''' @author: homer @see: ithomer.net ''' import socket HOST = '' # The remote host PORT = 50007 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) for i in range(10): s.send('Hello ithomer at %d' % i) data = s.recv(1024) print 'Received', repr(data) print('client close...') s.close()
运行结果 :
1)client 控制台输出:
Received 'Hello ithomer at 0'
Received 'Hello ithomer at 1'
Received 'Hello ithomer at 2'
Received 'Hello ithomer at 3'
Received 'Hello ithomer at 4'
Received 'Hello ithomer at 5'
Received 'Hello ithomer at 6'
Received 'Hello ithomer at 7'
Received 'Hello ithomer at 8'
Received 'Hello ithomer at 9'
client close...
2)server 控制台输出:
conn = <socket._socketobject object at 0x7fdaed198600>, addr = ('127.0.0.1', 42618)
server close...
如果使用ActiveMQ/RabbitMQ之类的有代理MQ系统,只要保证MQ代理最先启动, 就可以保证系统的正常运行。而对于无代理的ZeroMQ来说,似乎比较难办。 在刚刚开始使用ZeroMQ时,我也一直担心这个问题,总是小心翼翼地首先启动调 用bind指令的程序,然后启动执行connect指令的程序。这样其实只是利用了 ZeroMQ的高速数据传输能力,以及ZeroMQ对IPC和socket的良好封装特性,还是 没有解决进程启动顺序的问题。后来,偶然实验了一下,发现bind程序和 connect程序无论谁先启动,其实都不影响整个系统的正常运行。
zmq 通信示例:
服务器端:
#!/usr/bin/env python # -*- coding: utf-8 -*- ''' @author: homer @see: ithomer.net ''' import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # Wait for next request from client message = socket.recv() print "Received request: ", message # Do some 'work' time.sleep (1) # Do some 'work' # Send reply back to client socket.send("World")
客户端:
#!/usr/bin/env python # -*- coding: utf-8 -*- ''' @author: homer @see: ithomer.net ''' import zmq from zmq import * context = zmq.Context() # Socket to talk to server print "Connecting to server…" socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:5555") # Do 10 requests, waiting each time for a response for request in range (10): print "Sending request ", request,"…" socket.send ("Hello") # Get the reply. message = socket.recv() print "Received reply ", request, "[", message, "]"
运行结果:
1) client 控制台输出
Connecting to server…
Sending request 0 …
Sending request 1 …
Sending request 2 …
Sending request 3 …
Sending request 4 …
Sending request 5 …
Sending request 6 …
Sending request 7 …
Sending request 8 …
Sending request 9 …
Received reply 9 [ World ]
2) server 控制台输出
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
发布、订阅原理图
可以看出发布者绑定绑定一个端口,订阅者通过连接发布者接受订阅的消息。
官网描述这种模式要注意以下几点:
这里的Publish-Subscribe模型是一个很典型的PUB-SUB模型,即发布者(Publisher)只能发送数据,它发送时指明发送数据的类型,而订阅者(Subscriber)则只接收它关心的类型的消息。
1. pub/sub模式下,sub事实上可以连接多个pub,每次只连接一个connect,所以接收到的消息可以是叫错的,以至于不会单个pub掩盖了其他pub
2. 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
3. 如果你采用tcp的连接方式,sub很慢,消息将会堆积在pub,后期会对该问题有个较好的解决
4. 目前的而版本,过滤发生在sub端,而不是pub端,意思就是说一个pub会发送所有的消息到所有的sub, 由sub决定是要drop这个msg.
zeromq是lib库,部署完成后可自行编写server和client,编译时指定-lzmq即可
读速度 | 写速度
send 1000000 message cost 659 ms | recv 50000 message cost 36 ms
send 1000000 message cost 597 ms | recv 50000 message cost 33 ms
send 1000000 message cost 735 ms | recv 50000 message cost 34 ms
send 1000000 message cost 727 ms | recv 50000 message cost 33 ms
send 1000000 message cost 741 ms | recv 50000 message cost 33 ms
send 1000000 message cost 798 ms | recv 50000 message cost 32 ms
send 1000000 message cost 665 ms | recv 50000 message cost 34 ms
平均读取速度 147w | 平均写入速度 144w /s
为题提高性能 可以用gevent框架
import gevent from gevent_zeromq import zmq # Global Context context = zmq.Context() #它是GreenContext的一个简写,确保greenlet化socket def server(): server_socket = context.socket(zmq.REQ) #创建一个socket,使用mq类型模式REQ/REP(请求/回复,服务器是请求),还有PUB/SUB(发布/订阅),push/pull等 server_socket.bind("tcp://127.0.0.1:5000") #绑定socket for request in range(1,10): server_socket.send("Hello") print('Switched to Server for ', request) server_socket.recv() #这里发生上下文切换 def client(): client_socket = context.socket(zmq.REP) (客户端是回复) client_socket.connect("tcp://127.0.0.1:5000") #连接server的socket端口 for request in range(1,10): client_socket.recv() print('Switched to Client for ', request) client_socket.send("World") publisher = gevent.spawn(server) client = gevent.spawn(client) gevent.joinall([publisher, client])
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2014-04-27 22:01:04
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!
转载注明: ZeroMQ 异步消息队列通信 (米扑博客)