@@ -26,32 +26,37 @@ def __init__(
2626 self .mode = mode
2727 self .max_requests = max_requests or _MAX_SFTP_REQUESTS
2828
29- if block_size is None :
30- # "The OpenSSH SFTP server will close the connection
31- # if it receives a message larger than 256 KB, and
32- # limits read requests to returning no more than
33- # 64 KB."
34- #
35- # We are going to use the maximum block_size possible
36- # with a 16KB margin (so instead of sending 256 KB data,
37- # we'll send 240 KB + headers for write requests)
38-
39- if self .readable ():
40- block_size = READ_BLOCK_SIZE
41- else :
42- block_size = WRITE_BLOCK_SIZE
43-
4429 # The blocksize is often used with constructs like
4530 # shutil.copyfileobj(src, dst, length=file.blocksize) and since we are
4631 # using pipelining, we are going to reflect the total size rather than
4732 # a size of chunk to our limits.
48- self .blocksize = block_size * self .max_requests
33+ self .blocksize = (
34+ None if block_size is None else block_size * self .max_requests
35+ )
4936
5037 self .kwargs = kwargs
5138
5239 self ._file = sync (self .loop , self ._open_file )
5340 self ._closed = False
5441
42+ def _determine_block_size (self , channel ):
43+ # Use the asyncssh block sizes to ensure the best performance.
44+ limits = getattr (channel , "limits" , None )
45+ if limits :
46+ if self .readable ():
47+ return limits .max_read_len
48+ return limits .max_write_len
49+
50+ # "The OpenSSH SFTP server will close the connection
51+ # if it receives a message larger than 256 KB, and
52+ # limits read requests to returning no more than
53+ # 64 KB."
54+ #
55+ # We are going to use the maximum block_size possible
56+ # with a 16KB margin (so instead of sending 256 KB data,
57+ # we'll send 240 KB + headers for write requests)
58+ return READ_BLOCK_SIZE if self .readable () else WRITE_BLOCK_SIZE
59+
5560 @wrap_exceptions
5661 async def _open_file (self ):
5762 # TODO: this needs to keep a reference to the
@@ -61,6 +66,10 @@ async def _open_file(self):
6166 # it's operations but the pool it thinking this
6267 # channel is freed.
6368 async with self .fs ._pool .get () as channel :
69+ if self .blocksize is None :
70+ self .blocksize = (
71+ self ._determine_block_size (channel ) * self .max_requests
72+ )
6473 return await channel .open (
6574 self .path ,
6675 self .mode ,
0 commit comments