b2pweb / bdf-queue
Bdf queue component
Installs: 7 857
Dependents: 3
Suggesters: 0
Security: 0
Stars: 3
Watchers: 1
Forks: 0
Open Issues: 2
pkg:composer/b2pweb/bdf-queue
Requires
- php: ~7.4 || ~8.0.0 || ~8.1.0 || ~8.2.0 || ~8.3.0 || ~8.4.0
- ext-pcntl: *
- b2pweb/bdf-dsn: ~1.0
- b2pweb/bdf-instantiator: ~1.0
- symfony/console: ~5.4|~6.0|~7.0
- symfony/messenger: ~5.4|~6.0|~7.0
Requires (Dev)
- b2pweb/bdf-serializer: ~1.0
- doctrine/dbal: ~2.5|~3.0
- enqueue/enqueue: ~0.9
- enqueue/fs: ~0.9
- friendsofphp/php-cs-fixer: ^3.0
- league/container: ~3.0
- monolog/monolog: ~2.0
- pda/pheanstalk: ^3.1@dev
- php-amqplib/php-amqplib: ~3.0
- phpbench/phpbench: ~0.0|~1.0
- phpunit/phpunit: ~9.6
- predis/predis: ~1.1.0
- ramsey/uuid: ~3.0|~4.0
- symfony/error-handler: ~5.4|~6.0|~7.0
- symfony/phpunit-bridge: ~5.4|~6.0|~7.0
- symfony/var-dumper: ~5.4|~6.0|~7.0
- vimeo/psalm: ~5.22 | ~6.0
Suggests
- ext-redis: Required to use the Redis queue driver
- aws/aws-sdk-php: Required to use the SQS queue driver (~3.0).
- enqueue/enqueue: Required to use the Enqueue driver (~0.9)
- ext/rdkafka: Required to use the Kafka queue driver (~4.0|~5.0|~6.0)
- iron-io/iron_mq: Required to use the iron queue driver (~2.0).
- kwn/php-rdkafka-stubs: Required to use the Kafka queue driver (~2.0)
- pda/pheanstalk: Required to use the beanstalk queue driver (~3.0).
- php-amqplib/php-amqplib: Required to use the RabbitMQ queue driver (~2.6|~3.0)
- symfony/var-dumper: VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)
Conflicts
- enqueue/dsn: 0.10.25
README
This package provides 2 layers for abstraction of message broker.
- A connection layer
- A destination layer
Supports
| Message Broker | Library | Driver name | 
|---|---|---|
| Beanstalk | Pheanstalk | pheanstalk | 
| Db | Doctrine | doctrine+(*) | 
| Enqueue | php-enqueue | enqueue+(*) | 
| Gearman | Pecl Gearman | gearman | 
| Kafka | RdKafka | rdkafka | 
| Memory | memory | |
| Null | null | |
| RabbitMQ | Amqp lib | amqp-lib | 
| Redis (Ext) | PhpRedis | redis+phpredis | 
| Redis | PRedis | redis+predis | 
Usage Instructions
Produce messages
First, create a new destination manager instance.
<?php use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory; use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection; use Bdf\Queue\Destination\ConfigurationDestinationFactory; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Destination\DestinationFactory; use Bdf\Queue\Serializer\JsonSerializer; // Declare connections $driverFactory = new ResolverConnectionDriverFactory([ 'foo' => [ 'driver' => 'pheanstalk', 'host' => 'localhost', 'port' => '11300', 'additionalOption' => 'value', ] // OR use DSN 'foo' => 'pheanstalk://localhost:11300?additionalOption=value' ]); // Declare drivers $driverFactory->addDriverResolver('pheanstalk', function($config) { //echo $config['connection'] displays "foo" return new PheanstalkConnection($config['connection'], new JsonSerializer()); }); // Declare destination // You can also declare your custom destination that defined type of transport (queue, multi queues, topic, ...), // the connection to use, and the name of the queue(s) / topic(s) to use. // This example will use the queue driver of the "foo" connection defined above. And send / consume message on the queue named "default". $destinationFactory = new DestinationFactory( $driverFactory, ['my_destination' => 'queue://foo/default'] ); // To send a message to multiple destinations, you can use "aggregate" destination type. // You can use a wildcard to send to all destinations that match the pattern. // In this example, 'user' destination will be sent to the "foo" and "bar" queues, and to all topics that match the pattern "*.user" $destinationFactory = new DestinationFactory( $driverFactory, [ 'foo' => 'queue://test/foo', 'bar' => 'queue://test/bar', 'a.user' => 'topic://a/user', 'b.user' => 'topic://b/user', 'user' => 'aggregate://foo,bar,*.user', ] ); // Create the manager $manager = new DestinationManager($driverFactory, $destinationFactory);
Push a basic message into the queue. The consume should defined handler to process the message.
<?php use Bdf\Queue\Message\Message; $message = Message::create('Hello world'); $message->setDestination('my_destination'); // or use a lower level setting the connection and queue. $message = Message::create('Hello world', 'queue'); $message->setConnection('foo'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $manager->send($message);
Useful for monolithic application that needs to differ a process. Push a message job into the queue. The consumer will evaluate the job string and run the processor. In this use case the producer and the receiver share the same model.
<?php $message = \Bdf\Queue\Message\Message::createFromJob(Mailer::class.'@send', ['body' => 'my content']); $message->setDestination('my_destination'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $manager->send($message);
Available type for dsn destination
The class Bdf\Queue\Destination\DsnDestinationFactory provides default type of destination:
| Name | Exemple | Definition | 
|---|---|---|
| queue | queue://connection_name/queue_name | Publish and consume a single queue | 
| queues | queues://connection_name/queue1,queue2 | Only consume multi queues | 
| topic | topic://connection_name/topic | Publish and consume a topic. Pattern with wildcard are allowed for consumer use case only (ex: topic.*) | 
| topics | topics://connection_name/topic1,topic2 | Only consume multi topics | 
You can declare your own type:
<?php use Bdf\Dsn\DsnRequest; use Bdf\Queue\Connection\ConnectionDriverInterface; use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory; /** @var ResolverConnectionDriverFactory $driverFactory */ $destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory); $destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) { // ... }); // use dsn as "my_own_type://connection/queue_or_topic_name?option="
Consume messages
The consumer layer provides many tools for message handling. The default stack of objects that will receive the message is:
consumer (ConsumerInterface) -> receivers (ReceiverInterface) -> processor (ProcessorInterface) -> handler (callable)
- consumerhas the strategy for reading the message from queue / topic. It also manage a graceful shutdown.
- receiversis the stack of middlewares interacts with the envelope.
- processorresolves the handler arguments. You can plug here your business logic and remove the handler layer. By default processor injects 2 arguments in handlers: the message data and the envelope.
- handlermanages the business logic. Handler allows an interface less mode.
An example to consume a simple message:
<?php use Bdf\Queue\Consumer\Receiver\ProcessorReceiver; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Processor\CallbackProcessor; use Bdf\Queue\Processor\MapProcessorResolver; // Create your processor and declare in a map: $myProcessor = new CallbackProcessor(function($data) { echo $data; }); $processorResolver = new MapProcessorResolver(['foo' => $myProcessor]); /** @var DestinationManager $manager */ $manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
Consume a job message:
<?php use Bdf\Instantiator\Instantiator; use Bdf\Queue\Consumer\Receiver\ProcessorReceiver; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Processor\JobHintProcessorResolver; /** @var Instantiator $instantiator */ // The job should be provided from message to get the processor $processorResolver = new JobHintProcessorResolver($instantiator); /** @var DestinationManager $manager */ $manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
Create a handler
<?php /** @var Bdf\Queue\Destination\DestinationManager $manager */ class MyHandler { public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope) { echo $data; // Display 'foo' // Ack the message. Default behavior. The ack is sent before the call by the consumer. $envelope->acknowledge(); // Reject the message. It will be no more available. The message is rejected if and exception is thrown. $envelope->reject(); // Reject the message and send it back to the queue $envelope->reject(true); } } $message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue'); $manager->send($message);
Use the synthax "Class@method" to determine the callable (By default the method is "handle")
or register your handlers on a specific destination with the receiver builder:
<?php use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder; use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader; use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface; use Psr\Container\ContainerInterface; /** @var ContainerInterface $container */ /** @var Bdf\Queue\Destination\DestinationManager $manager */ $container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) { return new ReceiverLoader( $container, [ 'destination_name or connection_name' => function(ReceiverBuilder $builder) { /** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */ /** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */ // Register your unique handler for the destination or connection. // all message will be handled by this handler. $builder->handler(MyHandler::class); // Or register your unique processor $builder->processor($myProcessor); // Or register the job bearer resolver as processor. The procesor will resolve the job // from the Message::$job attribute value. $builder->jobProcessor(); // Or register your own processor or handler by queue in case you consume a connection. // By default the key of the map is the queue name. You can provide your own key provider // with the second parameter. $builder->mapProcessor([ 'queue1' => $myProcessor, 'queue2' => MyHandler::class, ]); // Or register your final own receiver $builder->outlet($myReceiver); // Or register your own receiver in the stack $builder->add($myReceiver); // You can add more defined middlewares here // $builder->retry(2); } ] ); }); $receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build(); $manager->create('queue://foo')->consumer($receiver)->consume(0);
Run the consumer in console
$ example/consumer.php "connection name OR destination name"
Create receiver extensions
The consumer use a stack of receivers to extend the reception of messages.
See the interface Bdf\Queue\Consumer\ReceiverInterface and the trait Bdf\Queue\Consumer\DelegateHelper.
<?php class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface { use \Bdf\Queue\Consumer\DelegateHelper; private $options; /** * MyExtension constructor. */ public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options) { $this->delegate = $delegate; $this->options = $options; } /** * {@inheritdoc} */ public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void { // Do something when receiving message if ($message->queue() === 'foo') { return; } // Call the next receiver $this->delegate->receive($message, $consumer); } }
You can use the Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader::add() to register your receiver in the stack
<?php $options = ['foo' => 'bar']; /** @var \Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder $builder */ $builder->add(MyExtension::class, [$options]);
Customize the string payload
The class Bdf\Queue\Serializer\SerializerInterface manage the payload content sent to the message broker.
By default metadata are added to the json as:
- PHP Type: to help consumer to deserialize complex entities.
- Message info: The attempt number for retry, The sending date, ...
A basic payload looks like:
{
  "name": "Foo",
  "data": "Hello World",
  "date": "2019-12-23T16:02:03+01:00"
}
You can customize the string with your own implementation of the serializer interface.
Try the hello world example (configure the message broker in example/config/connections.php):
$ example/producer.php foo '{"name":"Foo", "data":"Hello World"}' --raw
$ example/consumer.php foo
RPC client
<?php use Bdf\Queue\Message\InteractEnvelopeInterface; use Bdf\Queue\Message\Message; class RpcReplyHandler { public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope) { // Send bask: 1 x 2 to client $envelope->reply($number * 2); // Or retry in 10sec $envelope->retry(10); } } $message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue'); $message->setConnection('foo'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $promise = $manager->send($message); // Consume the foo connection // Receive data from the reply queue. If the header "replyTo" is not set, // the response will be sent to "queue_reply" echo $promise->await(500)->data(); // Display 2
Additionnal options for connection
| Option | Type | Supports | Description | 
|---|---|---|---|
| driver | string | all | The name of the driver to use. See driver name in support section. | 
| vendor | string | all | Second part of the protocol. Vendor is used by some driver that use internal drivers. | 
| queue | string | all | The default queue of the connection used only if no queue has been set on the message. Destination should provide the queue. | 
| host | string | all | The host / ip to connect to message broker. Usually set to localhost. | 
| port | int | all | The port of the message broker. Usually set to the default port. | 
| user | string | all | |
| password | string | all | |
| prefetch | int | all | Load a number of message in memory. Faster for some broker that supports reservation | 
| serializer | string | all | Load a serializer for this connection. Used only by driver that needs serializer. | 
| vhost | string | amqp-lib | Default /. | 
| group | string | amqp-lib | Group use by topic to allows set of consumers on the same topic. Default bdf. | 
| sleep_duration | int | amqp-lib | The internal sleep in milliseconds between two pop. Default 200. | 
| queue_flags | int | amqp-lib | The flag for queue declaration. See AmqpDriver constants. Default 2(FLAG_QUEUE_DURABLEvalue). | 
| topic_flags | int | amqp-lib | The flag for topic declaration. See AmqpDriver constants. Default 0(FLAG_NOPARAMvalue). | 
| consumer_flags | int | amqp-lib | The flag for consumer. See AmqpDriver constants. Default 0(FLAG_NOPARAMvalue). | 
| auto_declare | bool | amqp-lib, redis, enqueue | Auto declare the queue when pushing or poping. Use queue setup command otherwise. Default false. | 
| qos_prefetch_size | int | amqp-lib | Prefetch optimisation. Default 0. | 
| qos_prefetch_count | int | amqp-lib | Prefetch optimisation. Default 1. | 
| qos_global | int | amqp-lib | Prefetch optimisation. Default false. | 
| table | string | doctrine | The table name to use to store message. Default value doctrine_queue | 
| ttr | int | pheanstalk | Time to run in seconds. Can also be defined in message header. Default 60. | 
| client-timeout | int | pheanstalk, gearman | Timeout of client in milliseconds. Disable by default. | 
| commitAsync | bool | rdkafka | Enable asynchrone ack. Default false. | 
| offset | int | rdkafka | Position to start consumer. Default null. | 
| partition | int | rdkafka | Partition to for the consumer, see kafka constant. Default -1(RD_KAFKA_PARTITION_UAvalue). | 
| global | array | rdkafka | Kafka config for global settings. | 
| producer | array | rdkafka | Kafka config for producer. | 
| consumer | array | rdkafka | Kafka config for the consume | 
| poll_timeout | int | rdkafka | The timeout for the poll method in milliseconds. | 
| flush_timeout | int | rdkafka | The timeout for the flush method in milliseconds. | 
| dr_msg_cb | callable | rdkafka | Delivery report callback. | 
| error_cb | callable | rdkafka | Error callback. | 
| rebalance_cb | callable | rdkafka | Called after consumer group has been rebalanced. | 
| stats_cb | callable | rdkafka | Statistics callback. | 
| partitioner | string | rdkafka | Kafka partitioner for topic settings. | 
| group | string | rdkafka | Group use by topic to allows set of consumers on the same topic. Default "2". | 
| timeout | int | redis | The connection timeout in seconds. Default 0. | 
| prefix | string | redis | The key prefix. Default queues:. | 
Note:
- Format of a valid DSN: {driver}+{vendor}://{user}:{password}@{host}:{port}/{queue}?{option}=value
- See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for more kafka options.
Additionnal options for message
| Option | Type | Supports | Description | 
|---|---|---|---|
| flags | int | amqp-lib | The flags for message. See driver constants. | 
| priority | int | pheanstalk | Priority message. Default 1024. | 
| ttr | int | pheanstalk | Time to run in seconds. Default 60. | 
| key | string | rdkafka | |
| partition | int | rdkafka | The number of the partition. | 
Serialization
Benchmarks
simple job / closure job
| Serializer | Serializer | +Compress | Bdf JSON | +Compress | Bdf binary | 
|---|---|---|---|---|---|
| Size | 141 / 377 | 105 / 244 | 109 / 407 | 76 / 247 | 98 / 355 | 
| Serialize time | 0.0014 / 6.8 | 0.016 / 7 | 0.011 / 7 | 0.026 / 7 | 0.011 / 7 | 
| Unserialize time | 0.007 / 0.0025 | 0.0082 / 0.0068 | 0.024 / 0.015 | 0.024 / 0.019 | 0.019 / 0.011 | 
Analysis
- For the best execution time, regardless of size, use the default Serializer
- For the smaller size, regardless of time, use BdfSerializerwithCompressedSerializer
- For the best compromise, use SerializerwithCompressedSerializer- Always smaller than pure BdfSerializer(JSON or Binary)
- Faster on unserialize, slightly slower on serialize
- Around twice faster than compressed bdf, but only ~40% larger on simple job
 
- Always smaller than pure 
License
Distributed under the terms of the MIT license.