How would you implement a distributed queue-like thing on top of an RDBMS or a NoSQL data store or another messaging system (for example, RabbitMQ)?

(From the category of questions that don't quite fit any particular category.)

By "queue-like thing" I mean one supporting the following operations:

  • append(record: Record) - add a record to the tail of the queue
  • take(): Record - remove the record at the head of the queue and return it
  • promote(entry_id) - move an entry one position closer to the head; the entry currently holding that position moves into the old position
  • demote(entry_id) - the opposite of promote(entry_id)

Additional nice-to-have operations would look something like this:

  • promote(entry_id, amount) - like promote(entry_id), except you specify the number of positions
  • demote(entry_id, amount) - the opposite of promote(entry_id, amount)
  • Of course, if amount may be positive or negative, the promote/demote methods could be combined into a single method(entry_id, amount)

Ideally, the operations above could be performed on the queue in a distributed fashion (several clients interacting with the queue):

    queue = ...
    queue.append( a )
    queue.append( b )
    queue.append( c )
    print queue
    # "abc"
    queue.promote( b.id )
    print queue
    # "bac"
    queue.demote( a.id )
    print queue
    # "bca"
    x = queue.take()
    print x
    # "b"
    print queue
    # "ca"

Are there any data stores particularly well suited to this use case? The queue should be in a consistent state at all times, even if several clients are modifying it simultaneously.

If it weren't for the promote/demote requirement, there wouldn't be much of a problem.

Edit: Bonus points for Java and/or Python libraries that accomplish the task described above.

The solution should scale very well.

+7
6 answers

Python: "Batteries Included"

Instead of reaching for a data store such as RabbitMQ, Redis, or an RDBMS, I think Python and a few libraries are more than enough to solve this problem. Some may complain that this do-it-yourself approach reinvents the wheel, but I'd rather run a hundred lines of Python code than manage another data store.

Priority Queue Implementation

The operations you define: append, take, promote, and demote, describe a priority queue. Unfortunately, Python doesn't have a built-in priority queue data type. But it does have a heap library called heapq, and priority queues are often implemented as heaps. Here is my implementation of a priority queue that meets your requirements:

    import heapq

    class PQueue:
        """Implements a priority queue with append, take, promote, and
        demote operations."""

        def __init__(self):
            """Initialize an empty priority queue.

            self.toll is max(priority) and max(rowid) in the queue
            self.heap is the heap maintained for the take command
            self.rows is a mapping from rowid to items
            self.pris is a mapping from priority to items
            """
            self.toll = 0
            self.heap = list()
            self.rows = dict()
            self.pris = dict()

        def append(self, value):
            """Append value to our priority queue.

            The new value is added as the lowest-priority item. Items
            are three-element lists consisting of [priority, rowid,
            value]. The rowid is used by the promote/demote commands.

            Returns the rowid corresponding to the new item.
            """
            self.toll += 1
            item = [self.toll, self.toll, value]
            self.heap.append(item)
            self.rows[self.toll] = item
            self.pris[self.toll] = item
            return self.toll

        def take(self):
            """Take the highest-priority item out of the queue.

            Returns the value of the item.
            """
            item = heapq.heappop(self.heap)
            del self.pris[item[0]]
            del self.rows[item[1]]
            return item[2]

        def promote(self, rowid):
            """Promote an item in the queue.

            The promoted item swaps position with the next-highest
            item.

            Returns the number of affected rows.
            """
            if rowid not in self.rows:
                return 0
            item = self.rows[rowid]
            item_pri, item_row, item_val = item
            next = item_pri - 1
            if next in self.pris:
                iota = self.pris[next]
                iota_pri, iota_row, iota_val = iota
                iota[1], iota[2] = item_row, item_val
                item[1], item[2] = iota_row, iota_val
                self.rows[item_row] = iota
                self.rows[iota_row] = item
                return 2
            return 0

The demote command is almost identical to the promote command, so I'll omit it for brevity. Note that this depends only on Python lists, dicts, and the built-in heapq library.
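For completeness, here is a sketch of what that demote might look like: it mirrors promote but swaps with the next lower-priority slot (priority + 1) instead of priority - 1. The class name and the trimmed-down append/take are just scaffolding so the sketch runs on its own:

```python
import heapq

class PQueueSketch:
    """Trimmed-down PQueue showing only what demote needs."""

    def __init__(self):
        self.toll = 0     # max priority/rowid issued so far
        self.heap = []    # heap of [priority, rowid, value] items
        self.rows = {}    # rowid -> item
        self.pris = {}    # priority -> item

    def append(self, value):
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def demote(self, rowid):
        """Swap rowid/value with the item one priority slot farther
        from the head (priority + 1), leaving priorities in place."""
        if rowid not in self.rows:
            return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        below = item_pri + 1
        if below in self.pris:
            iota = self.pris[below]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0
```

Demoting the head entry past its neighbor means take returns the neighbor first.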

Serving our priority queue

Now that we have the PQueue data type, we'd like to allow distributed interaction with an instance of it. A great library for this is gevent. Although gevent is relatively new and still in beta, it is remarkably fast and well tested. With gevent, we can set up a socket server listening on localhost:4040 pretty easily. Here is my server code:

    from gevent.server import StreamServer

    pqueue = PQueue()

    def pqueue_server(sock, addr):
        text = sock.recv(1024)
        cmds = text.split(' ')
        if cmds[0] == 'append':
            result = pqueue.append(cmds[1])
        elif cmds[0] == 'take':
            result = pqueue.take()
        elif cmds[0] == 'promote':
            result = pqueue.promote(int(cmds[1]))
        elif cmds[0] == 'demote':
            result = pqueue.demote(int(cmds[1]))
        else:
            result = ''
        sock.sendall(str(result))
        print 'Request:', text, '; Response:', str(result)

    # `args` comes from argparse handling elsewhere in the script.
    if args.listen:
        server = StreamServer(('127.0.0.1', 4040), pqueue_server)
        print 'Starting pqueue server on port 4040...'
        server.serve_forever()

You'd of course want better error and buffer handling before putting this in production, but it works very well for rapid prototyping. Note that no locking is required around the pqueue object. Gevent doesn't actually run code in parallel; it only gives that impression. The downside is that more cores won't help, but the upside is lock-free code.

Don't get me wrong, the gevent StreamServer will handle multiple requests at the same time. But it switches between requests through cooperative multitasking. This means a coroutine must yield its time slice. Although gevent's socket I/O functions are designed to yield, our pqueue implementation is not. Fortunately, pqueue operations complete quickly.

A client, too

While prototyping, I also found it helpful to have a client. Writing the client took some searching, so I'll share that code as well:

    import gevent.socket as gsocket

    if args.client:
        while True:
            msg = raw_input('> ')
            sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
            sock.connect(('127.0.0.1', 4040))
            sock.sendall(msg)
            text = sock.recv(1024)
            sock.close()
            print text

To use the new data store, first start the server, then start the client. At the client prompt you should be able to do:

    > append one
    1
    > append two
    2
    > append three
    3
    > promote 2
    2
    > promote 2
    0
    > take
    two

Scales very well

Given your thoughts on data stores, it seems you're really concerned with throughput and durability. But "should scale very well" doesn't quantify your needs, so I decided to benchmark it with a test function. Here's the test function:

    def test():
        import time
        import urllib2
        import subprocess
        import random
        random = random.Random(0)
        from progressbar import ProgressBar, Percentage, Bar, ETA
        widgets = [Percentage(), Bar(), ETA()]

        def make_name():
            alphabet = 'abcdefghijklmnopqrstuvwxyz'
            return ''.join(random.choice(alphabet)
                           for rpt in xrange(random.randrange(3, 20)))

        def make_request(cmds):
            sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
            sock.connect(('127.0.0.1', 4040))
            sock.sendall(cmds)
            text = sock.recv(1024)
            sock.close()

        print 'Starting server and waiting 3 seconds.'
        subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                        shell=True)
        time.sleep(3)

        tests = []

        def wrap_test(name, limit=10000):
            def wrap(func):
                def wrapped():
                    progress = ProgressBar(widgets=widgets)
                    for rpt in progress(xrange(limit)):
                        func()
                    secs = progress.seconds_elapsed
                    print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                        name, limit, secs, limit / secs)
                tests.append(wrapped)
                return wrapped
            return wrap

        def direct_append():
            name = make_name()
            pqueue.append(name)

        count = 1000000

        @wrap_test('Loaded', count)
        def direct_append_test():
            direct_append()

        def append():
            name = make_name()
            make_request('append ' + name)

        @wrap_test('Appended')
        def append_test():
            append()

        ...

        print 'Running speed tests.'
        for tst in tests:
            tst()

Test results

I ran 6 tests against a server running on my laptop. I think the results are very good. Here's the output:

    Starting server and waiting 3 seconds.
    Running speed tests.
    100%|############################################################|Time: 0:00:21
    Loaded 1000000 records in 21.770 s at 45934.773 r/s
    100%|############################################################|Time: 0:00:06
    Appended 10000 records in 6.825 s at 1465.201 r/s
    100%|############################################################|Time: 0:00:06
    Promoted 10000 records in 6.270 s at 1594.896 r/s
    100%|############################################################|Time: 0:00:05
    Demoted 10000 records in 5.686 s at 1758.706 r/s
    100%|############################################################|Time: 0:00:05
    Took 10000 records in 5.950 s at 1680.672 r/s
    100%|############################################################|Time: 0:00:07
    Mixed load processed 10000 records in 7.410 s at 1349.528 r/s

Final Frontier: Durability

Finally, durability is the one concern I didn't fully prototype. But I don't think it's hard, either. In our priority queue, the heap (list) of items has all the information we need to persist the data type to disk. Since gevent also lets us spawn functions that run cooperatively in the background, I imagined using a function like this:

    def save_heap(heap, toll):
        name = 'heap-{0}.txt'.format(toll)
        with open(name, 'w') as temp:
            for val in heap:
                temp.write(str(val))
                gevent.sleep(0)

and adding a save method to our priority queue:

    def save(self):
        heap_copy = tuple(self.heap)
        toll = self.toll
        gevent.spawn(save_heap, heap_copy, toll)

Now you could copy the Redis forking model and write the data store to disk every few minutes. If you need even more durability, pair the above with a system that logs commands to disk. Together, these are the AOF and RDB persistence methods that Redis uses.
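As a rough sketch of the command-log half (the AOF analogue): append each mutating command to a file as it is served, and replay the file on startup. The function and file names here are illustrative, not part of the code above:

```python
import os

def log_command(path, cmd):
    # Append one mutating command per line, AOF-style.
    with open(path, 'a') as f:
        f.write(cmd + '\n')

def replay(path, queue):
    # Rebuild queue state by re-running the logged commands against
    # any object exposing append/take/promote/demote.
    if not os.path.exists(path):
        return
    with open(path) as f:
        for line in f:
            parts = line.split()
            if not parts:
                continue
            if parts[0] == 'append':
                queue.append(parts[1])
            elif parts[0] == 'take':
                queue.take()
            elif parts[0] == 'promote':
                queue.promote(int(parts[1]))
            elif parts[0] == 'demote':
                queue.demote(int(parts[1]))
```

The server's command dispatcher would call log_command before applying each mutating request, and call replay once at startup.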

+4
source

Redis supports lists and sorted sets: http://redis.io/topics/data-types#lists

It also supports transactions and publish/subscribe messaging. So yes, I'd say this can be done easily in Redis.

Update: in fact, about 80% of it has been done many times: http://www.google.co.uk/search?q=python+redis+queue

Some of those hits could be updated to add what you want. You would need to use transactions to make the promote/demote operations atomic.

Perhaps you could use server-side Lua to build that functionality rather than keeping it in client code. Alternatively, you could build a thin wrapper around Redis on the server that implements just the operations you want.
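For a sense of what that wrapper's promote would have to do: with entries in a sorted set (score = position, lower score = closer to the head), promote is a swap of scores between two members. Here is the logic sketched in plain Python, with a dict standing in for the sorted set; in real Redis the reads and writes would be wrapped in MULTI/EXEC or a Lua script so the swap stays atomic:

```python
def promote(zset, member):
    """Swap scores with the member ranked immediately ahead.

    `zset` is a dict {member: score} standing in for a Redis sorted
    set; a lower score means closer to the head of the queue.
    Returns True if a swap happened, False if already at the head.
    """
    score = zset[member]
    # Members with a lower score sit closer to the head.
    ahead = [(s, m) for m, s in zset.items() if s < score]
    if not ahead:
        return False
    s, m = max(ahead)  # nearest neighbor toward the head
    zset[member], zset[m] = s, score
    return True
```

Demote is the same swap in the other direction, and a positional promote(entry_id, amount) is just this repeated.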

+7

WebSphere MQ can do almost all of this.

Promote/demote is possible, in effect, by removing the message from the queue and re-enqueuing it with a higher/lower priority, or by using the CORRELID as a sequence number.
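The remove-and-requeue idea, sketched abstractly in Python (a real WebSphere MQ version would get the message by its id and put it back with a new priority; the list of tuples here is only a stand-in for the queue):

```python
def requeue_with_priority(messages, msg_id, new_priority):
    """Promote/demote by pulling a message off and putting it back
    with a different priority.

    `messages` is a list of (priority, msg_id, body) tuples; a
    higher priority is served first.
    """
    msg = next(m for m in messages if m[1] == msg_id)
    messages.remove(msg)
    messages.append((new_priority, msg[1], msg[2]))
    # Re-order so the highest priority sits at the front, as the
    # queue manager would deliver it.
    messages.sort(key=lambda m: -m[0])
    return messages
```

Raising a message's priority above its peers moves it to the front of delivery order.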

+2

What's wrong with RabbitMQ? It sounds like exactly what you need.

We also use Redis extensively in our production environment, but it lacks some of the features queues typically have, for example marking a task as completed, or re-delivering a task if it isn't completed within some TTL. On the other hand, it has features a queue doesn't, for example it's a general-purpose store, and it's REALLY fast.

+2

If for some reason you settle on an SQL database as a backend, I wouldn't use MySQL because it would require polling (well, and I wouldn't use it for many other reasons), but PostgreSQL supports LISTEN/NOTIFY for signaling other clients, so they don't have to poll for changes. However, it signals all listening clients at once, so you'd still need a mechanism for choosing a winner.

As a side note, I'm not sure the promote/demote mechanism would be that useful; it would be better to schedule the jobs properly at insert time...

0

Use Redisson. It implements the familiar Java List, Queue, BlockingQueue, and Deque interfaces in a distributed fashion on top of Redis. Example with a Deque:

    Redisson redisson = Redisson.create();
    RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
    queue.addFirst(new SomeObject());
    queue.addLast(new SomeObject());
    SomeObject obj = queue.removeFirst();
    SomeObject someObj = queue.removeLast();
    redisson.shutdown();

Other samples:

https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#77-list
https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#78-queue
https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#710-blocking-queue

0
