Consuming MassTransit from Python or other languages

I have a simple publisher in MassTransit. It sends a message at intervals, and I can receive that message from a .NET client that also uses MassTransit. But when I try to consume it from Python, nothing arrives. Is there a way to use MassTransit with Python or other languages? Examples would be appreciated.

Publisher:

    builder.Register(c => ServiceBusFactory.New(sbc =>
    {
        sbc.UseRabbitMq();
        sbc.UseBsonSerializer();
        sbc.UseLog4Net();
        sbc.ReceiveFrom("rabbitmq://localhost/masstransit");
    }));

.NET Client:

    public void Execute(IJobExecutionContext context)
    {
        using (var scope = ServiceLocator.Current.GetInstance<ILifetimeScope>().BeginLifetimeScope())
        {
            var log = scope.Resolve<ILog>();
            log.Debug("Sending queue message");
            var bus = scope.Resolve<IServiceBus>();
            bus.Publish(new SimpleTextMessage { Text = "some text" });
        }
    }

Python client:

    import pika

    print('Starting consumer')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='python_consumer_1')

    print ' [*] Waiting for messages. To exit press CTRL+C'

    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)

    channel.basic_consume(callback, queue='python_consumer_1')
    channel.start_consuming()

The trace from the C# application:

    Configuration Result:
    [Success] Name MyApp
    [Success] ServiceName MyApp
    Topshelf v3.1.122.0, .NET Framework v4.0.30319.34003
    INFO (MassTransit.BusConfigurators.ServiceBusConfiguratorImpl) 209 - MassTransit v2.9.2/v2.9.0.0, .NET Framework v4.0.30319.34003
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 245 - Creating RabbitMQ connection: rabbitmq://localhost/groups_error
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 246 - Using default configurator for connection: rabbitmq://localhost/groups_error
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 251 - RabbitMQ connection created: localhost:5672//
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 921 - Creating RabbitMQ connection: rabbitmq://localhost/groups
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 922 - Using default configurator for connection: rabbitmq://localhost/groups
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 924 - RabbitMQ connection created: localhost:5672//
    DEBUG(MassTransit.ServiceContainer) 1056 - Starting bus service: MassTransit.Subscriptions.Coordinator.SubscriptionRouterService
    DEBUG(MassTransit.ServiceContainer) 1062 - Starting bus service: MassTransit.Subscriptions.SubscriptionBusService
    DEBUG(MassTransit.Threading.ThreadPoolConsumerPool) 1080 - Starting Consumer Pool for rabbitmq://localhost/groups
    [Topshelf.Quartz] Scheduled Job: DEFAULT.ea637337-950a-4281-99c0-f10b842814c9
    [Topshelf.Quartz] Job Schedule: Trigger 'DEFAULT.8a1d0b7c-d670-440b-974f-31ec8be6f294': triggerClass: 'Quartz.Impl.Triggers.SimpleTriggerImpl calendar: '' misfireInstruction: 0 nextFireTime: 01/28/2014 07:12:35 +00:00 - Next Fire Time (local): 28.01.2014 9:12:35 +02:00
    [Topshelf.Quartz] Scheduler started...
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1248 - Creating RabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1250 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error
    DEBUG(Global) 1254 - Sending queue message
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1254 - RabbitMQ connection created: localhost:5672//
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1272 - Creating RabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1273 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage
    DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1277 - RabbitMQ connection created: localhost:5672//
    DEBUG(MassTransit.Messages) 1439 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-961c-08d0ea0f744a:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(MassTransit.Messages) 1441 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-8965-08d0ea0f744b:MyApp.Transit.SimpleTextMessage, MyApp
    The MyApp service is now running, press Control+C to exit.
    DEBUG(Global) 21212 - Sending queue message
    DEBUG(MassTransit.Messages) 21214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d4fb-08d0ea0f77c4:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 41213 - Sending queue message
    DEBUG(MassTransit.Messages) 41214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-077b-08d0ea0f7b40:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 61212 - Sending queue message
    DEBUG(MassTransit.Messages) 61214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-2bed-08d0ea0f7ebb:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 81213 - Sending queue message
    DEBUG(MassTransit.Messages) 81215 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-5f44-08d0ea0f8236:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 101212 - Sending queue message
    DEBUG(MassTransit.Messages) 101214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-80f8-08d0ea0f85b1:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 121212 - Sending queue message
    DEBUG(MassTransit.Messages) 121213 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-a971-08d0ea0f892c:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 141212 - Sending queue message
    DEBUG(MassTransit.Messages) 141214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d53e-08d0ea0f8ca7:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 161212 - Sending queue message
    DEBUG(MassTransit.Messages) 172109 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-7504-08d0ea0f9208:MyApp.Transit.SimpleTextMessage, MyApp
    DEBUG(Global) 181212 - Sending queue message
    DEBUG(MassTransit.Messages) 193461 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-dd26-08d0ea0f95bf:MyApp.Transit.SimpleTextMessage, MyApp
3 answers

It seems the easiest way is to bind the Python queue to the exchange, for example in the RabbitMQ management UI. After that, I received the messages successfully.

The Python consumer now looks like this:

    import pika

    print('Starting consumer')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare('python_consumer_1')

    print ' [*] Waiting for messages. To exit press CTRL+C'

    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.queue_bind(queue='python_consumer_1', exchange='MyApp.Transit:SimpleTextMessage')
    channel.basic_consume(callback, queue='python_consumer_1')
    channel.start_consuming()
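
For readers on Python 3, the same idea can be sketched as follows. This is only a sketch: it assumes pika 1.x, where `basic_consume` takes `queue` and `on_message_callback` as keyword arguments, and it reuses the exchange name shown in the trace from the question. The helper names `setup_consumer`, `on_message` and `main` are made up for illustration.

```python
EXCHANGE = "MyApp.Transit:SimpleTextMessage"  # exchange name as it appears in the C# trace
QUEUE = "python_consumer_1"


def on_message(ch, method, properties, body):
    """Print the raw payload, then acknowledge the delivery."""
    print(" [x] Received %r" % (body,))
    ch.basic_ack(delivery_tag=method.delivery_tag)


def setup_consumer(channel, queue=QUEUE, exchange=EXCHANGE):
    """Declare the queue, bind it to the MassTransit exchange, register the callback."""
    channel.queue_declare(queue=queue)
    # Without this binding the queue exists but never receives published messages.
    channel.queue_bind(queue=queue, exchange=exchange)
    channel.basic_consume(queue=queue, on_message_callback=on_message)


def main():
    # Imported here so the helpers above can be used without a broker.
    import pika  # third-party package; pika 1.x is assumed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    setup_consumer(channel)
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```

Call `main()` to start consuming. The exchange has to exist before `queue_bind` runs, so the publisher should have started at least once; otherwise the broker closes the channel with a not-found error.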

If you intend to consume messages from another language, you need to look at which exchanges MassTransit creates when it publishes a message. You then need to bind those exchanges to your queues so that messages are delivered to your subscribers.

In your Python code you need something like:

    channel.queue_bind(queue="python_consumer_1", exchange=".....:SimpleTextMessage")

Once you do this, messages will be delivered to your queue. You are using BSON; why not use JSON, or something else that Python handles easily? Honestly, I'm not sure whether Python supports BSON, I'm just offering other suggestions.
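
If the publisher is switched to JSON, the body that arrives in the Python callback can be decoded with the standard library. The sketch below assumes MassTransit wraps the payload in a JSON envelope with `messageType` and `message` fields; those field names, the camel-cased `text` property and the sample body are assumptions for illustration, so print a real body first and adjust accordingly.

```python
import json


def extract_message(body):
    """Decode a JSON envelope and return (message types, payload).

    The field names 'messageType' and 'message' are assumed, not verified
    against the MassTransit version used in the question.
    """
    envelope = json.loads(body.decode("utf-8"))
    return envelope.get("messageType", []), envelope.get("message", {})


# Hypothetical body, shaped like the assumed envelope.
sample_body = json.dumps({
    "messageType": ["urn:message:MyApp.Transit:SimpleTextMessage"],
    "message": {"text": "some text"},
}).encode("utf-8")

message_types, message = extract_message(sample_body)
print(message_types, message)
```

Inside the pika callback this would be called as `extract_message(body)`, since pika hands the body over as bytes.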


You can inspect the existing RabbitMQ exchanges and queues with the rabbitmqctl admin utility, and then work from the results.
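
As a rough sketch, the same inspection can be driven from Python by shelling out to `rabbitmqctl`. It assumes `rabbitmqctl` is on the PATH and that the current user is allowed to run it; the helper names `rabbitmqctl_args` and `list_broker_objects` are made up for illustration.

```python
import subprocess

ALLOWED_KINDS = ("exchanges", "queues", "bindings")


def rabbitmqctl_args(kind):
    """Build the argument list for 'rabbitmqctl list_<kind>'."""
    if kind not in ALLOWED_KINDS:
        raise ValueError("kind must be one of %s" % (ALLOWED_KINDS,))
    return ["rabbitmqctl", "list_" + kind]


def list_broker_objects(kind):
    """Run rabbitmqctl and return its raw text output."""
    result = subprocess.run(
        rabbitmqctl_args(kind), capture_output=True, text=True, check=True
    )
    return result.stdout
```

For example, `print(list_broker_objects("exchanges"))` should show whether an exchange such as `MyApp.Transit:SimpleTextMessage` exists, and `list_broker_objects("bindings")` shows whether the Python queue is bound to it.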

