Skip to content

Commit

Permalink
Remove unnecessary incoming channel
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <[email protected]>
  • Loading branch information
michaelavila committed Jan 18, 2019
1 parent 00be68d commit f59148e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 31 deletions.
49 changes: 18 additions & 31 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ type Provider struct {
ctx context.Context
lock sync.Mutex

// CIDs to provide
incoming chan cid.Cid
// CIDs we are working on providing now
outgoing chan cid.Cid
// strategy for deciding which CIDs, given a CID, should be provided
Expand All @@ -47,25 +45,36 @@ func NewProvider(ctx context.Context, strategy Strategy, tracker *Tracker, queue
contentRouting: contentRouting,
lock: sync.Mutex{},
outgoing: make(chan cid.Cid),
incoming: make(chan cid.Cid),
}
}

// Start workers to handle provide requests.
func (p *Provider) Run() {
go p.handleIncoming()
go p.handlePopulateOutgoing()
go p.handleOutgoing()
}

// Provider the given cid using specified strategy.
func (p *Provider) Provide(root cid.Cid) {
func (p *Provider) Provide(root cid.Cid) error {
cids := p.strategy(p.ctx, root)
go func() {
for cid := range cids {
p.incoming <- cid

for cid := range cids {
isTracking, err := p.tracker.IsTracking(cid)
if err != nil {
return err
}
}()

if !isTracking {
p.lock.Lock()
if err := p.queue.Enqueue(cid); err != nil {
p.lock.Unlock()
return err
}
p.lock.Unlock()
}
}

return nil
}

func (p *Provider) Unprovide(cid cid.Cid) error {
Expand All @@ -83,28 +92,6 @@ func (p *Provider) announce(cid cid.Cid) error {
return nil
}

// Move CIDs from the incoming channel to the providing queue
func (p *Provider) handleIncoming() {
for {
select {
case key := <-p.incoming:
isTracking, err := p.tracker.IsTracking(key)
if err != nil {
log.Warning("Unable to check provider tracking on incoming: %s", err)
continue
}

if !isTracking {
p.lock.Lock()
p.queue.Enqueue(key)
p.lock.Unlock()
}
case <-p.ctx.Done():
return
}
}
}

// Move CIDs from the providing queue to the outgoing channel
func (p *Provider) handlePopulateOutgoing() {
for {
Expand Down
1 change: 1 addition & 0 deletions provider/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func NewProvideAllStrategy(dag ipld.DAGService) Strategy {
cids <- cid
return true
})
close(cids)
}()
return cids
}
Expand Down

0 comments on commit f59148e

Please sign in to comment.