How to implement a RabbitMQ consumer using the PySpark Streaming module?

I have an Apache Spark cluster and a RabbitMQ broker, and I want to consume messages and calculate some indicators using the pyspark.streaming module.

The problem is that I found this package, but it is implemented in Java and Scala. In addition, I have not found a single example or implementation of a bridge in Python.

I have a consumer using Pika, but I don't know how to pass the payload to my StreamingContext.
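
For reference, a minimal sketch of the kind of blocking Pika consumer I mean (broker host, queue name and callback are placeholders, not my real setup; shown with the pika 0.10 callback signature):

      import pika

      # Placeholder connection settings; the real broker and queue differ.
      connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
      channel = connection.channel()
      channel.queue_declare(queue='events')

      def on_message(channel, method, properties, body):
          # `body` is the payload I want to feed into a pyspark StreamingContext
          print(body)
          channel.basic_ack(delivery_tag=method.delivery_tag)

      # pika 0.10-style callback registration
      channel.basic_consume(on_message, queue='events')
      channel.start_consuming()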

1 answer

This solution combines the asynchronous consumer example from pika with socketTextStream from Spark Streaming:

  • Download the example and save it as a .py file
  • Modify the file to use your own RabbitMQ credentials and connection settings. In my case, I had to change the Consumer class
  • In the if __name__ == '__main__': block, open a socket on the HOST and PORT that Spark Streaming will connect to over TCP. Save the sendall method of the accepted connection in a variable and pass it to the Consumer class:

      # requires `import socket` at the top of the example file
      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
          s.bind((HOST, PORT))
          s.listen(1)
          conn, addr = s.accept()
          dispatcher = conn.sendall  # assigning sendall to the dispatcher variable
          consumer = Consumer(dispatcher)
          try:
              consumer.run()
          except Exception as e:
              consumer.stop()
              s.close()
  • Modify the __init__ method of Consumer so that it receives the dispatcher:

      def __init__(self, dispatcher):
          self._connection = None
          self._channel = None
          self._closing = False
          self._consumer_tag = None
          self._url = amqp_url  # amqp_url must be defined elsewhere (the original example passed it as a parameter)
          # new code: store the dispatcher (the socket's sendall method)
          self._dispatcher = dispatcher
  • In the on_message method of Consumer, call self._dispatcher to send the body of the AMQP message:

      def on_message(self, unused_channel, basic_deliver, properties, body):
          self._channel.basic_ack(basic_deliver.delivery_tag)
          try:
              # Spark's socketTextStream needs a '\n' at the end of each row
              self._dispatcher(bytes(body.decode("utf-8") + '\n', "utf-8"))
          except Exception as e:
              raise
  • In Spark, call ssc.socketTextStream(HOST, int(PORT)) with the HOST and PORT of our TCP socket; Spark will manage the connection (see the sketch after this list)

  • Run the consumer first, then the Spark app

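A minimal sketch of the Spark side, assuming HOST and PORT match the socket opened in the consumer; the word count is just a placeholder for whatever indicators you compute:

      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext

      HOST, PORT = "consumer-host", 15000   # placeholders; must match the consumer's socket

      sc = SparkContext(appName="RabbitMQBridge")
      ssc = StreamingContext(sc, 5)          # 5-second micro-batches

      # Each AMQP body arrives as one line, thanks to the '\n' added in on_message
      lines = ssc.socketTextStream(HOST, int(PORT))
      counts = (lines.flatMap(lambda line: line.split())
                     .map(lambda word: (word, 1))
                     .reduceByKey(lambda a, b: a + b))
      counts.pprint()

      ssc.start()
      ssc.awaitTermination()

socketTextStream treats the consumer as a plain text server, so the only coupling between pika and Spark is the newline-delimited TCP socket.
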
Concluding remarks:

  • Try running your consumer on a different machine, not on your Spark machine.
  • Any port above 10000 should be fine. Pick the port explicitly; do not let the kernel assign a random one
  • Platform: Debian 7 and 8, Ubuntu 14.04 and 16.04
  • Pika version 0.10.0
  • Python version 3.5.2
  • Spark versions 1.6.1, 1.6.2 and 2.0.0
