PHP-ETL - Operations
Transform - External File Processor
The External File Processor operation moves and registers external files (e.g., from SFTP, local FS, cloud storage) into the ETL execution context. This operation works hand-in-hand with the ExternalFileFinderConfig and is essential for enabling further processing of remote files.
Purpose
When ExternalFileFinderConfig locates a remote file, it returns an FileExtractedItem. However, that file is not yet part of the ETL’s working context.
The ExternalFileProcessorConfig operation:
- Copies the external file into the ETL context, making it accessible to extract operations like
CsvExtractConfig,JsonExtractConfig, etc. - Archives the file within the ETL execution history, so it can be tracked and audited later.
- Returns a
DataItemcontaining the path of the new local file, making it usable by downstream operations.
File Lifecycle & Behavior
The operation follows a structured file management flow across multiple runs:
- Initial Processing:
- The file is moved from its source directory into a
processing/subdirectory (within the external filesystem). - It is also copied to the local ETL context (temporary working directory).
- A
DataItemis emitted with the new local file path.
- The file is moved from its source directory into a
- Post-Processing:
- If the operation is used a second time in the same chain (e.g., near the end of the flow), it will:
- Move the remote file from
processing/toprocessed/, effectively archiving it. - This signals the file has been fully and successfully handled.
- Move the remote file from
- If the operation is used a second time in the same chain (e.g., near the end of the flow), it will:
💡 Best Practice:
UseExternalFileProcessorConfigtwice in a chain:
- Once immediately after the
ExternalFileFinderConfig.- Once at the end of the chain, to archive the file remotely after successful processing.
Filesystem Agnostic
The operation does not require manual configuration of the filesystem. It uses the File system instance already embedded in the FileExtractedItem provided by the ExternalFileFinderConfig.
Configuration
Basic Configuration
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
$config = new ExternalFileProcessorConfig();
Constructor Parameters
public function __construct(
string $flavor = 'default'
)
| Parameter | Type | Default | Description |
|---|---|---|---|
flavor |
string |
'default' |
Operation flavor for custom implementations |
Input/Output
- Input: Expects
FileExtractedItemobjects fromExternalFileFinderConfig - Output: Produces
DataItemobjects containing local file paths
Examples
Example 1: Basic File Processing
Process files from a local directory, move them through processing stages:
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\ExternalFileFinderConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
$chainConfig = new ChainConfig();
$chainConfig
->addLink(new ExternalFileFinderConfig(directory: '/data/incoming'))
->addLink(new ExternalFileProcessorConfig()) // Copy to local context
->addLink(new CsvExtractConfig())
->addLink(new CsvFileWriterConfig('output.csv'))
->addLink(new ExternalFileProcessorConfig()); // Archive to processed/
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
new ArrayIterator([new DataItem('/^orders_.*\.csv$/')]),
['dir' => '/data/incoming']
);
What happens:
ExternalFileFinderConfigfinds files matching the pattern in/data/incoming- First
ExternalFileProcessorConfigmoves files toprocessing/and copies to local context - Files are processed (CSV extraction and transformation)
- Second
ExternalFileProcessorConfigmoves files fromprocessing/toprocessed/
Example 2: SFTP File Processing
Process files from an SFTP server with error handling:
use League\Flysystem\Ftp\FtpAdapter;
use League\Flysystem\Ftp\FtpConnectionOptions;
use League\Flysystem\Filesystem;
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\ExternalFileFinderConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\RuleTransformConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
// Create SFTP filesystem
$adapter = new FtpAdapter(
FtpConnectionOptions::fromArray([
'host' => 'sftp.example.com',
'username' => 'user',
'password' => 'pass',
'port' => 22,
])
);
$filesystem = new Filesystem($adapter);
$chainConfig = new ChainConfig();
$chainConfig
->addLink(new ExternalFileFinderConfig(
directory: '/uploads',
fileSystem: $filesystem
))
->addLink(new ExternalFileProcessorConfig())
->addLink(new CsvExtractConfig(delimiter: ';'))
->addLink(new RuleTransformConfig([
'status' => '"processed"',
'processed_at' => 'now()'
]))
->addLink(new CsvFileWriterConfig('processed_orders.csv'))
->addLink(new ExternalFileProcessorConfig());
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
new ArrayIterator([new DataItem('/^order_.*\.csv$/')]),
[]
);
Example 3: Multiple File Types with Conditional Processing
Process different file types differently after initial file processor:
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\ExternalFileFinderConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Building\ChainSplitConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\JsonExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
$csvChain = new ChainConfig();
$csvChain
->addLink(new CsvExtractConfig())
->addLink(new CsvFileWriterConfig('csv_output.csv'));
$jsonChain = new ChainConfig();
$jsonChain
->addLink(new JsonExtractConfig())
->addLink(new CsvFileWriterConfig('json_output.csv'));
$mainChain = new ChainConfig();
$mainChain
->addLink(new ExternalFileFinderConfig(directory: '/data/mixed'))
->addLink(new ExternalFileProcessorConfig())
->addLink(new ChainSplitConfig()
->addSplit('preg_match("/\.csv$/", item.data)', $csvChain)
->addSplit('preg_match("/\.json$/", item.data)', $jsonChain)
)
->addLink(new ExternalFileProcessorConfig());
$chainProcessor = $chainBuilder->createChain($mainChain);
$chainProcessor->process(
new ArrayIterator([new DataItem('/^data_.*\.(csv|json)$/')]),
[]
);
Example 4: Processing with Validation
Validate files before processing and only archive valid ones:
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\ExternalFileFinderConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\FilterDataConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\CallBackTransformerConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
$chainConfig = new ChainConfig();
$chainConfig
->addLink(new ExternalFileFinderConfig(directory: '/data/incoming'))
->addLink(new ExternalFileProcessorConfig())
->addLink(new CsvExtractConfig())
// Validate: only keep rows with required fields
->addLink(new FilterDataConfig(
'item.id != null and item.name != null and item.email != null'
))
// Transform: add validation status
->addLink(new CallBackTransformerConfig(
function($item, $context) {
$item['validated'] = true;
$item['validation_date'] = date('Y-m-d H:i:s');
return $item;
}
))
->addLink(new CsvFileWriterConfig('validated_output.csv'))
->addLink(new ExternalFileProcessorConfig()); // Only archive if processing succeeded
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
new ArrayIterator([new DataItem('/^customers_.*\.csv$/')]),
[]
);
Example 5: Re-processing Failed Files
Process files that failed in previous runs:
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\ExternalFileFinderConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Building\FailSafeConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
// Create chain with error handling
$processingChain = new ChainConfig();
$processingChain
->addLink(new CsvExtractConfig())
->addLink(new CsvFileWriterConfig('output.csv'));
$mainChain = new ChainConfig();
$mainChain
// Look in both incoming and processing directories (for retry)
->addLink(new ExternalFileFinderConfig(
directory: '/data/incoming',
additionalDirectories: ['/data/processing']
))
->addLink(new ExternalFileProcessorConfig())
->addLink(new FailSafeConfig(
chain: $processingChain,
retries: 3
))
->addLink(new ExternalFileProcessorConfig());
$chainProcessor = $chainBuilder->createChain($mainChain);
$chainProcessor->process(
new ArrayIterator([new DataItem('/^data_.*\.csv$/')]),
[]
);
Note: Files that failed previously will be in the processing/ directory. The ExternalFileFinderConfig can be configured to look in multiple directories.
File Lifecycle Details
Directory Structure
The operation creates and uses the following directory structure:
/data/incoming/ # Original files arrive here
├── file1.csv
├── file2.csv
└── processing/ # Files being processed
└── file1.csv
└── processed/ # Successfully processed files
└── file1.csv
State Transitions
- Found: File is discovered by
ExternalFileFinderConfigin the source directory - Processing: First
ExternalFileProcessorConfigmoves file toprocessing/subdirectory - Processed: Second
ExternalFileProcessorConfigmoves file toprocessed/subdirectory
Best Practices
1. Always Use Twice
Always use ExternalFileProcessorConfig twice in your chain:
$chainConfig
->addLink(new ExternalFileFinderConfig(...))
->addLink(new ExternalFileProcessorConfig()) // Move to processing/
// ... processing operations ...
->addLink(new ExternalFileProcessorConfig()); // Archive to processed/
This ensures proper file lifecycle management.
2. Log File Processing
Add logging to track file processing:
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\LogConfig;
$chainConfig
->addLink(new ExternalFileFinderConfig(...))
->addLink(new LogConfig('Found file: {item.filename}'))
->addLink(new ExternalFileProcessorConfig())
->addLink(new LogConfig('Processing file: {item.data}'))
// ... processing ...
->addLink(new LogConfig('Completed file: {item.data}'))
->addLink(new ExternalFileProcessorConfig());
Related Operations
- ExternalFileFinderConfig - Find files in external filesystems
- CsvExtractConfig - Extract data from CSV files
- JsonExtractConfig - Extract data from JSON files
- FailSafeConfig - Handle errors during processing