PHP pthreads - shared objects

Im looking for a safe and fast way to use a shared object.

I asked a question already here: https://github.com/krakjoe/pthreads/issues/470 but obviously this was not a suitable place.

An attempt to share an object (Threaded) with many other contexts (Thread). All threads update this shard object โ€” they can set their own requests and also respond to requests from others.

Now that krakjoe replied that the lock / unlock would not be available at 7, I had a problem.

I know about: .synchronized, but have no idea how to use it to make it work for my needs.

How can I use :: synchronized to write methods like

  • lock ()
  • Unlock ()
  • is_locked () - check if it is locked, and if this is not recommended - just continue and try again later

EDIT:

I wrote (imo) a very simple test script.

this script includes no syc / lock / ... atm methods.

he should just show what I'm trying to do.

im still looking for a way to use :: for sharing security.

code:

<?php /* TEST: create n threads each will - Shared::set() its own ref - check if Shared::exists() its own ref - Shared::get() its ref back - call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context TODO: using ::synchronized to handle multi-context-access NOTES: every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour" see: "Method Modifiers - Special Behaviour" at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html */ class Shared extends Threaded { public $data; public function exists($ident) { return isset($this->data[$ident]); } public function set($ident, $ref) { $return = false; if(!isset($this->data[$ident])){ $data = $this->data; $data[$ident] = $ref; $this->data = $data; $return = $this->data[$ident]; } #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL; return $return; } public function get($ident) { $return = false; if($this->exists($ident) === true){ $data = $this->data; $return = $data[$ident]; unset($data[$ident]); $this->data = $data; } #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL; return $return; } } class T extends Thread { public $count; public function __construct(Shared $Shared, $ident) { $this->Shared = $Shared; $this->ident = $ident; } public function run() { $slowdown = true; $this->count = 0; while(true){ if($slowdown){ // "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8 // loop a bit to simulate work: $start = microtime(true); $until = rand(1, 100000)/1000000; while(microtime(true)-$start < $until){ // ... } } if($this->Shared->exists($this->ident) === true){ $ref = $this->Shared->get($this->ident); } else{ $ref = $this->Shared->set($this->ident, $this); } // calling a method on $ref -- if not a ref we crash $ref->isRunning(); unset($ref); $this->count++; } } } echo 'start ...' . PHP_EOL; $n = 8; $Shared = new Shared(); for($i = 0, $refs = array(); $i < $n; $i++){ $refs[$i] = new T($Shared, $i); $refs[$i]->start(); } while(!empty($refs)){ // print status: if(!isset($t)or microtime(true)-$t > 1){ $t = microtime(true); echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL; } // join crashed threads: foreach($refs as $i => $thread){ if($thread->isRunning() === false){ echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL; if($thread->isJoined() === false){ $thread->join(); } unset($refs[$i]); } } } echo 'no thread running anymore.' . PHP_EOL; /* output start ... status: 8 running atm ... Notice: Undefined offset: 6 in ...\shared_test.php on line 33 Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82 T-6 stopped after 10 status: 7 running atm ... Notice: Undefined offset: 4 in ...\shared_test.php on line 33 Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82 T-4 stopped after 35 status: 6 running atm ... Notice: Undefined offset: 7 in ...\shared_test.php on line 33 Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82 T-7 stopped after 43 status: 5 running atm ... status: 5 running atm ... status: 5 running atm ... [...] */ ?> 
+5
source share
1 answer

Threaded objects are already thread safe, that is, at any time when you read, write, check for the presence or delete (cancel) an element, the operation is atomic - no other context can perform any of the above operations when the first operation is performed. The same is true for engine handlers that the user does not know, all the way to the lowest level implicitly safe.

Enough reassurance ... This has obvious limitations when the logic becomes more complex, for example, checking for the existence of a member before installing or doing something else with it, as you do. Although operations on an object are atomic, there is nothing to stop another unset context with an element between isset calling and the call to read the property / dimension.

This applies to PHP7 (pthreads v3 +)

Security and integrity are two different things. When integrity is important, you can use Threaded::synchronized in PHP7 to save it correctly. In PHP5, you can also save it, but the code will be more complex, as well as the explanation.

The second example should run endlessly if I understand its logic. So I'm using this assumption to build the right code, I'm going to make additional assumptions about what you might want to do in this endless loop, and provide some idea of โ€‹โ€‹where it seems to be required.

 <?php class Referee extends Threaded { public function find(string $ident, Threaded $reference) { return $this->synchronized(function () use($ident, $reference) { if (isset($this[$ident])) { return $this[$ident]; } else return ($this[$ident] = $reference); }); } public function foreach(Closure $closure) { $this->synchronized(function() use($closure) { foreach ($this as $ident => $reference) { $closure($ident, $reference); } }); } } class Test extends Thread { public function __construct(Referee $referee, string $ident, bool $delay) { $this->referee = $referee; $this->ident = $ident; $this->delay = $delay; } public function run() { while (1) { if ($this->delay) { $this->synchronized(function(){ $this->wait(1000000); }); } $reference = $this->referee->find($this->ident, $this); /* do something with reference here, I guess */ /* do something with all references here */ $this->referee->foreach(function($ident, $reference){ var_dump(Thread::getCurrentThreadId(), $reference->getIdent(), $reference->isRunning()); }); } } public function getIdent() { return $this->ident; } private $referee; private $ident; private $delay; } $referee = new Referee(); $threads = []; $thread = 0; $idents = [ "smelly", "dopey", "bashful", "grumpy", "sneezy", "sleepy", "happy", "naughty" ]; while ($thread < 8) { $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1)); $threads[$thread]->start(); $thread++; } foreach ($threads as $thread) $thread->join(); ?> 

