Skip to content

Commit

Permalink
search loops fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ucwong committed Apr 10, 2023
1 parent 33d2497 commit 7168cc7
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 197 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ clean:
format:
find . -name '*.go' -type f -not -path "./vendor*" -not -path "*.git*" -not -path "*/generated/*" | xargs gofmt -w -s
test: format
go test ./... -v -race -cpu=1,2 -coverprofile=coverage.txt -covermode=atomic
go test ./... -v -race -cpu=1,2,4 -coverprofile=coverage.txt -covermode=atomic
111 changes: 39 additions & 72 deletions backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ type TorrentManager struct {
recovery atomic.Uint32
seeds atomic.Int32
pends atomic.Int32

actives atomic.Int32
stops atomic.Int32
actives atomic.Int32
stops atomic.Int32
}

// can only call by fs.go: 'SeedingLocal()'
Expand Down Expand Up @@ -354,7 +353,18 @@ func (tm *TorrentManager) dropTorrent(t *Torrent) {
tm.stops.Add(1)
}()

if t.Status() == torrentPending {
switch t.Status() {
case torrentPending:
case torrentRunning:
tm.actives.Add(-1)
case torrentPaused:
case torrentSeeding:
tm.seeds.Add(-1)
default:
log.Warn("Unknown status", "ih", t.InfoHash(), "status", t.Status())
}

/*if t.Status() == torrentPending {
//tm.pending_lock.Lock()
//delete(tm.pendingTorrents, t.InfoHash())
//tm.pending_lock.Unlock()
Expand All @@ -373,7 +383,7 @@ func (tm *TorrentManager) dropTorrent(t *Torrent) {
tm.seeds.Add(-1)
} else {
log.Warn("Unknown status", "ih", t.InfoHash(), "status", t.Status())
}
}*/

//delete(tm.torrents, t.InfoHash())
//tm.torrents.Delete(t.InfoHash())
Expand All @@ -383,45 +393,30 @@ func (tm *TorrentManager) Close() error {
tm.lock.Lock()
defer tm.lock.Unlock()

/*log.Info("Current running torrents", "size", len(tm.torrents))
for _, t := range tm.torrents {
t.Stop()
}*/
tm.closeOnce.Do(func() {
log.Info("Current running torrents", "size", tm.torrents.Len())
tm.torrents.Range(func(_ string, t *Torrent) bool {
t.Close()
return true
})

log.Info("Current running torrents", "size", tm.torrents.Len())
tm.torrents.Range(func(_ string, t *Torrent) bool {
t.Close()
return true
})
tm.client.Close()
tm.client.WaitAll()

tm.client.Close()
tm.client.WaitAll()
tm.closeOnce.Do(func() {
close(tm.closeAll)
tm.wg.Wait()
})
//if tm.fileCache != nil {
// tm.fileCache.Reset()
//}

if tm.kvdb != nil {
log.Info("Nas engine close", "engine", tm.kvdb.Name())
tm.kvdb.Close()
}
//if tm.hotCache != nil {
// tm.hotCache.Purge()
//}

if tm.fc != nil {
tm.fc.Stop()
}
if tm.kvdb != nil {
log.Info("Nas engine close", "engine", tm.kvdb.Name())
tm.kvdb.Close()
}

//tm.torrents.Clear()
//tm.seedingTorrents.Clear()
//tm.activeTorrents.Clear()
//tm.pendingTorrents.Clear()
if tm.fc != nil {
tm.fc.Stop()
}
log.Info("Fs Download Manager Closed")
})

log.Info("Fs Download Manager Closed")
return nil
}

Expand Down Expand Up @@ -684,6 +679,7 @@ func (tm *TorrentManager) injectSpec(ih string, spec *torrent.TorrentSpec) (*tor
func (tm *TorrentManager) updateGlobalTrackers() {
tm.lock.Lock()
defer tm.lock.Unlock()

if global := wormhole.BestTrackers(); len(global) > 0 {
tm.globalTrackers = [][]string{global}
log.Info("Global trackers update", "size", len(global), "cap", wormhole.CAP)
Expand Down Expand Up @@ -846,7 +842,7 @@ func NewTorrentManager(config *params.Config, fsid uint64, cache, compress bool)
activeChan: make(chan *Torrent, torrentChanSize),
pendingChan: make(chan *Torrent, torrentChanSize),
//pendingRemoveChan: make(chan string, torrentChanSize),
droppingChan: make(chan string),
droppingChan: make(chan string, 1),
mode: config.Mode,
//boost: config.Boost,
id: fsid,
Expand Down Expand Up @@ -1033,7 +1029,6 @@ func (tm *TorrentManager) commit(ctx context.Context, hex string, request uint64
case <-ctx.Done():
return ctx.Err()
case <-tm.closeAll:
return nil
}

return nil
Expand Down Expand Up @@ -1069,6 +1064,7 @@ func (tm *TorrentManager) mainLoop() {
t.Torrent = tt
t.start = mclock.Now()
t.Unlock()

tm.forPending(t)
tm.recovery.Add(1)
tm.stops.Add(-1)
Expand Down Expand Up @@ -1129,7 +1125,7 @@ func (tm *TorrentManager) pendingLoop() {
}

if err := t.Start(); err != nil {
log.Error("Nas start failed", "ih", t.InfoHash())
log.Error("Nas start failed", "ih", t.InfoHash(), "err", err)
// TODO
}

Expand Down Expand Up @@ -1191,37 +1187,6 @@ func (tm *TorrentManager) finish(t *Torrent) {
}
}

var _cache uint64

func (tm *TorrentManager) total() (ret uint64) {
/*tm.lock.RLock()
defer tm.lock.RUnlock()
for _, t := range tm.torrents {
if t.Torrent.Info() != nil {
ret += uint64(t.Torrent.BytesCompleted())
}
}*/

tm.torrents.Range(func(_ string, t *Torrent) bool {
if t.Torrent.Info() != nil {
t.RLock()
ret += uint64(t.Torrent.BytesCompleted())
t.RUnlock()
}

return true
})

if _cache > ret {
ret = _cache
} else {
_cache = ret
}

return
}

/*func (tm *TorrentManager) dur() uint64 {
return tm.seconds.Load()
}
Expand All @@ -1237,12 +1202,14 @@ func (tm *TorrentManager) activeLoop() {
//timer_2 = time.NewTicker(time.Second * params.QueryTimeInterval * 3600 * 18)
//clean = []*Torrent{}
)

defer func() {
tm.wg.Done()
timer.Stop()
timer_1.Stop()
//timer_2.Stop()
}()

for {
select {
case t := <-tm.activeChan:
Expand All @@ -1253,7 +1220,7 @@ func (tm *TorrentManager) activeLoop() {
//tm.activeTorrents.Set(t.InfoHash(), t)

if t.QuotaFull() { //t.Length() <= t.BytesRequested() {
t.Leech()
//t.Leech()
}

n := tm.blockCaculate(t.Torrent.Length())
Expand Down
27 changes: 20 additions & 7 deletions backend/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
//"bytes"
//"context"
"context"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -92,7 +93,7 @@ func NewTorrent(t *torrent.Torrent, requested int64, ih string, path string, slo
infohash: ih,
filepath: path,
start: mclock.Now(),
taskCh: make(chan task, 1),
taskCh: make(chan task, 8),
closeAll: make(chan any),
slot: slot,
spec: spec,
Expand Down Expand Up @@ -160,9 +161,6 @@ func (t *Torrent) CitedDec() {
}

func (t *Torrent) BytesRequested() int64 {
//t.RLock()
//defer t.RUnlock()

return t.bytesRequested
}

Expand Down Expand Up @@ -230,9 +228,10 @@ func (t *Torrent) Seed() bool {
//}
if t.Torrent.Seeding() {
t.Lock()
defer t.Unlock()

t.status = torrentSeeding
t.stopListen()
t.Unlock()

elapsed := time.Duration(mclock.Now()) - time.Duration(t.start)
//if active, ok := params.GoodFiles[t.InfoHash()]; !ok {
Expand All @@ -255,6 +254,8 @@ func (t *Torrent) IsSeeding() bool {
}

func (t *Torrent) Pause() {
t.Lock()
defer t.Unlock()
//if t.currentConns > t.minEstablishedConns {
//t.setCurrentConns(t.minEstablishedConns)
//t.Torrent.SetMaxEstablishedConns(t.minEstablishedConns)
Expand All @@ -267,6 +268,9 @@ func (t *Torrent) Pause() {
}

func (t *Torrent) Paused() bool {
t.RLock()
defer t.RUnlock()

return t.status == torrentPaused
}

Expand Down Expand Up @@ -335,8 +339,17 @@ func (t *Torrent) download(p int) error {
e = s + p

if t.taskCh != nil {
t.taskCh <- task{s, e}
log.Info(ScaleBar(s, e, t.Torrent.NumPieces()), "ih", t.InfoHash(), "slot", t.slot, "s", s, "e", e, "p", p, "total", t.Torrent.NumPieces())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
select {
case t.taskCh <- task{s, e}:
log.Info(ScaleBar(s, e, t.Torrent.NumPieces()), "ih", t.InfoHash(), "slot", t.slot, "s", s, "e", e, "p", p, "total", t.Torrent.NumPieces())
case <-ctx.Done():
return ctx.Err()
case <-t.closeAll:
}
} else {
return errors.New("task channel is nil")
}
return nil
}
Expand Down
Loading

0 comments on commit 7168cc7

Please sign in to comment.