From 1790dea4673717de9f24b73982f5b620161814a8 Mon Sep 17 00:00:00 2001 From: Will Date: Thu, 3 Mar 2022 11:11:28 +0000 Subject: [PATCH] add AsLargeBytes support to unixfs files (#24) * add AsLargeBytes support to unixfs files This commit was moved from ipfs/go-unixfsnode@94986a76305a01f1f21eb86fb1ed41e5478367f2 --- unixfs/node/data/builder/file_test.go | 3 +- unixfs/node/file/deferred.go | 173 ++++++++++++++++++++++++++ unixfs/node/file/file.go | 43 +++++-- unixfs/node/file/shard.go | 168 +++++++++++++++++++------ unixfs/node/file/wrapped.go | 2 +- 5 files changed, 342 insertions(+), 47 deletions(-) create mode 100644 unixfs/node/file/deferred.go diff --git a/unixfs/node/data/builder/file_test.go b/unixfs/node/data/builder/file_test.go index 58619794d..de3803e4f 100644 --- a/unixfs/node/data/builder/file_test.go +++ b/unixfs/node/data/builder/file_test.go @@ -3,7 +3,6 @@ package builder import ( "bytes" "context" - "io" "testing" "github.com/ipfs/go-cid" @@ -70,7 +69,7 @@ func TestUnixFSFileRoundtrip(t *testing.T) { t.Fatal(err) } // read back out the file. 
- out, err := io.ReadAll(ufn) + out, err := ufn.AsBytes() if err != nil { t.Fatal(err) } diff --git a/unixfs/node/file/deferred.go b/unixfs/node/file/deferred.go new file mode 100644 index 000000000..44ce8ca27 --- /dev/null +++ b/unixfs/node/file/deferred.go @@ -0,0 +1,173 @@ +package file + +import ( + "context" + "io" + + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" +) + +func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.Link) LargeBytesNode { + dfn := deferredFileNode{ + LargeBytesNode: nil, + root: root, + lsys: lsys, + ctx: ctx, + } + dfn.LargeBytesNode = &deferred{&dfn} + return &dfn +} + +type deferredFileNode struct { + LargeBytesNode + + root ipld.Link + lsys *ipld.LinkSystem + ctx context.Context +} + +func (d *deferredFileNode) resolve() error { + if d.lsys == nil { + return nil + } + target, err := d.lsys.Load(ipld.LinkContext{Ctx: d.ctx}, d.root, protoFor(d.root)) + if err != nil { + return err + } + + asFSNode, err := NewUnixFSFile(d.ctx, target, d.lsys) + if err != nil { + return err + } + d.LargeBytesNode = asFSNode + d.root = nil + d.lsys = nil + d.ctx = nil + return nil +} + +type deferred struct { + *deferredFileNode +} + +type deferredReader struct { + io.ReadSeeker + *deferredFileNode +} + +func (d *deferred) AsLargeBytes() (io.ReadSeeker, error) { + return &deferredReader{nil, d.deferredFileNode}, nil +} + +func (d *deferredReader) Read(p []byte) (int, error) { + if d.ReadSeeker == nil { + if err := d.deferredFileNode.resolve(); err != nil { + return 0, err + } + rs, err := d.deferredFileNode.AsLargeBytes() + if err != nil { + return 0, err + } + d.ReadSeeker = rs + } + return d.ReadSeeker.Read(p) +} + +func (d *deferredReader) Seek(offset int64, whence int) (int64, error) { + if d.ReadSeeker == nil { + if err := d.deferredFileNode.resolve(); err != nil { + return 0, err + } + rs, err := d.deferredFileNode.AsLargeBytes() + if err != nil { + return 0, err + } + d.ReadSeeker = rs + } + 
return d.ReadSeeker.Seek(offset, whence) +} + +func (d *deferred) Kind() ipld.Kind { + return ipld.Kind_Bytes +} + +func (d *deferred) AsBytes() ([]byte, error) { + if err := d.deferredFileNode.resolve(); err != nil { + return []byte{}, err + } + + return d.deferredFileNode.AsBytes() +} + +func (d *deferred) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "bool", MethodName: "AsBool", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d *deferred) AsInt() (int64, error) { + return 0, ipld.ErrWrongKind{TypeName: "int", MethodName: "AsInt", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d *deferred) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "float", MethodName: "AsFloat", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d *deferred) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "string", MethodName: "AsString", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d *deferred) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "link", MethodName: "AsLink", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d *deferred) AsNode() (ipld.Node, error) { + return nil, nil +} + +func (d *deferred) Size() int { + return 0 +} + +func (d *deferred) IsAbsent() bool { + return false +} + +func (d *deferred) IsNull() bool { + if err := d.deferredFileNode.resolve(); err != nil { + return true + } + return d.deferredFileNode.IsNull() +} + +func (d *deferred) Length() int64 { + return 0 +} + +func (d *deferred) ListIterator() ipld.ListIterator { + return nil +} + +func (d *deferred) MapIterator() ipld.MapIterator { + return nil +} + +func (d *deferred) LookupByIndex(idx int64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +func (d *deferred) LookupByString(key string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +func (d *deferred) LookupByNode(key ipld.Node) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +func (d *deferred) 
LookupBySegment(seg ipld.PathSegment) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +// sharded files / nodes look like dagpb nodes. +func (d *deferred) Prototype() ipld.NodePrototype { + return dagpb.Type.PBNode +} diff --git a/unixfs/node/file/file.go b/unixfs/node/file/file.go index bda6f9ef8..17a004a5a 100644 --- a/unixfs/node/file/file.go +++ b/unixfs/node/file/file.go @@ -11,10 +11,10 @@ import ( // root of a unixfs File. // It provides a `bytes` view over the file, along with access to io.Reader streaming access // to file data. -func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (StreamableByteNode, error) { +func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (LargeBytesNode, error) { if substrate.Kind() == ipld.Kind_Bytes { // A raw / single-node file. - return &singleNodeFile{substrate, 0}, nil + return &singleNodeFile{substrate}, nil } // see if it's got children. links, err := substrate.LookupByString("Links") @@ -30,22 +30,29 @@ func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSyst ctx: ctx, lsys: lsys, substrate: substrate, - done: false, - rdr: nil}, nil + }, nil } -// A StreamableByteNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type. -type StreamableByteNode interface { +// A LargeBytesNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type. 
+type LargeBytesNode interface { ipld.Node - io.Reader + AsLargeBytes() (io.ReadSeeker, error) } type singleNodeFile struct { ipld.Node +} + +func (f *singleNodeFile) AsLargeBytes() (io.ReadSeeker, error) { + return &singleNodeReader{f, 0}, nil +} + +type singleNodeReader struct { + ipld.Node offset int } -func (f *singleNodeFile) Read(p []byte) (int, error) { +func (f *singleNodeReader) Read(p []byte) (int, error) { buf, err := f.Node.AsBytes() if err != nil { return 0, err @@ -57,3 +64,23 @@ func (f *singleNodeFile) Read(p []byte) (int, error) { f.offset += n return n, nil } + +func (f *singleNodeReader) Seek(offset int64, whence int) (int64, error) { + buf, err := f.Node.AsBytes() + if err != nil { + return 0, err + } + + switch whence { + case io.SeekStart: + f.offset = int(offset) + case io.SeekCurrent: + f.offset += int(offset) + case io.SeekEnd: + f.offset = len(buf) + int(offset) + } + if f.offset < 0 { + return 0, io.EOF + } + return int64(f.offset), nil +} diff --git a/unixfs/node/file/shard.go b/unixfs/node/file/shard.go index 303054f5f..e3dc79b14 100644 --- a/unixfs/node/file/shard.go +++ b/unixfs/node/file/shard.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/ipfs/go-unixfsnode/data" dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -15,56 +16,147 @@ type shardNodeFile struct { ctx context.Context lsys *ipld.LinkSystem substrate ipld.Node - done bool - rdr io.Reader } var _ ipld.Node = (*shardNodeFile)(nil) -func (s *shardNodeFile) Read(p []byte) (int, error) { - if s.done { - return 0, io.EOF +type shardNodeReader struct { + *shardNodeFile + rdr io.Reader + offset int64 +} + +func (s *shardNodeReader) makeReader() (io.Reader, error) { + links, err := s.shardNodeFile.substrate.LookupByString("Links") + if err != nil { + return nil, err } - // collect the sub-nodes on first use - if s.rdr == nil { - links, err := s.substrate.LookupByString("Links") + readers := 
make([]io.Reader, 0) + lnki := links.ListIterator() + at := int64(0) + for !lnki.Done() { + _, lnk, err := lnki.Next() if err != nil { - return 0, err + return nil, err } - readers := make([]io.Reader, 0) - lnki := links.ListIterator() - for !lnki.Done() { - _, lnk, err := lnki.Next() - if err != nil { - return 0, err - } - lnkhash, err := lnk.LookupByString("Hash") - if err != nil { - return 0, err - } - lnklnk, err := lnkhash.AsLink() - if err != nil { - return 0, err - } - target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk)) + sz, err := lnk.LookupByString("Tsize") + if err != nil { + return nil, err + } + childSize, err := sz.AsInt() + if err != nil { + return nil, err + } + if s.offset > at+childSize { + at += childSize + continue + } + lnkhash, err := lnk.LookupByString("Hash") + if err != nil { + return nil, err + } + lnklnk, err := lnkhash.AsLink() + if err != nil { + return nil, err + } + target := newDeferredFileNode(s.ctx, s.lsys, lnklnk) + tr, err := target.AsLargeBytes() + if err != nil { + return nil, err + } + // fastforward the first one if needed. + if at < s.offset { + _, err := tr.Seek(s.offset-at, io.SeekStart) if err != nil { - return 0, err + return nil, err } + } + at += childSize + readers = append(readers, tr) + } + if len(readers) == 0 { + return nil, io.EOF + } + return io.MultiReader(readers...), nil +} - asFSNode, err := NewUnixFSFile(s.ctx, target, s.lsys) - if err != nil { - return 0, err - } - readers = append(readers, asFSNode) +func (s *shardNodeReader) Read(p []byte) (int, error) { + // build reader + if s.rdr == nil { + rdr, err := s.makeReader() + if err != nil { + return 0, err } - s.rdr = io.MultiReader(readers...) 
+ s.rdr = rdr } n, err := s.rdr.Read(p) - if err == io.EOF { + return n, err +} + +func (s *shardNodeReader) Seek(offset int64, whence int) (int64, error) { + if s.rdr != nil { s.rdr = nil - s.done = true } - return n, err + switch whence { + case io.SeekStart: + s.offset = offset + case io.SeekCurrent: + s.offset += offset + case io.SeekEnd: + s.offset = s.length() + offset + } + return s.offset, nil +} + +func (s *shardNodeFile) length() int64 { + // see if we have size specified in the unixfs data. errors fall back to length from links + nodeData, err := s.substrate.LookupByString("Data") + if err != nil { + return s.lengthFromLinks() + } + nodeDataBytes, err := nodeData.AsBytes() + if err != nil { + return s.lengthFromLinks() + } + ud, err := data.DecodeUnixFSData(nodeDataBytes) + if err != nil { + return s.lengthFromLinks() + } + if ud.FileSize.Exists() { + if fs, err := ud.FileSize.Must().AsInt(); err == nil { + return int64(fs) + } + } + return s.lengthFromLinks() +} + +func (s *shardNodeFile) lengthFromLinks() int64 { + links, err := s.substrate.LookupByString("Links") + if err != nil { + return 0 + } + size := int64(0) + li := links.ListIterator() + for !li.Done() { + _, l, err := li.Next() + if err != nil { + return 0 + } + sn, err := l.LookupByString("Tsize") + if err != nil { + return 0 + } + ll, err := sn.AsInt() + if err != nil { + return 0 + } + size += ll + } + return size +} + +func (s *shardNodeFile) AsLargeBytes() (io.ReadSeeker, error) { + return &shardNodeReader{s, nil, 0}, nil } func protoFor(link ipld.Link) ipld.NodePrototype { @@ -81,7 +173,11 @@ func (s *shardNodeFile) Kind() ipld.Kind { } func (s *shardNodeFile) AsBytes() ([]byte, error) { - return io.ReadAll(s) + rdr, err := s.AsLargeBytes() + if err != nil { + return nil, err + } + return io.ReadAll(rdr) } func (s *shardNodeFile) AsBool() (bool, error) { diff --git a/unixfs/node/file/wrapped.go b/unixfs/node/file/wrapped.go index 56b2c6ccc..b2c2210c9 100644 --- 
a/unixfs/node/file/wrapped.go +++ b/unixfs/node/file/wrapped.go @@ -6,7 +6,7 @@ import ( "github.com/ipld/go-ipld-prime/node/basicnode" ) -func newWrappedNode(substrate ipld.Node) (StreamableByteNode, error) { +func newWrappedNode(substrate ipld.Node) (LargeBytesNode, error) { dataField, err := substrate.LookupByString("Data") if err != nil { return nil, err