Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions sdk/storage/azure_storage_blob/src/client_impl/block_blob_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use std::num::NonZero;

use async_trait::async_trait;
use azure_core::http::Body;
use futures::lock::Mutex;
use uuid::Uuid;

use crate::{
generated::BlockBlobClient,
models::{
BlockBlobClientCommitBlockListOptions, BlockBlobClientManagedUploadOptions,
BlockBlobClientStageBlockOptions, BlockLookupList,
},
partitioned_transfer::{self, PartitionedUploadBehavior},
};

type AzureResult<T> = azure_core::Result<T>;

// unwrap evaluated at compile time
const DEFAULT_PARALLEL: NonZero<usize> = NonZero::new(4).unwrap();
const DEFAULT_PARTITION_SIZE: NonZero<usize> = NonZero::new(4 * 1024 * 1024).unwrap();

// implement this on handwritten client for now
impl crate::BlockBlobClient {
pub async fn managed_upload(
&self,
body: Body,
options: Option<BlockBlobClientManagedUploadOptions<'_>>,
) -> AzureResult<()> {
self.client.managed_upload(body, options).await
}
}

impl BlockBlobClient {
pub async fn managed_upload(
&self,
body: Body,
options: Option<BlockBlobClientManagedUploadOptions<'_>>,
) -> AzureResult<()> {
let options = options.unwrap_or_default();
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
let partition_size = options.partition_size.unwrap_or(DEFAULT_PARTITION_SIZE);
// construct exhaustively to ensure we catch new options when added
let stage_block_options = BlockBlobClientStageBlockOptions {
encryption_algorithm: options.encryption_algorithm,
encryption_key: options.encryption_key.clone(),
encryption_key_sha256: options.encryption_key_sha256.clone(),
encryption_scope: options.encryption_scope.clone(),
lease_id: options.lease_id.clone(),
method_options: options.method_options.clone(),
structured_body_type: None,
structured_content_length: None,
timeout: options.per_request_timeout,
transactional_content_crc64: None,
transactional_content_md5: None,
};
let commit_block_list_options = BlockBlobClientCommitBlockListOptions {
blob_cache_control: options.blob_cache_control,
blob_content_disposition: options.blob_content_disposition,
blob_content_encoding: options.blob_content_encoding,
blob_content_language: options.blob_content_language,
blob_content_md5: options.blob_content_md5,
blob_content_type: options.blob_content_type,
blob_tags_string: options.blob_tags_string,
encryption_algorithm: options.encryption_algorithm,
encryption_key: options.encryption_key,
encryption_key_sha256: options.encryption_key_sha256,
encryption_scope: options.encryption_scope,
if_match: options.if_match,
if_modified_since: options.if_modified_since,
if_none_match: options.if_none_match,
if_tags: options.if_tags,
if_unmodified_since: options.if_unmodified_since,
immutability_policy_expiry: options.immutability_policy_expiry,
immutability_policy_mode: options.immutability_policy_mode,
lease_id: options.lease_id,
legal_hold: options.legal_hold,
metadata: options.metadata,
method_options: options.method_options,
tier: options.tier,
timeout: options.per_request_timeout,
transactional_content_crc64: None,
transactional_content_md5: None,
};
partitioned_transfer::upload(
body,
parallel,
partition_size,
&BlockBlobClientUploadBehavior::new(
self,
stage_block_options,
commit_block_list_options,
),
)
.await
}
}

struct BlockInfo {
offset: u64,
block_id: Uuid,
}

struct BlockBlobClientUploadBehavior<'c, 'opt> {
client: &'c BlockBlobClient,
stage_block_options: BlockBlobClientStageBlockOptions<'opt>,
commit_block_list_options: BlockBlobClientCommitBlockListOptions<'opt>,
blocks: Mutex<Vec<BlockInfo>>,
}

