PHP-ETL - Getting Started
🦢 Sylius: the e-commerce framework based on Symfony

The Sylius bundle

The Sylius bundle builds on the Symfony bundle, so the library can be used in any Symfony application. It adds the commands and services needed to make the ETL easy to use.

Install

Start by installing the Symfony bundle:

  1. Install the necessary dependencies:
     composer require oliverde8/php-etl-bundle
    
  2. In config/, create a directory named etl.

  3. Enable the bundle in config/bundles.php:
    \Oliverde8\PhpEtlBundle\Oliverde8PhpEtlBundle::class => ['all' => true],
    
  4. Optional: enable queues if you have an interface that lets users execute ETL processes (EasyAdmin, for example). This assumes a Messenger transport named async is already defined.
    framework:
      messenger:
        routing:
          "Oliverde8\PhpEtlBundle\Message\EtlExecutionMessage": async
    
  5. Optional: write a separate log file for each execution by adding this handler under monolog.handlers in monolog.yaml:
    etl:
      type: service
      id: Oliverde8\PhpEtlBundle\Services\ChainExecutionLogger
      level: debug
      channels: ["!event"] 
    

Now install the Sylius bundle:

  1. Install the additional dependency
    composer require oliverde8/php-etl-sylius-admin-bundle
    
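  2. Create the EtlExecution table with a Doctrine migration (for example, generate it with bin/console doctrine:migrations:diff and apply it with bin/console doctrine:migrations:migrate).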

  3. Import configs
    # config/packages/etl.yaml
    imports:
      - { resource: "@Oliverde8PhpEtlSyliusAdminBundle/Resources/config/config.yaml" }
    
  4. Import routes
    # config/routes/etl.yaml
    oliverde8_etl:
      resource: '@Oliverde8PhpEtlSyliusAdminBundle/Resources/config/routing.yaml'
    
  5. Optional: configure the Messenger transports used for EtlExecutionMessage:
    # config/packages/messenger.yaml
    framework:
     messenger:
         # Send failed messages to this transport for later handling.
         failure_transport: failed
    
         transports:
             failed: 'doctrine://default?queue_name=failed'
             generic_with_retry:
                 dsn: 'doctrine://default?queue_name=generic_with_retry'
                 retry_strategy:
                     max_retries: 3
                     multiplier: 4
                     delay: 3600000 #1H first retry, 4H second retry, 16H third retry (see multiplier) 
             etl_async:
                 dsn: 'doctrine://default?queue_name=etl_async'
                 retry_strategy:
                     max_retries: 0
    
         routing:
             'Oliverde8\PhpEtlBundle\Message\EtlExecutionMessage': etl_async
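
The retry comment in the configuration above (1H, 4H, 16H) follows from multiplying the base delay by the multiplier once per previous attempt. The sketch below only reproduces that arithmetic, assuming the delay before retry n is delay × multiplier^(n−1) and ignoring any max_delay or jitter settings; retryDelays() is a hypothetical helper, not part of Symfony or the bundle.

```php
<?php
declare(strict_types=1);

/**
 * Computes the wait before each retry for a multiplier-based retry strategy.
 * Hypothetical helper for illustration; it ignores max_delay and jitter.
 *
 * @return list<int> delay in milliseconds before retry 1, 2, 3, ...
 */
function retryDelays(int $delayMs, float $multiplier, int $maxRetries): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        // The first retry uses the base delay, each later one is multiplied again.
        $delays[] = (int) ($delayMs * $multiplier ** $retry);
    }

    return $delays;
}

// Values taken from the generic_with_retry transport above.
foreach (retryDelays(3_600_000, 4, 3) as $index => $ms) {
    printf("retry %d: %d h\n", $index + 1, intdiv($ms, 3_600_000));
}
```

With max_retries set to 0, as on the etl_async transport, the list is empty and a failed message goes straight to the failure transport.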
    

Usage

Creating an ETL chain

To create an ETL chain in Symfony, you need to create a service that implements ChainDefinitionInterface. The chain is built using typed PHP configuration objects.

1. Create a chain definition service:

<?php

namespace App\Etl\ChainDefinition;

use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\PhpEtlBundle\Etl\ChainDefinitionInterface\ChainDefinitionInterface;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\RuleTransformConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;

class CustomerImportDefinition implements ChainDefinitionInterface
{
    public function getKey(): string
    {
        return 'customer-import';
    }

    public function build(): ChainConfig
    {
        return (new ChainConfig())
            ->addLink(new CsvExtractConfig())
            ->addLink((new RuleTransformConfig(false))
                ->addColumn('customer_id', [['get' => ['field' => 'ID']]])
                ->addColumn('full_name', [
                    ['implode' => [
                        'values' => [
                            [['get' => ['field' => 'FirstName']]],
                            [['get' => ['field' => 'LastName']]],
                        ],
                        'with' => ' ',
                    ]]
                ])
                ->addColumn('email', [['get' => ['field' => 'Email']]])
            )
            ->addLink(new CsvFileWriterConfig('output/customers.csv'));
    }
}
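
To make the rules in CustomerImportDefinition easier to read, here is a plain-PHP sketch of the mapping they appear to describe: copy ID, join FirstName and LastName with a space, and copy Email. This is not the library's rule engine. The mapCustomerRow() function and the sample row are hypothetical, and the handling of missing fields (empty string) is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Plain-PHP equivalent of the three addColumn() rules, for illustration only.
 *
 * @param array<string, string> $row one extracted CSV row
 * @return array<string, string> the transformed row
 */
function mapCustomerRow(array $row): array
{
    return [
        // ['get' => ['field' => 'ID']]
        'customer_id' => $row['ID'] ?? '',
        // ['implode' => ['values' => [FirstName, LastName], 'with' => ' ']]
        'full_name' => implode(' ', [$row['FirstName'] ?? '', $row['LastName'] ?? '']),
        // ['get' => ['field' => 'Email']]
        'email' => $row['Email'] ?? '',
    ];
}

// Made-up sample row, not taken from any real data set.
$sample = ['ID' => '42', 'FirstName' => 'Ada', 'LastName' => 'Lovelace', 'Email' => 'ada@example.com'];
print_r(mapCustomerRow($sample));
```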

2. Register the service (only needed without autoconfigure):

The interface carries the #[AutoconfigureTag('etl.chain_definition')] attribute, so services implementing it are tagged automatically when autoconfigure is enabled (the default in Symfony). If autoconfigure is disabled, tag the service manually:

services:
    App\Etl\ChainDefinition\CustomerImportDefinition:
        tags: ['etl.chain_definition']

3. Configure maxAsynchronousItems and other chain settings:

class HighVolumeImportDefinition implements ChainDefinitionInterface
{
    public function getKey(): string
    {
        return 'high-volume-import';
    }

    public function build(): ChainConfig
    {
        $chainConfig = new ChainConfig();
        $chainConfig->setMaxAsynchronousItems(100); // Process up to 100 items in parallel

        return $chainConfig
            ->addLink(new CsvExtractConfig())
            ->addLink((new RuleTransformConfig(false))
                ->addColumn('id', [['get' => ['field' => 'ID']]])
            )
            ->addLink(new CsvFileWriterConfig('output/processed.csv'));
    }
}

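4. You can also inject dependencies into your chain definition. Scalar arguments such as $apiUrl cannot be autowired by type, so bind them in your service configuration or with an #[Autowire] attribute. The use statements for SimpleHttpConfig and LogConfig are omitted here: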

class ApiImportDefinition implements ChainDefinitionInterface
{
    public function __construct(
        private string $apiUrl,
    ) {}

    public function getKey(): string
    {
        return 'api-import';
    }

    public function build(): ChainConfig
    {
        return (new ChainConfig())
            ->addLink(new SimpleHttpConfig(
                url: $this->apiUrl,
                method: 'GET',
                responseIsJson: true
            ))
            ->addLink(new LogConfig(
                message: 'Imported record',
                level: 'info'
            ))
            ->addLink(new CsvFileWriterConfig('output/api-data.csv'));
    }
}

Creating custom operations

Custom operations are automatically registered when they implement ConfigurableChainOperationInterface. The bundle’s compiler pass discovers them and sets up dependency injection automatically.

1. Create a config class:

<?php

namespace App\Etl\Config;

use Oliverde8\Component\PhpEtl\OperationConfig\OperationConfigInterface;

class CustomTransformConfig implements OperationConfigInterface
{
    public function __construct(
        public readonly string $targetField,
        public readonly string $transformation,
    ) {}
}

2. Create the operation:

<?php

namespace App\Etl\Operation;

use App\Etl\Config\CustomTransformConfig;
use Oliverde8\Component\PhpEtl\ChainOperation\ConfigurableChainOperationInterface;
use Oliverde8\Component\PhpEtl\ChainBuilderV2;
use Psr\Log\LoggerInterface;

class CustomTransformOperation implements ConfigurableChainOperationInterface
{
    public function __construct(
        private CustomTransformConfig $config,
        private ChainBuilderV2 $chainBuilder,
        private string $flavor,
        private LoggerInterface $logger, // Auto-injected by Symfony
    ) {}

    public function process(mixed $item, ?array &$output = null, mixed $context = null): void
    {
        $this->logger->info('Processing item', ['field' => $this->config->targetField]);
        
        // Your transformation logic here
        $item[$this->config->targetField] = strtoupper($item[$this->config->targetField] ?? '');
        
        $output[] = $item;
    }
}

The operation is automatically registered and all dependencies (except $config, $chainBuilder, and $flavor) are auto-injected using Symfony’s autowiring.
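
The config class above carries a $transformation value that the example operation does not read. One way the "transformation logic" placeholder could use it is sketched below in plain PHP, independent of the library. applyTransformation() and the names 'uppercase', 'lowercase' and 'trim' are hypothetical choices for this sketch, and items are assumed to be plain arrays.

```php
<?php
declare(strict_types=1);

/**
 * Applies a named transformation to one field of an array item.
 * Hypothetical helper; the supported names are choices made for this sketch.
 *
 * @param array<string, mixed> $item
 * @return array<string, mixed>
 */
function applyTransformation(array $item, string $targetField, string $transformation): array
{
    $value = (string) ($item[$targetField] ?? '');

    $item[$targetField] = match ($transformation) {
        'uppercase' => strtoupper($value),
        'lowercase' => strtolower($value),
        'trim' => trim($value),
        // Fail loudly on a typo instead of silently passing the item through.
        default => throw new InvalidArgumentException("Unknown transformation: $transformation"),
    };

    return $item;
}

$item = applyTransformation(['email' => 'Ada@Example.com'], 'email', 'uppercase');
echo $item['email'], PHP_EOL; // ADA@EXAMPLE.COM
```

Inside process(), the same call could be made with $this->config->targetField and $this->config->transformation.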

3. Use it in your chain:

class MyChainDefinition implements ChainDefinitionInterface
{
    public function getKey(): string
    {
        return 'my-custom-chain';
    }

    public function build(): ChainConfig
    {
        return (new ChainConfig())
            ->addLink(new CsvExtractConfig())
            ->addLink(new CustomTransformConfig('email', 'uppercase'))
            ->addLink(new CsvFileWriterConfig('output/transformed.csv'));
    }
}

How automatic dependency injection works:

The ChainBuilderV2Compiler compiler pass:

  • Discovers all services implementing ConfigurableChainOperationInterface
  • Identifies the config class from the constructor (must implement OperationConfigInterface)
  • Resolves all constructor dependencies at compile time using Symfony’s dependency injection
  • Creates a GenericChainFactory for each operation with resolved dependencies
  • Automatically skips injection for $config, $chainBuilder, and $flavor (handled by the factory)
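
The discovery steps listed above can be pictured with plain PHP reflection. The following is only a sketch of the idea, not the code of ChainBuilderV2Compiler. Every type in it is a local stand-in declared so the snippet runs on its own (including the OperationConfigInterface declared here), and inspectOperation() is a hypothetical helper.

```php
<?php
declare(strict_types=1);

// Stand-ins for the library and PSR types, declared only for this sketch.
interface OperationConfigInterface {}
final class ChainBuilderStub {}
interface LoggerStub {}

final class UppercaseConfig implements OperationConfigInterface
{
    public function __construct(public readonly string $targetField) {}
}

final class UppercaseOperation
{
    public function __construct(
        private UppercaseConfig $config,
        private ChainBuilderStub $chainBuilder,
        private string $flavor,
        private LoggerStub $logger,
        private int $batchSize = 50,
    ) {}
}

/**
 * Finds the config class of an operation and the constructor parameters
 * that would still have to be resolved by the container.
 *
 * @return array{config: ?string, inject: list<string>}
 */
function inspectOperation(string $operationClass): array
{
    // Parameters supplied by the factory at build time, never by the container.
    $factoryHandled = ['config', 'chainBuilder', 'flavor'];
    $result = ['config' => null, 'inject' => []];

    $constructor = (new ReflectionClass($operationClass))->getConstructor();
    foreach ($constructor?->getParameters() ?? [] as $parameter) {
        $type = $parameter->getType();
        $typeName = $type instanceof ReflectionNamedType ? $type->getName() : null;

        // The config class is the parameter whose type implements the config interface.
        if ($typeName !== null && is_a($typeName, OperationConfigInterface::class, true)) {
            $result['config'] = $typeName;
        }

        if (!in_array($parameter->getName(), $factoryHandled, true)) {
            $result['inject'][] = $parameter->getName();
        }
    }

    return $result;
}

var_export(inspectOperation(UppercaseOperation::class));
```

For UppercaseOperation, the config class resolves to UppercaseConfig, and logger and batchSize are left for the container (or a default value) to provide.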

Manual dependency injection:

If you need more control over dependency injection, you can configure arguments manually:

services:
    App\Etl\Operation\CustomTransformOperation:
        arguments:
            $logger: '@monolog.logger.etl'

Or use the #[Autowire] attribute (available since Symfony 6.1, which requires PHP 8.1+):

use Symfony\Component\DependencyInjection\Attribute\Autowire;

class CustomTransformOperation implements ConfigurableChainOperationInterface
{
    public function __construct(
        private CustomTransformConfig $config,
        private ChainBuilderV2 $chainBuilder,
        private string $flavor,
        #[Autowire(service: 'monolog.logger.etl')]
        private LoggerInterface $logger,
        #[Autowire('%app.etl.batch_size%')]
        private int $batchSize,
    ) {}

    // process() is unchanged from the previous example.
}

The compiler pass respects:

  • Manually configured service arguments
  • #[Autowire] attributes on constructor parameters
  • Default parameter values
  • Nullable parameters

Executing a chain

./bin/console etl:execute customer-import '[["test1"],["test2"]]' '{"opt1": "val1"}'

The first argument is the chain key (the value returned by getKey()). The second argument is the input data; depending on your chain, it can be empty or a JSON array. The third argument is a JSON object of parameters that will be available in the execution context.
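
When the command is launched from another PHP script, the two JSON arguments are easier to build from arrays than to type by hand. The sketch below assembles the same command line as above. buildEtlExecuteCommand() is a hypothetical helper, and encoding an empty parameter list as {} is an assumption about what the command accepts. Note that escapeshellarg() quotes differently on Windows, so this is aimed at Unix-like shells.

```php
<?php
declare(strict_types=1);

/**
 * Builds the etl:execute command line for a chain key, input data and parameters.
 * Hypothetical helper for illustration, not part of the bundle.
 */
function buildEtlExecuteCommand(string $chainKey, array $input = [], array $parameters = []): string
{
    $inputJson = json_encode($input, JSON_THROW_ON_ERROR);
    // Cast to object so an empty parameter list is encoded as {} rather than [].
    $parametersJson = json_encode((object) $parameters, JSON_THROW_ON_ERROR);

    return sprintf(
        './bin/console etl:execute %s %s %s',
        escapeshellarg($chainKey),
        escapeshellarg($inputJson),
        escapeshellarg($parametersJson),
    );
}

echo buildEtlExecuteCommand('customer-import', [['test1'], ['test2']], ['opt1' => 'val1']), PHP_EOL;
```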

Get a definition

./bin/console etl:get-definition customer-import

This displays the chain configuration and all its operations.

Get definition graph

./bin/console etl:definition:graph customer-import

This returns a Mermaid graph definition that visualizes your ETL chain. Adding -u returns the URL of the rendered Mermaid graph image instead.