diff --git a/sdk/storage/azure_storage_blob/src/client_impl/block_blob_client.rs b/sdk/storage/azure_storage_blob/src/client_impl/block_blob_client.rs new file mode 100644 index 0000000000..25209ab813 --- /dev/null +++ b/sdk/storage/azure_storage_blob/src/client_impl/block_blob_client.rs @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +use std::num::NonZero; + +use async_trait::async_trait; +use azure_core::http::Body; +use futures::lock::Mutex; +use uuid::Uuid; + +use crate::{ + generated::BlockBlobClient, + models::{ + BlockBlobClientCommitBlockListOptions, BlockBlobClientManagedUploadOptions, + BlockBlobClientStageBlockOptions, BlockLookupList, + }, + partitioned_transfer::{self, PartitionedUploadBehavior}, +}; + +type AzureResult = azure_core::Result; + +// unwrap evaluated at compile time +const DEFAULT_PARALLEL: NonZero = NonZero::new(4).unwrap(); +const DEFAULT_PARTITION_SIZE: NonZero = NonZero::new(4 * 1024 * 1024).unwrap(); + +// implement this on handwritten client for now +impl crate::BlockBlobClient { + pub async fn managed_upload( + &self, + body: Body, + options: Option>, + ) -> AzureResult<()> { + self.client.managed_upload(body, options).await + } +} + +impl BlockBlobClient { + pub async fn managed_upload( + &self, + body: Body, + options: Option>, + ) -> AzureResult<()> { + let options = options.unwrap_or_default(); + let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL); + let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE); + // construct exhaustively to ensure we catch new options when added + let stage_block_options = BlockBlobClientStageBlockOptions { + encryption_algorithm: options.encryption_algorithm, + encryption_key: options.encryption_key.clone(), + encryption_key_sha256: options.encryption_key_sha256.clone(), + encryption_scope: options.encryption_scope.clone(), + lease_id: options.lease_id.clone(), + method_options: options.method_options.clone(), + structured_body_type: None, + structured_content_length: None, + timeout: options.per_request_timeout, + transactional_content_crc64: None, + transactional_content_md5: None, + }; + let commit_block_list_options = BlockBlobClientCommitBlockListOptions { + blob_cache_control: options.blob_cache_control, + blob_content_disposition: options.blob_content_disposition, + blob_content_encoding: options.blob_content_encoding, + blob_content_language: options.blob_content_language, + blob_content_md5: options.blob_content_md5, + blob_content_type: options.blob_content_type, + blob_tags_string: options.blob_tags_string, + encryption_algorithm: options.encryption_algorithm, + encryption_key: options.encryption_key, + encryption_key_sha256: options.encryption_key_sha256, + encryption_scope: options.encryption_scope, + if_match: options.if_match, + if_modified_since: options.if_modified_since, + if_none_match: options.if_none_match, + if_tags: options.if_tags, + if_unmodified_since: options.if_unmodified_since, + immutability_policy_expiry: options.immutability_policy_expiry, + immutability_policy_mode: options.immutability_policy_mode, + lease_id: options.lease_id, + legal_hold: options.legal_hold, + metadata: options.metadata, + method_options: options.method_options, + tier: options.tier, + timeout: options.per_request_timeout, + transactional_content_crc64: None, + transactional_content_md5: None, + }; + partitioned_transfer::upload( + body, + parallel, + partition_size, + &BlockBlobClientUploadBehavior::new( + self, + stage_block_options, + commit_block_list_options, + ), + ) + .await + } +} + +struct BlockInfo { + offset: u64, + block_id: Uuid, +} + +struct BlockBlobClientUploadBehavior<'c, 'opt> { + client: &'c BlockBlobClient, + stage_block_options: BlockBlobClientStageBlockOptions<'opt>, + commit_block_list_options: BlockBlobClientCommitBlockListOptions<'opt>, + blocks: Mutex>, +} + +impl<'c, 'opt> BlockBlobClientUploadBehavior<'c, 'opt> { + fn new( + client: &'c BlockBlobClient, + stage_block_options: BlockBlobClientStageBlockOptions<'opt>, + commit_block_list_options: BlockBlobClientCommitBlockListOptions<'opt>, + ) -> Self { + Self { + client, + stage_block_options, + commit_block_list_options, + blocks: Mutex::new(vec![]), + } + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl PartitionedUploadBehavior for BlockBlobClientUploadBehavior<'_, '_> { + async fn transfer_oneshot(&self, content: Body) -> AzureResult<()> { + let content_len = content.len().try_into().unwrap(); + self.client + .upload(content.into(), content_len, None) + .await?; + Ok(()) + } + + async fn transfer_partition(&self, offset: usize, content: Body) -> AzureResult<()> { + let block_id = Uuid::new_v4(); + let content_len = content.len().try_into().unwrap(); + { + self.blocks.lock().await.push(BlockInfo { + offset: offset as u64, + block_id, + }); + } + self.client + .stage_block( + block_id.as_bytes(), + content_len, + content.into(), + Some(self.stage_block_options.clone()), + ) + .await?; + Ok(()) + } + + async fn initialize(&self, _content_len: usize) -> AzureResult<()> { + Ok(()) + } + + async fn finalize(&self) -> AzureResult<()> { + let mut blocks = self.blocks.lock().await; + blocks.sort_by(|left, right| left.offset.cmp(&right.offset)); + let blocklist = BlockLookupList { + latest: Some( + blocks + .iter() + .map(|bi| bi.block_id.as_bytes().to_vec()) + .collect(), + ), + ..Default::default() + }; + self.client + .commit_block_list( + blocklist.try_into()?, + Some(self.commit_block_list_options.clone()), + ) + .await?; + + Ok(()) + } +} diff --git a/sdk/storage/azure_storage_blob/src/client_impl/mod.rs b/sdk/storage/azure_storage_blob/src/client_impl/mod.rs new file mode 100644 index 0000000000..ccb11553e9 --- /dev/null +++ b/sdk/storage/azure_storage_blob/src/client_impl/mod.rs @@ -0,0 +1 @@ +pub mod block_blob_client; diff --git a/sdk/storage/azure_storage_blob/src/clients/block_blob_client.rs b/sdk/storage/azure_storage_blob/src/clients/block_blob_client.rs index bd1973e8ba..7a4c3e6c39 100644 --- a/sdk/storage/azure_storage_blob/src/clients/block_blob_client.rs +++ b/sdk/storage/azure_storage_blob/src/clients/block_blob_client.rs @@ -27,7 +27,7 @@ use std::sync::Arc; /// A client to interact with a specific Azure storage Block blob, although that blob may not yet exist. pub struct BlockBlobClient { - pub(super) client: GeneratedBlockBlobClient, + pub(crate) client: GeneratedBlockBlobClient, } impl GeneratedBlockBlobClient { diff --git a/sdk/storage/azure_storage_blob/src/lib.rs b/sdk/storage/azure_storage_blob/src/lib.rs index abf4db548b..cc53b34e9c 100644 --- a/sdk/storage/azure_storage_blob/src/lib.rs +++ b/sdk/storage/azure_storage_blob/src/lib.rs @@ -7,6 +7,7 @@ #![allow(dead_code)] #![cfg_attr(docsrs, feature(doc_cfg))] +mod client_impl; pub mod clients; #[allow(unused_imports)] mod generated; @@ -14,6 +15,7 @@ mod parsers; mod partitioned_transfer; mod pipeline; mod streams; +pub use client_impl::*; pub use clients::*; pub use parsers::*; pub mod models; diff --git a/sdk/storage/azure_storage_blob/src/models/mod.rs b/sdk/storage/azure_storage_blob/src/models/mod.rs index f1ce53e7cb..7b6114182c 100644 --- a/sdk/storage/azure_storage_blob/src/models/mod.rs +++ b/sdk/storage/azure_storage_blob/src/models/mod.rs @@ -4,6 +4,10 @@ pub(crate) mod content_range; mod extensions; +use azure_core::{fmt::SafeDebug, http::ClientMethodOptions}; +use std::{collections::HashMap, num::NonZero}; +use time::OffsetDateTime; + pub use crate::generated::models::{ AccessPolicy, AccessTier, AccountKind, AppendBlobClientAppendBlockFromUrlOptions, AppendBlobClientAppendBlockFromUrlResult, AppendBlobClientAppendBlockFromUrlResultHeaders, @@ -81,3 +85,105 @@ pub use crate::generated::models::{ SignedIdentifiers, SignedIdentifiersHeaders, SkuName, StaticWebsite, StorageErrorCode, StorageServiceStats, StorageServiceStatsHeaders, UserDelegationKey, UserDelegationKeyHeaders, }; + +/// Options to be passed to `BlockBlobClient::managed_upload()` +#[derive(Clone, Default, SafeDebug)] +pub struct BlockBlobClientManagedUploadOptions<'a> { + /// Optional. Sets the blob's cache control. If specified, this property is stored with the blob and returned with a read + /// request. + pub blob_cache_control: Option, + + /// Optional. Sets the blob's content disposition. If specified, this property is stored with the blob and returned with a + /// read request. + pub blob_content_disposition: Option, + + /// Optional. Sets the blob's content encoding. If specified, this property is stored with the blob and returned with a read + /// request. + pub blob_content_encoding: Option, + + /// Optional. Set the blob's content language. If specified, this property is stored with the blob and returned with a read + /// request. + pub blob_content_language: Option, + + /// Optional. An MD5 hash of the blob content. Note that this hash is not validated, as the hashes for the individual blocks + /// were validated when each was uploaded. + pub blob_content_md5: Option>, + + /// Optional. Sets the blob's content type. If specified, this property is stored with the blob and returned with a read request. + pub blob_content_type: Option, + + /// Optional. Used to set blob tags in various blob operations. + pub blob_tags_string: Option, + + /// Optional. Version 2019-07-07 and later. Specifies the algorithm to use for encryption. If not specified, the default is + /// AES256. + pub encryption_algorithm: Option, + + /// Optional. Version 2019-07-07 and later. Specifies the encryption key to use to encrypt the data provided in the request. + /// If not specified, the request will be encrypted with the root account key. + pub encryption_key: Option, + + /// Optional. Version 2019-07-07 and later. Specifies the SHA256 hash of the encryption key used to encrypt the data provided + /// in the request. This header is only used for encryption with a customer-provided key. If the request is authenticated + /// with a client token, this header should be specified using the SHA256 hash of the encryption key. + pub encryption_key_sha256: Option, + + /// Optional. Version 2019-07-07 and later. Specifies the encryption scope to use to encrypt the data provided in the request. + /// If not specified, the request will be encrypted with the root account key. + pub encryption_scope: Option, + + /// Optional. Specifies the date time when the blobs immutability policy is set to expire. + pub immutability_policy_expiry: Option, + + /// Optional. Specifies the immutability policy mode to set on the blob. + pub immutability_policy_mode: Option, + + /// Optional. Applied only to the final commit of the new block blob. + /// A condition that must be met in order for the request to be processed. + pub if_match: Option, + + /// Optional. Applied only to the final commit of the new block blob. + /// A date-time value. A request is made under the condition that the resource has been modified since the specified date-time. + pub if_modified_since: Option, + + /// Optional. Applied only to the final commit of the new block blob. + /// A condition that must be met in order for the request to be processed. + pub if_none_match: Option, + + /// Optional. Applied only to the final commit of the new block blob. + /// Specify a SQL where clause on blob tags to operate only on blobs with a matching value. + pub if_tags: Option, + + /// Optional. Applied only to the final commit of the new block blob. + /// A date-time value. A request is made under the condition that the resource has not been modified since the specified date-time. + pub if_unmodified_since: Option, + + /// Optional. Applied to all requests. + /// If specified, the operation only succeeds if the resource's lease is active and matches this ID. + pub lease_id: Option, + + /// Optional. Specified if a legal hold should be set on the blob. + pub legal_hold: Option, + + /// Optional. The metadata headers. + pub metadata: Option>, + + /// Optional. Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// Optional. Number of concurrent network transfers to maintain for this operation. + /// A default value will be chosen if none is provided. + pub parallel: Option>, + + /// Optional. Size to partition data into. + /// A default value will be chosen if none is provided. + pub partition_size: Option>, + + /// Optional. The server-side timeout to apply on each individual request. This is not a timeout for the whole operation. + /// The timeout parameter is expressed in seconds. For more information, see + /// [Setting Timeouts for Blob Service Operations.](https://docs.microsoft.com/rest/api/storageservices/fileservices/setting-timeouts-for-blob-service-operations) + pub per_request_timeout: Option, + + /// Optional. The tier to be set on the blob. + pub tier: Option, +} diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs index 858fc0324a..064a570de6 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs @@ -1,6 +1,11 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + mod download; mod upload; +pub(crate) use upload::*; + use std::{cmp::max, future::Future, num::NonZero, pin::Pin}; use futures::{ diff --git a/sdk/storage/azure_storage_blob/src/partitioned_transfer/upload.rs b/sdk/storage/azure_storage_blob/src/partitioned_transfer/upload.rs index 4ea324b420..313678948b 100644 --- a/sdk/storage/azure_storage_blob/src/partitioned_transfer/upload.rs +++ b/sdk/storage/azure_storage_blob/src/partitioned_transfer/upload.rs @@ -4,6 +4,7 @@ use azure_core::http::Body; use bytes::Bytes; +use async_trait::async_trait; #[cfg(not(target_arch = "wasm32"))] use azure_core::stream::SeekableStream; #[cfg(not(target_arch = "wasm32"))] @@ -14,7 +15,8 @@ use crate::streams::partitioned_stream::PartitionedStream; use super::*; -#[async_trait::async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub(crate) trait PartitionedUploadBehavior { async fn transfer_oneshot(&self, content: Body) -> AzureResult<()>; async fn transfer_partition(&self, offset: usize, content: Body) -> AzureResult<()>; @@ -135,7 +137,8 @@ mod tests { } } - #[async_trait::async_trait] + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl PartitionedUploadBehavior for MockPartitionedUploadBehavior { async fn transfer_oneshot(&self, mut content: Body) -> AzureResult<()> { let body_type = match content { diff --git a/sdk/storage/azure_storage_blob/tests/block_blob_client.rs b/sdk/storage/azure_storage_blob/tests/block_blob_client.rs index 836db1c1dc..cd07c9031c 100644 --- a/sdk/storage/azure_storage_blob/tests/block_blob_client.rs +++ b/sdk/storage/azure_storage_blob/tests/block_blob_client.rs @@ -6,14 +6,24 @@ use azure_core::{ Bytes, }; use azure_core_test::{recorded, TestContext}; -use azure_storage_blob::models::{ - BlobClientDownloadResultHeaders, BlockBlobClientUploadBlobFromUrlOptions, BlockListType, - BlockLookupList, +use azure_storage_blob::{ + models::{ + BlobClientDownloadResultHeaders, BlockBlobClientManagedUploadOptions, + BlockBlobClientUploadBlobFromUrlOptions, BlockListType, BlockLookupList, + }, + BlobContainerClientOptions, }; use azure_storage_blob_test::{ - create_test_blob, get_blob_name, get_container_client, StorageAccount, + create_test_blob, get_blob_name, get_container_client, StorageAccount, TestPolicy, +}; +use std::{ + error::Error, + num::NonZero, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; -use std::error::Error; #[recorded::test] async fn test_block_list(ctx: TestContext) -> Result<(), Box> { @@ -202,3 +212,81 @@ async fn test_upload_blob_from_url(ctx: TestContext) -> Result<(), Box Result<(), Box> { + let stage_count = Arc::new(AtomicUsize::new(0)); + // Pipeline policy to count stage block requests going through the pipeline through stage_count + let policy_count_ref = stage_count.clone(); + let count_policy = Arc::new(TestPolicy::new( + Some(Arc::new(move |request| { + if let Some(url_query) = request.url().query() { + if url_query.contains("comp=block") && !url_query.contains("blocklist") { + policy_count_ref.fetch_add(1, Ordering::Relaxed); + } + } + Ok(()) + })), + None, + )); + let mut client_options = BlobContainerClientOptions::default(); + client_options + .client_options + .per_call_policies + .push(count_policy.clone()); + + let recording = ctx.recording(); + let container_client = get_container_client( + recording, + true, + StorageAccount::Standard, + Some(client_options), + ) + .await?; + let blob_client = container_client.blob_client(&get_blob_name(recording)); + let block_blob_client = blob_client.block_blob_client(); + + let data: [u8; 1024] = recording.random(); + + for (parallel, partition_size, expected_stage_block_calls) in [ + (1, 2048, 0), // put blob expected + (2, 1024, 0), // put blob expected + (2, 512, 2), + (1, 256, 4), + (8, 31, 34), + ] { + stage_count.store(0, Ordering::Relaxed); + let options = BlockBlobClientManagedUploadOptions { + parallel: Some(NonZero::new(parallel).unwrap()), + partition_size: Some(NonZero::new(partition_size).unwrap()), + ..Default::default() + }; + { + let _scope = count_policy.check_request_scope(); + block_blob_client + .managed_upload(data.to_vec().into(), Some(options)) + .await?; + } + assert_eq!( + blob_client + .download(None) + .await? + .into_body() + .collect() + .await?[..], + data, + "Failed parallel={},partition_size={}", + parallel, + partition_size + ); + assert_eq!( + stage_count.load(Ordering::Relaxed), + expected_stage_block_calls, + "Failed parallel={},partition_size={}", + parallel, + partition_size + ); + } + + Ok(()) +} diff --git a/sdk/storage/azure_storage_blob_test/src/lib.rs b/sdk/storage/azure_storage_blob_test/src/lib.rs index 62d324bf9f..42e1fe9859 100644 --- a/sdk/storage/azure_storage_blob_test/src/lib.rs +++ b/sdk/storage/azure_storage_blob_test/src/lib.rs @@ -1,10 +1,21 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -use std::slice; +use std::{ + slice, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use async_trait::async_trait; use azure_core::{ - http::{Body, ClientOptions, NoFormat, RequestContent, Response}, + http::{ + policies::{Policy, PolicyResult}, + AsyncRawResponse, Body, ClientOptions, Context, NoFormat, Request, RequestContent, + Response, + }, Bytes, Result, }; use azure_core_test::Recording; @@ -212,3 +223,80 @@ impl BodyTestExt for Body { } } } + +pub struct AssertionScope { + counter: Arc, +} + +impl Drop for AssertionScope { + fn drop(&mut self) { + self.counter.fetch_sub(1, Ordering::Relaxed); + } +} + +type Check = Arc Result<()> + Send + Sync>; +pub struct TestPolicy { + request_scope_counter: Arc, + response_scope_counter: Arc, + on_request: Check, + on_response: Check, +} + +impl TestPolicy { + pub fn new( + on_request: Option>, + on_response: Option>, + ) -> Self { + TestPolicy { + request_scope_counter: Arc::new(AtomicUsize::new(0)), + response_scope_counter: Arc::new(AtomicUsize::new(0)), + on_request: on_request.unwrap_or(Arc::new(|_| Ok(()))), + on_response: on_response.unwrap_or(Arc::new(|_| Ok(()))), + } + } + + /// DO NOT assign this to `_`. It will be dropped immediately instead of the intended scope. + pub fn check_request_scope(&self) -> AssertionScope { + self.request_scope_counter.fetch_add(1, Ordering::Relaxed); + AssertionScope { + counter: self.request_scope_counter.clone(), + } + } + + /// DO NOT assign this to `_`. It will be dropped immediately instead of the intended scope. + pub fn check_response_scope(&self) -> AssertionScope { + self.response_scope_counter.fetch_add(1, Ordering::Relaxed); + AssertionScope { + counter: self.response_scope_counter.clone(), + } + } +} + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl Policy for TestPolicy { + async fn send( + &self, + ctx: &Context, + request: &mut Request, + next: &[Arc], + ) -> PolicyResult { + if self.request_scope_counter.load(Ordering::Relaxed) > 0 { + (self.on_request)(request)?; + } + let response = next[0].send(ctx, request, &next[1..]).await?; + if self.response_scope_counter.load(Ordering::Relaxed) > 0 { + (self.on_response)(&response)?; + } + Ok(response) + } +} + +impl std::fmt::Debug for TestPolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AssertionPolicy") + .field("check_request_counter", &self.request_scope_counter) + .field("check_response_counter", &self.response_scope_counter) + .finish() + } +}