본문 바로가기

개발 이야기/Python

Python RabbitMQ Send/Receive using librabbitmq

Python을 활용하여 RabbitMQ와 통신하는 경우 일반적으로 pika 모듈을 사용하여 처리를 하고 있습니다. 보통 연결라이브러리는 librabbitmq 모듈을 랩핑하여 많이 사용하는데, 이 librabbitmq를 활용하여 메시지를 보내고 받는 코드를 아래와 같이 사용할 수 있습니다.


Direct exchange와 routing key를 이용하는 방식으로 통신을 할 수 있으며, exchage에 queue를 바인딩하여 처리하는 구조를 가지고 있습니다. 



amq.direct라는 exchagne에 HA로 구성된 큐에 데이터를 전송합니다. 해당 데이터는 TestQueue와 맵핑되어 있습니다.


1. Sender.py


from librabbitmq import Connection
import datetime
import time, random, math

class MqSender( object ):

    def __init__( self, partition, interval = 1, duration = 360, seed = 0, host = '192.168.0.1', verbose = True ):
        self._hostname = host
        self._port = 5672
        self._exchange_name = "amq.direct"
        self._vhost = 'dev-quant'
        self._userid = 'quant'
        self._password = 'quant1234'
        self._duration = duration
        self._interval = interval
        self._routingKey = 'AA'
        self._queueId = 'TestQueue'
        print 'MqSender Init function'

    def rabbitmq_connect( self ):

        try:
            self._connection = Connection(host=self._hostname, port=self._port, userid=self._userid, password=self._password, virtual_host=self._vhost)
            print self._connection
            self._channel = self._connection.channel()
            self._channel.exchange_declare( self._exchange_name, type='direct', durable=True)
            self._channel.queue_declare( self._queueId, exclusive=False, durable = True)
            self._channel.queue_bind(self._queueId, self._exchange_name, self._routingKey)
        except Exception, e:
            print 'Can not connect to rabbitmq server'
            print e


    def close( self ):

        self._connection.close()

    def run( self ):

        properties = {'application_headers': {}, 'delivery_mode': 2, 'content_encoding': u'binary','content_type': u'application/x-python-serialize'}
        for serial in range( 0, self._duration ):
            print 'message sent, #', serial
            message = 'msg' + str(serial)

            self._channel.basic_publish(message, self._exchange_name, self._routingKey)
            time.sleep( self._interval )

        print 'done', datetime.datetime.now()
        self.close()

import sys

def main():

    _formatCode = lambda x: '0' * max( 0, ( 6 - len( str( x ) ) ) ) + str( x )
    partition = dict()
    partition[ 'XKRX-A' ] = tuple( [ 'XKRX-CS-KR-' + _formatCode( i ) for i in range( 100, 105 ) ] )
    partition[ 'XKRX-B' ] = tuple( [ 'XKRX-CS-KR-' + _formatCode( i ) for i in range( 250, 258 ) ] )
    s = MqSender( partition, duration = 1000 )

    try:
        s.rabbitmq_connect()
        s.run()
    except KeyboardInterrupt:
        s.close()

if __name__ == '__main__':

    sys.exit( main() )

2. Receiver.py


from librabbitmq import Connection
import datetime
import time, random, math

class MqReceiver( object ):

    def __init__( self, interval = 1, duration = 360, seed = 0, host = '192.168.0.1', verbose = True ):
        self._hostname = host
        self._port = 5672
        self._exchange_name = "amq.direct"
        self._vhost = 'dev-quant'
        self._userid = 'quant'
        self._password = 'quant1234'
        self._duration = duration
        self._interval = interval
        self._routingKey = 'AA'
        self._queueId = 'TestQueue'
        print 'MqReceiver Init function'

    def rabbitmq_connect( self ):

        try:
            self._connection = Connection(host=self._hostname, port=self._port, userid=self._userid, password=self._password, virtual_host=self._vhost)
            print self._connection
            self._channel = self._connection.channel()
            self._channel.exchange_declare( self._exchange_name, type='direct', durable=True)
            self._channel.queue_declare( self._queueId, exclusive=False, durable = True)

        except Exception, e:
            print 'Can not connect to rabbitmq server'
            print e


    def close( self ):

        self._connection.close()

    def dump_message( self, message):

        print("Body:'%s', Proeprties:'%s', DeliveryInfo:'%s'" % (message.body, message.properties, message.delivery_info))
        message.ack()

    def run( self ):

        self._channel.basic_consume(self._queueId, callback=self.dump_message)

        while True:
            self._connection.drain_events();

import sys

def main():

    s = MqReceiver( duration = 10 )

    try:
        s.rabbitmq_connect()
        s.run()
    except KeyboardInterrupt:
        s.close()

if __name__ == '__main__':

    sys.exit( main() )