Value Based Flow Lock

Forgive me if this was asked before. I looked around a lot, but I get the feeling that I don’t have the right vocabulary to find this by doing an online search.

I have a multithreaded application in python. I want to be able to block a specific block of code, but only for other threads with a specific condition. Let me give you an example: There are three streams: thread_a, thread_band thread_c. Each thread can go through a function fooat any time. I do not want any two streams with barequal to each other in order to be able to access at the Code block ALPHAsame time. However, I do not want to block threads whose value baris different. In this case, say, thread_ahas bar == "cat"and displays a string (3). Before thread_aclick a line (5), say thread_b, with bar == "cat"click a line (3). I would like to thread_bwait. But if it thread_cgoes withbar == "dog"I wish he could keep going.

(1) def foo(bar):
(2)    
(3)     lock(bar)
(4)     # Code block ALPHA (two threads with equivalent bar should not be in here)
(5)     unlock(bar)

As another note, the possible values ​​for are barcompletely unpredictable, but with a very high probability of a collision.

Thanks for any help. The library I'm looking at is a python thread library

+4
source share
2 answers

Update

The good news: I was able to reproduce the problem release_lockyou encountered using my original answer through the somewhat rude test bench I put together and fix the problem using the counting mechanism (as you said) - at least I can tell with of my test apparatus.

, "" , , , , .

, , .

import threading

namespace_lock = threading.Lock()
namespace = {}
counters = {}

def aquire_lock(value):
    with namespace_lock:
        if value in namespace:
            counters[value] += 1
        else:
            namespace[value] = threading.Lock()
            counters[value] = 1

    namespace[value].acquire()

def release_lock(value):
    with namespace_lock:
        if counters[value] == 1:
            del counters[value]
            lock = namespace.pop(value)
        else:
            counters[value] -= 1
            lock = namespace[value]

    lock.release()

# sample usage    
def foo(bar):
    aquire_lock(bar)
    # Code block ALPHA (two threads with equivalent bar should not be in here)
    release_lock(bar)
+3

, , bar. , , , :

import collections
import contextlib
import threading

lock = threading.Lock()

wait_tracker = collections.defaultdict(lambda: (False, 0, threading.Condition(lock)))

@contextlib.contextmanager
def critical(bar):
    with lock:
        busy, waiters, condition = wait_tracker[bar]
        if busy:
            # Someone with the same bar value is in the critical section.

            # Record that we're waiting.
            waiters += 1
            wait_tracker[bar] = busy, waiters, condition

            # Wait for our turn.
            while wait_tracker[bar][0]:
                condition.wait()

            # Record that we're not waiting any more.
            busy, waiters, condition = wait_tracker[bar]
            waiters -= 1

        # Record that we're entering the critical section.
        busy = True
        wait_tracker[bar] = busy, waiters, condition
    try:
        # Critical section runs here.
        yield
    finally:
        with lock:
            # Record that we're out of the critical section.
            busy, waiters, condition = wait_tracker[bar]
            busy = False
            if waiters:
                # Someone was waiting for us. Tell them it their turn now.
                wait_tracker[bar] = busy, waiters, condition
                condition.notify()
            else:
                # No one was waiting for us. Clean up a bit so the wait_tracker
                # doesn't grow forever.
                del wait_tracker[bar]

, , :

with critical(bar):
    # Critical section.

, parallelism , locks-and-shared-memory parallelism. , .

+2

All Articles