# flyokai/amp-data-pipeline

Async data pipeline using AMPHP.

Requires:

- php: ^8.1
- amphp/amp: ^3
- amphp/pipeline: ^1
Composable async data pipelines on AMPHP 3.x — sources, processors, batching, multicast, with explicit concurrency and backpressure controls.
A small toolkit for building pull-based concurrent processing pipelines. You wire together a DataSource and one or more Processors; each processor controls its own fiber count and output queue size. Batch and multicast operators give you the rest.
## Features

- `DataItem` wrapper — `data` + `meta` arrays, immutable-ish
- Sources — `ArraySource`, `IteratorSource`, `QueueSource`
- Processors — `ProcessorAbstract` (override `processDataItem()`), `SkipProcessor`
- Composition — `ProcessorComposition` chains stages sequentially
- Batching — `Batch\BatchProcessor` groups items and creates a per-batch processor
- Multicast — `DataCast\MultiCastProcessor` fans out each item to parallel cast processors
- Per-stage concurrency — fiber count, buffer size, optional ordering
- Cancellation — propagated through the chain
## Installation

```sh
composer require flyokai/amp-data-pipeline
```
## Quick start

```php
use Flyokai\AmpDataPipeline\{ArraySource, DataItem, ProcessorAbstract, ProcessorComposition};

final class Upper extends ProcessorAbstract
{
    protected function processDataItem(DataItem $item): void
    {
        $item->setData('value', strtoupper($item->getData('value')));
        $this->releaseDataItem($item);
    }
}

$source = new ArraySource([
    DataItem::fromArray(['value' => 'alice'], []),
    DataItem::fromArray(['value' => 'bob'], []),
]);

$pipeline = new ProcessorComposition([new Upper()]);
$pipeline->setSource($source);
$pipeline->run(function (DataItem $item) {
    echo $item->getData('value'), "\n"; // ALICE, BOB
});
```
## Concepts

### DataItem

```php
$item->getData('key');          // payload access
$item->setData('key', 'value'); // returns the mutated item
$item->getMeta();               // metadata bag
```
### Sources

| Class | Use case |
|---|---|
| `ArraySource` | wraps a PHP array (`ConcurrentArrayIterator`) |
| `IteratorSource` | wraps any iterable |
| `QueueSource` | wraps an AMPHP `Queue` for push-based input |
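A push-based pipeline can be fed from an AMPHP `Queue`. A minimal sketch — it assumes `QueueSource` accepts the queue in its constructor, which you should verify against the class signature:

```php
use Amp\Pipeline\Queue;
use Flyokai\AmpDataPipeline\{DataItem, QueueSource};

$queue = new Queue();
// Assumption: QueueSource consumes the queue's concurrent iterator.
$source = new QueueSource($queue);

// A producer elsewhere pushes items, then completes the queue when done
// so downstream stages can finish draining.
$queue->push(DataItem::fromArray(['value' => 'x'], []));
$queue->complete();
```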
### Processors

`ProcessorAbstract` gives you:

- `setConcurrency(int)` — fiber count inside the stage
- `setBufferSize(int)` — output queue depth (0 = same as concurrency)
- `setCancellation(Cancellation)` — graceful shutdown
- `releaseDataItem(DataItem)` — push to output

```php
(new MyProcessor())
    ->setConcurrency(8)
    ->setBufferSize(16);
```
## Linear pipeline

```php
$pipeline = new ProcessorComposition([
    new PrepareProcessor(),
    new ValidateProcessor(),
    new SaveProcessor(),
]);
$pipeline->setSource(new ArraySource($rows));
$pipeline->run(/* optional itemCallback */);
```
## Batching

```php
use Flyokai\AmpDataPipeline\Batch\BatchProcessor;

$batcher = new BatchProcessor(
    batchProcessorFactory: fn () => new SaveBatchProcessor(),
    resultHandlerFactory: fn () => new ResultRouter(),
    batchSize: 100,
    ordered: false,       // true → preserve order across batches
    groupResults: false,  // true → merge batch results into one DataItem
    throwIfUnhandled: true,
);
```

Items accumulate up to `batchSize`; a fresh processor is built for each batch, and results are routed through a `DataItemHandler` strategy.
## Multicast

```php
use Flyokai\AmpDataPipeline\DataCast\MultiCastProcessor;

$cast = new MultiCastProcessor(
    castProcessorFactories: [
        fn () => new IndexInOpensearch(),
        fn () => new WriteToCache(),
    ],
    groupResults: true,
    groupBufferSize: 10,
);
```

Each input item is delivered to every cast processor in parallel; outputs are aggregated by `MultiCastConsumer`.
## Handler strategies

`DataItemHandler` is the strategy interface for handling specific items:

```php
interface DataItemHandler
{
    public function canHandle(DataItem $item): bool;

    public function handle(DataItem $item): Future;
}
```

`HandlerComposition` enforces mutual exclusion — exactly one handler per item; multiple matches throw. Pass `$ordered = true` to preserve item order via `Mutex` / `Sequence`.
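A concrete handler might look like this sketch. `AuditLogHandler`, the `type` meta key, and the handler's namespace are hypothetical; only the interface shape comes from the section above:

```php
use Amp\Future;
use Flyokai\AmpDataPipeline\DataItem;

final class AuditLogHandler implements DataItemHandler
{
    public function canHandle(DataItem $item): bool
    {
        // Claim only audit items — if another handler also matches,
        // HandlerComposition throws on the exclusivity violation.
        return ($item->getMeta()['type'] ?? null) === 'audit';
    }

    public function handle(DataItem $item): Future
    {
        // Run the side effect in its own fiber and return its Future.
        return \Amp\async(function () use ($item): void {
            error_log('audit: ' . $item->getData('value')); // placeholder sink
        });
    }
}
```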
## Concurrency model

- Inter-stage: a `ProcessorComposition` chains iterators (pull-based, demand-driven).
- Intra-stage: `concurrency` controls the number of fibers servicing the queue inside a processor.
- Buffer: `bufferSize` decouples producer and consumer (set to 0 to mirror concurrency).
- Multicast: every cast processor fires simultaneously per item.
- Backpressure: the queue plus `groupBufferSize` cap memory growth.
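The points above compose per stage: each processor is tuned independently before being chained. A sketch with hypothetical stage classes, using only the setters documented earlier:

```php
use Flyokai\AmpDataPipeline\ProcessorComposition;

// Hypothetical stages: FetchProcessor is I/O-bound, TransformProcessor is CPU-bound.
$pipeline = new ProcessorComposition([
    (new FetchProcessor())
        ->setConcurrency(16)  // many fibers: each mostly awaits network I/O
        ->setBufferSize(32),  // deeper buffer absorbs bursts from the fetch stage
    (new TransformProcessor())
        ->setConcurrency(2),  // bufferSize left at 0 → mirrors concurrency
]);
```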
## Gotchas

- Order is not preserved by default. Use `$ordered = true` on `BatchProcessor` or `HandlerComposition` if you need it.
- `reset()` requires queue completion. Resetting an incomplete queue throws `RuntimeException`.
- Handler exclusivity — a `HandlerComposition` enforces one handler per item; two matching handlers throw.
- `CastProcessor` ≠ `Processor`. Cast processors receive a raw `ConcurrentIterator` and own their queues.
- `groupBufferSize = 0` is unlimited. Multicast with grouping can grow memory unboundedly.
- Cancellation must propagate. `ProcessorComposition` propagates it to children automatically, but custom compositions need to do this explicitly.
- Reflection in error handling — `errorDisposeQueue()` reaches into `Queue` internals via reflection; AMPHP version updates may break it.
## See also

- `flyokai/indexer` — full reindex uses pipelines
- Bulk data-import services typically use this package as their processing core (`DataSource → BatchProcessor → Stage Pipeline`).
## License

MIT