I encoded a working C application (rabbitmq-c) that consumes a queue published by a Python script (pika).
I have the following strange behavior that I cannot solve:
- Running all workers before posting to the queue works as expected
- Starting 1 employee after publishing a queue works as expected
- HOWEVER: Launching additional workers after the employee has started consuming from the queue means that these workers do not see any messages in the queue (number of messages = 0) and therefore just wait (Eventhough there means there are still many messages in the queue ) Killing the first employee, he will confuse messages that apply to all other (waiting) consumers.
Any ideas what could happen?
I tried to make sure that each consumer has their own channel (is this necessary?), But still the same behavior ...
Here is the code for the consumer (employee):
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
"/",
0,
131072,
0,
AMQP_SASL_METHOD_PLAIN,
"guest",
"guest");
if (amqp_channel_open(conn, chan) == NULL)
LOG_ERR(" [!] Failed to open amqp channel!\n");
if ((q = amqp_queue_declare(conn,
chan,
amqp_cstring_bytes("ranges"),
0,
0,
0,
0,
amqp_empty_table)) == NULL)
LOG_ERR(" [!] Failed to declare queue!\n");
LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);
amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while(1) {
amqp_maybe_release_buffers(conn);
amqp_consume_message(conn, &e, NULL, 0);
{
int n;
amqp_frame_t f;
unsigned char buf[8];
unsigned char *pbuf = buf;
amqp_simple_wait_frame(conn, &f);
amqp_simple_wait_frame(conn, &f);
n = f.payload.properties.body_size;
if (n != sizeof(range_buf))
LOG_ERR(" [!] Invalid message size!");
while (n) {
amqp_simple_wait_frame(conn, &f);
memcpy(pbuf,
f.payload.body_fragment.bytes,
f.payload.body_fragment.len);
n -= f.payload.body_fragment.len;
pbuf += f.payload.body_fragment.len;
}
LOG_INFO(" [x] Message recevied from queue\n");
}
amqp_destroy_envelope(&e);
amqp_maybe_release_buffers(conn);
}