Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pre-signed chunks #4719

Merged
merged 12 commits into from
Jul 9, 2024
10 changes: 7 additions & 3 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ paths:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
requestBody:
description: Chunk binary data that has to have at least 8 bytes.
content:
Expand Down Expand Up @@ -689,8 +693,8 @@ paths:
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
name: swarm-postage-batch-id
required: true
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
requestBody:
required: true
description: The SOC binary data is composed of the span (8 bytes) and the at most 4KB payload.
Expand Down
14 changes: 14 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,20 @@ components:
schema:
$ref: "#/components/schemas/SwarmAddress"

SwarmPostageStamp:
in: header
name: swarm-postage-stamp
description: |
Postage stamp for the corresponding chunk in the request. \
It is required if Swarm-Postage-Batch-Id header is missing \
It consists of: \
- batch ID - 0:32 bytes \
- postage index (bucket and bucket index) - 32:40 bytes \
- timestamp - 40:48 bytes \
- signature - 48:113 bytes
schema:
$ref: "#/components/schemas/HexString"

SwarmDeferredUpload:
in: header
name: swarm-deferred-upload
Expand Down
34 changes: 33 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmPostageStampHeader = "Swarm-Postage-Stamp"
SwarmDeferredUploadHeader = "Swarm-Deferred-Upload"
SwarmRedundancyLevelHeader = "Swarm-Redundancy-Level"
SwarmRedundancyStrategyHeader = "Swarm-Redundancy-Strategy"
Expand Down Expand Up @@ -115,6 +116,8 @@ var (
errBatchUnusable = errors.New("batch not usable")
errUnsupportedDevNodeOperation = errors.New("operation not supported in dev mode")
errOperationSupportedOnlyInFullMode = errors.New("operation is supported only in full mode")

batchIdOrStampSig = fmt.Sprintf("Either '%s' or '%s' header must be set in the request", SwarmPostageStampHeader, SwarmPostageBatchIdHeader)
)

// Storer interface provides the functionality required from the local storage
Expand Down Expand Up @@ -506,7 +509,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down Expand Up @@ -725,6 +728,35 @@ func (s *Service) newStamperPutter(ctx context.Context, opts putterOptions) (sto
}, nil
}

func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stamp *postage.Stamp) (storer.PutterSession, error) {
if !opts.Deferred && s.beeMode == DevMode {
return nil, errUnsupportedDevNodeOperation
}

storedBatch, err := s.batchStore.Get(stamp.BatchID())
if err != nil {
return nil, errInvalidPostageBatch
}

var session storer.PutterSession
if opts.Deferred || opts.Pin {
session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID)
if err != nil {
return nil, fmt.Errorf("failed creating session: %w", err)
}
} else {
session = s.storer.DirectUpload()
}

stamper := postage.NewPresignedStamper(stamp, storedBatch.Owner)

