Skip to content

Commit

Permalink
Timeout moved out of run_sync into class init
Browse files Browse the repository at this point in the history
* timeout is set in the relay or relaymanager class
* get_timestamp fixed
* new dm communication example
  • Loading branch information
holgern committed Feb 27, 2023
1 parent 7666465 commit 44328f5
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 30 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ from pynostr.event import EventKind
import time
import uuid

relay_manager = RelayManager()
relay_manager = RelayManager(timeout=2)
relay_manager.add_relay("wss://nostr-pub.wellorder.net")
relay_manager.add_relay("wss://relay.damus.io")
filters = FiltersList([Filters(kinds=[EventKind.TEXT_NOTE], limit=100)])
subscription_id = uuid.uuid1().hex
relay_manager.add_subscription_on_all_relays(subscription_id, filters)
relay_manager.run_sync(timeout=2)
relay_manager.run_sync()
while relay_manager.message_pool.has_notices():
notice_msg = relay_manager.message_pool.get_notice()
print(notice_msg.content)
Expand Down Expand Up @@ -96,15 +96,16 @@ r = Relay(
"wss://relay.damus.io",
message_pool,
io_loop,
policy
policy,
timeout=2
)
filters = FiltersList([Filters(kinds=[EventKind.TEXT_NOTE], limit=100)])
subscription_id = uuid.uuid1().hex

r.add_subscription(subscription_id, filters)

try:
io_loop.run_sync(r.connect, timeout=2)
io_loop.run_sync(r.connect)
except gen.Return:
pass
io_loop.stop()
Expand All @@ -130,7 +131,7 @@ from pynostr.filters import FiltersList, Filters
from pynostr.message_type import ClientMessageType
from pynostr.key import PrivateKey

relay_manager = RelayManager()
relay_manager = RelayManager(timeout=6)
relay_manager.add_relay("wss://nostr-pub.wellorder.net")
relay_manager.add_relay("wss://relay.damus.io")
private_key = PrivateKey()
Expand All @@ -143,7 +144,7 @@ event = Event("Hello Nostr")
event.sign(private_key.hex())

relay_manager.publish_event(event)
relay_manager.run_sync(timeout=6)
relay_manager.run_sync()
time.sleep(5) # allow the messages to send
while relay_manager.message_pool.has_ok_notices():
ok_msg = relay_manager.message_pool.get_ok_notice()
Expand Down
102 changes: 102 additions & 0 deletions examples/dm_communication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python

import getpass
import uuid

import tornado.ioloop
from rich.console import Console
from tornado import gen

from pynostr.base_relay import RelayPolicy
from pynostr.encrypted_dm import EncryptedDirectMessage
from pynostr.event import Event, EventKind
from pynostr.filters import Filters, FiltersList
from pynostr.key import PrivateKey, PublicKey
from pynostr.message_pool import MessagePool
from pynostr.message_type import RelayMessageType
from pynostr.relay import Relay
from pynostr.utils import extract_nip05, get_timestamp


@gen.coroutine
def print_dm(message_json):
message_type = message_json[0]
if message_type == RelayMessageType.EVENT:
event = Event.from_dict(message_json[2])
if event.kind == EventKind.ENCRYPTED_DIRECT_MESSAGE:
if event.has_pubkey_ref(sender_pk.public_key.hex()):
rdm = EncryptedDirectMessage.from_event(event)
rdm.decrypt(sender_pk.hex(), public_key_hex=recipient.hex())
print(f"New dm received:{event.date_time()} {rdm.cleartext_content}")
elif message_type == RelayMessageType.OK:
print(message_json)
elif message_type == RelayMessageType.NOTICE:
print(message_json)


if __name__ == "__main__":

console = Console()

pk = getpass.getpass(prompt="Enter sender private key (new one when empty): ")
if len(pk) == 0:
sender_pk = PrivateKey()
else:
sender_pk = PrivateKey.from_nsec(pk)

print(f"New dm are sent from: {sender_pk.public_key.bech32()}")

