Subversion-Projekte lars-tiefland.laravel_shop

Revision

Details | Letzte Änderung | Log anzeigen | RSS feed

Revision Autor Zeilennr. Zeile
199 lars 1
<?php namespace Clockwork\DataSource;
2
 
3
use Clockwork\Helpers\Serializer;
4
use Clockwork\Helpers\StackTrace;
5
use Clockwork\Request\Request;
6
 
7
use Illuminate\Queue\Queue;
8
 
9
// Data source for the Laravel queue component, collects the queue jobs dispatched while it is listening and
// adds them to the Clockwork request
class LaravelQueueDataSource extends DataSource
{
	// Queue instance the payload hook is registered on
	protected $queue;

	// Dispatched queue jobs collected so far, one associative array per job
	protected $jobs = [];

	// Clockwork ID of the current request, passed along with dispatched jobs as their parent ID
	protected $currentRequestId;

	// Create a new data source instance, takes a queue as an argument
	public function __construct(Queue $queue)
	{
		$this->queue = $queue;
	}

	// Appends the collected queue jobs to the jobs already present on the request, returns the same request
	public function resolve(Request $request)
	{
		$request->queueJobs = array_merge($request->queueJobs, $this->getJobs());

		return $request;
	}

	// Reset the data source to an empty state, clearing any collected jobs (the current request ID is kept)
	public function reset()
	{
		$this->jobs = [];
	}

	// Start collecting dispatched jobs by registering a payload hook on the queue
	public function listenToEvents()
	{
		$this->queue->createPayloadUsing(function ($connection, $queue, $payload) {
			// A fresh Request is instantiated only to obtain a new Clockwork ID for the dispatched job
			$id = (new Request)->id;

			// Every payload key is treated as optional and recorded as null when absent, so a payload that
			// lacks some of them does not raise an undefined index notice in the middle of a dispatch
			$this->registerJob([
				'id'         => $id,
				'connection' => $connection,
				'queue'      => $queue,
				'name'       => isset($payload['displayName']) ? $payload['displayName'] : null,
				'data'       => isset($payload['data']['command']) ? $payload['data']['command'] : null,
				'maxTries'   => isset($payload['maxTries']) ? $payload['maxTries'] : null,
				'timeout'    => isset($payload['timeout']) ? $payload['timeout'] : null,
				'time'       => microtime(true)
			]);

			// Returned to the queue; presumably merged into the job payload so the job can later be linked
			// to its own Clockwork request and to the request that dispatched it - TODO confirm
			return [ 'clockwork_id' => $id, 'clockwork_parent_id' => $this->currentRequestId ];
		});
	}

	// Set Clockwork ID of the current request, returns $this to allow chaining
	public function setCurrentRequestId($requestId)
	{
		$this->currentRequestId = $requestId;
		return $this;
	}

	// Collect a dispatched queue job, the job is stored together with a serialized stack trace of the
	// dispatch, but only if it passes the filters
	protected function registerJob(array $job)
	{
		$trace = StackTrace::get()->resolveViewName();

		$job = array_merge($job, [
			'trace' => (new Serializer)->trace($trace)
		]);

		// passesFilters is not defined in this class, presumably inherited from DataSource
		if ($this->passesFilters([ $job ])) {
			$this->jobs[] = $job;
		}
	}

	// Get an array of dispatched queue jobs, with the data of each job normalized by the serializer (or null
	// when the job has no data)
	protected function getJobs()
	{
		return array_map(function ($job) {
			return array_merge($job, [
				'data' => isset($job['data']) ? (new Serializer)->normalize($job['data']) : null
			]);
		}, $this->jobs);
	}
}