return &putterSessionWrapper{
PutterSession: session,
stamper: stamper,
save: func() error { return nil },
}, nil
}

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
Expand Down
47 changes: 38 additions & 9 deletions pkg/api/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storer"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/postage"
Expand All @@ -30,7 +31,8 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("post_chunk").Build()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
SwarmTag uint64 `map:"Swarm-Tag"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
Expand All @@ -57,20 +59,45 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
}

if len(headers.BatchID) == 0 && len(headers.StampSig) == 0 {
logger.Error(nil, batchIdOrStampSig)
jsonhttp.BadRequest(w, batchIdOrStampSig)
return
}

// Currently the localstore supports session based uploads. We don't want to
// create new session for single chunk uploads. So if the chunk upload is not
// part of a session already, then we directly push the chunk. This way we dont
// need to go through the UploadStore.
deferred := tag != 0

putter, err := s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: deferred,
})
var putter storer.PutterSession
if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(headers.StampSig); err != nil {
errorMsg := "Stamp deserialization failure"
logger.Debug(errorMsg, "error", err)
logger.Error(nil, errorMsg)
jsonhttp.BadRequest(w, errorMsg)
return
}

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Deferred: deferred,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: deferred,
})
}
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
errorMsg := "get putter failed"
logger.Debug(errorMsg, "error", err)
logger.Error(nil, errorMsg)
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
Expand All @@ -81,7 +108,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
jsonhttp.BadRequest(w, errorMsg)
}
return
}
Expand Down Expand Up @@ -146,6 +173,8 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
switch {
case errors.Is(err, postage.ErrBucketFull):
jsonhttp.PaymentRequired(ow, "batch is overissued")
case errors.Is(err, postage.ErrInvalidBatchSignature):
jsonhttp.BadRequest(ow, "stamp signature is invalid")
default:
jsonhttp.InternalServerError(ow, "chunk write error")
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/api/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package api_test
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"net/http"
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
"github.com/ethersphere/bee/v2/pkg/spinlock"
Expand All @@ -22,6 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
Expand Down Expand Up @@ -269,3 +273,38 @@ func TestChunkDirectUpload(t *testing.T) {
}),
)
}

// // TestPreSignedUpload tests that chunk can be uploaded with pre-signed postage stamp
func TestPreSignedUpload(t *testing.T) {
t.Parallel()

var (
chunksEndpoint = "/chunks"
chunk = testingc.GenerateTestRandomChunk()
storerMock = mockstorer.New()
batchStore = mockbatchstore.New()
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
BatchStore: batchStore,
})
)

// generate random postage batch and stamp
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
owner, _ := signer.EthereumAddress()
stamp := testingpostage.MustNewValidStamp(signer, chunk.Address())
_ = batchStore.Save(&postage.Batch{
ID: stamp.BatchID(),
Owner: owner.Bytes(),
})
stampBytes, _ := stamp.MarshalBinary()

// read off inserted chunk
go func() { <-storerMock.PusherFeed() }()

jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPostageStampHeader, hex.EncodeToString(stampBytes)),
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
)
}
46 changes: 38 additions & 8 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -43,17 +44,25 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
}

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
Pin bool `map:"Swarm-Pin"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
Pin bool `map:"Swarm-Pin"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

if len(headers.BatchID) == 0 && len(headers.StampSig) == 0 {
logger.Error(nil, batchIdOrStampSig)
jsonhttp.BadRequest(w, batchIdOrStampSig)
return
}

// if pinning header is set we do a deferred upload, else we do a direct upload
var (
tag uint64
err error
)
if headers.Pin {
session, err := s.storer.NewSession()
Expand All @@ -71,12 +80,33 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
tag = session.TagID
}

putter, err := s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: headers.Pin,
})
deferred := tag != 0

var putter storer.PutterSession
if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(headers.StampSig); err != nil {
errorMsg := "Stamp deserialization failure"
logger.Debug(errorMsg, "error", err)
logger.Error(nil, errorMsg)
jsonhttp.BadRequest(w, errorMsg)
return
}

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
})
}
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
Expand Down
39 changes: 39 additions & 0 deletions pkg/api/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ import (
"time"

"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/postage"
mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing"
testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing"
"github.com/ethersphere/bee/v2/pkg/spinlock"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
Expand Down Expand Up @@ -140,6 +144,41 @@ func TestSOC(t *testing.T) {
t.Fatal(err)
}
})

// TestPreSignedUpload tests that chunk can be uploaded with pre-signed postage stamp
t.Run("pre-signed upload", func(t *testing.T) {
t.Parallel()

var (
s = testingsoc.GenerateMockSOC(t, testData)
storerMock = mockstorer.New()
batchStore = mockbatchstore.New()
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
BatchStore: batchStore,
})
)

// generate random postage batch and stamp
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
owner, _ := signer.EthereumAddress()
stamp := testingpostage.MustNewValidStamp(signer, s.Address())
_ = batchStore.Save(&postage.Batch{
ID: stamp.BatchID(),
Owner: owner.Bytes(),
})
stampBytes, _ := stamp.MarshalBinary()

// read off inserted chunk
go func() { <-storerMock.PusherFeed() }()

jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPostageStampHeader, hex.EncodeToString(stampBytes)),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
)
})

t.Run("err - batch empty", func(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)
hexbatch := hex.EncodeToString(batchEmpty)
Expand Down
Loading
Loading