recipient_str = input("recipient (npub or nip05): ")
recipient = ""
if "@" in recipient_str:
nip05 = extract_nip05(recipient_str)
recipient = PublicKey.from_hex(nip05[0])
elif "npub" in recipient_str:
recipient = PublicKey.from_hex(recipient_str)
if recipient != "":
print(f"recipient is set to {recipient.bech32()}")
else:
raise Exception("reciever not valed")
relay_url = input("relay: ")

dm = EncryptedDirectMessage()
dm.encrypt(
sender_pk.hex(),
cleartext_content="Hello, please reply to this dm for testing :).",
recipient_pubkey=recipient.hex(),
)

filters = FiltersList(
[
Filters(
authors=[recipient.hex()],
kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE],
since=get_timestamp(),
limit=10,
)
]
)

subscription_id = uuid.uuid1().hex
io_loop = tornado.ioloop.IOLoop.current()
message_pool = MessagePool(first_response_only=False)
policy = RelayPolicy()
r = Relay(
relay_url,
message_pool,
io_loop,
policy,
timeout=5,
close_on_eose=False,
message_callback=print_dm,
)
dm_event = dm.to_event()
dm_event.sign(sender_pk.hex())
r.publish(dm_event.to_message())
r.add_subscription(subscription_id, filters)

try:
io_loop.run_sync(r.connect)
except gen.Return:
pass
io_loop.stop()
4 changes: 2 additions & 2 deletions examples/paid_relay_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
relay_info["fees"]['admission'][0]['amount'] / 1000
)
print(f"Found {len(paid_relays)} relays with fee!")
relay_manager = RelayManager(error_threshold=10)
relay_manager = RelayManager(error_threshold=10, timeout=5)
for url in paid_relays.keys():
relay_manager.add_relay(url)

Expand All @@ -61,7 +61,7 @@
)
subscription_id = uuid.uuid1().hex
relay_manager.add_subscription_on_all_relays(subscription_id, filters)
relay_manager.run_sync(timeout=5)
relay_manager.run_sync()

event_messages = relay_manager.message_pool.get_all_events()
events.add_event(event_messages)
Expand Down
16 changes: 13 additions & 3 deletions examples/relay_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,19 @@

console = Console()

nip05 = extract_nip05("[email protected]")
input_str = input("author (npub or nip05): ")
recipient = ""
if "@" in input_str:
nip05 = extract_nip05(input_str)
author = PublicKey.from_hex(nip05[0])
elif "npub" in input_str:
author = PublicKey.from_hex(input_str)
if author != "":
print(f"author is set to {author.bech32()}")
else:
raise Exception("reciever not valed")
relay_url = input("relay: ")

author = PublicKey.from_hex(nip05[0])
filters = FiltersList(
[Filters(authors=[author.hex()], kinds=[EventKind.TEXT_NOTE], limit=100)]
)
Expand All @@ -30,7 +40,7 @@
io_loop = tornado.ioloop.IOLoop.current()
message_pool = MessagePool(first_response_only=False)
policy = RelayPolicy()
r = Relay("wss://relay.damus.io", message_pool, io_loop, policy)
r = Relay(relay_url, message_pool, io_loop, policy, timeout=3)

r.add_subscription(subscription_id, filters)

