Perl asynchronous tasks for "any" code, no matter what it is?

I am writing a verification system that performs various โ€œchecksโ€ on various services, systems, databases, files, etc. A โ€œcheckโ€ is general in nature and can be anything. All checks are reported in a common weather format that they skip or fail, no matter what.

This is written in a modular way OO, so that developers can simply follow the frame and write checks regardless of both. Each object contains a common reporting object, which after starting the check is simply $ self โ†’ {'report'} โ†’ report (params). The parameters are defined, and it is assumed that the developers report this accordingly. The reporting entity then indexes these reports. My main script loader has entries like:

my $reportingObject = new Checks::Reporting(params); my @checks; push @checks, new Checks::Check_One($reportingObject, params)); push @checks, new Checks::Check_One($reportingObject, params)); . . push @checks, new Checks::Check_N($reportingObject, params)); 

To start checking and complete the report after they are completed, I do:

 foreach my $check (@checks) { $check->run_stuff(); } $reportingObject->finalize_report(); 

Now, since these checks are completely independent (do not worry about the reporting object), they can be run in parallel. As an improvement, I did:

 my @threads; foreach my $check (@checks) { push @threads, async { $check->run_stuff(); } } foreach my $thread (@threads) { $thread->join; } #All threads are complete, and thus all checks are done $reportingObject->finalize_report(); 

As I said, developers will write Checks independently of each other. Some checks are simple, while others are not. Simple checks may not have asynchronous code in them, but others may need to run asynchronously inside, for example

 sub do_check { my @threads; my @list = @{$self->{'list'}}; foreach my $item (@list) { push @threads, async { #do_work_on_$item #return 1 or 0 for success or fail }; foreach my $thread (@threads) { my $res = $thread->join; if($res == 1) { $self->{'reporting'}->report(params_here); } } } } 

As you can see, the threading model allows me to do things in very vague terms. Each "Check" no matter what it runs independently in its own branch. If an individual developer has asynchronous material, regardless of what he has, he simply does it on his own in his thread. I need a model like this.

Unfortunately, threads are slow and inefficient. All asynchronous libraries have specific observers, such as IO, etc. I do not want anything specific. I need an event-based model that allows me to simply run asynchronous tasks, no matter what they are, and just notify when all this is done so I can move on.

Hope this explains this and you can point me in the right direction.

+7
source share
1 answer

This seems like a good model for an employee boss:

  • Create some workers at the beginning of the program. Make sure they all have access to the queue.

  • Block as many checks as you want. Workers deactivate checks, execute them, and complete the result in the output queue.

  • Your main thread looks at the output stream results and does whatever it wants.

  • Join the workers in the END block

You will probably want to look at Thread::Queue::Any if there is a chance that you want to queue coderefs.

Here is a complete example:

 use strict; use feature 'say'; use threads; use threads::shared; use Thread::Queue::Any; use constant NUM_THREADS => 5; local $Storable::Deparse = 1; local $Storable::Eval = 1; # needed to serialize code my $check_q = Thread::Queue::Any->new; my $result_q = Thread::Queue::Any->new; # start the workers { my $running :shared = NUM_THREADS; my @threads = map threads->new(\&worker, $check_q, $result_q, \$running), 1..NUM_THREADS; END { $_->join for @threads } } # enqueue the checks $check_q->enqueue($_) for sub {1}, sub{2}, sub{"hi"}, sub{ die }; $check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue while(defined( my $result = $result_q->dequeue )) { report($$result); } sub report { say shift // "FAILED"; } sub worker { my ($in, $out, $running_ref) = @_; while (defined( my $check = $in->dequeue )) { my $result = eval { $check->() }; $out->enqueue(\$result); } # last thread closes the door lock $$running_ref; --$$running_ref || $out->enqueue(undef); } 

Will print

 1 2 hi FAILED 

in a somewhat random manner.

+6
source

All Articles