Consume json queue messages in Laravel

Typically, Laravel expects it to queue all messages that it subsequently consumes. It creates a payload with the job attribute, which later indicates how to process the queue message. When you do jobs in a queue with Laravel and then process them with Laravel, it works great!

However, I have some non-Laravel applications that send json messages to the queue. I need Laravel to pick up these messages and process them.

I can write a command bus job to process messages, but I was not able to figure out how to tell queue:work to send messages to my specific handler.

It seems that Laravel has a strong assumption that any messages in the queue that he requests will be correctly formatted, serialized, and structured as they expect them.

How can I get Laravel to pick up these raw json files, ignore the structure (there is nothing to understand it) and just pass the payload to my handler?

For example, if I have a message in the queue similar to:

 { "foo" : "bar" } 

So again, Laravel has nothing to inspect or understand.

But I have a job handler that knows how to handle this:

 namespace App\Jobs; class MyQueueHandler { public function handle($payload) { Log::info($payload['foo']); // yay! } } 

Now, how to get queue:work and queue:listen to just pass all the useful data to this App\Jobs\MyQueueHandler , where can I do the rest myself?

+5
source share
2 answers

You did not specify which version of Laravel, so I assume 5.1 (a huge difference in how this is handled in L4.2 and L5.x).

If you have already configured your App\Jobs\MyQueueHandler and want to queue the task from the controller using any data, you can simply do this:

 use App\Jobs\MyQueueHandler; class MyController { public function myFunction() { $this->dispatch(new MyQueueHandler(['foo' => 'bar'])); } } 

In your MyQueueHandler class MyQueueHandler payload is actually included in your constructor. The handle method is still running when your queue is being processed. However, you can use the parameters on your handle method if you rely on dependency injection ( read more here , just above "When Things Go Wrong") So something like this should do this:

 namespace App\Jobs; class MyQueueHandler { protected $payload; public function __construct($payload) { $this->payload = $payload; } public function handle() { Log::info($this->payload['foo']); // yay! } } 

Note. If you want to send the task from outside the main controller (which inherits from the standard class App\Http\Controller ), use the DispatchesJobs character;

 MyClass { use DispatchesJobs; public function myFunction() { $this->dispatch(new MyQueueHandler(['foo' => 'bar'])); } } 

(Code verified using Laravel 5.1.19 and beanstalkd queue-adapter).

0
source

What you are asking for is not possible because Laravel is trying to fulfill the Gearman payload (see \Illuminate\Bus\Dispatcher ).

I was in the same situation and just created a command wrapper around the Laravel job class. This is not a pleasant solution, as it will reorder the events in the json queue, but you do not need to touch the existing task classes. Perhaps someone with great experience knows how to send a task without sending it again to the wire.

Suppose we have one regular Laravel working class called GenerateIdentApplicationPdfJob .

 class GenerateIdentApplicationPdfJob extends Job implements SelfHandling, ShouldQueue { use InteractsWithQueue, SerializesModels; /** @var User */ protected $user; protected $requestId; /** * Create a new job instance. * * QUEUE_NAME = 'ident-pdf'; * * @param User $user * @param $requestId */ public function __construct(User $user, $requestId) { $this->user = $user; $this->requestId = $requestId; } /** * Execute the job. * * @return void */ public function handle(Client $client) { // ... } } 

To be able to handle this class, we need to provide constructor arguments to our own. This is the required data from our json queue.

The following is the Laravel command class GearmanPdfWorker , which runs all the Gearman and json_decode connection patterns in order to be able to process the source class of the job.

class GearmanPdfWorker extends Team {

  /** * The console command name. * * @var string */ protected $name = 'pdf:worker'; /** * The console command description. * * @var string */ protected $description = 'listen to the queue for pdf generation jobs'; /** * @var \GearmanClient */ private $client; /** * @var \GearmanWorker */ private $worker; public function __construct(\GearmanClient $client, \GearmanWorker $worker) { parent::__construct(); $this->client = $client; $this->worker = $worker; } /** * Wrapper listener for gearman jobs with plain json payload * * @return mixed */ public function handle() { $gearmanHost = env('CB_GEARMAN_HOST'); $gearmanPort = env('CB_GEARMAN_PORT'); if (!$this->worker->addServer($gearmanHost, $gearmanPort)) { $this->error('Error adding gearman server: ' . $gearmanHost . ':' . $gearmanPort); return 1; } else { $this->info("added server $gearmanHost:$gearmanPort"); } // use a different queue name than the original laravel command, since the payload is incompatible $queueName = 'JSON.' . GenerateIdentApplicationPdfJob::QUEUE_NAME; $this->info('using queue: ' . $queueName); if (!$this->worker->addFunction($queueName, function(\GearmanJob $job, $args) { $queueName = $args[0]; $decoded = json_decode($job->workload()); $this->info("[$queueName] payload: " . print_r($decoded, 1)); $job = new GenerateIdentApplicationPdfJob(User::whereUsrid($decoded->usrid)->first(), $decoded->rid); $job->onQueue(GenerateIdentApplicationPdfJob::QUEUE_NAME); $this->info("[$queueName] dispatch: " . print_r(dispatch($job))); }, [$queueName])) { $msg = "Error registering gearman handler to: $queueName"; $this->error($msg); return 1; } while (1) { $this->info("Waiting for job on `$queueName` ..."); $ret = $this->worker->work(); if ($this->worker->returnCode() != GEARMAN_SUCCESS) { $this->error("something went wrong on `$queueName`: $ret"); break; } $this->info("... done `$queueName`"); } } } 

The GearmanPdfWorker class must be registered in your \Bundle\Console\Kernel as follows:

 class Kernel extends ConsoleKernel { protected $commands = [ // ... \Bundle\Console\Commands\GearmanPdfWorker::class ]; // ... 

With all this in place, you can call php artisan pdf:worker to start the worker and place one job in Gearman via the command line: gearman -v -f JSON.ident-pdf '{"usrid":9955,"rid":"ABC4711"}'

You can see the successful operation, then

 added server localhost:4730 using queue: JSON.ident-pdf Waiting for job on `JSON.ident-pdf` ... [JSON.ident-pdf] payload: stdClass Object ( [usrid] => 9955 [rid] => ABC4711 ) 0[JSON.ident-pdf] dispatch: 1 ... done `JSON.ident-pdf` Waiting for job on `JSON.ident-pdf` ... 
0
source

All Articles