So, we will look at the differences, I will tell you why they are what they are, and how else you could write them, you already know that now we are not talking about security, but about integrity, you are given a (very remarkable) assumption that everything you write is โ€œsafe,โ€ as explained.

The first important difference is the following:

 if ($this->delay) { $this->synchronized(function(){ $this->wait(1000000); }); } 

This is just a suitable way to wait for Thread , you will not need to use Thread for synchronization, you can use any Threaded object. The advantage of doing things right if it is not clear that sleeping and sleeping do not leave threads in a susceptible state using ::wait .

In the real world, where you really only have to wait for something, it will be a more complex block, it can (and should) look more:

 if ($this->delay) { $this->synchronized(function(){ while ($this->condition) { $this->wait(1000000); } }); } 

Note: waiting for a timeout is technically waiting for something, however you can be awakened by something other than the timeout reached, and the code should be ready for this.

Thus, another context may notify Thread that it should stop waiting and shutting down gracefully or perform some other important action immediately, simply by synchronizing, changing the state, and notifying Thread .

For predictable code, it is extremely important that it is convenient how to synchronize, wait and notify work.

Next, we have our logic for setting and getting the link:

 $reference = $this->referee->find($this->ident, $this); 

What causes this:

 public function find(string $ident, Threaded $reference) { return $this->synchronized(function () use($ident, $reference) { if (isset($this[$ident])) { return $this[$ident]; } else return ($this[$ident] = $reference); }); } 

It is poorly named, it is difficult to name things, but you can see that integrity is maintained by synchronization, while these grouped operations take place. The same method can also be used to get a link to another object with a small amount of settings.

I think you are doing something with this specific link (which will always be $this for now). I canโ€™t guess. Moving on ...

I made the assumption that you want to do something with each of these Threads , and you want to maintain data integrity during the entire iteration:

 $this->referee->foreach(function($ident, $reference){ var_dump(Thread::getCurrentThreadId(), $reference->getIdent(), $reference->isRunning()); }); 

What causes:

 public function foreach(Closure $closure) { $this->synchronized(function() use($closure) { foreach ($this as $ident => $reference) { $closure($ident, $reference); } }); } 

So you would do that.

It is worth mentioning that synchronization is not necessarily required here; just as nothing bad happens if you remove an element from the array you iterate, nothing bad happens if you cancel or set or do something else for the object during iteration.

+8
source

All Articles