Automatically record heap profiles in testplans #147

Merged · 4 commits · Feb 12, 2021
Changes from all commits
3 changes: 3 additions & 0 deletions .circleci/config.yml
@@ -47,6 +47,9 @@ jobs:
- run:
name: "trigger graphsync testplan on taas"
command: ~/testground-cli run composition -f $HOME/testground/plans/graphsync/stress-k8s.toml --metadata-commit=$CIRCLE_SHA1 --metadata-repo=ipfs/go-graphsync --metadata-branch=$CIRCLE_BRANCH
+ - run:
+ name: "trigger graphsync memory stress on taas"
+ command: ~/testground-cli run composition -f $HOME/testground/plans/graphsync/memory-stress-k8s.toml --metadata-commit=$CIRCLE_SHA1 --metadata-repo=ipfs/go-graphsync --metadata-branch=$CIRCLE_BRANCH

workflows:
version: 2
19 changes: 11 additions & 8 deletions testplans/graphsync/go.mod
@@ -3,31 +3,34 @@ module github.com/ipfs/go-graphsync/testplans/graphsync
go 1.14

require (
- github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
+ github.com/dgraph-io/badger/v2 v2.2007.2
github.com/dustin/go-humanize v1.0.0
- github.com/hannahhoward/all-selector v0.1.0
+ github.com/hannahhoward/all-selector v0.2.0
github.com/ipfs/go-blockservice v0.1.3
- github.com/ipfs/go-cid v0.0.6
+ github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.4
- github.com/ipfs/go-graphsync v0.1.2
+ github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e
+ github.com/ipfs/go-graphsync v0.6.0
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-unixfs v0.2.4
- github.com/ipld/go-ipld-prime v0.4.0
+ github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
+ github.com/kr/text v0.2.0 // indirect
- github.com/libp2p/go-libp2p v0.10.0
- github.com/libp2p/go-libp2p-core v0.6.0
+ github.com/libp2p/go-libp2p v0.12.0
+ github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-noise v0.1.1
- github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-sockaddr v0.1.0 // indirect
+ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
google.golang.org/protobuf v1.25.0 // indirect
+ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)
306 changes: 152 additions & 154 deletions testplans/graphsync/go.sum

Large diffs are not rendered by default.

181 changes: 153 additions & 28 deletions testplans/graphsync/main.go
@@ -8,16 +8,18 @@ import (
"os"
"path/filepath"
goruntime "runtime"
"runtime/pprof"
"strings"
"time"

+ dgbadger "github.com/dgraph-io/badger/v2"
"github.com/dustin/go-humanize"
allselector "github.com/hannahhoward/all-selector"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
badgerds "github.com/ipfs/go-ds-badger"
badgerds "github.com/ipfs/go-ds-badger2"
blockstore "github.com/ipfs/go-ipfs-blockstore"
chunk "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
@@ -65,10 +67,10 @@ func (p networkParams) String() string {

func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
var (
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")

networkParams = parseNetworkConfig(runenv)
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")
networkParams = parseNetworkConfig(runenv)
memorySnapshots = parseMemorySnapshotsParam(runenv)
)
runenv.RecordMessage("started test instance")
runenv.RecordMessage("network params: %v", networkParams)
@@ -81,15 +83,21 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
host, peers, _ := makeHost(ctx, runenv, initCtx)
defer host.Close()

+ datastore, err := createDatastore(runenv.BooleanParam("disk_store"))
+ if err != nil {
+ runenv.RecordMessage("datastore error: %s", err.Error())
+ return err
+ }
var (
// make datastore, blockstore, dag service, graphsync
- bs = blockstore.NewBlockstore(dss.MutexWrap(createDatastore(runenv.BooleanParam("disk_store"))))
+ bs = blockstore.NewBlockstore(dss.MutexWrap(datastore))
dagsrv = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
gsync = gsi.New(ctx,
gsnet.NewFromLibp2pHost(host),
storeutil.LoaderForBlockstore(bs),
storeutil.StorerForBlockstore(bs),
)
+ recorder = &runRecorder{memorySnapshots: memorySnapshots, runenv: runenv}
)

defer initCtx.SyncClient.MustSignalAndWait(ctx, "done", runenv.TestInstanceCount)
@@ -106,19 +114,27 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})

- return runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency)
+ gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
+ recorder.recordBlock()
+ })
+ err := runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency, memorySnapshots, recorder)
+ if err != nil {
+ runenv.RecordMessage("Error running provider: %s", err.Error())
+ }
+ return err
case "requestors":
runenv.RecordMessage("we are the requestor")
defer runenv.RecordMessage("done requestor")
+ gsync.RegisterIncomingBlockHook(func(p peer.ID, request gs.ResponseData, block gs.BlockData, ha gs.IncomingBlockHookActions) {
+ recorder.recordBlock()
+ })

p := *peers[0]
if err := host.Connect(ctx, p); err != nil {
return err
}
runenv.RecordMessage("done dialling provider")
- return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size)
+ return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size, memorySnapshots, recorder)

default:
panic(fmt.Sprintf("unsupported group ID: %s\n", runenv.TestGroupID))
@@ -158,7 +174,33 @@ func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
return ret
}

- func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64) error {
+ type snapshotMode uint

+ const (
+ snapshotNone snapshotMode = iota
+ snapshotSimple
+ snapshotDetailed
+ )

+ const (
+ detailedSnapshotFrequency = 10
+ )

+ func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode {
+ memorySnapshotsString := runenv.StringParam("memory_snapshots")
+ switch memorySnapshotsString {
+ case "none":
+ return snapshotNone
+ case "simple":
+ return snapshotSimple
+ case "detailed":
+ return snapshotDetailed
+ default:
+ panic("invalid memory_snapshot parameter")
+ }
+ }

+ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
@@ -167,11 +209,14 @@ func runRequestor(…)

for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
+ stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
)

+ recorder.beginRun(np, size, concurrency)

// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)

@@ -206,7 +251,9 @@ func runRequestor(…)
runenv.RecordMessage("\t>>> requesting CID %s", c)

start := time.Now()
- _, errCh := gsync.Request(grpctx, p.ID, clink, sel)
+ respCh, errCh := gsync.Request(grpctx, p.ID, clink, sel)
+ for range respCh {
+ }
for err := range errCh {
return err
}
@@ -233,23 +280,32 @@ func runRequestor(…)
if err := errgrp.Wait(); err != nil {
return err
}

+ // wait for all instances to finish running
+ initCtx.SyncClient.MustSignalAndWait(ctx, stateFinish, runenv.TestInstanceCount)

+ if memorySnapshots == snapshotSimple || memorySnapshots == snapshotDetailed {
+ recordSnapshots(runenv, size, np, concurrency, "total")
+ }
}

return nil
}

- func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int) error {
+ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)

for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
+ stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
+ recorder.beginRun(np, size, concurrency)

// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
Expand Down Expand Up @@ -296,7 +352,9 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
}

// run GC to get accurate-ish stats.
- goruntime.GC()
+ if memorySnapshots == snapshotSimple || memorySnapshots == snapshotDetailed {
+ recordSnapshots(runenv, size, np, concurrency, "pre")
+ }
+ goruntime.GC()

runenv.RecordMessage("\tCIDs are: %v", cids)
@@ -314,6 +372,14 @@ func runProvider(…)
CallbackTarget: 1,
})
runenv.RecordMessage("\tnetwork configured for round %d", round)

+ // wait for all instances to finish running
+ initCtx.SyncClient.MustSignalAndWait(ctx, stateFinish, runenv.TestInstanceCount)

+ if memorySnapshots == snapshotSimple || memorySnapshots == snapshotDetailed {
+ recordSnapshots(runenv, size, np, concurrency, "total")
+ }

}

return nil
@@ -388,29 +454,88 @@ func makeHost(…)
return host, peers, bwcounter
}

- func createDatastore(diskStore bool) ds.Datastore {
+ func createDatastore(diskStore bool) (ds.Datastore, error) {
if !diskStore {
- return ds.NewMapDatastore()
+ return ds.NewMapDatastore(), nil
}

// create temporary directory for badger datastore
path := filepath.Join(os.TempDir(), "datastore")
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0755); err != nil {
- panic(err)
+ return nil, err
}
} else if err != nil {
- panic(err)
+ return nil, err
}

// create disk based badger datastore
defopts := badgerds.DefaultOptions
defopts.SyncWrites = false
defopts.Truncate = true

+ defopts.Options = dgbadger.DefaultOptions("").WithTruncate(true).
+ WithValueThreshold(1 << 10)
datastore, err := badgerds.NewDatastore(path, &defopts)
if err != nil {
- panic(err)
+ return nil, err
}

- return datastore
+ return datastore, nil
}

+ func recordSnapshots(runenv *runtime.RunEnv, size uint64, np networkParams, concurrency int, postfix string) error {
+ runenv.RecordMessage("Recording heap profile...")
+ err := writeHeap(runenv, size, np, concurrency, fmt.Sprintf("%s-pre-gc", postfix))
+ if err != nil {
+ return err
+ }
+ goruntime.GC()
+ goruntime.GC()
+ err = writeHeap(runenv, size, np, concurrency, fmt.Sprintf("%s-post-gc", postfix))
+ if err != nil {
+ return err
+ }
+ return nil
+ }

+ func writeHeap(runenv *runtime.RunEnv, size uint64, np networkParams, concurrency int, postfix string) error {
+ snapshotName := fmt.Sprintf("heap_lat-%s_bw-%s_concurrency-%d_size-%s_%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size), postfix)
+ snapshotName = strings.Replace(snapshotName, " ", "", -1)
+ snapshotFile, err := runenv.CreateRawAsset(snapshotName)
+ if err != nil {
+ return err
+ }
+ err = pprof.WriteHeapProfile(snapshotFile)
+ if err != nil {
+ return err
+ }
+ err = snapshotFile.Close()
+ if err != nil {
+ return err
+ }
+ return nil
+ }

+ type runRecorder struct {
+ memorySnapshots snapshotMode
+ index int
+ np networkParams
+ size uint64
+ concurrency int
+ runenv *runtime.RunEnv
+ }

+ func (rr *runRecorder) recordBlock() {
+ if rr.memorySnapshots == snapshotDetailed {
+ if rr.index%detailedSnapshotFrequency == 0 {
+ recordSnapshots(rr.runenv, rr.size, rr.np, rr.concurrency, fmt.Sprintf("incremental-%d", rr.index))
+ }
+ }
+ rr.index++
+ }

+ func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int) {
+ rr.concurrency = concurrency
+ rr.np = np
+ rr.size = size
+ rr.index = 0
+ }
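A note on the paired goruntime.GC() calls in recordSnapshots: the first collection runs pending finalizers and the second reclaims whatever those finalizers release, so the "post-gc" profile approximates only live memory. The same capture pattern as a minimal standalone sketch (the file paths and names here are illustrative, not the plan's; the plan writes profiles through the testground runenv instead):

package main

import (
	"os"
	"runtime"
	"runtime/pprof"
)

// captureHeap writes a pre-GC and a post-GC heap profile to the given paths.
func captureHeap(prePath, postPath string) error {
	pre, err := os.Create(prePath)
	if err != nil {
		return err
	}
	defer pre.Close()
	if err := pprof.WriteHeapProfile(pre); err != nil {
		return err
	}

	// First GC runs finalizers; the second collects what they freed.
	runtime.GC()
	runtime.GC()

	post, err := os.Create(postPath)
	if err != nil {
		return err
	}
	defer post.Close()
	return pprof.WriteHeapProfile(post)
}

func main() {
	if err := captureHeap("heap-pre-gc.pprof", "heap-post-gc.pprof"); err != nil {
		panic(err)
	}
}

The resulting files can be opened with go tool pprof <file> to compare retained memory before and after collection.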
1 change: 1 addition & 0 deletions testplans/graphsync/manifest.toml
@@ -26,3 +26,4 @@ chunk_size = { … }
links_per_level = { type = "int", desc = "unixfs links per level", default = "1024" }
raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true"}
disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
+ memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
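The new parameter defaults to "none", so existing compositions keep their current behavior and only the memory-stress composition needs to opt in. For illustration, parseMemorySnapshotsParam in main.go panics on an unknown value; a variant that surfaces a normal error instead might look like this (a sketch, not the plan's code — the function and names are hypothetical):

package main

import "fmt"

type snapshotMode uint

const (
	snapshotNone snapshotMode = iota
	snapshotSimple
	snapshotDetailed
)

// parseSnapshotMode maps the manifest's memory_snapshots string to a mode,
// reporting unknown values as an error instead of panicking.
func parseSnapshotMode(s string) (snapshotMode, error) {
	switch s {
	case "none":
		return snapshotNone, nil
	case "simple":
		return snapshotSimple, nil
	case "detailed":
		return snapshotDetailed, nil
	default:
		return snapshotNone, fmt.Errorf("invalid memory_snapshots value %q", s)
	}
}

func main() {
	mode, err := parseSnapshotMode("detailed")
	if err != nil {
		panic(err)
	}
	fmt.Println(mode) // prints 2
}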
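Finally, one requestor-side change in main.go worth calling out: gsync.Request now assigns and drains the response channel rather than discarding it, which keeps the transfer progressing and gives the per-block hooks something to observe. A minimal consumption pattern against the go-graphsync v0.6.0 API (a sketch; the function and identifiers are illustrative):

package stress

import (
	"context"

	gs "github.com/ipfs/go-graphsync"
	ipld "github.com/ipld/go-ipld-prime"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// requestAll issues a graphsync request and consumes both result channels.
// Draining respCh matters: responses are delivered on it and leaving it
// unread can stall the transfer; errCh yields any terminal errors.
func requestAll(ctx context.Context, exchange gs.GraphExchange, p peer.ID, root ipld.Link, sel ipld.Node) error {
	respCh, errCh := exchange.Request(ctx, p, root, sel)
	for range respCh {
		// each ResponseProgress marks a traversed node; count or log here
	}
	for err := range errCh {
		return err
	}
	return nil
}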