
Conversation

@yruslan (Collaborator) commented Dec 19, 2025

Closes #809

Summary by CodeRabbit

  • New Features

    • Per-file compression detection and support for reading Hadoop-compatible compressed files (gzip, bz2, etc.); parallelism for such files is limited to one task per file.
  • Options

    • enable_index_cache now documented as on by default.
    • New option input_split_size_compressed_mb to control input split sizing for compressed files.
  • Documentation

    • README updated to recommend uncompressed files for best performance.
  • Tests

    • New integration tests, sample data and expected outputs covering compressed, mixed and ASCII scenarios.


@coderabbitai bot (Contributor) commented Dec 19, 2025

Walkthrough

Adds compression awareness across file I/O and reader configuration: streams and FileStreamer now accept a Hadoop Configuration and expose isCompressed/isEndOfStream, FileWithOrder carries per-file compression, reader selection adapts when compressed files are present, split sizing for compressed files is added, and the README and tests are updated to exercise compressed and mixed-file scenarios.

Changes

Cohort / File(s) Summary
Documentation
README.md
Notes support for Hadoop-compatible compressed files and adjusts Multisegment/options wording for enable_index_cache.
Stream trait
cobol-parser/src/main/scala/.../SimpleStream.scala
Added def isCompressed: Boolean = false to the trait API.
Streaming core & input streams
spark-cobol/src/main/scala/.../streaming/FileStreamer.scala, spark-cobol/src/main/scala/.../streaming/BufferedFSDataInputStream.scala, cobol-parser/src/main/scala/.../reader/stream/*.scala
Constructors changed from FileSystem to Configuration/SerializableConfiguration; added compression detection and isCompressed/isEndOfStream; adjusted size, seek/skip, buffering, and copyStream/ensureOpened logic to work with compressed and uncompressed streams.
File utilities
spark-cobol/src/main/scala/.../utils/FileUtils.scala
New helpers: isCompressed(Path, Configuration), getCompressionCodec(Path, Configuration), getCompressedFileSize(Path, Configuration) (reads through decompressor to compute uncompressed size).
Reader parameters & parsing
cobol-parser/src/main/scala/.../parameters/{CobolParametersParser.scala,ReaderParameters.scala,VariableLengthParameters.scala}
Added inputSplitSizeCompressedMB option, propagated through VariableLengthParameters, ReaderParameters, and CobolParameters parsing; default for enable_index_cache adjusted.
Variable-length reader sizing
cobol-parser/src/main/scala/.../reader/VarLenNestedReader.scala
getSplitSizeMB now accepts isCompressed to choose compressed/uncompressed defaults and max split-size; error messages and limits adjusted accordingly.
Indexing & scanners
spark-cobol/src/main/scala/.../index/IndexBuilder.scala, spark-cobol/src/main/scala/.../scanners/CobolScanners.scala
Use compressed-uncompressed-aware file size when computing bytesToRead/maximumFileBytes; replaced FileSystem-based FileStreamer calls with Configuration-based constructors.
Processor & call sites
spark-cobol/src/main/scala/.../SparkCobolProcessor.scala
FileStreamer constructions now pass SerializableConfiguration instead of FileSystem.
Relation & metadata
spark-cobol/src/main/scala/.../CobolRelation.scala, spark-cobol/src/main/scala/.../types/FileWithOrder.scala
CobolRelation now accepts external filesList: Array[FileWithOrder]; FileWithOrder gains isCompressed: Boolean; getListFilesWithOrder detects per-file compression via CompressionCodecFactory and Path.
DataSource / DefaultSource
spark-cobol/src/main/scala/.../DefaultSource.scala
Retrieves filesList with compression metadata, computes hasCompressedFiles and passes it to buildEitherReader, adds a recursive retrieval helper, and switches to getDefaultFsBlockSize.
Utilities rename
spark-cobol/src/main/scala/.../utils/SparkUtils.scala
Renamed getDefaultHdfsBlockSize to getDefaultFsBlockSize.
Tests — new fixtures & data
data/test40_copybook.cob, data/test40_expected/*, data/test40_data_ascii/ascii.txt
Added copybook, expected layout/schema fixtures, and ASCII test file.
Tests — new integration
spark-cobol/src/test/scala/.../integration/Test40CompressesFilesSpec.scala
New integration tests for gzip/bz2 compressed EBCDIC/ASCII, mixed files, and index scenarios.
Tests — updates
spark-cobol/src/test/scala/.../{CobolRelationSpec.scala, index/IndexBuilderSpec.scala, streaming/FileStreamerSpec.scala, integration/Test5MultisegmentSpec.scala, regression/Test12MultiRootSparseIndex.scala, integration/Test37RecordLengthMappingSpec.scala}
Updated to pass filesList and isCompressed where required, switch FileStreamer tests to use Configuration, and pass enable_index_cache = false in some tests.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant DS as DefaultSource
    participant CR as CobolRelation
    participant FU as FileUtils
    participant CCF as CompressionCodecFactory
    participant FSM as FileStreamer
    participant BIS as BufferedFSDataInputStream
    participant Reader as COBOL Reader

    App->>DS: createRelation(sourceDirs)
    DS->>CR: getListFilesWithOrder(sourceDirs, sqlContext, isRecursive)
    loop per file
        CR->>CCF: probe codec for Path(file) using Configuration
        CCF-->>CR: codec / null
        CR->>FU: isCompressed(Path, conf)
        FU-->>CR: true/false
        CR->>CR: build FileWithOrder(path, order, isCompressed)
    end
    CR-->>DS: filesList
    DS->>DS: hasCompressedFiles = filesList.exists(_.isCompressed)
    alt hasCompressedFiles
        DS->>DS: select reader avoiding fixed-length parallel reads
    else
        DS->>DS: allow fixed-length reader
    end

    Reader->>FSM: openStream(path, sconf, offsets)
    FSM->>BIS: create BufferedFSDataInputStream(path, conf, ...)
    BIS->>CCF: detect & wrap decompressor if needed
    CCF-->>BIS: decompressed stream or passthrough
    BIS-->>FSM: return stream (isCompressed set)
    FSM-->>Reader: provide bytes / slices
    Reader->>Reader: parse COBOL records

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Attention points:
    • FileStreamer.scala — migration to Configuration, correct fileSize/size/totalSize/isEndOfStream semantics, copyStream behavior.
    • BufferedFSDataInputStream.scala — decompressor wrapping, accurate startOffset skipping/seeking for compressed vs uncompressed streams, buffer edge cases and bytesRead accounting.
    • FileUtils.getCompressedFileSize — correctness and performance of full-stream scan for large files; resource/exception handling.
    • CobolRelation.getListFilesWithOrder & FileWithOrder — codec detection correctness and propagation into downstream logic.
    • DefaultSource.buildEitherReader & reader selection — ensure behavior when mixed compressed/uncompressed files present.
    • Parameter propagation — inputSplitSizeCompressedMB threading through parser → readers and validation.

Possibly related PRs

Poem

🐰
I hop through gzip and bz2 fields,
I sniff the bytes the codec yields.
No unzip porch, I leap inside,
One crunchy record at each stride.
Hooray — compressed files now opened wide!

Pre-merge checks and finishing touches

✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly describes the main objective: adding support for reading compressed EBCDIC files, which aligns with the core changeset.
Linked Issues check ✅ Passed The PR implements the requested feature from issue #809 to read compressed COBOL/EBCDIC files directly, enabling Spark to handle decompression and reading in parallel.
Out of Scope Changes check ✅ Passed All changes support the core objective: compression detection, file handling refactoring, parameter additions, and comprehensive test coverage for compressed file scenarios.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/809-add-compression-support

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions bot commented Dec 19, 2025

JaCoCo code coverage report - 'cobol-parser'

Overall Project 91.52% -0.08% 🍏
Files changed 33.33% 🍏

File Coverage
SimpleStream.scala 100% 🍏
VarLenNestedReader.scala 68.93% -5.67% 🍏

@github-actions bot commented Dec 19, 2025

JaCoCo code coverage report - 'spark-cobol'

File Coverage [92.66%] 🍏
FileWithOrder.scala 100% 🍏
FileStreamer.scala 99.07% 🍏
IndexBuilder.scala 97.17% 🍏
SparkCobolProcessor.scala 96.69% 🍏
DefaultSource.scala 93.64% 🍏
CobolScanners.scala 93.36% 🍏
BufferedFSDataInputStream.scala 93.05% 🍏
SparkUtils.scala 90.97% 🍏
FileUtils.scala 82.71% 🍏
CobolRelation.scala 82.38% 🍏
Total Project Coverage 81.79% 🍏

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)

19-138: Handle startOffset safely for compressed files and tighten readFully copy logic

Two issues need addressing:

  1. Compressed + startOffset is unsafe

openStream() seeks the underlying FSDataInputStream before detecting compression, but most compression codecs require full decompression starting at the beginning of the stream. Seeking into the middle of a compressed byte stream before wrapping with a codec will corrupt data or cause IO errors.

This should either fail fast when codec != null && startOffset > 0, or skip on the decompressed stream instead of the compressed one. A safer pattern:

val fsIn: FSDataInputStream = fileSystem.open(filePath)

val factory = new CompressionCodecFactory(hadoopConfig)
val codec = factory.getCodec(filePath)

val baseStream: InputStream =
  if (codec != null) {
    isCompressedStream = true
    codec.createInputStream(fsIn)
  } else {
    fsIn
  }

if (startOffset > 0) {
  if (codec == null) {
    fsIn.seek(startOffset)
  } else {
    var toSkip = startOffset
    while (toSkip > 0) {
      val skipped = baseStream.skip(toSkip)
      if (skipped <= 0) return baseStream
      toSkip -= skipped
    }
  }
}

baseStream
  2. Avoid copying more bytes than were actually read into the buffer

In readFully, the second copy branch uses lengthLeft as the copy size even when bufferContainBytes < lengthLeft. System.arraycopy will throw IndexOutOfBoundsException if copying exceeds array bounds, and this code risks copying uninitialized bytes.

Cap the copy by what's actually available:

if (bufferContainBytes > 0 && lengthLeft > 0) {
  val available = bufferContainBytes - bufferPos
  val bytesToCopy = Math.min(lengthLeft, available)
  System.arraycopy(buffer, bufferPos, b, offsetLeft, bytesToCopy)
  bufferPos += bytesToCopy
  offsetLeft += bytesToCopy
  lengthLeft -= bytesToCopy
}
🧹 Nitpick comments (2)
README.md (1)

42-43: Clarify compression wording and parallelism limitation

The note is accurate but a bit vague. Consider clarifying that this relies on Hadoop compression codecs and that non-splittable codecs (e.g. gzip) imply only per-file parallelism, while uncompressed data allows true per-split parallelism. For example:

Supports reading files compressed with Hadoop compression codecs (gzip, bzip2, etc.). For non‑splittable codecs only per‑file parallelism is available, so uncompressed files are preferred for performance.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)

32-32: Default isCompressed is reasonable; consider documenting semantics

The default false is sensible and keeps existing implementations working. It would help future implementers if you briefly document that isCompressed reports whether the underlying stream is compressed (in the Hadoop‑codec sense) and that callers may adjust behavior based on it.
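
For illustration, a minimal sketch of how that documentation could look on the trait (the trait name and the default come from this PR; the scaladoc wording and the elided members are only suggestions):

trait SimpleStream {
  // ... other members of the trait are elided in this sketch ...

  /**
    * Whether the underlying stream is compressed in the Hadoop-codec sense
    * (gzip, bzip2, etc.). When true, callers should avoid seek-based shortcuts
    * and treat the stream as sequential-only.
    */
  def isCompressed: Boolean = false
}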

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7cbf3af and 54b2abd.

⛔ Files ignored due to path filters (3)
  • data/test40_data/example.dat is excluded by !**/*.dat
  • data/test40_data/example.dat.bz2 is excluded by !**/*.bz2
  • data/test40_data/example.dat.gz is excluded by !**/*.gz
📒 Files selected for processing (19)
  • README.md (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1 hunks)
  • data/test40_copybook.cob (1 hunks)
  • data/test40_expected/test40_layout.txt (1 hunks)
  • data/test40_expected/test40_schema.json (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (5 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala (4 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala (9 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala (2 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test12MultiRootSparseIndex.scala (2 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-19T09:12:31.865Z
Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 775
File: spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/FixedLengthEbcdicWriterSuite.scala:0-0
Timestamp: 2025-08-19T09:12:31.865Z
Learning: In Cobrix COBOL writer, null values are encoded as zero bytes (0x00) for all field types, including PIC X fields, rather than using traditional COBOL padding (EBCDIC spaces 0x40 for PIC X). This design decision prioritizes consistency with the reader side, which already treats zero bytes as nulls, ensuring round-trip fidelity between reading and writing operations.

Applied to files:

  • data/test40_copybook.cob
🧬 Code graph analysis (8)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)
  • isCompressed (71-71)
  • close (133-140)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (5)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/CopybookParser.scala (1)
  • CopybookParser (42-490)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/policies/DebugFieldsPolicy.scala (1)
  • DebugFieldsPolicy (19-44)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (3)
  • FileUtils (35-235)
  • writeStringToFile (89-96)
  • writeStringsToFile (118-128)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala (2)
  • SparkUtils (37-554)
  • prettyJSON (444-450)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala (1)
  • generateRecordLayoutPositions (224-286)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • next (91-131)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (5)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • isCompressed (71-71)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/entry/SparseIndexEntry.scala (1)
  • SparseIndexEntry (19-19)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • isRdwBigEndian (97-97)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • isCompressed (71-71)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (3)
  • CobolRelation (65-104)
  • CobolRelation (106-131)
  • getListFilesWithOrder (113-130)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (2)
  • sqlContext (131-131)
  • isRecursiveRetrieval (143-146)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala (1)
  • source (22-22)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala (7)
  • source (36-187)
  • source (38-55)
  • source (57-82)
  • source (84-116)
  • source (118-129)
  • source (131-147)
  • source (149-175)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
🔇 Additional comments (31)
data/test40_expected/test40_schema.json (1)

1-985: Schema golden file looks consistent with the copybook

The expected Spark schema for test40 appears internally consistent (types, precision/scale, maxLength where applicable) and matches the new copybook on the spot‑checked fields. Good addition for guarding regressions in numeric and edge‑case decoding.

data/test40_expected/test40_layout.txt (1)

1-393: Layout golden file is coherent and aligns with the new copybook

The positional layout for RECORD (field order, start/end, and lengths, including debug fields) is consistent with the copybook structure and total record length. This should be a solid regression guard for both parsing and writer behavior.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)

19-47: Compression‑aware FileStreamer looks good; rely on underlying stream to enforce offset rules

The refactor to use Configuration + Path, the explicit isCompressed flag, and the updated isEndOfStream logic for compressed streams all make sense and are consistent with the new compressed‑file support.

One thing to keep in mind is that correctness for compressed files with non‑zero startOffset or maximumBytes now hinges on how BufferedFSDataInputStream applies those parameters. With the proposed fix there (avoid seeking into the middle of compressed streams and instead skip on the decompressed stream, or disallow offsets for compressed), this class’ logic for size, next(), and isEndOfStream will behave as expected.

No additional changes required here once the lower‑level stream semantics are corrected.

Also applies to: 54-78, 91-151, 154-158

data/test40_copybook.cob (1)

1-260: Comprehensive test copybook is well‑structured and aligned with schema/layout

This copybook is a good, exhaustive set of edge cases (string numerics, binary/COMP/COMP‑3, scaled P fields, separate signs, floating point, and “exotic” PICs). It matches the new expected schema/layout on the spot‑checked fields and should significantly strengthen regression coverage for numeric decoding and formatting.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala (1)

56-56: LGTM! Consistent test updates for new FileWithOrder signature.

All test cases correctly updated to include isCompressed = false parameter, which is appropriate for uncompressed local test files.

Also applies to: 78-78, 100-100, 120-120, 140-140, 162-162, 181-181, 201-204, 242-243

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test12MultiRootSparseIndex.scala (1)

64-64: LGTM! Consistent migration to Configuration-based FileStreamer.

The tests correctly updated to use new Configuration() instead of obtaining a FileSystem, aligning with the broader API refactor.

Also applies to: 80-80

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala (2)

211-211: LGTM! Consistent migration to Configuration-based FileStreamer.

The FileStreamer constructor correctly updated to use new Configuration(), aligning with the broader refactor.


30-31: These imports are necessary and actively used throughout the file.

The verification shows that StandardCharsets, Files, and Paths are used extensively across multiple test methods (lines 67, 84–85, 91, 122, 140–141, 147, 177, 195–196, 202, 206, 246, 263–264, 270, 297, 314–315, 321). These classes are used repeatedly in file operations that read expected/actual results and copybook data with explicit character encodings.

Likely an incorrect or invalid review comment.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala (1)

22-22: LGTM! Essential addition for compression support.

The isCompressed field enables per-file compression metadata tracking, which is core to the PR objectives. The change is a breaking API modification, but all call sites appear to have been updated consistently across the codebase.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamerSpec.scala (1)

47-47: LGTM! Comprehensive test updates for Configuration-based API.

All FileStreamer instantiations consistently updated to use new Configuration(), ensuring test coverage for the refactored constructor signature.

Also applies to: 54-54, 62-62, 74-74, 82-82, 90-90

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (4)

43-90: Thorough validation helper for compressed EBCDIC files.

The test helper validates layout, schema, and sample data, ensuring comprehensive test coverage. The approach of writing actual results to files when mismatches occur aids debugging.


92-116: Good coverage for ASCII compressed files.

The helper tests ASCII file reading with various options, verifying both record counts and data content.


118-160: Comprehensive test cases for compression formats.

Tests cover:

  • Individual formats (gzip, bz2)
  • Mixed compressed/uncompressed files
  • ASCII compressed files with various charset options

This provides good coverage for the compression feature.


31-42: Well-structured test suite for compression support.

The test suite provides comprehensive coverage for the new compression feature with clear test data paths and expected outputs. All required test data files (test40_copybook.cob, test40_schema.json, test40_layout.txt, and test40.txt) are present in the data directory.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2)

246-247: LGTM! Consistent Configuration-based stream creation.

FileStreamer constructors correctly updated to use config parameter, aligning with the broader API refactor.


208-213: Correct handling of compressed files in index generation.

For compressed streams, the code returns a single SparseIndexEntry(0, -1, fileOrder, 0L) instead of attempting traditional indexing. This is appropriate because compressed streams cannot be randomly accessed—they must be read sequentially from the beginning. Downstream code properly processes the offsetTo = -1 sentinel by replacing it with the calculated end offset during index post-processing.
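
Schematically, the shortcut described here looks like the sketch below (an illustration only; the SparseIndexEntry arguments are taken from this comment, and buildIndexByScanning is a hypothetical stand-in for the normal indexing path):

// Illustrative sketch, not the actual IndexBuilder code.
def generateIndexEntries(dataStream: SimpleStream,
                         fileOrder: Int,
                         buildIndexByScanning: (SimpleStream, Int) => Seq[SparseIndexEntry]): Seq[SparseIndexEntry] = {
  if (dataStream.isCompressed) {
    // Compressed streams cannot be split or randomly accessed, so emit a single
    // entry for the whole file. offsetTo = -1 is the sentinel later replaced with
    // the calculated end offset during index post-processing.
    Seq(SparseIndexEntry(0, -1, fileOrder, 0L))
  } else {
    buildIndexByScanning(dataStream, fileOrder)
  }
}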

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)

232-233: LGTM! Consistent Configuration-based stream creation throughout.

All FileStreamer instantiations correctly updated to use sconf.value (Configuration), maintaining consistency with the broader API refactor while preserving existing processing logic.

Also applies to: 243-243, 269-269

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala (4)

64-72: LGTM!

The test correctly adapts to the new CobolRelation constructor signature by precomputing the filesList via getListFilesWithOrder. The use of isRecursiveRetrieval = true is appropriate for finding files within the test directory.


93-99: LGTM!

Consistent update pattern for the new constructor signature.


111-117: LGTM!

Consistent update completing the test file's adaptation to the new API.


32-33: The java.io.File import is necessary.

java.io is not implicitly imported in Scala (only scala, java.lang, and object Predef are imported by default). The import at line 32 is essential because the code uses the File type at line 38 for the copybookFile variable declaration. The earlier import of org.apache.commons.io.FileUtils (line 19) is unrelated—it's from Apache Commons, not the standard java.io package. The import is correct and required.

Likely an incorrect or invalid review comment.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (5)

21-21: LGTM!

Import added to support the isRecursiveRetrieval method's use of FileInputFormat.INPUT_DIR_RECURSIVE.


63-69: LGTM!

Good approach to detect compression upfront and log the processing mode change. The log message appropriately informs users that binary parallelism and indexes won't be used for compressed files.


71-75: LGTM!

Correctly passes the precomputed filesList to CobolRelation and the compression flag to the reader factory.


139-146: LGTM!

The helper method correctly reads the Hadoop configuration for recursive directory traversal with a safe default of false.


157-165: The concern raised in the review comment is incorrect. The current logic properly handles compressed text files.

Analysis:

  1. How compression is detected: isCompressed is determined by checking if Spark detected a compression codec: val isCompressed = codec != null. This is set in CobolRelation.scala.

  2. The logic is correct: When a file is text (.isText == true) with no variable-length parameters, it uses createTextReader regardless of compression. This is the correct behavior because:

    • Spark's text reader natively handles compressed files transparently through codec detection
    • Text files don't use binary parallelism or indexes anyway (those apply only to fixed-length binary records)
    • The log message "Binary parallelism and indexes won't be used for them" confirms the hasCompressedFiles check applies to binary files, not text
  3. Tests confirm this works: Test40CompressesFilesSpec.scala includes three passing tests ("read a compressed ASCII file 1/2/3") that explicitly load gzip-compressed text files (ascii.txt.gz) with text record formats ("D" and "D2"), and they work correctly.

  4. Why the condition order is appropriate:

    • Text files (compressed or not) → use createTextReader (Spark's native codec handling)
    • Fixed-length binary without compression → use createFixedLengthReader (enables binary parallelism)
    • Fixed-length binary with compression OR variable-length → use createVariableLengthReader (requires streaming)

The hasCompressedFiles check on line 160 intentionally applies only to fixed-length binary records, not to text records, which is the correct design.
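
A hedged sketch of that selection order (the flag and method names follow the analysis above; the real buildEitherReader takes richer parameters and returns a reader, not a string):

// Sketch of the reader-selection order described above; not the actual DefaultSource code.
def selectReaderKind(isText: Boolean,
                     hasVariableLengthParams: Boolean,
                     hasCompressedFiles: Boolean): String = {
  if (isText && !hasVariableLengthParams)
    "createTextReader"            // Spark's text reader handles codecs transparently
  else if (!isText && !hasVariableLengthParams && !hasCompressedFiles)
    "createFixedLengthReader"     // block-based splits, requires random seeks
  else
    "createVariableLengthReader"  // streaming, works for compressed inputs
}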

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (5)

20-21: LGTM!

New imports added to support Hadoop-based compression detection via CompressionCodecFactory and Path.


65-70: LGTM!

Good refactoring to accept the precomputed filesList externally. This separation of concerns allows the caller (DefaultSource) to inspect compression status and select the appropriate reader before constructing the relation.


96-103: LGTM!

Adding the explicit return type RDD[Row] improves API clarity for this package-private method.


119-129: LGTM!

The compression detection uses Hadoop's standard CompressionCodecFactory.getCodec() which identifies compression based on file extension (e.g., .gz, .bz2, .snappy). The factory is created once and reused efficiently for all files.
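
A minimal sketch of that probing pattern (Hadoop API; the directory listing and FileWithOrder construction are simplified to tuples here):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Sketch: detect per-file compression by extension, reusing a single factory.
def filesWithCompressionFlag(files: Seq[String], conf: Configuration): Seq[(String, Int, Boolean)] = {
  val factory = new CompressionCodecFactory(conf)                      // created once for all files
  files.zipWithIndex.map { case (fileName, order) =>
    val isCompressed = factory.getCodec(new Path(fileName)) != null    // null => no known codec
    (fileName, order, isCompressed)
  }
}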


27-27: LGTM!

Import updated to include VarLenReader which aligns with the reader type usage in buildScan().

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

🧹 Nitpick comments (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)

155-160: Consider context-aware validation bounds.

The upper bound of 200000 MB (~200 GB) is very permissive and applies uniformly to both compressed and uncompressed files. While this accommodates large compressed files (per the ~140 GB use case), it may be overly permissive for uncompressed files where such large split sizes could indicate a configuration error.

Consider either:

  1. Applying stricter bounds for uncompressed files (e.g., keep 2000 MB), or
  2. Adding a note in the error message indicating whether the limit applies to compressed or uncompressed context.
🔎 Proposed differentiated validation
     if (inputSplitSizeMB.nonEmpty) {
-      if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > 200000) {
-        throw new IllegalArgumentException(s"Invalid input split size of ${inputSplitSizeMB.get} MB.")
+      val maxSplitSizeMB = if (dataStream.isCompressed) 200000 else 2000
+      if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > maxSplitSizeMB) {
+        throw new IllegalArgumentException(s"Invalid input split size of ${inputSplitSizeMB.get} MB (max allowed: $maxSplitSizeMB MB).")
       }

217-230: Minor: Simplify pattern match on line 220.

The size variable is captured but unused; the original Option is returned instead. This can be simplified.

🔎 Proposed simplification
   private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
     if (isCompressed) {
-      readerProperties.inputSplitSizeCompressedMB match {
-        case Some(size) => readerProperties.inputSplitSizeCompressedMB
-        case None => Some(1024)
-      }
+      readerProperties.inputSplitSizeCompressedMB.orElse(Some(1024))
     } else {
-      if (readerProperties.inputSplitSizeMB.isDefined) {
-        readerProperties.inputSplitSizeMB
-      } else {
-        readerProperties.hdfsDefaultBlockSize
-      }
+      readerProperties.inputSplitSizeMB.orElse(readerProperties.hdfsDefaultBlockSize)
     }
   }
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)

226-253: Compression size calculation is correct but may impact performance.

The getCompressedFileSize method correctly determines the uncompressed size by scanning the entire compressed stream. However, for very large compressed files (e.g., the 140GB files mentioned in the PR objectives), this full scan could take considerable time.

Considerations:

  • The warning log at line 227 is good, but might not be visible to users until after a long wait
  • For production use with large files, consider adding progress logging or a way to estimate/configure expected scan time
  • The 50MB skip buffer size seems reasonable

The implementation is functionally correct with proper resource cleanup and error handling.
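
As a rough illustration of the scan being discussed (a self-contained sketch, not the FileUtils implementation; the buffer size here is arbitrary):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Sketch: compute the uncompressed size of a compressed file by reading it through the codec.
def uncompressedSizeOf(path: Path, conf: Configuration): Long = {
  val codec = new CompressionCodecFactory(conf).getCodec(path)
  require(codec != null, s"$path does not match any known Hadoop compression codec")
  val in = codec.createInputStream(path.getFileSystem(conf).open(path))
  try {
    val buffer = new Array[Byte](4 * 1024 * 1024)
    var total = 0L
    var read = in.read(buffer)
    while (read != -1) {          // full scan: O(uncompressed size), hence the cost concern above
      total += read
      read = in.read(buffer)
    }
    total
  } finally {
    in.close()
  }
}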

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1)

157-165: Add explanatory comment for compression handling in reader selection.

The condition at line 160 that prevents using FixedLenReader when compressed files are present is correct. Files compressed with common Hadoop codecs (gzip, Snappy, etc.) are not splittable and must be read sequentially from the start of the stream. Fixed-length readers rely on block-based splitting that requires random seeks within files, making them incompatible with compression. The variable-length reader's streaming approach is compatible with sequential compressed file processing.

Consider adding an inline comment explaining this technical constraint to guide future maintainers.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0b3aa3e and 7cfb193.

📒 Files selected for processing (12)
  • README.md (2 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (4 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (3 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (4 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (2 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (5 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala
  • README.md
🧰 Additional context used
🧬 Code graph analysis (6)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (2)
  • isCompressed (54-54)
  • close (43-50)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • get (47-50)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-224)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)
  • isCompressed (71-71)
  • close (133-140)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-224)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • isCompressed (71-71)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (3)
  • FileUtils (36-271)
  • isCompressed (219-224)
  • getCompressedFileSize (226-253)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
  • isRdwBigEndian (97-97)
  • recordExtractor (54-89)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (18)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

147-147: LGTM!

The compression state is correctly passed to getSplitSizeMB, enabling compression-aware split size determination.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala (1)

61-61: LGTM! Index caching explicitly disabled for these test scenarios.

The addition of enable_index_cache = false across multiple test cases ensures consistent behavior when testing record length mapping functionality without the complexity of cached indexes. This is a reasonable test isolation strategy.

Also applies to: 82-82, 103-103, 124-124, 169-169
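
For context, this is roughly how the option appears on the read side (the paths are placeholders; the format and option names are the ones used by this project):

// Illustrative read with index caching disabled for test isolation.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cob")      // placeholder path
  .option("enable_index_cache", "false")
  .load("/path/to/records")                         // placeholder path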

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (6)

43-101: Well-structured test helper method.

The testCompressedFile method provides comprehensive validation (layout, schema, and data) with good error reporting. The conditional index configuration and cleanup logic are appropriate.

One consideration: verify that dropping File_Id, Record_Id, and Record_Byte_Length at line 75 is appropriate for all test scenarios using this helper.


103-127: LGTM! Clean ASCII compression test helper.

The method is well-structured with clear assertions on both count and ordered data content. The parameterized options allow for flexible test scenarios.


129-153: Good coverage for mixed file scenarios.

This test helper validates an important use case: reading both compressed and uncompressed files from the same directory. The expected count of 6 (2x the single file) provides a good sanity check.


171-202: Good edge case coverage.

The tests for mixed compressed files (lines 171-185) and with file_end_offset (lines 187-202) provide valuable coverage for important production scenarios.


204-240: Comprehensive ASCII compression test coverage.

The tests provide good coverage of different ASCII format options (record_format D/D2, ascii_charset variations) for both single and mixed file scenarios.


31-42: No action needed—all test data files are already in the repository.

The test fixtures are present, including the copybook file, expected results, and compressed data files (both .gz and .bz2 formats) for testing compressed file handling.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala (1)

19-69: LGTM! Clean parameter addition.

The new inputSplitSizeCompressedMB parameter is well-documented and appropriately typed as Option[Int]. The placement after inputSplitSizeMB provides logical grouping of related split-size parameters.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (2)

23-23: Minor import style change noted.

Changed from specific policy imports to wildcard import. This is a minor style change with no functional impact.


53-53: LGTM! Parameter addition mirrors VariableLengthParameters.

The new inputSplitSizeCompressedMB parameter is properly documented and has an appropriate default value of None, consistent with inputSplitSizeMB.

Also applies to: 106-106

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)

219-224: LGTM! Clean compression detection.

The isCompressed method uses Hadoop's CompressionCodecFactory appropriately to detect compressed files. The implementation is straightforward and correct.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (2)

234-245: Compression-aware file sizing correctly implemented.

The conditional logic appropriately uses FileUtils.getCompressedFileSize for compressed files and falls back to standard getFileStatus for uncompressed files. The bytesToRead calculation correctly accounts for start/end offsets.

Note: For very large compressed files, the getCompressedFileSize call (line 235) will perform a full scan, which may impact performance. This is acceptable since it's only done during index generation, not for every read.
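
Schematically, the sizing choice reads like this (a sketch with illustrative names; getCompressedFileSize is passed in as a function to keep the snippet self-contained):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch of compression-aware byte accounting, not the actual IndexBuilder code.
def bytesToRead(path: Path,
                conf: Configuration,
                isCompressed: Boolean,
                startOffset: Long,
                endOffset: Long,
                getCompressedFileSize: (Path, Configuration) => Long): Long = {
  val fileSize =
    if (isCompressed) getCompressedFileSize(path, conf)        // full decompression scan
    else path.getFileSystem(conf).getFileStatus(path).getLen   // cheap metadata lookup
  fileSize - startOffset - endOffset
}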


247-248: FileStreamer constructor updated to use Configuration.

The change from FileSystem to Configuration parameter aligns with the broader compression-awareness changes in the PR. This enables the FileStreamer to detect and handle compression internally.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (2)

125-125: LGTM! Parameter constant follows naming conventions.

The new parameter constant PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB is properly defined and follows the existing naming pattern.


389-389: Complete parameter integration.

The new inputSplitSizeCompressedMB parameter is properly:

  • Defaulted to None when constructing VariableLengthParameters (line 389)
  • Propagated to ReaderParameters (line 426)
  • Parsed from user options with type conversion (line 514)

The implementation follows the established patterns for similar parameters.

Also applies to: 426-426, 514-514
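
A hedged usage example of the new option (the value and the paths are placeholders):

// Illustrative: tune split sizing when the inputs are compressed.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cob")              // placeholder path
  .option("input_split_size_compressed_mb", "2048")         // option added in this PR
  .load("/path/to/compressed_records")                      // placeholder path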

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (2)

63-69: Good compression detection and logging.

The code properly detects compressed files in the input and logs a notice when found. This provides good observability for users.


143-146: LGTM! Recursive retrieval support added.

The isRecursiveRetrieval method correctly checks the Hadoop configuration for the recursive directory flag. This is a useful addition for handling nested directory structures.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)

141-146: Critical: Skip failure returns mispositioned stream, causing data corruption.

When skip() returns <= 0 on line 144, the method returns the stream without reaching startOffset. This causes the caller to read from the wrong offset, leading to incorrect data or silent corruption. An exception should be thrown instead.

🔎 Proposed fix
       } else {
         var toSkip = startOffset
         while (toSkip > 0) {
           val skipped = baseStream.skip(toSkip)
-          if (skipped <= 0) return baseStream
+          if (skipped <= 0) {
+            throw new IOException(s"Failed to skip to offset $startOffset in compressed stream. Skipped ${startOffset - toSkip} bytes, but $toSkip bytes remain.")
+          }
           toSkip -= skipped
         }
       }
🧹 Nitpick comments (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

218-231: Simplify the redundant pattern match for compressed split size.

The pattern match at lines 220-222 is redundant—it matches Some(size) but returns inputSplitSizeCompressedMB again instead of using the matched value.

🔎 Proposed simplification
  private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
    if (isCompressed) {
-     readerProperties.inputSplitSizeCompressedMB match {
-       case Some(size) => readerProperties.inputSplitSizeCompressedMB
-       case None => Some(1024)
-     }
+     readerProperties.inputSplitSizeCompressedMB.orElse(Some(1024))
    } else {
      if (readerProperties.inputSplitSizeMB.isDefined) {
        readerProperties.inputSplitSizeMB
      } else {
        readerProperties.hdfsDefaultBlockSize
      }
    }
  }
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)

67-72: Use bytesLeft instead of recomputing the expression.

Line 70 recomputes bufferContainBytes - bufferPos, which was already calculated and stored in bytesLeft on line 68. For consistency and clarity, use the existing variable.

🔎 Proposed refactor
       if (bufferPos < bufferContainBytes) {
         val bytesLeft = bufferContainBytes - bufferPos
         System.arraycopy(buffer, bufferPos, b, off, bytesLeft)
-        lengthLeft -= bufferContainBytes - bufferPos
+        lengthLeft -= bytesLeft
         offsetLeft += bytesLeft
       }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7cfb193 and 27665e7.

📒 Files selected for processing (3)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala
🧰 Additional context used
🧬 Code graph analysis (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-224)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)
  • isCompressed (71-71)
  • close (133-140)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (5)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-224)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)
  • isCompressed (71-71)
  • size (65-65)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • get (47-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)

147-147: LGTM! Compression status now flows to split size calculation.

The call correctly passes dataStream.isCompressed to determine appropriate split sizes for compressed vs. uncompressed files.


156-158: LGTM! Dynamic max split size aligns well with compression constraints.

The 200 GB limit for compressed files vs. 2 GB for uncompressed makes sense given that compressed formats like gzip are typically not splittable. The error message now includes the dynamic limit for better user feedback.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (2)

19-54: LGTM: Constructor and initialization updated correctly for compression support.

The switch from FileSystem to Configuration parameter enables compression detection, the typo fix (bufferConitainBytes → bufferContainBytes) improves maintainability, and the public isCompressed accessor properly exposes the compression state.


89-96: Critical issue from past review has been resolved.

Lines 93-95 now correctly use bytesToCopy (the actual number of bytes copied) instead of bufferContainBytes (the total buffer size) when updating positions. This fixes the data corruption issue flagged in the previous review.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)

141-146: Improve error handling when skip fails for compressed streams.

The seek type error from the previous review was fixed (line 139 correctly uses fsIn.seek()). However, the skip failure handling remains problematic: when skip() returns <= 0 before reaching the target offset, the code silently returns a mispositioned stream. This can lead to reading from the wrong offset and data corruption.

🔎 Proposed fix
       } else {
         var toSkip = startOffset
         while (toSkip > 0) {
           val skipped = baseStream.skip(toSkip)
-          if (skipped <= 0) return baseStream
+          if (skipped <= 0) {
+            throw new IOException(s"Failed to skip to offset $startOffset in compressed stream. " +
+              s"Only skipped ${startOffset - toSkip} bytes before stream ended or blocked.")
+          }
           toSkip -= skipped
         }
       }
🧹 Nitpick comments (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)

122-135: Consider defensive resource handling if codec initialization fails.

If codec.createInputStream(fsIn) throws an exception, the underlying FSDataInputStream (fsIn) is not closed, causing a resource leak. While this is a rare edge case, wrapping in try-catch would be more robust.

🔎 Proposed fix
   private def openStream(): InputStream = {
     val fileSystem = filePath.getFileSystem(hadoopConfig)
     val fsIn: FSDataInputStream = fileSystem.open(filePath)
 
     val factory = new CompressionCodecFactory(hadoopConfig)
     val codec = factory.getCodec(filePath)
 
-    val baseStream = if (codec != null) {
-      isCompressedStream = true
-      codec.createInputStream(fsIn)
-    } else {
-      // No compression detected
-      fsIn
+    val baseStream = try {
+      if (codec != null) {
+        isCompressedStream = true
+        codec.createInputStream(fsIn)
+      } else {
+        // No compression detected
+        fsIn
+      }
+    } catch {
+      case e: Exception =>
+        fsIn.close()
+        throw e
     }
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)

155-161: Consider caching isCompressed and extracting magic numbers.

dataStream.isCompressed is evaluated twice (line 147 and 156). While likely inexpensive, caching it in a local val at the start would improve clarity. Additionally, the magic numbers 200000 and 2000 could be extracted as named constants to document their purpose.

🔎 Suggested refactor
   override def generateIndex(dataStream: SimpleStream,
                              headerStream: SimpleStream,
                              fileNumber: Int,
                              isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] = {
+    val isCompressedStream = dataStream.isCompressed
     val inputSplitSizeRecords: Option[Int] = readerProperties.inputSplitRecords
-    val inputSplitSizeMB: Option[Int] = getSplitSizeMB(dataStream.isCompressed)
+    val inputSplitSizeMB: Option[Int] = getSplitSizeMB(isCompressedStream)

     if (inputSplitSizeRecords.isDefined) {
       if (inputSplitSizeRecords.get < 1 || inputSplitSizeRecords.get > 1000000000) {
         throw new IllegalArgumentException(s"Invalid input split size. The requested number of records is ${inputSplitSizeRecords.get}.")
       }
       logger.info(s"Input split size = ${inputSplitSizeRecords.get} records")
     } else {
       if (inputSplitSizeMB.nonEmpty) {
-        val maxSplitSizeMB = if (dataStream.isCompressed) 200000 else 2000
+        val maxSplitSizeMB = if (isCompressedStream) MaxSplitSizeCompressedMB else MaxSplitSizeUncompressedMB
         if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > maxSplitSizeMB) {

And add companion object constants:

object VarLenNestedReader {
  private val MaxSplitSizeCompressedMB = 200000   // ~195 GB for large compressed files
  private val MaxSplitSizeUncompressedMB = 2000   // 2 GB for uncompressed files
}

218-228: Simplify the else branch using orElse.

The uncompressed path (lines 222-226) can be simplified to match the pattern used in the compressed path.

🔎 Suggested simplification
  private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
    if (isCompressed) {
      readerProperties.inputSplitSizeCompressedMB.orElse(Some(1024))
    } else {
-      if (readerProperties.inputSplitSizeMB.isDefined) {
-        readerProperties.inputSplitSizeMB
-      } else {
-        readerProperties.hdfsDefaultBlockSize
-      }
+      readerProperties.inputSplitSizeMB.orElse(readerProperties.hdfsDefaultBlockSize)
    }
  }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 27665e7 and 23aabb0.

📒 Files selected for processing (2)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-224)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • isCompressed (71-71)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-224)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (2)
  • isCompressed (71-71)
  • close (133-140)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (4)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3)

38-41: LGTM! Integer overflow fix properly addresses large file handling.

The change from Int to Long for bytesRead correctly handles files larger than ~2GB, which is essential given the PR objective to support compressed files up to ~140GB.


52-54: LGTM!

The isCompressed accessor is consistent with the SimpleStream interface and FileStreamer implementation.


89-96: LGTM! Buffer position updates correctly use bytesToCopy.

The fix properly uses the actual bytes copied for position updates instead of the total buffer size, preventing data corruption.
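
To make the pattern concrete, here is a hypothetical sketch of a buffered copy that advances positions by the bytes actually copied; the method and variable names are illustrative, not the actual fields of BufferedFSDataInputStream:

  // Hypothetical sketch only: copy from an internal buffer and advance positions
  // by the bytes actually copied (bytesToCopy), never by the total buffer fill.
  def readFromBuffer(buffer: Array[Byte],
                     bufferContainBytes: Int,
                     bufferPos: Int,
                     dest: Array[Byte],
                     destPos: Int,
                     requested: Int): (Int, Int) = {
    val bytesToCopy = math.min(bufferContainBytes - bufferPos, requested)
    System.arraycopy(buffer, bufferPos, dest, destPos, bytesToCopy)
    // Return the updated positions: advance by bytesToCopy, not bufferContainBytes.
    (bufferPos + bytesToCopy, destPos + bytesToCopy)
  }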

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

147-147: LGTM!

The isCompressed parameter is correctly passed to getSplitSizeMB, enabling compression-aware split sizing. This aligns with the SimpleStream interface where isCompressed defaults to false and is overridden by FileStreamer when compression is detected.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)

139-145: Skip failure silently returns mispositioned stream.

When skip() returns <= 0 before reaching startOffset, the code returns the stream at an incorrect position. This can cause the caller to read from the wrong offset, leading to data corruption. The previous review flagged this issue.

🔎 Proposed fix
       } else {
         var toSkip = startOffset
         while (toSkip > 0) {
           val skipped = baseStream.skip(toSkip)
-          if (skipped <= 0) return baseStream
+          if (skipped <= 0) {
+            throw new IOException(s"Failed to skip to offset $startOffset in compressed stream. Only skipped ${startOffset - toSkip} bytes.")
+          }
           toSkip -= skipped
         }
       }
🧹 Nitpick comments (6)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala (2)

22-22: Consider using explicit imports instead of wildcard import.

Wildcard imports can lead to namespace pollution and make it harder to track which functions are actually used. The previous selective imports (array, col, expr, max, struct) were more explicit.


492-507: Good rename for broader filesystem compatibility; minor formatting nit.

The rename from getDefaultHdfsBlockSize to getDefaultFsBlockSize correctly reflects that this method works with any FileSystem, not just HDFS.

However, there's a minor formatting inconsistency on line 495:

🔎 Suggested fix
-    val fileSystem  =pathOpt match {
+    val fileSystem = pathOpt match {
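
For context, a hedged sketch of what a filesystem-agnostic block-size helper can look like; the signature and the MB conversion below are assumptions, not the actual SparkUtils.getDefaultFsBlockSize implementation:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  // Sketch: resolve the FileSystem from the path (s3a://, abfs://, hdfs://, file://)
  // and return its default block size in megabytes.
  def defaultFsBlockSizeMB(conf: Configuration, pathOpt: Option[String]): Option[Int] = {
    val path = new Path(pathOpt.getOrElse("/"))
    val fs = path.getFileSystem(conf)
    val blockSizeBytes = fs.getDefaultBlockSize(path)
    if (blockSizeBytes > 0) Some((blockSizeBytes / (1024L * 1024L)).toInt) else None
  }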
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (2)

43-102: Consider consistent cleanup of temporary files.

The helper writes temporary files (actualLayoutPath, actualSchemaPath, actualResultsPath) regardless of whether comparisons fail or succeed. However, only actualResultsPath is deleted (line 99), while the other temporary files are left behind. Consider either cleaning up all temporary files or using a consistent strategy (e.g., only clean up on success, or use test temp directories).

Additional minor issue

Line 76 has double spacing between "File_Id" and "Record_Id" in the .drop() call. While this doesn't affect functionality, consistent spacing improves readability.

-      .drop("File_Id",  "Record_Id", "Record_Byte_Length")
+      .drop("File_Id", "Record_Id", "Record_Byte_Length")

104-154: Clear and well-structured test helpers.

Both testCompressedAsciiFile and testMixedAsciiFiles are clear and effective. The duplication between them is acceptable for integration tests where clarity is prioritized over DRY principles.

Optional: Consider extracting common logic

If you find yourself adding more similar helpers in the future, consider extracting the common Spark read configuration and validation logic into a shared helper method to reduce duplication.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)

235-244: Consider using read-based EOF detection for robustness.

The skip loop exits when skip() returns <= 0. Per InputStream.skip() contract, returning 0 doesn't guarantee EOF—it may just indicate a temporary inability to skip. While most compression codecs only return 0 at EOF, a more defensive approach would read a byte to confirm EOF when skip returns 0.

That said, this pattern is commonly used in practice and works reliably with Hadoop compression codecs, so this is a minor robustness concern rather than a functional bug.

🔎 Optional: More robust EOF detection
     val size = try {
       val SKIP_BUFFER_SIZE = 1024*1024*50
       var totalBytesSkipped = 0L
       var skippedLast = 1L
-      while (skippedLast > 0) {
+      var eof = false
+      while (!eof) {
         skippedLast = ifs.skip(SKIP_BUFFER_SIZE)
-        if (skippedLast > 0)
+        if (skippedLast > 0) {
           totalBytesSkipped += skippedLast
+        } else {
+          // Confirm EOF by attempting to read a byte
+          eof = ifs.read() == -1
+          if (!eof) totalBytesSkipped += 1
+        }
       }
       totalBytesSkipped
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

220-226: Review the block size multiplier application logic.

When inputSplitSizeMB is not specified for uncompressed files, the code multiplies the filesystem default block size by 4. However, when inputSplitSizeMB is explicitly provided by the user, this multiplier is not applied. This asymmetry means:

  • Default behavior: 128 MB block size → 512 MB split
  • Explicit inputSplitSizeMB=128: → 128 MB split

This is likely intentional (respecting explicit user configuration), but the logic could be clearer.

Consider adding a comment explaining why the multiplier only applies to the filesystem default:

 private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
   if (isCompressed) {
     readerProperties.inputSplitSizeCompressedMB.orElse(Some(DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB))
   } else {
+    // Apply 4x multiplier only to filesystem block size default, not to explicit user configuration
     readerProperties.inputSplitSizeMB.orElse(readerProperties.fsDefaultBlockSize).map(_ * DEFAULT_FS_INDEX_SIZE_MULTIPLIER)
   }
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 23aabb0 and 8000d43.

📒 Files selected for processing (9)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (4 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (4 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (7 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (5 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (2 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala (2 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-221)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala (2)
  • SparkUtils (37-554)
  • getDefaultFsBlockSize (492-507)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (2)
  • getOrElse (78-81)
  • get (47-50)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (3)
  • FileUtils (36-272)
  • isCompressed (219-221)
  • getCompressionCodec (223-226)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • isCompressed (66-66)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala (1)
  • cobrix (132-139)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/MetadataFields.scala (1)
  • MetadataFields (19-26)
spark-cobol/src/main/scala_2.12/za/co/absa/cobrix/spark/cobol/utils/impl/HofsWrapper.scala (2)
  • HofsWrapper (22-38)
  • transform (33-37)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (5)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • isCompressed (54-54)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • isCompressed (66-66)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • isCompressed (219-221)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala (1)
  • isCompressed (32-32)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • get (47-50)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (2)
  • isCompressed (54-54)
  • close (43-50)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (2)
  • FileUtils (36-272)
  • isCompressed (219-221)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (5)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/CopybookParser.scala (2)
  • CopybookParser (42-490)
  • parseTree (198-241)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/policies/DebugFieldsPolicy.scala (1)
  • DebugFieldsPolicy (19-44)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (3)
  • FileUtils (36-272)
  • writeStringToFile (89-96)
  • writeStringsToFile (118-128)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala (2)
  • SparkUtils (37-554)
  • prettyJSON (444-450)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala (1)
  • generateRecordLayoutPositions (224-286)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.7 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 3.5.7 on Scala 2.13.17
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (29)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (4)

63-75: LGTM! Compression awareness logic is well-implemented.

The implementation correctly:

  1. Retrieves file list with compression metadata upfront
  2. Checks for compressed files using the isCompressed flag from FileWithOrder
  3. Logs an informative message when compressed files are detected
  4. Passes both filesList and compression awareness to downstream components
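
A minimal sketch of the check described in the steps above, assuming a filesList of FileWithOrder entries and a logger in scope; the log message is illustrative:

  // Sketch: decide whether the compression-aware (variable-length) code path is needed.
  val hasCompressedFiles = filesList.exists(_.isCompressed)
  if (hasCompressedFiles) {
    logger.info("Compressed input files detected. Per-file parallelism will be limited for these files.")
  }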

139-146: LGTM! Clean helper for recursive retrieval configuration.

The method correctly reads Hadoop's FileInputFormat.INPUT_DIR_RECURSIVE configuration property with a sensible default of false.
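
As a sketch of that lookup (the helper name is hypothetical; the configuration key is Hadoop's standard constant):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

  // Sketch: read Hadoop's recursive-input flag, defaulting to false when unset.
  def isRecursiveRetrieval(conf: Configuration): Boolean =
    conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)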


177-177: LGTM! Method calls updated to use renamed getDefaultFsBlockSize.

All three reader factory methods (createTextReader, createFixedLengthReader, createVariableLengthReader) correctly use the renamed method that reflects broader filesystem compatibility.

Also applies to: 188-188, 202-202


157-170: Public API change: buildEitherReader signature updated.

The buildEitherReader method now requires a hasCompressedFiles parameter. The reader selection logic correctly routes compressed files to the variable-length reader, which supports stream-based decompression. All callers of this method have been updated to pass the new parameter.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala (6)

1-31: LGTM!

The package declaration, imports, and test class structure follow Scala and ScalaTest best practices. The mix-in traits provide good separation of concerns for test utilities.


32-42: Well-organized test constants.

The file path constants are clearly named and organized. The use of relative paths is acceptable for integration tests, assuming the working directory is consistently set by the test runner.


156-170: Excellent test coverage for compressed EBCDIC files.

The tests properly cover both compression formats (gzip and bzip2) with and without index functionality. The previous review comments regarding missing useIndexes parameters have been correctly addressed.
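
As a usage illustration of what these tests exercise (paths and the copybook are placeholders, and the option values are assumptions for a fixed-length EBCDIC file):

  // Reading a gzip-compressed EBCDIC file directly; the codec is detected from the extension.
  val df = spark.read
    .format("cobol")
    .option("copybook", "/path/to/copybook.cob")  // placeholder path
    .option("record_format", "F")                 // assumed fixed-length records
    .load("/data/example.dat.gz")

  df.show(5)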


172-188: Good validation of mixed compressed files.

This test correctly validates that the system can handle directories containing multiple compressed files and process them in parallel.


190-206: Valuable edge case validation.

This test correctly validates that file_end_offset works as expected with compressed files, reducing the record count from 300 to 297.


208-244: Comprehensive coverage of ASCII file scenarios.

The tests effectively cover various combinations of compressed/mixed ASCII files with different record formats and character set options, ensuring robust handling of these configurations.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (3)

19-27: LGTM!

The import additions for Hadoop compression codec support and standard Java I/O classes are appropriate for the new compression functionality.


219-226: LGTM!

Clean utility methods that properly delegate to Hadoop's CompressionCodecFactory for compression detection. The null-check pattern for isCompressed is idiomatic.
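
A hedged sketch of this style of detection via CompressionCodecFactory; the helper names mirror the ones discussed, but the bodies below are illustrative:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}

  // getCodec() returns null when the file extension does not match a registered codec.
  def getCompressionCodec(path: Path, conf: Configuration): Option[CompressionCodec] =
    Option(new CompressionCodecFactory(conf).getCodec(path))

  def isCompressed(path: Path, conf: Configuration): Boolean =
    getCompressionCodec(path, conf).isDefined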


248-251: Resource cleanup order is correct.

The wrapper stream (ifs) is closed before the underlying FSDataInputStream (fsIn), which is the proper order for layered streams.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (5)

19-27: LGTM!

Good refactoring to accept Configuration instead of FileSystem. This provides more flexibility and allows the stream to determine compression settings consistently with the rest of the codebase.


33-41: LGTM!

Good use of lazy stream initialization via openStream(). The change from Int to Long for bytesRead addresses the previous overflow concern for files larger than 2 GB.


52-54: LGTM!

The isClosed and isCompressed accessors provide clean public API for stream state inspection.


89-96: LGTM!

The buffer position tracking now correctly uses bytesToCopy instead of bufferContainBytes, addressing the data corruption issue from the previous review.


136-137: LGTM!

The type error from the previous review is fixed—fsIn.seek(startOffset) is correctly called on the FSDataInputStream instead of the generic InputStream.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (4)

40-44: LGTM!

Good refactoring to accept Configuration instead of FileSystem. The hadoopPath field properly encapsulates the path construction.


54-56: LGTM!

Good use of lazy val for isCompressedStream and fileSize. This defers potentially expensive operations (compression codec lookup, file system metadata call) until they're actually needed, improving performance for streams that may never be used.


66-72: LGTM!

The isEndOfStream logic correctly handles the semantic difference between compressed and uncompressed streams:

  • Compressed: End-of-stream can only be determined after reading (buffer state check)
  • Uncompressed: End-of-stream can be calculated from offset vs. known size

This aligns well with the PR objective of supporting compressed file reading.
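
A minimal sketch of that distinction (an assumed shape, not the actual FileStreamer code):

  // Sketch: EOF semantics differ between compressed and uncompressed inputs.
  def isEndOfStream(isCompressed: Boolean,
                    offset: Long,
                    fileSize: Long,
                    bufferExhausted: Boolean): Boolean = {
    if (isCompressed) {
      bufferExhausted    // uncompressed size is unknown; EOF is observed only after reads drain the buffer
    } else {
      offset >= fileSize // size is known up front; EOF is a simple offset comparison
    }
  }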


137-150: LGTM!

The copyStream, ensureOpened, and getHadoopFileSize methods are correctly updated to use hadoopConfig consistently throughout.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)

45-46: New constants for compression-aware split sizing.

The constants are well-defined. The 1024 MB default for compressed files and the 4x multiplier for filesystem block size are reasonable starting points for compressed file handling.


158-161: Verify the 200 GB maximum split size for compressed files is intentional.

The maximum split size for compressed files is set to 200,000 MB (approximately 200 GB), which is 100x larger than the 2 GB limit for uncompressed files. While compressed files cannot be split arbitrarily (requiring decompression from the start), this very large limit may lead to memory pressure or extremely long-running tasks for large compressed files.

Please confirm this design choice addresses the use case described in issue #809, where compressed files can be up to 140 GB. Consider whether processing such large compressed files in a single partition aligns with Spark's parallel processing model, or if additional guidance should be provided to users about splitting large files before compression.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (2)

23-27: Import organization changes.

The wildcard import on line 23 followed by the explicit import on line 27 is valid Scala. The explicit import for SchemaRetentionPolicy.SchemaRetentionPolicy ensures the type is available even when imported via wildcard.


106-107: Parameter rename is already complete—no backward compatibility measures applied.

The hdfsDefaultBlockSize parameter has been fully removed and replaced with fsDefaultBlockSize throughout the codebase. There is no residual usage of the old parameter name anywhere, and no deprecated version exists to support backward compatibility. The breaking change was implemented directly without the suggested mitigations (deprecation wrapper, dual-parameter support, or release notes).

If backward compatibility is required, it would need to be retrofitted by: (1) restoring hdfsDefaultBlockSize as a deprecated alias that maps to fsDefaultBlockSize, and (2) documenting the breaking change in release notes.
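
If that retrofit were ever done, a hypothetical sketch of the alias could look like the following (field names follow the review discussion; the class body is truncated):

  case class ReaderParameters(
    fsDefaultBlockSize: Option[Int] = None
    // ... other fields omitted ...
  ) {
    @deprecated("Use fsDefaultBlockSize instead", since = "2.x")
    def hdfsDefaultBlockSize: Option[Int] = fsDefaultBlockSize
  }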

Likely an incorrect or invalid review comment.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (3)

120-128: Parameter constants reorganized with new compression support.

The addition of PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB and reorganization of the index-related constants improves code organization.


389-389: Parameter propagation for compression-aware split sizing.

The new inputSplitSizeCompressedMB parameter correctly flows from VariableLengthParameters (line 389, defaulting to None) through to ReaderParameters (line 426). The fsDefaultBlockSize parameter is properly set from the defaultBlockSize argument (line 427).

Also applies to: 426-427


514-514: Parsing logic for compressed split size parameter.

The parsing of PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB correctly follows the same pattern as PARAM_INPUT_SPLIT_SIZE_MB, using .map(v => v.toInt) to convert the string to an integer.

      isRecordIdGenerationEnabled,
      params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
-     params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,
+     params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "true").toBoolean,

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the file and examine its current state
find . -name "CobolParametersParser.scala" -type f 2>/dev/null

# Check if the file exists and examine lines around 511
if [ -f "cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala" ]; then
  echo "=== File found, showing context around line 511 ==="
  sed -n '505,520p' cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
fi

# Search for PARAM_ENABLE_INDEX_CACHE throughout the codebase
echo ""
echo "=== Searching for PARAM_ENABLE_INDEX_CACHE references ==="
rg -n "PARAM_ENABLE_INDEX_CACHE" --type scala

# Check git history for changes to this parameter
echo ""
echo "=== Checking git log for CobolParametersParser changes ==="
git log --oneline --all -- "cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala" 2>/dev/null | head -20

Repository: AbsaOSS/cobrix

Length of output: 3226


🏁 Script executed:

#!/bin/bash
# Check for release notes, changelog, or README documentation
echo "=== Checking for README or documentation about index cache ==="
rg -n "index_cache|index cache" -i -g "*.md" -g "README*"

# Look at the specific commit to understand the intent
echo ""
echo "=== Checking commit details ==="
git show --stat 8000d431 2>/dev/null | head -50

# Check if there are tests specifically for this default
echo ""
echo "=== Looking for tests related to index cache defaults ==="
rg -n "enable_index_cache.*true" --type scala -C 2

Repository: AbsaOSS/cobrix

Length of output: 5366


Index cache now enabled by default — verify memory impact guidance.

The default value for PARAM_ENABLE_INDEX_CACHE has changed from "false" to "true". This is a behavioral change documented in README.md that will affect all users who don't explicitly set this option.

Impact: Index caching improves performance by reusing generated indexes across jobs, but increases memory usage. While the README documents the option as "true (default)", it lacks explicit guidance on:

  • Memory consumption for large files or multiple concurrent jobs
  • Whether memory bounds or limits are enforced
  • Recommendations for resource-constrained environments

Recommendation: Enhance documentation to include:

  1. Memory impact guidance (e.g., approximate memory usage patterns)
  2. Configuration advice for environments with limited resources
  3. Information about memory bounds or cleanup mechanisms for long-running jobs
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
around line 511, the default for PARAM_ENABLE_INDEX_CACHE was changed to true
which increases memory usage by caching indexes; update the project
documentation (README.md) to explicitly document the memory impact and
operational guidance: add a new section describing approximate memory usage
characteristics for index caches (e.g., per-file or per-part memory patterns and
how size scales with record count and schema complexity), describe behavior for
multiple concurrent jobs, note whether the cache has any enforced bounds or
eviction/cleanup mechanisms, and provide practical recommendations for
resource-constrained environments (how to disable the cache, tune related
settings, and monitor/limit memory use), plus a short example config snippet
showing how to explicitly disable the cache when needed.

@yruslan yruslan merged commit a0136c0 into master Dec 19, 2025
7 checks passed
@yruslan yruslan deleted the feature/809-add-compression-support branch December 19, 2025 12:41


Development

Successfully merging this pull request may close these issues.

Zip/gZip read a compressed binary file without having previously decompressed it