Expand Down
2 changes: 2 additions & 0 deletions pynostr/base_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
policy: RelayPolicy = RelayPolicy(),
ssl_options: dict = None,
proxy_config: RelayProxyConnectionConfig = None,
timeout: float = 2.0,
close_on_eose: bool = True,
message_callback=None,
) -> None:
Expand All @@ -47,6 +48,7 @@ def __init__(
self.policy = policy
self.ssl_options = ssl_options
self.proxy_config = proxy_config
self.timeout = timeout
self.close_on_eose = close_on_eose
self.lock: Lock = Lock()
self.ws = None
Expand Down
18 changes: 13 additions & 5 deletions pynostr/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ def __init__(
message_pool: MessagePool,
io_loop: IOLoop,
policy: RelayPolicy = RelayPolicy(),
timeout: float = 2.0,
close_on_eose: bool = True,
message_callback=None,
) -> None:
super().__init__(
url, message_pool, policy, None, None, close_on_eose, message_callback
url,
message_pool,
policy,
None,
None,
timeout,
close_on_eose,
message_callback,
)
self.io_loop = io_loop
self.running = True
Expand All @@ -31,12 +39,12 @@ def is_connected(self) -> bool:
return self.ws is not None and self.ws.protocol is not None

@gen.coroutine
def connect(self, timeout=2):
def connect(self):
error = False
try:
if timeout > 0:
if self.timeout > 0:
self.ws = yield gen.with_timeout(
self.io_loop.time() + timeout,
self.io_loop.time() + self.timeout,
websocket_connect(
self.url,
ping_interval=60,
Expand Down Expand Up @@ -77,7 +85,7 @@ def connect(self, timeout=2):
if error:
self.error_counter += 1
if self.error_counter <= self.error_threshold:
self.io_loop.call_later(1, self.connect, timeout)
self.io_loop.call_later(1, self.connect)
else:
return
# print(self.request)
Expand Down
37 changes: 25 additions & 12 deletions pynostr/relay_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import time
from dataclasses import dataclass
from typing import Optional

from tornado import gen
from tornado.ioloop import IOLoop
Expand All @@ -19,7 +20,14 @@

@dataclass
class RelayManager:
error_threshold: int = 3
"""RelayManager.
:param error_threshold: When set, error_threshold on each relay is overwritten
:param timeout: When set, timeout on each relay is overwritten
"""

error_threshold: Optional[int] = None
timeout: Optional[float] = None

def __post_init__(self):
self.relays: dict[str, Relay] = {}
Expand All @@ -30,6 +38,7 @@ def add_relay(
self,
url: str,
policy: RelayPolicy = RelayPolicy(),
timeout=2,
close_on_eose: bool = True,
message_callback=None,
):
Expand All @@ -39,12 +48,14 @@ def add_relay(
self.message_pool,
self.io_loop,
policy,
timeout=timeout,
close_on_eose=close_on_eose,
message_callback=message_callback,
)
if self.error_threshold:
if self.error_threshold is not None:
relay.error_threshold = self.error_threshold

if self.timeout is not None:
relay.timeout = self.timeout
self.relays[url] = relay

def remove_relay(self, url: str):
Expand All @@ -68,30 +79,32 @@ def add_subscription_on_relay(self, url: str, id: str, filters: FiltersList):
relay.add_subscription(id, filters)

@gen.coroutine
def prepare_relays(self, timeout: int = 2):
def prepare_relays(self):
futures_timeout = []
futures = []
relays = []
for relay in self.relays.values():
if relay.policy.should_read:
# yield relay.connect()
timeout = relay.timeout
relays.append(relay)
if timeout > 0:
future = gen.with_timeout(
self.io_loop.time() + timeout, relay.connect(timeout=0)
self.io_loop.time() + timeout, relay.connect()
)
futures.append(future)
futures_timeout.append(future)
else:
futures.append(relay.connect(timeout=0))
futures.append(relay.connect())

if timeout > 0:
for i, future in enumerate(futures):
if len(futures_timeout) > 0:
for i, future in enumerate(futures_timeout):
try:
yield future
except gen.TimeoutError:
log.warning(
f"Connection to WebSocket client {relays[i].url} timed out"
)
else:
elif len(futures) > 0:
yield gen.multi(futures)
raise gen.Return(relays)

Expand All @@ -100,8 +113,8 @@ def add_subscription_on_all_relays(self, id: str, filters: FiltersList):
if relay.policy.should_read:
relay.add_subscription(id, filters)

def run_sync(self, timeout: int = 2):
self.io_loop.run_sync(lambda: self.prepare_relays(timeout))
def run_sync(self):
self.io_loop.run_sync(lambda: self.prepare_relays())

def close_subscription_on_relay(self, url: str, id: str):
if url in self.relays:
Expand Down
4 changes: 2 additions & 2 deletions pynostr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ def nprofile_encode(pubkey: str, relays: [str]):


def get_timestamp(days=0, seconds=0, minutes=0, hours=0, weeks=0):
utc_now = datetime.datetime.utcnow()
date = utc_now - datetime.timedelta(
now = datetime.datetime.now()
date = now - datetime.timedelta(
days=days, seconds=seconds, minutes=minutes, hours=hours, weeks=weeks
)
return int(date.timestamp())
Expand Down

0 comments on commit 44328f5

Please sign in to comment.