PHP-ETL - Getting Started
🎵 Symfony Framework

The Symfony Bundle

The PHP-ETL bundle integrates the library into Symfony. It adds the commands and services needed to make the ETL easy to use.

Install

  1. Start by installing the necessary dependencies
     composer require oliverde8/php-etl-bundle
    
  2. In /config/, create a directory named etl

  3. Enable the bundle in config/bundles.php:
    \Oliverde8\PhpEtlBundle\Oliverde8PhpEtlBundle::class => ['all' => true],
    
  4. Optional: Enable queues if you have an interface that lets users execute ETL processes (EasyAdmin, for example).
    framework:
      messenger:
        routing:
          'Oliverde8\PhpEtlBundle\Message\EtlExecutionMessage': async
    
  5. Optional: Enable the creation of an individual log file for each execution by editing monolog.yaml:
    etl:
      type: service
      id: Oliverde8\PhpEtlBundle\Services\ChainExecutionLogger
      level: debug
      channels: ["!event"] 
    

Usage

Creating an ETL chain

To create an ETL chain in Symfony, you need to create a service that implements ChainDefinitionInterface. The chain is built using typed PHP configuration objects.

1. Create a chain definition service:

<?php

namespace App\Etl\ChainDefinition;

use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\PhpEtlBundle\Etl\ChainDefinitionInterface\ChainDefinitionInterface;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\RuleTransformConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;

class CustomerImportDefinition implements ChainDefinitionInterface
{
    public function getKey(): string
    {
        return 'customer-import';
    }

    public function build(): ChainConfig
    {
        return (new ChainConfig())
            ->addLink(new CsvExtractConfig())
            ->addLink((new RuleTransformConfig(false))
                ->addColumn('customer_id', [['get' => ['field' => 'ID']]])
                ->addColumn('full_name', [
                    ['implode' => [
                        'values' => [
                            [['get' => ['field' => 'FirstName']]],
                            [['get' => ['field' => 'LastName']]],
                        ],
                        'with' => ' ',
                    ]]
                ])
                ->addColumn('email', [['get' => ['field' => 'Email']]])
            )
            ->addLink(new CsvFileWriterConfig('output/customers.csv'));
    }
}
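
To make the rule set above concrete, here is a plain-PHP sketch of the mapping it appears to describe for a single row. It does not call the library. The helper mapCustomerRow() and the sample values are hypothetical, and the sketch assumes that 'get' reads a field from the input row, that 'implode' joins the resolved values with the 'with' separator, and that the output row contains only the declared columns.

```php
<?php

// Illustration only: a hand-written equivalent of the three addColumn() rules.
// mapCustomerRow() is a hypothetical helper, not part of PHP-ETL.
function mapCustomerRow(array $row): array
{
    return [
        // ['get' => ['field' => 'ID']]
        'customer_id' => $row['ID'] ?? null,
        // 'implode' of FirstName and LastName with ' ' as separator
        'full_name' => implode(' ', [$row['FirstName'] ?? '', $row['LastName'] ?? '']),
        // ['get' => ['field' => 'Email']]
        'email' => $row['Email'] ?? null,
    ];
}

$result = mapCustomerRow([
    'ID' => '42',
    'FirstName' => 'Ada',
    'LastName' => 'Lovelace',
    'Email' => 'ada@example.com',
]);

print_r($result);
```

Working through a sample row like this before running the real chain is a cheap way to check that the source column names (ID, FirstName, LastName, Email) match the CSV header.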

2. Register the service (if not using autoconfigure):

The interface has the #[AutoconfigureTag('etl.chain_definition')] attribute, so services implementing it are automatically tagged when autoconfigure is enabled (default in Symfony).

services:
    App\Etl\ChainDefinition\CustomerImportDefinition:
        tags: ['etl.chain_definition']

3. Configure maxAsynchronousItems and other chain settings:

class HighVolumeImportDefinition implements ChainDefinitionInterface
{
    public function getKey(): string
    {
        return 'high-volume-import';
    }

    public function build(): ChainConfig
    {
        $chainConfig = new ChainConfig();
        $chainConfig->setMaxAsynchronousItems(100); // Process up to 100 items in parallel

        return $chainConfig
            ->addLink(new CsvExtractConfig())
            ->addLink((new RuleTransformConfig(false))
                ->addColumn('id', [['get' => ['field' => 'ID']]])
            )
            ->addLink(new CsvFileWriterConfig('output/processed.csv'));
    }
}

4. You can also inject dependencies into your chain definition:

class ApiImportDefinition implements ChainDefinitionInterface
{
    public function __construct(
        private string $apiUrl,
    ) {}

    public function getKey(): string
    {
        return 'api-import';
    }

    public function build(): ChainConfig
    {
        return (new ChainConfig())
            ->addLink(new SimpleHttpConfig(
                url: $this->apiUrl,
                method: 'GET',
                responseIsJson: true
            ))
            ->addLink(new LogConfig(
                message: 'Imported record',
                level: 'info'
            ))
            ->addLink(new CsvFileWriterConfig('output/api-data.csv'));
    }
}

Creating custom operations

Custom operations are automatically registered when they implement ConfigurableChainOperationInterface. The bundle’s compiler pass discovers them and sets up dependency injection automatically.

1. Create a config class:

<?php

namespace App\Etl\Config;

use Oliverde8\Component\PhpEtl\OperationConfig\OperationConfigInterface;

class CustomTransformConfig implements OperationConfigInterface
{
    public function __construct(
        public readonly string $targetField,
        public readonly string $transformation,
    ) {}
}

2. Create the operation:

<?php

namespace App\Etl\Operation;

use App\Etl\Config\CustomTransformConfig;
use Oliverde8\Component\PhpEtl\ChainOperation\ConfigurableChainOperationInterface;
use Oliverde8\Component\PhpEtl\ChainBuilderV2;
use Psr\Log\LoggerInterface;

class CustomTransformOperation implements ConfigurableChainOperationInterface
{
    public function __construct(
        private CustomTransformConfig $config,
        private ChainBuilderV2 $chainBuilder,
        private string $flavor,
        private LoggerInterface $logger, // Auto-injected by Symfony
    ) {}

    public function process(mixed $item, ?array &$output = null, mixed $context = null): void
    {
        $this->logger->info('Processing item', ['field' => $this->config->targetField]);
        
        // Your transformation logic here
        $item[$this->config->targetField] = strtoupper($item[$this->config->targetField] ?? '');
        
        $output[] = $item;
    }
}

The operation is automatically registered and all dependencies (except $config, $chainBuilder, and $flavor) are auto-injected using Symfony’s autowiring.

3. Use it in your chain:

class MyChainDefinition implements ChainDefinitionInterface
{
    public function getKey(): string
    {
        return 'my-custom-chain';
    }

    public function build(): ChainConfig
    {
        return (new ChainConfig())
            ->addLink(new CsvExtractConfig())
            ->addLink(new CustomTransformConfig('email', 'uppercase'))
            ->addLink(new CsvFileWriterConfig('output/transformed.csv'));
    }
}

How automatic dependency injection works:

The ChainBuilderV2Compiler compiler pass:

  • Discovers all services implementing ConfigurableChainOperationInterface
  • Identifies the config class from the constructor (must implement OperationConfigInterface)
  • Resolves all constructor dependencies at compile time using Symfony’s dependency injection
  • Creates a GenericChainFactory for each operation with resolved dependencies
  • Automatically skips injection for $config, $chainBuilder, and $flavor (handled by the factory)
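
The discovery steps listed above can be approximated with PHP reflection. The sketch below is not the bundle's compiler pass: the stand-in interface, the Demo* classes, and the inspectOperation() helper are all hypothetical and exist only so the example runs on its own. It shows one way to find the config class in a constructor and to separate the parameters supplied by the factory from those left to the container.

```php
<?php

// Stand-in for the library interface so the sketch is self-contained.
interface OperationConfigInterface {}

final class DemoConfig implements OperationConfigInterface {}

final class DemoOperation
{
    public function __construct(
        private DemoConfig $config,
        private object $chainBuilder,
        private string $flavor,
        private ?\ArrayObject $registry = null,
    ) {}
}

/**
 * Hypothetical helper: reports the config class and the parameters
 * that would be left for the container to resolve.
 *
 * @return array{config: ?string, injectable: list<string>}
 */
function inspectOperation(string $operationClass): array
{
    $skipped = ['config', 'chainBuilder', 'flavor']; // supplied by the factory
    $configClass = null;
    $injectable = [];

    $constructor = (new ReflectionClass($operationClass))->getConstructor();
    foreach ($constructor?->getParameters() ?? [] as $parameter) {
        $type = $parameter->getType();

        // A class-typed parameter implementing the config interface is the config.
        if ($type instanceof ReflectionNamedType
            && !$type->isBuiltin()
            && is_subclass_of($type->getName(), OperationConfigInterface::class)
        ) {
            $configClass = $type->getName();
        }

        if (!in_array($parameter->getName(), $skipped, true)) {
            $injectable[] = $parameter->getName();
        }
    }

    return ['config' => $configClass, 'injectable' => $injectable];
}

$info = inspectOperation(DemoOperation::class);
var_dump($info);
```

For DemoOperation, only $registry is left for the container, which mirrors the rule that $config, $chainBuilder, and $flavor are handled by the factory.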

Manual dependency injection:

If you need more control over dependency injection, you can configure arguments manually:

services:
    App\Etl\Operation\CustomTransformOperation:
        arguments:
            $logger: '@monolog.logger.etl'

Or use the #[Autowire] attribute in PHP 8.1+:

use Symfony\Component\DependencyInjection\Attribute\Autowire;

class CustomTransformOperation implements ConfigurableChainOperationInterface
{
    public function __construct(
        private CustomTransformConfig $config,
        private ChainBuilderV2 $chainBuilder,
        private string $flavor,
        #[Autowire(service: 'monolog.logger.etl')]
        private LoggerInterface $logger,
        #[Autowire('%app.etl.batch_size%')]
        private int $batchSize,
    ) {}
}

The compiler pass respects:

  • Manually configured service arguments
  • #[Autowire] attributes on constructor parameters
  • Default parameter values
  • Nullable parameters

Executing a chain

./bin/console etl:execute customer-import '[["test1"],["test2"]]' '{"opt1": "val1"}'

The first argument is the chain key (returned by getKey()). The second argument is the input data; depending on your chain, it can be empty or a JSON array. The third argument is a JSON object of parameters that will be available in the execution context.
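
Since both data arguments are JSON strings, it can help to check how they decode before passing them to the command. The snippet below is a standalone sketch using PHP's json_decode(). It assumes the values are decoded to associative arrays, which this page does not confirm for the bundle itself.

```php
<?php

// The same two JSON strings passed on the command line in the example above.
$inputJson = '[["test1"],["test2"]]';
$paramsJson = '{"opt1": "val1"}';

// JSON_THROW_ON_ERROR turns malformed input into a JsonException
// instead of a silent null.
$input = json_decode($inputJson, true, 512, JSON_THROW_ON_ERROR);
$params = json_decode($paramsJson, true, 512, JSON_THROW_ON_ERROR);

var_dump($input);
var_dump($params);
```

Keeping the JSON in single quotes on the shell, as in the command above, prevents the shell from interpreting the double quotes and brackets.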

Get a definition

./bin/console etl:get-definition customer-import

This displays the chain configuration and all its operations.

Get definition graph

./bin/console etl:definition:graph customer-import

This returns a Mermaid graph visualization of your ETL chain. Adding -u will return the URL to the Mermaid graph image.

Adding an EasyAdmin interface

If you use EasyAdmin in your Symfony project, you can add an admin interface to monitor and execute ETL processes (see the optional queue step under Install to allow tasks to be created).

  1. Install the necessary dependencies
     composer require oliverde8/php-etl-easyadmin-bundle
    
  2. Enable the bundle
    \Oliverde8\PhpEtlBundle\Oliverde8PhpEtlEasyAdminBundle::class => ['all' => true],
    
  3. Add the entries to the EasyAdmin menu
    yield MenuItem::linkToRoute("Job Dashboard", 'fas fa-chart-bar', "etl_execution_dashboard");
    yield MenuItem::linkToCrud('Etl Executions', 'fas fa-list', EtlExecution::class);
    
  4. Enable routes
    etl_bundle:
      resource: '@Oliverde8PhpEtlEasyAdminBundle/Controller'
      type: annotation
      prefix: /admin
    

See the GitHub repository for additional information.

Changing the location of the contextual file system dir

Every PHP-ETL execution is tied to a dedicated directory, which serves as a central location for:

  • Input and output files
  • Logs and debug artifacts

This design allows each execution to be self-contained, making logs and file traces easy to access and audit, without requiring individual operations to manage paths or storage manually.

By default, PHP-ETL stores these execution directories on the local filesystem, in the var/etl directory of the Symfony project. But what if:

  • You want to store files on remote storage, like Amazon S3 or Google Cloud Storage?
  • You need to move or centralize execution data across environments?

PHP-ETL uses Flysystem as its file abstraction layer, wrapped in its own internal abstraction. This allows you to fully control where and how files are stored, using any Flysystem-compatible adapter (S3, SFTP, etc.).

To customize where the execution directory is stored, you can override the default FileSystemFactory.

  1. Create a custom implementation of the FileSystemFactoryInterface.

  2. Register your service in Symfony, replacing the default implementation:

services:
  Oliverde8\PhpEtlBundle\Services\FileSystemFactoryInterface: '@App\Services\FileSystemFactory'