diff --git a/backend/handler.go b/backend/handler.go index ea984371..5ec1fffa 100644 --- a/backend/handler.go +++ b/backend/handler.go @@ -300,14 +300,35 @@ func (tm *TorrentManager) setTorrent(ih string, t *Torrent) { tm.lock.Lock() defer tm.lock.Unlock() + t.lock.Lock() + defer t.lock.Unlock() + tm.torrents[ih] = t } -func (tm *TorrentManager) removeTorrent(ih string) { +func (tm *TorrentManager) removeTorrent(t *Torrent) { tm.lock.Lock() defer tm.lock.Unlock() - delete(tm.torrents, ih) + t.lock.Lock() + defer t.lock.Unlock() + + defer t.Torrent.Drop() + + //t.status = torrentSleeping + //tm.torrents[ih] = t + + if t.status == torrentPending { + delete(tm.pendingTorrents, t.infohash) + } else if t.status == torrentRunning || t.status == torrentPaused { + delete(tm.activeTorrents, t.infohash) + } else if t.status == torrentSeeding { + delete(tm.seedingTorrents, t.infohash) + } else { + log.Warn("Unknown status", "ih", t.infohash, "status", t.status) + } + + delete(tm.torrents, t.infohash) } func (tm *TorrentManager) Close() error { @@ -536,8 +557,6 @@ func (tm *TorrentManager) GlobalTrackers() [][]string { } func (tm *TorrentManager) updateInfoHash(t *Torrent, bytesRequested int64) { - //t.lock.Lock() - //defer t.lock.Unlock() if t.status != torrentSeeding && t.bytesRequested < bytesRequested { if bytesRequested > t.Length() { bytesRequested = t.Length() @@ -906,8 +925,8 @@ func (tm *TorrentManager) pendingLoop() { } func (tm *TorrentManager) finish(ih string, t *Torrent) { - //t.lock.Lock() - //defer t.lock.Unlock() + t.lock.Lock() + defer t.lock.Unlock() if _, err := os.Stat(filepath.Join(tm.DataDir, ih)); err == nil { tm.seedingChan <- t delete(tm.activeTorrents, ih) @@ -932,10 +951,12 @@ func (tm *TorrentManager) activeLoop() { case t := <-tm.activeChan: t.status = torrentRunning tm.activeTorrents[t.infohash] = t - n := tm.blockCaculate(t.Torrent.BytesCompleted() + t.Torrent.BytesMissing()) + n := tm.blockCaculate(t.Torrent.Length()) if n < 10 { - n = 10 + n += 10 } + + // TODO n random salt tm.wg.Add(1) go func(i string, n int64) { defer tm.wg.Done() @@ -1031,8 +1052,7 @@ func (tm *TorrentManager) droppingLoop() { select { case ih := <-tm.droppingChan: if t := tm.getTorrent(ih); t != nil { //&& t.Ready() { - t.Torrent.Drop() - if t.status == torrentPending { + /*if t.status == torrentPending { delete(tm.pendingTorrents, ih) } if t.status == torrentRunning || t.status == torrentPaused { @@ -1040,9 +1060,9 @@ func (tm *TorrentManager) droppingLoop() { } if t.status == torrentSeeding { delete(tm.seedingTorrents, ih) - } + }*/ - tm.removeTorrent(ih) + tm.removeTorrent(t) log.Info("Seed has been dropped", "ih", ih, "cited", t.cited, "status", t.status) } else { @@ -1131,24 +1151,14 @@ func (tm *TorrentManager) Available(ih string, rawSize uint64) (bool, uint64, mc return false, 0, 0, ErrInactiveTorrent } else { if !t.Ready() { - //if torrent.ch != nil { - // <-torrent.ch - // if torrent.Ready() { - // return torrent.BytesCompleted() <= int64(rawSize), nil - // } - //} if t.Torrent.Info() == nil { - return false, uint64(t.BytesCompleted()), 0, ErrTorrentNotFound + return false, 0, 0, ErrTorrentNotFound } return false, uint64(t.BytesCompleted()), mclock.Now() - t.start, ErrUnfinished } // TODO ok := t.BytesCompleted() <= int64(rawSize) - //if t.currentConns <= 1 && ok { - // t.currentConns = tm.maxEstablishedConns - // t.Torrent.SetMaxEstablishedConns(tm.maxEstablishedConns) - //} return ok, uint64(t.BytesCompleted()), mclock.Now() - t.start, nil } @@ -1163,80 +1173,32 @@ func (tm *TorrentManager) GetFile(infohash, subpath string) ([]byte, uint64, err if !common.IsHexAddress(infohash) { return nil, 0, errors.New("invalid infohash format") } - infohash = strings.TrimPrefix(strings.ToLower(infohash), common.Prefix) - - //if t := tm.getTorrent(infohash); t == nil { - // return nil, 0, ErrInactiveTorrent - //} else { + infohash = strings.TrimPrefix(strings.ToLower(infohash), common.Prefix) subpath = strings.TrimPrefix(subpath, "/") subpath = strings.TrimSuffix(subpath, "/") - // if !t.Ready() { - // log.Error("Read unavailable file", "hash", infohash, "subpath", subpath) - // return nil, uint64(t.BytesCompleted()), ErrUnfinished - // } + var key = filepath.Join(infohash, subpath) - //tm.hotCache.Add(infohash, true) - //if t.currentConns < tm.maxEstablishedConns { - //t.setCurrentConns(tm.maxEstablishedConns) - //t.Torrent.SetMaxEstablishedConns(t.currentConns) - // log.Debug("Torrent active", "ih", infohash, "peers", t.currentConns) - //} + log.Debug("Get File", "dir", tm.DataDir, "key", key) - var key = filepath.Join(infohash, subpath) - /*if tm.fileCache != nil { - if cache, err := tm.fileCache.Get(key); err == nil { - memcacheHitMeter.Mark(1) - memcacheReadMeter.Mark(int64(len(cache))) - if c, err := tm.unzip(cache); err != nil { - return nil, 0, err - } else { - if tm.compress { - log.Info("File cache", "hash", infohash, "path", subpath, "size", tm.fileCache.Len(), "compress", len(cache), "origin", len(c), "compress", tm.compress) - } - //return c, uint64(t.BytesCompleted()), nil - return c, 0, nil - } + if t := tm.getTorrent(infohash); t != nil { + if !t.Ready() { + //log.Error("Unavailable file, waiting", "ih", infohash, "subpath", subpath, "status", t.status, "p", t.BytesCompleted()) + return nil, 0, ErrUnfinished } - }*/ - //if t := tm.getTorrent(infohash); t != nil { - // t.lock.RLock() - // defer t.lock.RUnlock() - //} - diskReadMeter.Mark(1) + // Data protection when torrent is active + t.lock.RLock() + defer t.lock.RUnlock() - log.Debug("Get File", "dir", tm.DataDir, "key", key) + } - data, err := os.ReadFile(filepath.Join(tm.DataDir, key)) + diskReadMeter.Mark(1) - //data final verification - /*for _, file := range t.Files() { - if file.Path() == subpath { - log.Debug("File location info", "ih", infohash, "path", file.Path(), "key", key) - if int64(len(data)) != file.Length() { - log.Error("Read file not completed", "hash", infohash, "len", len(data), "total", file.Path()) - return nil, 0, errors.New("not a complete file") - } else { - log.Debug("Read data success", "hash", infohash, "size", len(data), "path", file.Path()) - if c, err := tm.zip(data); err != nil { - log.Warn("Compress data failed", "hash", infohash, "err", err) - } else { - if tm.fileCache != nil { - tm.fileCache.Set(key, c) - memcacheMissMeter.Mark(1) - memcacheWriteMeter.Mark(int64(len(c))) - } - } - } - break - } - }*/ + data, err := os.ReadFile(filepath.Join(tm.DataDir, key)) - //return data, uint64(t.BytesCompleted()), err return data, 0, err - //} } func (tm *TorrentManager) unzip(data []byte) ([]byte, error) { diff --git a/backend/torrent.go b/backend/torrent.go index ae5c7321..29dba481 100644 --- a/backend/torrent.go +++ b/backend/torrent.go @@ -37,6 +37,7 @@ const ( torrentPaused torrentRunning torrentSeeding + torrentSleeping ) type Torrent struct { @@ -82,10 +83,6 @@ func (t *Torrent) Ready() bool { return false } - //t.lock.Lock() - //t.cited += 1 - //t.lock.Unlock() - ret := t.IsSeeding() if !ret { log.Debug("Not ready", "ih", t.InfoHash(), "status", t.status, "seed", t.Torrent.Seeding(), "seeding", torrentSeeding) diff --git a/fs.go b/fs.go index 6cfbad42..eac8e1bf 100644 --- a/fs.go +++ b/fs.go @@ -462,6 +462,7 @@ func (tfs *TorrentFS) Start(server *p2p.Server) (err error) { return } +// download and pub func (fs *TorrentFS) bitsflow(ih string, size uint64) { fs.callback <- types.NewBitsFlow(ih, size) } @@ -512,6 +513,7 @@ func (fs *TorrentFS) sub(ih string, rawSize uint64) bool { if rawSize > 0 { log.Debug("Query added", "ih", ih, "size", rawSize) //fs.nasCache.Add(ih, rawSize) + // TODO waiting if no neighbors found fs.tunnel.Set(ih, ttlmap.NewItem(rawSize, ttlmap.WithTTL(60*time.Second)), nil) } else { return false @@ -531,7 +533,7 @@ func (fs *TorrentFS) notify(infohash string) bool { } // Available is used to check the file status -func (fs *TorrentFS) localCheck(ctx context.Context, infohash string, rawSize uint64) (bool, error) { +func (fs *TorrentFS) available(ctx context.Context, infohash string, rawSize uint64) (bool, error) { ret, _, _, err := fs.storage().Available(infohash, rawSize) if err != nil { if progress, e := fs.progress(infohash); e == nil { @@ -546,7 +548,7 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi //fs.wg.Add(1) //go func() { /// defer fs.wg.Done() - //if ok, err := fs.localCheck(ctx, infohash, rawSize); err != nil || !ok { + //if ok, err := fs.available(ctx, infohash, rawSize); err != nil || !ok { // return nil, err //} //}() @@ -564,7 +566,7 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi if ret, _, err := fs.storage().GetFile(infohash, subpath); err != nil { //log.Warn("Get file failed", "ih", infohash, "size", rawSize, "path", subpath, "err", err) - if ok, err := fs.localCheck(ctx, infohash, rawSize); err != nil || !ok { + if ok, err := fs.available(ctx, infohash, rawSize); err != nil || !ok { log.Debug("Get file failed", "ih", infohash, "size", rawSize, "path", subpath, "err", err) return nil, err } diff --git a/go.mod b/go.mod index 7d5fab9d..302774cc 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3 github.com/anacrolix/missinggo/v2 v2.7.1 github.com/anacrolix/tagflag v1.3.1-0.20210717093243-41ae76143afa - github.com/anacrolix/torrent v1.48.1-0.20230103142631-c20f73d53e9f + github.com/anacrolix/torrent v1.48.1-0.20230123215249-d47739db0c59 github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/deckarep/golang-set/v2 v2.1.0 @@ -34,8 +34,9 @@ require ( require ( crawshaw.io/sqlite v0.3.3-0.20220618202545-d1964889ea3c // indirect - github.com/RoaringBitmap/roaring v1.2.1 // indirect + github.com/RoaringBitmap/roaring v1.2.3 // indirect github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect + github.com/alecthomas/atomic v0.1.0-alpha2 // indirect github.com/anacrolix/chansync v0.3.0 // indirect github.com/anacrolix/dht/v2 v2.19.2 github.com/anacrolix/generics v0.0.0-20221221005542-ac1d5b02b8a3 // indirect @@ -53,17 +54,28 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect + github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/dgraph-io/badger/v3 v3.2103.5 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v1.0.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/btree v1.1.2 // indirect + github.com/google/flatbuffers v22.11.23+incompatible // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/huandu/xstrings v1.4.0 // indirect github.com/huin/goupnp v1.0.4-0.20220613170603-23b555710578 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect + github.com/klauspost/compress v1.15.13 // indirect github.com/lispad/go-generics-tools v1.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mschoch/smat v0.2.0 // indirect @@ -95,7 +107,11 @@ require ( github.com/tidwall/btree v1.6.0 // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/otel v1.11.2 // indirect + go.opentelemetry.io/otel/trace v1.11.2 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 // indirect golang.org/x/mod v0.7.0 // indirect @@ -106,23 +122,4 @@ require ( gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect ) -require ( - github.com/alecthomas/atomic v0.1.0-alpha2 // indirect - github.com/cespare/xxhash v1.1.0 // indirect - github.com/deckarep/golang-set v1.8.0 // indirect - github.com/dgraph-io/badger/v3 v3.2103.5 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/go-logr/logr v1.2.3 // indirect - github.com/go-logr/stdr v1.2.2 // indirect - github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v1.0.0 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/flatbuffers v22.11.23+incompatible // indirect - github.com/klauspost/compress v1.15.13 // indirect - github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect - go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel v1.11.2 // indirect - go.opentelemetry.io/otel/trace v1.11.2 // indirect -) - replace github.com/benbjohnson/immutable v0.4.0 => github.com/benbjohnson/immutable v0.3.0 diff --git a/go.sum b/go.sum index f6a79bd9..2c316712 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= -github.com/RoaringBitmap/roaring v1.2.1 h1:58/LJlg/81wfEHd5L9qsHduznOIhyv4qb1yWcSvVq9A= -github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= +github.com/RoaringBitmap/roaring v1.2.3 h1:yqreLINqIrX22ErkKI0vY47/ivtJr6n+kMhVOVmhWBY= +github.com/RoaringBitmap/roaring v1.2.3/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 h1:byYvvbfSo3+9efR4IeReh77gVs4PnNDR3AMOE9NJ7a0= @@ -84,8 +84,8 @@ github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pm github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= github.com/anacrolix/tagflag v1.3.1-0.20210717093243-41ae76143afa h1:ObMmvS9ysQI9zKRPbLbPzR6I77l1Eiie2u9I3whKlJI= github.com/anacrolix/tagflag v1.3.1-0.20210717093243-41ae76143afa/go.mod h1:PSF7SW1eKAti6ZaqkGMzKZ6K4y4G2j8rEAmwpu6fo9Q= -github.com/anacrolix/torrent v1.48.1-0.20230103142631-c20f73d53e9f h1:5fPzkRgj1BFYzQinFQqilCPM9A/EuPUXzSC3utjMVGc= -github.com/anacrolix/torrent v1.48.1-0.20230103142631-c20f73d53e9f/go.mod h1:PwdFzmApEr96LcqogJhuw41XOdd1oHGkp+qE9hhXyDc= +github.com/anacrolix/torrent v1.48.1-0.20230123215249-d47739db0c59 h1:J3AQJ8m9igbvzv2atm6SDMO+YFNGpQVR0xnE/aapThg= +github.com/anacrolix/torrent v1.48.1-0.20230123215249-d47739db0c59/go.mod h1:CSREuKRFlbHifecdogdFDt+geYf5MnDQyRPg+1O8W8E= github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 h1:QAVZ3pN/J4/UziniAhJR2OZ9Ox5kOY2053tBbbqUPYA= github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96/go.mod h1:Wa6n8cYIdaG35x15aH3Zy6d03f7P728QfdcDeD/IEOs= github.com/anacrolix/utp v0.1.0 h1:FOpQOmIwYsnENnz7tAGohA+r6iXpRjrq8ssKSre2Cp4=