PHP-ETL - Operations
Building Blocks - Chain Repeat
The ChainRepeatConfig operation executes a chain of operations repeatedly for as long as a validation expression evaluates to true.
It is well suited to paginated APIs, iterative data processing, and any other scenario where you need to loop until a condition becomes false.
Key characteristics:
- Executes a sub-chain repeatedly in a loop
- Continues while the validation expression evaluates to true
- Stops when the validation expression returns false
- Each iteration can modify data for the next iteration
- Supports asynchronous processing within the loop
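As a mental model only, the behaviour listed above can be mimicked in plain PHP. The sketch below assumes that the condition is checked on the data returned by each pass (a do/while loop), which is what the pagination examples on this page imply by starting from an item that does not yet contain the tested key. `repeatWhile`, `$step` and `$shouldContinue` are hypothetical names used for illustration and are not part of PHP-ETL.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of PHP-ETL: runs $step repeatedly and
 * yields the data produced by each pass.
 *
 * @param callable(array): array $step           one pass of the "sub-chain"
 * @param callable(array): bool  $shouldContinue stands in for the validation expression
 */
function repeatWhile(array $data, callable $step, callable $shouldContinue): Generator
{
    do {
        // Each pass receives the data produced by the previous one.
        $data = $step($data);
        yield $data;
    } while ($shouldContinue($data));
}

// Simulated three-page source: each pass advances the page counter.
$step = fn (array $data): array => [
    'page'    => ($data['page'] ?? 0) + 1,
    'hasMore' => ($data['page'] ?? 0) + 1 < 3,
];

$pages = [];
foreach (repeatWhile([], $step, fn (array $d): bool => $d['hasMore'] === true) as $result) {
    $pages[] = $result['page'];
}

echo implode(',', $pages), PHP_EOL; // prints "1,2,3"
```

The step function plays the role of the repeated chain and the predicate plays the role of the validation expression. The last pass still yields its data before the loop stops.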
Configuration
Use ChainRepeatConfig with these parameters:
use Oliverde8\Component\PhpEtl\OperationConfig\ChainRepeatConfig;
use Oliverde8\Component\PhpEtl\ChainConfig;

$repeatConfig = new ChainRepeatConfig(
    chainConfig: $chainToRepeat,        // ChainConfig to execute repeatedly
    validationExpression: 'expression', // Symfony expression - loop while true
    allowAsynchronous: false            // Optional: allow async operations (default: false)
);
Parameters:
- chainConfig: A ChainConfig containing the operations to execute in each iteration
- validationExpression: A Symfony Expression Language expression that must return a boolean. The loop continues while it is true
- allowAsynchronous: Whether operations within the loop can execute asynchronously
Example: Basic API Pagination
Fetch all pages from a paginated API:
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\ChainRepeatConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\SimpleHttpConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\CallBackTransformerConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\SplitItemConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\Item\DataItemInterface;

$chainConfig = new ChainConfig();
$page = 1; // Shared with the callback by reference

$repeatConfig = new ChainRepeatConfig(
    chainConfig: (new ChainConfig())
        ->addLink(new CallBackTransformerConfig(function (DataItemInterface $dataItem) use (&$page) {
            echo "Fetching page {$page}...\n";
            $totalPages = 5;

            // Simulate API call returning paginated data
            $items = [];
            for ($i = 1; $i <= 10; $i++) {
                $itemId = (($page - 1) * 10) + $i;
                $items[] = [
                    'id' => $itemId,
                    'name' => "Item {$itemId}",
                    'page' => $page,
                ];
            }

            $hasNextPage = $page < $totalPages;
            $page++;

            return new DataItem([
                'items' => $items,
                'hasNextPage' => $hasNextPage,
            ]);
        })),
    validationExpression: 'data["hasNextPage"] == true',
    allowAsynchronous: false
);

$chainConfig
    ->addLink($repeatConfig)
    ->addLink(new SplitItemConfig(keys: ['items'])) // Extract items array
    ->addLink(new CsvFileWriterConfig('all-items.csv'));

// $chainBuilder is not defined in this snippet; it is assumed to be an
// already configured PHP-ETL chain builder.
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem([])]),
    []
);
Result: Fetches pages 1-5, splits each page into its 10 items (50 in total), and writes them to all-items.csv.
Example: Cursor-Based API Pagination
Modern APIs often use cursor-based pagination instead of page numbers:
$repeatConfig = new ChainRepeatConfig(
    chainConfig: (new ChainConfig())
        // Make API request with cursor
        ->addLink(new SimpleHttpConfig(
            url: '@"https://api.example.com/items?cursor="~(data["cursor"] ?? "")',
            method: 'GET',
            responseIsJson: true,
            responseKey: 'response'
        ))
        // Process response and extract next cursor
        ->addLink(new CallBackTransformerConfig(function (DataItemInterface $item) {
            $data = $item->getData();
            $response = $data['response'];

            return new DataItem([
                'items' => $response['data'],
                'cursor' => $response['pagination']['next_cursor'] ?? null,
                'hasMore' => !empty($response['pagination']['next_cursor']),
            ]);
        }))
        ->addLink(new SplitItemConfig(keys: ['items'])),
    validationExpression: 'data["hasMore"] == true'
);

$chainConfig
    ->addLink($repeatConfig)
    ->addLink(new CsvFileWriterConfig('api-data.csv'));
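The cursor handling in the callback can be checked in isolation. The following sketch pulls that logic into a plain function so that it runs without the library. `nextCursorState` is a hypothetical name, and the response shape (`data`, `pagination.next_cursor`) is the one assumed by the example above rather than a documented API.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper mirroring the callback above: turns one decoded
 * API response into the data carried to the next iteration.
 */
function nextCursorState(array $response): array
{
    $cursor = $response['pagination']['next_cursor'] ?? null;

    return [
        'items'   => $response['data'] ?? [],
        'cursor'  => $cursor,
        // empty() treats null, '' and a missing key as "no further page".
        'hasMore' => !empty($cursor),
    ];
}

$middle = nextCursorState(['data' => [['id' => 1]], 'pagination' => ['next_cursor' => 'abc']]);
$last   = nextCursorState(['data' => [['id' => 2]], 'pagination' => ['next_cursor' => null]]);
$bare   = nextCursorState(['data' => []]); // no pagination block at all

var_dump($middle['hasMore'], $last['hasMore'], $bare['hasMore']);
```

One detail worth knowing: `empty()` also treats the string `"0"` as empty, so an API that can issue `"0"` as a valid cursor would need an explicit `!== null` check instead.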
Example: Page Number Pagination
For APIs that use page numbers with a total page count:
$repeatConfig = new ChainRepeatConfig(
    chainConfig: (new ChainConfig())
        ->addLink(new SimpleHttpConfig(
            url: '@"https://api.example.com/users?page="~data["currentPage"]',
            method: 'GET',
            responseIsJson: true,
            responseKey: 'response'
        ))
        ->addLink(new CallBackTransformerConfig(function (DataItemInterface $item) {
            $data = $item->getData();
            $response = $data['response'];

            return new DataItem([
                'items' => $response['results'],
                'currentPage' => $data['currentPage'] + 1, // Page to request next
                'totalPages' => $response['total_pages'],
            ]);
        }))
        ->addLink(new SplitItemConfig(keys: ['items'])),
    validationExpression: 'data["currentPage"] <= data["totalPages"]'
);

// Start with page 1; totalPages is a placeholder until the first response arrives.
// $chainProcessor is assumed to be built from a chain containing $repeatConfig,
// as in the basic example.
$chainProcessor->process(
    new ArrayIterator([new DataItem(['currentPage' => 1, 'totalPages' => 1])]),
    []
);
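To see how the counter and the condition interact, the state can be traced in plain PHP. The sketch below assumes a simulated API that reports three pages and assumes the condition is checked on the data returned by each pass. `$totalPagesFromApi` and `$fetched` are illustrative names only.

```php
<?php
declare(strict_types=1);

// Simulated API reporting 3 pages in total (an assumption for illustration).
$totalPagesFromApi = 3;

// Same seed as in the example: the real total is unknown before the first call.
$data = ['currentPage' => 1, 'totalPages' => 1];
$fetched = [];

do {
    $fetched[] = $data['currentPage'];             // the page this pass requests
    $data = [
        'currentPage' => $data['currentPage'] + 1, // page to request next
        'totalPages'  => $totalPagesFromApi,       // taken from the response
    ];
} while ($data['currentPage'] <= $data['totalPages']);

echo implode(',', $fetched), PHP_EOL; // prints "1,2,3"
```

Because `currentPage` already points at the next page when the condition is checked, `<=` is the right comparison here; `<` would skip the last page.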
Example: Offset-Based Pagination
For APIs using an offset/limit pattern:
$repeatConfig = new ChainRepeatConfig(
    chainConfig: (new ChainConfig())
        ->addLink(new SimpleHttpConfig(
            url: '@"https://api.example.com/records?offset="~data["offset"]~"&limit=100"',
            method: 'GET',
            responseIsJson: true,
            responseKey: 'response'
        ))
        ->addLink(new CallBackTransformerConfig(function (DataItemInterface $item) {
            $data = $item->getData();
            $response = $data['response'];
            $fetchedCount = count($response['records']);

            return new DataItem([
                'items' => $response['records'],
                'offset' => $data['offset'] + $fetchedCount,
                'total' => $response['total'],
                'hasMore' => $data['offset'] + $fetchedCount < $response['total'],
            ]);
        }))
        ->addLink(new SplitItemConfig(keys: ['items'])),
    validationExpression: 'data["hasMore"] == true'
);

// Start at offset 0
$chainProcessor->process(
    new ArrayIterator([new DataItem(['offset' => 0])]),
    []
);
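The offset arithmetic is easy to get subtly wrong, so it can help to test it separately. The sketch below isolates it in a hypothetical `nextOffsetState` function. It also adds a guard that the example callback does not have: it stops when a page comes back empty, so an overstated `total` cannot keep the loop running.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: computes the next offset state from one response.
 */
function nextOffsetState(int $offset, array $records, int $total): array
{
    $fetchedCount = count($records);
    $nextOffset   = $offset + $fetchedCount;

    return [
        'offset'  => $nextOffset,
        'total'   => $total,
        // Continue only if this page had records and more remain.
        'hasMore' => $fetchedCount > 0 && $nextOffset < $total,
    ];
}

$first = nextOffsetState(0, array_fill(0, 100, []), 250);  // full page
$last  = nextOffsetState(200, array_fill(0, 50, []), 250); // final, partial page
$stuck = nextOffsetState(200, [], 250);                    // empty page, total overstated

echo json_encode($first), PHP_EOL;
echo json_encode($last), PHP_EOL;
echo json_encode($stuck), PHP_EOL;
```

Advancing by the number of records actually received, rather than by the requested limit, keeps the offset correct when the API returns short pages.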
Example: Retry Until Success
Use ChainRepeat for retry logic (though FailSafeConfig is usually better for this):
$attempts = 0;
$maxAttempts = 3;

$repeatConfig = new ChainRepeatConfig(
    chainConfig: (new ChainConfig())
        ->addLink(new CallBackTransformerConfig(function (DataItemInterface $item) use (&$attempts, $maxAttempts) {
            $attempts++;
            echo "Attempt {$attempts}/{$maxAttempts}\n";

            // Simulate operation that might fail
            $success = rand(1, 10) > 7; // 30% success rate
            if ($success) {
                echo "Success!\n";
            }

            return new DataItem([
                'success' => $success,
                'shouldRetry' => !$success && $attempts < $maxAttempts,
            ]);
        })),
    validationExpression: 'data["shouldRetry"] == true'
);
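Because the example relies on `rand()`, its outcome differs from run to run. The sketch below replays the same `shouldRetry` logic with scripted outcomes so the stopping behaviour is predictable. `retryState` and `$outcomes` are hypothetical names, and the loop assumes the flag is checked on the data returned by each attempt.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds the data returned by one retry attempt.
 */
function retryState(bool $success, int $attempts, int $maxAttempts): array
{
    return [
        'success'     => $success,
        'shouldRetry' => !$success && $attempts < $maxAttempts,
    ];
}

// Scripted outcomes instead of rand(): three failures, then a success
// that is never reached because the attempt limit is hit first.
$outcomes    = [false, false, false, true];
$maxAttempts = 3;
$attempts    = 0;

do {
    $state = retryState($outcomes[$attempts], $attempts + 1, $maxAttempts);
    $attempts++;
} while ($state['shouldRetry']);

echo "Stopped after {$attempts} attempts, success: ", var_export($state['success'], true), PHP_EOL;
// prints "Stopped after 3 attempts, success: false"
```

The loop ends both on success and on exhaustion, so anything downstream still has to inspect `success` to tell the two cases apart.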
Example: With Error Handling
Combine ChainRepeat with FailSafe for robust pagination:
use Oliverde8\Component\PhpEtl\OperationConfig\FailSafeConfig;

$repeatConfig = new ChainRepeatConfig(
    chainConfig: (new ChainConfig())
        // Retry the HTTP call up to 3 times before giving up
        ->addLink(new FailSafeConfig(
            chainConfig: (new ChainConfig())
                ->addLink(new SimpleHttpConfig(
                    url: '@"https://api.example.com/data?page="~data["page"]',
                    method: 'GET',
                    responseIsJson: true
                )),
            exceptionsToCatch: [\Exception::class],
            nbAttempts: 3
        ))
        // No responseKey is set above, so this callback assumes that results,
        // page and next are all available directly in the item data.
        ->addLink(new CallBackTransformerConfig(function (DataItemInterface $item) {
            $data = $item->getData();

            return new DataItem([
                'items' => $data['results'] ?? [],
                'page' => $data['page'] + 1,
                'hasMore' => !empty($data['next']),
            ]);
        }))
        ->addLink(new SplitItemConfig(keys: ['items'])),
    validationExpression: 'data["hasMore"] == true'
);
Understanding the Validation Expression
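The validation expression is evaluated against the item's data (available as data) and the execution context (available as context), and it decides whether the chain runs again. The loop continues while the expression returns true: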
// Loop while there are more pages
validationExpression: 'data["hasMore"] == true'
// Loop while page number is less than total
validationExpression: 'data["page"] < data["totalPages"]'
// Loop while cursor exists
validationExpression: 'data["cursor"] != null'
// Complex condition
validationExpression: 'data["hasMore"] == true and data["errorCount"] < 5'
// Using context
validationExpression: 'context["shouldContinue"] == true'
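Symfony Expression Language's `==` and `!=` follow PHP's loose comparison rules, so the values an API returns can change the outcome in ways that are easy to miss. The sketch below writes two of the expressions above as plain PHP closures to show this. The closures and sample values are illustrative and are not how the library evaluates expressions internally.

```php
<?php
declare(strict_types=1);

// PHP equivalents of two of the expressions above, written as closures.
$hasMore   = fn (array $data): bool => $data['hasMore'] == true;
$hasCursor = fn (array $data): bool => $data['cursor'] != null;

var_dump($hasMore(['hasMore' => true]));    // bool(true)
var_dump($hasMore(['hasMore' => 'false'])); // bool(true): a non-empty string is truthy
var_dump($hasCursor(['cursor' => 'abc']));  // bool(true)
var_dump($hasCursor(['cursor' => '']));     // bool(false): '' loosely equals null
```

If an API returns flags as strings, converting them to real booleans in the callback before they reach the expression avoids a loop that never stops.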
Asynchronous Processing
Set allowAsynchronous: true to allow operations within the repeat chain to execute asynchronously:
$repeatConfig = new ChainRepeatConfig(
    chainConfig: $paginationChain,
    validationExpression: 'data["hasMore"] == true',
    allowAsynchronous: true // Enable async processing
);
Common Use Cases
- API Pagination: Fetch all pages from paginated REST APIs
- Cursor Navigation: Process cursor-based API responses
- Retry Logic: Retry operations until success (use FailSafe instead when possible)
- Polling: Poll an endpoint until data is available
- Queue Processing: Process messages from a queue until empty