How to correctly log many very similar events in Python?

With Python's logging module, is there a way to collect multiple events into a single log entry? An ideal solution would be an extension of the logging module or a custom formatter/filter, so that logging events of the same type are collected in the background and nothing needs to be added to the body of the code (for example, every time the logging function is called).

Here is an example that generates a large number of identical or very similar events:

    import logging

    for i in range(99999):
        try:
            asdf[i]  # not defined!
        except NameError:
            logging.exception('foo')  # generates large number of logging events
        else:
            pass

    # ... more code with more logging ...
    for i in range(88888):
        logging.info('more of the same %d' % i)
    # ... and so on ...

So the same exception is raised and logged 99999 times. It would be nice if the log simply said something like:

    ERROR:root:foo (occurred 99999 times)
    Traceback (most recent call last):
      File "./exceptionlogging.py", line 10, in <module>
        asdf[i]  # not defined!
    NameError: name 'asdf' is not defined

    INFO:root:more of the same (occurred 88888 times with various values)
4 answers

You should probably write a message aggregation/statistics class rather than trying to hook into the logging system's singletons, but I guess you may have an existing code base that uses logging.

I also suggest that you instantiate your own loggers rather than using the default root logger. The Python Logging Cookbook has detailed explanations and examples.
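For reference, a minimal sketch of instantiating a dedicated, named logger with its own handler and formatter (the name and format string are only illustrative):

    import logging

    logger = logging.getLogger(__name__)  # a named logger instead of the root logger
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    logger.info("using a dedicated logger rather than the root one")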

The following class should do what you ask.

    import logging
    import atexit
    import pprint


    class Aggregator(object):
        logs = {}

        @classmethod
        def _aggregate(cls, record):
            id = '{0[levelname]}:{0[name]}:{0[msg]}'.format(record.__dict__)
            if id not in cls.logs:  # first occurrence
                cls.logs[id] = [1, record]
            else:  # subsequent occurrences
                cls.logs[id][0] += 1

        @classmethod
        def _output(cls):
            for count, record in cls.logs.values():
                record.__dict__['msg'] += ' (occurred {} times)'.format(count)
                logging.getLogger(record.__dict__['name']).handle(record)

        @staticmethod
        def filter(record):
            # pprint.pprint(record)
            Aggregator._aggregate(record)
            return False

        @staticmethod
        def exit():
            # remove the filter first, so the aggregated summary records are not swallowed again
            logging.getLogger().removeFilter(Aggregator)
            Aggregator._output()


    logging.getLogger().addFilter(Aggregator)
    atexit.register(Aggregator.exit)

    for i in range(99999):
        try:
            asdf[i]  # not defined!
        except NameError:
            logging.exception('foo')  # generates large number of logging events
        else:
            pass

    # ... more code with more logging ...
    for i in range(88888):
        logging.error('more of the same')
    # ... and so on ...

Please note that you do not get any log output until the program exits.
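If you need output earlier than that, a small hypothetical helper (not part of the class above, and relying on its internals) could flush the aggregation on demand, for example:

    def flush_aggregated_logs():
        """Hypothetical helper: emit the summaries collected so far and reset the cache."""
        root = logging.getLogger()
        root.removeFilter(Aggregator)   # stop intercepting so the summaries reach the handlers
        try:
            Aggregator._output()        # emit one summary record per aggregated entry
            Aggregator.logs.clear()     # start counting from scratch
        finally:
            root.addFilter(Aggregator)  # resume aggregating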

The output of running the script above:

    ERROR:root:foo (occurred 99999 times)
    Traceback (most recent call last):
      File "C:\work\VEMS\python\logcount.py", line 38, in <module>
        asdf[i]  # not defined!
    NameError: name 'asdf' is not defined
    ERROR:root:more of the same (occurred 88888 times)

Your question hides a subconscious assumption about how "very similar" is defined. Log records can either be const-only (whose instances are strictly identical), or a mix of constants and variables (having no constants at all is also generally considered a mix).

Aggregation for const-only log records is a piece of cake. You just need to decide whether the aggregation should be split per process/thread or not. For log records that include both constants and variables, you need to decide whether to aggregate based on the variables you have in your record.
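A minimal sketch of that decision, assuming %-style logging calls as in the question (the helper name is made up):

    import logging

    def aggregation_key(record: logging.LogRecord, include_variables: bool = False) -> str:
        """Build the key that decides which records count as 'the same'."""
        if include_variables:
            # record.getMessage() substitutes the arguments, so
            # 'more of the same 1' and 'more of the same 2' become distinct keys.
            return "%s:%s:%s" % (record.levelname, record.name, record.getMessage())
        # record.msg is the raw format string, so every 'more of the same %d'
        # call collapses into a single key regardless of its arguments.
        return "%s:%s:%s" % (record.levelname, record.name, record.msg)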

A dictionary-style counter (from collections import Counter) can serve as a cache that counts your instances in O(1), but you may need a higher-level structure if you also want to record the variables. In addition, you will have to handle writing the cache to a file yourself: either every X seconds (binning) or when the program exits (risky, since you may lose all in-memory data if something gets stuck).
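A minimal sketch of that idea for the const-only case, flushing at exit via atexit (the class name is made up, and it aggregates on the raw message only):

    import atexit
    import logging
    from collections import Counter

    logging.basicConfig(level=logging.DEBUG)

    class CountingFilter(logging.Filter):
        """Counts records in O(1) and suppresses the individual entries."""
        def __init__(self):
            super().__init__()
            self.counts = Counter()

        def filter(self, record):
            self.counts[(record.levelname, record.name, record.msg)] += 1
            return False  # drop the individual record

    counting_filter = CountingFilter()
    logging.getLogger().addFilter(counting_filter)

    @atexit.register
    def _flush_counts():
        logging.getLogger().removeFilter(counting_filter)  # let the summaries through
        for (levelname, name, msg), count in counting_filter.counts.items():
            logging.getLogger(name).log(getattr(logging, levelname, logging.INFO),
                                        "%s (occurred %d times)", msg, count)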

A framework for aggregation would look something like this (tested on Python v3.4):

    import logging
    from logging import Handler
    from threading import RLock, Timer
    from collections import defaultdict


    class LogAggregatorHandler(Handler):

        _default_flush_timer = 300  # Number of seconds between flushes
        _default_separator = "\t"  # Separator char between metadata strings
        _default_metadata = ["filename", "name", "funcName", "lineno", "levelname"]  # metadata defining unique log records

        class LogAggregatorCache(object):
            """ Keeps whatever is interesting in log records aggregation. """
            def __init__(self, record=None):
                self.message = None
                self.counter = 0
                self.timestamp = list()
                self.args = list()
                if record is not None:
                    self.cache(record)

            def cache(self, record):
                if self.message is None:  # Only the first message is kept
                    self.message = record.msg
                assert self.message == record.msg, "Non-matching log record"  # note: will not work with string formatting for log records; e.g. "blah {}".format(i)
                self.timestamp.append(record.created)
                self.args.append(record.args)
                self.counter += 1

            def __str__(self):
                """ The string of this object is used as the default output of log records aggregation. For example: record message with occurrences. """
                return self.message + "\t (occurred {} times)".format(self.counter)

        def __init__(self, flush_timer=None, separator=None, add_process_thread=False):
            """
            Log record metadata will be concatenated to a unique string, separated by self._separator.
            Process and thread IDs will be added to the metadata if add_process_thread is set to True;
            otherwise log records across processes/threads will be aggregated together.
            :param separator: str
            :param add_process_thread: bool
            """
            super().__init__()
            self._flush_timer = flush_timer or self._default_flush_timer
            self._cache = self.cache_factory()
            self._separator = separator or self._default_separator
            self._metadata = self._default_metadata
            if add_process_thread is True:
                self._metadata += ["process", "thread"]
            self._aggregation_lock = RLock()
            self._store_aggregation_timer = self.flush_timer_factory()
            self._store_aggregation_timer.start()
            # Demo logger which outputs aggregations through a StreamHandler:
            self.agg_log = logging.getLogger("aggregation_logger")
            self.agg_log.addHandler(logging.StreamHandler())
            self.agg_log.setLevel(logging.DEBUG)
            self.agg_log.propagate = False

        def cache_factory(self):
            """ Returns an instance of a new caching object. """
            return defaultdict(self.LogAggregatorCache)

        def flush_timer_factory(self):
            """ Returns a threading.Timer daemon object which flushes the Handler aggregations. """
            timer = Timer(self._flush_timer, self.flush)
            timer.daemon = True
            return timer

        def find_unique(self, record):
            """ Extracts a unique metadata string from log records. """
            metadata = ""
            for single_metadata in self._metadata:
                value = getattr(record, single_metadata, "missing " + str(single_metadata))
                metadata += str(value) + self._separator
            return metadata[:-len(self._separator)]

        def emit(self, record):
            try:
                with self._aggregation_lock:
                    metadata = self.find_unique(record)
                    self._cache[metadata].cache(record)
            except Exception:
                self.handleError(record)

        def flush(self):
            self.store_aggregation()

        def store_aggregation(self):
            """ Write the aggregation data to file. """
            self._store_aggregation_timer.cancel()
            del self._store_aggregation_timer
            with self._aggregation_lock:
                temp_aggregation = self._cache
                self._cache = self.cache_factory()

            # ---> handle temp_aggregation and write to file <--- #
            for key, value in sorted(temp_aggregation.items()):
                self.agg_log.info("{}\t{}".format(key, value))

            # ---> re-create the store_aggregation Timer object <--- #
            self._store_aggregation_timer = self.flush_timer_factory()
            self._store_aggregation_timer.start()

Testing this handler class with random log severities in a for loop:

    if __name__ == "__main__":
        import random
        import logging

        logger = logging.getLogger()
        handler = LogAggregatorHandler()
        logger.addHandler(handler)
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.DEBUG)

        logger.info("entering logging loop")
        for i in range(25):
            # Randomly choose log severity:
            severity = random.choice([logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL])
            logger.log(severity, "test message number %s", i)
        logger.info("end of test code")

If you want to add more stuff, here is what a Python log record looks like:

    {'args': ['()'],
     'created': ['1413747902.18'],
     'exc_info': ['None'],
     'exc_text': ['None'],
     'filename': ['push_socket_log.py'],
     'funcName': ['<module>'],
     'levelname': ['DEBUG'],
     'levelno': ['10'],
     'lineno': ['17'],
     'module': ['push_socket_log'],
     'msecs': ['181.387901306'],
     'msg': ['Test message.'],
     'name': ['__main__'],
     'pathname': ['./push_socket_log.py'],
     'process': ['65486'],
     'processName': ['MainProcess'],
     'relativeCreated': ['12.6709938049'],
     'thread': ['140735262810896'],
     'threadName': ['MainThread']}
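If you want to inspect these attributes yourself (for example, to pick the metadata that defines "unique" records), a tiny throwaway filter can dump them; this is just a debugging sketch:

    import logging
    import pprint

    class RecordDumper(logging.Filter):
        """Print every attribute a LogRecord carries, then let it pass through."""
        def filter(self, record):
            pprint.pprint(record.__dict__)
            return True

    logging.getLogger().addFilter(RecordDumper())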

Another thing to think about: most features you run depend on a flow of several consecutive commands (which would ideally produce log records accordingly). For example, client-server communication typically depends on receiving a request, processing it, reading some data from the database (which requires a connection and some read commands), some sort of parsing/processing, building the response packet and reporting the response code.

This highlights one of the main drawbacks of the aggregation approach: by aggregating log records you lose track of the time and order of the actions that took place. It will be very hard to figure out which request was structured incorrectly if all you have is the aggregation. My advice in this case is to keep both the raw data and the aggregation (using two file handlers or something similar), so that you can investigate both the macro level (aggregation) and the micro level (normal logging).
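A sketch of that dual setup, reusing the LogAggregatorHandler from above (the logger name and file name are only illustrative):

    import logging

    logger = logging.getLogger("my_service")
    logger.setLevel(logging.DEBUG)

    logger.addHandler(LogAggregatorHandler())     # macro level: aggregated summaries

    raw_handler = logging.FileHandler("raw.log")  # micro level: every record, in order
    raw_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger.addHandler(raw_handler)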

However, you are still left responsible for noticing that something went wrong and then manually investigating what caused it. When developing on your own PC this is a fairly simple task, but deploying your code on multiple production servers makes it cumbersome and wastes a lot of time. Accordingly, there are several companies that develop products specifically for log management. Most of them aggregate similar log records together, but others incorporate machine learning algorithms for automatic aggregation and for learning your software's behavior. Outsourcing your log handling can let you focus on your product instead of on your bugs.

Disclaimer: I work at Coralogix, one such solution.


You can subclass the logger class and override the exception method so that your error types are cached until they reach a specific count before being emitted to the log.

    import logging
    from collections import defaultdict

    MAX_COUNT = 99999


    class MyLogger(logging.getLoggerClass()):
        def __init__(self, name):
            super(MyLogger, self).__init__(name)
            self.cache = defaultdict(int)

        def exception(self, msg, *args, **kwargs):
            err = msg.__class__.__name__
            self.cache[err] += 1
            if self.cache[err] > MAX_COUNT:
                new_msg = "{err} occurred {count} times.\n{msg}"
                new_msg = new_msg.format(err=err, count=MAX_COUNT, msg=msg)
                self.log(logging.ERROR, new_msg, *args, **kwargs)
                self.cache[err] = 0  # reset the counter instead of setting it to None


    log = MyLogger('main')

    try:
        raise TypeError("Useful error message")
    except TypeError as err:
        log.exception(err)

Please note that this is not copy-paste-ready code.
You need to add your own handlers (I also recommend a formatter).
https://docs.python.org/2/howto/logging.html#handlers
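One possible way to wire that up, shown only as a sketch (logging.setLoggerClass is optional and must be called before the logger is created through getLogger):

    import logging

    logging.setLoggerClass(MyLogger)  # make getLogger() return MyLogger instances
    log = logging.getLogger('main')

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)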

Good luck.


Create a counter and log only when count == 1, then keep incrementing it and log the total in a finally block (so it gets logged no matter how badly the application crashes and burns). This could of course be a problem if you have the same exception for different reasons, but you could always check the line number to verify it is the same issue, or something similar. Minimal example:

    import logging

    name_error_exception_count = 0
    try:
        for i in range(99999):
            try:
                asdf[i]  # not defined!
            except NameError:
                name_error_exception_count += 1
                if name_error_exception_count == 1:
                    logging.exception('foo')
                else:
                    pass
    except Exception:
        pass  # this is just to get the finally block; handle exceptions here too, maybe
    finally:
        if name_error_exception_count > 0:
            logging.exception('NameError exception occurred {} times.'.format(name_error_exception_count))
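If the same exception type can be raised from different places, a hypothetical variation could key the counters on the exception type and the offending line (using traceback.extract_tb; Python 3.5+ for the attribute access):

    import logging
    import sys
    import traceback
    from collections import Counter

    exception_counts = Counter()

    def count_exception():
        """Call this inside an except block: count by exception type and raising line."""
        exc_type, _, exc_tb = sys.exc_info()
        frame = traceback.extract_tb(exc_tb)[-1]
        key = (exc_type.__name__, frame.filename, frame.lineno)
        exception_counts[key] += 1
        if exception_counts[key] == 1:  # log the full traceback only the first time
            logging.exception('first occurrence of %s at %s:%s', *key)

    def log_exception_totals():
        """Call this in the finally block to log the totals."""
        for (name, filename, lineno), count in exception_counts.items():
            logging.error('%s at %s:%s occurred %d times', name, filename, lineno, count)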
