CherryPy: how to stop and buffer an incoming request during data refresh

I work with CherryPy on a server that implements a RESTful API. The responses require some heavy calculations that take about 2 seconds per query. To perform these calculations, the server uses data that is updated three times a day.

The data is updated in the background (the update takes about half an hour), and once it finishes, references to the new data are handed to the functions that answer requests. That handover takes only a millisecond.

I need every request to be answered with either the old data or the new data, but no request may be processed while the data references are being swapped. Ideally, I would like to find a way to buffer incoming requests while the data is changing, and also to ensure that the references are swapped only after all in-flight requests have completed.

My current (non-)working minimal example is as follows:

```python
import time

import cherrypy
from cherrypy.process import plugins

theData = 0

def processData():
    """Background task runs for half an hour three times a day;
    when it finishes, it publishes the result on the engine bus."""
    global theData  # using a global variable to simplify the example
    theData += 1
    cherrypy.engine.publish("doChangeData", theData)

class DataPublisher(object):

    def __init__(self):
        self.data = 'initData'
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        time.sleep(1)  # exaggeration of the ~1 ms reference update, to visualize the problem
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(3)
        return result

if __name__ == '__main__':
    conf = {
        '/': {
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    btask = plugins.BackgroundTask(5, processData)  # 5 secs for the example
    btask.start()
    cherrypy.quickstart(DataPublisher())
```

If I run this script, open a browser at localhost:8080, and refresh the page repeatedly, I get:

```
...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:42] ENGINE I get 3
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
```

This means that some requests were still being processed when the data reference change started or ended. I want to avoid both cases. Something like:

```
...
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
[17/Sep/2015:21:32:42] ENGINE I get 3
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
```

I searched the documentation and the web and found these links, which do not fully cover this case:

http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html

How to do asynchronous post processing in CherryPy?

http://tools.cherrypy.org/wiki/BackgroundTaskQueue

Cherrypy: what are the solutions for pages with a long processing time

How to stop request processing on Cherrypy?

Update (using a simple solution):

On reflection, I think this question is misleading, because it bakes some implementation requirements into the question itself, namely: stop processing and start buffering. The problem can be simplified to: make sure that each request is processed with either the old data or the new data.

For the latter, it is enough to save a temporary local reference to the data in use. This reference can be used throughout the request processing, and it is not a problem if another thread changes self.data in the meantime. For Python objects, the garbage collector will take care of the old data.

In particular, it is enough to change the index function to:

```python
@cherrypy.expose
def index(self):
    tempData = self.data
    result = "I started with %s" % str(tempData)
    time.sleep(3)  # Heavy use of tempData
    result += " that changed to %s" % str(self.data)
    result += " but I am still using %s" % str(tempData)
    cherrypy.engine.log(result)
    return result
```

And as a result, we will see:

[21/Sep/2015:10:06:00] ENGINE I started with 1 that changed to 2 but I am still using 1

I still want to keep the original (more restrictive) question and cyraxjoe's answer, as I find those solutions very useful.

1 answer

I will explain two approaches that solve the problem.

The first one is based on a plugin.

Plugin based: Some synchronization is still implicit here. It only works because there is a single BackgroundTask doing the modifications (and the modification is just an atomic operation).

```python
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

class DataGateway(plugins.SimplePlugin):

    def __init__(self, bus):
        super(DataGateway, self).__init__(bus)
        self.data = next_data

    def start(self):
        self.bus.log("Starting DataGateway")
        self.bus.subscribe('dg:get', self._get_data)
        self.bus.subscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been started")

    def stop(self):
        self.bus.log("Stopping DataGateway")
        self.bus.unsubscribe('dg:get', self._get_data)
        self.bus.unsubscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been stopped")

    def _update_data(self, new_val):
        self.bus.log("Changing data, buffering should start!")
        self.data = new_val
        time.sleep(UPDATE_DELAY)
        self.bus.log("Continue serving buffered and new requests.")

    def _get_data(self):
        return self.data

def processData():
    """Background task runs for half an hour three times a day;
    when it finishes, it publishes the result on the engine bus."""
    global next_data
    cherrypy.engine.publish("dg:update", next_data)
    next_data += 1

class DataPublisher(object):

    @property
    def data(self):
        return cherrypy.engine.publish('dg:get').pop()

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    DataGateway(cherrypy.engine).subscribe()
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
```

In this version, synchronization happens because both reads and writes are executed on the cherrypy.engine thread. Everything is abstracted into the DataGateway plugin; from the handler you simply publish to the engine.
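As a side note on the bus mechanics: cherrypy.engine.publish returns the list of values returned by all subscribers to a channel, which is why the data property calls .pop() on the result. Here is a minimal sketch of that pattern, using a hypothetical MiniBus class rather than CherryPy's actual bus implementation:

```python
# Toy illustration (hypothetical MiniBus class) of the publish/subscribe
# pattern used above: publish() collects the return values of every
# subscriber, so a single-subscriber channel yields a one-element list.
class MiniBus:
    def __init__(self):
        self._subscribers = {}  # channel name -> list of callbacks

    def subscribe(self, channel, callback):
        self._subscribers.setdefault(channel, []).append(callback)

    def publish(self, channel, *args):
        # Return the list of all subscriber return values.
        return [cb(*args) for cb in self._subscribers.get(channel, [])]

bus = MiniBus()
store = {'data': 1}
bus.subscribe('dg:get', lambda: store['data'])
bus.subscribe('dg:update', lambda v: store.update(data=v))

assert bus.publish('dg:get').pop() == 1   # read through the bus
bus.publish('dg:update', 2)               # write through the bus
assert bus.publish('dg:get').pop() == 2
```

In CherryPy the same idea applies, with the addition that the subscribed callbacks run on the engine thread, which is what serializes reads against writes.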

The second approach uses a threading.Event object. It is a more manual approach, with the added benefit that reads are likely faster, because they do not go through the cherrypy.engine thread.

threading.Event based (aka manual)

```python
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

def processData():
    """Background task runs for half an hour three times a day;
    when it finishes, it publishes the result on the engine bus."""
    global next_data
    cherrypy.engine.publish("doChangeData", next_data)
    next_data += 1

class DataPublisher(object):

    def __init__(self):
        self._data = next_data
        self._data_readable = threading.Event()
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    @property
    def data(self):
        if self._data_readable.is_set():
            return self._data
        else:
            self._data_readable.wait()
            return self.data

    @data.setter
    def data(self, value):
        self._data_readable.clear()
        time.sleep(UPDATE_DELAY)
        self._data = value
        self._data_readable.set()

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
```

I added some subtlety with the @property decorator, but the real point is the threading.Event and the fact that the DataPublisher object is shared among the worker threads.
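To see the Event gating in isolation, here is a minimal self-contained sketch (plain threads, no CherryPy; the reader/writer names are just illustrative): readers that arrive while the flag is cleared block in wait() and continue only after the writer has finished the swap and called set().

```python
import threading
import time

data = 'old'
data_readable = threading.Event()
data_readable.set()  # data starts out readable

observed = []

def reader():
    # Readers arriving during an update block here until set() is called.
    data_readable.wait()
    observed.append(data)

def writer():
    global data
    data_readable.clear()  # start "buffering": new readers will block
    time.sleep(0.2)        # simulate the slow reference update
    data = 'new'
    data_readable.set()    # release the blocked readers

w = threading.Thread(target=writer)
w.start()
time.sleep(0.05)  # give the writer time to clear the flag
readers = [threading.Thread(target=reader) for _ in range(5)]
for t in readers:
    t.start()
for t in [w] + readers:
    t.join()

# Every reader that arrived during the update saw the new data.
assert observed == ['new'] * 5
```

This is exactly the behavior the question asked for: requests arriving mid-update are buffered, not served with half-swapped references.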

I also added the thread pool configuration to both examples, needed to increase the thread pool size; the default is 10.

To verify what I just said, you can run this Python 3 script (if you do not have python3, you now have an excuse to install it); it makes 100 requests, more or less concurrently, through a thread pool.

Test script

```python
import time
import urllib.request
import concurrent.futures

URL = 'http://localhost:8080/'
TIMEOUT = 60
DELAY = 0.05
MAX_WORKERS = 20
REQ_RANGE = range(1, 101)

def load_url():
    with urllib.request.urlopen(URL, timeout=TIMEOUT) as conn:
        return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for i in REQ_RANGE:
        print("Sending req {}".format(i))
        futures[executor.submit(load_url)] = i
        time.sleep(DELAY)
    results = []
    for future in concurrent.futures.as_completed(futures):
        try:
            data = future.result().decode()
        except Exception as exc:
            print(exc)
        else:
            results.append((futures[future], data))

curr_max = 0
for i, data in sorted(results, key=lambda r: r[0]):
    new_max = int(data.split()[-1])
    assert new_max >= curr_max, "The data was not updated correctly"
    print("Req {}: {}".format(i, data))
    curr_max = new_max
```

Since you diagnosed the problem from the log, be aware that the log is not reliable for this kind of issue: you do not control the moment at which a request gets written to the access log. I could not make your unmodified code fail with my test script, but in general there really is a race condition; in your example it happens to work every time only because the update is an atomic operation: a single attribute assignment, done periodically from a central point.
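The atomic-assignment point can be demonstrated outside CherryPy with a small sketch (the Holder class here is just illustrative): because rebinding an attribute swaps a single object reference, a reader that snapshots the reference always sees a consistent object, never a half-updated one.

```python
import threading

class Holder:
    def __init__(self):
        self.data = ('old', 'old')

holder = Holder()
snapshots = []

def reader():
    for _ in range(10000):
        d = holder.data       # take a local snapshot of the reference
        snapshots.append(d)   # d is always one consistent tuple

def writer():
    for i in range(10000):
        # Rebinding the attribute is a single reference swap; readers
        # observe either the old tuple or the new one, never a mix.
        holder.data = ('new', 'new') if i % 2 else ('old', 'old')

threads = [threading.Thread(target=f) for f in (reader, reader, writer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# No snapshot is ever a half-updated mix like ('old', 'new').
assert all(s in [('old', 'old'), ('new', 'new')] for s in snapshots)
```

If the update were more than one assignment (say, mutating two fields in place), this guarantee would vanish, which is exactly when the Event-based gating above becomes necessary.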

I hope the code explains itself; if you have a question, leave a comment.

EDIT: I edited the plugin-based approach to make clear that it only works because there is a single place triggering the update; if you create another BackgroundTask that updates the data, it could break as soon as you do anything more than a plain assignment. In any case, the code may well be what you are looking for, as long as you update from only one BackgroundTask.

