Perl resume thread

I am trying to start a thread that I can pause and resume at any time. This is how I create the thread:

    use strict;
    use warnings;
    use threads;
    use threads::shared;
    use Thread::Suspend;
    use Thread::Semaphore;

    sub createThread {
        my $semaphore = Thread::Semaphore->new();
        my $thr = threads->create(sub {
            local $SIG{KILL} = sub { die "Thread killed\n"; };
            local $SIG{STOP} = sub { print "sig stop\n"; $semaphore->down(); };
            local $SIG{CONT} = sub { $semaphore->up(); print "sig cont\n"; };

            my $test = sub {
                while (1) {
                    $semaphore->down();
                    print STDERR "Working process\n";
                    sleep(2);
                    $semaphore->up();
                }
            };
            return $test->();
        });
        return $thr->tid();
    }

createThread returns the thread id (via return $thr->tid();). When I then try to pause the thread, the sig stop message is printed, but later, when I try to resume it, sig cont is not printed. Here is the code that pauses and resumes the thread:

    sub pause {
        my $class    = shift;
        my $threadId = shift;
        my $thr = threads->object($threadId);
        if ($thr) {
            if ($thr->is_suspended() == 0) {
                $thr->kill('STOP');
                $thr->suspend();
                return "Process $threadId paused\n";
            } else {
                return "Process $threadId has to be resumed\n";
            }
        } else {
            return "Process $threadId not found\n";
        }
    }

    sub resume {
        my $class    = shift;
        my $threadId = shift;
        my $thr = threads->object($threadId);
        if ($thr) {
            if ($thr->is_suspended() != 0) {
                $thr->resume();
                $thr->kill('CONT');
                return "Operation $threadId resumed\n";
            } else {
                return "Operation $threadId has not been paused\n";
            }
        } else {
            return "Process $threadId not found\n";
        }
    }

When I resume a paused thread, the message "Operation X resumed" is returned, but the sig cont handler is not executed, and the thread function does not resume either.

2 answers

It is not clear whether you need Thread::Semaphore for a separate purpose, but it is not required for Thread::Suspend to function.

I suspect your pause / resume doesn't work because you have overridden the signal handler that Thread::Suspend sets up for its own use.

If I remove all the signal handlers and the Thread::Semaphore object, your code works fine:

    use strict;
    use warnings 'all';
    use threads;
    use Thread::Suspend;

    STDOUT->autoflush;

    my $tid = create_thread();

    for ( 1 .. 10 ) {
        sleep 5;
        print pause($tid);
        sleep 5;
        print resume($tid);
    }

    sub create_thread {
        my $thr = threads->create( sub {
            while () {
                print "Working thread\n";
                sleep 1;
            }
        } );
        return $thr->tid;
    }

    sub pause {
        my ($tid) = @_;
        my $thr = threads->object($tid);
        return "Thread $tid not found\n" unless $thr;
        return "Thread $tid is already suspended\n" if $thr->is_suspended;
        $thr->suspend;
        return "Thread $tid paused\n";
    }

    sub resume {
        my ($tid) = @_;
        my $thr = threads->object($tid);
        return "Thread $tid not found\n" unless $thr;
        return "Thread $tid has not been paused\n" unless $thr->is_suspended;
        $thr->resume;
        return "Thread $tid resumed\n";
    }

Output

    Working thread
    Working thread
    Working thread
    Working thread
    Working thread
    Thread 1 paused
    Thread 1 resumed
    Working thread
    Working thread
    Working thread
    Working thread
    Working thread
    Thread 1 paused
    Thread 1 resumed
    Working thread
    Working thread
    ...



Update

There is no real need for your subroutines. Here is a bare-bones implementation:

    use strict;
    use warnings 'all';
    use threads;
    use Thread::Suspend;

    STDOUT->autoflush;

    sub thread_sub {
        while () {
            printf "Working thread %d\n", threads->self->tid;
            sleep 1;
        }
    }

    my $thr = threads->create(\&thread_sub);

    for ( 1 .. 10 ) {
        sleep 5;
        if ( my $suspended = $thr->suspend ) {
            printf "Thread %d suspended\n", $suspended->tid;
        }
        sleep 5;
        if ( my $resumed = $thr->resume ) {
            printf "Thread %d resumed\n", $resumed->tid;
        }
    }

Output

    Working thread 1
    Working thread 1
    Working thread 1
    Working thread 1
    Working thread 1
    Working thread 1
    Thread 1 suspended
    Thread 1 resumed
    Working thread 1
    Working thread 1
    Working thread 1
    Working thread 1
    Working thread 1
    Working thread 1
    Thread 1 suspended
    Thread 1 resumed
    Working thread 1
    Working thread 1
    Working thread 1

No actual signals are used here. Signals can only be sent to processes, not threads. From the documentation of $thread->kill:

CAVEAT: The thread signalling capability provided by this module does not actually send signals via the OS. It emulates signals at the Perl level such that signal handlers are called in the appropriate thread. For example, sending $thr->kill('STOP') does not actually suspend a thread (or the whole process), but does cause a $SIG{'STOP'} handler to be called in that thread (as illustrated above).
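The behaviour described in that caveat can be observed with a small sketch. It assumes a threads-enabled perl with the default safe signals; the shared variables ($ready, $handled, $ticks, $stop) and the nap helper are hypothetical names introduced only for this illustration. The thread installs a $SIG{'STOP'} handler that merely counts its invocations, and the main thread checks that the worker keeps ticking after $thr->kill('STOP').

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical bookkeeping variables, shared between the two threads.
my $ready   :shared = 0;   # set once the handler is installed
my $handled :shared = 0;   # number of times the STOP handler ran
my $ticks   :shared = 0;   # loop iterations completed by the thread
my $stop    :shared = 0;   # lets the demo end the thread cleanly

sub nap { select(undef, undef, undef, $_[0]) }   # sub-second sleep

my $thr = threads->create(sub {
    # Perl-level handler: it is called by the threads module's emulation,
    # not by an OS-level SIGSTOP.
    $SIG{'STOP'} = sub { $handled++ };
    $ready = 1;
    until ($stop) {
        $ticks++;
        nap(0.05);
    }
});

nap(0.05) until $ready;       # wait for the handler to be installed
$thr->kill('STOP');           # invokes the handler; does not suspend anything
nap(0.3);
my $ticks_after_signal = $ticks;
nap(0.3);
my $ticks_later = $ticks;     # should have advanced if the thread kept running

$stop = 1;
$thr->join;

printf "handler calls: %d, thread kept running: %s\n",
    $handled, ( $ticks_later > $ticks_after_signal ? 'yes' : 'no' );
```

If the thread really were stopped by the "signal", $ticks would stay constant between the two samples; in this sketch it keeps growing, and only the handler's side effect is visible.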

Since no real signals are used here, you are not mixing signals and threads. Good.

But this design is overly complicated. Just call $sem->down_force() in pause and $sem->up() in resume. There is no need to do this in the thread itself.

    use strict;
    use warnings;
    use threads;
    use Thread::Semaphore qw( );

    {
        package Worker;

        sub new {
            my $class = shift;
            return bless({ @_ }, $class);
        }

        sub thr  { return $_[0]{thr} }
        sub tid  { return $_[0]{thr}->tid() }
        sub join { $_[0]{thr}->join() }

        sub pause  { $_[0]{sem}->down_force() }
        sub resume { $_[0]{sem}->up() }
    }

    sub createThread {
        my $sem = Thread::Semaphore->new();
        my $thr = async {
            while (1) {
                $sem->down();
                ...
                $sem->up();
            }
        };
        return Worker->new( thr => $thr, sem => $sem );
    }

    sub pause  { my ($worker) = @_; $worker->pause(); }
    sub resume { my ($worker) = @_; $worker->resume(); }

Of course, this assumes that you want to pause the thread between work units. If you want to suspend a thread immediately, you do not need semaphores or "signals" at all [1].

    use strict;
    use warnings;
    use threads;
    use Thread::Suspend;  # Must call import!

    {
        package Worker;

        sub new {
            my $class = shift;
            return bless({ @_ }, $class);
        }

        sub thr  { return $_[0]{thr} }
        sub tid  { return $_[0]{thr}->tid() }
        sub join { $_[0]{thr}->join() }

        sub pause  { $_[0]{suspended}++ || $_[0]{thr}->suspend() }
        sub resume { --$_[0]{suspended} || $_[0]{thr}->resume() }
    }

    sub createThread {
        my $thr = async { ... };
        return Worker->new( thr => $thr );
    }

    sub pause  { my ($worker) = @_; $worker->pause(); }
    sub resume { my ($worker) = @_; $worker->resume(); }

Bonus: $worker->pause; $worker->pause; $worker->resume; $worker->resume; $worker->pause; $worker->pause; $worker->resume; $worker->resume; works fine with both of these approaches (unlike the version in the question).

If you want to keep using the tid instead of an object, just store the object in a hash keyed by tid.
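A minimal sketch of that idea, built on the semaphore approach and using only core modules. The %workers registry, the create_worker / pause_worker / resume_worker names, the $stop and $units variables and the nap helper are all hypothetical and exist only so that the example can run and shut down on its own.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my %workers;               # hypothetical registry: tid => { thr => ..., sem => ... }
my $stop  :shared = 0;     # hypothetical flag so the demo can end the worker
my $units :shared = 0;     # completed work units, only written by the worker

sub nap { select(undef, undef, undef, $_[0]) }   # sub-second sleep

sub create_worker {
    my $sem = Thread::Semaphore->new();
    my $thr = threads->create(sub {
        until ($stop) {
            $sem->down();      # blocks here while the worker is paused
            $units++;          # stand-in for one unit of real work
            $sem->up();
            nap(0.05);
        }
    });
    $workers{ $thr->tid } = { thr => $thr, sem => $sem };
    return $thr->tid;
}

sub pause_worker {
    my ($tid) = @_;
    my $w = $workers{$tid} or return "Thread $tid not found\n";
    $w->{sem}->down_force();   # takes effect at the next work-unit boundary
    return "Thread $tid paused\n";
}

sub resume_worker {
    my ($tid) = @_;
    my $w = $workers{$tid} or return "Thread $tid not found\n";
    $w->{sem}->up();
    return "Thread $tid resumed\n";
}

# Demo: pause, sample the counter twice, resume, sample again.
my $tid = create_worker();
nap(0.3);
print pause_worker($tid);
nap(0.3);                      # let any in-flight unit finish
my $paused_a = $units;
nap(0.3);
my $paused_b = $units;         # no progress is expected while paused
print resume_worker($tid);
nap(0.3);
my $resumed = $units;

$stop = 1;                     # the worker must not be paused at this point
$workers{$tid}{thr}->join;
delete $workers{$tid};
```

The callers only ever handle the tid; the thread object and its semaphore stay inside the registry. The same layout should work with the Worker objects from the answer as the hash values.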


[1] At least not beyond Thread::Suspend itself, which uses a "signal" as well as what amounts to semaphores internally.