perf(arrow codec): optimize scalar RecordBatch encoding #25734

Open
dannote wants to merge 1 commit into vectordotdev:master from dannote:arrow-scalar-recordbatch

dannote commented Jul 1, 2026

perf(arrow codec): optimize scalar RecordBatch encoding

Summary

This PR optimizes ArrowStreamSerializer::encode_to_record_batch for table/native sinks that encode log events into an Arrow RecordBatch with a fixed scalar schema.

Previously, this path always encoded through an intermediate JSON representation:

Vector Event -> serde_json::Value -> Arrow JSON decoder -> RecordBatch

That path is flexible and remains the fallback. This PR adds a direct path for schemas made of supported scalar Arrow types:

Vector Event -> Arrow builders -> RecordBatch

The direct path currently supports:

  • booleans
  • signed and unsigned integers
  • floats
  • UTF-8 strings
  • binary values
  • date/time/timestamp values
  • decimal128 values

If the schema contains unsupported types, or if a value needs Arrow's more permissive JSON decoder behavior, encoding falls back to the existing JSON decoder path.
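
The schema-level decision described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `FieldType`, `EncodePath`, `is_direct_scalar`, and `choose_path` are hypothetical names, and `FieldType` is a local stand-in rather than Arrow's own `DataType`. It only covers the schema check; the per-value fallback is a separate decision.

```rust
/// Hypothetical stand-in for a column type. This is NOT arrow's `DataType`;
/// it only models the categories listed in the PR description.
#[derive(Debug, Clone, PartialEq)]
enum FieldType {
    Boolean,
    Int64,
    UInt64,
    Float64,
    Utf8,
    Binary,
    Timestamp,
    Decimal128,
    List(Box<FieldType>),
    Struct(Vec<FieldType>),
}

/// Which encoding route a batch would take (hypothetical name).
#[derive(Debug, Clone, Copy, PartialEq)]
enum EncodePath {
    Direct,
    JsonFallback,
}

/// Allow-list of scalar types; nested types are explicitly excluded.
fn is_direct_scalar(ty: &FieldType) -> bool {
    match ty {
        FieldType::Boolean
        | FieldType::Int64
        | FieldType::UInt64
        | FieldType::Float64
        | FieldType::Utf8
        | FieldType::Binary
        | FieldType::Timestamp
        | FieldType::Decimal128 => true,
        FieldType::List(_) | FieldType::Struct(_) => false,
    }
}

/// The direct path is taken only when every column is a supported scalar.
fn choose_path(schema: &[FieldType]) -> EncodePath {
    if schema.iter().all(is_direct_scalar) {
        EncodePath::Direct
    } else {
        EncodePath::JsonFallback
    }
}
```

Using an explicit allow-list rather than "anything that is not nested" means a type added to the enum later falls outside the direct path until someone opts it in.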

Vector configuration

This change is in the Arrow codec internals and does not add or change Vector configuration.

The motivating workload was a schema-aware sink that appends Arrow RecordBatch values into a table-native destination. In that setup, the destination append path was fast enough that RecordBatch construction became a visible part of the hot path.

Representative test configuration used for end-to-end validation while developing the downstream sink:

[sources.file]
type = "file"
include = ["/tmp/vector-duckdb-events-1m.ndjson"]
read_from = "beginning"
ignore_checkpoints = true

[transforms.parse]
type = "remap"
inputs = ["file"]
source = '''
. = parse_json!(.message)
'''

[sinks.duckdb]
type = "duckdb"
inputs = ["parse"]
endpoint = "/tmp/vector-file-duckdb.duckdb"
table = "events"
batch.max_events = 1000
request.concurrency = 2

The DuckDB sink itself is not part of this PR; this configuration is included only to explain the use case that exposed the bottleneck.

How did you test this PR?

Added tests covering the direct RecordBatch path and its fallback behavior:

  • direct scalar RecordBatch output matches the existing JSON decoder path
  • binary values encode correctly
  • nullable missing fields are encoded as null
  • missing non-nullable fields are still rejected
  • nested/list schemas fall back to the existing JSON decoder path
  • parseable timestamp strings fall back to the existing JSON decoder path
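
Three of the behaviors listed above (null for a missing nullable field, rejection of a missing non-nullable field, and fallback for a timestamp given as a string) can be illustrated with a small per-value sketch. The names `EventValue`, `Outcome`, and `append_timestamp` are hypothetical and do not come from the PR; the sketch assumes a timestamp column stored as an `i64` and leaves all string parsing to the fallback path.

```rust
/// Hypothetical, simplified event value; not Vector's actual `Value` type.
#[derive(Debug, Clone, PartialEq)]
enum EventValue {
    Timestamp(i64),
    Text(String),
}

/// Result of trying to append one value on the direct path (hypothetical).
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The value (or a null) was appended to the column.
    Appended(Option<i64>),
    /// The value needs the more permissive JSON decoder path.
    NeedsJsonFallback,
}

/// Append one value to a timestamp column, or report why it cannot be
/// handled directly.
fn append_timestamp(
    value: Option<&EventValue>,
    field: &str,
    nullable: bool,
) -> Result<Outcome, String> {
    match value {
        // Native timestamps are appended as-is.
        Some(EventValue::Timestamp(ts)) => Ok(Outcome::Appended(Some(*ts))),
        // A string might still be a parseable timestamp; defer to the fallback.
        Some(EventValue::Text(_)) => Ok(Outcome::NeedsJsonFallback),
        // A missing value becomes null only if the field allows it.
        None if nullable => Ok(Outcome::Appended(None)),
        None => Err(format!("missing value for non-nullable field `{}`", field)),
    }
}
```

Returning `NeedsJsonFallback` instead of an error keeps the direct path strict while letting the caller re-encode the batch through the existing decoder.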

Ran the following local checks:

PROTOC=/tmp/protoc-vector/protoc mise exec rust@1.95 -- \
  cargo test -p codecs --features arrow encoding::format::arrow -- --nocapture

PROTOC=/tmp/protoc-vector/protoc mise exec rust@1.95 -- \
  cargo test -p codecs --features arrow

PROTOC=/tmp/protoc-vector/protoc mise exec rust@1.95 -- \
  cargo check -p codecs --features arrow

PROTOC=/tmp/protoc-vector/protoc mise exec rust@1.95 -- \
  cargo check --no-default-features --features codecs-arrow

git diff --check

All passed.

I also ran release-mode stress tests against the downstream table-native sink workload while developing this change. In a direct sink stress test with a scalar schema:

1,000,000 events
batch.max_events = 1000
request.concurrency = 2

Before the direct path:

~475k events/s
RecordBatch encode stage: ~0.606s

After the direct path:

~605k events/s
RecordBatch encode stage: ~0.220s
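
To put these figures side by side: the encode stage drops by a factor of about 2.75 (0.606 s to 0.220 s), while end-to-end throughput rises by about 1.27x (475k to 605k events/s). The helper below reproduces that arithmetic from the approximate numbers reported above; `ratio` and `run_seconds` are illustrative names, and because the run used `request.concurrency = 2`, stage time and wall-clock time may not add up linearly, so the comparison is indicative only.

```rust
/// Ratio of a "before" measurement to an "after" measurement.
fn ratio(before: f64, after: f64) -> f64 {
    before / after
}

/// Wall-clock seconds implied by an event count and a throughput.
fn run_seconds(events: f64, events_per_sec: f64) -> f64 {
    events / events_per_sec
}

/// Summarize the reported figures: (encode-stage speedup, throughput gain,
/// implied wall-clock seconds saved, encode-stage seconds saved).
fn summarize() -> (f64, f64, f64, f64) {
    let events = 1_000_000.0;
    let encode_speedup = ratio(0.606, 0.220);
    let throughput_gain = ratio(605_000.0, 475_000.0);
    let wall_saved = run_seconds(events, 475_000.0) - run_seconds(events, 605_000.0);
    let encode_saved = 0.606 - 0.220;
    (encode_speedup, throughput_gain, wall_saved, encode_saved)
}
```

The implied wall-clock saving (about 0.45 s over one million events) is of the same order as the encode-stage saving (about 0.39 s), which is consistent with RecordBatch construction having been a visible part of the hot path.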

In a full file-source JSON pipeline, file reading and JSON parsing dominated, and the sink using RecordBatch encoding was close to a blackhole baseline:

file -> remap -> blackhole: ~195k events/s
file -> remap -> sink using RecordBatch: ~193k events/s

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Related follow-up work: a table-native DuckDB sink that uses ArrowStreamSerializer::encode_to_record_batch for appends.

Notes

The existing JSON decoder path is intentionally preserved as the fallback. This keeps compatibility for nested schemas and for values that Arrow's JSON decoder can coerce more broadly than the direct scalar path.

dannote requested a review from a team as a code owner July 1, 2026 19:48

github-actions Bot commented Jul 1, 2026

Thank you for your contribution! Before we can merge this PR, please sign our Contributor License Agreement.

To sign, copy and post the phrase below as a new comment on this PR.

Note: If the bot says your username was not found, the email used in your git commit may not be linked to your GitHub account. Fix this at github.com/settings/emails, then comment recheck to retry.


I have read the CLA Document and I hereby sign the CLA


You can retrigger this bot by commenting recheck in this Pull Request. Posted by the CLA Assistant Lite bot.

@datadog-vectordotdev

Pipelines

⚠️ Warnings

🚦 1 Pipeline job failed

CLA Assistant | CLAAssistant

Commit SHA: d7c26fa

dannote commented Jul 1, 2026

I have read the CLA Document and I hereby sign the CLA

dannote commented Jul 2, 2026

recheck
