AMQP RabbitMQ consumers block each other?

I wrote a working C application (rabbitmq-c) that consumes from a queue that a Python script (pika) publishes to.

I am seeing the following strange behavior that I cannot explain:

  • Starting all workers before publishing to the queue works as expected
  • Starting 1 worker after publishing to the queue works as expected
  • HOWEVER: workers started after the first worker has begun consuming from the queue do not see any messages in the queue (message count = 0) and therefore just wait, even though there should still be many messages in the queue. Killing the first worker suddenly releases the messages, which are then delivered to all the other (waiting) consumers.

Any ideas what could be happening?

I tried to make sure that each consumer has its own channel (is this necessary?), but I still get the same behavior.

Here is the code for the consumer (worker):

conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message recevied from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}
2 answers

The problem here is most likely that your consumer prefetches all messages at startup. This is RabbitMQ's default behavior, but you can reduce the number of messages a consumer prefetches so that the workload is distributed better among several workers.

In practice this means that the first consumer (or the first few) receives all the messages and leaves none for consumers that start later.

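Set the qos prefetch count to, say, 10. The consumer will then prefetch at most 10 messages at a time, and the rest stay in the queue for other consumers.

With rabbitmq-c, the function to use is amqp_basic_qos, called on the channel before amqp_basic_consume.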
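To make the effect of the prefetch limit concrete, here is a toy model in plain C. It is only a sketch of the dispatching idea, not rabbitmq-c code and not how the broker is actually implemented; every name in it (toy_queue, toy_dispatch, late_worker_share, and so on) is made up for illustration. It assumes consumers never acknowledge anything, which is what the consumer loop in the question does.

```c
#include <assert.h>

#define MAX_WORKERS 8

/* Toy broker state: `ready` messages wait in the queue, while each
 * consumer holds some delivered-but-unacknowledged messages. */
typedef struct {
    int ready;                 /* messages still visible in the queue */
    int unacked[MAX_WORKERS];  /* messages held by each consumer      */
    int active[MAX_WORKERS];   /* 1 if the consumer is subscribed     */
    int prefetch;              /* per-consumer limit, 0 = unlimited   */
} toy_queue;

/* Hand ready messages to active consumers, one at a time in turn,
 * until the queue is empty or every consumer is at its limit. */
void toy_dispatch(toy_queue *q) {
    int progress = 1;
    while (q->ready > 0 && progress) {
        progress = 0;
        for (int w = 0; w < MAX_WORKERS && q->ready > 0; w++) {
            if (!q->active[w])
                continue;
            if (q->prefetch != 0 && q->unacked[w] >= q->prefetch)
                continue;
            q->unacked[w]++;
            q->ready--;
            progress = 1;
        }
    }
}

void toy_publish(toy_queue *q, int n) {
    q->ready += n;
    toy_dispatch(q);
}

void toy_subscribe(toy_queue *q, int w) {
    assert(w >= 0 && w < MAX_WORKERS);
    q->active[w] = 1;
    toy_dispatch(q);
}

/* A consumer that dies gives its unacknowledged messages back. */
void toy_kill(toy_queue *q, int w) {
    assert(w >= 0 && w < MAX_WORKERS);
    q->active[w] = 0;
    q->ready += q->unacked[w];
    q->unacked[w] = 0;
    toy_dispatch(q);
}

/* Scenario from the question: publish first, start worker 0, then
 * start worker 1. Returns how many messages worker 1 ends up with. */
int late_worker_share(int total, int prefetch) {
    toy_queue q = {0};
    q.prefetch = prefetch;
    toy_publish(&q, total);
    toy_subscribe(&q, 0);
    toy_subscribe(&q, 1);
    return q.unacked[1];
}
```

In this model late_worker_share(100, 0) returns 0: with no limit the first worker takes everything, so the late worker sees an empty queue until toy_kill puts the messages back. late_worker_share(100, 10) returns 10, because the first worker stops at its limit and the remaining messages stay available.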

+4

. , , . , RabbitMQ , . , , , . , , .

- . , , ​​ .

, RabbitMQ . Ack (nowledgement) , RabbitMQ, , RabbitMQ .

ack, RabbitMQ , . , , , .

- ; RabbitMQ , . , .

.
