Python in-memory cache with time to live

I have several threads running the same process, and they need to be able to notify each other that something should not be processed for the next n seconds — though it is not the end of the world if it occasionally is.

My goal is to be able to pass a string and a TTL to the cache, and to retrieve all the strings currently in the cache as a list. The cache can live in memory, and the TTL will never be more than 20 seconds.

Does anyone have any suggestions on how to do this?

+13
6 answers

You can use the expiringdict module:

The core of the library is the ExpiringDict class, which is an ordered dictionary with automatically expiring values for caching purposes.

The documentation does not mention thread safety, so to stay on the safe side, guard access to it with a Lock.
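To illustrate the Lock-guarded access pattern without requiring the library, here is a minimal stdlib-only sketch of the same idea — a dictionary with per-entry expiry where every operation holds a Lock. The class name LockedTTLCache is made up for this example; with expiringdict you would wrap the ExpiringDict calls in a Lock the same way.

```python
import threading
import time


class LockedTTLCache:
    """Minimal stdlib sketch of a thread-safe TTL cache (ExpiringDict-like)."""

    def __init__(self, max_age_seconds):
        self.max_age = max_age_seconds
        self._data = {}  # key -> (value, stored_at)
        self._lock = threading.Lock()

    def __setitem__(self, key, value):
        with self._lock:
            self._data[key] = (value, time.monotonic())

    def get(self, key, default=None):
        with self._lock:
            item = self._data.get(key)
            if item is None:
                return default
            value, stored_at = item
            if time.monotonic() - stored_at > self.max_age:
                del self._data[key]  # expired: drop the entry and report a miss
                return default
            return value

    def values(self):
        # Return all still-live values as a list (what the OP asked for),
        # pruning expired entries along the way.
        with self._lock:
            now = time.monotonic()
            live = {k: v for k, v in self._data.items()
                    if now - v[1] <= self.max_age}
            self._data = live
            return [value for value, _ in live.values()]
```

Every method takes the lock for its whole body, so concurrent writers and readers cannot interleave mid-update.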

+21

The OP uses Python 2.7, but if you are on Python 3, note that the ExpiringDict mentioned in the accepted answer appears to be abandoned: the last commit on the GitHub repository was June 17, 2017, and there is an open issue reporting that it does not work with Python 3.5.

There is a more recently maintained project, cachetools (last commit June 14, 2018):

pip install cachetools

    from cachetools import TTLCache

    cache = TTLCache(maxsize=10, ttl=360)
    cache['apple'] = 'top dog'
    ...
    >>> cache['apple']
    'top dog'
    ... after 360 seconds...
    >>> cache['apple']
    KeyError exception thrown

ttl is the time to live in seconds.
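Applied to the question, a small wrapper can let threads register strings and read back every string that has not yet expired. RecentlySeen is a made-up name for this sketch, not part of cachetools; the Lock is there because TTLCache itself makes no thread-safety guarantees.

```python
import threading

from cachetools import TTLCache


class RecentlySeen:
    """Threads register strings to skip for the next `ttl` seconds;
    any thread can list the strings that are still live."""

    def __init__(self, maxsize=1024, ttl=20):
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self._lock = threading.Lock()  # TTLCache is not thread-safe by itself

    def add(self, line):
        with self._lock:
            self._cache[line] = True

    def items(self):
        with self._lock:
            # iterating a TTLCache skips entries whose TTL has elapsed
            return list(self._cache.keys())
```

A worker would call `add(line)` after processing and check `line in seen.items()` before starting.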

+23

If you do not want to use any third-party libraries, you can add one more parameter to your expensive function: ttl_hash=None. This new parameter is a so-called "time-sensitive hash"; its only purpose is to affect the cache key seen by lru_cache.

For instance:

    from functools import lru_cache
    import time


    @lru_cache()
    def my_expensive_function(a, b, ttl_hash=None):
        del ttl_hash  # to emphasize we don't use it and to shut pylint up
        return a + b  # horrible CPU load...


    def get_ttl_hash(seconds=3600):
        """Return the same value within `seconds` time period"""
        return round(time.time() / seconds)


    # somewhere in your code...
    res = my_expensive_function(2, 2, ttl_hash=get_ttl_hash())
    # cache will be updated once in an hour
+9

A general-purpose expiring in-memory cache usually follows a design pattern built not around a dictionary but around a function or method decorator, with the cache dictionary managed behind the scenes. In that sense, this answer complements the answer that uses a dictionary rather than a decorator.

The ttl_cache decorator in cachetools==3.1.0 works much like functools.lru_cache, but with a time to live.

    import cachetools.func


    @cachetools.func.ttl_cache(maxsize=128, ttl=10 * 60)
    def example_function(key):
        return get_expensively_computed_value(key)


    class ExampleClass:
        EXP = 2

        @classmethod
        @cachetools.func.ttl_cache()
        def example_classmethod(cls, i):
            return i * cls.EXP

        @staticmethod
        @cachetools.func.ttl_cache()
        def example_staticmethod(i):
            return i * 3
+7

Something like this?

    from time import time, sleep
    import itertools
    from threading import Thread, RLock
    import signal


    class CacheEntry():
        def __init__(self, string, ttl=20):
            self.string = string
            self.expires_at = time() + ttl
            self._expired = False

        def expired(self):
            if self._expired is False:
                return self.expires_at < time()
            else:
                return self._expired


    class CacheList():
        def __init__(self):
            self.entries = []
            self.lock = RLock()

        def add_entry(self, string, ttl=20):
            with self.lock:
                self.entries.append(CacheEntry(string, ttl))

        def read_entries(self):
            with self.lock:
                # dropwhile only trims expired entries from the front, so this
                # assumes entries expire in insertion order (i.e. equal TTLs)
                self.entries = list(itertools.dropwhile(lambda x: x.expired(), self.entries))
                return self.entries


    def read_entries(name, slp, cachelist):
        while True:
            print("{}: {}".format(name, ",".join(map(lambda x: x.string, cachelist.read_entries()))))
            sleep(slp)


    def add_entries(name, ttl, cachelist):
        s = 'A'
        while True:
            cachelist.add_entry(s, ttl)
            print("Added ({}): {}".format(name, s))
            sleep(1)
            s += 'A'


    if __name__ == "__main__":
        signal.signal(signal.SIGINT, signal.SIG_DFL)

        cl = CacheList()

        print_threads = []
        print_threads.append(Thread(None, read_entries, args=('t1', 1, cl)))
        # print_threads.append(Thread(None, read_entries, args=('t2', 2, cl)))
        # print_threads.append(Thread(None, read_entries, args=('t3', 3, cl)))

        adder_thread = Thread(None, add_entries, args=('a1', 2, cl))
        adder_thread.start()

        for t in print_threads:
            t.start()
        for t in print_threads:
            t.join()
        adder_thread.join()
+2

I really like the idea from @iutinvg, I just want to take it a little further: decouple it from the need to know to pass a ttl_hash, and just make it a decorator so you don't have to think about it at all. If you have django and py3 and don't feel like pip-installing any dependencies, try this.

    import time

    from django.utils.functional import lazy
    from functools import lru_cache, partial, update_wrapper


    def lru_cache_time(seconds, maxsize=None):
        """
        Adds time aware caching to lru_cache
        """
        def wrapper(func):
            # Lazy value that makes sure the lru_cache() invalidates after X secs
            ttl_hash = lazy(lambda: round(time.time() / seconds), int)()

            @lru_cache(maxsize)
            def time_aware(__ttl, *args, **kwargs):
                """
                Main wrapper; note that the first argument, __ttl, is not passed
                down. The wrapped function never needs to know it exists.
                """
                def wrapping(*args, **kwargs):
                    return func(*args, **kwargs)
                return wrapping(*args, **kwargs)

            return update_wrapper(partial(time_aware, ttl_hash), func)
        return wrapper


    @lru_cache_time(seconds=10)
    def meaning_of_life():
        """
        This message should show up if you call help().
        """
        print('this better only show up once!')
        return 42


    @lru_cache_time(seconds=10)
    def multiply(a, b):
        """
        This message should show up if you call help().
        """
        print('this better only show up once!')
        return a * b


    # This is a test: prints a '.' every second; there should be 10s between
    # each "this better only show up once!" (*2 because of the two functions).
    for _ in range(20):
        meaning_of_life()
        multiply(50, 99991)
        print('.')
        time.sleep(1)
+2
