ikilobyte/pulsar-client-php

PHP Native Client library for Apache Pulsar

github.com/ikilobyte/pulsar-client-php

pkg:composer/ikilobyte/pulsar-client-php

v1.4.8 2026-05-14 02:53 UTC

This is an Apache Pulsar client library implemented in PHP, based on PulsarApi.proto, with support for Swoole coroutines.

Features

  • Supported URL schemes: pulsar://, pulsar+ssl://, http://, https://
  • Multi-topic consumers
  • TLS connection
  • Automatic reconnection (consumer only)
  • Message batching
  • Message Properties
  • Compression with zstd, zlib
  • Authentication with jwt, basic
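
As a rough illustration of the four URL schemes listed above, the sketch below classifies a service URL with PHP's built-in parse_url(). The helper name describeServiceUrl is hypothetical and not part of the library, and treating only pulsar+ssl and https as TLS schemes is an assumption.

```php
<?php

/**
 * Hypothetical helper (not part of the library): reports which of the
 * four schemes from the feature list a service URL uses.
 */
function describeServiceUrl(string $url): array
{
    $supported = ['pulsar', 'pulsar+ssl', 'http', 'https'];

    $parts = parse_url($url);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException("Cannot parse service URL: {$url}");
    }

    $scheme = strtolower($parts['scheme']);
    if (!in_array($scheme, $supported, true)) {
        throw new InvalidArgumentException("Unsupported scheme: {$scheme}");
    }

    return [
        'scheme' => $scheme,
        'host'   => $parts['host'],
        'port'   => $parts['port'] ?? null,
        // Assumption: only these two schemes imply an encrypted connection.
        'tls'    => in_array($scheme, ['pulsar+ssl', 'https'], true),
    ];
}

var_export(describeServiceUrl('pulsar+ssl://localhost:6651'));
```

Validating the URL before constructing a Producer or Consumer can surface typos in the scheme early.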

Requirements

  • PHP >=7.1
  • ZLib extension (if you want to use zlib compression)
  • Zstd extension (if you want to use zstd compression)
  • Swoole extension (if you want to run under Swoole)
    • Running under Swoole only requires the SWOOLE_HOOK_SOCKETS, SWOOLE_HOOK_STREAM_FUNCTION, or SWOOLE_HOOK_ALL hook flags to be enabled

Tips

If the following error occurs, install the gmp or bcmath extension:

Negative integers are only supported with GMP or BC (64bit) intextensions.
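
The requirements and the tip above can be checked up front with PHP's built-in extension_loaded(). This is a minimal sketch; the helper name missingExtensions is hypothetical and not part of the library.

```php
<?php

/**
 * Hypothetical pre-flight helper (not part of the library): returns the
 * names from $wanted that are not loaded in the current PHP runtime.
 */
function missingExtensions(array $wanted): array
{
    return array_values(array_filter($wanted, function (string $name): bool {
        return !extension_loaded($name);
    }));
}

// Optional extensions, each needed only for the matching feature.
$optional = missingExtensions(['zlib', 'zstd', 'swoole']);
if ($optional !== []) {
    echo 'Optional extensions not loaded: ' . implode(', ', $optional) . "\n";
}

// Either gmp or bcmath is enough to avoid the "Negative integers" error.
if (missingExtensions(['gmp', 'bcmath']) === ['gmp', 'bcmath']) {
    echo "Install gmp or bcmath before using the client.\n";
}
```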

Installation

composer require ikilobyte/pulsar-client-php

Producer

<?php

use Pulsar\Authentication\Basic;
use Pulsar\Authentication\Jwt;
use Pulsar\Compression\Compression;
use Pulsar\Producer;
use Pulsar\ProducerOptions;
use Pulsar\MessageOptions;

require_once __DIR__ . '/vendor/autoload.php';

$options = new ProducerOptions();

// If authentication is enabled on the broker:
// Use JWT authentication
$options->setAuthentication(new Jwt('token'));

// Or use Basic authentication
//$options->setAuthentication(new Basic('user','password'));

$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo');
$options->setCompression(Compression::ZLIB);
$producer = new Producer('pulsar://localhost:6650', $options);
// or use pulsar proxy address
//$producer = new Producer('http://localhost:8080', $options);

$producer->connect();

for ($i = 0; $i < 10; $i++) {
    $messageID = $producer->send(sprintf('hello %d',$i));
    
    $messageID = $producer->send(sprintf('hello properties %d',$i),[
        MessageOptions::PROPERTIES => [
           'key' => 'value',
           'ms'  => microtime(true),
        ],
    ]);
    echo 'messageID ' . $messageID . "\n";
}

// Sending delayed messages
for ($i = 0; $i < 10; $i++) {
    $producer->send(sprintf('hello-delay %d',$i),[
        MessageOptions::DELAY_SECONDS => $i * 5, // Seconds
    ]);
}

// Send a batch of messages
// The client packs the array into a single batched message before sending it to Pulsar
$messages = [];
for ($i = 0;$i < 10;$i++) {
    $messages[] = json_encode([
          'id'    => $i,
          'now'   => date('Y-m-d H:i:s')
    ]);
}

$messageID = $producer->send($messages);
echo "batch message id {$messageID}\n";

// close
$producer->close();

Keepalive Connection (Recommended)

  • Requires the Swoole extension
  • Recommended for long-running (memory-resident) applications
  • Keeps the connection open, so it is not re-established repeatedly
  • Calling the close method closes the connection
  • Please see the example
$options->setKeepalive(true);

Message deduplication

  • Message deduplication is a feature provided by Pulsar, based on the producer name and the sequence ID
  • Each producer needs a fixed, unique name, typically one per business domain, and every message needs a unique, incrementing sequence ID
  • See the Pulsar documentation
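$options = new ProducerOptions();
$options->setTopic('persistent://public/default/demo');
// Keep the producer name fixed across restarts
$options->setProducerName('name');

$producer = new Producer('pulsar://localhost:6650', $options);
$producer->connect();
$producer->send('body',[
    // Must be unique and incrementing for each message
    \Pulsar\MessageOptions::SEQUENCE_ID => 123456,
]);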
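
One way to obtain sequence IDs that are unique and keep incrementing across restarts is a small file-backed counter. The sketch below is only an illustration: the SequenceCounter class and the file path are hypothetical, and a database sequence or similar store may be a better fit in practice.

```php
<?php

/**
 * Hypothetical file-backed counter (not part of the library). It hands out
 * strictly increasing integers that survive process restarts.
 */
final class SequenceCounter
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    public function next(): int
    {
        // 'c+' opens for read/write and creates the file without truncating it.
        $handle = fopen($this->path, 'c+');
        if ($handle === false) {
            throw new RuntimeException("Cannot open {$this->path}");
        }

        try {
            // Exclusive lock so concurrent processes never reuse a number.
            flock($handle, LOCK_EX);
            $next = (int) stream_get_contents($handle) + 1;

            ftruncate($handle, 0);
            rewind($handle);
            fwrite($handle, (string) $next);
            fflush($handle);

            return $next;
        } finally {
            flock($handle, LOCK_UN);
            fclose($handle);
        }
    }
}

$counter = new SequenceCounter(sys_get_temp_dir() . '/pulsar-demo-producer.seq');
$sequenceId = $counter->next();
// $sequenceId could then be passed as MessageOptions::SEQUENCE_ID.
echo $sequenceId . "\n";
```

Using one counter file per producer name keeps the name and sequence ID pairing stable.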

Consumer

<?php

use Pulsar\Authentication\Jwt;
use Pulsar\Authentication\Basic;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\SubscriptionType;
use Pulsar\Proto\CommandSubscribe\InitialPosition;

require_once __DIR__ . '/vendor/autoload.php';

$options = new ConsumerOptions();

// If authentication is enabled on the broker:
// Use JWT authentication
$options->setAuthentication(new Jwt('token'));

// Or use Basic authentication
//$options->setAuthentication(new Basic('user','password'));

$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo');
$options->setSubscription('logic');
$options->setSubscriptionType(SubscriptionType::Shared);

// Initial cursor position when subscribing to a topic for the first time.
// Defaults to InitialPosition::Latest()
// $options->setSubscriptionInitialPosition(InitialPosition::Earliest());

// Delay in seconds before nacked messages are redelivered (default: 60 seconds)
$options->setNackRedeliveryDelay(20);

$consumer = new Consumer('pulsar://localhost:6650', $options);
// or use pulsar proxy address
//$consumer = new Consumer('http://localhost:8080', $options);

$consumer->connect();

while (true) {
    $message = $consumer->receive();
    
    // get properties
    var_export($message->getProperties());
    
    echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s] redeliveryCount[%d]',
        $message->getPayload(),
        $message->getMessageId(),
        $message->getTopic(),
        $message->getPublishTime(),
        $message->getRedeliveryCount()
    ) . "\n";

    // ... 
    
    // Remember to acknowledge the message once processing has finished
    $consumer->ack($message);
    
    // If processing fails, you can send a nack instead
    // The message will be redelivered after the configured delay
    // $consumer->nack($message);
}

$consumer->close();

Receive Batch Message

  • Batch messages can be received only if the producer sent them as a batch.
$messages = $consumer->batchReceive();
foreach ($messages as $message) {
    // ...
    
    // Ack
    $consumer->ack($message);
}
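
The batch example in the Producer section sends JSON-encoded strings, so a consumer will usually decode each payload before processing it. The sketch below works on plain strings standing in for $message->getPayload(); the helper name decodePayloads is hypothetical and not part of the library.

```php
<?php

/**
 * Hypothetical helper (not part of the library): decodes JSON payload
 * strings and separates the ones that fail to parse.
 */
function decodePayloads(array $payloads): array
{
    $decoded = [];
    $invalid = [];

    foreach ($payloads as $index => $payload) {
        $value = json_decode($payload, true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            $invalid[$index] = $payload;
            continue;
        }
        $decoded[$index] = $value;
    }

    return [$decoded, $invalid];
}

// Stand-ins for the strings returned by $message->getPayload().
$payloads = [
    json_encode(['id' => 0, 'now' => '2024-01-01 00:00:00']),
    '{not json',
];

list($decoded, $invalid) = decodePayloads($payloads);
echo count($decoded) . ' decoded, ' . count($invalid) . " invalid\n";
```

Messages whose payload cannot be decoded are natural candidates for nack() rather than ack().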

Subscribe to multiple topics

$options->setTopics([
    'persistent://public/default/demo-1',
    'persistent://public/default/demo-2',
    'persistent://public/default/demo-3',
    //....
]);

Dead letter topic

// Uses the default dead letter topic name: <topicname>-<subscriptionname>-DLQ
$options->setDeadLetterPolicy(6);

// Custom topic name
$options->setDeadLetterPolicy(6,'persistent://public/default/demo-dead');

// Custom subscription name
$options->setDeadLetterPolicy(6,'persistent://public/default/demo-dead','sub-name');
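
To make the default naming pattern concrete, the sketch below builds the name from a topic and a subscription. The helper defaultDeadLetterTopic is hypothetical, and it assumes that <topicname> means the full topic name, which the text above does not spell out.

```php
<?php

/**
 * Hypothetical helper (not part of the library): builds the default dead
 * letter topic name from the pattern <topicname>-<subscriptionname>-DLQ.
 * Assumption: <topicname> is the full topic name, including the namespace.
 */
function defaultDeadLetterTopic(string $topic, string $subscription): string
{
    return sprintf('%s-%s-DLQ', $topic, $subscription);
}

echo defaultDeadLetterTopic('persistent://public/default/demo', 'logic') . "\n";
```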

Reconnect (Consumer Only)

// Enable reconnection
$options->setReconnectPolicy(true);

// Reconnect interval(seconds)
$options->setReconnectPolicy(true,3);

// Maximum number of reconnections
$options->setReconnectPolicy(true,3,100);
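
The interval and maximum-attempts parameters describe a familiar retry pattern. The sketch below is a generic retry loop, not the library's internal reconnect logic; the function retryWithInterval and its behaviour are assumptions made for illustration.

```php
<?php

/**
 * Hypothetical generic retry loop (not the library's internals).
 * Calls $operation until it succeeds or $maxAttempts is reached,
 * sleeping $intervalSeconds between failed attempts.
 */
function retryWithInterval(callable $operation, int $intervalSeconds, int $maxAttempts)
{
    $lastError = null;

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            return $operation($attempt);
        } catch (Exception $e) {
            $lastError = $e;
            if ($attempt < $maxAttempts && $intervalSeconds > 0) {
                sleep($intervalSeconds);
            }
        }
    }

    throw new RuntimeException("Gave up after {$maxAttempts} attempts", 0, $lastError);
}

// Fails twice, then succeeds on the third attempt (interval 0 keeps the demo fast).
$result = retryWithInterval(function (int $attempt): string {
    if ($attempt < 3) {
        throw new RuntimeException("attempt {$attempt} failed");
    }
    return "connected on attempt {$attempt}";
}, 0, 100);

echo $result . "\n";
```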

Non-looping Receive and Graceful Exit

$running = true;

// kill -15 $PID  
pcntl_signal(SIGTERM,function() use (&$running){
    $running = false;
});

while ($running) {
    try {
        $message = $consumer->receive(false);
        
        // ...
    } catch (\Pulsar\Exception\MessageNotFound $e) {
        if ($e->getCode() != \Pulsar\Exception\MessageNotFound::Ignore) {
            die($e->getMessage());
        }
        echo "Message Not Found\n";
        continue;
    } catch (Throwable $e) {
        echo $e->getMessage() . "\n";
        throw $e;
    } finally {
        pcntl_signal_dispatch();
    }
}

TLS

  • Refer to the official documentation for certificate configuration

  • Example

$tls = new \Pulsar\TLSOptions('./cert.pem','./cert.key.pem');

// Establishing a TLS connection without a certificate
//$tls = new \Pulsar\TLSOptions('','');

// CA Cert
$tls->setTrustCertsFilePath('./ca.cert.pem');

// optional
$tls->setAllowInsecureConnection(false);
$tls->setValidateHostname(true);
$options->setTLS($tls);

$consumer = new \Pulsar\Consumer('pulsar+ssl://localhost:6651',$options);
//$producer = new \Pulsar\Producer('pulsar+ssl://localhost:6651',$options);


// or https
$consumer = new \Pulsar\Consumer('https://localhost:8081',$options);
//$producer = new \Pulsar\Producer('https://localhost:8081',$options);
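
Before building the TLS options it can help to confirm that the certificate files are present. The sketch below is a plain file-system check using the example paths above; the helper unreadableFiles is hypothetical and not part of the library.

```php
<?php

/**
 * Hypothetical pre-flight check (not part of the library): returns the
 * paths that are not readable regular files.
 */
function unreadableFiles(array $paths): array
{
    return array_values(array_filter($paths, function (string $path): bool {
        return !is_file($path) || !is_readable($path);
    }));
}

$problems = unreadableFiles(['./cert.pem', './cert.key.pem', './ca.cert.pem']);
if ($problems !== []) {
    echo 'Missing or unreadable: ' . implode(', ', $problems) . "\n";
}
```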

Schema

<?php

class Person
{
    public $id;
    public $name;
    public $age;
    // ...
}
  • Declaring the schema on the producer
<?php
$define = '{"type":"record","name":"Person","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"}]}';
$schema = new \Pulsar\Schema\SchemaJson($define, [
    'key' => 'value',
]);

// ... some code
$producerOptions->setSchema($schema);
$producer = new \Pulsar\Producer('xx',$producerOptions);
$producer->connect();

$person = new Person();
$person->id = 1;
$person->name = 'Tony';
$person->age = 18;

// Send the Person object directly; there is no need to convert it to a JSON string manually
$id = $producer->send($person);
  • Declaring the schema on the consumer
<?php
$define = '{"type":"record","name":"Person","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"}]}';

$schema = new \Pulsar\Schema\SchemaJson($define, [
   'key' => 'value',
]);

// ... some code
$consumerOptions->setSchema($schema);
$consumer = new \Pulsar\Consumer('pulsar://xxx',$consumerOptions);
$consumer->connect();

while (true) {
   $message = $consumer->receive();
   $person = new Person();
   $message->getSchemaValue($person);
   echo sprintf(
       'payload %s id %d name %s age %d',
       $message->getPayload(),
       $person->id,
       $person->name,
       $person->age
   ) . "\n";
   
   // .. some code
}
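
Conceptually, a JSON schema round trip turns the object's public properties into a JSON document and copies them back onto an object on the receiving side. The sketch below shows that idea with plain json_encode() and json_decode(); hydrateFromJson is a hypothetical helper and is not how the library implements getSchemaValue().

```php
<?php

class Person
{
    public $id;
    public $name;
    public $age;
}

/**
 * Hypothetical helper (not the library's implementation): copies fields
 * from a JSON document onto the matching properties of $target.
 */
function hydrateFromJson($target, string $json)
{
    $data = json_decode($json, true);
    if (!is_array($data)) {
        throw new InvalidArgumentException('Payload is not a JSON object');
    }

    foreach ($data as $field => $value) {
        // Fields without a matching property are skipped.
        if (property_exists($target, $field)) {
            $target->$field = $value;
        }
    }

    return $target;
}

$person = new Person();
$person->id = 1;
$person->name = 'Tony';
$person->age = 18;

// json_encode() serializes the public properties of an object.
$payload = json_encode($person);
echo $payload . "\n"; // {"id":1,"name":"Tony","age":18}

$copy = hydrateFromJson(new Person(), $payload);
echo $copy->name . "\n"; // Tony
```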

Reader

<?php
use Pulsar\Authentication\Jwt;
use Pulsar\Message;
use Pulsar\Reader;
use Pulsar\ReaderOptions;

require_once __DIR__ . '/../vendor/autoload.php';


$options = new ReaderOptions();

// If authentication is enabled on the broker:
// Only JWT authentication is currently supported
// $options->setAuthentication(new Jwt('token'));

$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo'); // partitioned topics are supported

// Read the latest message
$options->setStartMessageID(Message::latestMessageIdData());

// From the earliest message
// $options->setStartMessageID(Message::earliestMessageIdData());

// Start reading from a specific message ID
// $options->setStartMessageID(Message::deserialize('621:103:0'));

$reader = new Reader('pulsar://localhost:6650', $options);
$reader->connect();

while (true) {
    $message = $reader->next();
    echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s]',
            $message->getPayload(),
            $message->getMessageId(),
            $message->getTopic(),
            $message->getPublishTime()
        ) . "\n";

}

$reader->close();

Options

  • ProducerOptions
    • setTopic()
    • setAuthentication()
    • setConnectTimeout()
    • setProducerName()
    • setCompression()
    • setSchema()
    • setKeepalive()
  • ConsumerOptions
    • setTopic()
    • setTopics()
    • setAuthentication()
    • setConnectTimeout()
    • setConsumerName()
    • setSubscription()
    • setSubscriptionType()
    • setNackRedeliveryDelay()
    • setReceiveQueueSize()
    • setDeadLetterPolicy()
    • setSubscriptionInitialPosition()
    • setReconnectPolicy()
    • setSchema()
  • ReaderOptions
    • setTopic()
    • setAuthentication()
    • setConnectTimeout()
    • setReaderName()
    • setStartMessageID()
    • setReceiveQueueSize()
  • MessageOptions
    • DELAY_SECONDS
    • SEQUENCE_ID
    • PROPERTIES
  • TLSOptions (v1.3.0)
    • __construct(string $certFilePath, string $keyFilePath)
    • setTrustCertsFilePath()
    • setValidateHostname()
    • setAllowInsecureConnection()

MessageNotFound Error Codes (v1.2.1)

  • MessageNotFound::Ignore
  • MessageNotFound::CommandParseFail

MCP Server for AI Agents

This library includes an MCP (Model Context Protocol) stdio server that lets AI agents (Cursor, Claude Desktop, etc.) produce, consume, and peek at Pulsar messages. No extra dependencies required.

Tools: pulsar_publish · pulsar_consume · pulsar_peek

Quick setup: add to ~/.cursor/mcp.json (Cursor) or claude_desktop_config.json (Claude Desktop):

{
  "mcpServers": {
    "pulsar": {
      "command": "php",
      "args": ["/path/to/pulsar-client-php/examples/mcp-server.php"],
      "env": {
        "PULSAR_BROKER_URL": "pulsar://localhost:6650",
        "PULSAR_TOKEN": ""
      }
    }
  }
}

See examples/mcp.md for full tool reference and testing instructions.

License

MIT LICENSE