Skip to content

Commit

Permalink
pool: deprecate methods that take more than one filter.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Feb 12, 2025
1 parent 332a164 commit c37ed1a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 26 deletions.
55 changes: 37 additions & 18 deletions nip60/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,37 @@ func loadWalletFromPool(
kinds = append(kinds, 7376)
}

eoseChan := make(chan struct{})
events := pool.SubManyNotifyEOSE(
eoseChanE := make(chan struct{})
events := pool.SubscribeManyNotifyEOSE(
ctx,
relays,
nostr.Filter{Kinds: kinds, Authors: []string{pk}},
eoseChanE,
)

eoseChanD := make(chan struct{})
deletions := pool.SubscribeManyNotifyEOSE(
ctx,
relays,
nostr.Filters{
{Kinds: kinds, Authors: []string{pk}},
{Kinds: []int{5}, Tags: nostr.TagMap{"k": []string{"7375"}}, Authors: []string{pk}},
},
eoseChan,
nostr.Filter{Kinds: []int{5}, Tags: nostr.TagMap{"k": []string{"7375"}}, Authors: []string{pk}},
eoseChanD,
)

return loadWallet(ctx, kr, events, eoseChan)
eoseChan := make(chan struct{})
go func() {
<-eoseChanD
<-eoseChanE
close(eoseChan)
}()

return loadWallet(ctx, kr, events, deletions, eoseChan)
}

func loadWallet(
ctx context.Context,
kr nostr.Keyer,
events chan nostr.RelayEvent,
deletions chan nostr.RelayEvent,
eoseChan chan struct{},
) *Wallet {
w := &Wallet{
Expand All @@ -125,20 +138,26 @@ func loadWallet(
close(w.Stable)
}()

go func() {
for ie := range deletions {
w.Lock()
if !eosed {
for _, tag := range ie.Event.Tags.All([]string{"e", ""}) {
w.pendingDeletions = append(w.pendingDeletions, tag[1])
}
} else {
for _, tag := range ie.Event.Tags.All([]string{"e", ""}) {
w.removeDeletedToken(tag[1])
}
}
w.Unlock()
}
}()

go func() {
for ie := range events {
w.Lock()
switch ie.Event.Kind {
case 5:
if !eosed {
for _, tag := range ie.Event.Tags.All([]string{"e", ""}) {
w.pendingDeletions = append(w.pendingDeletions, tag[1])
}
} else {
for _, tag := range ie.Event.Tags.All([]string{"e", ""}) {
w.removeDeletedToken(tag[1])
}
}
case 17375:
if err := w.parse(ctx, kr, ie.Event); err != nil {
if w.Processed != nil {
Expand Down
2 changes: 1 addition & 1 deletion nip60/wallet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestWallet(t *testing.T) {
}()

// load wallet from events
loaded := loadWallet(ctx, kr, evtChan, eoseChan)
loaded := loadWallet(ctx, kr, evtChan, make(chan nostr.RelayEvent), eoseChan)
loaded.Processed = func(evt *nostr.Event, err error) {
fmt.Println("processed", evt.Kind, err)
}
Expand Down
35 changes: 28 additions & 7 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,29 @@ func (pool *SimplePool) PublishMany(ctx context.Context, urls []string, evt Even
return ch
}

// SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled
// SubscribeMany opens a subscription with the given filter to multiple relays
// the subscriptions ends when the context is canceled or when all relays return a CLOSED.
func (pool *SimplePool) SubscribeMany(
ctx context.Context,
urls []string,
filter Filter,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, Filters{filter}, nil, opts...)
}

// FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
// return an EOSE message.
func (pool *SimplePool) FetchMany(
ctx context.Context,
urls []string,
filter Filter,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.SubManyEose(ctx, urls, Filters{filter}, opts...)
}

// Deprecated: use SubscribeMany instead.
func (pool *SimplePool) SubMany(
ctx context.Context,
urls []string,
Expand All @@ -235,16 +256,16 @@ func (pool *SimplePool) SubMany(
return pool.subMany(ctx, urls, filters, nil, opts...)
}

// SubManyNotifyEOSE is like SubMany, but takes a channel that is closed when
// SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
// all subscriptions have received an EOSE
func (pool *SimplePool) SubManyNotifyEOSE(
func (pool *SimplePool) SubscribeManyNotifyEOSE(
ctx context.Context,
urls []string,
filters Filters,
filter Filter,
eoseChan chan struct{},
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, eoseChan, opts...)
return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...)
}

func (pool *SimplePool) subMany(
Expand Down Expand Up @@ -426,7 +447,7 @@ func (pool *SimplePool) subMany(
return events
}

// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
// Deprecated: use FetchMany instead.
func (pool *SimplePool) SubManyEose(
ctx context.Context,
urls []string,
Expand Down

0 comments on commit c37ed1a

Please sign in to comment.