Skip to content

Commit

Permalink
Merge pull request #310 from CortexFoundation/fc
Browse files Browse the repository at this point in the history
FileCache
  • Loading branch information
ucwong authored Feb 13, 2023
2 parents deae3b7 + dd4cd5a commit 5090a73
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 47 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ clean:
format:
find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" -not -path "*/generated/*" | xargs gofmt -w -s
test:
go test ./... -v -race -cpu=1,2,4 -coverprofile=coverage.txt -covermode=atomic -benchmem -bench .
go test ./... -v -race -cpu=1,2 -coverprofile=coverage.txt -covermode=atomic -benchmem -bench .
60 changes: 42 additions & 18 deletions backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ import (
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/int160"
peer_store "github.com/anacrolix/dht/v2/peer-store"

"github.com/ucwong/filecache"
)

const (
Expand Down Expand Up @@ -155,9 +157,11 @@ type TorrentManager struct {
startOnce sync.Once
//seedingNotify chan string

badger kv.Bucket
kvdb kv.Bucket

//colaList mapset.Set[string]

fc *filecache.FileCache
}

// can only call by fs.go: 'SeedingLocal()'
Expand Down Expand Up @@ -350,12 +354,16 @@ func (tm *TorrentManager) Close() error {
// tm.fileCache.Reset()
//}

if tm.badger != nil {
tm.badger.Close()
if tm.kvdb != nil {
tm.kvdb.Close()
}
//if tm.hotCache != nil {
// tm.hotCache.Purge()
//}

if tm.fc != nil {
tm.fc.Stop()
}
log.Info("Fs Download Manager Closed")
return nil
}
Expand Down Expand Up @@ -484,8 +492,8 @@ func (tm *TorrentManager) addInfoHash(ih string, bytesRequested int64) *Torrent
v []byte
)

if tm.badger != nil {
if v = tm.badger.Get([]byte(SEED_PRE + ih)); v == nil {
if tm.kvdb != nil {
if v = tm.kvdb.Get([]byte(SEED_PRE + ih)); v == nil {
seedTorrentPath := filepath.Join(tm.DataDir, ih, TORRENT)
if _, err := os.Stat(seedTorrentPath); err == nil {
spec = tm.loadSpec(ih, seedTorrentPath)
Expand Down Expand Up @@ -528,8 +536,8 @@ func (tm *TorrentManager) addInfoHash(ih string, bytesRequested int64) *Torrent
t.AddTrackers(tm.globalTrackers)
}

if t.Info() == nil && tm.badger != nil {
if v := tm.badger.Get([]byte(SEED_PRE + ih)); v != nil {
if t.Info() == nil && tm.kvdb != nil {
if v := tm.kvdb.Get([]byte(SEED_PRE + ih)); v != nil {
t.SetInfoBytes(v)
}
}
Expand Down Expand Up @@ -595,7 +603,7 @@ func NewTorrentManager(config *params.Config, fsid uint64, cache, compress bool)
cfg.DisableIPv6 = config.DisableIPv6

cfg.IPBlocklist = iplist.New([]iplist.Range{
iplist.Range{First: net.ParseIP("10.0.0.1"), Last: net.ParseIP("10.0.0.255")}})
{First: net.ParseIP("10.0.0.1"), Last: net.ParseIP("10.0.0.255")}})

if blocklist, err := iplist.MMapPackedFile("packed-blocklist"); err == nil {
log.Info("Block list loaded")
Expand Down Expand Up @@ -698,10 +706,12 @@ func NewTorrentManager(config *params.Config, fsid uint64, cache, compress bool)
slot: int(fsid % bucket),
localSeedFiles: make(map[string]bool),
//seedingNotify: notify,
badger: kv.Badger(config.DataDir),
kvdb: kv.Badger(config.DataDir),
}

if cache {
torrentManager.fc = filecache.NewDefaultCache()
torrentManager.fc.MaxSize = 256 * filecache.Megabyte
/*conf := bigcache.Config{
Shards: 1024,
LifeWindow: 600 * time.Second,
Expand Down Expand Up @@ -760,6 +770,11 @@ func (tm *TorrentManager) Start() (err error) {
go tm.mainLoop()

//err = tm.init()
if tm.fc != nil {
if err := tm.fc.Start(); err != nil {
log.Error("File cache start", "err", err)
}
}
})

return
Expand Down Expand Up @@ -889,11 +904,11 @@ func (tm *TorrentManager) pendingLoop() {
case <-t.GotInfo():
if b, err := bencode.Marshal(t.Torrent.Info()); err == nil {
log.Debug("Record full torrent in history", "ih", t.infohash, "info", len(b))
if tm.badger != nil && tm.badger.Get([]byte(SEED_PRE+t.infohash)) == nil {
if tm.kvdb != nil && tm.kvdb.Get([]byte(SEED_PRE+t.infohash)) == nil {
elapsed := time.Duration(mclock.Now()) - time.Duration(t.start)
log.Info("Imported new seed", "ih", t.infohash, "request", common.StorageSize(t.Length()), "good", params.IsGood(t.infohash), "elapsed", common.PrettyDuration(elapsed))
log.Info("Imported new seed", "ih", t.infohash, "request", common.StorageSize(t.Length()), "ts", common.StorageSize(len(b)), "good", params.IsGood(t.infohash), "elapsed", common.PrettyDuration(elapsed))

tm.badger.Set([]byte(SEED_PRE+t.infohash), b)
tm.kvdb.Set([]byte(SEED_PRE+t.infohash), b)
}
} else {
log.Error("Meta info marshal failed", "ih", t.infohash, "err", err)
Expand Down Expand Up @@ -1202,14 +1217,14 @@ func (tm *TorrentManager) Available(ih string, rawSize uint64) (bool, uint64, mc
}
}

func (tm *TorrentManager) GetFile(infohash, subpath string) ([]byte, uint64, error) {
func (tm *TorrentManager) GetFile(ctx context.Context, infohash, subpath string) (data []byte, err error) {
getfileMeter.Mark(1)
if tm.metrics {
defer func(start time.Time) { tm.Updates += time.Since(start) }(time.Now())
}

if !common.IsHexAddress(infohash) {
return nil, 0, errors.New("invalid infohash format")
return nil, errors.New("invalid infohash format")
}

infohash = strings.TrimPrefix(strings.ToLower(infohash), common.Prefix)
Expand All @@ -1223,7 +1238,7 @@ func (tm *TorrentManager) GetFile(infohash, subpath string) ([]byte, uint64, err
if t := tm.getTorrent(infohash); t != nil {
if !t.Ready() {
//log.Error("Unavailable file, waiting", "ih", infohash, "subpath", subpath, "status", t.status, "p", t.BytesCompleted())
return nil, 0, ErrUnfinished
return nil, ErrUnfinished
}

// Data protection when torrent is active
Expand All @@ -1233,10 +1248,19 @@ func (tm *TorrentManager) GetFile(infohash, subpath string) ([]byte, uint64, err
}

diskReadMeter.Mark(1)
dir := filepath.Join(tm.DataDir, key)
if tm.fc != nil && tm.fc.Active() {
start := mclock.Now()
//if data, err = tm.fc.ReadFileContext(ctx, dir); err == nil {
if data, err = tm.fc.ReadFile(dir); err == nil {
elapsed := time.Duration(mclock.Now() - start)
log.Debug("Load data from file cache", "ih", infohash, "dir", dir, "elapsed", common.PrettyDuration(elapsed))
}
} else {
data, err = os.ReadFile(dir)
}

data, err := os.ReadFile(filepath.Join(tm.DataDir, key))

return data, 0, err
return
}

func (tm *TorrentManager) unzip(data []byte) ([]byte, error) {
Expand Down
4 changes: 2 additions & 2 deletions backend/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func TestGetFile(t *testing.T) {
params.DefaultConfig.Mode = "LAZY"
ih := "aea5584d0cd3865e90c80eace3bfcb062473d966"
fmt.Println(params.DefaultConfig)
tm, _ := NewTorrentManager(&params.DefaultConfig, 1, false, false)
tm, _ := NewTorrentManager(&params.DefaultConfig, 1, true, false)
//tm.Simulate()
tm.Start()
tm.Search(context.Background(), ih, 0)
defer tm.Close()
time.Sleep(30 * time.Second)
//a, _, _, _ := tm.Available(ih, 100000000)
//fmt.Println("available", a)
file, _, _ := tm.GetFile(ih, "data")
file, _ := tm.GetFile(context.Background(), ih, "data")
if file == nil {
log.Fatal("failed to get file")
}
Expand Down
2 changes: 1 addition & 1 deletion backend/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (t *Torrent) Seed() bool {
//if active, ok := params.GoodFiles[t.InfoHash()]; !ok {
// log.Info("New active nas found", "ih", t.InfoHash(), "ok", ok, "active", active, "size", common.StorageSize(t.BytesCompleted()), "files", len(t.Files()), "pieces", t.Torrent.NumPieces(), "seg", len(t.Torrent.PieceStateRuns()), "peers", t.currentConns, "status", t.status, "elapsed", common.PrettyDuration(elapsed))
//} else {
log.Info("Imported new nas segment", "ih", t.InfoHash(), "size", common.StorageSize(t.BytesCompleted()), "files", len(t.Files()), "pieces", t.Torrent.NumPieces(), "seg", len(t.Torrent.PieceStateRuns()), "peers", t.currentConns, "status", t.status, "elapsed", common.PrettyDuration(elapsed))
log.Info("Imported new nas segment", "ih", t.InfoHash(), "size", common.StorageSize(t.BytesCompleted()), "files", len(t.Files()), "pieces", t.Torrent.NumPieces(), "seg", len(t.Torrent.PieceStateRuns()), "peers", t.currentConns, "status", t.status, "elapsed", common.PrettyDuration(elapsed), "speed", common.StorageSize(float64(t.BytesCompleted()*1000*1000*1000)/float64(elapsed)).String()+"/s")
//}
return true
}
Expand Down
19 changes: 4 additions & 15 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,31 +445,20 @@ func (fs *TorrentFS) bitsflow(ctx context.Context, ih string, size uint64) error
func (tfs *TorrentFS) Stop() error {
if tfs == nil || tfs.monitor == nil {
log.Info("Cortex fs engine is already stopped")
return nil
return errors.New("fs has been stopped")
}

tfs.monitor.Stop()

tfs.once.Do(func() {
close(tfs.closeAll)
})

tfs.wg.Wait()

// Wait until every goroutine terminates.
//tfs.monitor.lock.Lock()
tfs.monitor.Stop()
//tfs.monitor.lock.Unlock()

//if tfs.nasCache != nil {
// tfs.nasCache.Purge()
//}

if tfs.tunnel != nil {
tfs.tunnel.Drain()
}

//if tfs.queryCache != nil {
// tfs.queryCache.Purge()
//}
log.Info("Cortex fs engine stopped")
return nil
}
Expand Down Expand Up @@ -511,7 +500,7 @@ func (fs *TorrentFS) wakeup(ctx context.Context, ih string, rawSize uint64) (boo

func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSize uint64, subpath string) ([]byte, error) {
log.Debug("Get file with size", "ih", infohash, "size", rawSize, "path", subpath)
if ret, _, err := fs.storage().GetFile(infohash, subpath); err != nil {
if ret, err := fs.storage().GetFile(ctx, infohash, subpath); err != nil {
fs.wakeup(ctx, infohash, rawSize)
return nil, err
} else {
Expand Down
20 changes: 16 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,28 @@ require (
github.com/hashicorp/golang-lru v0.5.5-0.20221011183528-d4900dc688bf
github.com/otiai10/copy v1.9.0
github.com/prometheus/client_golang v1.14.0
github.com/ucwong/filecache v1.0.2-0.20230211204238-5bd1814b10ea
github.com/ucwong/go-ttlmap v1.0.2-0.20221020173635-331e7ddde2bb
github.com/ucwong/golang-kv v1.0.10-0.20230129204034-9be07a0ed06b
github.com/ucwong/golang-kv v1.0.12-0.20230212185402-0942222b90be
github.com/urfave/cli/v2 v2.23.6
go.etcd.io/bbolt v1.3.7-0.20230129125154-fc361799ce2e
go.etcd.io/bbolt v1.3.7-0.20230211214749-46437cea06b7
golang.org/x/time v0.3.0
golang.org/x/tools v0.4.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
)

require github.com/elliotchance/orderedmap v1.4.0 // indirect
require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
github.com/cockroachdb/pebble v0.0.0-20230209222158-0568b5fd3d14 // indirect
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/elliotchance/orderedmap v1.4.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
)

require (
crawshaw.io/sqlite v0.3.3-0.20220618202545-d1964889ea3c // indirect
Expand Down Expand Up @@ -77,7 +89,7 @@ require (
github.com/huandu/xstrings v1.4.0 // indirect
github.com/huin/goupnp v1.0.4-0.20220613170603-23b555710578 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/lispad/go-generics-tools v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mschoch/smat v0.2.0 // indirect
Expand Down
Loading

0 comments on commit 5090a73

Please sign in to comment.