Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use inmemory Kubo API if the local node is used. #1615

Merged
merged 2 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/bacalhau/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func serve(cmd *cobra.Command, OS *ServeOptions) error {
ctx = logger.ContextWithNodeIDLogger(ctx, libp2pHost.ID().String())

// Establishing IPFS connection
ipfs, err := ipfs.NewClient(OS.IPFSConnect)
ipfs, err := ipfs.NewClientUsingRemoteHandler(OS.IPFSConnect)
if err != nil {
Fatal(cmd, fmt.Sprintf("Error creating IPFS client: %s", err), 1)
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func NewDevStack(
// IPFS
// -------------------------------------
var ipfsNode *ipfs.Node
var ipfsClient *ipfs.Client
var ipfsClient ipfs.Client

var ipfsSwarmAddrs []string
if i > 0 {
Expand All @@ -168,10 +168,7 @@ func NewDevStack(
return nil, fmt.Errorf("failed to create ipfs node: %w", err)
}

ipfsClient, err = ipfsNode.Client()
if err != nil {
return nil, fmt.Errorf("failed to create ipfs client: %w", err)
}
ipfsClient = ipfsNode.Client()

var libp2pHost host.Host
var libp2pPort int
Expand Down Expand Up @@ -477,8 +474,8 @@ func (stack *DevStack) GetNode(ctx context.Context, nodeID string) (

return nil, fmt.Errorf("node not found: %s", nodeID)
}
func (stack *DevStack) IPFSClients() []*ipfs.Client {
clients := make([]*ipfs.Client, 0, len(stack.Nodes))
func (stack *DevStack) IPFSClients() []ipfs.Client {
clients := make([]ipfs.Client, 0, len(stack.Nodes))
for _, node := range stack.Nodes {
clients = append(clients, node.IPFSClient)
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/devstack/devstack_ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
)

type DevStackIPFS struct {
IPFSClients []*ipfs.Client
IPFSClients []ipfs.Client
CleanupManager *system.CleanupManager
}

// NewDevStackIPFS creates a devstack but with only IPFS servers connected to each other
func NewDevStackIPFS(ctx context.Context, cm *system.CleanupManager, count int) (*DevStackIPFS, error) {
var clients []*ipfs.Client
var clients []ipfs.Client
for i := 0; i < count; i++ {
log.Debug().Msgf(`Creating Node #%d`, i)

Expand All @@ -37,12 +37,7 @@ func NewDevStackIPFS(ctx context.Context, cm *system.CleanupManager, count int)
return nil, fmt.Errorf("failed to create ipfs node: %w", err)
}

ipfsClient, err := ipfsNode.Client()
if err != nil {
return nil, fmt.Errorf("failed to create ipfs client: %w", err)
}

clients = append(clients, ipfsClient)
clients = append(clients, ipfsNode.Client())
}

stack := &DevStackIPFS{
Expand Down
5 changes: 2 additions & 3 deletions pkg/devstack/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package devstack

import (
"github.com/filecoin-project/bacalhau/pkg/ipfs"

"github.com/filecoin-project/bacalhau/pkg/node"
)

func ToIPFSClients(nodes []*node.Node) []*ipfs.Client {
res := []*ipfs.Client{}
func ToIPFSClients(nodes []*node.Node) []ipfs.Client {
res := make([]ipfs.Client, 0, len(nodes))
for _, n := range nodes {
res = append(res, n.IPFSClient)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/downloader/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestDownloaderSuite(t *testing.T) {
type DownloaderSuite struct {
suite.Suite
cm *system.CleanupManager
client *ipfs.Client
client ipfs.Client
outputDir string
downloadSettings *model.DownloaderSettings
downloadProvider DownloaderProvider
Expand All @@ -49,9 +49,7 @@ func (ds *DownloaderSuite) SetupTest() {
node, err := ipfs.NewLocalNode(ctx, ds.cm, nil)
require.NoError(ds.T(), err)

client, err := node.Client()
require.NoError(ds.T(), err)
ds.client = client
ds.client = node.Client()

swarm, err := node.SwarmAddresses()
require.NoError(ds.T(), err)
Expand Down
5 changes: 1 addition & 4 deletions pkg/downloader/ipfs/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ func (ipfsDownloader *Downloader) FetchResult(ctx context.Context, result model.
}

log.Ctx(ctx).Debug().Msg("Connecting client to new IPFS node...")
ipfsClient, err := n.Client()
if err != nil {
return err
}
ipfsClient := n.Client()

err = func() error {
log.Ctx(ctx).Debug().Msgf(
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
noop_executor "github.com/filecoin-project/bacalhau/pkg/executor/noop"
pythonwasm "github.com/filecoin-project/bacalhau/pkg/executor/python_wasm"
"github.com/filecoin-project/bacalhau/pkg/executor/wasm"
"github.com/filecoin-project/bacalhau/pkg/ipfs"
"github.com/filecoin-project/bacalhau/pkg/model"
"github.com/filecoin-project/bacalhau/pkg/storage"
"github.com/filecoin-project/bacalhau/pkg/storage/combo"
Expand All @@ -21,7 +22,7 @@ import (
)

type StandardStorageProviderOptions struct {
IPFSMultiaddress string
API ipfs.Client
FilecoinUnsealedPath string
DownloadPath string
}
Expand All @@ -36,7 +37,7 @@ func NewStandardStorageProvider(
cm *system.CleanupManager,
options StandardStorageProviderOptions,
) (storage.StorageProvider, error) {
ipfsAPICopyStorage, err := apicopy.NewStorage(cm, options.IPFSMultiaddress)
ipfsAPICopyStorage, err := apicopy.NewStorage(cm, options.API)
if err != nil {
return nil, err
}
Expand Down
43 changes: 26 additions & 17 deletions pkg/ipfs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,43 @@ type Client struct {
addr string
}

// NewClient creates an API client for the given ipfs node API multiaddress.
// NewClientUsingRemoteHandler creates an API client for the given ipfs node API multiaddress.
// NOTE: the API address is _not_ the same as the swarm address
func NewClient(apiAddr string) (*Client, error) {
func NewClientUsingRemoteHandler(apiAddr string) (Client, error) {
addr, err := ma.NewMultiaddr(apiAddr)
if err != nil {
return nil, fmt.Errorf("failed to parse api address '%s': %w", apiAddr, err)
return Client{}, fmt.Errorf("failed to parse api address '%s': %w", apiAddr, err)
}

api, err := httpapi.NewApi(addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to '%s': %w", apiAddr, err)
return Client{}, fmt.Errorf("failed to connect to '%s': %w", apiAddr, err)
}

log.Debug().Msgf("Created IPFS client for node API address: %s", apiAddr)
return &Client{
log.Debug().Msgf("Created remote IPFS client for node API address: %s", apiAddr)
return Client{
API: api,
addr: apiAddr,
}, nil
}

const MagicInternalIPFSAddress = "memory://in-memory-node/"

func NewClient(api icore.CoreAPI) Client {
return Client{
API: api,
addr: MagicInternalIPFSAddress,
}
}

// WaitUntilAvailable blocks the current goroutine until the client is able
// to successfully make requests to the server. Useful for setting up local
// test networks. WaitUntilAvailable will respect context deadlines/cancels,
// and will propagate context cancellations back to the caller.
// NOTE: if you do not pass a context with a deadline/cancel in to this
//
// function, it may attempt to call the api server forever.
func (cl *Client) WaitUntilAvailable(ctx context.Context) error {
func (cl Client) WaitUntilAvailable(ctx context.Context) error {
for {
if err := ctx.Err(); err != nil {
return err
Expand All @@ -73,7 +82,7 @@ func (cl *Client) WaitUntilAvailable(ctx context.Context) error {
}

// ID returns the node's ipfs ID.
func (cl *Client) ID(ctx context.Context) (string, error) {
func (cl Client) ID(ctx context.Context) (string, error) {
key, err := cl.API.Key().Self(ctx)
if err != nil {
return "", err
Expand All @@ -83,12 +92,12 @@ func (cl *Client) ID(ctx context.Context) (string, error) {
}

// APIAddress returns Api address that was used to connect to the node.
func (cl *Client) APIAddress() string {
func (cl Client) APIAddress() string {
return cl.addr
}

// SwarmAddresses returns a list of swarm addresses the node has announced.
func (cl *Client) SwarmAddresses(ctx context.Context) ([]string, error) {
func (cl Client) SwarmAddresses(ctx context.Context) ([]string, error) {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.SwarmAddresses")
defer span.End()

Expand All @@ -111,7 +120,7 @@ func (cl *Client) SwarmAddresses(ctx context.Context) ([]string, error) {
}

// Get fetches a file or directory from the ipfs network.
func (cl *Client) Get(ctx context.Context, cid, outputPath string) error {
func (cl Client) Get(ctx context.Context, cid, outputPath string) error {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.Get")
defer span.End()

Expand All @@ -138,7 +147,7 @@ func (cl *Client) Get(ctx context.Context, cid, outputPath string) error {

// Put uploads and pins a file or directory to the ipfs network. Timeouts and
// cancellation should be handled by passing an appropriate context value.
func (cl *Client) Put(ctx context.Context, inputPath string) (string, error) {
func (cl Client) Put(ctx context.Context, inputPath string) (string, error) {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.Put")
defer span.End()

Expand Down Expand Up @@ -179,7 +188,7 @@ type StatResult struct {
}

// Stat returns information about an IPLD CID on the ipfs network.
func (cl *Client) Stat(ctx context.Context, cid string) (*StatResult, error) {
func (cl Client) Stat(ctx context.Context, cid string) (*StatResult, error) {
ctx, span := system.GetTracer().Start(ctx, "kg/ipfs.Stat")
defer span.End()

Expand All @@ -198,7 +207,7 @@ func (cl *Client) Stat(ctx context.Context, cid string) (*StatResult, error) {
}, nil
}

func (cl *Client) GetCidSize(ctx context.Context, cid string) (uint64, error) {
func (cl Client) GetCidSize(ctx context.Context, cid string) (uint64, error) {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.GetCidSize")
defer span.End()

Expand All @@ -211,7 +220,7 @@ func (cl *Client) GetCidSize(ctx context.Context, cid string) (uint64, error) {
}

// NodesWithCID returns the ipfs ids of nodes that have the given CID pinned.
func (cl *Client) NodesWithCID(ctx context.Context, cid string) ([]string, error) {
func (cl Client) NodesWithCID(ctx context.Context, cid string) ([]string, error) {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.NodesWithCID")
defer span.End()

Expand All @@ -229,7 +238,7 @@ func (cl *Client) NodesWithCID(ctx context.Context, cid string) ([]string, error
}

// HasCID returns true if the node has the given CID locally, whether pinned or not.
func (cl *Client) HasCID(ctx context.Context, cid string) (bool, error) {
func (cl Client) HasCID(ctx context.Context, cid string) (bool, error) {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.HasCID")
defer span.End()

Expand All @@ -252,7 +261,7 @@ func (cl *Client) HasCID(ctx context.Context, cid string) (bool, error) {
return false, nil
}

func (cl *Client) GetTreeNode(ctx context.Context, cid string) (IPLDTreeNode, error) {
func (cl Client) GetTreeNode(ctx context.Context, cid string) (IPLDTreeNode, error) {
ctx, span := system.GetTracer().Start(ctx, "pkg/ipfs.GetTreeNode")
defer span.End()

Expand Down
12 changes: 2 additions & 10 deletions pkg/ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,8 @@ func (n *Node) LogDetails() {
}

// Client returns an API client for interacting with the node.
func (n *Node) Client() (*Client, error) {
addrs, err := n.APIAddresses()
if err != nil {
return nil, fmt.Errorf("failed to fetch api addresses: %w", err)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("error creating client: node has no available api addresses")
}

return NewClient(addrs[0])
func (n *Node) Client() Client {
return NewClient(n.api)
}

// createNode spawns a new IPFS node using a temporary repo path.
Expand Down
8 changes: 2 additions & 6 deletions pkg/ipfs/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ func (suite *NodeSuite) TestFunctionality() {
require.NoError(suite.T(), err)

// Upload a file to the second client:
cl2, err := n2.Client()
require.NoError(suite.T(), err)
require.NoError(suite.T(), cl2.WaitUntilAvailable(ctx))
cl2 := n2.Client()

cid, err := cl2.Put(ctx, filePath)
require.NoError(suite.T(), err)
Expand All @@ -78,9 +76,7 @@ func (suite *NodeSuite) TestFunctionality() {
require.True(suite.T(), isPinned)

// Download the file from the first client:
cl1, err := n1.Client()
require.NoError(suite.T(), err)
require.NoError(suite.T(), cl1.WaitUntilAvailable(ctx))
cl1 := n1.Client()

outputPath := filepath.Join(dirPath, "output.txt")
err = cl1.Get(ctx, cid, outputPath)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ipfs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/rs/zerolog/log"
)

func AddFileToNodes(ctx context.Context, filePath string, clients ...*Client) (string, error) {
func AddFileToNodes(ctx context.Context, filePath string, clients ...Client) (string, error) {
var res string
for i, client := range clients {
cid, err := client.Put(ctx, filePath)
Expand All @@ -27,7 +27,7 @@ func AddFileToNodes(ctx context.Context, filePath string, clients ...*Client) (s
return res, nil
}

func AddTextToNodes(ctx context.Context, fileContent []byte, clients ...*Client) (string, error) {
func AddTextToNodes(ctx context.Context, fileContent []byte, clients ...Client) (string, error) {
tempDir, err := os.MkdirTemp("", "bacalhau-test")
if err != nil {
return "", err
Expand Down
6 changes: 3 additions & 3 deletions pkg/node/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (f *StandardStorageProvidersFactory) Get(
ctx,
nodeConfig.CleanupManager,
executor_util.StandardStorageProviderOptions{
IPFSMultiaddress: nodeConfig.IPFSClient.APIAddress(),
API: nodeConfig.IPFSClient,
FilecoinUnsealedPath: nodeConfig.FilecoinUnsealedPath,
},
)
Expand All @@ -100,7 +100,7 @@ func (f *StandardExecutorsFactory) Get(
executor_util.StandardExecutorOptions{
DockerID: fmt.Sprintf("bacalhau-%s", nodeConfig.Host.ID().String()),
Storage: executor_util.StandardStorageProviderOptions{
IPFSMultiaddress: nodeConfig.IPFSClient.APIAddress(),
API: nodeConfig.IPFSClient,
FilecoinUnsealedPath: nodeConfig.FilecoinUnsealedPath,
},
},
Expand Down Expand Up @@ -138,7 +138,7 @@ func (f *StandardPublishersFactory) Get(
return publisher_util.NewIPFSPublishers(
ctx,
nodeConfig.CleanupManager,
nodeConfig.IPFSClient.APIAddress(),
nodeConfig.IPFSClient,
nodeConfig.EstuaryAPIKey,
nodeConfig.LotusConfig,
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const NodeInfoTopic = "bacalhau-node-info"

// Node configuration
type NodeConfig struct {
IPFSClient *ipfs.Client
IPFSClient ipfs.Client
CleanupManager *system.CleanupManager
LocalDB localdb.LocalDB
Host host.Host
Expand Down Expand Up @@ -68,7 +68,7 @@ type Node struct {
ComputeNode *Compute
RequesterNode *Requester
CleanupManager *system.CleanupManager
IPFSClient *ipfs.Client
IPFSClient ipfs.Client
Host host.Host
metricsPort int
}
Expand Down
Loading