diff --git a/.github/workflows/exaforce-publish.yaml b/.github/workflows/exaforce-publish.yaml index 2a00d16a01d9a..45600f338e8e4 100644 --- a/.github/workflows/exaforce-publish.yaml +++ b/.github/workflows/exaforce-publish.yaml @@ -58,6 +58,12 @@ jobs: uses: actions/checkout@v4 - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh + # Required for some reason for 1.83 + - name: Install Rust toolchain explicitly + run: | + rustup set profile minimal + rustup toolchain install $(grep 'channel' rust-toolchain.toml | cut -d '"' -f 2) --profile minimal + rustup default $(grep 'channel' rust-toolchain.toml | cut -d '"' -f 2) - name: Bootstrap runner environment (generic) run: bash scripts/environment/prepare.sh - name: Build Vector @@ -83,6 +89,12 @@ jobs: uses: actions/checkout@v4 - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh + # Required for some reason for 1.83 + - name: Install Rust toolchain explicitly + run: | + rustup set profile minimal + rustup toolchain install $(grep 'channel' rust-toolchain.toml | cut -d '"' -f 2) --profile minimal + rustup default $(grep 'channel' rust-toolchain.toml | cut -d '"' -f 2) - name: Bootstrap runner environment (generic) run: bash scripts/environment/prepare.sh - name: Build Vector diff --git a/Cargo.lock b/Cargo.lock index 71573d66f7f19..4800725a44136 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,6 +100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom 0.2.15", "once_cell", "version_check", @@ -343,6 +344,93 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow-array" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a27466d897d99654357a6d95dc0a26931d9e4306e60c14fc31a894edb86579" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.13.1", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9405b78106a9d767c7b97c78a70ee1b23ee51a74f5188a821a716d9a85d1af2b" +dependencies = [ + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ec5a79a87783dc828b7ff8f89f62880b3f553bc5f5b932a82f4a1035024b4" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-data" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6f710d98964d2c069b8baf566130045e79e11baa105623f038a6c942f805681" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c99787cb8fabc187285da9e7182d22f2b80ecfac61ca0a42c4299e9eecdf903" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-schema" +version = "39.0.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "18c41d058b2895a12f46dfafc306ee3529ad9660406be0ab8a7967d5e27c417e" + +[[package]] +name = "arrow-select" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fcbdda2772b7e712e77444f3a71f4ee517095aceb993b35de71de41c70d9b4f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + [[package]] name = "ascii" version = "0.9.3" @@ -413,7 +501,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ - "brotli", + "brotli 7.0.0", "flate2", "futures-core", "memchr", @@ -1536,6 +1624,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "azure_storage_queues" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "962b4ba3486866eb8f9b9b19a1635f34eeb2ae2c6f61726e1849dc941abb7760" +dependencies = [ + "azure_core", + "azure_storage", + "futures 0.3.31", + "log", + "serde", + "serde_derive", + "serde_json", + "time", + "url", + "uuid", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1803,6 +1909,17 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 2.3.4", +] + [[package]] name = "brotli" version = "7.0.0" @@ -1811,7 +1928,17 @@ checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 4.0.0", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", ] [[package]] @@ -2258,6 +2385,7 @@ dependencies = [ "influxdb-line-protocol", "memchr", "ordered-float 4.6.0", + "parquet", "prost 0.12.6", "prost-reflect", "rand 0.8.5", @@ -2442,6 +2570,26 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "const_fn" version = "0.4.9" @@ -3659,6 +3807,16 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d52a7e408202050813e6f1d9addadcaafef3dca7530c7ddfb005d4081cce6779" +[[package]] +name = "flatbuffers" +version = "23.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619" +dependencies = [ + "bitflags 1.3.2", + "rustc_version 0.4.1", +] + [[package]] name = "flate2" version = "1.0.35" @@ -4149,12 +4307,12 @@ 
dependencies = [ [[package]] name = "half" -version = "2.4.1" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +checksum = "02b4af3693f1b705df946e9fe5631932443781d0aabb423b62fcd4d73f6d2fd0" dependencies = [ - "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -5086,6 +5244,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "inventory" version = "0.3.19" @@ -5535,6 +5699,70 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.169" @@ -6390,6 +6618,20 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "num" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational 0.4.1", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -6476,6 +6718,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -6900,6 +7154,37 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "parquet" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b0a1e6fa27f09ebddba280f5966ef435f3ac4d74cfc3ffe370fd3fd59c2e004d" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.21.7", + "brotli 3.3.4", + "bytes 1.10.0", + "chrono", + "flate2", + "hashbrown 0.13.1", + "lz4", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift", + "twox-hash 1.6.3", + "zstd 0.12.4", +] + [[package]] name = "parse-size" version = "1.1.0" @@ -8886,6 +9171,12 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +[[package]] +name = "seq-macro" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6b44e8fc93a14e66336d230954dda83d18b4605ccace8fe09bc7514a71ad0bc" + [[package]] name = "serde" version = "1.0.217" @@ -9854,6 +10145,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "tikv-jemalloc-sys" version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" @@ -9908,6 +10210,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -10633,6 +10944,16 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "twox-hash" version = "2.1.0" @@ -10842,7 +11163,7 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1ee6bfd0a27bf614353809a035cf6880b74239ec6c5e39a7b2860ca16809137" dependencies = [ - "num-rational", + "num-rational 0.3.2", "num-traits", "typenum", ] @@ -10953,6 +11274,7 @@ dependencies = [ name = "vector" version = "0.45.0" dependencies = [ + "anyhow", "apache-avro", "approx", "arc-swap", @@ -10989,6 +11311,7 @@ dependencies = [ "azure_identity", "azure_storage", "azure_storage_blobs", + "azure_storage_queues", "base64 0.22.1", "bloomy", "bollard", @@ -11417,7 +11740,7 @@ dependencies = [ "tokio-util", "tower", "tracing 0.1.41", - "twox-hash", + "twox-hash 2.1.0", "vector-common", "vector-core", ] diff --git a/Cargo.toml b/Cargo.toml index b1ef6c6cb9827..8369dcce03539 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -171,6 +171,7 @@ vector-config-macros = { path = "lib/vector-config-macros" } vrl = { version = "0.22.0", features = ["arbitrary", "cli", "test", "test_framework"] } [dependencies] +reqwest = { version = "0.11", features = ["json"] } pin-project.workspace = true clap.workspace = true uuid.workspace = true @@ -245,6 +246,7 @@ azure_core = { version = "0.17", default-features = false, features = ["enable_r azure_identity = { version = "0.17", default-features = false, features = ["enable_reqwest"], optional = true } azure_storage = { version = "0.17", default-features = false, optional = true } azure_storage_blobs = { version = "0.17", 
default-features = false, optional = true }
+azure_storage_queues = { version = "0.17", default-features = false, optional = true }
# OpenDAL
opendal = { version = "0.45", default-features = false, features = ["native-tls", "services-webhdfs"], optional = true }
@@ -382,6 +384,7 @@ url = { version = "2.5.4", default-features = false, features = ["serde"] }
warp = { version = "0.3.7", default-features = false }
zstd = { version = "0.13.0", default-features = false }
arr_macro = { version = "0.2.1" }
+anyhow = "1.0.82"
# depending on fork for bumped nix dependency
# https://github.com/heim-rs/heim/pull/360
@@ -418,6 +421,7 @@ azure_core = { version = "0.17", default-features = false, features = ["enable_r
azure_identity = { version = "0.17", default-features = false, features = ["enable_reqwest"] }
azure_storage_blobs = { version = "0.17", default-features = false, features = ["azurite_workaround"] }
azure_storage = { version = "0.17", default-features = false }
+azure_storage_queues = { version = "0.17", default-features = false }
base64 = "0.22.1"
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
itertools = { version = "0.14.0", default-features = false, features = ["use_alloc"] }
@@ -550,6 +554,7 @@ sources-logs = [
 "sources-aws_kinesis_firehose",
 "sources-aws_s3",
 "sources-aws_sqs",
+ "sources-azure_blob",
 "sources-datadog_agent",
 "sources-demo_logs",
 "sources-docker_logs",
@@ -598,6 +603,7 @@ sources-aws_ecs_metrics = ["sources-utils-http-client"]
sources-aws_kinesis_firehose = ["dep:base64"]
sources-aws_s3 = ["aws-core", "dep:aws-sdk-sqs", "dep:aws-sdk-s3", "dep:semver", "dep:async-compression", "sources-aws_sqs", "tokio-util/io"]
sources-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
+sources-azure_blob = ["dep:azure_storage_queues"]
sources-datadog_agent = ["sources-utils-http-error", "protobuf-build", "dep:prost"]
sources-demo_logs = ["dep:fakedata"]
sources-dnstap = ["sources-utils-net-tcp", "dep:base64", "dep:hickory-proto", "dep:dnsmsg-parser", "dep:dnstap-parser", "protobuf-build", "dep:prost"]
@@ -878,7 +884,8 @@ aws-integration-tests = [
 ]
 azure-integration-tests = [
- "azure-blob-integration-tests"
+ "azure-blob-integration-tests",
+ "azure-blob-source-integration-tests"
 ]
 aws-cloudwatch-logs-integration-tests = ["sinks-aws_cloudwatch_logs"]
@@ -892,6 +899,7 @@ aws-sqs-integration-tests = ["sinks-aws_sqs"]
aws-sns-integration-tests = ["sinks-aws_sns"]
axiom-integration-tests = ["sinks-axiom"]
azure-blob-integration-tests = ["sinks-azure_blob"]
+azure-blob-source-integration-tests = ["sources-azure_blob"]
chronicle-integration-tests = ["sinks-gcp"]
clickhouse-integration-tests = ["sinks-clickhouse"]
databend-integration-tests = ["sinks-databend"]
diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml
index 722400ac595aa..821fbb8c78a64 100644
--- a/lib/codecs/Cargo.toml
+++ b/lib/codecs/Cargo.toml
@@ -1,9 +1,9 @@
 [package]
-name = "codecs"
-version = "0.1.0"
 authors = ["Vector Contributors "]
 edition = "2021"
+name = "codecs"
 publish = false
+version = "0.1.0"
[[bin]]
name = "generate-avro-fixtures"
@@ -21,6 +21,7 @@ influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
memchr = { version = "2", default-features = false }
ordered-float = { version = "4.6.0", default-features = false }
+parquet = { version = "39.0.0", default-features = false }
prost.workspace = true
prost-reflect.workspace = true
rand.workspace = true
diff --git
a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index 225cdb7f5bf33..a35fd775a9bd4 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -13,12 +13,14 @@ mod logfmt; mod native; mod native_json; mod protobuf; +mod parquet; mod raw_message; mod text; use std::fmt::Debug; pub use self::csv::{CsvSerializer, CsvSerializerConfig}; +pub use self::parquet::{ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions}; pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions}; pub use cef::{CefSerializer, CefSerializerConfig}; use dyn_clone::DynClone; diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs new file mode 100644 index 0000000000000..9b5fdf5f2174b --- /dev/null +++ b/lib/codecs/src/encoding/format/parquet.rs @@ -0,0 +1,1317 @@ +use core::panic; +use std::{io, sync::Arc}; + +use bytes::{BufMut, BytesMut}; +use parquet::{ + basic::{LogicalType, Repetition, Type as PhysicalType}, + column::writer::{ColumnWriter::*, ColumnWriterImpl}, + data_type::DataType, + errors::ParquetError, + file::{properties::WriterProperties, writer::SerializedFileWriter}, + schema::{ + parser::parse_message_type, + types::{BasicTypeInfo, ColumnDescriptor, Type, TypePtr}, + }, +}; +use serde::{Deserialize, Serialize}; +use snafu::*; +use tokio_util::codec::Encoder; +use tracing::error; + +use vector_config::configurable_component; +use vector_core::{ + config, + event::{Event, Value}, + schema, +}; + +use crate::encoding::BuildError; + +/// Errors that can occur during Parquet serialization. +#[derive(Debug, Snafu)] +pub enum ParquetSerializerError { + #[snafu(display(r#"Event does not contain required field. field = "{}""#, field))] + MissingField { + field: String, + }, + #[snafu(display( + r#"Event contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#, + field, + actual_type, + expected_type + ))] + InvalidValueType { + field: String, + actual_type: String, + expected_type: String, + }, + #[snafu(display("Failed to write. error: {}", error))] + ParquetError { + error: ParquetError, + }, + IoError { + source: io::Error, + }, +} + +impl ParquetSerializerError { + fn invalid_type( + desc: &ColumnDescriptor, + value: &Value, + expected: &str, + ) -> ParquetSerializerError { + ParquetSerializerError::InvalidValueType { + field: desc.name().to_string(), + actual_type: value.kind_str().to_string(), + expected_type: expected.to_string(), + } + } +} + +impl From for ParquetSerializerError { + fn from(error: ParquetError) -> Self { + Self::ParquetError { error } + } +} + +impl From for ParquetSerializerError { + fn from(source: io::Error) -> Self { + Self::IoError { source } + } +} + +/// Config used to build a `ParquetSerializer`. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ParquetSerializerConfig { + /// Options for the Parquet serializer. + pub parquet: ParquetSerializerOptions, +} + +impl ParquetSerializerConfig { + /// Creates a new `ParquetSerializerConfig`. + pub const fn new(schema: String) -> Self { + Self { + parquet: ParquetSerializerOptions { schema }, + } + } + + /// Build the `ParquetSerializerConfig` from this configuration. 
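+    ///
+    /// A minimal usage sketch (the schema string below is illustrative only and is not
+    /// taken from this change):
+    ///
+    /// ```ignore
+    /// let config = ParquetSerializerConfig::new(
+    ///     "message test { required binary message (UTF8); }".to_string(),
+    /// );
+    /// let serializer = config.build().expect("schema should be valid");
+    /// ```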
+ pub fn build(&self) -> Result { + let schema = parse_message_type(&self.parquet.schema) + .map_err(|error| format!("Failed building Parquet serializer: {}", error))?; + self.validate_logical_schema(&schema) + .map_err(|error| format!("Failed building Parquet serializer: {}", error))?; + Ok(ParquetSerializer { + schema: Arc::new(schema), + }) + } + + /// The data type of events that are accepted by `ParquetSerializer`. + pub fn input_type(&self) -> config::DataType { + config::DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + // TODO: Convert the Parquet schema to a vector schema requirement. + // NOTE: This isn't yet doable. We don't have meanings to + // to specify for requirement. + schema::Requirement::empty() + } + + fn validate_logical_schema(&self, schema: &Type) -> Result<(), String> { + let info = schema.get_basic_info(); + match info.logical_type() { + // Validate LIST types + Some(LogicalType::List) => { + Self::not_repeated(schema, "LIST")?; + let list = Self::single_child(schema, "LIST")?; + + Self::repeated(list, "child of LIST")?; + let element = Self::single_child(list, "list of LIST")?; + + Self::not_repeated(element, "element of LIST")?; + self.validate_logical_schema(element)?; + } + // Validate MAP types + Some(LogicalType::Map) => { + Self::not_repeated(schema, "MAP")?; + let key_value = Self::single_child(schema, "MAP")?; + + Self::repeated(key_value, "child of MAP")?; + match key_value.get_fields().len() { + 1 | 2 => (), + _ => { + return Err(format!( + "Invalid MAP type. key_value of MAP type must have one or two children, found {}.", + key_value.get_fields().len() + )); + } + } + + let mut found_key = false; + for element in key_value.get_fields() { + match element.name() { + "key" => { + found_key = true; + Self::required(element, "key of MAP")?; + if !element.is_primitive() + || element.get_physical_type() != PhysicalType::BYTE_ARRAY + { + return Err( + "Invalid primitive type for key of MAP type. Must be binary." + .to_string(), + ); + } + } + _ => self.validate_logical_schema(element)?, + } + } + if !found_key { + return Err( + "Invalid MAP type. key_value of MAP type must have a child named \"key\"." + .to_string(), + ); + } + } + _ if schema.is_group() => { + for field in schema.get_fields() { + self.validate_logical_schema(field)?; + } + } + _ => (), + } + Ok(()) + } + + fn not_repeated(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if info.has_repetition() && info.repetition() == Repetition::REPEATED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + info.repetition() + )) + } else { + Ok(()) + } + } + + fn repeated(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if !info.has_repetition() || info.repetition() != Repetition::REPEATED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + if info.has_repetition() { + info.repetition() + } else { + Repetition::REQUIRED + } + )) + } else { + Ok(()) + } + } + + fn required(ty: &Type, kind: &str) -> Result<(), String> { + let info = ty.get_basic_info(); + if !info.has_repetition() || info.repetition() == Repetition::REQUIRED { + Err(format!( + "Invalid repetition for {kind} type. repetition = {:?}", + info.repetition() + )) + } else { + Ok(()) + } + } + + fn single_child<'a>(ty: &'a Type, kind: &str) -> Result<&'a Type, String> { + let len = ty.get_fields().len(); + if len != 1 { + Err(format!( + "Invalid {kind} type. 
Expected one child, found {len}.", + )) + } else { + Ok(ty.get_fields().get(0).expect("Should have a child.")) + } + } +} + +/// Options for the Parquet serializer. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct ParquetSerializerOptions { + /// The Parquet schema. + #[configurable(metadata(docs::examples = r#"message test { + required group data { + required binary name; + repeated int64 values; + } + }"#))] + pub schema: String, +} + +/// Serializer that converts `Vec` to bytes using the Apache Parquet format. +#[derive(Debug, Clone)] +pub struct ParquetSerializer { + schema: TypePtr, +} + +impl ParquetSerializer { + /// Creates a new `ParquetSerializer`. + pub const fn new(schema: TypePtr) -> Self { + Self { schema } + } +} + +impl ParquetSerializer { + fn process( + &self, + events: &[Event], + desc: &ColumnDescriptor, + extractor: impl Fn(&Value) -> Result<::T, ParquetSerializerError>, + writer: &mut ColumnWriterImpl, + ) -> Result<(), ParquetSerializerError> { + let mut column = Column::<::T, _>::new(&*self.schema, desc, extractor); + column.extract_column(events)?; + let written_values = writer.write_batch( + &column.values, + column.def_levels.as_ref().map(|vec| vec.as_slice()), + column.rep_levels.as_ref().map(|vec| vec.as_slice()), + )?; + + assert_eq!(written_values, column.values.len()); + Ok(()) + } +} + +impl Encoder> for ParquetSerializer { + type Error = vector_common::Error; + + /// Builds columns from events and writes them to the writer. + /// + /// Expects that all events satisfy the schema, else whole batch can fail. + fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { + // Encode events + let props = Arc::new(WriterProperties::builder().build()); + let mut parquet_writer = + SerializedFileWriter::new(buffer.writer(), self.schema.clone(), props)?; + + let mut row_group_writer = parquet_writer.next_row_group()?; + while let Some(mut column_writer) = row_group_writer.next_column()? { + match column_writer.untyped() { + BoolColumnWriter(ref mut writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Boolean(value) => Ok(*value), + _ => Err(ParquetSerializerError::invalid_type( + &desc, value, "boolean", + )), + }, + writer, + )? + } + Int64ColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Integer(value) => Ok(*value), + _ => Err(ParquetSerializerError::invalid_type( + &desc, value, "integer", + )), + }, + writer, + )? + } + DoubleColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Float(value) => Ok(value.into_inner()), + _ => Err(ParquetSerializerError::invalid_type(&desc, value, "float")), + }, + writer, + )? + } + ByteArrayColumnWriter(writer) => { + let desc = writer.get_descriptor().clone(); + self.process( + &events, + &desc, + |value| match value { + Value::Bytes(value) => Ok(value.clone().into()), + _ => Err(ParquetSerializerError::invalid_type(&desc, value, "string")), + }, + writer, + )? 
+ } + FixedLenByteArrayColumnWriter(_) => { + panic!("Fixed len byte array is not supported."); + } + Int32ColumnWriter(_) => panic!("Int32 is not supported."), + Int96ColumnWriter(_) => panic!("Int96 is not supported."), + FloatColumnWriter(_) => panic!("Float32 is not supported."), + } + column_writer.close()?; + } + + row_group_writer.close()?; + parquet_writer.close()?; + + Ok(()) + } +} + +struct Column<'a, T, F: Fn(&Value) -> Result> { + levels: Vec<&'a Type>, + extract: F, + values: Vec, + // If present encodes definition level. From 0 to column.max_def_level() inclusive. + // With any value bellow max encoding null on that level. + // One thing to keep in mind, if a column is required on some "level" then that level is not counted here. + // This is needed when values are optional. + // In case of null, that value is skipped in values. + def_levels: Option>, + // If present encodes repetition level. + // From 0 to column.max_rep_level() inclusive. With 0 starting a new record and any value bellow max encoding + // starting new list at that level. With max level just adding element to leaf list. + // This is needed when values are repeated. Where that repetition can have multiple nested levels. + rep_levels: Option>, +} + +impl<'a, T, F: Fn(&Value) -> Result> Column<'a, T, F> { + fn new(schema: &'a Type, column: &'a ColumnDescriptor, extract: F) -> Self { + let mut levels = vec![schema]; + for part in column.path().parts() { + match &levels[levels.len() - 1] { + Type::GroupType { fields, .. } => { + let field = fields + .iter() + .find(|field| field.name() == part) + .expect("Field not found in schema."); + levels.push(field); + } + Type::PrimitiveType { .. } => unreachable!(), + } + } + + let def_levels = if levels.iter().any(|ty| ty.is_optional()) { + Some(Vec::new()) + } else { + None + }; + + let rep_levels = if levels.iter().any(|ty| { + let info = ty.get_basic_info(); + info.has_repetition() && info.repetition() == Repetition::REPEATED + }) { + Some(Vec::new()) + } else { + None + }; + + Self { + levels, + extract, + values: Vec::new(), + def_levels, + rep_levels, + } + } + + fn extract_column(&mut self, events: &[Event]) -> Result<(), ParquetSerializerError> { + for event in events { + let res = match event { + Event::Log(log) => { + self.extract_value(log.value(), Level::root()) + } + Event::Trace(trace) => { + self.extract_value(trace.value(), Level::root()) + } + Event::Metric(_) => { + panic!("Metrics are not supported."); + } + }; + res.inspect_err(|error| { + // event to json string + match serde_json::to_string(&event) { + Ok(event) => error!( + error = ?error, + event = event, + ), + Err(e) => error!( + error = ?error, + event = ?event, + serde_error = %e, + ), + } + })?; + } + Ok(()) + } + + /// Will push at least one value, or error. 
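+    ///
+    /// Walks one step of the column path per call: at intermediate levels the value must
+    /// be an object (or null), and at the leaf the column's `extract` closure converts the
+    /// value. Nulls are recorded only through definition levels, never in `values`.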
+ fn extract_value(&mut self, value: &Value, level: Level) -> Result<(), ParquetSerializerError> { + if let Some(part) = self.levels.get(level.level) { + let sub = match value { + Value::Object(object) => object.get(part.name()), + Value::Null => None, + // Invalid type, error + value => { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "object".to_string(), + }); + } + }; + + self.process_value(sub, level) + } else if matches!(value, Value::Null) { + self.push_value(None, level.undefine()); + Ok(()) + } else { + let value = (self.extract)(value)?; + self.push_value(Some(value), level); + Ok(()) + } + } + + /// Will push at least one value, or error. + fn process_value( + &mut self, + value: Option<&Value>, + level: Level, + ) -> Result<(), ParquetSerializerError> { + let part = self + .levels + .get(level.level) + .expect("We should have checked this before hand."); + match value { + Some(Value::Null) | None if part.is_optional() => { + self.push_value(None, level); + Ok(()) + } + // Illegal null, error + Some(Value::Null) | None => Err(ParquetSerializerError::MissingField { + field: self.path(level), + }), + Some(value) => { + let info = part.get_basic_info(); + match info.logical_type() { + Some(LogicalType::List) => { + if let Value::Array(array) = value { + let list_level = level.descend(info); + let element_level = list_level.descend_repeated(); + + if array.is_empty() { + self.push_value(None, list_level); + } else { + let mut now = element_level; + for element in array { + self.process_value(Some(element), now)?; + now = now.next(); + } + } + + Ok(()) + } else { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "array".to_string(), + }); + } + } + Some(LogicalType::Map) => { + if let Value::Object(object) = value { + let key_value = level.descend(info); + let element_level = key_value.descend_repeated(); + + if self + .levels + .get(element_level.level) + .expect("This must be valid MAP") + .name() + == "key" + { + // Key field + if object.is_empty() { + self.push_value(None, key_value); + } else { + let mut now = element_level; + for key in object.keys() { + let value = (self.extract)(&Value::from(key.as_str()))?; + self.push_value(Some(value), now.descend_required()); + now = now.next(); + } + } + } else { + // Value field + if object.is_empty() { + self.push_value(None, key_value); + } else { + let mut now = element_level; + for value in object.values() { + self.process_value(Some(value), now)?; + now = now.next(); + } + } + } + + Ok(()) + } else { + return Err(ParquetSerializerError::InvalidValueType { + field: self.path(level), + actual_type: value.kind_str().to_string(), + expected_type: "object".to_string(), + }); + } + } + _ => self.extract_flat(value, level.descend(info)), + } + } + } + } + + /// Will push at least one value, or error. 
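+    ///
+    /// Handles `repeated` leaf fields: arrays are flattened element by element (an empty
+    /// array is recorded as a single undefined entry); any other value is forwarded to
+    /// `extract_value` unchanged.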
+ fn extract_flat(&mut self, value: &Value, level: Level) -> Result<(), ParquetSerializerError> { + match value { + Value::Array(array) if level.repeated => { + if array.is_empty() { + self.push_value(None, level.undefine()); + } else { + let mut now = level; + for value in array { + self.extract_value(value, now)?; + now = now.next(); + } + } + + Ok(()) + } + _ => self.extract_value(value, level), + } + } + + fn push_value(&mut self, value: Option, level: Level) { + if let Some(rep_levels) = &mut self.rep_levels { + rep_levels.push(level.start_rep_level); + } + if let Some(def_levels) = &mut self.def_levels { + def_levels.push(level.def_level); + } + if let Some(value) = value { + self.values.push(value); + } + } + + fn path(&self, level: Level) -> String { + let mut path = String::new(); + for level in &self.levels[1..level.level] { + path.push_str(level.name()); + path.push('.'); + } + path.push_str(self.levels[level.level].name()); + path + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +struct Level { + start_rep_level: i16, + rep_level: i16, + def_level: i16, + level: usize, + // If this level can be null + optional: bool, + // If this level is repeated + repeated: bool, +} + +impl Level { + fn root() -> Self { + Self { + start_rep_level: 0, + rep_level: 0, + def_level: 0, + level: 1, + optional: false, + repeated: false, + } + } + + fn next(self) -> Self { + assert!(self.repeated); + Self { + start_rep_level: self.rep_level, + ..self + } + } + + /// Descend implies that the level is defined. + fn descend(self, info: &BasicTypeInfo) -> Self { + if info.has_repetition() { + match info.repetition() { + Repetition::OPTIONAL => self.descend_optional(), + Repetition::REPEATED => self.descend_repeated(), + Repetition::REQUIRED => self.descend_required(), + } + } else { + self.descend_required() + } + } + + fn descend_required(self) -> Self { + Self { + level: self.level + 1, + repeated: false, + optional: false, + ..self + } + } + + fn descend_optional(self) -> Self { + Self { + def_level: self.def_level + 1, + level: self.level + 1, + repeated: false, + optional: true, + ..self + } + } + + fn descend_repeated(self) -> Self { + Self { + rep_level: self.rep_level + 1, + def_level: self.def_level + 1, + level: self.level + 1, + repeated: true, + optional: true, + ..self + } + } + + /// Undefine by one level + fn undefine(self) -> Self { + Self { + def_level: self.def_level - 1, + ..self + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use parquet::{ + column::reader::{ColumnReader, ColumnReaderImpl}, + file::reader::*, + file::serialized_reader::SerializedFileReader, + schema::parser::parse_message_type, + }; + use similar_asserts::assert_eq; + use std::panic; + use std::{collections::HashSet, sync::Arc}; + use vector_core::event::LogEvent; + use vrl::value::btreemap; + + macro_rules! log_event { + ($($key:expr => $value:expr),* $(,)?) 
=> { + #[allow(unused_variables)] + { + let mut event = Event::Log(LogEvent::default()); + let log = event.as_mut_log(); + $( + log.insert($key, $value); + )* + event + } + }; + } + + fn assert_column( + count: usize, + expect_values: &[::T], + expect_rep_levels: Option<&[i16]>, + expect_def_levels: Option<&[i16]>, + mut column_reader: ColumnReaderImpl, + ) where + ::T: Default, + { + let mut values = Vec::new(); + values.resize(count, ::T::default()); + let mut def_levels = Vec::new(); + def_levels.resize(count, 0); + let mut rep_levels = Vec::new(); + rep_levels.resize(count, 0); + let (read, level) = column_reader + .read_batch( + count, + Some(def_levels.as_mut_slice()).filter(|_| expect_def_levels.is_some()), + Some(rep_levels.as_mut_slice()).filter(|_| expect_rep_levels.is_some()), + &mut values, + ) + .unwrap(); + + assert_eq!(level, count); + assert_eq!(&values[..read], expect_values); + if expect_rep_levels.is_some() { + assert_eq!(rep_levels, expect_rep_levels.unwrap()); + } + if expect_def_levels.is_some() { + assert_eq!(def_levels, expect_def_levels.unwrap()); + } + } + + fn validate( + schema: &str, + events: Vec, + num_columns: usize, + validate: impl Fn(usize, &str, &dyn RowGroupReader), + ) { + let schema = Arc::new(parse_message_type(schema).unwrap()); + let mut encoder = ParquetSerializer::new(schema); + + let mut buffer = BytesMut::new(); + encoder.encode(events, &mut buffer).unwrap(); + + let reader = SerializedFileReader::new(buffer.freeze()).unwrap(); + + let parquet_metadata = reader.metadata(); + assert_eq!(parquet_metadata.num_row_groups(), 1); + + let row_group_reader = reader.get_row_group(0).unwrap(); + assert_eq!(row_group_reader.num_columns(), num_columns); + + let metadata = row_group_reader.metadata(); + let mut visited = HashSet::new(); + for (i, column) in metadata.columns().iter().enumerate() { + let path = column.column_path().string(); + assert!(visited.insert(path.clone())); + validate(i, &path, row_group_reader.as_ref()); + } + + assert_eq!(visited.len(), num_columns); + } + + #[test] + fn test_serialize() { + let message_type = r#" + message test { + required group a { + required boolean b; + optional int64 c; + } + optional group d { + optional int64 e; + } + required group f { + repeated int64 g; + } + required binary h; + repeated group i { + required int64 j; + repeated double k; + } + } + "#; + + let events = vec![ + log_event! { + "a.b" => true, + "a.c" => 2, + "d.e" => 3, + "f.g" => vec![4, 5], + "h" => "hello", + "i" => vec![btreemap! { + "j" => 6, + "k" => vec![7.0, 8.0] + }] + }, + log_event! { + "a.b" => false, + "f" => Value::Object(Default::default()), + "h" => "world", + "i" => vec![ + btreemap! { + "j" => 9, + "k" => vec![10.0] + }, btreemap! 
{ + "j" => 11, + }] + }, + ]; + + validate( + message_type, + events, + 7, + |i, path, row_group_reader| match path { + "a.b" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[true, false], None, None, reader); + } + "a.c" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[2], None, Some(&[1, 0]), reader); + } + "d.e" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[3], None, Some(&[2, 0]), reader); + } + "f.g" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(3, &[4, 5], Some(&[0, 1, 0]), Some(&[1, 1, 0]), reader); + } + "h" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("hello").into(), Bytes::from("world").into()], + None, + None, + reader, + ); + } + "i.j" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(3, &[6, 9, 11], Some(&[0, 0, 1]), Some(&[1, 1, 1]), reader); + } + "i.k" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::DoubleColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 4, + &[7.0, 8.0, 10.0], + Some(&[0, 2, 0, 1]), + Some(&[2, 2, 2, 1]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null() { + let message_type = r#" + message test { + optional group geo{ + optional binary city_name (UTF8); + } + } + "#; + + let events = vec![ + log_event! { + "geo.city_name" => "hello", + }, + log_event! { + "geo.city_name" => Value::Null, + }, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "geo.city_name" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("hello").into()], + None, + Some(&[2, 1]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_stack_optional() { + let message_type = r#" + message test { + optional group a{ + optional group b{ + optional boolean c; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a.b" => Value::Null}, + log_event! {"a.b.c" => Value::Null}, + log_event! {"a.b.c" => Value::Boolean(true)}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a.b.c" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(5, &[true], None, Some(&[0, 0, 1, 2, 3]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_repeated_optional() { + let message_type = r#" + message test { + repeated group a{ + optional boolean b; + } + } + "#; + + let events = vec![ + log_event! 
{}, + log_event! {"a" => Value::Null}, + log_event! {"a.b" => Value::Null}, + log_event! {"a.b" => Value::Boolean(false)}, + log_event! {"a" => vec![ + btreemap! { "b" => Value::Null }, + btreemap! { "b" => Value::Boolean(true) } + ]}, + ]; + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a.b" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[false, true], + Some(&[0, 0, 0, 0, 0, 1]), + Some(&[0, 0, 1, 2, 1, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_null_repeated() { + let message_type = r#" + message test { + repeated boolean a; + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"a" => Value::Null}, + log_event! {"a" => Value::Boolean(false)}, + log_event! {"a" => vec![ + Value::Null, + Value::Boolean(true), + Value::Null, + ]}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[false, true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 0, 1, 0]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_value_empty_repeated() { + let message_type = r#" + message test { + repeated boolean a; + } + "#; + + let events = vec![log_event! {"a" => Vec::::new()}]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "a" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(1, &[], Some(&[0]), Some(&[0]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_repeated() { + let message_type = r#" + message test { + repeated group answer { + optional binary name (UTF8); + optional INT64 ttl; + } + } + "#; + + let events = vec![log_event! { + "answer" => vec![ + btreemap! { + "name" => "test1", + "ttl" => 0, + }, btreemap! { + "name" => "test2", + "ttl" => 3600, + }] + }]; + + validate( + message_type, + events, + 2, + |i, path, row_group_reader| match path { + "answer.name" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 2, + &[Bytes::from("test1").into(), Bytes::from("test2").into()], + Some(&[0, 1]), + Some(&[2, 2]), + reader, + ); + } + "answer.ttl" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::Int64ColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column(2, &[0, 3600], Some(&[0, 1]), Some(&[2, 2]), reader); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn test_list() { + let message_type = r#" + message test { + optional group answers (LIST){ + repeated group list { + optional boolean element; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"answers" => Value::Null}, + log_event! {"answers" => Vec::::new()}, + log_event! 
{"answers" => vec![ + Value::Null, + Value::Boolean(true), + Value::Null, + ]}, + ]; + + validate( + message_type, + events, + 1, + |i, path, row_group_reader| match path { + "answers.list.element" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 3, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn illegal_list_scheme() { + let config = ParquetSerializerConfig { + parquet: ParquetSerializerOptions { + schema: r#" + message test { + optional group answers (LIST){ + optional group list { + optional boolean element; + } + } + } + "# + .to_string(), + }, + }; + + assert!(config.build().is_err()); + } + + #[test] + fn test_map() { + let message_type = r#" + message test { + optional group answers (MAP){ + repeated group key_value { + required binary key (UTF8); + optional boolean value; + } + } + } + "#; + + let events = vec![ + log_event! {}, + log_event! {"answers" => Value::Null}, + log_event! {"answers" => btreemap!{}}, + log_event! {"answers" => btreemap!{ + "test1" => Value::Null, + "test2" => Value::Boolean(true), + "test3" => Value::Null, + }}, + ]; + + validate( + message_type, + events, + 2, + |i, path, row_group_reader| match path { + "answers.key_value.key" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::ByteArrayColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[ + Bytes::from("test1").into(), + Bytes::from("test2").into(), + Bytes::from("test3").into(), + ], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 2, 2]), + reader, + ); + } + "answers.key_value.value" => { + let reader = match row_group_reader.get_column_reader(i).unwrap() { + ColumnReader::BoolColumnReader(r) => r, + _ => panic!("Wrong column type"), + }; + assert_column( + 6, + &[true], + Some(&[0, 0, 0, 0, 1, 1]), + Some(&[0, 0, 1, 2, 3, 2]), + reader, + ); + } + _ => panic!("Unexpected column"), + }, + ); + } + + #[test] + fn illegal_map_scheme() { + let config = ParquetSerializerConfig { + parquet: ParquetSerializerOptions { + schema: r#" + message test { + optional group answers (MAP){ + repeated group key_value { + required binary str (UTF8); + optional boolean value; + } + } + } + "# + .to_string(), + }, + }; + + assert!(config.build().is_err()); + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 411f3f692515f..c9c4966b912bb 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -12,7 +12,8 @@ pub use format::{ CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, - NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, + NativeSerializerConfig, ParquetSerializer, + ParquetSerializerConfig, ParquetSerializerOptions, ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; @@ -256,6 +257,14 @@ pub enum SerializerConfig { /// could lead to the encoding emitting empty strings for the given event. RawMessage, + /// Encodes events in [Apache Parquet format][parquet]. 
+ /// + /// [parquet]: https://parquet.apache.org/ + Parquet { + /// Apache Parquet-specific encoder options. + parquet: ParquetSerializerOptions, + }, + /// Plain text encoding. /// /// This encoding uses the `message` field of a log event. For metrics, it uses an @@ -335,6 +344,7 @@ impl From for SerializerConfig { impl SerializerConfig { /// Build the `Serializer` from this configuration. + /// Fails if serializer is batched. pub fn build(&self) -> Result> { match self { SerializerConfig::Avro { avro } => Ok(Serializer::Avro( @@ -354,6 +364,32 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), + SerializerConfig::Parquet { .. } => { + Err("Parquet serializer is not for single event encoding.".into()) + } + } + } + + /// Build the `BatchSerializer` from this configuration. + /// Returns `None` if the serializer is not batched. + pub fn build_batched( + &self, + ) -> Result, Box> { + match self { + SerializerConfig::Parquet { parquet } => Ok(Some(BatchSerializer::Parquet( + ParquetSerializerConfig::new(parquet.schema.clone()).build()?, + ))), + SerializerConfig::Avro { .. } + | SerializerConfig::Csv(..) + | SerializerConfig::Gelf + | SerializerConfig::Json(..) + | SerializerConfig::Logfmt + | SerializerConfig::Native + | SerializerConfig::NativeJson + | SerializerConfig::RawMessage + | SerializerConfig::Text(..) + | SerializerConfig::Cef(..) + | SerializerConfig::Protobuf(..) => Ok(None), } } @@ -385,7 +421,8 @@ impl SerializerConfig { | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited, SerializerConfig::Gelf => { FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0)) - } + }, + SerializerConfig::Parquet { .. } => FramingConfig::Bytes, } } @@ -405,6 +442,9 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), + SerializerConfig::Parquet { parquet } => { + ParquetSerializerConfig::new(parquet.schema.clone()).input_type() + } } } @@ -424,6 +464,9 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), + SerializerConfig::Parquet { parquet } => { + ParquetSerializerConfig::new(parquet.schema.clone()).schema_requirement() + } } } } @@ -581,3 +624,26 @@ impl tokio_util::codec::Encoder for Serializer { } } } + +/// Serialize structured batches of events as bytes. +#[derive(Debug, Clone)] +pub enum BatchSerializer { + /// Uses a `ParquetSerializer` for serialization. 
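+    /// Instances are obtained from `SerializerConfig::build_batched`, which returns
+    /// `Some` only for batched codecs such as Parquet.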
+ Parquet(ParquetSerializer), +} + +impl From for BatchSerializer { + fn from(serializer: ParquetSerializer) -> Self { + Self::Parquet(serializer) + } +} + +impl tokio_util::codec::Encoder> for BatchSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { + match self { + BatchSerializer::Parquet(serializer) => serializer.encode(events, buffer), + } + } +} diff --git a/scripts/environment/prepare.sh b/scripts/environment/prepare.sh index 7ad153d4a5731..b343f195e1f72 100755 --- a/scripts/environment/prepare.sh +++ b/scripts/environment/prepare.sh @@ -4,7 +4,7 @@ set -e -o verbose git config --global --add safe.directory /git/vectordotdev/vector rustup show # causes installation of version from rust-toolchain.toml -rustup default "$(rustup show active-toolchain | awk '{print $1;}')" +rustup default "$(rustup show active-toolchain | awk '{print $1;}' | head -n 1)" if [[ "$(cargo-deb --version)" != "2.0.2" ]] ; then rustup run stable cargo install cargo-deb --version 2.0.0 --force --locked fi diff --git a/scripts/integration/azure/compose.yaml b/scripts/integration/azure/compose.yaml index 6c7b00e37d6e1..aecc7ffaba828 100644 --- a/scripts/integration/azure/compose.yaml +++ b/scripts/integration/azure/compose.yaml @@ -3,6 +3,6 @@ version: '3' services: local-azure-blob: image: mcr.microsoft.com/azure-storage/azurite:${CONFIG_VERSION} - command: azurite --blobHost 0.0.0.0 --loose + command: azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --loose volumes: - /var/run:/var/run diff --git a/src/azure/mod.rs b/src/azure/mod.rs new file mode 100644 index 0000000000000..ddaf9b436d90c --- /dev/null +++ b/src/azure/mod.rs @@ -0,0 +1,235 @@ +//! Shared functionality for the Azure components. +use std::sync::Arc; + +use azure_core::{auth::TokenCredential, new_http_client, HttpClient, RetryOptions}; +use azure_identity::{ + AutoRefreshingTokenCredential, ClientSecretCredential, DefaultAzureCredential, + TokenCredentialOptions, +}; +use azure_storage::{prelude::*, CloudLocation, ConnectionString}; +use azure_storage_blobs; +use azure_storage_queues; +use serde_with::serde_as; + +use vector_lib::configurable::configurable_component; + +/// Stores credentials used to build Azure Clients. +#[serde_as] +#[configurable_component] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +#[serde(deny_unknown_fields)] +pub struct ClientCredentials { + /// Check how to get Tenant ID in [the docs][docs]. + /// + /// [docs]: https://learn.microsoft.com/en-us/azure/azure-portal/get-subscription-tenant-id + tenant_id: String, + + /// Check how to get Client ID in [the docs][docs]. + /// + /// [docs]: https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app#add-credentials + client_id: String, + + /// Check how to get Client Secret in [the docs][docs]. + /// + /// [docs]: https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app#add-credentials + client_secret: String, +} + +/// Builds Azure Storage Container Client. +/// +/// To authenticate only **one** of the following should be set: +/// 1. `connection_string` +/// 2. `storage_account` - optionally you can set `client_credentials` to provide credentials, +/// if `client_credentials` is None, [`DefaultAzureCredential`][dac] would be used. 
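+///
+/// A hypothetical example using a connection string (all values below are placeholders):
+///
+/// ```ignore
+/// let container_client = build_container_client(
+///     Some("DefaultEndpointsProtocol=https;AccountName=devstore;AccountKey=...".to_string()),
+///     None,                       // storage_account
+///     "my-container".to_string(), // container_name
+///     None,                       // endpoint
+///     None,                       // client_credentials
+/// )
+/// .expect("valid configuration");
+/// ```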
+/// +/// [dac]: https://docs.rs/azure_identity/0.17.0/azure_identity/struct.DefaultAzureCredential.html +pub fn build_container_client( + connection_string: Option, + storage_account: Option, + container_name: String, + endpoint: Option, + client_credentials: Option, +) -> crate::Result> { + let client; + match (connection_string, storage_account) { + (Some(connection_string_p), None) => { + let connection_string = ConnectionString::new(&connection_string_p)?; + + client = match connection_string.blob_endpoint { + // When the blob_endpoint is provided, we use the Custom CloudLocation since it is + // required to contain the full URI to the blob storage API endpoint, this means + // that account_name is not required to exist in the connection_string since + // account_name is only used with the default CloudLocation in the Azure SDK to + // generate the storage API endpoint + Some(uri) => azure_storage_blobs::prelude::ClientBuilder::with_location( + CloudLocation::Custom { + uri: uri.to_string(), + }, + connection_string.storage_credentials()?, + ), + // Without a valid blob_endpoint in the connection_string, assume we are in Azure + // Commercial (AzureCloud location) and create a default Blob Storage Client that + // builds the API endpoint location using the account_name as input + None => azure_storage_blobs::prelude::ClientBuilder::new( + connection_string + .account_name + .ok_or("Account name missing in connection string")?, + connection_string.storage_credentials()?, + ), + } + .retry(RetryOptions::none()) + .container_client(container_name); + } + (None, Some(storage_account_p)) => { + let creds: Arc = match client_credentials { + Some(client_credentials_p) => { + let http_client: Arc = new_http_client(); + let options = TokenCredentialOptions::default(); + let creds = std::sync::Arc::new(ClientSecretCredential::new( + http_client.clone(), + client_credentials_p.tenant_id, + client_credentials_p.client_id, + client_credentials_p.client_secret, + options, + )); + creds + } + None => { + let creds = std::sync::Arc::new(DefaultAzureCredential::default()); + creds + } + }; + let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds)); + let storage_credentials = StorageCredentials::token_credential(auto_creds); + + client = match endpoint { + // If a blob_endpoint is provided in the configuration, use it with a Custom + // CloudLocation, to allow overriding the blob storage API endpoint + Some(endpoint) => azure_storage_blobs::prelude::ClientBuilder::with_location( + CloudLocation::Custom { uri: endpoint }, + storage_credentials, + ), + // Use the storage_account configuration parameter and assume we are in Azure + // Commercial (AzureCloud location) and build the blob storage API endpoint using + // the storage_account as input. + None => azure_storage_blobs::prelude::ClientBuilder::new( + storage_account_p, + storage_credentials, + ), + } + .retry(RetryOptions::none()) + .container_client(container_name); + } + (None, None) => { + return Err("Either `connection_string` or `storage_account` has to be provided".into()) + } + (Some(_), Some(_)) => { + return Err( + "`connection_string` and `storage_account` can't be provided at the same time" + .into(), + ) + } + } + Ok(std::sync::Arc::new(client)) +} + +/// Builds Azure Queue Service Client. +/// +/// To authenticate only **one** of the following should be set: +/// 1. `connection_string` +/// 2. 
`storage_account` - optionally you can set `client_credentials` to provide credentials, +/// if `client_credentials` is None, [`DefaultAzureCredential`][dac] would be used. +/// +/// [dac]: https://docs.rs/azure_identity/0.17.0/azure_identity/struct.DefaultAzureCredential.html +pub fn build_queue_client( + connection_string: Option, + storage_account: Option, + queue_name: String, + endpoint: Option, + client_credentials: Option, +) -> crate::Result> { + let client; + match (connection_string, storage_account) { + (Some(connection_string_p), None) => { + let connection_string = ConnectionString::new(&connection_string_p)?; + + client = match connection_string.queue_endpoint { + // When the queue_endpoint is provided, we use the Custom CloudLocation since it is + // required to contain the full URI to the storage queue API endpoint, this means + // that account_name is not required to exist in the connection_string since + // account_name is only used with the default CloudLocation in the Azure SDK to + // generate the storage API endpoint + Some(uri) => azure_storage_queues::QueueServiceClientBuilder::with_location( + CloudLocation::Custom { + uri: uri.to_string(), + }, + connection_string.storage_credentials()?, + ), + // Without a valid queue_endpoint in the connection_string, assume we are in Azure + // Commercial (AzureCloud location) and create a default Blob Storage Client that + // builds the API endpoint location using the account_name as input + None => azure_storage_queues::QueueServiceClientBuilder::new( + connection_string + .account_name + .ok_or("Account name missing in connection string")?, + connection_string.storage_credentials()?, + ), + } + .retry(RetryOptions::none()) + .build() + .queue_client(queue_name); + } + (None, Some(storage_account_p)) => { + let creds: Arc = match client_credentials { + Some(client_credentials_p) => { + let http_client: Arc = new_http_client(); + let options = TokenCredentialOptions::default(); + let creds = std::sync::Arc::new(ClientSecretCredential::new( + http_client.clone(), + client_credentials_p.tenant_id, + client_credentials_p.client_id, + client_credentials_p.client_secret, + options, + )); + creds + } + None => { + let creds = std::sync::Arc::new(DefaultAzureCredential::default()); + creds + } + }; + let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds)); + let storage_credentials = StorageCredentials::token_credential(auto_creds); + + client = match endpoint { + // If a queue_endpoint is provided in the configuration, use it with a Custom + // CloudLocation, to allow overriding the storage queue API endpoint + Some(endpoint) => azure_storage_queues::QueueServiceClientBuilder::with_location( + CloudLocation::Custom { uri: endpoint }, + storage_credentials, + ), + // Use the storage_account configuration parameter and assume we are in Azure + // Commercial (AzureCloud location) and build the blob storage API endpoint using + // the storage_account as input. 
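`build_container_client` and `build_queue_client` enforce the same mutually exclusive authentication contract described in the doc comments above. A hypothetical pair of call sites, with the elided generic parameters assumed to be `Option<String>`, `Option<ClientCredentials>`, and `Arc<ContainerClient>` / `Arc<QueueClient>` on the return side:

```rust
use crate::azure::{self, ClientCredentials};

// Sketch only: the connection string, storage account name, and queue name are
// illustrative values, not ones defined elsewhere in this change.
fn example_clients(
    connection_string: Option<String>,
    client_credentials: Option<ClientCredentials>,
) -> crate::Result<()> {
    // Connection-string authentication: no storage account, no client credentials.
    let _container = azure::build_container_client(
        connection_string,
        None,
        "logs".to_string(),
        None, // the blob endpoint comes from the connection string itself
        None,
    )?;

    // Service-principal authentication against a storage account; with
    // `client_credentials` left as None, DefaultAzureCredential is used instead.
    let _queue = azure::build_queue_client(
        None,
        Some("mylogstorage".to_string()),
        "blob-created-events".to_string(), // hypothetical queue name
        None,                              // default AzureCloud endpoint
        client_credentials,
    )?;

    Ok(())
}
```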
+ None => azure_storage_queues::QueueServiceClientBuilder::new( + storage_account_p, + storage_credentials, + ), + } + .retry(RetryOptions::none()) + .build() + .queue_client(queue_name); + } + (None, None) => { + return Err("Either `connection_string` or `storage_account` has to be provided".into()) + } + (Some(_), Some(_)) => { + return Err( + "`connection_string` and `storage_account` can't be provided at the same time" + .into(), + ) + } + } + Ok(std::sync::Arc::new(client)) +} diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index b11fe751a7462..71ba84d2e7cd2 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -1,6 +1,6 @@ use crate::codecs::Transformer; use vector_lib::codecs::{ - encoding::{Framer, FramingConfig, Serializer, SerializerConfig}, + encoding::{BatchSerializer, Framer, FramingConfig, Serializer, SerializerConfig}, CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, }; use vector_lib::configurable::configurable_component; @@ -130,6 +130,13 @@ impl EncodingConfigWithFraming { Ok((framer, serializer)) } + + /// Build `BatchSerializer` for this config. + /// None if serializer is not batched. + pub fn build_batched(&self) -> crate::Result> { + let serializer = self.encoding.config().build_batched()?; + Ok(serializer) + } } /// The way a sink processes outgoing events. diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 61d705c00eb36..35196e6d544bc 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -226,6 +226,7 @@ fn serializer_config_to_deserializer( }) } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, + SerializerConfig::Parquet { .. } => todo!(), }; deserializer_config.build() diff --git a/src/gcp.rs b/src/gcp.rs index 30d2120d0a422..4f812e71d4415 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -5,6 +5,7 @@ use std::{ }; use base64::prelude::{Engine as _, BASE64_URL_SAFE}; +use chrono::{DateTime, Utc}; pub use goauth::scopes::Scope; use goauth::{ auth::{JwtClaims, Token, TokenErr}, @@ -13,9 +14,13 @@ use goauth::{ }; use http::{uri::PathAndQuery, Uri}; use hyper::header::AUTHORIZATION; +use reqwest::{Client, Response}; +use serde_json::{from_value, json}; +use serde_with::serde_derive::Deserialize; use smpl_jwt::Jwt; use snafu::{ResultExt, Snafu}; use tokio::sync::watch; +use typetag::serde; use vector_lib::configurable::configurable_component; use vector_lib::sensitive_string::SensitiveString; @@ -100,6 +105,10 @@ pub struct GcpAuthConfig { #[serde(default, skip_serializing)] #[configurable(metadata(docs::hidden))] pub skip_authentication: bool, + + /// The service account to impersonate. The impersonated service account must have the + /// `roles/iam.serviceAccountTokenCreator` role on the target service account. 
+ pub impersonated_service_account: Option, } impl GcpAuthConfig { @@ -110,7 +119,7 @@ impl GcpAuthConfig { let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok(); let creds_path = self.credentials_path.as_ref().or(gap.as_ref()); match (&creds_path, &self.api_key) { - (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?, + (Some(path), _) => GcpAuthenticator::from_file(path, scope, self.impersonated_service_account.clone()).await?, (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?, (None, None) => GcpAuthenticator::new_implicit().await?, } @@ -125,18 +134,30 @@ pub enum GcpAuthenticator { None, } +type ServiceAccount = String; +#[derive(Debug)] +pub enum Creds { + Regular(Credentials, Scope), + Impersonated(Credentials, Scope, ServiceAccount), +} #[derive(Debug)] pub struct InnerCreds { - creds: Option<(Credentials, Scope)>, + creds: Option, token: RwLock, } impl GcpAuthenticator { - async fn from_file(path: &str, scope: Scope) -> crate::Result { + async fn from_file(path: &str, scope: Scope, service_account: Option) -> crate::Result { let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?; - let token = RwLock::new(fetch_token(&creds, &scope).await?); - let creds = Some((creds, scope)); - Ok(Self::Credentials(Arc::new(InnerCreds { creds, token }))) + let token = RwLock::new(fetch_token(&creds, &scope, service_account.as_deref()).await?); + + let creds = Some(match service_account { + Some(service_account) => + Creds::Impersonated(creds, scope, service_account), + None => + Creds::Regular(creds, scope), + }); + Ok(Self::Credentials(Arc::new(InnerCreds { creds, token, }))) } async fn new_implicit() -> crate::Result { @@ -242,8 +263,12 @@ impl GcpAuthenticator { impl InnerCreds { async fn regenerate_token(&self) -> crate::Result<()> { let token = match &self.creds { - Some((creds, scope)) => fetch_token(creds, scope).await?, - None => get_token_implicit().await?, + Some(Creds::Regular(creds, scope)) => + fetch_regular_token(creds, scope).await?, + Some(Creds::Impersonated(creds, scope, service_account)) => + fetch_impersonated_token(creds, scope, service_account).await?, + None => + get_token_implicit().await?, }; *self.token.write().unwrap() = token; Ok(()) @@ -255,7 +280,15 @@ impl InnerCreds { } } -async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result { +async fn fetch_token(creds: &Credentials, scope: &Scope, impersonated_service_account: Option<&str> +) -> crate::Result { + match impersonated_service_account { + Some(service_account) => fetch_impersonated_token(creds, scope, service_account).await, + None => fetch_regular_token(creds, scope).await, + } +} + +async fn fetch_regular_token(creds: &Credentials, scope: &Scope) -> crate::Result { let claims = JwtClaims::new(creds.iss(), scope, creds.token_uri(), None, None); let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?; let jwt = Jwt::new(claims, rsa_key, None); @@ -272,6 +305,118 @@ async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result .map_err(Into::into) } +async fn fetch_impersonated_token( + creds: &Credentials, + impersonated_scope: &Scope, + impersonated_service_account: &str, +) -> crate::Result { + // base scope is used only for impersonation from base service account to target service account + let base_scope = Scope::CloudPlatform; + let claims = JwtClaims::new(creds.iss(), &base_scope, creds.token_uri(), None, None); + let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?; + let jwt = Jwt::new(claims, 
rsa_key, None); + + debug!( + message = "Fetching base service account GCP authentication token.", + project = ?creds.project(), + iss = ?creds.iss(), + token_uri = ?creds.token_uri(), + ); + let token = goauth::get_token(&jwt, creds) + .await + .context(GetTokenSnafu)?; + + debug!( + message = "Fetching impersonated service account GCP authentication token.", + project = ?creds.project(), + impersonated_service_account = impersonated_service_account + ); + let token = do_fetch_impersonated_token(token.access_token(), + impersonated_service_account, + &[&impersonated_scope.url()]) + .await + .map_err(move |e| { + error!( + message = "Failed to generate impersonated token.", + impersonated_service_account = impersonated_service_account, + error = %e, + ); + e + })?; + Ok(token) +} + +async fn do_fetch_impersonated_token( + base_token: &str, + target_service_account: &str, + scopes: &[&str], +) -> crate::Result { + // Define the IAM Credentials API endpoint for generating impersonated tokens + let url = format!( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{target_service_account}:generateAccessToken", + ); + + // Construct the JSON payload with the requested scopes + let body = json!({ + "scope": scopes, + }); + + // Create an HTTP client and make the POST request + let client = Client::new(); + let response = client + .post(&url) + .bearer_auth(base_token) // Use the base token for authorization + .json(&body) + .send() + .await?; + + token_from_json(response).await +} + +async fn token_from_json(resp: Response) -> crate::Result { + let is_success = resp.status().is_success(); + let resp = resp.bytes().await?; + if !is_success { + error!( + message = "No success in response.", + raw_resp = String::from_utf8_lossy(&resp).into_owned(), + ); + let token_err: TokenErr = serde_json::from_slice(&resp)?; + return Err(token_err.into()) + } + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct TokenCamelCase { + access_token: String, + expire_time: String, + } + let token: TokenCamelCase = serde_json::from_slice(&resp.clone()).map_err(|e| { + error!( + message = "Failed to parse OAuth token JSON.", + error = %e, + raw_resp = String::from_utf8_lossy(&resp).into_owned(), + ); + e + })?; + + let remapped = json!({ + "access_token": token.access_token, + "token_type": "Bearer", + "expires_in": seconds_from_now_to_timestamp(&token.expire_time)?, + }); + + let token: Token = from_value(remapped)?; + Ok(token) +} + +fn seconds_from_now_to_timestamp(timestamp: &str) -> crate::Result { + let future_time: DateTime = timestamp.parse()?; + let now = Utc::now(); + let duration = future_time.signed_duration_since(now); + Ok(duration.num_seconds() as u32) +} + async fn get_token_implicit() -> Result { debug!("Fetching implicit GCP authentication token."); let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL) diff --git a/src/internal_events/azure_queue.rs b/src/internal_events/azure_queue.rs new file mode 100644 index 0000000000000..31f864b1d0d73 --- /dev/null +++ b/src/internal_events/azure_queue.rs @@ -0,0 +1,196 @@ +#[cfg(feature = "sources-azure_blob")] +pub use azure_blob::*; +use metrics::counter; +use vector_lib::internal_event::{error_stage, error_type, InternalEvent}; + +#[cfg(feature = "sources-azure_blob")] +mod azure_blob { + use super::*; + use crate::event::Event; + use crate::sources::azure_blob::queue::ProcessingError; + + #[derive(Debug)] + pub struct QueueMessageProcessingError<'a> { + pub message_id: &'a str, + pub error: &'a ProcessingError, + } + 
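The impersonation path added to `src/gcp.rs` first fetches a token for the base service account with the `cloud-platform` scope, exchanges it at the IAM Credentials `generateAccessToken` endpoint for a token scoped to the target account, and then remaps the response into the OAuth shape that goauth's `Token` expects. The one non-obvious step is converting the absolute `expireTime` into a relative `expires_in`; a worked example of that conversion (timestamps are hypothetical):

```rust
use chrono::{DateTime, Utc};

// Assumes `expire_time` is the RFC 3339 timestamp carried in the `expireTime`
// field that `TokenCamelCase` parses above; `now` is injected to keep the example testable.
fn seconds_until(expire_time: &str, now: DateTime<Utc>) -> i64 {
    let expires_at: DateTime<Utc> = expire_time.parse().expect("valid RFC 3339 timestamp");
    expires_at.signed_duration_since(now).num_seconds()
}

#[test]
fn expire_time_one_hour_out_becomes_3600_seconds() {
    let now: DateTime<Utc> = "2024-05-09T11:37:10Z".parse().unwrap();
    assert_eq!(seconds_until("2024-05-09T12:37:10Z", now), 3600);
}
```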
+ impl<'a> InternalEvent for QueueMessageProcessingError<'a> { + fn emit(self) { + error!( + message = "Failed to process Queue message.", + message_id = %self.message_id, + error = %self.error, + error_code = "failed_processing_azure_queue_message", + error_type = error_type::PARSER_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "failed_processing_azure_queue_message", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ).increment(1); + } + } + + #[derive(Debug)] + pub struct InvalidRowEventType<'a> { + pub event: &'a Event, + } + + impl<'a> InternalEvent for InvalidRowEventType<'a> { + fn emit(self) { + error!( + message = "Expected Azure rows as Log Events", + event = ?self.event, + error_code = "invalid_azure_row_event", + error_type = error_type::CONDITION_FAILED, + stage = error_stage::PROCESSING, + ); + counter!( + "component_errors_total", + "error_code" => "invalid_azure_row_event", + "error_type" => error_type::CONDITION_FAILED, + "stage" => error_stage::PROCESSING, + ).increment(1); + } + } +} + +#[derive(Debug)] +pub struct QueueMessageReceiveError<'a, E> { + pub error: &'a E, +} + +impl<'a, E: std::fmt::Display + std::fmt::Debug> InternalEvent for QueueMessageReceiveError<'a, E> { + fn emit(self) { + error!( + message = "Failed reading messages", + event = format!("{:?}", self.error), + error_code = "failed_fetching_azure_queue_events", + error_type = error_type::REQUEST_FAILED, + stage = error_stage::RECEIVING, + ); + counter!( + "component_errors_total", + "error_code" => "failed_fetching_azure_queue_events", + "error_type" => error_type::REQUEST_FAILED, + "stage" => error_stage::RECEIVING, + ).increment(1); + } +} + +#[derive(Debug)] +pub struct QueueMessageDeleteError<'a, E> { + pub error: &'a E, +} + +impl<'a, E: std::fmt::Display> InternalEvent for QueueMessageDeleteError<'a, E> { + fn emit(self) { + error!( + message = "Failed deleting message", + error = %self.error, + error_code = "failed_deleting_azure_queue_event", + error_type = error_type::ACKNOWLEDGMENT_FAILED, + stage = error_stage::PROCESSING, + ); + counter!( + "component_errors_total", + "error_code" => "failed_deleting_azure_queue_event", + "error_type" => error_type::WRITER_FAILED, + "stage" => error_stage::RECEIVING, + ).increment(1); + } +} + +#[derive(Debug)] +pub struct QueueStorageInvalidEventIgnored<'a> { + pub container: &'a str, + pub subject: &'a str, + pub event_type: &'a str, +} + +impl<'a> InternalEvent for QueueStorageInvalidEventIgnored<'a> { + fn emit(self) { + trace!( + message = "Ignoring event because of wrong event type", + container = %self.container, + subject = %self.subject, + event_type = %self.event_type + ); + counter!( + "azure_queue_event_ignored_total", + "ignore_type" => "invalid_event_type" + ).increment(1); + } +} + +#[derive(Debug)] +pub struct QueueStorageMismatchingContainerName<'a> { + pub container: &'a str, + pub configured_container: &'a str, +} + +impl<'a> InternalEvent for QueueStorageMismatchingContainerName<'a> { + fn emit(self) { + warn!( + message = "Ignoring event because of wrong container name", + configured_container = %self.configured_container, + container = %self.container, + ); + counter!( + "azure_queue_event_ignored_total", + "ignore_type" => "mismatching_container_name" + ).increment(1); + } +} + +#[derive(Debug)] +pub struct QueueMessageProcessingSucceeded {} + +impl InternalEvent for QueueMessageProcessingSucceeded { + fn 
emit(self) { + trace!(message = "Processed azure queue message successfully."); + counter!("azure_queue_message_processing_succeeded_total").increment(1); + } +} + +#[derive(Debug)] +pub struct QueueMessageProcessingErrored {} + +impl InternalEvent for QueueMessageProcessingErrored { + fn emit(self) { + error!(message = "Batch event had a transient error in delivery."); + counter!("azure_queue_message_processing_errored_total").increment(1); + } +} + +#[derive(Debug)] +pub struct QueueMessageProcessingRejected {} + +impl InternalEvent for QueueMessageProcessingRejected { + fn emit(self) { + error!(message = "Batch event had a permanent failure or rejection."); + counter!("azure_queue_message_processing_rejected_total").increment(1); + } +} + +#[derive(Debug)] +pub struct BlobDoesntExist<'a> { + pub nonexistent_blob_name: &'a str, +} + +impl<'a> InternalEvent for BlobDoesntExist<'a> { + fn emit(self) { + warn!( + message = "Ignoring event because blob doesn't exist in storage", + blob_name = self.nonexistent_blob_name + ); + counter!( + "azure_queue_event_ignored_total", + "ignore_type" => "blob_doesnt_exist" + ).increment(1); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 12a323214ec6d..26b93c16b39c9 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -138,6 +138,9 @@ mod websocket_server; mod file; mod windows; +#[cfg(any(feature = "sources-azure_blob", feature = "sources-azure_blob",))] +mod azure_queue; + #[cfg(feature = "sources-mongodb_metrics")] pub(crate) use mongodb_metrics::*; @@ -166,6 +169,8 @@ pub(crate) use self::aws_kinesis::*; pub(crate) use self::aws_kinesis_firehose::*; #[cfg(any(feature = "sources-aws_s3", feature = "sources-aws_sqs",))] pub(crate) use self::aws_sqs::*; +#[cfg(any(feature = "sources-azure_blob"))] +pub(crate) use self::azure_queue::*; pub(crate) use self::codecs::*; #[cfg(feature = "sources-datadog_agent")] pub(crate) use self::datadog_agent::*; diff --git a/src/lib.rs b/src/lib.rs index 59f91992a1f87..5d6e29b45aed8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ pub mod async_read; #[cfg(feature = "aws-config")] pub mod aws; #[allow(unreachable_pub)] +pub mod azure; pub mod codecs; pub mod common; mod convert_config; diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 9e001db84071f..7503c2f9afe00 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use aws_sdk_s3::Client as S3Client; use tower::ServiceBuilder; use vector_lib::codecs::{ @@ -13,6 +14,7 @@ use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, + event::Event, sinks::{ s3_common::{ self, @@ -234,8 +236,14 @@ impl S3SinkConfig { let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None); let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); + let encoder = if let Some(serializer) = self.encoding.build_batched()? 
{ + Arc::new((transformer, serializer)) + as Arc> + Send + Sync> + } else { + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::::new(framer, serializer); + Arc::new((transformer, encoder)) as _ + }; let request_options = S3RequestOptions { bucket: self.bucket.clone(), @@ -243,7 +251,7 @@ impl S3SinkConfig { filename_extension: self.filename_extension.clone(), filename_time_format: self.filename_time_format.clone(), filename_append_uuid: self.filename_append_uuid, - encoder: (transformer, encoder), + encoder, compression: self.compression, filename_tz_offset: offset, }; diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 0443f377a55e6..3e9feebc460d6 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -1,14 +1,12 @@ -use std::io; +use std::{io, sync::Arc}; use bytes::Bytes; use chrono::{FixedOffset, Utc}; use uuid::Uuid; -use vector_lib::codecs::encoding::Framer; use vector_lib::event::Finalizable; use vector_lib::request_metadata::RequestMetadata; use crate::{ - codecs::{Encoder, Transformer}, event::Event, sinks::{ s3_common::{ @@ -17,8 +15,8 @@ use crate::{ service::{S3Metadata, S3Request}, }, util::{ - metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, - RequestBuilder, + encoding::Encoder, metadata::RequestMetadataBuilder, request_builder::EncodeResult, + Compression, RequestBuilder, }, }, }; @@ -30,7 +28,7 @@ pub struct S3RequestOptions { pub filename_append_uuid: bool, pub filename_extension: Option, pub api_options: S3Options, - pub encoder: (Transformer, Encoder), + pub encoder: Arc> + Send + Sync>, pub compression: Compression, pub filename_tz_offset: Option, } @@ -38,7 +36,7 @@ pub struct S3RequestOptions { impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { type Metadata = S3Metadata; type Events = Vec; - type Encoder = (Transformer, Encoder); + type Encoder = Arc> + Send + Sync>; type Payload = Bytes; type Request = S3Request; type Error = io::Error; // TODO: this is ugly. 
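The `aws_s3` change above swaps the concrete `(Transformer, Encoder<Framer>)` field for an `Arc<dyn Encoder<Vec<Event>> + Send + Sync>`, so a single request-builder type can carry either the framed, per-event encoder or the new batched one. A stand-alone illustration of that pattern with simplified stand-in types (not Vector's actual trait):

```rust
use std::{io, sync::Arc};

trait Encode {
    fn encode_input(&self, input: Vec<String>, writer: &mut dyn io::Write) -> io::Result<usize>;
}

struct LineEncoder;    // stands in for the framed (Transformer, Encoder<Framer>) pair
struct BatchedEncoder; // stands in for the batched (Transformer, BatchSerializer) pair

impl Encode for LineEncoder {
    fn encode_input(&self, input: Vec<String>, writer: &mut dyn io::Write) -> io::Result<usize> {
        // Framed encoding writes events one at a time.
        let mut written = 0;
        for line in input {
            written += writer.write(line.as_bytes())?;
            written += writer.write(b"\n")?;
        }
        Ok(written)
    }
}

impl Encode for BatchedEncoder {
    fn encode_input(&self, input: Vec<String>, writer: &mut dyn io::Write) -> io::Result<usize> {
        // A batched format serializes the whole Vec in one shot.
        writer.write(input.join(",").as_bytes())
    }
}

// Blanket impl mirroring the one added for Arc<E> in src/sinks/util/encoding.rs.
impl<E: Encode + ?Sized> Encode for Arc<E> {
    fn encode_input(&self, input: Vec<String>, writer: &mut dyn io::Write) -> io::Result<usize> {
        (**self).encode_input(input, writer)
    }
}

struct RequestOptions {
    encoder: Arc<dyn Encode + Send + Sync>,
}

fn build(batched: bool) -> RequestOptions {
    // Either encoder flavour fits behind the same trait-object field.
    let encoder: Arc<dyn Encode + Send + Sync> = if batched {
        Arc::new(BatchedEncoder)
    } else {
        Arc::new(LineEncoder)
    };
    RequestOptions { encoder }
}

fn main() -> io::Result<()> {
    let mut payload = Vec::new();
    build(true).encoder.encode_input(vec!["a".into(), "b".into()], &mut payload)?;
    assert_eq!(payload, b"a,b".to_vec());
    Ok(())
}
```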
diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 11f32900d5b32..4f29f18e707e2 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -9,6 +9,7 @@ use vector_lib::sensitive_string::SensitiveString; use super::request_builder::AzureBlobRequestOptions; use crate::sinks::util::service::TowerRequestConfigDefaults; use crate::{ + azure, codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, sinks::{ @@ -198,13 +199,14 @@ impl GenerateConfig for AzureBlobSinkConfig { #[typetag::serde(name = "azure_blob")] impl SinkConfig for AzureBlobSinkConfig { async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> { - let client = azure_common::config::build_client( + let client = azure::build_container_client( self.connection_string .as_ref() .map(|v| v.inner().to_string()), self.storage_account.as_ref().map(|v| v.to_string()), self.container_name.clone(), self.endpoint.clone(), + None, )?; let healthcheck = azure_common::config::build_healthcheck( diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 9e36ed1a6acd7..54c1f64395c5d 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -17,6 +17,7 @@ use vector_lib::ByteSizeOf; use super::config::AzureBlobSinkConfig; use crate::{ + azure, event::{Event, EventArray, LogEvent}, sinks::{ azure_common, @@ -32,11 +33,12 @@ use crate::{ #[tokio::test] async fn azure_blob_healthcheck_passed() { let config = AzureBlobSinkConfig::new_emulator().await; - let client = azure_common::config::build_client( + let client = azure::build_container_client( config.connection_string.map(Into::into), None, config.container_name.clone(), None, + None, ) .expect("Failed to create client"); @@ -53,11 +55,12 @@ async fn azure_blob_healthcheck_unknown_container() { container_name: String::from("other-container-name"), ..config }; - let client = azure_common::config::build_client( + let client = azure::build_container_client( config.connection_string.map(Into::into), config.storage_account.map(Into::into), config.container_name.clone(), config.endpoint.clone(), + None, ) .expect("Failed to create client"); @@ -243,11 +246,12 @@ impl AzureBlobSinkConfig { } fn to_sink(&self) -> VectorSink { - let client = azure_common::config::build_client( + let client = azure::build_container_client( self.connection_string.clone().map(Into::into), self.storage_account.clone().map(Into::into), self.container_name.clone(), self.endpoint.clone(), + None, ) .expect("Failed to create client"); @@ -262,11 +266,12 @@ impl AzureBlobSinkConfig { } pub async fn list_blobs(&self, prefix: String) -> Vec { - let client = azure_common::config::build_client( + let client = azure::build_container_client( self.connection_string.clone().map(Into::into), self.storage_account.clone().map(Into::into), self.container_name.clone(), self.endpoint.clone(), + None, ) .unwrap(); let response = client @@ -291,11 +296,12 @@ impl AzureBlobSinkConfig { } pub async fn get_blob(&self, blob: String) -> (Blob, Vec) { - let client = azure_common::config::build_client( + let client = azure::build_container_client( self.connection_string.clone().map(Into::into), self.storage_account.clone().map(Into::into), self.container_name.clone(), self.endpoint.clone(), + None, ) .unwrap(); let response = client @@ -328,11 +334,12 @@ impl AzureBlobSinkConfig { } async fn 
ensure_container(&self) { - let client = azure_common::config::build_client( + let client = azure::build_container_client( self.connection_string.clone().map(Into::into), self.storage_account.clone().map(Into::into), self.container_name.clone(), self.endpoint.clone(), + None, ) .unwrap(); let request = client diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 5e10ea797c305..76c36707f800b 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -1,8 +1,6 @@ use std::sync::Arc; -use azure_core::{error::HttpError, RetryOptions}; -use azure_identity::{AutoRefreshingTokenCredential, DefaultAzureCredential}; -use azure_storage::{prelude::*, CloudLocation, ConnectionString}; +use azure_core::error::HttpError; use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*}; use bytes::Bytes; use futures::FutureExt; @@ -122,72 +120,3 @@ pub fn build_healthcheck( Ok(healthcheck.boxed()) } - -pub fn build_client( - connection_string: Option, - storage_account: Option, - container_name: String, - endpoint: Option, -) -> crate::Result> { - let client; - match (connection_string, storage_account) { - (Some(connection_string_p), None) => { - let connection_string = ConnectionString::new(&connection_string_p)?; - - client = match connection_string.blob_endpoint { - // When the blob_endpoint is provided, we use the Custom CloudLocation since it is - // required to contain the full URI to the blob storage API endpoint, this means - // that account_name is not required to exist in the connection_string since - // account_name is only used with the default CloudLocation in the Azure SDK to - // generate the storage API endpoint - Some(uri) => ClientBuilder::with_location( - CloudLocation::Custom { - uri: uri.to_string(), - }, - connection_string.storage_credentials()?, - ), - // Without a valid blob_endpoint in the connection_string, assume we are in Azure - // Commercial (AzureCloud location) and create a default Blob Storage Client that - // builds the API endpoint location using the account_name as input - None => ClientBuilder::new( - connection_string - .account_name - .ok_or("Account name missing in connection string")?, - connection_string.storage_credentials()?, - ), - } - .retry(RetryOptions::none()) - .container_client(container_name); - } - (None, Some(storage_account_p)) => { - let creds = std::sync::Arc::new(DefaultAzureCredential::default()); - let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds)); - let storage_credentials = StorageCredentials::token_credential(auto_creds); - - client = match endpoint { - // If a blob_endpoint is provided in the configuration, use it with a Custom - // CloudLocation, to allow overriding the blob storage API endpoint - Some(endpoint) => ClientBuilder::with_location( - CloudLocation::Custom { uri: endpoint }, - storage_credentials, - ), - // Use the storage_account configuration parameter and assume we are in Azure - // Commercial (AzureCloud location) and build the blob storage API endpoint using - // the storage_account as input. 
- None => ClientBuilder::new(storage_account_p, storage_credentials), - } - .retry(RetryOptions::none()) - .container_client(container_name); - } - (None, None) => { - return Err("Either `connection_string` or `storage_account` has to be provided".into()) - } - (Some(_), Some(_)) => { - return Err( - "`connection_string` and `storage_account` can't be provided at the same time" - .into(), - ) - } - } - Ok(std::sync::Arc::new(client)) -} diff --git a/src/sinks/gcp/stackdriver/metrics/tests.rs b/src/sinks/gcp/stackdriver/metrics/tests.rs index 391803a741ae9..0a63920896ea9 100644 --- a/src/sinks/gcp/stackdriver/metrics/tests.rs +++ b/src/sinks/gcp/stackdriver/metrics/tests.rs @@ -54,6 +54,7 @@ async fn sends_metric() { api_key: None, credentials_path: None, skip_authentication: true, + impersonated_service_account: None, }, ..Default::default() }; @@ -114,6 +115,7 @@ async fn sends_multiple_metrics() { api_key: None, credentials_path: None, skip_authentication: true, + impersonated_service_account: None, }, batch, ..Default::default() @@ -201,6 +203,7 @@ async fn does_not_aggregate_metrics() { api_key: None, credentials_path: None, skip_authentication: true, + impersonated_service_account: None, }, batch, ..Default::default() diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 73f5a6c3f8ce5..57bb326f55ddb 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, sync::Arc}; use bytes::BytesMut; use itertools::{Itertools, Position}; @@ -7,7 +7,11 @@ use vector_lib::codecs::encoding::Framer; use vector_lib::request_metadata::GroupedCountByteSize; use vector_lib::{config::telemetry, EstimatedJsonEncodedSizeOf}; -use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError}; +use crate::{ + codecs::Transformer, + event::Event, + internal_events::{EncoderSerializeError, EncoderWriteError}, +}; pub trait Encoder { /// Encodes the input into the provided writer. @@ -92,6 +96,44 @@ impl Encoder for (Transformer, crate::codecs::Encoder<()>) { } } +impl + ?Sized> Encoder for Arc { + fn encode_input( + &self, + input: T, + writer: &mut dyn io::Write + ) -> io::Result<(usize, GroupedCountByteSize)> { + (**self).encode_input(input, writer) + } +} + +impl Encoder> for (Transformer, vector_lib::codecs::encoding::BatchSerializer) { + fn encode_input( + &self, + mut events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut encoder = self.1.clone(); + let n_events_pending = events.len(); + + let mut byte_size = telemetry().create_request_count_byte_size(); + for event in &mut events { + self.0.transform(event); + byte_size.add_event(event, event.estimated_json_encoded_size_of()); + } + + let mut bytes = BytesMut::new(); + encoder.encode(events, &mut bytes).map_err(|error| { + let error: crate::Error = error.into(); + emit!(EncoderSerializeError { error: &error }); + io::Error::new(io::ErrorKind::InvalidData, error) + })?; + + write_all(writer, n_events_pending, &bytes)?; + + Ok((bytes.len(), byte_size)) + } +} + /// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the /// instrumentation spec- as this necessitates both an Error and EventsDropped event. 
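The two `sinks/util/encoding.rs` impls above work together: the blanket `Arc` impl lets an `Arc<(Transformer, BatchSerializer)>` be used anywhere an `Encoder<Vec<Event>>` is expected, and the tuple impl performs the actual batch encode. A minimal sketch, assuming Vector's types from this file (`transformer`, `batch_serializer`, and `events` are hypothetical inputs):

```rust
use std::{io, sync::Arc};

// Hypothetical call site; `Transformer`, `BatchSerializer`, `Event`, and the
// `Encoder` trait are the ones from this file and are not redefined here.
fn encode_batch(
    transformer: Transformer,
    batch_serializer: vector_lib::codecs::encoding::BatchSerializer,
    events: Vec<Event>,
) -> io::Result<Vec<u8>> {
    // The blanket impl makes the Arc-wrapped tuple itself an `Encoder<Vec<Event>>`.
    let encoder: Arc<dyn Encoder<Vec<Event>> + Send + Sync> =
        Arc::new((transformer, batch_serializer));

    let mut payload = Vec::new();
    let (_bytes_written, _grouped_byte_size) = encoder.encode_input(events, &mut payload)?;
    Ok(payload)
}
```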
/// diff --git a/src/sources/azure_blob/integration_tests.rs b/src/sources/azure_blob/integration_tests.rs new file mode 100644 index 0000000000000..2e453296d8335 --- /dev/null +++ b/src/sources/azure_blob/integration_tests.rs @@ -0,0 +1,215 @@ +use azure_core::error::HttpError; +use azure_storage_blobs::prelude::PublicAccess; +use base64::{prelude::BASE64_STANDARD, Engine}; +use http::StatusCode; + +use super::{ + queue::{make_container_client, make_queue_client, Config}, + time::Duration, + AzureBlobConfig, Strategy, +}; +use crate::{ + event::Event, + serde::default_decoding, + test_util::components::{ + run_and_assert_source_compliance, run_and_assert_source_error, COMPONENT_ERROR_TAGS, + SOURCE_TAGS, + }, +}; + +impl AzureBlobConfig { + pub async fn new_emulator() -> AzureBlobConfig { + let address = std::env::var("AZURE_ADDRESS").unwrap_or_else(|_| "localhost".into()); + let config = AzureBlobConfig { + connection_string: Some(format!("UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://{}:10000/devstoreaccount1;QueueEndpoint=http://{}:10001/devstoreaccount1;TableEndpoint=http://{}:10002/devstoreaccount1;", address, address, address).into()), + storage_account: None, + container_name: "logs".to_string(), + strategy: Strategy::StorageQueue, + queue: Some(Config { + queue_name: format!("test-{}", rand::random::()), + poll_secs: 1, + }), + // TODO shouldn't we have blob_endpoint and queue_endpoint? + endpoint: None, + acknowledgements: Default::default(), + // TODO this should be option + exec_interval_secs: 0, + log_namespace: None, + decoding: default_decoding(), + client_credentials: None, + }; + + config.ensure_container().await; + config.ensure_queue().await; + + config + } + + async fn run_assert(&self) -> Vec { + run_and_assert_source_compliance(self.clone(), Duration::from_secs(1), &SOURCE_TAGS).await + } + + async fn run_error(&self) -> Vec { + run_and_assert_source_error(self.clone(), Duration::from_secs(1), &COMPONENT_ERROR_TAGS) + .await + } + + async fn ensure_container(&self) { + let client = make_container_client(self).expect("Failed to create container client"); + let request = client + .create() + .public_access(PublicAccess::None) + .into_future(); + + let response = match request.await { + Ok(_) => Ok(()), + Err(reason) => match reason.downcast_ref::() { + Some(err) => match StatusCode::from_u16(err.status().into()) { + Ok(StatusCode::CONFLICT) => Ok(()), + _ => Err(format!("Unexpected status code {}", err.status())), + }, + _ => Err(format!("Unexpected error {}", reason)), + }, + }; + + response.expect("Failed to create container") + } + + async fn ensure_queue(&self) { + let client = make_queue_client(self).expect("Failed to create queue client"); + let request = client.create().into_future(); + + let response = match request.await { + Ok(_) => Ok(()), + Err(reason) => match reason.downcast_ref::() { + Some(err) => match StatusCode::from_u16(err.status().into()) { + Ok(StatusCode::CONFLICT) => Ok(()), + _ => Err(format!("Unexpected status code {}", err.status())), + }, + _ => Err(format!("Unexpected error {}", reason)), + }, + }; + + response.expect("Failed to create queue") + } + + async fn upload_blob(&self, name: String, content: String) { + let container_client = + make_container_client(self).expect("Failed to create container client"); + let blob_client = container_client.blob_client(name.clone()); + 
blob_client + .put_block_blob(content) + .await + .expect("Failed putting blob"); + + self.queue_notify_blob_created(&name).await; + } + + async fn queue_notify_blob_created(&self, name: &str) { + let queue_client = make_queue_client(self).expect("Failed to create queue client"); + let message = format!( + r#"{{ + "topic": "/subscriptions/fa5f2180-1451-4461-9b1f-aae7d4b33cf8/resourceGroups/events_poc/providers/Microsoft.Storage/storageAccounts/eventspocaccount", + "subject": "/blobServices/default/containers/logs/blobs/{}", + "eventType": "Microsoft.Storage.BlobCreated", + "id": "be3f21f7-201e-000b-7605-a29195062628", + "data": {{ + "api": "PutBlob", + "clientRequestId": "1fa42c94-6dd3-4172-95c4-fd9cf56b5009", + "requestId": "be3f21f7-201e-000b-7605-a29195000000", + "eTag": "0x8DC701C5D3FFDF6", + "contentType": "application/octet-stream", + "contentLength": 0, + "blobType": "BlockBlob", + "url": "https://eventspocaccount.blob.core.windows.net/logs/{}", + "sequencer": "0000000000000000000000000005C5360000000000276a63", + "storageDiagnostics": {{ + "batchId": "fec5b12c-2006-0034-0005-a25936000000" + }} + }}, + "dataVersion": "", + "metadataVersion": "1", + "eventTime": "2024-05-09T11:37:10.5637878Z" + }}"#, + name, name + ); + queue_client + .put_message(BASE64_STANDARD.encode(message)) + .await + .expect("Failed putting message"); + } +} + +#[tokio::test] +async fn azure_blob_read_single_line_from_blob() { + let config = AzureBlobConfig::new_emulator().await; + let content = "a"; + config + .upload_blob("file.txt".to_string(), content.to_string()) + .await; + + let events = config.run_assert().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].as_log()["message"], "a".into()); +} + +#[tokio::test] +async fn azure_blob_read_multiple_lines_from_blob() { + let config = AzureBlobConfig::new_emulator().await; + let content = "a\nb\nc"; + config + .upload_blob("file.txt".to_string(), content.to_string()) + .await; + + let events = config.run_assert().await; + assert_eq!(events.len(), 3); + assert_eq!(events[0].as_log()["message"], "a".into()); + assert_eq!(events[1].as_log()["message"], "b".into()); + assert_eq!(events[2].as_log()["message"], "c".into()); +} + +#[tokio::test] +async fn azure_blob_read_single_line_from_multiple_blobs() { + let config = AzureBlobConfig::new_emulator().await; + let contents = vec!["a", "b", "c"]; + for (i, content) in contents.clone().iter().enumerate() { + config + .upload_blob(format!("file{}.txt", i), content.to_string()) + .await; + } + + let events = + run_and_assert_source_compliance(config.clone(), Duration::from_secs(4), &SOURCE_TAGS) + .await; + assert_eq!(events.len(), contents.len()); + for (i, event) in events.iter().enumerate() { + assert_eq!(event.as_log()["message"], contents[i].into()); + } +} + +#[tokio::test] +async fn azure_blob_emit_error_on_message_read() { + let mut config = AzureBlobConfig::new_emulator().await; + let content = "a\nb\nc"; + config + .upload_blob("file.txt".to_string(), content.to_string()) + .await; + config.queue = Some(Config { + queue_name: "nonexistent".to_string(), + poll_secs: 1, + }); + + let events = config.run_error().await; + assert!(events.is_empty()); +} + +#[tokio::test] +async fn azure_blob_ignore_missing_blob() { + let config = AzureBlobConfig::new_emulator().await; + + config.queue_notify_blob_created("non-existent").await; + config.upload_blob("file.txt".to_string(), "some_content".to_string()).await; + + let events = config.run_assert().await; + assert_eq!(events.len(), 1); +} diff --git 
a/src/sources/azure_blob/mod.rs b/src/sources/azure_blob/mod.rs new file mode 100644 index 0000000000000..e2aa8d4892aeb --- /dev/null +++ b/src/sources/azure_blob/mod.rs @@ -0,0 +1,392 @@ +use std::{future::Future, pin::Pin, time::Duration}; + +use async_stream::stream; +use bytes::Bytes; +use futures::{stream::StreamExt, Stream}; +use tokio::{select, time}; +use tokio_stream::wrappers::IntervalStream; +use vrl::path; + +use vector_lib::internal_event::Registered; +use vector_lib::{ + codecs::{ + decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions}, + NewlineDelimitedDecoderConfig, + }, + config::LegacyKey, + internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol}, + sensitive_string::SensitiveString, +}; + +use crate::{ + azure::ClientCredentials, + codecs::{Decoder, DecodingConfig}, + config::{ + LogNamespace, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput, + }, + event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event}, + internal_events::{ + EventsReceived, InvalidRowEventType, QueueMessageProcessingErrored, + QueueMessageProcessingRejected, QueueMessageProcessingSucceeded, StreamClosedError, + }, + serde::{bool_or_struct, default_decoding}, + shutdown::ShutdownSignal, + sinks::prelude::configurable_component, + sources::azure_blob::queue::make_azure_row_stream, + SourceSender, +}; + +#[cfg(all(test, feature = "azure-blob-source-integration-tests"))] +mod integration_tests; +pub mod queue; +#[cfg(test)] +mod test; + +/// Strategies for consuming objects from Azure Storage. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative)] +#[serde(rename_all = "lowercase")] +#[derivative(Default)] +enum Strategy { + /// Consumes objects by processing events sent to an [Azure Storage Queue][azure_storage_queue]. + /// + /// [azure_storage_queue]: https://learn.microsoft.com/en-us/azure/storage/queues/storage-queues-introduction + StorageQueue, + + /// This is a test strategy used only of development and PoC. Should be removed + /// once development is done. + #[derivative(Default)] + Test, +} + +/// WIP +/// A dummy implementation is used as a starter. +/// The source will send dummy messages at a fixed interval, incrementing a counter every +/// exec_interval_secs seconds. +#[configurable_component(source("azure_blob", "Collect logs from Azure Container."))] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +#[serde(default, deny_unknown_fields)] +pub struct AzureBlobConfig { + /// The namespace to use for logs. This overrides the global setting. + #[configurable(metadata(docs::hidden))] + #[serde(default)] + log_namespace: Option, + + /// The interval, in seconds, between subsequent dummy messages + #[serde(default = "default_exec_interval_secs")] + exec_interval_secs: u64, + + /// The strategy to use to consume objects from Azure Storage. + #[configurable(metadata(docs::hidden))] + strategy: Strategy, + + /// Configuration options for Storage Queue. + queue: Option, + + /// The Azure Blob Storage Account connection string. + /// + /// Authentication with access key is the only supported authentication method. + /// + /// Either `storage_account`, or this field, must be specified. + #[configurable(metadata( + docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net" + ))] + pub connection_string: Option, + + /// The Azure Blob Storage Account name. 
+ /// + /// Attempts to load credentials for the account in the following ways, in order: + /// + /// - read from environment variables ([more information][env_cred_docs]) + /// - looks for a [Managed Identity][managed_ident_docs] + /// - uses the `az` CLI tool to get an access token ([more information][az_cli_docs]) + /// + /// Either `connection_string`, or this field, must be specified. + /// + /// [env_cred_docs]: https://docs.rs/azure_identity/latest/azure_identity/struct.EnvironmentCredential.html + /// [managed_ident_docs]: https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview + /// [az_cli_docs]: https://docs.microsoft.com/en-us/cli/azure/account?view=azure-cli-latest#az-account-get-access-token + #[configurable(metadata(docs::examples = "mylogstorage"))] + pub storage_account: Option, + + #[configurable(derived)] + pub client_credentials: Option, + + /// The Azure Blob Storage Endpoint URL. + /// + /// This is used to override the default blob storage endpoint URL in cases where you are using + /// credentials read from the environment/managed identities or access tokens without using an + /// explicit connection_string (which already explicitly supports overriding the blob endpoint + /// URL). + /// + /// This may only be used with `storage_account` and is ignored when used with + /// `connection_string`. + #[configurable(metadata(docs::examples = "https://test.blob.core.usgovcloudapi.net/"))] + #[configurable(metadata(docs::examples = "https://test.blob.core.windows.net/"))] + pub endpoint: Option, + + /// The Azure Blob Storage Account container name. + #[configurable(metadata(docs::examples = "my-logs"))] + pub(super) container_name: String, + + #[configurable(derived)] + #[serde(default, deserialize_with = "bool_or_struct")] + pub acknowledgements: SourceAcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default = "default_decoding")] + #[derivative(Default(value = "default_decoding()"))] + pub decoding: DeserializerConfig, +} + +impl_generate_config_from_default!(AzureBlobConfig); + +impl AzureBlobConfig { + /// Self validation + pub fn validate(&self) -> crate::Result<()> { + match self.strategy { + Strategy::StorageQueue => { + if self.queue.is_none() || self.queue.as_ref().unwrap().queue_name.is_empty() { + return Err("Azure event grid queue must be set.".into()); + } + if self.storage_account.clone().unwrap_or_default().is_empty() + && self + .connection_string + .clone() + .unwrap_or_default() + .inner() + .is_empty() + { + return Err("Azure Storage Account or Connection String must be set.".into()); + } + if self.container_name.is_empty() { + return Err("Azure Container must be set.".into()); + } + } + Strategy::Test => { + if self.exec_interval_secs == 0 { + return Err("exec_interval_secs must be greater than 0".into()); + } + } + } + + Ok(()) + } +} + +type BlobStream = Pin> + Send>>; + +pub struct BlobPack { + row_stream: BlobStream, + success_handler: Box Pin + Send>> + Send>, +} + +type BlobPackStream = Pin + Send>>; + +struct AzureBlobStreamer { + shutdown: ShutdownSignal, + out: SourceSender, + log_namespace: LogNamespace, + acknowledge: bool, + decoder: Decoder, + bytes_received: Registered, + events_received: Registered, +} + +impl AzureBlobStreamer { + pub fn new( + shutdown: ShutdownSignal, + out: SourceSender, + log_namespace: LogNamespace, + acknowledge: bool, + decoding: DeserializerConfig, + ) -> crate::Result { + Ok(Self { + shutdown, + out, + log_namespace: log_namespace.clone(), + acknowledge, 
+ decoder: { + let framing = FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig { + newline_delimited: NewlineDelimitedDecoderOptions { max_length: None }, + }); + DecodingConfig::new(framing, decoding, log_namespace).build()? + }, + bytes_received: register!(BytesReceived::from(Protocol::HTTP)), + events_received: register!(EventsReceived), + }) + } + + pub async fn run_streaming(mut self, mut blob_stream: BlobPackStream) -> Result<(), ()> { + debug!("Starting Azure streaming."); + + loop { + select! { + blob_pack = blob_stream.next() => { + match blob_pack{ + Some(blob_pack) => { + self.process_blob_pack(blob_pack).await?; + } + None => { + break; // end of stream + } + } + }, + _ = self.shutdown.clone() => { + break; + } + } + } + + Ok(()) + } + + async fn process_blob_pack(&mut self, blob_pack: BlobPack) -> Result<(), ()> { + let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledge); + let mut row_stream = blob_pack.row_stream; + let mut output_stream = { + let bytes_received = self.bytes_received.clone(); + let events_received = self.events_received.clone(); + let log_namespace = self.log_namespace.clone(); + let decoder = self.decoder.clone(); + stream! { + // TODO: consider selecting with a shutdown + while let Some(row) = row_stream.next().await { + bytes_received.emit(ByteSize(row.len())); + let deser_result = decoder.deserializer_parse(Bytes::from(row)); + if deser_result.is_err(){ + continue; + } + // Error handling is done above, so we don't mind doing unwrap. + let (events, _) = deser_result.unwrap(); + for mut event in events.into_iter(){ + event = event.with_batch_notifier_option(&batch); + match event { + Event::Log(ref mut log_event) => { + log_namespace.insert_source_metadata( + AzureBlobConfig::NAME, + log_event, + Some(LegacyKey::Overwrite("ingest_timestamp")), + path!("ingest_timestamp"), + chrono::Utc::now().to_rfc3339(), + ); + events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of())); + yield event + } + _ => { + emit!(InvalidRowEventType{event: &event}) + } + } + } + } + // Explicitly dropping to showcase that the status of the batch is sent to the channel. + drop(batch); + }.boxed() + }; + + // Return if send was unsuccessful. + if let Err(send_error) = self.out.send_event_stream(&mut output_stream).await { + // TODO: consider dedicated error. + error!("Failed to send event stream: {}.", send_error); + let (count, _) = output_stream.size_hint(); + emit!(StreamClosedError { count }); + return Ok(()); + } + + // dropping like s3 sender + drop(output_stream); // TODO: better explanation + + // Run success handler if there are no errors in send or acknowledgement. 
+ match receiver { + None => (blob_pack.success_handler)().await, + Some(receiver) => { + let result = receiver.await; + match result { + BatchStatus::Delivered => { + (blob_pack.success_handler)().await; + emit!(QueueMessageProcessingSucceeded {}); + } + BatchStatus::Errored => { + emit!(QueueMessageProcessingErrored {}); + } + BatchStatus::Rejected => { + // TODO: consider allowing rejected events wihtout retrying, like s3 + emit!(QueueMessageProcessingRejected {}); + } + } + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "azure_blob")] +impl SourceConfig for AzureBlobConfig { + async fn build(&self, cx: SourceContext) -> crate::Result { + self.validate()?; + let azure_blob_streamer = AzureBlobStreamer::new( + cx.shutdown.clone(), + cx.out.clone(), + cx.log_namespace(self.log_namespace), + cx.do_acknowledgements(self.acknowledgements), + self.decoding.clone(), + )?; + + let blob_pack_stream: BlobPackStream = match self.strategy { + Strategy::Test => { + // streaming incremented numbers periodically + let exec_interval_secs = self.exec_interval_secs; + let shutdown = cx.shutdown.clone(); + stream! { + let schedule = Duration::from_secs(exec_interval_secs); + let mut counter = 0; + let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown); + while interval.next().await.is_some() { + counter += 1; + let counter_copy = counter; + yield BlobPack { + row_stream: stream! { + for i in 0..=counter { + yield format!("{}:{}", counter, i).into_bytes(); + } + }.boxed(), + success_handler: Box::new(move || { + Box::pin(async move { + debug!("Successfully processed blob pack for counter {}.", counter_copy); + }) + }), + } + } + }.boxed() + } + Strategy::StorageQueue => make_azure_row_stream(self, cx.shutdown.clone())?, + }; + Ok(Box::pin( + azure_blob_streamer.run_streaming(blob_pack_stream), + )) + } + + fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { + let log_namespace = global_log_namespace.merge(self.log_namespace); + let schema_definition = self + .decoding + .schema_definition(log_namespace) + .with_standard_vector_source_metadata(); + + vec![SourceOutput::new_maybe_logs( + self.decoding.output_type(), + schema_definition, + )] + } + + fn can_acknowledge(&self) -> bool { + true + } +} + +fn default_exec_interval_secs() -> u64 { + 1 +} diff --git a/src/sources/azure_blob/queue.rs b/src/sources/azure_blob/queue.rs new file mode 100644 index 0000000000000..8754c3443ef4d --- /dev/null +++ b/src/sources/azure_blob/queue.rs @@ -0,0 +1,363 @@ +use std::{ + io::{BufRead, BufReader, Cursor}, + panic, + sync::Arc, +}; + +use anyhow::anyhow; +use async_stream::stream; +use azure_core; +use azure_storage_blobs::prelude::ContainerClient; +use azure_storage_queues::{operations::Message, QueueClient}; +use base64::{prelude::BASE64_STANDARD, Engine}; +use futures::stream::StreamExt; +use serde::Deserialize; +use serde_with::serde_as; +use snafu::Snafu; +use tokio::{select, time}; + +use vector_lib::{ + configurable::configurable_component, + internal_event::{ByteSize, BytesReceived, InternalEventHandle, Protocol, Registered}, +}; + +use crate::{ + azure, + internal_events::{ + QueueMessageDeleteError, QueueMessageProcessingError, QueueMessageReceiveError, + QueueStorageInvalidEventIgnored, QueueStorageMismatchingContainerName, BlobDoesntExist, + }, + shutdown::ShutdownSignal, + sources::azure_blob::{AzureBlobConfig, BlobPack, BlobPackStream}, +}; + +/// Azure Queue configuration options. 
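`process_blob_pack` above ties acknowledgements to the `BlobPack` contract: the success handler runs only when the batch notifier reports `Delivered` (or when acknowledgements are disabled), which is the point at which the queue message backing the blob may safely be deleted. A sketch of constructing a `BlobPack`, mirroring the unit test later in this diff:

```rust
use async_stream::stream;

// Sketch only; the success handler is where queue-message deletion belongs,
// since it is invoked only after a `Delivered` status (or with acks disabled).
fn example_pack() -> BlobPack {
    BlobPack {
        row_stream: Box::pin(stream! {
            for line in ["foo", "bar"] {
                yield line.as_bytes().to_vec();
            }
        }),
        success_handler: Box::new(move || {
            Box::pin(async move {
                // e.g. remove_message_from_queue(&queue_client, message).await
            })
        }),
    }
}
```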
+#[serde_as] +#[configurable_component] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +#[serde(deny_unknown_fields)] +pub(super) struct Config { + /// The name of the storage queue to poll for events. + pub(super) queue_name: String, + + /// How long to wait while polling the event grid queue for new messages, in seconds. + /// + // NOTE: We restrict this to u32 for safe conversion to i32 later. + #[serde(default = "default_poll_secs")] + #[derivative(Default(value = "default_poll_secs()"))] + #[configurable(metadata(docs::type_unit = "seconds"))] + pub(super) poll_secs: u32, +} + +pub fn make_azure_row_stream( + cfg: &AzureBlobConfig, + shutdown: ShutdownSignal, +) -> crate::Result { + let queue_client = make_queue_client(cfg)?; + let container_client = make_container_client(cfg)?; + let bytes_received = register!(BytesReceived::from(Protocol::HTTP)); + let poll_interval = std::time::Duration::from_secs( + cfg.queue + .as_ref() + .ok_or(anyhow!("Missing Event Grid queue config."))? + .poll_secs as u64, + ); + + Ok(Box::pin(stream! { + // TODO: add a way to stop this loop, possibly with shutdown + loop { + let messages = match queue_client.get_messages().number_of_messages(num_messages()).await { + Ok(messages) => messages, + Err(e) => { + emit!(QueueMessageReceiveError{error: &e}); + continue; + } + }; + if !messages.messages.is_empty() { + for message in messages.messages { + let msg_id = message.message_id.clone(); + match proccess_event_grid_message( + message, + &container_client, + &queue_client, + bytes_received.clone() + ).await { + Ok(blob_pack) => { + match blob_pack { + None => trace!("Message {msg_id} is ignored, \ + no blob stream stream created from it. \ + Will retry on next message."), + Some(bp) => yield bp + } + }, + Err(e) => { + emit!(QueueMessageProcessingError{ + error: &e, + message_id: &msg_id + }); + } + } + } + } else { + // sleep or shutdown + select! 
{ + _ = shutdown.clone() => { + info!("Shutdown signal received, terminating azure row stream."); + break; + }, + _ = time::sleep(poll_interval) => { } + } + } + } + })) +} + +pub fn make_queue_client(cfg: &AzureBlobConfig) -> crate::Result> { + let q = cfg.queue.clone().ok_or("Missing queue.")?; + azure::build_queue_client( + cfg.connection_string + .as_ref() + .map(|v| v.inner().to_string()), + cfg.storage_account.as_ref().map(|v| v.to_string()), + q.queue_name.clone(), + cfg.endpoint.clone(), + cfg.client_credentials.clone(), + ) +} + +pub fn make_container_client(cfg: &AzureBlobConfig) -> crate::Result> { + azure::build_container_client( + cfg.connection_string + .as_ref() + .map(|v| v.inner().to_string()), + cfg.storage_account.as_ref().map(|v| v.to_string()), + cfg.container_name.clone(), + cfg.endpoint.clone(), + cfg.client_credentials.clone(), + ) +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct AzureStorageEvent { + pub subject: String, + pub event_type: String, +} + +#[derive(Debug, Snafu)] +pub enum ProcessingError { + #[snafu(display("Could not decode Queue message with id {}: {}", message_id, error))] + InvalidQueueMessage { + error: serde_json::Error, + message_id: String, + }, + + #[snafu(display("Failed to base64 decode message: {}", error))] + FailedDecodingMessageBase64 { error: base64::DecodeError }, + + #[snafu(display("Failed to utf8 decode message: {}", error))] + FailedDecodingUTF8 { error: std::string::FromUtf8Error }, + + #[snafu(display("Failed to get blob: {}", error))] + FailedToGetBlob { error: azure_core::Error }, + + #[snafu(display("Failed to parse {} as subject", subject))] + FailedToParseSubject { subject: String }, +} + +async fn proccess_event_grid_message( + message: Message, + container_client: &ContainerClient, + queue_client: &QueueClient, + bytes_received: Registered, +) -> Result, ProcessingError> { + let msg_id = message.message_id.clone(); + let decoded_bytes = BASE64_STANDARD + .decode(&message.message_text) + .map_err(|e| ProcessingError::FailedDecodingMessageBase64 { error: e })?; + let decoded_string = String::from_utf8(decoded_bytes) + .map_err(|e| ProcessingError::FailedDecodingUTF8 { error: e })?; + let body: AzureStorageEvent = serde_json::from_str(decoded_string.as_str()).map_err(|e| { + ProcessingError::InvalidQueueMessage { + error: e, + message_id: msg_id, + } + })?; + if body.event_type != "Microsoft.Storage.BlobCreated" { + emit!(QueueStorageInvalidEventIgnored { + container: container_client.container_name(), + subject: &body.subject, + event_type: &body.event_type, + }); + return Ok(None); + } + match parse_subject(body.subject.clone()) { + Some((container, blob)) => { + if container != container_client.container_name() { + emit!(QueueStorageMismatchingContainerName { + configured_container: container_client.container_name(), + container: container.as_str(), + }); + + return Ok(None); + } + trace!( + "Detected new blob creation in container '{}': '{}'", + &container, + &blob + ); + let blob_client = container_client.blob_client(blob); + let mut result: Vec = vec![]; + let mut stream = blob_client.get().into_stream(); + while let Some(value) = stream.next().await { + match value { + Ok(response) => { + let mut body = response.data; + while let Some(value) = body.next().await { + match value { + Ok(chunk) => result.extend(&chunk), + Err(e) => { + // This should now happen as long as `next()` is working + // correctly. Leaving just a safeguard, not to crash Vector. 
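The start of `proccess_event_grid_message` above decodes the queue message before any blob access: Storage Queue messages arrive base64-encoded (the integration test enqueues them the same way), and the payload is the Event Grid envelope, of which only `subject` and `eventType` are used. A condensed sketch of that chain, with the error type collapsed into a boxed error for brevity:

```rust
use base64::{prelude::BASE64_STANDARD, Engine};

// Sketch: `AzureStorageEvent` is the struct defined in this file; the real code
// maps each failure onto a dedicated `ProcessingError` variant instead of boxing it.
fn decode_envelope(message_text: &str) -> Result<AzureStorageEvent, Box<dyn std::error::Error>> {
    let decoded = BASE64_STANDARD.decode(message_text)?;
    let json = String::from_utf8(decoded)?;
    let event: AzureStorageEvent = serde_json::from_str(&json)?;
    Ok(event)
}
```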
+ trace!("Failed to read body chunk: {}", e); + break; + } + } + } + } + Err(e) => { + if let Some(http_error) = e.as_http_error() { + if http_error.status() == azure_core::StatusCode::NotFound { + emit!(BlobDoesntExist{ + nonexistent_blob_name: blob_client.blob_name(), + }); + remove_message_from_queue(queue_client, message).await; + return Ok(None); + } + } + return Err(ProcessingError::FailedToGetBlob { error: e }); + } + } + } + + let reader = Cursor::new(result); + let buffered = BufReader::new(reader); + let queue_client_copy = queue_client.clone(); + let bytes_received_copy = bytes_received.clone(); + + Ok(Some(BlobPack { + row_stream: Box::pin(stream! { + for line in buffered.lines() { + let line = line.map(|line| line.as_bytes().to_vec()); + let line = match line { + Ok(l) => l, + Err(e) => { + error!("Failed to map line: {}", e); + break; + } + }; + bytes_received_copy.emit(ByteSize(line.len())); + yield line; + } + }), + success_handler: Box::new(|| { + Box::pin(async move { + remove_message_from_queue(&queue_client_copy, message).await; + }) + }), + })) + } + None => { + return Err(ProcessingError::FailedToParseSubject { + subject: body.subject, + }); + } + } +} + +fn parse_subject(subject: String) -> Option<(String, String)> { + let parts: Vec<&str> = subject.split('/').collect(); + if parts.len() < 7 { + warn!("Ignoring event because of wrong subject format"); + return None; + } + let container = parts[4]; + let blob = parts[6..].join("/"); + Some((container.to_string(), blob)) +} + +const fn default_poll_secs() -> u32 { + 15 +} + +// Number of messages to consume from the queue at once. This is the maximum +// value allowed by the Azure API. +const fn num_messages() -> u8 { + 32 +} + +async fn remove_message_from_queue(queue_client: &QueueClient, message: Message) { + _ = queue_client.pop_receipt_client(message).delete().await.inspect_err(move |e| { + emit!(QueueMessageDeleteError { error: &e }) + }) +} + +#[test] +fn test_azure_storage_event() { + let event_value: AzureStorageEvent = serde_json::from_str( + r#"{ + "topic": "/subscriptions/fa5f2180-1451-4461-9b1f-aae7d4b33cf8/resourceGroups/events_poc/providers/Microsoft.Storage/storageAccounts/eventspocaccount", + "subject": "/blobServices/default/containers/content/blobs/foo", + "eventType": "Microsoft.Storage.BlobCreated", + "id": "be3f21f7-201e-000b-7605-a29195062628", + "data": { + "api": "PutBlob", + "clientRequestId": "1fa42c94-6dd3-4172-95c4-fd9cf56b5009", + "requestId": "be3f21f7-201e-000b-7605-a29195000000", + "eTag": "0x8DC701C5D3FFDF6", + "contentType": "application/octet-stream", + "contentLength": 0, + "blobType": "BlockBlob", + "url": "https://eventspocaccount.blob.core.windows.net/content/foo", + "sequencer": "0000000000000000000000000005C5360000000000276a63", + "storageDiagnostics": { + "batchId": "fec5b12c-2006-0034-0005-a25936000000" + } + }, + "dataVersion": "", + "metadataVersion": "1", + "eventTime": "2024-05-09T11:37:10.5637878Z" + }"#, + ).unwrap(); + + assert_eq!( + event_value.subject, + "/blobServices/default/containers/content/blobs/foo".to_string() + ); + assert_eq!( + event_value.event_type, + "Microsoft.Storage.BlobCreated".to_string() + ); +} + +#[test] +fn test_parse_subject_no_dir() { + let subject = "/blobServices/default/containers/content/blobs/foo".to_string(); + let result = parse_subject(subject); + assert_eq!(result, Some(("content".to_string(), "foo".to_string()))); +} + +#[test] +fn test_parse_subject_with_dirs() { + let subject = 
"/blobServices/default/containers/insights-logs-signinlogs/blobs/tenantId=0e35ee7a-425d-45a5-9013-218c1eae8fd4/y=2024/m=06/d=20/h=05/m=00/PT1H.json".to_string(); + let result = parse_subject(subject); + assert_eq!( + result, + Some(( + "insights-logs-signinlogs".to_string(), + "tenantId=0e35ee7a-425d-45a5-9013-218c1eae8fd4/y=2024/m=06/d=20/h=05/m=00/PT1H.json" + .to_string() + )) + ); +} diff --git a/src/sources/azure_blob/test.rs b/src/sources/azure_blob/test.rs new file mode 100644 index 0000000000000..83b0e820ca02d --- /dev/null +++ b/src/sources/azure_blob/test.rs @@ -0,0 +1,95 @@ +use super::*; +use crate::{ + config::LogNamespace, event::EventStatus, serde::default_decoding, shutdown::ShutdownSignal, + test_util::collect_n, SourceSender, +}; +use tokio::{select, sync::oneshot}; + +#[tokio::test] +async fn test_messages_delivered() { + let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + let streamer = super::AzureBlobStreamer::new( + ShutdownSignal::noop(), + tx, + LogNamespace::Vector, + true, + default_decoding(), + ); + let mut streamer = streamer.expect("Failed to create streamer"); + let (success_sender, success_receiver) = oneshot::channel(); + let blob_pack = BlobPack { + row_stream: Box::pin(stream! { + let lines = vec!["foo", "bar"]; + for line in lines { + yield line.as_bytes().to_vec(); + } + }), + success_handler: Box::new(move || { + Box::pin(async move { + success_sender.send(()).unwrap(); + }) + }), + }; + let (events_collector, events_receiver) = oneshot::channel(); + tokio::spawn(async move { + events_collector.send(collect_n(rx, 2).await).unwrap(); + }); + streamer + .process_blob_pack(blob_pack) + .await + .expect("Failed processing blob pack"); + + let events = select! { + value = events_receiver => value.expect("Failed to receive events"), + _ = time::sleep(Duration::from_secs(5)) => panic!("Timeout waiting for events"), + }; + assert_eq!(events[0].as_log().value().to_string(), "\"foo\""); + assert_eq!(events[1].as_log().value().to_string(), "\"bar\""); + select! { + _ = success_receiver => {} + _ = time::sleep(Duration::from_secs(5)) => panic!("Timeout waiting for success handler"), + } +} + +#[tokio::test] +async fn test_messages_rejected() { + let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Rejected); + let streamer = super::AzureBlobStreamer::new( + ShutdownSignal::noop(), + tx, + LogNamespace::Vector, + true, + default_decoding(), + ); + let mut streamer = streamer.expect("Failed to create streamer"); + let (success_sender, mut success_receiver) = oneshot::channel(); + let blob_pack = BlobPack { + row_stream: Box::pin(stream! { + let lines = vec!["foo", "bar"]; + for line in lines { + yield line.as_bytes().to_vec(); + } + }), + success_handler: Box::new(move || { + Box::pin(async move { + success_sender.send(()).unwrap(); + }) + }), + }; + let (events_collector, events_receiver) = oneshot::channel(); + tokio::spawn(async move { + events_collector.send(collect_n(rx, 2).await).unwrap(); + }); + streamer + .process_blob_pack(blob_pack) + .await + .expect("Failed processing blob pack"); + + let events = select! 
diff --git a/src/sources/mod.rs b/src/sources/mod.rs
index 8a86d9d6343d0..822a76c953b09 100644
--- a/src/sources/mod.rs
+++ b/src/sources/mod.rs
@@ -13,6 +13,8 @@ pub mod aws_kinesis_firehose;
 pub mod aws_s3;
 #[cfg(feature = "sources-aws_sqs")]
 pub mod aws_sqs;
+#[cfg(feature = "sources-azure_blob")]
+pub mod azure_blob;
 #[cfg(feature = "sources-datadog_agent")]
 pub mod datadog_agent;
 #[cfg(feature = "sources-demo_logs")]
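For reference (not part of the patch): when a poll returns no messages, `make_azure_row_stream` sleeps for `poll_secs` but still reacts to shutdown immediately. The standalone tokio sketch below shows that select-between-sleep-and-shutdown pattern, with a `watch` channel standing in for Vector's `ShutdownSignal`; all names and the 100 ms delay are illustrative.

use std::time::Duration;
use tokio::{select, sync::watch, time};

// Sketch only: a poll loop that sleeps between empty polls but still reacts
// to shutdown right away, mirroring the select! used by the source above.
async fn poll_loop(mut shutdown: watch::Receiver<bool>, poll_interval: Duration) {
    loop {
        // ... poll the queue here; when it comes back empty, wait for either
        // the next poll tick or a shutdown signal, whichever happens first.
        select! {
            _ = shutdown.changed() => {
                println!("shutdown received, terminating poll loop");
                break;
            }
            _ = time::sleep(poll_interval) => {
                println!("poll interval elapsed, polling again");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let handle = tokio::spawn(poll_loop(shutdown_rx, Duration::from_secs(15)));
    // Let the loop run briefly, then signal shutdown.
    time::sleep(Duration::from_millis(100)).await;
    let _ = shutdown_tx.send(true);
    let _ = handle.await;
}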