プロセス間のやり取りを簡単に。centos5.7 + RabbitMQ + python + pika/kombu

OpenStackに触れる中で今まで触ったことあるけどあやふやだった技術を再確認したり、今まで聞いたことのかなかった技術を知ることができています。

その中の1つでメッセージキューイングサーバーのRabbitMQというものを知ったので軽くまとめときます。

RabbitMQとは

RabbitMQはAMQP(Advanced Message Queuing Protocol)(Advanced Message Queuing Protocol – Wikipedia)を使用したメッセージキューイングサーバーです。Erlangで実装されています。

メッセージキューイングとはその名前のまま、何らかの「メッセージ」を「キュー」に入れておいてくれるサーバーです。
通常アプリケーションでの通信は、

  1. 相手方に接続して
  2. 何か処理して(処理が終わるのを待って)
  3. 切断する

という流れになりますが、メッセージキューイングサーバーを使うことで、

  1. メッセージキューイングサーバーに接続して
  2. 処理したいことを書いたメッセージをキューを投げて
  3. 切断する

  1. 処理する側がメッセージキューイングサーバーに接続して
  2. メッセージが届いていないか確認して
  3. メッセージがあれば処理
 

という手順にすることができ、処理を依頼する側は処理の完了を待つ必要がありません。
ユーザーインタフェースとしてはすぐに反応が返ってくるので気持ちよいですよね(時間のかかる処理の場合、経過がわかるような処理を作る必要はありますが)。

 

AMQPとは

メッセージキューイングについて標準化をがんばってる的な規格です。
amqpについては以下がわかりやすいです。
AMQPによるメッセージング | GREE Engineers’ Blog

 

OpenStackではどう使われているか

OpenStackでは、各コンポーネント間の通信をRabbitMQを通して行なっているようです。
OpenStackではkombuというpythonライブラリを使っているようです。

nova/rpc at master from openstack/nova – GitHub

 

RabbitMQをcentos 5.7にインストール

手元で動いてたcentos5.7にインストールしてみました。

参考:RabbitMQ – Installing on RPM-based Linux (CentOS, Fedora, OpenSuse, RedHat)

参考URLと最後のインストールするパッケージが違いますが、こっちが正しいはず。

インストール

以下rootで。

[bash]
rpm -ivh http://dl.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo
yum install -y erlang
rpm –import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install -y rabbitmq-server
[/bash]

 

RabbitMQを操作する

rabbitmqctlというコマンドで、色々と操作できるみたいです。

[bash]
$ sudo rabbitmqctl list_queues
Listing queues …
hello 1
…done.
[/bash]

[bash]
$ sudo rabbitmqctl –help

Error: invalid command ‘–help’

Usage:
rabbitmqctl [-n <node>] [-q] <command> [<command options>]
Options:
-n node
-q
Default node is "rabbit@server", where server is the local host. On a host
named "server.example.com", the node name of the RabbitMQ Erlang node will
usually be rabbit@server (unless RABBITMQ_NODENAME has been set to some
non-default value at broker startup time). The output of hostname -s is usually
the correct suffix to use after the "@" sign. See rabbitmq-server(1) for
details of configuring the RabbitMQ broker.
Quiet output mode is selected with the "-q" flag. Informational messages are
suppressed when quiet mode is in effect.
Commands:
stop
stop_app
start_app
status
reset
force_reset
rotate_logs <suffix>
cluster <clusternode> …
force_cluster <clusternode> …
close_connection <connectionpid> <explanation>
add_user <username> <password>
delete_user <username>
change_password <username> <newpassword>
set_admin <username>
clear_admin <username>
list_users
add_vhost <vhostpath>
delete_vhost <vhostpath>
list_vhosts
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
clear_permissions [-p <vhostpath>] <username>
list_permissions [-p <vhostpath>]
list_user_permissions [-p <vhostpath>] <username>
list_queues [-p <vhostpath>] [<queueinfoitem> …]
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> …]
list_bindings [-p <vhostpath>] [<bindinginfoitem> …]
list_connections [<connectioninfoitem> …]
list_channels [<channelinfoitem> …]
list_consumers
The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is "/".
<queueinfoitem> must be a member of the list [name, durable, auto_delete,
arguments, pid, owner_pid, exclusive_consumer_pid, exclusive_consumer_tag,
messages_ready, messages_unacknowledged, messages, consumers, memory].
<exchangeinfoitem> must be a member of the list [name, type, durable,
auto_delete, arguments].
<bindinginfoitem> must be a member of the list [source_name, source_kind,
destination_name, destination_kind, routing_key, arguments].
<connectioninfoitem> must be a member of the list [pid, address, port,
peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, state, channels, protocol, user, vhost, timeout, frame_max,
client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend].
<channelinfoitem> must be a member of the list [pid, connection, number, user,
vhost, transactional, consumer_count, messages_unacknowledged,
acks_uncommitted, prefetch_count, client_flow_blocked].
The output format for "list_consumers" is a list of rows containing, in
order, the queue name, channel process id, consumer tag, and a boolean
indicating whether acknowledgements are expected from the consumer.
[/bash]

 

