PHP-ETL - Operations
Extract - CSV File
The CsvExtractConfig operation reads a CSV file and outputs individual DataItems for each row. It’s typically used at the beginning of a chain or after file-finding operations to extract data from CSV files for processing.
Key characteristics:
- Reads CSV files row by row
- Returns one DataItem per CSV row
- Configurable delimiter, enclosure, and escape characters
- Supports nested file paths in data items
- Efficient for large files (streaming, not loading entire file into memory)
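To make the streaming point concrete, here is a minimal plain-PHP sketch of reading a CSV file lazily with a generator. It is not PHP-ETL's own implementation; `streamCsvRows` is a hypothetical helper, and the sketch assumes the first row is a header and that every row has as many fields as the header.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of PHP-ETL): lazily yields one associative
 * array per CSV row, using the first row as the header.
 */
function streamCsvRows(
    string $path,
    string $delimiter = ';',
    string $enclosure = '"',
    string $escape = '\\'
): Generator {
    $handle = fopen($path, 'r');
    if ($handle === false) {
        throw new RuntimeException("Cannot open {$path}");
    }

    try {
        $header = fgetcsv($handle, 0, $delimiter, $enclosure, $escape);
        if ($header === false) {
            return; // Empty file: nothing to yield.
        }

        // Only the current row is held in memory at any time.
        while (($row = fgetcsv($handle, 0, $delimiter, $enclosure, $escape)) !== false) {
            if ($row === [null]) {
                continue; // fgetcsv() reports a blank line as [null].
            }
            // Assumes the row has the same number of fields as the header.
            yield array_combine($header, $row);
        }
    } finally {
        fclose($handle);
    }
}

// Usage: write a small sample file, then iterate over it row by row.
$path = tempnam(sys_get_temp_dir(), 'csv');
file_put_contents($path, "ID;Name\n1;John Doe\n2;Jane Smith\n");

foreach (streamCsvRows($path) as $row) {
    echo $row['ID'], ': ', $row['Name'], PHP_EOL;
}

unlink($path);
```

Because the function is a generator, rows are produced only as the consumer asks for them, which is what keeps memory use flat for large files.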
Configuration
Use CsvExtractConfig with these optional parameters:
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
$csvConfig = new CsvExtractConfig(
    delimiter: ';',   // Field separator (default: ';')
    enclosure: '"',   // Field enclosure (default: '"')
    escape: '\\',     // Escape character (default: '\')
    fileKey: 'file'   // Key containing file path (default: 'file')
);
Parameters:
- delimiter: Character separating fields. Common values: ',', ';', "\t" (double-quoted, so that PHP interprets it as a tab)
- enclosure: Character enclosing fields (for fields containing delimiters). Must be " or '
- escape: Character for escaping special characters within fields
- fileKey: The key in the input data that contains the CSV file path (supports nested keys like 'data/file')
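The effect of the delimiter can be checked in isolation with PHP's built-in `str_getcsv()`. This sketch does not use PHP-ETL; it only illustrates what happens when the delimiter matches the file and when it does not. The sample lines are made up for illustration.

```php
<?php
declare(strict_types=1);

// The same record written with three different delimiters.
$samples = [
    ','  => '1,John Doe,john@example.com',
    ';'  => '1;John Doe;john@example.com',
    "\t" => "1\tJohn Doe\tjohn@example.com",
];

foreach ($samples as $delimiter => $line) {
    // A matching delimiter splits the line into three fields.
    $fields = str_getcsv($line, $delimiter, '"', '\\');
    echo count($fields), ' fields: ', implode(' | ', $fields), PHP_EOL;
}

// A mismatched delimiter leaves the whole line in a single field.
$mismatch = str_getcsv($samples[';'], ',', '"', '\\');
echo count($mismatch), ' field: ', $mismatch[0], PHP_EOL;
```

A row that comes back as a single long field is usually the first sign that the configured delimiter does not match the file.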
Example: Basic CSV Reading
Read a CSV file and process each row:
use Oliverde8\Component\PhpEtl\ChainConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\CsvExtractConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\RuleTransformConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Loader\CsvFileWriterConfig;
use Oliverde8\Component\PhpEtl\Item\DataItem;
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig())
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('customer_id', [['get' => ['field' => 'ID']]])
        ->addColumn('name', [['get' => ['field' => 'Name']]])
        ->addColumn('email', [['get' => ['field' => 'Email']]])
    )
    ->addLink(new CsvFileWriterConfig('output.csv'));

// $chainBuilder is a chain builder instance created elsewhere (not shown here)
$chainProcessor = $chainBuilder->createChain($chainConfig);

// Input: DataItem with file path
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'customers.csv'])]),
    []
);
Example: Comma-Delimited CSV
For standard comma-separated CSV files:
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig(
        delimiter: ',',
        enclosure: '"',
        escape: '\\'
    ))
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('id', [['get' => ['field' => 'ID']]])
        ->addColumn('product', [['get' => ['field' => 'Product']]])
        ->addColumn('price', [['get' => ['field' => 'Price']]])
    )
    ->addLink(new CsvFileWriterConfig('products.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'data/products.csv'])]),
    []
);
Example: Tab-Delimited File
For TSV (tab-separated values) files:
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig(
        delimiter: "\t", // Tab delimiter (double quotes are required)
        enclosure: '"',
        escape: '\\'
    ))
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('id', [['get' => ['field' => 'UserID']]])
        ->addColumn('username', [['get' => ['field' => 'Username']]])
    )
    ->addLink(new CsvFileWriterConfig('users.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'users.tsv'])]),
    []
);
Example: Custom File Key
When the file path is in a different key or nested:
// Input data: ['data' => ['csv_file' => 'orders.csv']]
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig(
        fileKey: 'data/csv_file' // Nested path to file
    ))
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('order_id', [['get' => ['field' => 'OrderID']]])
        ->addColumn('amount', [['get' => ['field' => 'Amount']]])
    )
    ->addLink(new CsvFileWriterConfig('orders.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([
        new DataItem(['data' => ['csv_file' => 'orders.csv']])
    ]),
    []
);
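For intuition about nested keys, a slash-separated path such as 'data/csv_file' can be resolved by walking the array one segment at a time. The sketch below is an assumption about the general idea, not PHP-ETL's actual lookup code; `resolveNestedKey` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: resolves a slash-separated key such as 'data/csv_file'
 * against a nested array. Returns null when any segment is missing.
 */
function resolveNestedKey(array $data, string $path, string $separator = '/'): mixed
{
    $current = $data;
    foreach (explode($separator, $path) as $segment) {
        if (!is_array($current) || !array_key_exists($segment, $current)) {
            return null;
        }
        $current = $current[$segment];
    }

    return $current;
}

$input = ['data' => ['csv_file' => 'orders.csv']];

var_dump(resolveNestedKey($input, 'data/csv_file')); // the file path
var_dump(resolveNestedKey($input, 'data/missing'));  // NULL: segment not found
```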
Example: Processing Multiple CSV Files
Read and process multiple CSV files in one chain:
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\CallBackTransformerConfig;
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig(delimiter: ','))
    ->addLink(new CallBackTransformerConfig(function (DataItem $item) {
        $data = $item->getData();
        echo "Processing row from {$data['source_file']}: {$data['ID']}\n";
        return $item;
    }))
    ->addLink(new CsvFileWriterConfig('combined.csv'));

// Process multiple files
$files = [
    new DataItem(['file' => 'customers1.csv']),
    new DataItem(['file' => 'customers2.csv']),
    new DataItem(['file' => 'customers3.csv']),
];

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(new ArrayIterator($files), []);
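The callback above reads a 'source_file' key from each row. The text does not say where that key comes from, so treat it as an assumption about the incoming data. As a library-independent sketch of the idea, the hypothetical `readManyCsv` generator below reads several files in sequence with `SplFileObject` and tags every row with the file it came from.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: reads several CSV files one after another and adds a
 * 'source_file' key to each row. Assumes every file starts with a header row.
 */
function readManyCsv(iterable $paths, string $delimiter = ','): Generator
{
    foreach ($paths as $path) {
        $file = new SplFileObject($path, 'r');
        $file->setFlags(
            SplFileObject::READ_CSV
            | SplFileObject::READ_AHEAD
            | SplFileObject::SKIP_EMPTY
            | SplFileObject::DROP_NEW_LINE
        );
        $file->setCsvControl($delimiter, '"', '\\');

        $header = null;
        foreach ($file as $row) {
            if ($header === null) {
                $header = $row; // First row of each file is its header.
                continue;
            }
            yield ['source_file' => $path] + array_combine($header, $row);
        }
    }
}

// Usage: two small sample files in the system temp directory.
$first = tempnam(sys_get_temp_dir(), 'csv');
$second = tempnam(sys_get_temp_dir(), 'csv');
file_put_contents($first, "ID,Name\n1,John Doe\n");
file_put_contents($second, "ID,Name\n2,Jane Smith\n3,Bob Wilson\n");

foreach (readManyCsv([$first, $second]) as $row) {
    echo basename($row['source_file']), ': ', $row['ID'], PHP_EOL;
}

unlink($first);
unlink($second);
```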
Example: With External File Finder
Import files from a remote location and process their CSV content:
use Oliverde8\Component\PhpEtl\OperationConfig\Extract\ExternalFileFinderConfig;
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\ExternalFileProcessorConfig;
$chainConfig = new ChainConfig();
$chainConfig
    // Find CSV files on remote filesystem
    ->addLink(new ExternalFileFinderConfig(directory: '/remote/exports'))
    // Copy file locally
    ->addLink(new ExternalFileProcessorConfig())
    // Extract CSV data
    ->addLink(new CsvExtractConfig(delimiter: ','))
    // Transform data
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('id', [['get' => ['field' => 'ID']]])
        ->addColumn('status', [['get' => ['field' => 'Status']]])
    )
    // Save results
    ->addLink(new CsvFileWriterConfig('imported-data.csv'))
    // Clean up local file
    ->addLink(new ExternalFileProcessorConfig());

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem('/^export_[0-9]{8}\.csv$/')]),
    []
);
Example: With Filtering
Extract CSV and filter rows based on conditions:
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\FilterDataConfig;
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig(delimiter: ','))
    // Only process active customers
    ->addLink(new FilterDataConfig('@data["Status"] == "Active"'))
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('customer_id', [['get' => ['field' => 'ID']]])
        ->addColumn('name', [['get' => ['field' => 'Name']]])
        ->addColumn('email', [['get' => ['field' => 'Email']]])
    )
    ->addLink(new CsvFileWriterConfig('active-customers.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'all-customers.csv'])]),
    []
);
Example: With Grouping
Extract CSV and group data:
use Oliverde8\Component\PhpEtl\OperationConfig\Grouping\SimpleGroupingConfig;
$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new CsvExtractConfig(delimiter: ','))
    // Group orders by customer
    ->addLink(new SimpleGroupingConfig(
        groupBy: [['get' => ['field' => 'CustomerID']]],
        aggregations: [
            'total_orders' => [['count' => []]],
            'total_amount' => [['sum' => ['field' => 'Amount']]],
        ]
    ))
    ->addLink(new CsvFileWriterConfig('customer-summary.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'orders.csv'])]),
    []
);
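To show what a count and sum per customer amount to, here is the same aggregation written as a plain PHP loop. It is only an illustration of the expected result under made-up sample rows, not a description of how SimpleGroupingConfig works internally.

```php
<?php
declare(strict_types=1);

// Sample rows as they might arrive from the extractor (all values are strings).
$rows = [
    ['CustomerID' => 'C1', 'Amount' => '10.50'],
    ['CustomerID' => 'C2', 'Amount' => '5.00'],
    ['CustomerID' => 'C1', 'Amount' => '4.50'],
];

$summary = [];
foreach ($rows as $row) {
    $key = $row['CustomerID'];

    // Create the group on first sight of a customer.
    $summary[$key] ??= ['CustomerID' => $key, 'total_orders' => 0, 'total_amount' => 0.0];

    $summary[$key]['total_orders']++;
    $summary[$key]['total_amount'] += (float) $row['Amount'];
}

foreach ($summary as $group) {
    printf("%s: %d orders, %.2f total\n", $group['CustomerID'], $group['total_orders'], $group['total_amount']);
}
```

Note that this kind of grouping has to see every row before a group's totals are final, so it keeps one entry per group in memory even when the rows themselves are streamed.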
Example: Large File Processing with Logging
Process large CSV files with progress logging:
use Oliverde8\Component\PhpEtl\OperationConfig\Transformer\LogConfig;
$chainConfig = new ChainConfig();
$rowCount = 0;
$chainConfig
    ->addLink(new CsvExtractConfig(delimiter: ','))
    ->addLink(new CallBackTransformerConfig(function (DataItem $item) use (&$rowCount) {
        $rowCount++;
        // Report progress every 1000 rows
        if ($rowCount % 1000 === 0) {
            echo "Processed {$rowCount} rows...\n";
        }
        return $item;
    }))
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('id', [['get' => ['field' => 'ID']]])
        ->addColumn('data', [['get' => ['field' => 'Data']]])
    )
    ->addLink(new CsvFileWriterConfig('output.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'large-dataset.csv'])]),
    []
);
echo "Completed! Total rows: {$rowCount}\n";
Example: CSV with Error Handling
Handle malformed CSV files gracefully:
use Oliverde8\Component\PhpEtl\OperationConfig\FailSafeConfig;
// Sub-chain that may fail on malformed input
$csvChain = (new ChainConfig())
    ->addLink(new CsvExtractConfig(delimiter: ','))
    ->addLink((new RuleTransformConfig(false))
        ->addColumn('id', [['get' => ['field' => 'ID']]])
        ->addColumn('name', [['get' => ['field' => 'Name']]])
    );

$chainConfig = new ChainConfig();
$chainConfig
    ->addLink(new FailSafeConfig(
        chainConfig: $csvChain,
        exceptionsToCatch: [\Exception::class],
        nbAttempts: 1 // Don't retry malformed files
    ))
    ->addLink(new CsvFileWriterConfig('valid-records.csv'));

// Build the processor from this configuration, as in the basic example
$chainProcessor = $chainBuilder->createChain($chainConfig);
$chainProcessor->process(
    new ArrayIterator([new DataItem(['file' => 'input.csv'])]),
    []
);
Understanding CSV Structure
Input CSV file:
ID,Name,Email,Status
1,John Doe,john@example.com,Active
2,Jane Smith,jane@example.com,Inactive
3,Bob Wilson,bob@example.com,Active
After CsvExtractConfig, each row becomes a DataItem:
DataItem(['ID' => '1', 'Name' => 'John Doe', 'Email' => 'john@example.com', 'Status' => 'Active'])
DataItem(['ID' => '2', 'Name' => 'Jane Smith', 'Email' => 'jane@example.com', 'Status' => 'Inactive'])
DataItem(['ID' => '3', 'Name' => 'Bob Wilson', 'Email' => 'bob@example.com', 'Status' => 'Active'])
Key points:
- First row is used as column headers
- Each subsequent row becomes a separate DataItem
- Column headers become array keys in the DataItem
- All values are strings (type conversion happens in transformers)
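The key points above can be reproduced with PHP's built-in CSV functions. This sketch does not use PHP-ETL; it only shows how a header row turns into array keys, that every value arrives as a string, and how a later step might cast a column. The cast at the end stands in for whatever transformer the chain would use.

```php
<?php
declare(strict_types=1);

$lines = [
    'ID,Name,Email,Status',
    '1,John Doe,john@example.com,Active',
    '2,Jane Smith,jane@example.com,Inactive',
];

// The first line supplies the keys for every following row.
$header = str_getcsv(array_shift($lines), ',', '"', '\\');

$rows = [];
foreach ($lines as $line) {
    $rows[] = array_combine($header, str_getcsv($line, ',', '"', '\\'));
}

var_dump($rows[0]['ID']); // a string, not an integer

// Type conversion is a separate, later step (here: cast ID to int).
$typed = array_map(
    static fn (array $row): array => array_replace($row, ['ID' => (int) $row['ID']]),
    $rows
);

var_dump($typed[0]['ID']); // now an integer
```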
Best Practices
1. Match Delimiter to File Format
// Standard CSV (comma)
new CsvExtractConfig(delimiter: ',')
// European CSV (semicolon)
new CsvExtractConfig(delimiter: ';')
// Tab-delimited
new CsvExtractConfig(delimiter: "\t")
2. Handle Missing Headers
// If the CSV has no header row, add one first or map the columns
// with a CallBackTransformerConfig
->addLink(new CallBackTransformerConfig(function (DataItem $item) {
    $data = $item->getData();
    // Map numeric indices to named keys
    return new DataItem([
        'id' => $data[0] ?? null,
        'name' => $data[1] ?? null,
        'email' => $data[2] ?? null,
    ]);
}))
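The index-to-name mapping in that callback can be factored into a small reusable function. The sketch below is plain PHP with a hypothetical `nameColumns` helper; the column list is an assumption about the file layout and would need to match the real file.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turns a positional row into a named row.
 * Columns missing from the row become null; extra fields are ignored.
 */
function nameColumns(array $row, array $columns): array
{
    $named = [];
    foreach ($columns as $index => $name) {
        $named[$name] = $row[$index] ?? null;
    }

    return $named;
}

$columns = ['id', 'name', 'email'];

print_r(nameColumns(['1', 'John Doe', 'john@example.com'], $columns));
print_r(nameColumns(['2', 'Jane Smith'], $columns)); // 'email' is null here
```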
3. Validate Data Early
->addLink(new FilterDataConfig('@data["ID"] != null and data["Email"] != ""'))
4. Log Progress for Large Files
->addLink(new LogConfig(
    message: 'Processing record @data["ID"]',
    level: 'debug'
))
5. Handle Special Characters
// Ensure enclosure is set correctly for fields with delimiters
new CsvExtractConfig(
    delimiter: ',',
    enclosure: '"', // Fields like "Smith, John" are handled correctly
    escape: '\\'
)
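A quick way to see why the enclosure matters is to compare a CSV-aware parse with a naive split on the delimiter. The sketch uses only PHP's built-in `str_getcsv()` and `explode()`; the sample line is made up for illustration.

```php
<?php
declare(strict_types=1);

// One field contains the delimiter, another contains doubled quotes.
$line = '7,"Smith, John","He said ""hi"""';

// CSV-aware parsing keeps enclosed fields together and unescapes "" to ".
$fields = str_getcsv($line, ',', '"', '\\');
print_r($fields);

// A naive split breaks "Smith, John" into two pieces.
$naive = explode(',', $line);
echo count($fields), ' fields vs ', count($naive), ' pieces', PHP_EOL;
```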
6. Use Appropriate File Keys
// For simple file path
fileKey: 'file'
// For nested data structures
fileKey: 'input/csv_file'
fileKey: 'data/source/path'