Python을 활용하여 RabbitMQ와 통신하는 경우 일반적으로 pika 모듈을 사용하여 처리합니다. 연결 라이브러리로는 librabbitmq 모듈을 래핑하여 사용하는 경우도 많은데, 이 librabbitmq를 활용하여 메시지를 보내고 받는 코드는 아래와 같이 작성할 수 있습니다.
Direct exchange와 routing key를 이용하는 방식으로 통신을 할 수 있으며, exchange에 queue를 바인딩하여 처리하는 구조를 가지고 있습니다.
amq.direct라는 exchange를 통해 HA로 구성된 큐에 데이터를 전송합니다. 해당 데이터는 TestQueue와 매핑되어 있습니다.
1. Sender.py
import datetime
import math
import random
import sys
import time

from librabbitmq import Connection


class MqSender(object):
    """Publish numbered test messages to a RabbitMQ direct exchange.

    The sender declares the exchange and a durable queue, binds the queue to
    the exchange with a fixed routing key, and then publishes ``duration``
    messages, sleeping ``interval`` seconds after each one.

    Args:
        partition: Accepted for interface compatibility; not used by this
            class.
        interval: Seconds to sleep after each published message.
        duration: Number of messages to publish.
        seed: Accepted for interface compatibility; not used by this class.
        host: Hostname or IP address of the RabbitMQ broker.
        verbose: Accepted for interface compatibility; not used by this class.
    """

    def __init__(self, partition, interval=1, duration=360, seed=0,
                 host='192.168.0.1', verbose=True):
        self._hostname = host
        self._port = 5672
        self._exchange_name = "amq.direct"
        self._vhost = 'dev-quant'
        # NOTE(review): credentials are hard-coded; consider loading them
        # from configuration or the environment.
        self._userid = 'quant'
        self._password = 'quant1234'
        self._duration = duration
        self._interval = interval
        self._routingKey = 'AA'
        self._queueId = 'TestQueue'
        # Start unconnected so close() and run() can detect a missing or
        # failed rabbitmq_connect() instead of raising AttributeError.
        self._connection = None
        self._channel = None
        print('MqSender Init function')

    def rabbitmq_connect(self):
        """Connect to the broker and declare/bind the exchange and queue.

        Returns:
            True when the connection, channel, exchange, queue and binding
            were all set up; False when any step failed (the error is
            printed and any half-open connection is released).
        """
        try:
            self._connection = Connection(host=self._hostname,
                                          port=self._port,
                                          userid=self._userid,
                                          password=self._password,
                                          virtual_host=self._vhost)
            print(self._connection)
            self._channel = self._connection.channel()
            self._channel.exchange_declare(self._exchange_name,
                                           type='direct', durable=True)
            self._channel.queue_declare(self._queueId,
                                        exclusive=False, durable=True)
            self._channel.queue_bind(self._queueId, self._exchange_name,
                                     self._routingKey)
        except Exception as e:
            print('Can not connect to rabbitmq server')
            print(e)
            try:
                # Best-effort cleanup of a half-open connection; the real
                # failure has already been reported above.
                self.close()
            except Exception:
                pass
            return False
        return True

    def close(self):
        """Close the broker connection if one is open; safe to call twice."""
        connection, self._connection = self._connection, None
        self._channel = None
        if connection is not None:
            connection.close()

    def run(self):
        """Publish ``duration`` messages, then close the connection.

        Raises:
            RuntimeError: If no channel is open, i.e. rabbitmq_connect() was
                not called or did not succeed.
        """
        if self._channel is None:
            raise RuntimeError(
                'not connected: rabbitmq_connect() must succeed before run()')
        try:
            for serial in range(self._duration):
                print('message sent, # %s' % serial)
                message = 'msg' + str(serial)
                # delivery_mode=2 asks the broker to persist the message,
                # matching the durable queue declared in rabbitmq_connect().
                self._channel.basic_publish(message, self._exchange_name,
                                            self._routingKey,
                                            delivery_mode=2)
                time.sleep(self._interval)
            print('done %s' % datetime.datetime.now())
        finally:
            # Release the connection on errors and Ctrl-C as well as on
            # normal completion.
            self.close()


