QPID 消息队列初步
QPID 是一种高性能 Message Bus,目前因为牵扯到工具的 SOA 架构,我的项目中将会整合它,以将自身对数据库的修改提交到 Message Bus 中, 供其它程序监听调用。
目前主流的 Message Bus 主要有以下几种:
而之所以选择 QPID 是因为它有以下几个优点(引用源):
- Supports transactions
- Persistence using a pluggable layer — I believe the default is Apache Derby
- This like the other Java based product is HIGHLY configurable
- Management using JMX and an Eclipse Management Console application - http://www.lahiru.org/2008/08/what-qpid-management-console-can-do.html
- The management console is very feature rich
- Supports message Priorities
-
Automatic client failover using configurable connection properties -
- http://qpid.apache.org/cluster-design-note.html
- http://qpid.apache.org/starting-a-cluster.html
- http://qpid.apache.org/cluster-failover-modes.html
- Cluster is nothing but a set of machines have all the queues replicated
- All queue data and metadata is replicated across all nodes that make up a cluster
- All clients need to know in advance which nodes make up the cluster
- Retry logic lies in the client code
- Durable Queues/Subscriptions
- Has bindings in many languages
- For the curious: http://qpid.apache.org/current-architecture.html
而对我而言,QPID 比较有优势的地方是,一有 Python 的 bindding(Perl 的兄弟对不起了),二是源代码比较充足。
为此我写了两个基类,简单地调用了 QPID Python 中的 Receiver 和 Sender,相对于 message_transfer 方法,这种方法可以传递 Dictionary 对象,一共三个文件,其实也可以合在一起使用。
#!/usr/bin/pythonimport qpid
import qpid.messaging
import logging
logging.basicConfig()class QPIDBase(object):
def init(self, host=&apos10.66.93.193&apos, port=&apos5672&apos, queue_name=&apostmp.testing&apos, username=&aposguest&apos, password=&aposguest&apos):
"""
Arguments:
host
port
queue_name
username
password
"""
self.host = host
self.port = port
self.queue_name = queue_name
self.username = username
self.password = password
self.connection = None
self.session = Nonedef init_connect(self, mechanism=&aposPLAIN&apos): """Initial the connection""" url = &aposamqp://guest/guest@%s:%s&apos %(self.host, self.port) self.connection = qpid.messaging.Connection( url = url, sasl_mechanisms=mechanism, reconnect=True, reconnect_interval=60, reconnect_limit=60, username=self.username, password=self.password ) self.connection.open() def init_session(self): """Initial the session""" if not self.connection: self.init_connect() self.session = self.connection.session() def close(self): """Close the connection and session""" self.session.close() self.connection.close()
#!/usr/bin/pythonimport qpid.messaging
from datetime import datetime
from base import QPIDBaseclass QPIDSender(QPIDBase):
def init(self, **kwargs):
super(QPIDSender, self).init(**kwargs)
self.sender = Nonedef init_sender(self): """Initial the sender""" if not self.session: self.init_session() self.sender = self.session.sender(self.queue_name) def send(self, content, t = &apostest&apos): """Sending the content""" if not self.sender: self.init_sender() props = {&apostype&apos: t} message = qpid.messaging.Message(properties=props, content = content) self.sender.send(message) def typing(self): """Sending the contents real time with typing""" content = &apos&apos while content != &aposEOF&apos: content = raw_input(&aposStart typing:&apos) self.send(content)
if name == &apos__main__&apos:
s = QPIDSender()
s.send(&aposTesting at %s&apos % datetime.now())
s.close()
#!/usr/bin/python from pprint import pprint from base import QPIDBase class QPIDReceiver(QPIDBase): def __init__(self, **kwargs): super(QPIDReceiver, self).__init__(**kwargs) self.receiver = None def init_receiver(self): """Initial the receiver""" if not self.session: self.init_session() self.receiver = self.session.receiver(self.queue_name) def receive(self): """Listing the messages from server""" if not self.receiver: self.init_receiver() try: while True: message = self.receiver.fetch() content = message.content pprint({&aposprops&apos: message.properties}) pprint(content) self.session.acknowledge(message) except KeyboardInterrupt: pass self.close() # Test code if __name__ == &apos__main__&apos: r = QPIDReceiver() r.receive()
代码非常简单,容易读懂,使用方法是在一台 Linux Server 上安装好 qpid-cpp-server, 并且启动后,在 Client 上安装 python-qpid,然后修改一下 base.py __init__ 方法的 host 字段,或者在代码中自行指定好服务器地址,即可直接执行测试。
需要说明的是 QPID 返回的数据结构,包含可以为 Dictionary 对象的 properties 和只能为纯文本的 content 两个属性,也就是说可以将数据结构保存到 properties,而消息名称保存成 content 中,即:
try: while True: message = self.receiver.fetch() content = message.content pprint({&aposprops&apos: message.properties}) pprint(content) self.session.acknowledge(message) except KeyboardInterrupt: pass
一个终端执行 receiver.py 监听消息,再开一个终端执行 sender.py,将会如以下输出:
$ python ./receiver.py {&aposprops&apos: {u&apostype&apos: u&apostest&apos, &aposx-amqp-0-10.routing-key&apos: u&apostmp.testing&apos}} &aposTesting at 2010-12-06 14:54:59.536093&apos
如果有兴趣试下 QPIDSender.typing() 方法,再把 Kerberos 的用户名读出来,就可以做一个 IM 啦~
问题:现在似乎 Sender 发出的消息一次只能有一个 Receiver 接收,也就是现有代码不能用于 SOA,而这理论上应该是不应该的,依然在探索。
(可以尝试打开两个 receiver.py 测试)
版权所有丨转载请注明出处:https://kxq.io/archives/qpid消息队列初步