RabbitMQ closes the connection while a long-running task is being processed, and the timeout/heartbeat settings raise errors

I use a RabbitMQ producer to send long-running tasks (30+ minutes) to a consumer. The problem is that the connection to the server is closed while the consumer is still working on the task, and the unacknowledged task is then re-queued.

From my research, I understand that either a heartbeat or an increased connection timeout can be used to solve this problem. Both of these solutions raise errors when I try them. While reading answers to similar questions, I also found that RabbitMQ has changed considerably since those answers were posted (for example, the default heartbeat timeout was changed from 580 to 60 seconds in RabbitMQ 3.5.5).

When specifying a heartbeat and a blocked connection timeout:

    import pika

    credentials = pika.PlainCredentials('user', 'password')
    parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials,
                                           blocked_connection_timeout=2000)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

The following error is displayed:

 TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout' 

When heartbeat_interval=1000 is specified in the connection parameters, a similar error is shown: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

Similarly, for socket_timeout=1000, the following error is displayed: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

I am running RabbitMQ 3.6.1, pika 0.10.0 and python 2.7 on Ubuntu 14.04.

  • Why do the above approaches lead to errors?
  • Can a heartbeat approach be used when the work is continuous? For example, can heartbeats be used while performing large database joins that take 30+ minutes? I am wary of relying on timeouts, because it is difficult to judge in advance how long a task such as a large database join will take.

I have read the answers to similar questions.

Update: running the example code from the pika documentation leads to the same error.
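These TypeErrors look like what one would see if the pika that Python actually imports were older than expected, so one way to double-check which pika build is being loaded is shown below (a minimal sketch; it assumes only pika's standard __version__ attribute):

    import pika

    print(pika.__version__)  # the version the interpreter actually loads
    print(pika.__file__)     # where that copy of pika is installed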

python rabbitmq amqp pika python-pika
2 answers

I have run into the same problem you are seeing: connections being dropped on my systems during very long-running tasks.

A heartbeat may help keep your connection alive if your network setup is such that idle TCP/IP connections are aggressively closed. If that is not the case, changing the heartbeat will not help.
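For reference, the heartbeat is requested through the connection parameters. A minimal sketch, assuming a pika version that accepts the heartbeat_interval keyword (newer pika releases call the parameter heartbeat):

    import pika

    params = pika.ConnectionParameters(
        host='XXX.XXX.XXX.XXX',
        heartbeat_interval=600)  # in seconds; newer pika versions name this 'heartbeat'
    connection = pika.BlockingConnection(params)

Whether heartbeat frames are actually sent while a callback blocks for 30+ minutes depends on the connection adapter you use, so this alone may not keep a long-running consumer connected.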

Changing the connection timeout will not help at all, either. That parameter is only used when the connection is first established.

I use a RabbitMQ producer to send long-running tasks (30+ minutes) to a consumer. The problem is that the connection to the server is closed while the consumer is still working on the task, and the unacknowledged task is then re-queued.

There are two things at play here, both of which you have already run into:

  • Connections drop occasionally, even under the best of circumstances.
  • Re-processing a message because it was re-queued after a dropped connection can cause problems.

Having deployed RabbitMQ code with tasks that range from sub-second to several hours, I have found that acknowledging the message immediately and reporting progress through status updates works best for very long-running tasks.

You will need a system of record (probably a database) that tracks the status of a given job.

When the consumer picks up the message and starts the process, it should acknowledge the message immediately and write a "started" status to the system of record.

When the process finishes, write another status update to say so.
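A minimal sketch of that consumer flow, assuming pika's BlockingConnection; record_status() and run_long_task() are hypothetical stand-ins for your system of record and your actual work (note that pika 1.x reverses the basic_consume argument order to queue first, callback second):

    import pika

    def record_status(job_id, status):
        # Hypothetical helper: write the job's status to your system of record (database).
        pass

    def run_long_task(job_id):
        # Hypothetical stand-in for the 30+ minute job.
        pass

    def on_message(channel, method, properties, body):
        job_id = body  # however the job is identified in your messages
        channel.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge immediately
        record_status(job_id, 'started')
        run_long_task(job_id)
        record_status(job_id, 'finished')

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='long_tasks', durable=True)
    channel.basic_consume(on_message, queue='long_tasks')  # pika 0.x argument order
    channel.start_consuming()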

This will not solve the dropped-connection problem (nothing solves that 100%), but it will prevent the re-queue and re-processing problems when a connection is dropped.

This solution does introduce another problem, though: when the long-running process crashes, how do you resume the work?

The basic answer is to use the job status in your system of record (your database) to tell you that the work needs to be picked up again. When the application starts, check the database for incomplete work. If there is any, resume or restart that work in whatever way is appropriate.
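A sketch of that startup check, again with hypothetical helpers (find_incomplete_jobs() and resume_job()) standing in for your own database access and resume logic:

    def find_incomplete_jobs():
        # Hypothetical: query the system of record for jobs marked 'started'
        # that never reached 'finished'.
        return []

    def resume_job(job):
        # Hypothetical: restart or resume the job in whatever way fits the workload.
        pass

    def recover_on_startup():
        for job in find_incomplete_jobs():
            resume_job(job)

    recover_on_startup()  # run this before the consumer starts taking new messages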


I have seen this problem before. The reason is that you declare and use a queue, but you did not bind that queue to the exchange.

eg:

    @Bean(name = "test_queue")
    public Queue testQueue() {
        return queue("test_queue");
    }

    @RabbitListener(queues = "test_queue_1")
    public void listenCreateEvent() {
    }

If you are listening on a queue that is not bound to an exchange, this will happen.
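The snippet above is Spring AMQP; in pika terms the equivalent check is to make sure the queue you consume from is the one you actually declared and bound to your exchange. A rough sketch with placeholder names:

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the exchange and queue, then bind them together.
    # (Some older pika versions use type= instead of exchange_type=.)
    channel.exchange_declare(exchange='task_exchange', exchange_type='direct')
    channel.queue_declare(queue='test_queue', durable=True)
    channel.queue_bind(queue='test_queue', exchange='task_exchange',
                       routing_key='test_queue')

    # Consume from the queue that was declared and bound above,
    # not from a similarly named but unbound queue such as 'test_queue_1'.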

