How can I queue a task on Celery from C#?

As far as I understand, message brokers such as RabbitMQ help applications written in different languages and on different platforms communicate with each other. Since Celery can use RabbitMQ as its message broker, I believe we can queue a task on Celery from any application, even if the producer is not written in Python.

Now I'm trying to figure out how to queue a task on Celery from an application written in C# via RabbitMQ, but so far I have not found such an example.

The only related information I found is this SO question, where the accepted answer suggests using the Celery message protocol to send messages to RabbitMQ from Java. However, the link provided in that answer contains no example, only the message format.

In addition, the message format indicates that a task id (UUID) is required by this protocol. How is my C# application supposed to know the Celery task id? As far as I understand, it can only know the name of the task, not its id.

+7
3 answers

I do not know if the question is still relevant, but I hope this answer will help others.

Here's how I managed to submit a task to an example Celery worker.

  1. Establish a connection between your producer (client) and RabbitMQ, as described here.

    // Requires the RabbitMQ.Client NuGet package: using RabbitMQ.Client;
    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = username;
    factory.Password = password;
    factory.VirtualHost = virtualhost;
    factory.HostName = hostname;
    factory.Port = port;
    IConnection connection = factory.CreateConnection();
    IModel channel = connection.CreateModel();

    By default, RabbitMQ is configured with only the guest user, which can be used only for local connections (from 127.0.0.1). The answer to this question explains how to define users in RabbitMQ.

  2. Next, create a callback to receive the results. This example uses Direct reply-to, so the response listener looks like this:

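      // Requires: using System.Text; using RabbitMQ.Client.Events;
      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (model, ea) =>
      {
          // In RabbitMQ.Client 6.x and later, ea.Body is ReadOnlyMemory<byte>; use ea.Body.ToArray() there.
          var ansBody = ea.Body;
          var ansMessage = Encoding.UTF8.GetString(ansBody);
          Console.WriteLine(" [x] Received {0}", ansMessage);
          Console.WriteLine(" [x] Done");
      };
      // Direct reply-to requires no-ack mode; newer client versions name this parameter autoAck.
      channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
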
  3. Create the task message that Celery will consume:

      // Requires: using System.Collections.Generic; using System.IO;
      //           using System.Runtime.Serialization.Json; using System.Text;
      // The task name and task id travel in the message headers.
      IDictionary<string, object> headers = new Dictionary<string, object>();
      headers.Add("task", "tasks.add");
      // The client generates the task id itself.
      Guid id = Guid.NewGuid();
      headers.Add("id", id.ToString());

      IBasicProperties props = channel.CreateBasicProperties();
      props.Headers = headers;
      props.CorrelationId = (string)headers["id"];
      props.ContentEncoding = "utf-8";
      props.ContentType = "application/json";
      props.ReplyTo = "amq.rabbitmq.reply-to";

      // The body is [args, kwargs, embed]; the two empty objects stand in for kwargs and embed.
      object[] taskArgs = new object[] { 1, 200 };
      object[] arguments = new object[] { taskArgs, new object(), new object() };

      // Serialize the body to JSON and encode it as UTF-8 bytes.
      MemoryStream stream = new MemoryStream();
      DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[]));
      ser.WriteObject(stream, arguments);
      stream.Position = 0;
      StreamReader sr = new StreamReader(stream);
      string message = sr.ReadToEnd();
      var body = Encoding.UTF8.GetBytes(message);

  4. Finally, publish the message to RabbitMQ:

      channel.BasicPublish(exchange: "", routingKey: "celery", basicProperties: props, body: body); 
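
As a side note on the message itself: the task id is not something the client has to look up. The producer generates it (as `Guid.NewGuid()` does in step 3), and the worker uses that same id for the result. The sketch below shows the headers and body on their own, without a broker connection, using `System.Text.Json` instead of `DataContractJsonSerializer`. The class and method names (`CeleryMessageSketch`, `BuildHeaders`, `BuildBody`) are hypothetical, and the empty `embed` object is an assumption carried over from the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper, not part of Celery or RabbitMQ.Client.
public static class CeleryMessageSketch
{
    // Headers carry the task name and a client-generated task id.
    public static IDictionary<string, object> BuildHeaders(string taskName, Guid id)
    {
        return new Dictionary<string, object>
        {
            ["task"] = taskName,
            ["id"] = id.ToString()
        };
    }

    // The body is a three-element JSON array: [args, kwargs, embed].
    public static string BuildBody(object[] args, IDictionary<string, object> kwargs)
    {
        // Assumption: an empty embed object (no callbacks, chains, or chords).
        var embed = new Dictionary<string, object>();
        object[] payload = { args, kwargs, embed };
        return JsonSerializer.Serialize(payload);
    }

    public static void Main()
    {
        var headers = BuildHeaders("tasks.add", Guid.NewGuid());
        string body = BuildBody(new object[] { 1, 200 }, new Dictionary<string, object>());
        Console.WriteLine("task id: {0}", headers["id"]);
        Console.WriteLine("body:    {0}", body);
    }
}
```

The resulting string can be encoded with `Encoding.UTF8.GetBytes` and passed to `BasicPublish` in place of the `body` built in step 3.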
+5

Celery can be used together with Flower, a separately installed monitoring tool that provides a REST API for task management: https://flower.readthedocs.io/en/latest/api.html#post--api-task-async-apply-(.+) In most cases, this is much easier and more reliable than building task messages manually and pushing them onto the message queue.
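
A minimal sketch of such a call from C#, assuming Flower is reachable at `http://localhost:5555/` (its default port) and that the worker registers a task named `tasks.add`. The class and method names are hypothetical, and depending on the Flower version and configuration the API may also require authentication, which this sketch does not handle.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper, not part of Flower.
public static class FlowerClientSketch
{
    // Builds POST {flowerBase}api/task/async-apply/{taskName} with a JSON body {"args": [...]}.
    public static HttpRequestMessage BuildAsyncApply(Uri flowerBase, string taskName, object[] args)
    {
        var payload = new Dictionary<string, object> { ["args"] = args };
        string json = JsonSerializer.Serialize(payload);
        var uri = new Uri(flowerBase, "api/task/async-apply/" + taskName);
        var request = new HttpRequestMessage(HttpMethod.Post, uri);
        request.Content = new StringContent(json, Encoding.UTF8, "application/json");
        return request;
    }

    public static async Task Main()
    {
        // Assumed address; adjust to wherever Flower is running.
        var flowerBase = new Uri("http://localhost:5555/");
        using var client = new HttpClient();
        using var request = BuildAsyncApply(flowerBase, "tasks.add", new object[] { 1, 200 });
        using var response = await client.SendAsync(request);
        // On success, Flower responds with JSON that includes the id of the queued task.
        Console.WriteLine(await response.Content.ReadAsStringAsync());
    }
}
```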

0
