320 changes: 201 additions & 119 deletions tar/asm/disassemble.go
@@ -1,23 +1,168 @@
package asm

import (
"errors"
"io"

"github.com/vbatts/tar-split/archive/tar"
"github.com/vbatts/tar-split/tar/storage"
)

// NewInputTarStream wraps the Reader stream of a tar archive and provides a
// Reader stream of the same.
// runInputTarStreamGoroutine is the goroutine entrypoint.
//
// In the middle it will pack the segments and file metadata to storage.Packer
// `p`.
// It centralizes the goroutine protocol so the core parsing logic can be
// written as ordinary Go code that just "returns an error".
//
// The the storage.FilePutter is where payload of files in the stream are
// stashed. If this stashing is not needed, you can provide a nil
// storage.FilePutter. Since the checksumming is still needed, then a default
// of NewDiscardFilePutter will be used internally
func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) {
// Protocol guarantees:
// - pW is always closed exactly once (CloseWithError(nil) == Close()).
// - if done != nil, exactly one value is sent (nil on success, non-nil on failure).
// - panics are converted into a non-nil error (and the panic is rethrown).
func runInputTarStreamGoroutine(outputRdr io.Reader, pW *io.PipeWriter, p storage.Packer, fp storage.FilePutter, done chan<- error) {
// Default to a non-nil error so a panic can't accidentally look like success.
err := errors.New("panic in runInputTarStream")
defer func() {
// CloseWithError(nil) is equivalent to Close().
pW.CloseWithError(err)

if done != nil {
done <- err
}

// Preserve panic semantics while still ensuring the protocol above runs.
if r := recover(); r != nil {
panic(r)
}
}()

err = runInputTarStream(outputRdr, p, fp)
}
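
// A minimal standalone sketch of the same defer/recover protocol (hypothetical
// names runWork/work; not part of this diff): the deferred function always
// closes the pipe writer exactly once and sends exactly one value on done,
// while a panic leaves err at its pessimistic non-nil default before being
// rethrown.
func runWork(pw *io.PipeWriter, done chan<- error, work func() error) {
	err := errors.New("panic in work")
	defer func() {
		pw.CloseWithError(err) // CloseWithError(nil) behaves like Close()
		if done != nil {
			done <- err // done must be buffered so this send never blocks
		}
		if r := recover(); r != nil {
			panic(r) // rethrow only after the protocol obligations are met
		}
	}()
	err = work() // a normal return overwrites the default error
}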

// runInputTarStream drives tar-split parsing.
//
// It reads a tar stream from outputRdr and records tar-split metadata into the
// provided storage.Packer.
//
// Abort behavior: if the consumer closes the read end early, the tee reader will
// stop producing bytes (due to pipe write failure) and tar parsing will return
// an error. We propagate that error so the goroutine terminates promptly rather
// than draining the input stream for no benefit.
func runInputTarStream(outputRdr io.Reader, p storage.Packer, fp storage.FilePutter) error {
tr := tar.NewReader(outputRdr)
tr.RawAccounting = true

for {
hdr, err := tr.Next()
if err != nil {
if err != io.EOF {
return err
}
// Even when EOF is reached, there are often 1024 null bytes at the end
// of an archive. Collect them too.
if b := tr.RawBytes(); len(b) > 0 {
if _, err := p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: b,
}); err != nil {
return err
}
}
break // Not return: we still need to drain any additional padding.
}
if hdr == nil {
break // Not return: we still need to drain any additional padding.
}

if b := tr.RawBytes(); len(b) > 0 {
if _, err := p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: b,
}); err != nil {
return err
}
}

var csum []byte
if hdr.Size > 0 {
_, csum, err = fp.Put(hdr.Name, tr)
if err != nil {
return err
}
}

entry := storage.Entry{
Type: storage.FileType,
Size: hdr.Size,
Payload: csum,
}
// For proper marshalling of non-UTF-8 characters.
entry.SetName(hdr.Name)

// File entries are added regardless of size.
if _, err := p.AddEntry(entry); err != nil {
return err
}

if b := tr.RawBytes(); len(b) > 0 {
if _, err := p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: b,
}); err != nil {
return err
}
}
}

// It is allowable, and not uncommon, that there is further padding at
// the end of an archive beyond the expected 1024 null bytes. We drain
// it in chunks rather than in one go to avoid cases where a maliciously
// crafted tar file tries to trick us into reading many GBs into memory.
const paddingChunkSize = 1024 * 1024
var paddingChunk [paddingChunkSize]byte
for {
n, err := outputRdr.Read(paddingChunk[:])
if n != 0 {
if _, aerr := p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: paddingChunk[:n],
}); aerr != nil {
return aerr
}
}
if err != nil {
if err == io.EOF {
break
}
return err
}
}

return nil
}
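
// A standalone demonstration of the abort path described above (hypothetical
// name demoAbort; assumes "bytes" and "fmt" are imported as well): closing
// the pipe's read end makes the TeeReader's next write fail, and that error
// surfaces as a read error to whatever is parsing the teed stream, so the
// goroutine exits instead of blocking.
func demoAbort() {
	pr, pw := io.Pipe()
	tee := io.TeeReader(bytes.NewReader(make([]byte, 4096)), pw)

	pr.Close() // the consumer goes away early

	buf := make([]byte, 512)
	_, err := tee.Read(buf) // the tee's write into pw now fails
	fmt.Println(err)        // io.ErrClosedPipe: "io: read/write on closed pipe"
}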

// newInputTarStreamCommon sets up the shared plumbing for NewInputTarStream and
// NewInputTarStreamWithDone.
//
// It constructs an io.Pipe and an io.TeeReader such that:
//
// - The caller reads tar bytes from the returned *io.PipeReader.
// - The background goroutine simultaneously reads the same stream from the
// TeeReader to perform tar-split parsing and metadata packing.
//
// Abort and synchronization semantics:
//
// - Closing the returned PipeReader causes the TeeReader to fail its write to
// the pipe, which in turn causes the background goroutine to exit promptly.
// - If withDone is true, a done channel is returned that receives exactly one
// error value (nil on success) once the background goroutine has fully
// terminated. This allows callers to safely wait until the input reader `r`
// is no longer in use.
func newInputTarStreamCommon(
r io.Reader,
p storage.Packer,
fp storage.FilePutter,
withDone bool,
) (pr *io.PipeReader, done <-chan error) {
// What to do here... folks will want their own access to the Reader that is
// their tar archive stream, but we'll need that same stream to use our
// forked 'archive/tar'.
@@ -34,123 +179,60 @@ func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io
// only read what the outputRdr Read's. Since Tar archives have padding on
// the end, we want to be the one reading the padding, even if the user's
// `archive/tar` doesn't care.
pR, pW := io.Pipe()
outputRdr := io.TeeReader(r, pW)
pr, pw := io.Pipe()

// we need a putter that will generate the crc64 sums of file payloads
// We need a putter that will generate the crc64 sums of file payloads.
if fp == nil {
fp = storage.NewDiscardFilePutter()
}

go func() {
tr := tar.NewReader(outputRdr)
tr.RawAccounting = true
for {
hdr, err := tr.Next()
if err != nil {
if err != io.EOF {
pW.CloseWithError(err)
return
}
// even when an EOF is reached, there is often 1024 null bytes on
// the end of an archive. Collect them too.
if b := tr.RawBytes(); len(b) > 0 {
_, err := p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: b,
})
if err != nil {
pW.CloseWithError(err)
return
}
}
break // not return. We need the end of the reader.
}
if hdr == nil {
break // not return. We need the end of the reader.
}

if b := tr.RawBytes(); len(b) > 0 {
_, err := p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: b,
})
if err != nil {
pW.CloseWithError(err)
return
}
}
outputRdr := io.TeeReader(r, pw)

var csum []byte
if hdr.Size > 0 {
var err error
_, csum, err = fp.Put(hdr.Name, tr)
if err != nil {
pW.CloseWithError(err)
return
}
}

entry := storage.Entry{
Type: storage.FileType,
Size: hdr.Size,
Payload: csum,
}
// For proper marshalling of non-utf8 characters
entry.SetName(hdr.Name)

// File entries added, regardless of size
_, err = p.AddEntry(entry)
if err != nil {
pW.CloseWithError(err)
return
}
if withDone {
ch := make(chan error, 1)
done = ch
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, ch)
return pr, done
}

if b := tr.RawBytes(); len(b) > 0 {
_, err = p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: b,
})
if err != nil {
pW.CloseWithError(err)
return
}
}
}
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, nil)
return pr, nil
}
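
// A minimal sketch of the fan-out wiring described above (hypothetical name
// demoTee): the background goroutine reads from the TeeReader while the
// caller reads the identical bytes from the pipe, and each write into the
// pipe blocks until the caller catches up, so the two sides advance in
// lockstep.
func demoTee(src io.Reader) error {
	pr, pw := io.Pipe()
	tee := io.TeeReader(src, pw)

	go func() {
		// Background side: consume (here, discard) the teed stream, then
		// propagate the outcome to the caller's reads.
		_, err := io.Copy(io.Discard, tee)
		pw.CloseWithError(err)
	}()

	// Foreground side: the caller sees the same bytes via pr.
	_, err := io.Copy(io.Discard, pr)
	return err
}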

// It is allowable, and not uncommon that there is further padding on
// the end of an archive, apart from the expected 1024 null bytes. We
// do this in chunks rather than in one go to avoid cases where a
// maliciously crafted tar file tries to trick us into reading many GBs
// into memory.
const paddingChunkSize = 1024 * 1024
var paddingChunk [paddingChunkSize]byte
for {
var isEOF bool
n, err := outputRdr.Read(paddingChunk[:])
if err != nil {
if err != io.EOF {
pW.CloseWithError(err)
return
}
isEOF = true
}
if n != 0 {
_, err = p.AddEntry(storage.Entry{
Type: storage.SegmentType,
Payload: paddingChunk[:n],
})
if err != nil {
pW.CloseWithError(err)
return
}
}
if isEOF {
break
}
}
pW.Close()
}()
// NewInputTarStream wraps the Reader stream of a tar archive and provides a
// Reader stream of the same.
//
// In the middle it will pack the segments and file metadata to storage.Packer
// `p`.
//
// The storage.FilePutter is where the payloads of files in the stream are
// stashed. If this stashing is not needed, you can provide a nil
// storage.FilePutter; since checksumming is still required, a default
// NewDiscardFilePutter will be used internally.
//
// If callers need to be able to abort early and/or wait for goroutine termination,
// prefer NewInputTarStreamWithDone.
//
// Deprecated: Use NewInputTarStreamWithDone instead.
func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) {
pr, _ := newInputTarStreamCommon(r, p, fp, false)
return pr, nil
}

return pR, nil
// NewInputTarStreamWithDone wraps the Reader stream of a tar archive and provides a
// Reader stream of the same.
//
// In the middle it will pack the segments and file metadata to storage.Packer `p`.
//
// It also returns a done channel that will receive exactly one error value
// (nil on success) when the internal goroutine has fully completed parsing
// the tar stream (including the final paddingChunk draining loop) and has
// finished writing all entries to `p`.
//
// The returned reader is an io.ReadCloser so callers can stop early; closing it
// aborts the pipe so the internal goroutine can terminate promptly (rather than
// hanging on a blocked pipe write).
func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) {
pr, done := newInputTarStreamCommon(r, p, fp, true)
return pr, done, nil
}
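
// A hedged usage sketch for the new API (hypothetical caller; any packer
// works): consume the stream, then receive from done to learn the parse
// outcome and to know that `r` is no longer in use. Closing rc early aborts
// the internal goroutine promptly.
func exampleWithDone(r io.Reader, p storage.Packer) error {
	rc, done, err := NewInputTarStreamWithDone(r, p, nil)
	if err != nil {
		return err
	}
	if _, err := io.Copy(io.Discard, rc); err != nil {
		rc.Close() // abort: unblocks the goroutine's pipe writes
		<-done     // wait until the goroutine has fully terminated
		return err
	}
	rc.Close()
	return <-done // exactly one value: nil on success
}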