QPID 消息队列初步

QPID 是一种高性能 Message Bus,目前因为牵扯到工具的 SOA 架构,我的项目中将会整合它,以将自身对数据库的修改提交到 Message Bus 中, 供其它程序监听调用。

目前主流的 Message Bus 主要有以下几种:

  1. RabbitMQ
  2. Apache ActiveMQ
  3. Apache qpid

而之所以选择 QPID 是因为它有以下几个优点(引用源):

  • Supports transactions
  • Persistence using a pluggable layer — I believe the default is Apache Derby
  • This like the other Java based product is HIGHLY configurable
  • Management using JMX and an Eclipse Management Console application - http://www.lahiru.org/2008/08/what-qpid-management-console-can-do.html
  • The management console is very feature rich
  • Supports message Priorities
  • Automatic client failover using configurable connection properties -
    • http://qpid.apache.org/cluster-design-note.html
    • http://qpid.apache.org/starting-a-cluster.html
    • http://qpid.apache.org/cluster-failover-modes.html
  • Cluster is nothing but a set of machines have all the queues replicated
  • All queue data and metadata is replicated across all nodes that make up a cluster
  • All clients need to know in advance which nodes make up the cluster
  • Retry logic lies in the client code
  • Durable Queues/Subscriptions
  • Has bindings in many languages
  • For the curious: http://qpid.apache.org/current-architecture.html

而对我而言,QPID 比较有优势的地方是,一有 Python 的 bindding(Perl 的兄弟对不起了),二是源代码比较充足。

为此我写了两个基类,简单地调用了 QPID Python 中的 Receiver 和 Sender,相对于 message_transfer 方法,这种方法可以传递 Dictionary 对象,一共三个文件,其实也可以合在一起使用。

#!/usr/bin/python

import qpid
import qpid.messaging
import logging
logging.basicConfig()

class QPIDBase(object):
def init(self, host=&apos10.66.93.193&apos, port=&apos5672&apos, queue_name=&apostmp.testing&apos, username=&aposguest&apos, password=&aposguest&apos):
&quot&quot&quot
Arguments:
host
port
queue_name
username
password
&quot&quot&quot
self.host = host
self.port = port
self.queue_name = queue_name
self.username = username
self.password = password
self.connection = None
self.session = None

def init_connect(self, mechanism=&aposPLAIN&apos):
    &quot&quot&quotInitial the connection&quot&quot&quot
    url = &aposamqp://guest/guest@%s:%s&apos %(self.host, self.port)
    self.connection = qpid.messaging.Connection(
        url = url, sasl_mechanisms=mechanism,
        reconnect=True, reconnect_interval=60, reconnect_limit=60,
        username=self.username, password=self.password
    )
    self.connection.open()
    
def init_session(self):
    &quot&quot&quotInitial the session&quot&quot&quot
    if not self.connection:
        self.init_connect()
    self.session = self.connection.session()

def close(self):
    &quot&quot&quotClose the connection and session&quot&quot&quot
    self.session.close()
    self.connection.close()
#!/usr/bin/python

import qpid.messaging
from datetime import datetime
from base import QPIDBase

class QPIDSender(QPIDBase):
def init(self, **kwargs):
super(QPIDSender, self).init(**kwargs)
self.sender = None

def init_sender(self):
    &quot&quot&quotInitial the sender&quot&quot&quot
    if not self.session:
        self.init_session()
    self.sender = self.session.sender(self.queue_name)

def send(self, content, t = &apostest&apos):
    &quot&quot&quotSending the content&quot&quot&quot
    if not self.sender:
        self.init_sender()
    props = {&apostype&apos: t}
    message = qpid.messaging.Message(properties=props, content = content)
    self.sender.send(message)

def typing(self):
    &quot&quot&quotSending the contents real time with typing&quot&quot&quot
    content = &apos&apos
    while content != &aposEOF&apos:
        content = raw_input(&aposStart typing:&apos)
        self.send(content)

if name == &apos__main__&apos:
s = QPIDSender()
s.send(&aposTesting at %s&apos % datetime.now())
s.close()

#!/usr/bin/python

from pprint import pprint
from base import QPIDBase

class QPIDReceiver(QPIDBase):
    def __init__(self, **kwargs):
        super(QPIDReceiver, self).__init__(**kwargs)
        self.receiver = None
    
    def init_receiver(self):
        """Initial the receiver"""
        if not self.session:
            self.init_session()
        self.receiver = self.session.receiver(self.queue_name)
        
    def receive(self):
        """Listing the messages from server"""
        if not self.receiver:
            self.init_receiver()
            
        try:
            while True:
                message = self.receiver.fetch()
                content = message.content
                pprint({&aposprops&apos: message.properties})
                pprint(content)
                self.session.acknowledge(message)
        except KeyboardInterrupt:
            pass
        
        self.close()

# Test code
if __name__ == &apos__main__&apos:
    r = QPIDReceiver()
    r.receive()

代码非常简单,容易读懂,使用方法是在一台 Linux Server 上安装好 qpid-cpp-server, 并且启动后,在 Client 上安装 python-qpid,然后修改一下 base.py __init__ 方法的 host 字段,或者在代码中自行指定好服务器地址,即可直接执行测试。

需要说明的是 QPID 返回的数据结构,包含可以为 Dictionary 对象的 properties 和只能为纯文本的 content 两个属性,也就是说可以将数据结构保存到 properties,而消息名称保存成 content 中,即:

        try:
            while True:
                message = self.receiver.fetch()
                content = message.content
                pprint({&aposprops&apos: message.properties})
                pprint(content)
                self.session.acknowledge(message)
        except KeyboardInterrupt:
            pass

一个终端执行 receiver.py 监听消息,再开一个终端执行 sender.py,将会如以下输出:

$ python ./receiver.py
{&aposprops&apos: {u&apostype&apos: u&apostest&apos, &aposx-amqp-0-10.routing-key&apos: u&apostmp.testing&apos}}
&aposTesting at 2010-12-06 14:54:59.536093&apos

如果有兴趣试下 QPIDSender.typing() 方法,再把 Kerberos 的用户名读出来,就可以做一个 IM 啦~

问题:现在似乎 Sender 发出的消息一次只能有一个 Receiver 接收,也就是现有代码不能用于 SOA,而这理论上应该是不应该的,依然在探索。

(可以尝试打开两个 receiver.py 测试)

版权所有丨转载请注明出处:https://kxq.io/archives/qpid消息队列初步