pythonのライブラリ

pythonのライブラリは山ほどあってどれ使っていいんだかわかりませんが、公式ページで紹介しているpikaとOpenStackで使用していると思われるkombuを使ってみます。

pika

インストール

[bash]
pip install pika
[/bash]

サンプル

send.py
[python]
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=’localhost’))
channel = connection.channel()

channel.queue_declare(queue=’hello’)

channel.basic_publish(exchange=”,
routing_key=’hello’,
body=’Hello World!’)
print " [x] Sent ‘Hello World!’"
connection.close()
[/python]

recieve.py
[python]
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=’localhost’))
channel = connection.channel()

channel.queue_declare(queue=’hello’)

print ‘ [*] Waiting for messages. To exit press CTRL+C’

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)

channel.basic_consume(callback,
queue=’hello’,
no_ack=True)

channel.start_consuming()
[/python]

それぞれ別々のプロセスで動かしてみると、sendで送ったメッセージを、recieveが受信してくるのがわかります。

kombu

インストール

pip install kombu

サンプル

Examples — Kombu 2.1.0 documentation

queues.py
[python]
from kombu import Exchange, Queue
task_exchange = Exchange("tasks", type="direct")
task_queues = [Queue("hipri", task_exchange, routing_key="hipri"),
Queue("midpri", task_exchange, routing_key="midpri"),
Queue("lopri", task_exchange, routing_key="lopri")]
[/python]

worker.py
[python]
from __future__ import with_statement
from kombu.mixins import ConsumerMixin
from kombu.utils import kwdict, reprcall
from queues import task_queues
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=task_queues,
callbacks=[self.process_task])]
def process_task(self, body, message):
fun = body["fun"]
args = body["args"]
kwargs = body["kwargs"]
self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
try:
fun(*args, **kwdict(kwargs))
except Exception, exc:
self.error("task raised exception: %r", exc)
message.ack()
if __name__ == "__main__":
from kombu import BrokerConnection
from kombu.utils.debug import setup_logging
setup_logging(loglevel="INFO")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
print("bye bye")
[/python]

tasks.py
[python]
def hello_task(who="world"):
print("Hello %s" % (who, ))
[/python]

client.py
[python]
from __future__ import with_statement
from kombu.common import maybe_declare
from kombu.pools import producers
from queues import task_exchange
priority_to_routing_key = {"high": "hipri",
"mid": "midpri",
"low": "lopri"}
def send_as_task(connection, fun, args=(), kwargs={}, priority="mid"):
payload = {"fun": fun, "args": args, "kwargs": kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
producer.publish(payload, serializer="pickle",
compression="bzip2",
routing_key=routing_key)
if __name__ == "__main__":
from kombu import BrokerConnection
from tasks import hello_task
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
send_as_task(connection, fun=hello_task, args=("Kombu", ), kwargs={},
priority="high")

[/python]

worker.pyを動かして、別プロセスでclient.pyを動かしてみると動きが確認できます。
kombuを使った実装はnovaの実装も参考になりそうです。
nova/rpc/impl_kombu.py at master from openstack/nova – GitHub