I came across a schema and an upsert stored procedure that caused locking issues. I have a general idea of why this causes a deadlock and how to fix it, and I can reproduce it, but I do not have a clear understanding of the sequence of steps that causes it. It would be great if someone could explain why it deadlocks.
Here are the schema and stored procedures. This code runs on PostgreSQL 9.2.2.
CREATE TABLE counters (
    count_type INTEGER NOT NULL,
    count_id   INTEGER NOT NULL,
    count      INTEGER NOT NULL
);

CREATE TABLE primary_relation (
    id        INTEGER PRIMARY KEY,
    a_counter INTEGER NOT NULL DEFAULT 0
);

INSERT INTO primary_relation SELECT i FROM generate_series(1,5) AS i;

CREATE OR REPLACE FUNCTION increment_count(ctype integer, cid integer, i integer)
RETURNS VOID AS $$
BEGIN
    LOOP
        UPDATE counters SET count = count + i
            WHERE count_type = ctype AND count_id = cid;
        IF FOUND THEN
            RETURN;
        END IF;
        BEGIN
            INSERT INTO counters (count_type, count_id, count)
                VALUES (ctype, cid, i);
            RETURN;
        EXCEPTION WHEN OTHERS THEN
            -- swallow the error and loop back to retry the UPDATE
        END;
    END LOOP;
END;
$$ LANGUAGE PLPGSQL;

CREATE OR REPLACE FUNCTION update_primary_a_count(ctype integer)
RETURNS VOID AS $$
    WITH deleted_counts_cte AS (
        DELETE FROM counters
            WHERE count_type = ctype
            RETURNING *
    ), rollup_cte AS (
        SELECT count_id, SUM(count) AS count
            FROM deleted_counts_cte
            GROUP BY count_id
            HAVING SUM(count) <> 0
    )
    UPDATE primary_relation
        SET a_counter = a_counter + rollup_cte.count
        FROM rollup_cte
        WHERE primary_relation.id = rollup_cte.count_id
$$ LANGUAGE SQL;
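Note that counters has no unique constraint, so as far as I can tell the EXCEPTION handler in increment_count never actually fires. Two concurrent callers can interleave roughly like this (my own sketch, not an actual trace):

-- Session A                                  -- Session B
SELECT increment_count(0, 3, 1);              SELECT increment_count(0, 3, 1);
-- UPDATE matches no row, FOUND is false
                                              -- UPDATE matches no row, FOUND is false
-- INSERT (0, 3, 1) succeeds (no constraint)
                                              -- INSERT (0, 3, 1) also succeeds
COMMIT;                                       COMMIT;
-- Result: two physical rows for the one logical counter (0, 3)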
And here is the Python script that reproduces the deadlock.
import os
import random
import time
import psycopg2

COUNTERS = 5
THREADS = 10
ITERATIONS = 500

def increment():
    outf = open('synctest.out.%d' % os.getpid(), 'w')
    conn = psycopg2.connect(database="test")
    cur = conn.cursor()
    for i in range(0, ITERATIONS):
        time.sleep(random.random())
        start = time.time()
        cur.execute("SELECT increment_count(0, %s, 1)", [random.randint(1, COUNTERS)])
        conn.commit()
        outf.write("%f\n" % (time.time() - start))
    conn.close()
    outf.close()

def update(n):
    outf = open('synctest.update', 'w')
    conn = psycopg2.connect(database="test")
    cur = conn.cursor()
    for i in range(0, n):
        time.sleep(random.random())
        start = time.time()
        cur.execute("SELECT update_primary_a_count(0)")
        conn.commit()
        outf.write("%f\n" % (time.time() - start))
    conn.close()

pids = []
for i in range(THREADS):
    pid = os.fork()
    if pid != 0:
        print 'Process %d spawned' % pid
        pids.append(pid)
    else:
        print 'Starting child %d' % os.getpid()
        increment()
        print 'Exiting child %d' % os.getpid()
        os._exit(0)

update(ITERATIONS)

for pid in pids:
    print "waiting on %d" % pid
    os.waitpid(pid, 0)
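For what it's worth, after a run the table ends up with duplicate rows for the same (count_type, count_id); a query along these lines confirms it (my own check, not part of the repro):

-- Count physical rows per logical counter; anything above 1 is a duplicate.
SELECT count_type, count_id, COUNT(*) AS physical_rows
FROM counters
GROUP BY count_type, count_id
HAVING COUNT(*) > 1;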
I understand that one problem is that the upsert producing these duplicate rows (when there are multiple writers) will likely lead to some kind of double counting. But why does it lead to a deadlock?
The error received from PostgreSQL looks something like this:
process 91924 detected deadlock while waiting for ShareLock on transaction 4683083 after 100.559 ms
SQL statement "UPDATE counters
And the client spews something like this:
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 91924 waits for ShareLock on transaction 4683083; blocked by process 91933.
Process 91933 waits for ShareLock on transaction 4683079; blocked by process 91924.
HINT:  See server log for query details.
CONTEXT:  SQL statement "UPDATE counters SET count = count + i WHERE count_type = ctype AND count_id = cid"
PL/pgSQL function increment_count(integer,integer,integer) line 4 at SQL statement
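My working theory, which I would love to have confirmed or corrected: once duplicate rows exist, a single statement has to lock several physical rows for the same logical counter, and two concurrent statements (two of these UPDATEs, or an UPDATE and the DELETE inside update_primary_a_count) can acquire those row locks in different orders. Roughly:

-- Suppose logical counter (0, 3) is now stored as two physical rows, r1 and r2.
--
-- Transaction T1                            Transaction T2
-- locks r1                                  locks r2
-- tries to lock r2, waits on T2             tries to lock r1, waits on T1
--
-- Neither can proceed; PostgreSQL detects the cycle and aborts one of them.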
To fix the problem, you add a primary key as follows:
ALTER TABLE counters ADD PRIMARY KEY (count_type, count_id);
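With that constraint in place, a losing INSERT raises unique_violation, the WHEN OTHERS handler swallows it, and the loop retries the UPDATE, which now finds the row, so duplicates (and multi-row lock sets) can no longer accumulate. As an aside, on PostgreSQL 9.5 and later I believe the whole retry loop could be replaced with a native upsert; an untested sketch:

-- Requires the unique constraint above; ON CONFLICT exists from PostgreSQL 9.5 onward.
INSERT INTO counters (count_type, count_id, count)
VALUES (ctype, cid, i)
ON CONFLICT (count_type, count_id)
DO UPDATE SET count = counters.count + EXCLUDED.count;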
Any insight would be very helpful. Thanks!