# [core] Support generating splits with finer granularity than file level #6917
## Purpose
Linked issue: close #5012
This PR implements support for generating splits at finer granularity than file level (e.g., row groups for Parquet, stripes for ORC) to significantly enhance concurrency when reading large files. This follows the proven pattern used by Spark and Flink, where files are split at natural boundaries (row groups/stripes) rather than file boundaries.
The implementation leverages the existing `RawFile` infrastructure with its `offset` and `length` fields, ensuring backward compatibility while enabling improved parallelism for large file reads.
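To make the mechanism concrete, here is a minimal sketch (not this PR's code) of how row-group boundaries can be read from a Parquet footer and turned into `(offset, length)` ranges; the `RowGroupRanges` class and `toSplitRanges` method are illustrative names. The ORC side is analogous via `Reader.getStripes()`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class RowGroupRanges {

    /** Byte range of one row group: a candidate unit for a finer-grained split. */
    public static final class Range {
        public final long offset;
        public final long length;
        public final long rowCount;

        Range(long offset, long length, long rowCount) {
            this.offset = offset;
            this.length = length;
            this.rowCount = rowCount;
        }
    }

    public static List<Range> toSplitRanges(Path file, Configuration conf) throws IOException {
        List<Range> ranges = new ArrayList<>();
        try (ParquetFileReader reader =
                ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
            // Each row group recorded in the footer is an independently readable byte range.
            for (BlockMetaData block : reader.getFooter().getBlocks()) {
                ranges.add(new Range(
                        block.getStartingPos(),    // byte offset of the row group
                        block.getCompressedSize(), // compressed length in bytes
                        block.getRowCount()));
            }
        }
        return ranges;
    }
}
```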
## Tests

Unit tests and integration tests are needed for:
- `ParquetMetadataReader`: Verify row group boundary extraction
- `OrcMetadataReader`: Verify stripe boundary extraction
- `FineGrainedSplitGenerator`: Verify split generation logic
- `ParquetReaderFactory.createReader(offset, length)`: Verify range-based reading
- `OrcReaderFactory.createReader(offset, length)`: Verify range-based reading
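For instance, a boundary-extraction test could assert that consecutive row groups are laid out back to back; `ParquetMetadataReader#readBoundaries` and `FileSplitBoundary`'s accessors are assumptions about the new API's shape, and the fixture path is a placeholder.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.junit.jupiter.api.Test;

class ParquetMetadataReaderTest {

    @Test
    void boundariesShouldBeContiguous() throws Exception {
        // Placeholder fixture: a Parquet file written with more than one row group.
        String testFile = "/tmp/multi-row-group.parquet";

        // Hypothetical API from this PR: one boundary per row group.
        List<FileSplitBoundary> boundaries =
                new ParquetMetadataReader().readBoundaries(testFile);

        assertThat(boundaries).hasSizeGreaterThan(1);
        for (int i = 1; i < boundaries.size(); i++) {
            FileSplitBoundary prev = boundaries.get(i - 1);
            // Row groups are written back to back, so each boundary should
            // start exactly where the previous one ends.
            assertThat(boundaries.get(i).offset())
                    .isEqualTo(prev.offset() + prev.length());
        }
    }
}
```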
## API and Format

**New Configuration Options:**
- `source.split.file-enabled`: Enable finer-grained file splitting (default: `false`)
- `source.split.file-threshold`: Minimum file size to consider splitting (default: `128MB`)
- `source.split.file-max-splits`: Maximum splits per file (default: `100`)
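Read together, these options suggest the following gating logic; this is an assumption about how the threshold and cap compose, not the PR's actual code.

```java
/** Illustrative gating logic for the options above. */
final class SplitGating {

    /** Files below the threshold stay as a single split: splitting them buys nothing. */
    static boolean shouldSplit(boolean enabled, long fileSizeBytes, long thresholdBytes) {
        return enabled && fileSizeBytes >= thresholdBytes;
    }

    /** Never exceed the natural boundary count, and cap the per-file fan-out. */
    static int targetSplitCount(int naturalBoundaryCount, int maxSplitsPerFile) {
        return Math.min(naturalBoundaryCount, maxSplitsPerFile);
    }
}
```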
**New Interfaces/Classes:**

- `FormatMetadataReader`: Interface for reading format-specific metadata
- `FileSplitBoundary`: Represents split boundaries (offset, length, rowCount)
- `ParquetMetadataReader`: Extracts row group boundaries from Parquet files
- `OrcMetadataReader`: Extracts stripe boundaries from ORC files
- `FineGrainedSplitGenerator`: Decorator for `SplitGenerator` that enables fine-grained splitting
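The class names above come from this PR, but their exact signatures are not shown here, so the following is a guess at a plausible shape (two files condensed into one listing):

```java
// FormatMetadataReader.java -- possible shape of the metadata abstraction.
import java.io.IOException;
import java.util.List;

public interface FormatMetadataReader {
    /** Returns the natural split boundaries (row groups or stripes) of one file. */
    List<FileSplitBoundary> readBoundaries(String filePath) throws IOException;
}

// FileSplitBoundary.java -- immutable (offset, length, rowCount) triple.
public final class FileSplitBoundary {
    private final long offset;
    private final long length;
    private final long rowCount;

    public FileSplitBoundary(long offset, long length, long rowCount) {
        this.offset = offset;
        this.length = length;
        this.rowCount = rowCount;
    }

    public long offset() { return offset; }
    public long length() { return length; }
    public long rowCount() { return rowCount; }
}
```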
**Extended Interfaces:**

- `FormatReaderFactory.createReader(Context, offset, length)`: Now implemented for Parquet and ORC (usage sketched after this section)
- `DataSplit`: Added a transient `fileSplitBoundaries` field (not serialized, for backward compatibility)

**Storage Format:** No changes to the storage format. This is a read-time optimization that doesn't affect how data is written or stored.
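As a usage illustration, the extended factory method lets each boundary become its own reader, and hence its own concurrent task; `RecordReader`, `InternalRow`, and `FormatReaderFactory.Context` are existing Paimon types, while the three-argument overload comes from this PR and the surrounding wiring reuses the hypothetical types sketched above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.reader.RecordReader;

final class RangeReaders {

    /** Fan one file out into one reader per natural boundary (sketch only). */
    static List<RecordReader<InternalRow>> openRangeReaders(
            FormatMetadataReader metadataReader,
            FormatReaderFactory readerFactory,
            FormatReaderFactory.Context context,
            String filePath) throws IOException {
        List<RecordReader<InternalRow>> readers = new ArrayList<>();
        for (FileSplitBoundary b : metadataReader.readBoundaries(filePath)) {
            // Each (offset, length) range covers exactly one row group or stripe,
            // so the resulting readers can be consumed by independent tasks.
            readers.add(readerFactory.createReader(context, b.offset(), b.length()));
        }
        return readers;
    }
}
```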
## Documentation
This change introduces a new feature that should be documented:
- `source.split.file-enabled` and related options
- `FormatMetadataReader` interface and implementations

The feature is disabled by default to maintain backward compatibility. Users can enable it by setting `source.split.file-enabled=true` in their table options.
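For example, with dynamic table options (a sketch; the option name comes from this PR, and `Table.copy` is Paimon's existing hook for overriding options on a table instance):

```java
import java.util.Collections;

import org.apache.paimon.table.Table;

final class EnableFineGrainedSplits {

    /** Returns a copy of the table with fine-grained splitting enabled for reads. */
    static Table withFineGrainedSplits(Table table) {
        // Dynamic options override the stored table options for this instance only.
        return table.copy(
                Collections.singletonMap("source.split.file-enabled", "true"));
    }
}
```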