Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data protection #298

Merged
merged 7 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 46 additions & 84 deletions backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,35 @@ func (tm *TorrentManager) setTorrent(ih string, t *Torrent) {
tm.lock.Lock()
defer tm.lock.Unlock()

t.lock.Lock()
defer t.lock.Unlock()

tm.torrents[ih] = t
}

func (tm *TorrentManager) removeTorrent(ih string) {
func (tm *TorrentManager) removeTorrent(t *Torrent) {
tm.lock.Lock()
defer tm.lock.Unlock()

delete(tm.torrents, ih)
t.lock.Lock()
defer t.lock.Unlock()

defer t.Torrent.Drop()

//t.status = torrentSleeping
//tm.torrents[ih] = t

if t.status == torrentPending {
delete(tm.pendingTorrents, t.infohash)
} else if t.status == torrentRunning || t.status == torrentPaused {
delete(tm.activeTorrents, t.infohash)
} else if t.status == torrentSeeding {
delete(tm.seedingTorrents, t.infohash)
} else {
log.Warn("Unknown status", "ih", t.infohash, "status", t.status)
}

delete(tm.torrents, t.infohash)
}

func (tm *TorrentManager) Close() error {
Expand Down Expand Up @@ -536,8 +557,6 @@ func (tm *TorrentManager) GlobalTrackers() [][]string {
}

func (tm *TorrentManager) updateInfoHash(t *Torrent, bytesRequested int64) {
//t.lock.Lock()
//defer t.lock.Unlock()
if t.status != torrentSeeding && t.bytesRequested < bytesRequested {
if bytesRequested > t.Length() {
bytesRequested = t.Length()
Expand Down Expand Up @@ -906,8 +925,8 @@ func (tm *TorrentManager) pendingLoop() {
}

func (tm *TorrentManager) finish(ih string, t *Torrent) {
//t.lock.Lock()
//defer t.lock.Unlock()
t.lock.Lock()
defer t.lock.Unlock()
if _, err := os.Stat(filepath.Join(tm.DataDir, ih)); err == nil {
tm.seedingChan <- t
delete(tm.activeTorrents, ih)
Expand All @@ -932,10 +951,12 @@ func (tm *TorrentManager) activeLoop() {
case t := <-tm.activeChan:
t.status = torrentRunning
tm.activeTorrents[t.infohash] = t
n := tm.blockCaculate(t.Torrent.BytesCompleted() + t.Torrent.BytesMissing())
n := tm.blockCaculate(t.Torrent.Length())
if n < 10 {
n = 10
n += 10
}

// TODO n random salt
tm.wg.Add(1)
go func(i string, n int64) {
defer tm.wg.Done()
Expand Down Expand Up @@ -1031,18 +1052,17 @@ func (tm *TorrentManager) droppingLoop() {
select {
case ih := <-tm.droppingChan:
if t := tm.getTorrent(ih); t != nil { //&& t.Ready() {
t.Torrent.Drop()
if t.status == torrentPending {
/*if t.status == torrentPending {
delete(tm.pendingTorrents, ih)
}
if t.status == torrentRunning || t.status == torrentPaused {
delete(tm.activeTorrents, ih)
}
if t.status == torrentSeeding {
delete(tm.seedingTorrents, ih)
}
}*/

tm.removeTorrent(ih)
tm.removeTorrent(t)

log.Info("Seed has been dropped", "ih", ih, "cited", t.cited, "status", t.status)
} else {
Expand Down Expand Up @@ -1131,24 +1151,14 @@ func (tm *TorrentManager) Available(ih string, rawSize uint64) (bool, uint64, mc
return false, 0, 0, ErrInactiveTorrent
} else {
if !t.Ready() {
//if torrent.ch != nil {
// <-torrent.ch
// if torrent.Ready() {
// return torrent.BytesCompleted() <= int64(rawSize), nil
// }
//}
if t.Torrent.Info() == nil {
return false, uint64(t.BytesCompleted()), 0, ErrTorrentNotFound
return false, 0, 0, ErrTorrentNotFound
}
return false, uint64(t.BytesCompleted()), mclock.Now() - t.start, ErrUnfinished
}

// TODO
ok := t.BytesCompleted() <= int64(rawSize)
//if t.currentConns <= 1 && ok {
// t.currentConns = tm.maxEstablishedConns
// t.Torrent.SetMaxEstablishedConns(tm.maxEstablishedConns)
//}

return ok, uint64(t.BytesCompleted()), mclock.Now() - t.start, nil
}
Expand All @@ -1163,80 +1173,32 @@ func (tm *TorrentManager) GetFile(infohash, subpath string) ([]byte, uint64, err
if !common.IsHexAddress(infohash) {
return nil, 0, errors.New("invalid infohash format")
}
infohash = strings.TrimPrefix(strings.ToLower(infohash), common.Prefix)

//if t := tm.getTorrent(infohash); t == nil {
// return nil, 0, ErrInactiveTorrent
//} else {

infohash = strings.TrimPrefix(strings.ToLower(infohash), common.Prefix)
subpath = strings.TrimPrefix(subpath, "/")
subpath = strings.TrimSuffix(subpath, "/")

// if !t.Ready() {
// log.Error("Read unavailable file", "hash", infohash, "subpath", subpath)
// return nil, uint64(t.BytesCompleted()), ErrUnfinished
// }
var key = filepath.Join(infohash, subpath)

//tm.hotCache.Add(infohash, true)
//if t.currentConns < tm.maxEstablishedConns {
//t.setCurrentConns(tm.maxEstablishedConns)
//t.Torrent.SetMaxEstablishedConns(t.currentConns)
// log.Debug("Torrent active", "ih", infohash, "peers", t.currentConns)
//}
log.Debug("Get File", "dir", tm.DataDir, "key", key)

var key = filepath.Join(infohash, subpath)
/*if tm.fileCache != nil {
if cache, err := tm.fileCache.Get(key); err == nil {
memcacheHitMeter.Mark(1)
memcacheReadMeter.Mark(int64(len(cache)))
if c, err := tm.unzip(cache); err != nil {
return nil, 0, err
} else {
if tm.compress {
log.Info("File cache", "hash", infohash, "path", subpath, "size", tm.fileCache.Len(), "compress", len(cache), "origin", len(c), "compress", tm.compress)
}
//return c, uint64(t.BytesCompleted()), nil
return c, 0, nil
}
if t := tm.getTorrent(infohash); t != nil {
if !t.Ready() {
//log.Error("Unavailable file, waiting", "ih", infohash, "subpath", subpath, "status", t.status, "p", t.BytesCompleted())
return nil, 0, ErrUnfinished
}
}*/

//if t := tm.getTorrent(infohash); t != nil {
// t.lock.RLock()
// defer t.lock.RUnlock()
//}
diskReadMeter.Mark(1)
// Data protection when torrent is active
t.lock.RLock()
defer t.lock.RUnlock()

log.Debug("Get File", "dir", tm.DataDir, "key", key)
}

data, err := os.ReadFile(filepath.Join(tm.DataDir, key))
diskReadMeter.Mark(1)

//data final verification
/*for _, file := range t.Files() {
if file.Path() == subpath {
log.Debug("File location info", "ih", infohash, "path", file.Path(), "key", key)
if int64(len(data)) != file.Length() {
log.Error("Read file not completed", "hash", infohash, "len", len(data), "total", file.Path())
return nil, 0, errors.New("not a complete file")
} else {
log.Debug("Read data success", "hash", infohash, "size", len(data), "path", file.Path())
if c, err := tm.zip(data); err != nil {
log.Warn("Compress data failed", "hash", infohash, "err", err)
} else {
if tm.fileCache != nil {
tm.fileCache.Set(key, c)
memcacheMissMeter.Mark(1)
memcacheWriteMeter.Mark(int64(len(c)))
}
}
}
break
}
}*/
data, err := os.ReadFile(filepath.Join(tm.DataDir, key))

//return data, uint64(t.BytesCompleted()), err
return data, 0, err
//}
}

func (tm *TorrentManager) unzip(data []byte) ([]byte, error) {
Expand Down
5 changes: 1 addition & 4 deletions backend/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
torrentPaused
torrentRunning
torrentSeeding
torrentSleeping
)

type Torrent struct {
Expand Down Expand Up @@ -82,10 +83,6 @@ func (t *Torrent) Ready() bool {
return false
}

//t.lock.Lock()
//t.cited += 1
//t.lock.Unlock()

ret := t.IsSeeding()
if !ret {
log.Debug("Not ready", "ih", t.InfoHash(), "status", t.status, "seed", t.Torrent.Seeding(), "seeding", torrentSeeding)
Expand Down
8 changes: 5 additions & 3 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func (tfs *TorrentFS) Start(server *p2p.Server) (err error) {
return
}

// download and pub
func (fs *TorrentFS) bitsflow(ih string, size uint64) {
fs.callback <- types.NewBitsFlow(ih, size)
}
Expand Down Expand Up @@ -512,6 +513,7 @@ func (fs *TorrentFS) sub(ih string, rawSize uint64) bool {
if rawSize > 0 {
log.Debug("Query added", "ih", ih, "size", rawSize)
//fs.nasCache.Add(ih, rawSize)
// TODO waiting if no neighbors found
fs.tunnel.Set(ih, ttlmap.NewItem(rawSize, ttlmap.WithTTL(60*time.Second)), nil)
} else {
return false
Expand All @@ -531,7 +533,7 @@ func (fs *TorrentFS) notify(infohash string) bool {
}

// Available is used to check the file status
func (fs *TorrentFS) localCheck(ctx context.Context, infohash string, rawSize uint64) (bool, error) {
func (fs *TorrentFS) available(ctx context.Context, infohash string, rawSize uint64) (bool, error) {
ret, _, _, err := fs.storage().Available(infohash, rawSize)
if err != nil {
if progress, e := fs.progress(infohash); e == nil {
Expand All @@ -546,7 +548,7 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi
//fs.wg.Add(1)
//go func() {
/// defer fs.wg.Done()
//if ok, err := fs.localCheck(ctx, infohash, rawSize); err != nil || !ok {
//if ok, err := fs.available(ctx, infohash, rawSize); err != nil || !ok {
// return nil, err
//}
//}()
Expand All @@ -564,7 +566,7 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi

if ret, _, err := fs.storage().GetFile(infohash, subpath); err != nil {
//log.Warn("Get file failed", "ih", infohash, "size", rawSize, "path", subpath, "err", err)
if ok, err := fs.localCheck(ctx, infohash, rawSize); err != nil || !ok {
if ok, err := fs.available(ctx, infohash, rawSize); err != nil || !ok {
log.Debug("Get file failed", "ih", infohash, "size", rawSize, "path", subpath, "err", err)
return nil, err
}
Expand Down
39 changes: 18 additions & 21 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3
github.com/anacrolix/missinggo/v2 v2.7.1
github.com/anacrolix/tagflag v1.3.1-0.20210717093243-41ae76143afa
github.com/anacrolix/torrent v1.48.1-0.20230103142631-c20f73d53e9f
github.com/anacrolix/torrent v1.48.1-0.20230123215249-d47739db0c59
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/deckarep/golang-set/v2 v2.1.0
Expand All @@ -34,8 +34,9 @@ require (

require (
crawshaw.io/sqlite v0.3.3-0.20220618202545-d1964889ea3c // indirect
github.com/RoaringBitmap/roaring v1.2.1 // indirect
github.com/RoaringBitmap/roaring v1.2.3 // indirect
github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/anacrolix/chansync v0.3.0 // indirect
github.com/anacrolix/dht/v2 v2.19.2
github.com/anacrolix/generics v0.0.0-20221221005542-ac1d5b02b8a3 // indirect
Expand All @@ -53,17 +54,28 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/dgraph-io/badger/v3 v3.2103.5 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v22.11.23+incompatible // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/huin/goupnp v1.0.4-0.20220613170603-23b555710578 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/lispad/go-generics-tools v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mschoch/smat v0.2.0 // indirect
Expand Down Expand Up @@ -95,7 +107,11 @@ require (
github.com/tidwall/btree v1.6.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 // indirect
golang.org/x/mod v0.7.0 // indirect
Expand All @@ -106,23 +122,4 @@ require (
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
)

require (
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/dgraph-io/badger/v3 v3.2103.5 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/flatbuffers v22.11.23+incompatible // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
)

replace github.com/benbjohnson/immutable v0.4.0 => github.com/benbjohnson/immutable v0.3.0
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
github.com/RoaringBitmap/roaring v1.2.1 h1:58/LJlg/81wfEHd5L9qsHduznOIhyv4qb1yWcSvVq9A=
github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/RoaringBitmap/roaring v1.2.3 h1:yqreLINqIrX22ErkKI0vY47/ivtJr6n+kMhVOVmhWBY=
github.com/RoaringBitmap/roaring v1.2.3/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 h1:byYvvbfSo3+9efR4IeReh77gVs4PnNDR3AMOE9NJ7a0=
Expand Down Expand Up @@ -84,8 +84,8 @@ github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pm
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/tagflag v1.3.1-0.20210717093243-41ae76143afa h1:ObMmvS9ysQI9zKRPbLbPzR6I77l1Eiie2u9I3whKlJI=
github.com/anacrolix/tagflag v1.3.1-0.20210717093243-41ae76143afa/go.mod h1:PSF7SW1eKAti6ZaqkGMzKZ6K4y4G2j8rEAmwpu6fo9Q=
github.com/anacrolix/torrent v1.48.1-0.20230103142631-c20f73d53e9f h1:5fPzkRgj1BFYzQinFQqilCPM9A/EuPUXzSC3utjMVGc=
github.com/anacrolix/torrent v1.48.1-0.20230103142631-c20f73d53e9f/go.mod h1:PwdFzmApEr96LcqogJhuw41XOdd1oHGkp+qE9hhXyDc=
github.com/anacrolix/torrent v1.48.1-0.20230123215249-d47739db0c59 h1:J3AQJ8m9igbvzv2atm6SDMO+YFNGpQVR0xnE/aapThg=
github.com/anacrolix/torrent v1.48.1-0.20230123215249-d47739db0c59/go.mod h1:CSREuKRFlbHifecdogdFDt+geYf5MnDQyRPg+1O8W8E=
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96 h1:QAVZ3pN/J4/UziniAhJR2OZ9Ox5kOY2053tBbbqUPYA=
github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96/go.mod h1:Wa6n8cYIdaG35x15aH3Zy6d03f7P728QfdcDeD/IEOs=
github.com/anacrolix/utp v0.1.0 h1:FOpQOmIwYsnENnz7tAGohA+r6iXpRjrq8ssKSre2Cp4=
Expand Down