workerman / redis-queue
Message queue system written in PHP based on workerman and backed by Redis.
pkg:composer/workerman/redis-queue
Requires
- php: >=7.0
- workerman/redis: ^1.0||^2.0
- workerman/workerman: >=4.0.20
README
Install
composer require workerman/redis-queue
Usage
test.php
<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    // Connect to Redis once the worker process has started.
    $client = new Client('redis://127.0.0.1:6379');

    // Consume messages from the user-1 queue.
    $client->subscribe('user-1', function($data) {
        echo "user-1\n";
        var_export($data);
    });

    // Consume messages from the user-2 queue.
    $client->subscribe('user-2', function($data) {
        echo "user-2\n";
        var_export($data);
    });

    // Called when consuming a message fails.
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "consume failure\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });

    // Publish a message to user-1 every second.
    Timer::add(1, function() use ($client) {
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();
Run it with the command php test.php start, or with php test.php start -d to run it as a daemon.
API
- Client::__construct()
- Client::send()
- Client::subscribe()
- Client::unsubscribe()
- Client::onConsumeFailure()
__construct (string $address, [array $options])
Create an instance by $address and $options.
- $address, for example redis://ip:6379.
- $options is the client connection options. Defaults:
  - auth: default ''
  - db: default 0
  - retry_seconds: Retry interval after consumption failure
  - max_attempts: Maximum number of retries after consumption failure
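As a rough illustration of the options above, the sketch below builds an options array and passes it to the constructor. The option values, the password placeholder, and the helper names exampleQueueOptions() and createExampleClient() are assumptions made for this example; only the option keys and the constructor signature come from this README.

```php
<?php
use Workerman\RedisQueue\Client;

// Illustrative option values; only the keys come from the list above.
function exampleQueueOptions(): array
{
    return [
        'auth'          => 'secret', // placeholder password
        'db'            => 1,        // Redis database index
        'retry_seconds' => 5,        // wait 5 seconds before each retry
        'max_attempts'  => 3,        // retry at most 3 times
    ];
}

// Hypothetical helper: call it from inside $worker->onWorkerStart,
// the same place where test.php creates its client.
function createExampleClient(): Client
{
    return new Client('redis://127.0.0.1:6379', exampleQueueOptions());
}
```

Keeping the options in one place like this makes it easier to share the same settings between producer and consumer processes.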
 
send(String $queue, Mixed $data, [int $delay=0])
Send a message to a queue
- $queue is the queue to publish to, String
- $data is the message to publish, Mixed
- $delay is the delay in seconds for delayed consumption, Int
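A minimal sketch of a delayed send, assuming $client is a Workerman\RedisQueue\Client created inside onWorkerStart. The helper name queueWelcomeMail(), the queue name send-mail, and the payload fields are hypothetical; the parameter is left untyped only so the snippet loads without the library.

```php
<?php
// Hypothetical helper: publishes a mail job, optionally delayed.
// $client is expected to be a Workerman\RedisQueue\Client.
function queueWelcomeMail($client, string $email, int $delaySeconds = 0): void
{
    // 'send-mail' is an example queue name, not one defined by the library.
    $payload = ['to' => $email, 'template' => 'welcome'];

    // The third argument is the delay in seconds before the message is consumed.
    $client->send('send-mail', $payload, $delaySeconds);
}
```

For example, queueWelcomeMail($client, 'user@example.com', 30) would ask for the message to be consumed about 30 seconds later, while omitting the last argument sends it without delay.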
subscribe(mixed $queue, callable $callback)
Subscribe to a queue or queues
- $queue is a String queue name, or an Array which has as keys the queue names to subscribe to.
- $callback: function (Mixed $data), where $data is the data sent by send($queue, $data).
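The consumer side might look like the sketch below, which subscribes to a single queue and hands each message to a caller-supplied function. The helper name registerMailConsumer(), the queue name send-mail, and the to/template payload fields are assumptions for illustration; $client is again expected to be a Workerman\RedisQueue\Client.

```php
<?php
// Hypothetical helper: wires a delivery function to the 'send-mail' queue.
// $client is expected to be a Workerman\RedisQueue\Client.
function registerMailConsumer($client, callable $deliver): void
{
    $client->subscribe('send-mail', function ($data) use ($deliver) {
        // $data is whatever the producer passed as the second argument of send().
        $deliver($data['to'], $data['template']);
    });
}
```

Passing the delivery logic in as a callable keeps the queue wiring separate from the work itself, so the same consumer code can be exercised without a Redis server.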
unsubscribe(mixed $queue)
Unsubscribe from a queue or queues
onConsumeFailure(callable $callback)
Registers a callback that is triggered when consumption fails.
- $callback: function (\Throwable $exception, array $package), where $package contains information such as data, queue and attempts.
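One possible failure handler is sketched below: it turns the exception and package into a single log line. The array keys queue and attempts are assumed from the description above and are read defensively in case they are absent; the helper names formatConsumeFailure() and registerFailureLogger() are hypothetical.

```php
<?php
// Hypothetical helper: builds a one-line summary of a failed consumption.
// The 'queue' and 'attempts' keys are assumed from the README description.
function formatConsumeFailure(\Throwable $exception, array $package): string
{
    return sprintf(
        'queue=%s attempts=%s error=%s',
        $package['queue'] ?? '?',
        $package['attempts'] ?? '?',
        $exception->getMessage()
    );
}

// $client is expected to be a Workerman\RedisQueue\Client.
function registerFailureLogger($client): void
{
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        // Write the summary to PHP's configured error log.
        error_log(formatConsumeFailure($exception, (array)$package));
    });
}
```

Separating the formatting from the registration makes the log format easy to check on its own.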