def main():
    """Build the sample partition table, connect, and publish 1000 messages.

    Returns:
        0 on completion or Ctrl-C, 1 when the broker connection failed.
    """
    partition = {
        'XKRX-A': tuple('XKRX-CS-KR-' + str(i).zfill(6)
                        for i in range(100, 105)),
        'XKRX-B': tuple('XKRX-CS-KR-' + str(i).zfill(6)
                        for i in range(250, 258)),
    }
    s = MqSender(partition, duration=1000)
    try:
        if not s.rabbitmq_connect():
            return 1
        s.run()
    except KeyboardInterrupt:
        pass
    finally:
        s.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())
2. Receiver.py
import datetime
import math
import random
import sys
import time

from librabbitmq import Connection


class MqReceiver(object):
    """Consume messages from a RabbitMQ queue and print each one.

    The receiver declares the exchange and a durable queue, binds the queue
    to the exchange with a fixed routing key, and then prints and
    acknowledges every delivered message until interrupted.

    Args:
        interval: Stored on the instance; not used by the consume loop.
        duration: Stored on the instance; not used by the consume loop,
            which runs until interrupted.
        seed: Accepted for interface compatibility; not used by this class.
        host: Hostname or IP address of the RabbitMQ broker.
        verbose: Accepted for interface compatibility; not used by this class.
    """

    def __init__(self, interval=1, duration=360, seed=0,
                 host='192.168.0.1', verbose=True):
        self._hostname = host
        self._port = 5672
        self._exchange_name = "amq.direct"
        self._vhost = 'dev-quant'
        # NOTE(review): credentials are hard-coded; consider loading them
        # from configuration or the environment.
        self._userid = 'quant'
        self._password = 'quant1234'
        self._duration = duration
        self._interval = interval
        self._routingKey = 'AA'
        self._queueId = 'TestQueue'
        # Start unconnected so close() and run() can detect a missing or
        # failed rabbitmq_connect() instead of raising AttributeError.
        self._connection = None
        self._channel = None
        print('MqReceiver Init function')

    def rabbitmq_connect(self):
        """Connect to the broker and declare/bind the exchange and queue.

        Returns:
            True when the connection, channel, exchange, queue and binding
            were all set up; False when any step failed (the error is
            printed and any half-open connection is released).
        """
        try:
            self._connection = Connection(host=self._hostname,
                                          port=self._port,
                                          userid=self._userid,
                                          password=self._password,
                                          virtual_host=self._vhost)
            print(self._connection)
            self._channel = self._connection.channel()
            self._channel.exchange_declare(self._exchange_name,
                                           type='direct', durable=True)
            self._channel.queue_declare(self._queueId,
                                        exclusive=False, durable=True)
            # Bind here too so the queue receives routed messages even when
            # the receiver starts before any publisher has created the
            # binding.
            self._channel.queue_bind(self._queueId, self._exchange_name,
                                     self._routingKey)
        except Exception as e:
            print('Can not connect to rabbitmq server')
            print(e)
            try:
                # Best-effort cleanup of a half-open connection; the real
                # failure has already been reported above.
                self.close()
            except Exception:
                pass
            return False
        return True

    def close(self):
        """Close the broker connection if one is open; safe to call twice."""
        connection, self._connection = self._connection, None
        self._channel = None
        if connection is not None:
            connection.close()

    def dump_message(self, message):
        """Print the body, properties and delivery info, then ack the message."""
        print("Body:'%s', Proeprties:'%s', DeliveryInfo:'%s'" % (
            message.body, message.properties, message.delivery_info))
        message.ack()

    def run(self):
        """Register dump_message as the consumer callback and loop forever.

        Raises:
            RuntimeError: If no channel is open, i.e. rabbitmq_connect() was
                not called or did not succeed.
        """
        if self._channel is None:
            raise RuntimeError(
                'not connected: rabbitmq_connect() must succeed before run()')
        self._channel.basic_consume(self._queueId,
                                    callback=self.dump_message)
        while True:
            self._connection.drain_events()


def main():
    """Connect and consume messages until interrupted.

    Returns:
        0 on Ctrl-C, 1 when the broker connection failed.
    """
    s = MqReceiver(duration=10)
    try:
        if not s.rabbitmq_connect():
            return 1
        s.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the connection on errors as well as on Ctrl-C.
        s.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())