impl<'c, 'opt> BlockBlobClientUploadBehavior<'c, 'opt> {
fn new(
client: &'c BlockBlobClient,
stage_block_options: BlockBlobClientStageBlockOptions<'opt>,
commit_block_list_options: BlockBlobClientCommitBlockListOptions<'opt>,
) -> Self {
Self {
client,
stage_block_options,
commit_block_list_options,
blocks: Mutex::new(vec![]),
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl PartitionedUploadBehavior for BlockBlobClientUploadBehavior<'_, '_> {
async fn transfer_oneshot(&self, content: Body) -> AzureResult<()> {
let content_len = content.len().try_into().unwrap();
self.client
.upload(content.into(), content_len, None)
.await?;
Ok(())
}

async fn transfer_partition(&self, offset: usize, content: Body) -> AzureResult<()> {
let block_id = Uuid::new_v4();
let content_len = content.len().try_into().unwrap();
{
self.blocks.lock().await.push(BlockInfo {
offset: offset as u64,
block_id,
});
}
self.client
.stage_block(
block_id.as_bytes(),
content_len,
content.into(),
Some(self.stage_block_options.clone()),
)
.await?;
Ok(())
}

async fn initialize(&self, _content_len: usize) -> AzureResult<()> {
Ok(())
}

async fn finalize(&self) -> AzureResult<()> {
let mut blocks = self.blocks.lock().await;
blocks.sort_by(|left, right| left.offset.cmp(&right.offset));
let blocklist = BlockLookupList {
latest: Some(
blocks
.iter()
.map(|bi| bi.block_id.as_bytes().to_vec())
.collect(),
),
..Default::default()
};
self.client
.commit_block_list(
blocklist.try_into()?,
Some(self.commit_block_list_options.clone()),
)
.await?;

Ok(())
}
}
1 change: 1 addition & 0 deletions sdk/storage/azure_storage_blob/src/client_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod block_blob_client;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::sync::Arc;

/// A client to interact with a specific Azure storage Block blob, although that blob may not yet exist.
pub struct BlockBlobClient {
pub(super) client: GeneratedBlockBlobClient,
pub(crate) client: GeneratedBlockBlobClient,
}

impl GeneratedBlockBlobClient {
Expand Down
2 changes: 2 additions & 0 deletions sdk/storage/azure_storage_blob/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
#![allow(dead_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

mod client_impl;
pub mod clients;
#[allow(unused_imports)]
mod generated;
mod parsers;
mod partitioned_transfer;
mod pipeline;
mod streams;
pub use client_impl::*;
pub use clients::*;
pub use parsers::*;
pub mod models;
106 changes: 106 additions & 0 deletions sdk/storage/azure_storage_blob/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
pub(crate) mod content_range;
mod extensions;

use azure_core::{fmt::SafeDebug, http::ClientMethodOptions};
use std::{collections::HashMap, num::NonZero};
use time::OffsetDateTime;

pub use crate::generated::models::{
AccessPolicy, AccessTier, AccountKind, AppendBlobClientAppendBlockFromUrlOptions,
AppendBlobClientAppendBlockFromUrlResult, AppendBlobClientAppendBlockFromUrlResultHeaders,
Expand Down Expand Up @@ -81,3 +85,105 @@ pub use crate::generated::models::{
SignedIdentifiers, SignedIdentifiersHeaders, SkuName, StaticWebsite, StorageErrorCode,
StorageServiceStats, StorageServiceStatsHeaders, UserDelegationKey, UserDelegationKeyHeaders,
};

/// Options to be passed to `BlockBlobClient::managed_upload()`
#[derive(Clone, Default, SafeDebug)]
pub struct BlockBlobClientManagedUploadOptions<'a> {
/// Optional. Sets the blob's cache control. If specified, this property is stored with the blob and returned with a read
/// request.
pub blob_cache_control: Option<String>,

/// Optional. Sets the blob's content disposition. If specified, this property is stored with the blob and returned with a
/// read request.
pub blob_content_disposition: Option<String>,

/// Optional. Sets the blob's content encoding. If specified, this property is stored with the blob and returned with a read
/// request.
pub blob_content_encoding: Option<String>,

/// Optional. Set the blob's content language. If specified, this property is stored with the blob and returned with a read
/// request.
pub blob_content_language: Option<String>,

/// Optional. An MD5 hash of the blob content. Note that this hash is not validated, as the hashes for the individual blocks
/// were validated when each was uploaded.
pub blob_content_md5: Option<Vec<u8>>,

/// Optional. Sets the blob's content type. If specified, this property is stored with the blob and returned with a read request.
pub blob_content_type: Option<String>,

/// Optional. Used to set blob tags in various blob operations.
pub blob_tags_string: Option<String>,

/// Optional. Version 2019-07-07 and later. Specifies the algorithm to use for encryption. If not specified, the default is
/// AES256.
pub encryption_algorithm: Option<EncryptionAlgorithmType>,

/// Optional. Version 2019-07-07 and later. Specifies the encryption key to use to encrypt the data provided in the request.
/// If not specified, the request will be encrypted with the root account key.
pub encryption_key: Option<String>,

/// Optional. Version 2019-07-07 and later. Specifies the SHA256 hash of the encryption key used to encrypt the data provided
/// in the request. This header is only used for encryption with a customer-provided key. If the request is authenticated
/// with a client token, this header should be specified using the SHA256 hash of the encryption key.
pub encryption_key_sha256: Option<String>,

/// Optional. Version 2019-07-07 and later. Specifies the encryption scope to use to encrypt the data provided in the request.
/// If not specified, the request will be encrypted with the root account key.
pub encryption_scope: Option<String>,

/// Optional. Specifies the date time when the blobs immutability policy is set to expire.
pub immutability_policy_expiry: Option<OffsetDateTime>,

/// Optional. Specifies the immutability policy mode to set on the blob.
pub immutability_policy_mode: Option<ImmutabilityPolicyMode>,

/// Optional. Applied only to the final commit of the new block blob.
/// A condition that must be met in order for the request to be processed.
pub if_match: Option<String>,

/// Optional. Applied only to the final commit of the new block blob.
/// A date-time value. A request is made under the condition that the resource has been modified since the specified date-time.
pub if_modified_since: Option<OffsetDateTime>,

/// Optional. Applied only to the final commit of the new block blob.
/// A condition that must be met in order for the request to be processed.
pub if_none_match: Option<String>,

/// Optional. Applied only to the final commit of the new block blob.
/// Specify a SQL where clause on blob tags to operate only on blobs with a matching value.
pub if_tags: Option<String>,

/// Optional. Applied only to the final commit of the new block blob.
/// A date-time value. A request is made under the condition that the resource has not been modified since the specified date-time.
pub if_unmodified_since: Option<OffsetDateTime>,

/// Optional. Applied to all requests.
/// If specified, the operation only succeeds if the resource's lease is active and matches this ID.
pub lease_id: Option<String>,

/// Optional. Specified if a legal hold should be set on the blob.
pub legal_hold: Option<bool>,

/// Optional. The metadata headers.
pub metadata: Option<HashMap<String, String>>,

/// Optional. Allows customization of the method call.
pub method_options: ClientMethodOptions<'a>,

/// Optional. Number of concurrent network transfers to maintain for this operation.
/// A default value will be chosen if none is provided.
pub parallel: Option<NonZero<usize>>,

/// Optional. Size to partition data into.
/// A default value will be chosen if none is provided.
pub partition_size: Option<NonZero<usize>>,

/// Optional. The server-side timeout to apply on each individual request. This is not a timeout for the whole operation.
/// The timeout parameter is expressed in seconds. For more information, see
/// [Setting Timeouts for Blob Service Operations.](https://docs.microsoft.com/rest/api/storageservices/fileservices/setting-timeouts-for-blob-service-operations)
pub per_request_timeout: Option<i32>,

/// Optional. The tier to be set on the blob.
pub tier: Option<AccessTier>,
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

mod download;
mod upload;

pub(crate) use upload::*;

use std::{cmp::max, future::Future, num::NonZero, pin::Pin};

use futures::{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use azure_core::http::Body;
use bytes::Bytes;

use async_trait::async_trait;
#[cfg(not(target_arch = "wasm32"))]
use azure_core::stream::SeekableStream;
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -14,7 +15,8 @@ use crate::streams::partitioned_stream::PartitionedStream;

use super::*;

#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub(crate) trait PartitionedUploadBehavior {
async fn transfer_oneshot(&self, content: Body) -> AzureResult<()>;
async fn transfer_partition(&self, offset: usize, content: Body) -> AzureResult<()>;
Expand Down Expand Up @@ -135,7 +137,8 @@ mod tests {
}
}

#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl PartitionedUploadBehavior for MockPartitionedUploadBehavior {
async fn transfer_oneshot(&self, mut content: Body) -> AzureResult<()> {
let body_type = match content {
Expand Down
Loading
Loading