Your question hides an implicit assumption about how "very similar" is defined. Log records are either const-only (every instance of them is literally identical) or a mix of constants and variables (records with no constants at all are generally considered a mix as well).
An aggregator for const-only log records is a piece of cake: you only have to decide whether each process/thread keeps its own aggregation or whether records are aggregated together across them. For log records that mix constants and variables, you need to decide whether to aggregate based on the variables that appear in the record.
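To make the distinction concrete, here is a minimal illustration (the message texts, user_id and ip_addr are made-up values, not taken from your code):

    import logging
    logger = logging.getLogger(__name__)

    # const-only record: every emission is literally identical
    logger.info("configuration reloaded")

    # constants + variables: the message template is constant, the args vary per call
    user_id, ip_addr = 42, "10.0.0.1"  # hypothetical values for illustration
    logger.info("user %s logged in from %s", user_id, ip_addr)

For the first kind, counting occurrences is enough; for the second, you have to decide whether to aggregate on the template alone or keep the argument values as well.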
A dictionary-style counter (from collections import Counter) can serve as a cache that counts your instances in O(1), but you may need a higher-level structure if you also want to record the variables. In addition, you will have to handle flushing the cache to file yourself - either every X seconds (binning) or when the program exits (risky - you lose everything still in memory if something gets stuck).
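Here is a minimal sketch of that Counter-plus-timer idea (the 60-second interval, the "aggregated.log" file name and the record() helper are arbitrary choices for illustration, and this toy version is not thread-safe, unlike the handler below):

    from collections import Counter
    from threading import Timer

    counts = Counter()

    def record(message):
        counts[message] += 1  # O(1) count per identical message

    def flush():
        # write the current bin to file, then start a new bin
        with open("aggregated.log", "a") as log_file:  # hypothetical file name
            for message, n in counts.items():
                log_file.write("{} (occurred {} times)\n".format(message, n))
        counts.clear()
        timer = Timer(60, flush)  # re-arm: flush again in 60 seconds
        timer.daemon = True       # daemon timer: whatever is still in memory at exit is lost
        timer.start()

    flush()  # start the flush cycle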
The structure for aggregation will look something like this (tested on Python v3.4):
    import logging
    from logging import Handler
    from threading import RLock, Timer
    from collections import defaultdict


    class LogAggregatorHandler(Handler):

        _default_flush_timer = 300  # Number of seconds between flushes
        _default_separator = "\t"  # Separator char between metadata strings
        _default_metadata = ["filename", "name", "funcName", "lineno", "levelname"]  # metadata defining unique log records

        class LogAggregatorCache(object):
            """ Keeps whatever is interesting in log records aggregation. """
            def __init__(self, record=None):
                self.message = None
                self.counter = 0
                self.timestamp = list()
                self.args = list()
                if record is not None:
                    self.cache(record)

            def cache(self, record):
                if self.message is None:  # Only the first message is kept
                    self.message = record.msg
                assert self.message == record.msg, "Non-matching log record"  # note: will not work with string formatting for log records, e.g. "blah {}".format(i)
                self.timestamp.append(record.created)
                self.args.append(record.args)
                self.counter += 1

            def __str__(self):
                """ The string of this object is used as the default output of log records aggregation. For example: record message with occurrences. """
                return self.message + "\t (occurred {} times)".format(self.counter)

        def __init__(self, flush_timer=None, separator=None, add_process_thread=False):
            """
            Log record metadata will be concatenated to a unique string, separated by self._separator.
            Process and thread IDs will be added to the metadata if set to True; otherwise log records
            across processes/threads will be aggregated together.
            :param separator: str
            :param add_process_thread: bool
            """
            super().__init__()
            self._flush_timer = flush_timer or self._default_flush_timer
            self._cache = self.cache_factory()
            self._separator = separator or self._default_separator
            self._metadata = list(self._default_metadata)  # copy, so the class default is not mutated
            if add_process_thread is True:
                self._metadata += ["process", "thread"]
            self._aggregation_lock = RLock()
            self._store_aggregation_timer = self.flush_timer_factory()
            self._store_aggregation_timer.start()

            # Demo logger which outputs aggregations through a StreamHandler:
            self.agg_log = logging.getLogger("aggregation_logger")
            self.agg_log.addHandler(logging.StreamHandler())
            self.agg_log.setLevel(logging.DEBUG)
            self.agg_log.propagate = False

        def cache_factory(self):
            """ Returns an instance of a new caching object. """
            return defaultdict(self.LogAggregatorCache)

        def flush_timer_factory(self):
            """ Returns a threading.Timer daemon object which flushes the Handler aggregations. """
            timer = Timer(self._flush_timer, self.flush)
            timer.daemon = True
            return timer

        def find_unique(self, record):
            """ Extracts a unique metadata string from log records. """
            metadata = ""
            for single_metadata in self._metadata:
                value = getattr(record, single_metadata, "missing " + str(single_metadata))
                metadata += str(value) + self._separator
            return metadata[:-len(self._separator)]

        def emit(self, record):
            try:
                with self._aggregation_lock:
                    metadata = self.find_unique(record)
                    self._cache[metadata].cache(record)
            except Exception:
                self.handleError(record)

        def flush(self):
            self.store_aggregation()

        def store_aggregation(self):
            """ Write the aggregation data to file. """
            self._store_aggregation_timer.cancel()
            del self._store_aggregation_timer
            with self._aggregation_lock:
                temp_aggregation = self._cache
                self._cache = self.cache_factory()

            # ---> handle temp_aggregation and write to file <--- #
            for key, value in sorted(temp_aggregation.items()):
                self.agg_log.info("{}\t{}".format(key, value))

            # ---> re-create the store_aggregation Timer object <--- #
            self._store_aggregation_timer = self.flush_timer_factory()
            self._store_aggregation_timer.start()
Testing this handler class with randomly chosen log severities in a for loop:
    if __name__ == "__main__":
        import random
        import logging

        logger = logging.getLogger()
        handler = LogAggregatorHandler()
        logger.addHandler(handler)
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.DEBUG)

        logger.info("entering logging loop")

        for i in range(25):
            # Randomly choose log severity:
            severity = random.choice([logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL])
            logger.log(severity, "test message number %s", i)

        logger.info("end of test code")
If you want to add more stuff, here is what a Python log record looks like:
    {'args': ['()'],
     'created': ['1413747902.18'],
     'exc_info': ['None'],
     'exc_text': ['None'],
     'filename': ['push_socket_log.py'],
     'funcName': ['<module>'],
     'levelname': ['DEBUG'],
     'levelno': ['10'],
     'lineno': ['17'],
     'module': ['push_socket_log'],
     'msecs': ['181.387901306'],
     'msg': ['Test message.'],
     'name': ['__main__'],
     'pathname': ['./push_socket_log.py'],
     'process': ['65486'],
     'processName': ['MainProcess'],
     'relativeCreated': ['12.6709938049'],
     'thread': ['140735262810896'],
     'threadName': ['MainThread']}
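Any of these attributes can be added to the aggregation key. For example, a hypothetical subclass that also distinguishes records by module and pathname:

    class ModuleAwareAggregatorHandler(LogAggregatorHandler):
        # hypothetical subclass: aggregation keys also include module and pathname
        _default_metadata = LogAggregatorHandler._default_metadata + ["module", "pathname"]

Everything else (caching, flushing) is inherited unchanged.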
Another thing to consider: most of the features you run depend on a flow of several consecutive commands (which would ideally be reflected in the logs); for example, a client-server communication will typically depend on receiving the request, processing it, reading some data from the database (which requires a connection and some read commands), some parsing/processing, constructing the response packet and reporting the response code.
This highlights one of the main drawbacks of the aggregation approach: by aggregating log records you lose track of the time and order of the actions that took place. Figuring out which request was badly structured will be very hard if all you have is the aggregation. My advice in this case is to keep both the raw data and the aggregation (using two file handlers or something similar), so you can investigate the macro level (aggregation) and the micro level (the regular log).
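A sketch of that dual setup, using the LogAggregatorHandler from above (the "service_raw.log" file name, the formatter string and the 60-second flush interval are arbitrary choices for illustration):

    import logging

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # Micro level: every record, in order, with timestamps
    raw_handler = logging.FileHandler("service_raw.log")  # hypothetical file name
    raw_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger.addHandler(raw_handler)

    # Macro level: aggregated counts, flushed every 60 seconds
    logger.addHandler(LogAggregatorHandler(flush_timer=60))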
However, you are still left with the responsibility of finding out that things went wrong, and then manually investigating what caused it. When developing on your PC this is an easy enough task; but deploying your code on several production servers makes it cumbersome and wastes a lot of your time. Accordingly, there are several companies developing products specifically for log management. Most of them aggregate similar log records together, and some also include machine-learning algorithms for automatic aggregation and for learning your software's behavior. Outsourcing your log handling can then let you focus on your product instead of on your bugs.
Disclaimer: I work for Coralogix, one such solution.