sendpayment Part 2: Channel open, Get invoice #58

Merged 15 commits on Oct 8, 2022

Changes from 1 commit
24 changes: 12 additions & 12 deletions lexe-ln/src/background_processor.rs
@@ -1,18 +1,19 @@
-use std::ops::Deref;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;

 use common::shutdown::ShutdownChannel;
 use common::task::LxTask;
-use lightning::util::events::{EventHandler, EventsProvider};
+use lightning::util::events::EventsProvider;
 use tokio::time::{interval, interval_at, Instant};
 use tracing::{debug, error, info, trace, warn};

 use crate::alias::{
-    LexeChainMonitorType, LexeChannelManagerType, LexeInvoicePayerType,
-    LexePeerManagerType, P2PGossipSyncType, ProbabilisticScorerType,
+    LexeChainMonitorType, LexeInvoicePayerType, P2PGossipSyncType,
+    ProbabilisticScorerType,
 };
-use crate::traits::LexePersister;
+use crate::traits::{
+    LexeChannelManager, LexeEventHandler, LexePeerManager, LexePersister,
+};

 const PROCESS_EVENTS_INTERVAL: Duration = Duration::from_millis(1000);
 const PEER_MANAGER_PING_INTERVAL: Duration = Duration::from_secs(15);
@@ -28,9 +29,9 @@ pub struct LexeBackgroundProcessor {}

 impl LexeBackgroundProcessor {
     #[allow(clippy::too_many_arguments)]
-    pub fn start<CHANNEL_MANAGER, PERSISTER, EVENT_HANDLER>(
+    pub fn start<CHANNEL_MANAGER, PEER_MANAGER, PERSISTER, EVENT_HANDLER>(
         channel_manager: CHANNEL_MANAGER,
-        peer_manager: Arc<LexePeerManagerType<CHANNEL_MANAGER>>,
+        peer_manager: PEER_MANAGER,
         persister: PERSISTER,
         chain_monitor: Arc<LexeChainMonitorType<PERSISTER>>,
         event_handler: Arc<
@@ -41,11 +42,10 @@ impl LexeBackgroundProcessor {
         mut shutdown: ShutdownChannel,
     ) -> LxTask<()>
     where
-        PERSISTER: Deref + Send + Sync + 'static,
-        PERSISTER::Target: LexePersister + Send + Sync,
-        CHANNEL_MANAGER: Deref<Target = LexeChannelManagerType<PERSISTER>>,
-        CHANNEL_MANAGER: Send + Sync + 'static,
-        EVENT_HANDLER: EventHandler + Send + Sync + 'static,
+        CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+        PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+        PERSISTER: LexePersister,
+        EVENT_HANDLER: LexeEventHandler,
     {
         LxTask::spawn(async move {
             let mut process_timer = interval(PROCESS_EVENTS_INTERVAL);
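The pattern behind most of this diff: bound stacks like `CHANNEL_MANAGER: Deref<Target = LexeChannelManagerType<PERSISTER>> + Send + Sync + 'static` are folded into single named traits (`LexeChannelManager`, `LexePeerManager`, and so on) whose definitions live outside this diff. A minimal self-contained sketch of the technique, with toy names standing in for the Lexe types:

```rust
use std::ops::Deref;
use std::sync::Arc;

// Stand-in for a concrete type like LexeChannelManagerType<PERSISTER>.
struct ConcreteManager;

impl ConcreteManager {
    fn ping(&self) {}
}

// One named trait replaces the repeated bound stack.
trait Manager: Deref<Target = ConcreteManager> + Send + Sync + 'static {}

// A blanket impl makes every qualifying smart pointer (Arc, &'static, ...)
// implement the trait for free, so call sites stay ergonomic.
impl<T> Manager for T where
    T: Deref<Target = ConcreteManager> + Send + Sync + 'static
{
}

// `where` clauses collapse to one line per type parameter, mirroring
// `CHANNEL_MANAGER: LexeChannelManager<PERSISTER>` in the diff above.
fn start<M: Manager>(manager: M) {
    manager.ping(); // auto-deref reaches the concrete type's methods
}

fn main() {
    start(Arc::new(ConcreteManager));
}
```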
29 changes: 10 additions & 19 deletions lexe-ln/src/channel.rs
@@ -1,43 +1,34 @@
-use std::ops::Deref;
-use std::sync::Arc;
-
 use anyhow::{anyhow, Context};
 use common::ln::peer::ChannelPeer;
-use lightning::ln::msgs::ChannelMessageHandler;
 use lightning::util::config::UserConfig;
 use tokio::sync::mpsc;
 use tracing::{error, info};

-use crate::alias::{LexeChannelManagerType, LexePeerManagerType};
 use crate::p2p::{self, ChannelPeerUpdate};
-use crate::traits::LexePersister;
+use crate::traits::{LexeChannelManager, LexePeerManager, LexePersister};

 /// Handles the full logic of opening a channel, including connecting to the
 /// peer, creating the channel, and persisting the newly created channel.
-pub async fn open_channel<CHANNEL_MANAGER, PERSISTER>(
-    channel_manager: &LexeChannelManagerType<PERSISTER>,
-    peer_manager: Arc<LexePeerManagerType<CHANNEL_MANAGER>>,
+pub async fn open_channel<CHANNEL_MANAGER, PEER_MANAGER, PERSISTER>(
+    channel_manager: CHANNEL_MANAGER,
+    peer_manager: PEER_MANAGER,
     persister: PERSISTER,
     channel_peer: ChannelPeer,
     channel_value_sat: u64,
     channel_peer_tx: &mpsc::Sender<ChannelPeerUpdate>,
     user_config: UserConfig,
 ) -> anyhow::Result<()>
 where
-    CHANNEL_MANAGER: Deref + Send + Sync + 'static,
-    CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync,
-    PERSISTER: Deref,
-    PERSISTER::Target: LexePersister,
+    CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+    PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+    PERSISTER: LexePersister,
 {
     info!("Opening channel with {}", channel_peer);

     // Make sure that we're connected to the channel peer
-    p2p::connect_channel_peer_if_necessary(
-        peer_manager.clone(),
-        channel_peer.clone(),
-    )
-    .await
-    .context("Failed to connect to peer")?;
+    p2p::connect_channel_peer_if_necessary(peer_manager, channel_peer.clone())
+        .await
+        .context("Failed to connect to peer")?;

     // Create the channel
     let user_channel_id = 1; // Not important, just use a default value
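For orientation, a toy skeleton of the connect-then-create flow that `open_channel` implements, with `anyhow::Context` attached to each fallible step as in the diff. The stubs below are placeholders, not Lexe code; it assumes the `anyhow` and `tokio` crates (with tokio's `macros` and `rt` features):

```rust
use anyhow::Context;

// Stub standing in for p2p::connect_channel_peer_if_necessary.
async fn connect_peer(_peer: &str) -> anyhow::Result<()> {
    Ok(())
}

// Stub standing in for the channel manager's create-channel call.
async fn create_channel(_peer: &str, _value_sat: u64) -> anyhow::Result<()> {
    Ok(())
}

async fn open_channel(peer: &str, value_sat: u64) -> anyhow::Result<()> {
    // Make sure we're connected to the channel peer first...
    connect_peer(peer)
        .await
        .context("Failed to connect to peer")?;
    // ...then create the channel; persisting it would follow in real code.
    create_channel(peer, value_sat)
        .await
        .context("Failed to create channel")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    open_channel("node_pk@host:port", 100_000).await
}
```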
4 changes: 2 additions & 2 deletions lexe-ln/src/channel_monitor.rs
@@ -11,7 +11,7 @@ use tokio::sync::mpsc;
 use tracing::{debug, error, info};

 use crate::alias::LexeChainMonitorType;
-use crate::traits::LexePersister;
+use crate::traits::LexeInnerPersister;

 pub struct LxChannelMonitorUpdate {
     pub funding_txo: LxOutPoint,
@@ -32,7 +32,7 @@ pub fn spawn_channel_monitor_updated_task<PERSISTER>(
 ) -> LxTask<()>
 where
     PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send,
+    PERSISTER::Target: LexeInnerPersister + Send,
 {
     debug!("Starting channel_monitor_updated task");
     LxTask::spawn(async move {
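The rename suggests a split between an inner trait that carries the actual persistence methods and an outer `LexePersister` that bundles pointer bounds over it. The trait definitions themselves are not in this diff, so the sketch below is only a guess at the shape, using stand-in names:

```rust
use std::ops::Deref;
use std::sync::Arc;

// The methods the node actually calls live on the *inner* trait
// (cf. LexeInnerPersister)...
trait InnerPersister {
    fn persist_channel_monitor(&self);
}

// ...while the outer trait (cf. LexePersister) only bundles bounds over a
// pointer to it. spawn_channel_monitor_updated_task keeps spelling the
// bounds out by hand, but now against the inner trait.
trait OuterPersister: Deref + Send + Sync + 'static
where
    Self::Target: InnerPersister,
{
}

impl<T> OuterPersister for T
where
    T: Deref + Send + Sync + 'static,
    T::Target: InnerPersister,
{
}

struct FilePersister;

impl InnerPersister for FilePersister {
    fn persist_channel_monitor(&self) {}
}

fn run<P: OuterPersister>(p: P) {
    p.persist_channel_monitor();
}

fn main() {
    run(Arc::new(FilePersister));
}
```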
33 changes: 33 additions & 0 deletions lexe-ln/src/command.rs
@@ -0,0 +1,33 @@
+use common::api::error::NodeApiError;
+use common::api::node::NodeInfo;
+
+use crate::traits::{LexeChannelManager, LexePeerManager, LexePersister};
+
+pub fn node_info<CHANNEL_MANAGER, PEER_MANAGER, PERSISTER>(
+    channel_manager: CHANNEL_MANAGER,
+    peer_manager: PEER_MANAGER,
+) -> Result<NodeInfo, NodeApiError>
+where
+    CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+    PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+    PERSISTER: LexePersister,
+{
+    let node_pk = channel_manager.get_our_node_id();
+
+    let channels = channel_manager.list_channels();
+    let num_channels = channels.len();
+    let num_usable_channels = channels.iter().filter(|c| c.is_usable).count();
+
+    let local_balance_msat = channels.iter().map(|c| c.balance_msat).sum();
+    let num_peers = peer_manager.get_peer_node_ids().len();
+
+    let resp = NodeInfo {
+        node_pk,
+        num_channels,
+        num_usable_channels,
+        local_balance_msat,
+        num_peers,
+    };
+
+    Ok(resp)
+}
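The aggregation in `node_info` is easy to sanity-check in isolation. A self-contained sketch with a toy struct in place of LDK's `ChannelDetails`, keeping only the two fields the function reads; note that `local_balance_msat` sums over all channels, usable or not:

```rust
// Toy stand-in for lightning::ln::channelmanager::ChannelDetails.
struct ChannelDetails {
    is_usable: bool,
    balance_msat: u64,
}

fn main() {
    let channels = vec![
        ChannelDetails { is_usable: true, balance_msat: 5_000 },
        ChannelDetails { is_usable: false, balance_msat: 1_000 },
    ];

    let num_channels = channels.len();
    // Usable channels are counted separately...
    let num_usable_channels = channels.iter().filter(|c| c.is_usable).count();
    // ...but the local balance includes every channel.
    let local_balance_msat: u64 =
        channels.iter().map(|c| c.balance_msat).sum();

    assert_eq!(num_channels, 2);
    assert_eq!(num_usable_channels, 1);
    assert_eq!(local_balance_msat, 6_000);
}
```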
4 changes: 4 additions & 0 deletions lexe-ln/src/lib.rs
@@ -6,6 +6,8 @@
 // Allow e.g. `CHANNEL_MANAGER` in generics to clearly distinguish between
 // concrete and generic types
 #![allow(non_camel_case_types)]
+// Allow e.g. PERSISTER: Deref<Target: LexeInnerPersister> in generics
+#![feature(associated_type_bounds)]

 /// Type aliases.
 pub mod alias;
@@ -16,6 +18,8 @@ pub mod bitcoind;
 pub mod channel;
 /// Channel monitor
 pub mod channel_monitor;
+/// Top level commands that can be initiated by the user.
+pub mod command;
 /// Keys manager
 pub mod keys_manager;
 /// LDK + SGX compatible logger
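`associated_type_bounds` is a nightly feature, hence the `#![feature(...)]` attribute. It lets a bound on an associated type nest inside the parent bound instead of needing a separate `where` clause. A minimal illustration (requires a nightly toolchain):

```rust
#![feature(associated_type_bounds)]

use std::ops::Deref;

trait Persist {
    fn persist(&self);
}

struct Store;

impl Persist for Store {
    fn persist(&self) {}
}

// Without the feature this takes two clauses:
//     P: Deref, P::Target: Persist
// With it, the nested bound reads inline:
fn persist_via<P: Deref<Target: Persist>>(p: P) {
    p.persist();
}

fn main() {
    persist_via(&Store);
}
```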
55 changes: 29 additions & 26 deletions lexe-ln/src/p2p.rs
@@ -1,6 +1,5 @@
 use std::collections::HashSet;
 use std::ops::Deref;
-use std::sync::Arc;
 use std::time::Duration;

 use anyhow::{bail, Context};
@@ -10,14 +9,12 @@ use common::shutdown::ShutdownChannel;
 use common::task::LxTask;
 use futures::future;
 use futures::stream::{FuturesUnordered, StreamExt};
-use lightning::ln::msgs::ChannelMessageHandler;
 use tokio::net::{TcpListener, TcpStream};
 use tokio::sync::mpsc;
 use tokio::time;
 use tracing::{debug, error, info, warn};

-use crate::alias::{LexeChannelManagerType, LexePeerManagerType};
-use crate::traits::LexePersister;
+use crate::traits::{LexeChannelManager, LexePeerManager, LexePersister};

 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
 const P2P_RECONNECT_INTERVAL: Duration = Duration::from_secs(60);
@@ -36,13 +33,18 @@ pub enum ChannelPeerUpdate {
     Remove(ChannelPeer),
 }

-pub async fn connect_channel_peer_if_necessary<CHANNEL_MANAGER>(
-    peer_manager: Arc<LexePeerManagerType<CHANNEL_MANAGER>>,
+pub async fn connect_channel_peer_if_necessary<
+    CHANNEL_MANAGER,
+    PEER_MANAGER,
+    PERSISTER,
+>(
+    peer_manager: PEER_MANAGER,
     channel_peer: ChannelPeer,
 ) -> anyhow::Result<()>
 where
-    CHANNEL_MANAGER: Deref + Send + Sync + 'static,
-    CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync,
+    CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+    PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+    PERSISTER: LexePersister,
 {
     debug!("Connecting to channel peer {channel_peer}");

@@ -61,13 +63,14 @@ where
     .context("Failed to connect to peer")
 }

-pub async fn do_connect_peer<CHANNEL_MANAGER>(
-    peer_manager: Arc<LexePeerManagerType<CHANNEL_MANAGER>>,
+pub async fn do_connect_peer<CHANNEL_MANAGER, PEER_MANAGER, PERSISTER>(
+    peer_manager: PEER_MANAGER,
     channel_peer: ChannelPeer,
 ) -> anyhow::Result<()>
 where
-    CHANNEL_MANAGER: Deref + Send + Sync + 'static,
-    CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync,
+    CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+    PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+    PERSISTER: LexePersister,
 {
     let stream =
         time::timeout(CONNECT_TIMEOUT, TcpStream::connect(channel_peer.addr))
@@ -94,7 +97,7 @@ where
     //
     // TODO: Rewrite / replace lightning-net-tokio entirely
     let connection_closed_fut = lightning_net_tokio::setup_outbound(
-        peer_manager.clone(),
+        peer_manager.arc_inner(),
         channel_peer.node_pk.0,
         stream,
     );
@@ -130,18 +133,17 @@ where
 }

 /// Spawns a task that regularly reconnects to the channel peers stored in DB.
-pub fn spawn_p2p_reconnector<CHANNEL_MANAGER, PERSISTER>(
-    channel_manager: Arc<LexeChannelManagerType<PERSISTER>>,
-    peer_manager: Arc<LexePeerManagerType<CHANNEL_MANAGER>>,
+pub fn spawn_p2p_reconnector<CHANNEL_MANAGER, PEER_MANAGER, PERSISTER>(
+    channel_manager: CHANNEL_MANAGER,
+    peer_manager: PEER_MANAGER,
     initial_channel_peers: Vec<ChannelPeer>,
     mut channel_peer_rx: mpsc::Receiver<ChannelPeerUpdate>,
     mut shutdown: ShutdownChannel,
 ) -> LxTask<()>
 where
-    CHANNEL_MANAGER: Deref + Send + Sync + 'static,
-    CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync,
-    PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send + Sync,
+    CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+    PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+    PERSISTER: LexePersister,
 {
     LxTask::spawn(async move {
         let mut interval = time::interval(P2P_RECONNECT_INTERVAL);
@@ -203,14 +205,15 @@

 /// Given a [`TcpListener`], spawns a task to await on inbound connections,
 /// handing off the resultant `TcpStream`s for the `PeerManager` to manage.
-pub fn spawn_p2p_listener<CHANNEL_MANAGER>(
+pub fn spawn_p2p_listener<CHANNEL_MANAGER, PEER_MANAGER, PERSISTER>(
     listener: TcpListener,
-    peer_manager: Arc<LexePeerManagerType<CHANNEL_MANAGER>>,
+    peer_manager: PEER_MANAGER,
     mut shutdown: ShutdownChannel,
 ) -> LxTask<()>
 where
-    CHANNEL_MANAGER: Deref + 'static + Send + Sync,
-    CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync,
+    CHANNEL_MANAGER: LexeChannelManager<PERSISTER>,
+    PEER_MANAGER: LexePeerManager<CHANNEL_MANAGER, PERSISTER>,
+    PERSISTER: LexePersister,
 {
     LxTask::spawn(async move {
         let mut child_tasks = FuturesUnordered::new();
@@ -235,15 +238,15 @@ where
         };

         // Spawn a task to await on the connection
-        let peer_manager_clone = peer_manager.clone();
+        let peer_manager_arc_inner = peer_manager.arc_inner();
         let child_task = LxTask::spawn(async move {
             // `setup_inbound()` returns a future that completes
             // when the connection is closed. The main thread calls
             // peer_manager.disconnect_all_peers() once it receives
             // a shutdown signal so there is no need to pass in a
             // `shutdown`s here.
             let connection_closed = lightning_net_tokio::setup_inbound(
                 peer_manager_arc_inner,
                 tcp_stream,
             );
             connection_closed.await;
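`lightning_net_tokio::setup_outbound` and `setup_inbound` take a concrete `Arc<PeerManager<...>>`, so the generic `PEER_MANAGER` evidently exposes an `arc_inner()` escape hatch rather than being handed over directly. A sketch of that wrapper pattern with toy types (the real `LexePeerManager` definition is not in this diff):

```rust
use std::sync::Arc;

// Stand-in for the concrete LexePeerManagerType<..>.
struct Inner;

// Handle holding the Arc; cloning the handle is cheap.
#[derive(Clone)]
struct PeerManagerHandle(Arc<Inner>);

impl PeerManagerHandle {
    // What `peer_manager.arc_inner()` in the diff presumably does:
    // hand out a clone of the inner Arc for APIs that require one.
    fn arc_inner(&self) -> Arc<Inner> {
        Arc::clone(&self.0)
    }
}

// Stand-in for lightning_net_tokio::setup_outbound, which insists on the
// concrete Arc rather than a generic handle.
fn setup_outbound(_peer_manager: Arc<Inner>) {}

fn main() {
    let handle = PeerManagerHandle(Arc::new(Inner));
    setup_outbound(handle.arc_inner());
}
```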
30 changes: 5 additions & 25 deletions lexe-ln/src/sync.rs
@@ -53,11 +53,7 @@ const CHAIN_TIP_POLL_INTERVAL: Duration = Duration::from_secs(60);
 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
 #[must_use]
-pub struct SyncedChainListeners<PERSISTER>
-where
-    PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send,
-{
+pub struct SyncedChainListeners<PERSISTER: LexePersister> {
     network: Network,
     block_source: Arc<BlockSourceType>,

@@ -67,11 +63,7 @@
     chain_tip: ValidatedBlockHeader,
 }

-impl<PERSISTER> SyncedChainListeners<PERSISTER>
-where
-    PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send,
-{
+impl<PERSISTER: LexePersister> SyncedChainListeners<PERSISTER> {
     #[allow(clippy::too_many_arguments)]
     pub async fn init_and_sync(
         network: Network,
@@ -285,11 +277,7 @@
 }

 /// Associates a [`LxListener`] with its latest synced [`BlockHash`].
-struct LxChainListener<PERSISTER>
-where
-    PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send,
-{
+struct LxChainListener<PERSISTER: LexePersister> {
     blockhash: BlockHash,
     listener: LxListener<PERSISTER>,
 }
@@ -299,21 +287,13 @@
 /// [`lightning_block_sync::init::synchronize_listeners`] (as ldk-sample does)
 /// causes this sync implementation to not be [`Send`], which is required for
 /// moving the node into a task spawned during smoketests.
-enum LxListener<PERSISTER>
-where
-    PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send,
-{
+enum LxListener<PERSISTER: LexePersister> {
     ChannelMonitor(ChannelMonitorChainListener),
     ChannelManager(Arc<LexeChannelManagerType<PERSISTER>>),
 }

 /// This [`Listen`] impl simply delegates to the inner type.
-impl<PERSISTER> Listen for LxListener<PERSISTER>
-where
-    PERSISTER: Deref + Send + Sync + 'static,
-    PERSISTER::Target: LexePersister + Send,
-{
+impl<PERSISTER: LexePersister> Listen for LxListener<PERSISTER> {
     fn filtered_block_connected(
         &self,
         header: &BlockHeader,
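The `LxListener` enum gives both listener kinds a single `Listen` impl that forwards each call to whichever value it holds. The delegation pattern in miniature, with a toy trait standing in for LDK's `Listen`:

```rust
// Toy stand-in for lightning::chain::Listen.
trait Listen {
    fn block_connected(&self, height: u32);
}

struct MonitorListener;
struct ManagerListener;

impl Listen for MonitorListener {
    fn block_connected(&self, _height: u32) { /* replay block to monitor */ }
}

impl Listen for ManagerListener {
    fn block_connected(&self, _height: u32) { /* replay block to manager */ }
}

// Two-variant enum, as with LxListener's ChannelMonitor/ChannelManager.
enum EitherListener {
    Monitor(MonitorListener),
    Manager(ManagerListener),
}

// The impl simply delegates to the inner type.
impl Listen for EitherListener {
    fn block_connected(&self, height: u32) {
        match self {
            Self::Monitor(l) => l.block_connected(height),
            Self::Manager(l) => l.block_connected(height),
        }
    }
}

fn main() {
    let listener = EitherListener::Monitor(MonitorListener);
    listener.block_connected(100);
}
```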