Can SQLAlchemy events be used to update a denormalized data cache?

For performance reasons, I have a denormalized database where some tables contain data that has been merged from many rows into other tables. I would like to keep this denormalized data cache using SQLAlchemy events . As an example, suppose I wrote forum software and wanted each Thread to have a column that tracks the combined word count of all comments in the stream in order to effectively display this information:

 class Thread(Base): id = Column(UUID, primary_key=True, default=uuid.uuid4) title = Column(UnicodeText(), nullable=False) word_count = Column(Integer, nullable=False, default=0) class Comment(Base): id = Column(UUID, primary_key=True, default=uuid.uuid4) thread_id = Column(UUID, ForeignKey('thread.id', ondelete='CASCADE'), nullable=False) thread = relationship('Thread', backref='comments') message = Column(UnicodeText(), nullable=False) @property def word_count(self): return len(self.message.split()) 

Therefore, every time a comment is added (for simplicity, it can be said that comments are never edited or deleted), we want to update the word_count attribute on the associated Thread object. So I would like to do something like

 def after_insert(mapper, connection, target): thread = target.thread thread.word_count = sum(c.word_count for c in thread.comments) print "updated cached word count to", thread.word_count event.listen(Comment, "after_insert", after_insert) 

Therefore, when I insert Comment , I can see the triggering of the event and see that it correctly calculated the number of words, but this change is not saved in the Thread line in the database. I don't see any caveats about updating other tables in the after_insert documentation , although I do see some caveats in some others, like after_delete .

So, is this method supported with SQLAlchemy events? I already use SQLAlchemy events for a lot of other things, so I would like to do it this way and not write database triggers.

+7
source share
2 answers

the after_insert () event is one way to do this, and you may notice that an SQLAlchemy Connection object is passed to it instead of Session , as is the case with other flush related events. Continuous events at the display level are intended to be used, as a rule, to invoke SQL directly in a given Connection :

 @event.listens_for(Comment, "after_insert") def after_insert(mapper, connection, target): thread_table = Thread.__table__ thread = target.thread connection.execute( thread_table.update(). where(thread_table.c.id==thread.id). values(word_count=sum(c.word_count for c in thread.comments)) ) print "updated cached word count to", thread.word_count 

what is noteworthy here is that calling the UPDATE statement directly is also much more efficient than repeating this attribute throughout the workflow unit.

However, an event like after_insert () is not really needed here, since we know the value of word_count before the reset occurs. We actually know this, because Comment and Thread objects are related to each other, and we could keep Thread.word_count completely fresh in memory at all times using attribute events:

 def _word_count(msg): return len(msg.split()) @event.listens_for(Comment.message, "set") def set(target, value, oldvalue, initiator): if target.thread is not None: target.thread.word_count += (_word_count(value) - _word_count(oldvalue)) @event.listens_for(Comment.thread, "set") def set(target, value, oldvalue, initiator): # the new Thread, if any if value is not None: value.word_count += _word_count(target.message) # the old Thread, if any if oldvalue is not None: oldvalue.word_count -= _word_count(target.message) 

the great advantage of this method is that it is also not necessary to iterate through thread.comments, which for an unloaded collection means that another SELECT is returned.

Another way is to do this in before_flush (). The following is a quick and dirty version that can be refined for a more thorough analysis of what has changed to determine whether word_count needs to be updated or not:

 @event.listens_for(Session, "before_flush") def before_flush(session, flush_context, instances): for obj in session.new | session.dirty: if isinstance(obj, Thread): obj.word_count = sum(c.word_count for c in obj.comments) elif isinstance(obj, Comment): obj.thread.word_count = sum(c.word_count for c in obj.comments) 

I would go with the attribute event method, as it is most efficient and updated.

+33
source

You can do this using SQLAlchemy-Utils aggregated columns: http://sqlalchemy-utils.readthedocs.org/en/latest/aggregates.html

+3
source

All Articles