Python: "Batteries Included"
Instead of reaching for a data store such as RabbitMQ, Redis, or an RDBMS, I think Python and a few libraries are more than enough to solve this problem. Some may complain that this do-it-yourself approach reinvents the wheel, but I'd rather have a hundred lines of Python than manage another data store.
Priority Queue Implementation
The operations you describe (append, take, promote, and demote) define a priority queue. Unfortunately, Python has no built-in priority queue data type. But it does have a heap library, heapq, and priority queues are often implemented as heaps. Here's a priority queue implementation that meets your requirements:
import heapq

class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize an empty priority queue.
        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for the take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.
        The new value is added with the lowest priority as an item.
        Items are threeple lists consisting of [priority, rowid, value].
        The rowid is used by the promote/demote commands.
        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest-priority item out of the queue.
        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.
        The promoted item swaps position with the next-highest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows:
            return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next = item_pri - 1
        if next in self.pris:
            iota = self.pris[next]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0
The demote command is almost identical to promote, so I omit it for brevity. Note that this depends only on Python's lists, dicts, and the built-in heapq module.
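For reference, the heapq pattern underneath is just a plain list of (priority, value) pairs, and this is standard-library behavior rather than anything specific to PQueue:

```python
import heapq

# a plain list serves as the heap; items are (priority, value) pairs
heap = []
heapq.heappush(heap, (2, 'two'))
heapq.heappush(heap, (1, 'one'))
heapq.heappush(heap, (3, 'three'))

# the smallest priority number counts as "highest priority" and pops
# first, which is why promote decrements an item's priority
print(heapq.heappop(heap))  # (1, 'one')
```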
Serving our priority queue
Now that we have the PQueue data type, we'd like to allow distributed interactions with an instance of it. A great library for this is gevent. Though gevent is relatively new and still in beta, it's remarkably fast and well tested. With gevent, it's pretty easy to set up a socket server listening on localhost:4040. Here's my server code:
from gevent.server import StreamServer

pqueue = PQueue()

def pqueue_server(sock, addr):
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

# args comes from argparse; option parsing is omitted for brevity
if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()
You'll of course want better error and buffer handling before relying on this, but it works very well for rapid prototyping. Note that no locking is required around the pqueue object. Gevent doesn't actually run code in parallel; it merely gives that impression. The downside is that more cores won't help, but the upside is lock-free code.
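As one example of the better buffer handling I mean: the bare recv(1024) call assumes every command arrives in a single read. A common fix is length-prefixed framing; here's a standard-library sketch (the send_msg/recv_msg names are mine, not part of the server above):

```python
import socket
import struct

def send_msg(sock, payload):
    # prefix each message with its length as a 4-byte big-endian integer
    sock.sendall(struct.pack('>I', len(payload)) + payload)

def recv_exactly(sock, count):
    # keep reading until we have exactly `count` bytes
    buf = b''
    while len(buf) < count:
        chunk = sock.recv(count - len(buf))
        if not chunk:
            raise EOFError('socket closed mid-message')
        buf += chunk
    return buf

def recv_msg(sock):
    # read the length header, then the body
    (size,) = struct.unpack('>I', recv_exactly(sock, 4))
    return recv_exactly(sock, size)

a, b = socket.socketpair()
send_msg(a, b'promote 2')
print(recv_msg(b))  # b'promote 2'
```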
Don't get me wrong, the gevent server will handle multiple requests at once, but it switches between them through cooperative multitasking. That means a coroutine has to yield its time slice. While gevent's socket I/O functions are designed to yield, our pqueue implementation is not. Fortunately, pqueue completes its work very quickly.
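If cooperative multitasking is unfamiliar, here's a toy round-robin scheduler built on plain generators that shows the idea without gevent: each task runs until it explicitly yields, just as a gevent coroutine runs until it hits a yielding call such as socket I/O or gevent.sleep(0):

```python
from collections import deque

def worker(name, steps, log):
    for i in range(steps):
        log.append('{0}:{1}'.format(name, i))
        yield  # explicitly give up the time slice, like gevent.sleep(0)

def run(tasks):
    # round-robin scheduler: each task runs until it yields
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
            queue.append(task)
        except StopIteration:
            pass

log = []
run([worker('a', 2, log), worker('b', 2, log)])
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1'] -- the tasks interleave
```

A task that never yields would starve the others, which is exactly the risk a long-running pqueue operation would pose inside the gevent server.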
A client, too
While prototyping, I found it helpful to have a client as well. Writing the client took some searching, so I'll share that code too:
from gevent import socket as gsocket

if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text
To try out the new data store, first start the server, then start the client. At the client prompt, you should be able to do this:
> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two
It scales pretty well
Given your thoughts on a data store, it seems you're really concerned about throughput and durability. But "it scales well" doesn't quantify your needs, so I decided to benchmark the above with a test function. Here it is:
def test():
    import time
    import urllib2
    import subprocess
    import random
    random = random.Random(0)
    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []

    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000

    @wrap_test('Loaded', count)
    def direct_append_test():
        direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test():
        append()

    ...

    print 'Running speed tests.'
    for tst in tests:
        tst()
Test results
I ran six tests against the server running on my laptop, and I think the results are pretty good. Here's the output:
Starting server and waiting 3 seconds.
Running speed tests.
100%||Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%||Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%||Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%||Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%||Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%||Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s
Final frontier: durability
Finally, durability is the only part I didn't fully prototype, but I don't think it's hard either. In our priority queue, the heap (list) of items already holds all the information we need to persist the data type to disk. Since gevent can also spawn functions, I imagined using something like this:
import gevent

def save_heap(heap, toll):
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val))
            gevent.sleep(0)  # yield so other coroutines keep running
and adding a save method to our priority queue:
def save(self):
    heap_copy = tuple(self.heap)  # snapshot the current heap
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)
Now you could copy the Redis forking model and write the data store to disk every few minutes. If you need even stronger durability, pair the above with a system that logs commands to disk. Together, those resemble the AOF and RDB persistence methods that Redis uses.
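To make the command-logging half concrete, here's a minimal sketch of an append-only log in plain Python (the CommandLog class and its file layout are my invention, not Redis's AOF format): the server would call record() for every mutating command, and on restart it would replay() the log into a fresh PQueue:

```python
import os
import tempfile

class CommandLog:
    """Minimal append-only command log (AOF-style sketch)."""

    def __init__(self, path):
        self.path = path
        self.file = open(path, 'a')

    def record(self, line):
        # append one command per line and force it to disk
        self.file.write(line + '\n')
        self.file.flush()
        os.fsync(self.file.fileno())

    def replay(self):
        # return the logged commands in order, for rebuilding state
        with open(self.path) as handle:
            return [line.rstrip('\n') for line in handle]

path = os.path.join(tempfile.mkdtemp(), 'pqueue.aof')
log = CommandLog(path)
log.record('append one')
log.record('promote 1')
print(log.replay())  # ['append one', 'promote 1']
```

The fsync call is what buys durability; dropping it trades safety for speed, which is the same knob Redis exposes with its appendfsync setting.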