Not All Bytes Were Read From the S3ObjectInputStream, Aborting HTTP Connection
Streaming large objects from S3 with ranged GET requests
In my last post, I talked about how to take a Java InputStream for a tar.gz file, and turn it into an iterator of (ArchiveEntry, InputStream). If we want to use that code, we need to get an InputStream for our tar.gz file – which in our case, is stored in S3. Some of our archives are very big (the biggest is half a terabyte), and getting a reliable InputStream for an S3 object turns out to be non-trivial.
In this post, I'm going to walk through some code we've written to reliably stream large objects from S3. This is similar to something I wrote in February about reading large objects in Python, but you don't need to read that post before this one.
To get an InputStream for an object, we can use the GetObject API in the S3 SDK:
import java.io.InputStream
import com.amazonaws.services.s3.AmazonS3

val s3Client: AmazonS3

val is: InputStream =
  s3Client
    .getObject("bukkit", "myarchive.tar.gz")
    .getObjectContent
As you read bytes from this stream, it holds open the same HTTP connection to S3. The bigger the object, the longer you have to maintain that connection, and the greater the chance that it times out or drops unexpectedly. If the stream drops, you can get a slightly cryptic error from the S3 SDK:
com.amazonaws.SdkClientException: Data read has a different length than the expected: dataLength=162480427095; expectedLength=528801304583; includeSkipped=true; in.getClass()=class com.amazonaws.services.s3.AmazonS3Client$2; markedSupported=false; marked=0; resetSinceLastMarked=false; markCount=0; resetCount=0
In this example, the SDK was expecting to read 528 GB from the stream. Because the connection dropped midway through, it got an EOF and only read 162.5 GB – hence this error. It took us a while to work out why it was happening!
I haven't tried it myself, but I think the TransferManager in the AWS SDK helps with this – you can download big objects to a file, and it manages the threads and connections to keep the download going. If you have the disk space to download your objects, that might be worth a look. Unfortunately TransferManager doesn't support downloading to streams (yet), and we don't have much local disk space, so we had to find a way to do it manually.
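For reference, here's a rough sketch of what the TransferManager approach might look like – we haven't run this ourselves, so treat it as an illustration, and the bucket, key, and file path are made-up placeholders:

import java.io.File
import com.amazonaws.services.s3.transfer.TransferManagerBuilder

val transferManager = TransferManagerBuilder.standard()
  .withS3Client(s3Client)
  .build()

// Downloads the object to disk, splitting it into parts and retrying
// failed parts behind the scenes.
val download = transferManager.download(
  "bukkit", "myarchive.tar.gz", new File("/tmp/myarchive.tar.gz"))
download.waitForCompletion()

// Pass `false` so the shared S3 client isn't shut down with the manager.
transferManager.shutdownNow(false)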
When you want to upload a big file to S3, you can do a multipart upload. You break the file into smaller pieces, upload each piece individually, then they get stitched back together into a single object. What if you run that process in reverse?
Break the object into smaller pieces, download each piece individually, then stitch them back together into a single stream. We'd only be holding open a connection for as long as it takes to download a chunk, so it's much less likely to time out or drop. Let's walk through the code:
How big is a single object?
To know how many chunks we'll need (and when we're finished reading the object), we need to know how big the object is. One way to do this is to use the getObjectMetadata method (aka the HeadObject API):
def getSize(bucketName: String, key: String): Long =
  s3Client
    .getObjectMetadata(bucketName, key)
    .getContentLength
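As an aside, once you know the size you can work out roughly how many ranged GETs you'll need. This is just our illustration of the arithmetic, with a made-up piece size:

val totalSize = getSize("bukkit", "myarchive.tar.gz")
val pieceSize = 128L * 1024 * 1024                            // e.g. 128 MB per chunk
val numberOfChunks = (totalSize + pieceSize - 1) / pieceSize  // ceiling division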
How do we get a single chunk of an object?
Let's suppose we want to read the first 1000 bytes of an object – we can use a ranged GET request to get just that part of the file:
import com.amazonaws.services.s3.model.GetObjectRequest

val getRequest = new GetObjectRequest(bucketName, key)
  .withRange(0, 999)

val is: InputStream =
  s3Client
    .getObject(getRequest)
    .getObjectContent
Note that the Range header is an inclusive boundary – in this case, it reads everything up to and including the 999th byte.
If the object only has 500 bytes, that's all we'll get – it quietly truncates the range to the bytes available.
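For example, a second ranged GET picks up where the first one left off – this is just an illustration of the pattern, using the same bucketName and key placeholders as above:

val nextRequest = new GetObjectRequest(bucketName, key)
  .withRange(1000, 1999)   // bytes 1000..1999 inclusive

val nextPiece: InputStream = s3Client.getObject(nextRequest).getObjectContent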
How do we stitch the pieces back together?
Now we know how big the object is, and how to read an individual piece. Reading any one piece gives us an InputStream, so we have a series of InputStreams – but ideally we'd present a single InputStream back to the caller. How can we do that?
Java has a SequenceInputStream type that's just what we need – we give it an Enumeration of InputStream instances, and it reads bytes from each one in turn. If we create the streams as they're needed by the Enumeration, this will join them together for us.
We can create the Enumeration like so:
import java.util
import com.amazonaws.services.s3.model.S3ObjectInputStream

val pieceSize: Long
val totalSize: Long

val enumeration = new util.Enumeration[S3ObjectInputStream] {
  var currentPosition = 0L

  override def hasMoreElements: Boolean =
    currentPosition < totalSize

  override def nextElement(): S3ObjectInputStream = {
    // The Range request is inclusive of the `start` and `end` parameters,
    // so to read `pieceSize` bytes we need to go to `pieceSize - 1`.
    val getRequest = new GetObjectRequest(bucketName, key)
      .withRange(currentPosition, currentPosition + pieceSize - 1)
    currentPosition += pieceSize

    s3Client.getObject(getRequest).getObjectContent
  }
}
On each step of the enumeration, we read the next chunk from S3, and track how far we've read with currentPosition. On the last step, we might ask for more bytes than are available (if the remaining bytes are less than the buffer size), but that seems to work okay. The S3 SDK returns all the remaining bytes, but no more.
And then we put that enumeration into a SequenceInputStream:
import java.io.SequenceInputStream

val combinedStream: InputStream = new SequenceInputStream(enumeration)
When the SequenceInputStream reaches the end of one of the individual streams, it closes that stream and calls nextElement() to create the next one.
This can be more expensive than doing a single GetObject call – Amazon charges for each use of the GetObject API, and while the individual cost is small, the cost of multiple calls could add up if you're making lots of them. You can play with the chunk size to get a mixture of reliability and cost. Smaller chunks are more reliable (because each connection is open for a shorter time), but cost more in aggregate.
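To put rough numbers on it: reading a 512 GB object in 128 MB chunks means 512 × 1024 ÷ 128 = 4,096 GetObject calls, where a single download would have made just one.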
In our testing, we found that this alone still wasn't wholly reliable. The SequenceInputStream won't close the stream for a single piece until it's read all the bytes for that piece and started reading the next one – and it holds the underlying connection open. If it takes a long time to process a single piece, that connection can still drop. We could turn down the buffer size to make it more reliable, but that gets expensive.
What we're trying instead is reading the entire contents of a single piece into memory as soon as we do the GetObject call, then closing the connection. We're trading memory for increased reliability. Here's what that buffering looks like:
import java.io.ByteArrayInputStream
import org.apache.commons.io.IOUtils

val underlying: util.Enumeration[InputStream]

val bufferedEnumeration = new util.Enumeration[ByteArrayInputStream] {
  override def hasMoreElements: Boolean =
    underlying.hasMoreElements

  override def nextElement(): ByteArrayInputStream = {
    // Read the whole piece into memory, then close the S3 stream immediately.
    val nextStream = underlying.nextElement()
    val byteArray = IOUtils.toByteArray(nextStream)
    nextStream.close()
    new ByteArrayInputStream(byteArray)
  }
}
We can drop this enumeration into another SequenceInputStream, and get a single InputStream again – but this time the S3ObjectInputStream is read and closed almost immediately.
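In other words, something like this sketch, using the names from the snippets above:

val reliableStream: InputStream = new SequenceInputStream(bufferedEnumeration)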
Putting it all together
If we take all those pieces, we can combine them into a single Scala class like so:
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.util

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{GetObjectRequest, S3ObjectInputStream}
import org.apache.commons.io.IOUtils

import scala.util.Try

/** Read objects from S3, buffering up to `bufferSize` bytes of an object
  * in-memory at a time, minimising the time needed to hold open
  * an HTTP connection to S3.
  *
  * This is useful for reading very large objects to an InputStream,
  * especially where a single GetObject call would time out before
  * reading the entire object.
  *
  */
class S3StreamReader(s3Client: AmazonS3, bufferSize: Long) {

  def get(bucketName: String, key: String): Try[InputStream] = Try {
    val totalSize = getSize(bucketName, key)
    val s3Enumeration = getEnumeration(bucketName, key, totalSize)
    val bufferedEnumeration = getBufferedEnumeration(s3Enumeration)
    new SequenceInputStream(bufferedEnumeration)
  }

  private def getSize(bucketName: String, key: String): Long =
    s3Client.getObjectMetadata(bucketName, key).getContentLength

  private def getEnumeration(
    bucketName: String,
    key: String,
    totalSize: Long
  ): util.Enumeration[S3ObjectInputStream] =
    new util.Enumeration[S3ObjectInputStream] {
      var currentPosition = 0L

      override def hasMoreElements: Boolean =
        currentPosition < totalSize

      override def nextElement(): S3ObjectInputStream = {
        // The Range request is inclusive of the `start` and `end` parameters,
        // so to read `bufferSize` bytes we need to go to `bufferSize - 1`.
        val getRequest = new GetObjectRequest(bucketName, key)
          .withRange(currentPosition, currentPosition + bufferSize - 1)
        currentPosition += bufferSize

        s3Client.getObject(getRequest).getObjectContent
      }
    }

  private def getBufferedEnumeration[IS <: InputStream](
    underlying: util.Enumeration[IS]
  ): util.Enumeration[ByteArrayInputStream] =
    new util.Enumeration[ByteArrayInputStream] {
      override def hasMoreElements: Boolean =
        underlying.hasMoreElements

      override def nextElement(): ByteArrayInputStream = {
        val nextStream = underlying.nextElement()
        val byteArray = IOUtils.toByteArray(nextStream)
        nextStream.close()
        new ByteArrayInputStream(byteArray)
      }
    }
}
That's a standalone snippet you can drop into your project if it's useful (remembering to include the platform's MIT licence).
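Here's a rough sketch of how you might call it – the surrounding names are ours, not part of the snippet:

import scala.util.{Failure, Success}

val reader = new S3StreamReader(s3Client, bufferSize = 128L * 1024 * 1024)

reader.get("bukkit", "myarchive.tar.gz") match {
  case Success(inputStream) =>
    // hand the stream to whatever processes the archive, then close it
    inputStream.close()
  case Failure(err) =>
    println(s"Couldn't open the stream: $err")
}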
We've been running this code in our pipeline for several weeks, and in that time it's read tens of thousands of objects, ranging from a few kilobytes to half a terabyte. Our apps have a 128MB buffer, with up to ten threads at once and 2GB of memory. It's fixed a persistent source of flakiness when reading from S3, and the cost of the extra GetObject calls has been negligible.
This could behave unexpectedly if an object changes under your feet – the data from one piece would be inconsistent with another piece. Our objects should never change once they're written, so that's not a problem for us.
All of the heavy lifting is done by Java classes, so if your project uses Java rather than Scala, you should be able to port this for your needs. And the general idea – using a ranged GET request to fetch an object a piece at a time, then stitching the pieces together – is language-agnostic, so you can use this technique even if you're not using a JVM language.
Source: https://alexwlchan.net/2019/09/streaming-large-s3-objects/