AMQP dynamically creates a queue subscription

I am trying to create a simple chat application using AMQP, Websockets and Ruby. I understand that this is not a good example of using AMQP, but I would like to understand where I am going wrong.

Below is my amqp server code

require 'rubygems' require 'amqp' require 'mongo' require 'em-websocket' require 'json' class MessageParser # message format => "room:harry_potter, nickname:siddharth, room:members" def self.parse(message) parsed_message = JSON.parse(message) response = {} if parsed_message['status'] == 'status' response[:status] = 'STATUS' response[:username] = parsed_message['username'] response[:roomname] = parsed_message['roomname'] elsif parsed_message['status'] == 'message' response[:status] = 'MESSAGE' response[:message] = parsed_message['message'] response[:roomname] = parsed_message['roomname'].split().join('_') end response end end class MongoManager def self.establish_connection(database) @db ||= Mongo::Connection.new('localhost', 27017).db(database) @db.collection('rooms') @db end end @sockets = [] EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') channel = AMQP::Channel.new(connection) puts "Connected to AMQP broker. #{AMQP::VERSION} " mongo = MongoManager.establish_connection("trackertalk_development") EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| socket_detail = {:socket => ws} ws.onopen do @sockets << socket_detail end ws.onmessage do |message| status = MessageParser.parse(message) exchange = channel.fanout(status[:roomname].split().join('_')) if status[:status] == 'STATUS' queue = channel.queue(status[:username], :durable => true) unless queue.subscribed? puts "--------- SUBSCRIBED --------------" queue.bind(exchange).subscribe do |payload| puts "PAYLOAD : #{payload}" ws.send(payload) end else puts "----ALREADY SUBSCRIBED" end # only after 0.8.0rc14 #queue = channel.queue(status[:username], :durable => true) #AMQP::Consumer.new(channel, queue) elsif status[:status] == 'MESSAGE' puts "********************* Message- published ******************************" exchange.publish(status[:message) end end ws.onclose do @sockets.delete ws end end end 

I use status to indicate whether the incoming message is a message for the current chat or for a status message that requires me to process responsibilities such as subscribing to a queue.

The problem I am facing is that when I send a message like socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

exchange.publish' is called but it still doesn't get pushed via the ws.send` in the browser.

Is there anything fundamentally wrong with my understanding of EventMachine and AMQP?

Here is the paste for the same code http://pastie.org/private/xosgb8tw1w5vuroa4w7a

My code works as desired when I remove durable => true from queue = channel.queue(status[:username], :durable => true)

Below is a snippet of my Rails view that identifies the username and room name and sends it as part of the message through the Web.

Although the code works when I remove durable => true , I don’t understand why this affects the message being sent. I ask you to ignore part of the mongo, since it still does not play any role.

I would also like to know if my approach to AMQP is correct and its use

 <script> $(document).ready(function(){ var username = '<%= @user.email %>'; var roomname = 'Bazingaa'; socket = new WebSocket('ws://127.0.0.1:8080/'); socket.onopen = function(msg){ console.log('connected'); socket.send(JSON.stringify({status:'status', username:username, roomname:roomname})); } socket.onmessage = function(msg){ $('#chat-log').append(msg.data); } }); </script> <div class='block'> <div class='content'> <h2 class='title'><%= @room.name %></h2> <div class='inner'> <div id="chat-log"> </div> <div id="chat-console"> <textarea rows="5" cols="40"></textarea> </div> </div> </div> </div> <style> #chat-log{ color:#000; font-weight:bold; margin-top:1em; width:900px; overflow:auto; height:300px; } #chat-console{ bottom:10px; } textarea{ width:100%; height:60px; } </style> 
+8
ruby websocket amqp
source share
2 answers

I think your problem may be that the queue hangs on the broker between ws.onmessage calls. When the client re-binds the queue and the bind already exists, therefore ws.send () does not receive the call.

By default, when you create a queue, it and any bindings that it has, hang until the broker restarts, or you explicitly do not suggest the broker to delete it.

There are two ways to change this:

  • Adding the durable flag when creating a queue, which will cause the queue to hang even if the broker restarts
  • Adding the auto_delete flag, which will cause the broker to automatically delete the object after a short period of time when it is not connected to it.

If you have control over the broker, you are using the rabbitmq broker, an easy way to understand what is happening with the broker is to install a management plugin that provides a web interface for exchanging, linking and queuing at brokers.

+1
source share

In the first case, the AMQP bits seem OK, but I do not want to configure all the dependencies. If you provide a minimal example of just an AMQP part, I will check it out.

0
source share

All Articles