diff --git a/Cargo.lock b/Cargo.lock index 3caa5210b..550b114f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -481,6 +481,7 @@ dependencies = [ "serde", "serde_json", "serde_qs", + "serde_with", "sgx-isa", "sgx-panic-backtrace", "sha2-const", @@ -625,6 +626,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4529658bdda7fd6769b8614be250cdcfc3aeb0ee72fe66f9e41e5e5eb73eac02" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "649c91bc01e8b1eac09fb91e8dbc7d517684ca6be8ebc75bb9cafc894f9fdb6f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.43", + "quote 1.0.21", + "strsim", + "syn 1.0.99", +] + +[[package]] +name = "darling_macro" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5" +dependencies = [ + "darling_core", + "quote 1.0.21", + "syn 1.0.99", +] + [[package]] name = "data-encoding" version = "2.3.2" @@ -1076,6 +1112,12 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -1193,6 +1235,7 @@ dependencies = [ "secrecy", "serde", "serde_json", + "thiserror", "tokio", "tracing", "tracing-core", @@ -2115,6 +2158,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f2d60d049ea019a84dcd6687b0d1e0030fe663ae105039bdf967ed5e6a9a7" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ccadfacf6cf10faad22bbadf55986bdd0856edfb5d9210aa1dcf1f516e84e93" +dependencies = [ + "darling", + "proc-macro2 1.0.43", + "quote 1.0.21", + "syn 1.0.99", +] + [[package]] name = "sgx-isa" version = "0.4.0" @@ -2286,6 +2351,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "subtle" version = "2.4.1" diff --git a/common/Cargo.toml b/common/Cargo.toml index 64cf8d7e6..930661702 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -76,6 +76,7 @@ rustls-pemfile = "1" secrecy = "0.8" # Serialization / deserialization framework serde = { version = "1", features = ["derive"] } +serde_with = { version = "2", default-features = false, features = ["macros"] } serde_qs = "0" serde_json = "1" # Core SGX types diff --git a/common/src/api/node.rs b/common/src/api/command.rs similarity index 78% rename from common/src/api/node.rs rename to common/src/api/command.rs index 1792be86e..710ecd147 100644 --- a/common/src/api/node.rs +++ b/common/src/api/command.rs @@ -16,3 +16,9 @@ pub struct NodeInfo { pub struct ListChannels { pub channel_details: Vec, } + +#[derive(Serialize, Deserialize)] +pub struct GetInvoiceRequest { + pub amt_msat: Option, + pub expiry_secs: u32, +} diff --git a/common/src/api/def.rs b/common/src/api/def.rs index d762858d2..43bd3c755 100644 --- a/common/src/api/def.rs +++ b/common/src/api/def.rs @@ -20,8 +20,8 @@ use async_trait::async_trait; +use crate::api::command::{GetInvoiceRequest, ListChannels, NodeInfo}; use crate::api::error::{BackendApiError, NodeApiError, RunnerApiError}; -use crate::api::node::{ListChannels, NodeInfo}; use crate::api::ports::UserPorts; use crate::api::provision::{ Instance, Node, NodeInstanceSeed, NodeProvisionRequest, SealedSeed, @@ -30,6 +30,7 @@ use crate::api::provision::{ use crate::api::vfs::{NodeDirectory, NodeFile, NodeFileId}; use crate::api::UserPk; use crate::enclave::Measurement; +use crate::ln::invoice::LxInvoice; /// Defines the api that the backend exposes to the node. #[async_trait] @@ -142,4 +143,10 @@ pub trait OwnerNodeRunApi { /// /// [`EmptyData`]: super::qs::EmptyData async fn list_channels(&self) -> Result; + + /// POST /owner/get_invoice [`GetInvoiceRequest`] -> [`ListChannels`] + async fn get_invoice( + &self, + req: GetInvoiceRequest, + ) -> Result; } diff --git a/common/src/api/error.rs b/common/src/api/error.rs index b60b52276..9756385c8 100644 --- a/common/src/api/error.rs +++ b/common/src/api/error.rs @@ -222,15 +222,17 @@ pub enum NodeErrorKind { #[error("Other reqwest error")] Reqwest, + #[error("Could not proxy request to node")] + Proxy, + #[error("Wrong user pk")] WrongUserPk, #[error("Given node pk doesn't match node pk derived from seed")] WrongNodePk, #[error("Error occurred during provisioning")] Provision, - - #[error("Could not proxy request to node")] - Proxy, + #[error("Error while executing command")] + Command, } /// All variants of errors that the LSP can return. @@ -499,10 +501,11 @@ impl ErrorCodeConvertible for NodeErrorKind { Self::Timeout => 3, Self::Decode => 4, Self::Reqwest => 5, - Self::WrongUserPk => 6, - Self::WrongNodePk => 7, - Self::Provision => 8, - Self::Proxy => 9, + Self::Proxy => 6, + Self::WrongUserPk => 7, + Self::WrongNodePk => 8, + Self::Provision => 9, + Self::Command => 10, } } fn from_code(code: ErrorCode) -> Self { @@ -513,10 +516,11 @@ impl ErrorCodeConvertible for NodeErrorKind { 3 => Self::Timeout, 4 => Self::Decode, 5 => Self::Reqwest, - 6 => Self::WrongUserPk, - 7 => Self::WrongNodePk, - 8 => Self::Provision, - 9 => Self::Proxy, + 6 => Self::Proxy, + 7 => Self::WrongUserPk, + 8 => Self::WrongNodePk, + 9 => Self::Provision, + 10 => Self::Command, _ => Self::Unknown, } } @@ -611,11 +615,12 @@ impl HasStatusCode for NodeApiError { Connect => SERVER_503_SERVICE_UNAVAILABLE, Timeout => SERVER_504_GATEWAY_TIMEOUT, Decode => SERVER_502_BAD_GATEWAY, + Proxy => SERVER_502_BAD_GATEWAY, Reqwest => CLIENT_400_BAD_REQUEST, WrongUserPk => CLIENT_400_BAD_REQUEST, WrongNodePk => CLIENT_400_BAD_REQUEST, Provision => SERVER_500_INTERNAL_SERVER_ERROR, - Proxy => SERVER_502_BAD_GATEWAY, + Command => SERVER_500_INTERNAL_SERVER_ERROR, } } } diff --git a/common/src/api/mod.rs b/common/src/api/mod.rs index 30c2ec7fc..a22fd04e6 100644 --- a/common/src/api/mod.rs +++ b/common/src/api/mod.rs @@ -20,13 +20,12 @@ use crate::{const_ref_cast, ed25519, hexstr_or_bytes}; /// Authentication and User Signup. pub mod auth; +/// Data types used in APIs for top level commands. +pub mod command; /// Traits defining the various REST API interfaces. pub mod def; /// Enums for the API errors returned by the various services. pub mod error; -/// Minor data types defining what is returned by APIs exposed by the node. -/// Bigger / more fundamental LN types should go under [`crate::ln`]. -pub mod node; /// `Port`, `Ports`, `UserPorts`, `RunPorts`, etc. pub mod ports; /// Data types specific to provisioning. @@ -191,8 +190,8 @@ mod test { } #[test] - fn user_pk_bcs() { - roundtrip::bcs_roundtrip_proptest::(); + fn user_pk_json() { + roundtrip::json_roundtrip_proptest::(); } #[test] @@ -201,7 +200,7 @@ mod test { } #[test] - fn node_pk_bcs() { - roundtrip::bcs_roundtrip_proptest::(); + fn node_pk_json() { + roundtrip::json_roundtrip_proptest::(); } } diff --git a/common/src/api/rest.rs b/common/src/api/rest.rs index 07323bcc2..ed05d8ae0 100644 --- a/common/src/api/rest.rs +++ b/common/src/api/rest.rs @@ -27,15 +27,20 @@ pub const POST: Method = Method::POST; pub const DELETE: Method = Method::DELETE; /// A warp helper that converts `Result` into [`Response`]. -/// This function should be used in all warp routes because: +/// This function should be used after all *fallible* warp handlers because: /// /// 1) `RestClient::send_and_deserialize` relies on the HTTP status code to /// determine whether a response should be deserialized as the requested -/// object or as the error type. -/// 2) Using this function voids the need to call reply::json(&resp) in every -/// warp handler or to manually set the error code in every response. +/// object or as the error type. This function handles this automatically and +/// consistently across all Lexe APIs. +/// 2) It saves time; there is no need to call reply::json(&resp) in every warp +/// handler or to manually set the error code in every response. +/// 3) Removing the [`warp::Reply`] serialization step from the warp handlers +/// allows each handler to be independently unit and integration tested. /// -/// This function can be used at the end of a warp filter chain like so: +/// For infallible handlers, use [`into_succ_response`] instead. +/// +/// ## Usage /// /// ```ignore /// let status = warp::path("status") @@ -54,6 +59,24 @@ pub fn into_response>( } } +/// Like [`into_response`], but converts `T` into [`Response`]. This fn +/// should be used for the same reasons that [`into_response`] is used, but +/// applies only to *infallible* handlers. +/// +/// ## Usage +/// +/// ```ignore +/// let node_info = warp::path("node_info") +/// .and(warp::get()) +/// .and(inject::channel_manager(channel_manager.clone())) +/// .and(inject::peer_manager(peer_manager)) +/// .map(command::node_info) +/// .map(into_succ_response); +/// ``` +pub fn into_succ_response(data: T) -> Response { + build_json_response(StatusCode::OK, &data) +} + /// A warp helper for recovering one of our [`api::error`](crate::api::error) /// types if it was emitted from an intermediate filter's rejection and then /// converting into the standard json error response. diff --git a/common/src/client/mod.rs b/common/src/client/mod.rs index 018999a2f..31f18c6ee 100644 --- a/common/src/client/mod.rs +++ b/common/src/client/mod.rs @@ -7,14 +7,15 @@ pub mod tls; use anyhow::Context; use async_trait::async_trait; +use crate::api::command::{GetInvoiceRequest, ListChannels, NodeInfo}; use crate::api::def::{OwnerNodeProvisionApi, OwnerNodeRunApi}; use crate::api::error::NodeApiError; -use crate::api::node::{ListChannels, NodeInfo}; use crate::api::provision::NodeProvisionRequest; use crate::api::qs::EmptyData; use crate::api::rest::{RestClient, GET, POST}; use crate::api::UserPk; use crate::attest; +use crate::ln::invoice::LxInvoice; use crate::rng::Crng; use crate::root_seed::RootSeed; @@ -93,4 +94,14 @@ impl OwnerNodeRunApi for NodeClient { self.rest.request(GET, url, &data).await } + + async fn get_invoice( + &self, + req: GetInvoiceRequest, + ) -> Result { + let run_url = &self.run_url; + let url = format!("{run_url}/owner/get_invoice"); + + self.rest.request(POST, url, &req).await + } } diff --git a/common/src/constants.rs b/common/src/constants.rs index f601bc36b..6acc69fb9 100644 --- a/common/src/constants.rs +++ b/common/src/constants.rs @@ -10,6 +10,8 @@ use crate::ln::peer::ChannelPeer; use crate::rng::SysRng; use crate::root_seed::RootSeed; +pub const DEFAULT_CHANNEL_SIZE: usize = 256; + pub const DEFAULT_BACKEND_URL: &str = "http://127.0.0.1:3030"; pub const DEFAULT_GATEWAY_URL: &str = "http://127.0.0.1:4040"; pub const DEFAULT_RUNNER_URL: &str = "http://127.0.0.1:5050"; diff --git a/common/src/ln/invoice.rs b/common/src/ln/invoice.rs new file mode 100644 index 000000000..67e08f494 --- /dev/null +++ b/common/src/ln/invoice.rs @@ -0,0 +1,122 @@ +use std::fmt::{self, Display}; +use std::str::FromStr; + +use lightning_invoice::Invoice; +use serde_with::{DeserializeFromStr, SerializeDisplay}; + +/// Wraps [`lightning_invoice::Invoice`] to impl [`serde`] Serialize / +/// Deserialize using the LDK's [`FromStr`] / [`Display`] impls. +#[derive(Clone, Debug, Eq, PartialEq, SerializeDisplay, DeserializeFromStr)] +pub struct LxInvoice(pub Invoice); + +impl FromStr for LxInvoice { + type Err = lightning_invoice::ParseOrSemanticError; + fn from_str(s: &str) -> Result { + Invoice::from_str(s).map(Self) + } +} + +impl Display for LxInvoice { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +// `any::()` requires proptest feature std which doesn't work in SGX +#[cfg(all(test, not(target_env = "sgx")))] +mod test { + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + + use bitcoin::hashes::{sha256, Hash}; + use bitcoin::secp256k1::{self, Secp256k1}; + use lightning::ln::PaymentSecret; + use lightning_invoice::{Currency, InvoiceBuilder}; + use proptest::arbitrary::{any, Arbitrary}; + use proptest::strategy::{BoxedStrategy, Strategy}; + + use super::*; + use crate::cli::Network; + use crate::rng::SmallRng; + use crate::root_seed::RootSeed; + use crate::test_utils::roundtrip; + + impl Arbitrary for LxInvoice { + type Parameters = (); + type Strategy = BoxedStrategy; + fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { + let currency = any::().prop_map(Currency::from); + let description = any::(); + + let payment_hash = any::<[u8; 32]>() + .prop_map(|buf| sha256::Hash::from_slice(&buf).unwrap()); + + let payment_secret = any::<[u8; 32]>().prop_map(PaymentSecret); + + let timestamp = any::().prop_map(|system_time| { + // TODO: We convert to and from unix seconds because LDK's + // fromstr/display impl fails the roundtrip test if the + // SystemTime passed to InvoiceBuilder::timestamp isn't rounded + // to the nearest second. We can drop the prop_map once + // + // is merged and released. + let unix_secs = system_time + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + UNIX_EPOCH + Duration::from_secs(unix_secs) + }); + + let min_final_cltv_expiry = any::(); + + let secret_key = any::() + .prop_map(|mut rng| { + RootSeed::from_rng(&mut rng).derive_node_key_pair(&mut rng) + }) + .prop_map(secp256k1::SecretKey::from); + ( + currency, + description, + payment_hash, + payment_secret, + timestamp, + min_final_cltv_expiry, + secret_key, + ) + .prop_map( + |( + currency, + description, + payment_hash, + payment_secret, + timestamp, + min_final_cltv_expiry, + secret_key, + )| { + let invoice = InvoiceBuilder::new(currency) + .description(description) + .payment_hash(payment_hash) + .payment_secret(payment_secret) + .timestamp(timestamp) + .min_final_cltv_expiry(min_final_cltv_expiry) + .build_signed(|hash| { + Secp256k1::new() + .sign_ecdsa_recoverable(hash, &secret_key) + }) + .expect("Could not build invoice"); + Self(invoice) + }, + ) + .boxed() + } + } + + #[test] + fn invoice_serde_roundtrip() { + roundtrip::json_roundtrip_proptest::(); + } + + #[test] + fn invoice_fromstr_display_roundtrip() { + roundtrip::fromstr_display_roundtrip_proptest::(); + } +} diff --git a/common/src/ln/mod.rs b/common/src/ln/mod.rs index d6085d955..3c6bdb56a 100644 --- a/common/src/ln/mod.rs +++ b/common/src/ln/mod.rs @@ -11,6 +11,9 @@ //! `ToHex`); otherwise, implement `Display` and `FromStr`, perhaps with a //! serialization round trip test. +/// Channel outpoint, details, counterparty pub mod channel; +/// `LxInvoice` +pub mod invoice; /// `ChannelPeer`. pub mod peer; diff --git a/common/src/test_utils/regtest.rs b/common/src/test_utils/regtest.rs index 3e830d3b2..67ecf7611 100644 --- a/common/src/test_utils/regtest.rs +++ b/common/src/test_utils/regtest.rs @@ -4,6 +4,7 @@ use bitcoin::network::constants::Network; use bitcoin::util::address::{Address, Payload}; use bitcoind::bitcoincore_rpc::RpcApi; use bitcoind::{self, BitcoinD, Conf}; +use tracing::debug; use crate::cli::BitcoindRpcInfo; @@ -50,6 +51,7 @@ impl Regtest { /// Mines 6 blocks. Block rewards are sent to a dummy address. pub async fn mine_6_blocks(&self) { + debug!("Mining 6 blocks"); // `bitcoind.client.generate()` returns a deprecated error, so we use // generate_to_address instead. self.mine_n_blocks_to_address(6, &get_dummy_address()).await; @@ -58,6 +60,7 @@ impl Regtest { /// Mines 101 blocks to the given address. 101 blocks is needed because /// coinbase outputs aren't spendable until after 100 blocks. pub async fn fund_address(&self, address: &Address) { + debug!("Funding address {address} by mining 101 blocks"); self.mine_n_blocks_to_address(101, address).await; } diff --git a/common/src/test_utils/roundtrip.rs b/common/src/test_utils/roundtrip.rs index a22a86708..99bf50e9a 100644 --- a/common/src/test_utils/roundtrip.rs +++ b/common/src/test_utils/roundtrip.rs @@ -8,8 +8,7 @@ use serde::Serialize; use crate::ed25519; -/// Quickly create a roundtrip proptest for some `T` which can be serialized and -/// deserialized. +/// Quickly create a BCS roundtrip proptest. /// /// ```ignore /// bcs_roundtrip_proptest::(); @@ -19,13 +18,30 @@ pub fn bcs_roundtrip_proptest() where T: Arbitrary + PartialEq + Serialize + DeserializeOwned, { - proptest!(|(value: T)| { - let ser_value = bcs::to_bytes(&value).unwrap(); - let value2 = bcs::from_bytes::(&ser_value).unwrap(); - let ser_value2 = bcs::to_bytes(&value2).unwrap(); + proptest!(|(value1: T)| { + let bcs_value1 = bcs::to_bytes(&value1).unwrap(); + let value2 = bcs::from_bytes::(&bcs_value1).unwrap(); + let bcs_value2 = bcs::to_bytes(&value2).unwrap(); + prop_assert_eq!(&value1, &value2); + // Serialized form should be canonical too + prop_assert_eq!(&bcs_value1, &bcs_value2); + }); +} - prop_assert_eq!(&value, &value2); - prop_assert_eq!(&ser_value, &ser_value2); +/// Quickly create a JSON roundtrip proptest. +/// +/// ```ignore +/// json_roundtrip_proptest::(); +/// ``` +#[cfg_attr(target_env = "sgx", allow(dead_code))] +pub fn json_roundtrip_proptest() +where + T: Arbitrary + PartialEq + Serialize + DeserializeOwned, +{ + proptest!(|(value1: T)| { + let json_value1 = serde_json::to_string(&value1).unwrap(); + let value2 = serde_json::from_str::(&json_value1).unwrap(); + prop_assert_eq!(&value1, &value2); }); } diff --git a/lexe-ln/Cargo.toml b/lexe-ln/Cargo.toml index 384b69666..4da2a6ca9 100644 --- a/lexe-ln/Cargo.toml +++ b/lexe-ln/Cargo.toml @@ -57,6 +57,8 @@ secrecy = "0.8" # Serialization / deserialization framework serde = { version = "1", features = ["derive"] } serde_json = "1" +# Easy error definition +thiserror = "1" # Logging tracing = "0.1" tracing-core = "0.1" diff --git a/lexe-ln/src/alias.rs b/lexe-ln/src/alias.rs index b0fd9eacb..fd9d56189 100644 --- a/lexe-ln/src/alias.rs +++ b/lexe-ln/src/alias.rs @@ -16,9 +16,9 @@ use lightning_invoice::utils::DefaultRouter; use lightning_net_tokio::SocketDescriptor; use crate::bitcoind::LexeBitcoind; +use crate::invoice::PaymentInfo; use crate::keys_manager::LexeKeysManager; use crate::logger::LexeTracingLogger; -use crate::types::PaymentInfo; pub type SignerType = InMemorySigner; diff --git a/lexe-ln/src/background_processor.rs b/lexe-ln/src/background_processor.rs index de028f10a..9f30e5238 100644 --- a/lexe-ln/src/background_processor.rs +++ b/lexe-ln/src/background_processor.rs @@ -1,18 +1,19 @@ -use std::ops::Deref; use std::sync::{Arc, Mutex}; use std::time::Duration; use common::shutdown::ShutdownChannel; use common::task::LxTask; -use lightning::util::events::{EventHandler, EventsProvider}; +use lightning::util::events::EventsProvider; use tokio::time::{interval, interval_at, Instant}; use tracing::{debug, error, info, trace, warn}; use crate::alias::{ - LexeChainMonitorType, LexeChannelManagerType, LexeInvoicePayerType, - LexePeerManagerType, P2PGossipSyncType, ProbabilisticScorerType, + LexeChainMonitorType, LexeInvoicePayerType, P2PGossipSyncType, + ProbabilisticScorerType, +}; +use crate::traits::{ + LexeChannelManager, LexeEventHandler, LexePeerManager, LexePersister, }; -use crate::traits::LexePersister; const PROCESS_EVENTS_INTERVAL: Duration = Duration::from_millis(1000); const PEER_MANAGER_PING_INTERVAL: Duration = Duration::from_secs(15); @@ -28,24 +29,21 @@ pub struct LexeBackgroundProcessor {} impl LexeBackgroundProcessor { #[allow(clippy::too_many_arguments)] - pub fn start( - channel_manager: CHANNEL_MANAGER, - peer_manager: Arc>, - persister: PERSISTER, - chain_monitor: Arc>, - event_handler: Arc< - LexeInvoicePayerType, - >, + pub fn start( + channel_manager: CM, + peer_manager: PM, + persister: PS, + chain_monitor: Arc>, + event_handler: Arc>, gossip_sync: Arc, scorer: Arc>, mut shutdown: ShutdownChannel, ) -> LxTask<()> where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send + Sync, - CHANNEL_MANAGER: Deref>, - CHANNEL_MANAGER: Send + Sync + 'static, - EVENT_HANDLER: EventHandler + Send + Sync + 'static, + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, + EH: LexeEventHandler, { LxTask::spawn(async move { let mut process_timer = interval(PROCESS_EVENTS_INTERVAL); diff --git a/lexe-ln/src/channel.rs b/lexe-ln/src/channel.rs index 6f76796e0..9da0579c0 100644 --- a/lexe-ln/src/channel.rs +++ b/lexe-ln/src/channel.rs @@ -1,43 +1,34 @@ -use std::ops::Deref; -use std::sync::Arc; - use anyhow::{anyhow, Context}; use common::ln::peer::ChannelPeer; -use lightning::ln::msgs::ChannelMessageHandler; use lightning::util::config::UserConfig; use tokio::sync::mpsc; use tracing::{error, info}; -use crate::alias::{LexeChannelManagerType, LexePeerManagerType}; use crate::p2p::{self, ChannelPeerUpdate}; -use crate::traits::LexePersister; +use crate::traits::{LexeChannelManager, LexePeerManager, LexePersister}; /// Handles the full logic of opening a channel, including connecting to the /// peer, creating the channel, and persisting the newly created channel. -pub async fn open_channel( - channel_manager: &LexeChannelManagerType, - peer_manager: Arc>, - persister: PERSISTER, +pub async fn open_channel( + channel_manager: CM, + peer_manager: PM, + persister: PS, channel_peer: ChannelPeer, channel_value_sat: u64, channel_peer_tx: &mpsc::Sender, user_config: UserConfig, ) -> anyhow::Result<()> where - CHANNEL_MANAGER: Deref + Send + Sync + 'static, - CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync, - PERSISTER: Deref, - PERSISTER::Target: LexePersister, + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, { info!("Opening channel with {}", channel_peer); // Make sure that we're connected to the channel peer - p2p::connect_channel_peer_if_necessary( - peer_manager.clone(), - channel_peer.clone(), - ) - .await - .context("Failed to connect to peer")?; + p2p::connect_channel_peer_if_necessary(peer_manager, channel_peer.clone()) + .await + .context("Failed to connect to peer")?; // Create the channel let user_channel_id = 1; // Not important, just use a default value diff --git a/lexe-ln/src/channel_monitor.rs b/lexe-ln/src/channel_monitor.rs index 5c46784f3..98f250320 100644 --- a/lexe-ln/src/channel_monitor.rs +++ b/lexe-ln/src/channel_monitor.rs @@ -1,5 +1,3 @@ -use std::marker::Send; -use std::ops::Deref; use std::sync::Arc; use common::ln::channel::LxOutPoint; @@ -25,15 +23,11 @@ pub struct LxChannelMonitorUpdate { /// persister, therefore (b) the persister cannot hold the chain monitor, /// therefore there needs to be another means of letting the persister notify /// the channel manager of events. -pub fn spawn_channel_monitor_updated_task( - chain_monitor: Arc>, +pub fn spawn_channel_monitor_updated_task( + chain_monitor: Arc>, mut channel_monitor_updated_rx: mpsc::Receiver, mut shutdown: ShutdownChannel, -) -> LxTask<()> -where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send, -{ +) -> LxTask<()> { debug!("Starting channel_monitor_updated task"); LxTask::spawn(async move { loop { diff --git a/lexe-ln/src/command.rs b/lexe-ln/src/command.rs new file mode 100644 index 000000000..2a560189e --- /dev/null +++ b/lexe-ln/src/command.rs @@ -0,0 +1,86 @@ +use anyhow::{anyhow, Context}; +use bitcoin::hashes::Hash; +use common::api::command::{GetInvoiceRequest, NodeInfo}; +use common::cli::Network; +use common::ln::invoice::LxInvoice; +use lightning::ln::PaymentHash; +use lightning_invoice::Currency; + +use crate::alias::PaymentInfoStorageType; +use crate::invoice::{HTLCStatus, MillisatAmount, PaymentInfo}; +use crate::keys_manager::LexeKeysManager; +use crate::traits::{LexeChannelManager, LexePeerManager, LexePersister}; + +// TODO(max): Should these fns take e.g. &CM i.e. &Arc +// when possible? It can avoid the atomic operation in some cases, but in +// addition to requiring more indirection from node::command::server, it's a +// weird way to use Arcs. Taking &T doesn't seem possible though without an +// invasive (translated: painful) overhaul of the Lexe trait aliases. + +pub fn node_info(channel_manager: CM, peer_manager: PM) -> NodeInfo +where + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, +{ + let node_pk = channel_manager.get_our_node_id(); + + let channels = channel_manager.list_channels(); + let num_channels = channels.len(); + let num_usable_channels = channels.iter().filter(|c| c.is_usable).count(); + + let local_balance_msat = channels.iter().map(|c| c.balance_msat).sum(); + let num_peers = peer_manager.get_peer_node_ids().len(); + + NodeInfo { + node_pk, + num_channels, + num_usable_channels, + local_balance_msat, + num_peers, + } +} + +pub fn get_invoice( + channel_manager: CM, + keys_manager: LexeKeysManager, + inbound_payments: PaymentInfoStorageType, + network: Network, + req: GetInvoiceRequest, +) -> anyhow::Result +where + CM: LexeChannelManager, + PS: LexePersister, +{ + let currency = Currency::from(network); + + // Generate the invoice + let invoice = lightning_invoice::utils::create_invoice_from_channelmanager( + &channel_manager, + keys_manager, + currency, + req.amt_msat, + "lexe-node".to_string(), + req.expiry_secs, + ) + .map(LxInvoice) + .map_err(|e| anyhow!("{e}")) + .context("Failed to create invoice")?; + + // Save the invoice in our inbound payment storage + // TODO(max): Is this really needed? `create_invoice_from_channelmanager` + // docs notes that we don't have to store the payment preimage / secret + // information + let payment_hash = PaymentHash(invoice.0.payment_hash().into_inner()); + inbound_payments.lock().expect("Poisoned").insert( + payment_hash, + PaymentInfo { + preimage: None, + secret: Some(*invoice.0.payment_secret()), + status: HTLCStatus::Pending, + amt_msat: MillisatAmount(req.amt_msat), + }, + ); + + Ok(invoice) +} diff --git a/lexe-ln/src/invoice.rs b/lexe-ln/src/invoice.rs new file mode 100644 index 000000000..9223658ea --- /dev/null +++ b/lexe-ln/src/invoice.rs @@ -0,0 +1,65 @@ +use std::fmt::{self, Display}; + +use lightning::ln::channelmanager::PaymentSendFailure; +use lightning::ln::msgs::LightningError; +use lightning::ln::{PaymentPreimage, PaymentSecret}; +use lightning_invoice::payment::PaymentError; + +pub struct PaymentInfo { + pub preimage: Option, + pub secret: Option, + pub status: HTLCStatus, + pub amt_msat: MillisatAmount, +} + +#[allow(dead_code)] +pub enum HTLCStatus { + Pending, + Succeeded, + Failed, +} + +impl Display for HTLCStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Pending => write!(f, "pending"), + Self::Succeeded => write!(f, "succeeded"), + Self::Failed => write!(f, "failed"), + } + } +} + +// TODO(max): This struct doesn't seem important - perhaps it can be removed? +pub struct MillisatAmount(pub Option); + +impl Display for MillisatAmount { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + Some(amt) => write!(f, "{amt}"), + None => write!(f, "unknown"), + } + } +} + +/// A newtype for [`PaymentError`] that impls [`Display`] and [`Error`]. +/// +/// [`Error`]: std::error::Error +#[derive(Debug, thiserror::Error)] +pub enum LxPaymentError { + #[error("Invalid invoice: {0}")] + Invoice(&'static str), + #[error("Failed to find route: {}", .0.err)] + Routing(LightningError), + #[error("Payment send failure: {0:?}")] + Sending(PaymentSendFailure), +} + +impl From for LxPaymentError { + fn from(ldk_err: PaymentError) -> Self { + match ldk_err { + PaymentError::Invoice(inner) => Self::Invoice(inner), + PaymentError::Routing(inner) => Self::Routing(inner), + PaymentError::Sending(inner) => Self::Sending(inner), + } + } +} diff --git a/lexe-ln/src/lib.rs b/lexe-ln/src/lib.rs index d031a4a6f..685aaf97c 100644 --- a/lexe-ln/src/lib.rs +++ b/lexe-ln/src/lib.rs @@ -6,6 +6,8 @@ // Allow e.g. `CHANNEL_MANAGER` in generics to clearly distinguish between // concrete and generic types #![allow(non_camel_case_types)] +// Allow e.g. PS: Deref in generics +#![feature(associated_type_bounds)] /// Type aliases. pub mod alias; @@ -16,6 +18,10 @@ pub mod bitcoind; pub mod channel; /// Channel monitor pub mod channel_monitor; +/// Top level commands that can be initiated by the user. +pub mod command; +/// Types related to invoices. +pub mod invoice; /// Keys manager pub mod keys_manager; /// LDK + SGX compatible logger @@ -26,5 +32,3 @@ pub mod p2p; pub mod sync; /// Traits. pub mod traits; -/// Misc types that temporarily don't fit anywhere else -pub mod types; diff --git a/lexe-ln/src/p2p.rs b/lexe-ln/src/p2p.rs index ee2166ce9..31e8f7829 100644 --- a/lexe-ln/src/p2p.rs +++ b/lexe-ln/src/p2p.rs @@ -1,6 +1,5 @@ use std::collections::HashSet; use std::ops::Deref; -use std::sync::Arc; use std::time::Duration; use anyhow::{bail, Context}; @@ -10,14 +9,12 @@ use common::shutdown::ShutdownChannel; use common::task::LxTask; use futures::future; use futures::stream::{FuturesUnordered, StreamExt}; -use lightning::ln::msgs::ChannelMessageHandler; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; use tokio::time; use tracing::{debug, error, info, warn}; -use crate::alias::{LexeChannelManagerType, LexePeerManagerType}; -use crate::traits::LexePersister; +use crate::traits::{LexeChannelManager, LexePeerManager, LexePersister}; const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const P2P_RECONNECT_INTERVAL: Duration = Duration::from_secs(60); @@ -36,13 +33,14 @@ pub enum ChannelPeerUpdate { Remove(ChannelPeer), } -pub async fn connect_channel_peer_if_necessary( - peer_manager: Arc>, +pub async fn connect_channel_peer_if_necessary( + peer_manager: PM, channel_peer: ChannelPeer, ) -> anyhow::Result<()> where - CHANNEL_MANAGER: Deref + Send + Sync + 'static, - CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync, + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, { debug!("Connecting to channel peer {channel_peer}"); @@ -61,13 +59,14 @@ where .context("Failed to connect to peer") } -pub async fn do_connect_peer( - peer_manager: Arc>, +pub async fn do_connect_peer( + peer_manager: PM, channel_peer: ChannelPeer, ) -> anyhow::Result<()> where - CHANNEL_MANAGER: Deref + Send + Sync + 'static, - CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync, + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, { let stream = time::timeout(CONNECT_TIMEOUT, TcpStream::connect(channel_peer.addr)) @@ -94,7 +93,7 @@ where // // TODO: Rewrite / replace lightning-net-tokio entirely let connection_closed_fut = lightning_net_tokio::setup_outbound( - peer_manager.clone(), + peer_manager.arc_inner(), channel_peer.node_pk.0, stream, ); @@ -130,18 +129,17 @@ where } /// Spawns a task that regularly reconnects to the channel peers stored in DB. -pub fn spawn_p2p_reconnector( - channel_manager: Arc>, - peer_manager: Arc>, +pub fn spawn_p2p_reconnector( + channel_manager: CM, + peer_manager: PM, initial_channel_peers: Vec, mut channel_peer_rx: mpsc::Receiver, mut shutdown: ShutdownChannel, ) -> LxTask<()> where - CHANNEL_MANAGER: Deref + Send + Sync + 'static, - CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync, - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send + Sync, + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, { LxTask::spawn(async move { let mut interval = time::interval(P2P_RECONNECT_INTERVAL); @@ -203,14 +201,15 @@ where /// Given a [`TcpListener`], spawns a task to await on inbound connections, /// handing off the resultant `TcpStream`s for the `PeerManager` to manage. -pub fn spawn_p2p_listener( +pub fn spawn_p2p_listener( listener: TcpListener, - peer_manager: Arc>, + peer_manager: PM, mut shutdown: ShutdownChannel, ) -> LxTask<()> where - CHANNEL_MANAGER: Deref + 'static + Send + Sync, - CHANNEL_MANAGER::Target: ChannelMessageHandler + Send + Sync, + CM: LexeChannelManager, + PM: LexePeerManager, + PS: LexePersister, { LxTask::spawn(async move { let mut child_tasks = FuturesUnordered::new(); @@ -235,7 +234,7 @@ where }; // Spawn a task to await on the connection - let peer_manager_clone = peer_manager.clone(); + let peer_manager_arc_inner = peer_manager.arc_inner(); let child_task = LxTask::spawn(async move { // `setup_inbound()` returns a future that completes // when the connection is closed. The main thread calls @@ -243,7 +242,7 @@ where // a shutdown signal so there is no need to pass in a // `shutdown`s here. let connection_closed = lightning_net_tokio::setup_inbound( - peer_manager_clone, + peer_manager_arc_inner, tcp_stream, ); connection_closed.await; diff --git a/lexe-ln/src/sync.rs b/lexe-ln/src/sync.rs index ee5319d96..78404ba6c 100644 --- a/lexe-ln/src/sync.rs +++ b/lexe-ln/src/sync.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::marker::PhantomData; use std::ops::Deref; use std::sync::Arc; @@ -18,10 +19,9 @@ use tracing::{info, warn}; use crate::alias::{ BlockSourceType, BroadcasterType, ChannelMonitorListenerType, ChannelMonitorType, FeeEstimatorType, LexeChainMonitorType, - LexeChannelManagerType, }; use crate::logger::LexeTracingLogger; -use crate::traits::LexePersister; +use crate::traits::{LexeChannelManager, LexePersister}; /// How often the [`SpvClient`] client polls for an updated chain tip. const CHAIN_TIP_POLL_INTERVAL: Duration = Duration::from_secs(60); @@ -53,30 +53,26 @@ const CHAIN_TIP_POLL_INTERVAL: Duration = Duration::from_secs(60); /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor #[must_use] -pub struct SyncedChainListeners -where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send, -{ +pub struct SyncedChainListeners { network: Network, block_source: Arc, - channel_manager: Arc>, - chain_listeners: Vec>, + channel_manager: CM, + chain_listeners: Vec>, blockheader_cache: HashMap, chain_tip: ValidatedBlockHeader, } -impl SyncedChainListeners +impl SyncedChainListeners where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send, + CM: LexeChannelManager, + PS: LexePersister, { #[allow(clippy::too_many_arguments)] pub async fn init_and_sync( network: Network, - channel_manager: Arc>, + channel_manager: CM, channel_manager_blockhash: BlockHash, channel_monitors: Vec<(BlockHash, ChannelMonitorType)>, @@ -115,7 +111,7 @@ where async fn from_existing( network: Network, - channel_manager: Arc>, + channel_manager: CM, channel_manager_blockhash: BlockHash, channel_monitors: Vec<(BlockHash, ChannelMonitorType)>, @@ -163,7 +159,7 @@ where .map(|chain_listener| { (chain_listener.blockhash, &chain_listener.listener) }) - .collect::)>>(); + .collect::)>>(); // Block header cache which is required for the SPV client init later. let mut blockheader_cache = HashMap::new(); @@ -199,7 +195,7 @@ where /// header from our block source. async fn from_new( network: Network, - channel_manager: Arc>, + channel_manager: CM, block_source: Arc, ) -> anyhow::Result { let chain_tip = block_sync_init::validate_best_block_header( @@ -231,7 +227,7 @@ where /// continue monitoring the chain. pub fn feed_chain_monitor_and_spawn_spv( mut self, - chain_monitor: Arc>, + chain_monitor: Arc>, mut shutdown: ShutdownChannel, ) -> anyhow::Result> { for chain_listener in self.chain_listeners { @@ -285,13 +281,9 @@ where } /// Associates a [`LxListener`] with its latest synced [`BlockHash`]. -struct LxChainListener -where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send, -{ +struct LxChainListener { blockhash: BlockHash, - listener: LxListener, + listener: LxListener, } /// Concretely enumerates the different kinds of `impl Listen`. This enum is @@ -299,20 +291,19 @@ where /// [`lightning_block_sync::init::synchronize_listeners`] (as ldk-sample does) /// causes this sync implementation to not be [`Send`], which is required for /// moving the node into a task spawned during smoketests. -enum LxListener -where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send, -{ +enum LxListener { ChannelMonitor(ChannelMonitorChainListener), - ChannelManager(Arc>), + ChannelManager(CM), + // Prevents Rust error E0392 + #[allow(dead_code)] + Phantom(PhantomData), } /// This [`Listen`] impl simply delegates to the inner type. -impl Listen for LxListener +impl Listen for LxListener where - PERSISTER: Deref + Send + Sync + 'static, - PERSISTER::Target: LexePersister + Send, + CM: LexeChannelManager, + PS: LexePersister, { fn filtered_block_connected( &self, @@ -327,6 +318,7 @@ where Self::ChannelManager(cm) => { cm.deref().filtered_block_connected(header, txdata, height) } + Self::Phantom(_) => unimplemented!(), } } @@ -338,6 +330,7 @@ where Self::ChannelManager(cm) => { cm.deref().block_disconnected(header, height) } + Self::Phantom(_) => unimplemented!(), } } } diff --git a/lexe-ln/src/traits.rs b/lexe-ln/src/traits.rs index 56253a726..a4a9d5524 100644 --- a/lexe-ln/src/traits.rs +++ b/lexe-ln/src/traits.rs @@ -1,15 +1,30 @@ -use std::sync::Mutex; +use std::ops::Deref; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use common::ln::peer::ChannelPeer; use lightning::chain::chainmonitor::Persist; +use lightning::util::events::EventHandler; use lightning::util::ser::Writeable; -use crate::alias::{NetworkGraphType, ProbabilisticScorerType, SignerType}; +use crate::alias::{ + LexeChannelManagerType, LexePeerManagerType, NetworkGraphType, + ProbabilisticScorerType, SignerType, +}; -/// Defines all the methods needed in shared Lexe LN logic. +/// A trait for converting from a generic `Deref` to `Arc`. +/// +/// Requiring `ArcInner` (instead of `Deref`) is required if +/// something downstream of the function requires a conversion to [`Arc`]. +// TODO: It should be possible to remove this trait by patching LDK's +// `setup_outbound`, `connect_outbound` to not require Arc +pub trait ArcInner: Deref { + fn arc_inner(&self) -> Arc; +} + +/// Defines all the persister methods needed in shared Lexe LN logic. #[async_trait] -pub trait LexePersister: Persist { +pub trait LexeInnerPersister: Persist { async fn persist_manager( &self, channel_manager: &W, @@ -30,3 +45,55 @@ pub trait LexePersister: Persist { _channel_peer: ChannelPeer, ) -> anyhow::Result<()>; } + +/// A 'trait alias' defining all the requirements of a Lexe persister. +pub trait LexePersister: + Send + Sync + 'static + Deref +{ +} + +impl LexePersister for PS where + PS: Send + Sync + 'static + Deref +{ +} + +/// A 'trait alias' defining all the requirements a Lexe channel manager. +pub trait LexeChannelManager: + Clone + Send + Sync + 'static + Deref> +where + PS: LexePersister, +{ +} + +impl LexeChannelManager for CM +where + CM: Clone + + Send + + Sync + + 'static + + Deref>, + PS: LexePersister, +{ +} + +/// A 'trait alias' defining all the requirements of a Lexe peer manager. +pub trait LexePeerManager: + Clone + Send + Sync + 'static + ArcInner> +where + CM: LexeChannelManager, + PS: LexePersister, +{ +} + +impl LexePeerManager for PM +where + PM: Clone + Send + Sync + 'static + ArcInner>, + CM: LexeChannelManager, + PS: LexePersister, +{ +} + +/// A 'trait alias' defining all the requirements of a Lexe event handler. +pub trait LexeEventHandler: EventHandler + Send + Sync + 'static {} + +impl LexeEventHandler for EH where EH: EventHandler + Send + Sync + 'static {} diff --git a/lexe-ln/src/types.rs b/lexe-ln/src/types.rs deleted file mode 100644 index 28bc7b694..000000000 --- a/lexe-ln/src/types.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::fmt; - -use lightning::ln::{PaymentPreimage, PaymentSecret}; - -pub struct PaymentInfo { - pub preimage: Option, - pub secret: Option, - pub status: HTLCStatus, - pub amt_msat: MillisatAmount, -} - -#[allow(dead_code)] -pub enum HTLCStatus { - Pending, - Succeeded, - Failed, -} - -pub struct MillisatAmount(pub Option); - -impl fmt::Display for MillisatAmount { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.0 { - Some(amt) => write!(f, "{}", amt), - None => write!(f, "unknown"), - } - } -} diff --git a/node/src/channel_manager.rs b/node/src/channel_manager.rs index 7327c48d6..196060c50 100644 --- a/node/src/channel_manager.rs +++ b/node/src/channel_manager.rs @@ -45,9 +45,9 @@ const TIME_TO_CONTEST_FRAUDULENT_CLOSES: u16 = 144; // 1 day /// funds in the case of a unilateral close initiated by us. /// /// NOTE: If this value is too low, channel negotiation with the LSP will fail. -const MAXIMUM_TIME_TO_RECLAIM_FUNDS: u16 = 6 * 24 * 3; // three days +const MAXIMUM_TIME_TO_RECLAIM_FUNDS: u16 = 6 * 24 * 4; // four days -pub(crate) const USER_CONFIG: UserConfig = UserConfig { +pub const USER_CONFIG: UserConfig = UserConfig { channel_handshake_config: CHANNEL_HANDSHAKE_CONFIG, channel_handshake_limits: CHANNEL_HANDSHAKE_LIMITS, channel_config: CHANNEL_CONFIG, @@ -76,8 +76,10 @@ const CHANNEL_HANDSHAKE_CONFIG: ChannelHandshakeConfig = max_inbound_htlc_value_in_flight_percent_of_channel: 100, // Attempt to use better privacy. negotiate_scid_privacy: true, - // Do not publically announce our channels - announced_channel: false, + // Publically announce our channels + // TODO: Is there a way to *not* publicly announce our channel, but + // still be able to complete a channel negatiation with the LSP? + announced_channel: true, // The additional 'security' provided by this setting is pointless. // Also, we want to be able to sweep all funds to an address specified // at the time of channel close, instead of committing upfront. @@ -125,7 +127,7 @@ const CHANNEL_CONFIG: ChannelConfig = ChannelConfig { /// An Arc is held internally, so it is fine to clone directly. #[derive(Clone)] -pub(crate) struct NodeChannelManager(Arc); +pub struct NodeChannelManager(Arc); impl Deref for NodeChannelManager { type Target = ChannelManagerType; @@ -135,7 +137,7 @@ impl Deref for NodeChannelManager { } impl NodeChannelManager { - pub(crate) fn arc_inner(&self) -> Arc { + pub fn arc_inner(&self) -> Arc { self.0.clone() } diff --git a/node/src/command/owner.rs b/node/src/command/owner.rs index d2ed77f4f..911901f13 100644 --- a/node/src/command/owner.rs +++ b/node/src/command/owner.rs @@ -1,37 +1,14 @@ use std::sync::Arc; +use common::api::command::ListChannels; use common::api::error::NodeApiError; -use common::api::node::{ListChannels, NodeInfo}; use common::ln::channel::LxChannelDetails; use lexe_ln::alias::NetworkGraphType; use crate::channel_manager::NodeChannelManager; -use crate::peer_manager::NodePeerManager; - -pub(crate) fn node_info( - channel_manager: NodeChannelManager, - peer_manager: NodePeerManager, -) -> Result { - let node_pk = channel_manager.get_our_node_id(); - - let channels = channel_manager.list_channels(); - let num_channels = channels.len(); - let num_usable_channels = channels.iter().filter(|c| c.is_usable).count(); - - let local_balance_msat = channels.iter().map(|c| c.balance_msat).sum(); - let num_peers = peer_manager.get_peer_node_ids().len(); - - let resp = NodeInfo { - node_pk, - num_channels, - num_usable_channels, - local_balance_msat, - num_peers, - }; - - Ok(resp) -} +// TODO(max): This should be moved to lexe_ln::command or duplicated (because it +// is so simple) or removed entirely pub(crate) fn list_channels( channel_manager: NodeChannelManager, _network_graph: Arc, // TODO REPL uses it, do we need it? diff --git a/node/src/command/server/inject.rs b/node/src/command/server/inject.rs index 40cefd85b..42e45651c 100644 --- a/node/src/command/server/inject.rs +++ b/node/src/command/server/inject.rs @@ -5,8 +5,10 @@ use std::convert::Infallible; use std::sync::Arc; use common::api::UserPk; +use common::cli::Network; use common::shutdown::ShutdownChannel; -use lexe_ln::alias::NetworkGraphType; +use lexe_ln::alias::{NetworkGraphType, PaymentInfoStorageType}; +use lexe_ln::keys_manager::LexeKeysManager; use warp::Filter; use crate::channel_manager::NodeChannelManager; @@ -47,3 +49,25 @@ pub(crate) fn network_graph( { warp::any().map(move || network_graph.clone()) } + +/// Injects a keys manager. +pub(crate) fn keys_manager( + keys_manager: LexeKeysManager, +) -> impl Filter + Clone { + warp::any().map(move || keys_manager.clone()) +} + +/// Injects the inbound payments storage. +pub(crate) fn inbound_payments( + inbound_payments: PaymentInfoStorageType, +) -> impl Filter + Clone +{ + warp::any().map(move || inbound_payments.clone()) +} + +/// Injects the [`Network`] the node is running on. +pub(crate) fn network( + network: Network, +) -> impl Filter + Clone { + warp::any().map(move || network) +} diff --git a/node/src/command/server/mod.rs b/node/src/command/server/mod.rs index 969b6a655..3dd87f109 100644 --- a/node/src/command/server/mod.rs +++ b/node/src/command/server/mod.rs @@ -12,10 +12,14 @@ use std::sync::Arc; -use common::api::rest::into_response; +use common::api::command::GetInvoiceRequest; +use common::api::error::{NodeApiError, NodeErrorKind}; +use common::api::rest::{into_response, into_succ_response}; use common::api::UserPk; +use common::cli::Network; use common::shutdown::ShutdownChannel; -use lexe_ln::alias::NetworkGraphType; +use lexe_ln::alias::{NetworkGraphType, PaymentInfoStorageType}; +use lexe_ln::keys_manager::LexeKeysManager; use tokio::sync::mpsc; use tracing::trace; use warp::{Filter, Rejection, Reply}; @@ -26,6 +30,18 @@ use crate::peer_manager::NodePeerManager; mod inject; +/// Converts the `anyhow::Result`s returned by [`lexe_ln::command`] into +/// `Result`s with error kind [`NodeErrorKind::Command`]. +#[allow(dead_code)] // TODO(max): Add get_invoice endpoint and use this fn +fn into_command_api_result( + anyhow_res: anyhow::Result, +) -> Result { + anyhow_res.map_err(|e| NodeApiError { + kind: NodeErrorKind::Command, + msg: format!("{e:#}"), + }) +} + // TODO Add owner authentication /// Implements [`OwnerNodeRunApi`] - endpoints only callable by the node owner. /// @@ -34,6 +50,9 @@ pub(crate) fn owner_routes( channel_manager: NodeChannelManager, peer_manager: NodePeerManager, network_graph: Arc, + keys_manager: LexeKeysManager, + inbound_payments: PaymentInfoStorageType, + network: Network, activity_tx: mpsc::Sender<()>, ) -> impl Filter + Clone { let root = @@ -51,16 +70,26 @@ pub(crate) fn owner_routes( .and(warp::get()) .and(inject::channel_manager(channel_manager.clone())) .and(inject::peer_manager(peer_manager)) - .map(owner::node_info) - .map(into_response); + .map(lexe_ln::command::node_info) + .map(into_succ_response); let list_channels = warp::path("channels") .and(warp::get()) - .and(inject::channel_manager(channel_manager)) + .and(inject::channel_manager(channel_manager.clone())) .and(inject::network_graph(network_graph)) .map(owner::list_channels) .map(into_response); + let get_invoice = warp::path("get_invoice") + .and(warp::post()) + .and(inject::channel_manager(channel_manager)) + .and(inject::keys_manager(keys_manager)) + .and(inject::inbound_payments(inbound_payments)) + .and(inject::network(network)) + .and(warp::body::json::()) + .map(lexe_ln::command::get_invoice) + .map(into_command_api_result) + .map(into_response); - let owner = owner_base.and(node_info.or(list_channels)); + let owner = owner_base.and(node_info.or(list_channels).or(get_invoice)); root.or(owner) } diff --git a/node/src/command/test.rs b/node/src/command/test.rs index cfea9aa96..51307a474 100644 --- a/node/src/command/test.rs +++ b/node/src/command/test.rs @@ -1,5 +1,4 @@ use std::net::SocketAddr; -use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; @@ -11,7 +10,7 @@ use common::rng::SysRng; use common::shutdown::ShutdownChannel; use common::test_utils::regtest::Regtest; use lexe_ln::alias::NetworkGraphType; -use lexe_ln::{channel, logger, p2p}; +use lexe_ln::{channel, command, logger, p2p}; use tokio::sync::mpsc; use crate::channel_manager::{NodeChannelManager, USER_CONFIG}; @@ -121,7 +120,7 @@ async fn node_info() { let args = default_args(); let h = CommandTestHarness::init(args).await; - owner::node_info(h.channel_manager(), h.peer_manager()).unwrap(); + command::node_info(h.channel_manager(), h.peer_manager()); } /// Tests the list_channels handler. @@ -153,32 +152,25 @@ async fn connect_peer() { // Prior to connecting let pre_node_info1 = - owner::node_info(node1.channel_manager(), node1.peer_manager()) - .unwrap(); + command::node_info(node1.channel_manager(), node1.peer_manager()); assert_eq!(pre_node_info1.num_peers, 0); let pre_node_info2 = - owner::node_info(node2.channel_manager(), node2.peer_manager()) - .unwrap(); + command::node_info(node2.channel_manager(), node2.peer_manager()); assert_eq!(pre_node_info2.num_peers, 0); assert!(peer_manager1.get_peer_node_ids().is_empty()); assert!(peer_manager2.get_peer_node_ids().is_empty()); // Connect - p2p::connect_channel_peer_if_necessary( - peer_manager1.arc_inner(), - channel_peer, - ) - .await - .expect("Failed to connect"); + p2p::connect_channel_peer_if_necessary(peer_manager1.clone(), channel_peer) + .await + .expect("Failed to connect"); // After connecting let post_node_info1 = - owner::node_info(node1.channel_manager(), node1.peer_manager()) - .unwrap(); + command::node_info(node1.channel_manager(), node1.peer_manager()); assert_eq!(post_node_info1.num_peers, 1); let post_node_info2 = - owner::node_info(node2.channel_manager(), node2.peer_manager()) - .unwrap(); + command::node_info(node2.channel_manager(), node2.peer_manager()); assert_eq!(post_node_info2.num_peers, 1); assert_eq!(peer_manager1.get_peer_node_ids().len(), 1); assert_eq!(peer_manager2.get_peer_node_ids().len(), 1); @@ -208,20 +200,19 @@ async fn open_channel() { }; let channel_value_sat = 1_000_000; let (channel_peer_tx, _rx) = - mpsc::channel(crate::run::DEFAULT_CHANNEL_SIZE); + mpsc::channel(common::constants::DEFAULT_CHANNEL_SIZE); // Prior to opening let pre_node_info = - owner::node_info(node1.channel_manager(), node1.peer_manager()) - .unwrap(); + command::node_info(node1.channel_manager(), node1.peer_manager()); assert_eq!(pre_node_info.num_channels, 0); // Open the channel println!("Opening channel"); channel::open_channel( - node1.channel_manager().deref(), - node1.peer_manager().arc_inner(), + node1.channel_manager(), + node1.peer_manager(), node1.persister(), channel_peer, channel_value_sat, @@ -233,8 +224,7 @@ async fn open_channel() { // After opening let post_node_info = - owner::node_info(node1.channel_manager(), node1.peer_manager()) - .unwrap(); + command::node_info(node1.channel_manager(), node1.peer_manager()); assert_eq!(post_node_info.num_channels, 1); // Wait for a graceful shutdown to complete before exiting this test (and diff --git a/node/src/event_handler.rs b/node/src/event_handler.rs index 9d44d5bd7..38356ac1b 100644 --- a/node/src/event_handler.rs +++ b/node/src/event_handler.rs @@ -15,14 +15,13 @@ use common::hex; use common::task::LxTask; use lexe_ln::alias::{NetworkGraphType, PaymentInfoStorageType}; use lexe_ln::bitcoind::LexeBitcoind; +use lexe_ln::invoice::{HTLCStatus, MillisatAmount, PaymentInfo}; use lexe_ln::keys_manager::LexeKeysManager; -use lexe_ln::types::{HTLCStatus, MillisatAmount, PaymentInfo}; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, }; use lightning::routing::gossip::NodeId; use lightning::util::events::{Event, EventHandler, PaymentPurpose}; -use tokio::runtime::Handle; use tracing::{debug, error, info}; use crate::channel_manager::NodeChannelManager; @@ -78,21 +77,28 @@ impl EventHandler for NodeEventHandler { /// /// [`EventsProvider`]: lightning::util::events::EventsProvider fn handle_event(&self, event: &Event) { - // FIXME: This trait requires that event handling *finishes* before - // returning from this function. There isn't currently a clean way to do - // this, so we block the *entire* program (yes, sucks) until event - // handling is complete, as an inefficient but safe default. Once LDK - // #1674 (async event handling) is fixed, we can remove the `block_on`. - Handle::current().block_on(async move { + // TODO FIXME XXX: This trait requires that event handling *finishes* + // before returning from this function, but LDK #1674 (async event + // handling) isn't implemented yet. For now, we carry out the event + // handling in a spawned task, but this *must* be fixed eventually. + let channel_manager = self.channel_manager.clone(); + let bitcoind = self.bitcoind.clone(); + let network_graph = self.network_graph.clone(); + let keys_manager = self.keys_manager.clone(); + let inbound_payments = self.inbound_payments.clone(); + let outbound_payments = self.outbound_payments.clone(); + let network = self.network; + let event = event.clone(); + let _ = LxTask::spawn(async move { handle_event( - &self.channel_manager, - &self.bitcoind, - &self.network_graph, - &self.keys_manager, - &self.inbound_payments, - &self.outbound_payments, - self.network, - event, + &channel_manager, + &bitcoind, + &network_graph, + &keys_manager, + &inbound_payments, + &outbound_payments, + network, + &event, ) .await }); diff --git a/node/src/inactivity_timer.rs b/node/src/inactivity_timer.rs index c5fdad27e..2dd24974f 100644 --- a/node/src/inactivity_timer.rs +++ b/node/src/inactivity_timer.rs @@ -104,11 +104,11 @@ impl InactivityTimer { mod tests { use std::future::Future; + use common::constants::DEFAULT_CHANNEL_SIZE; use tokio::sync::mpsc; use tokio::time::{self, Duration}; use super::*; - use crate::run::DEFAULT_CHANNEL_SIZE; /// A simple struct that holds all the materials required to test the /// InactivityTimer. diff --git a/node/src/lib.rs b/node/src/lib.rs index c33959fd6..99bfeab2c 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -3,12 +3,12 @@ // Enforce disallowed methods clippy lint #![deny(clippy::disallowed_methods)] +pub mod channel_manager; pub mod cli; pub mod run; mod alias; mod api; -mod channel_manager; mod command; mod event_handler; mod inactivity_timer; diff --git a/node/src/peer_manager.rs b/node/src/peer_manager.rs index 5bf74c1ee..a875cd9d1 100644 --- a/node/src/peer_manager.rs +++ b/node/src/peer_manager.rs @@ -6,6 +6,7 @@ use common::rng::Crng; use lexe_ln::alias::{OnionMessengerType, P2PGossipSyncType}; use lexe_ln::keys_manager::LexeKeysManager; use lexe_ln::logger::LexeTracingLogger; +use lexe_ln::traits::ArcInner; use lightning::chain::keysinterface::{KeysInterface, Recipient}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; use secrecy::zeroize::Zeroizing; @@ -15,7 +16,7 @@ use crate::channel_manager::NodeChannelManager; /// An Arc is held internally, so it is fine to clone directly. #[derive(Clone)] -pub(crate) struct NodePeerManager(Arc); +pub struct NodePeerManager(Arc); impl Deref for NodePeerManager { type Target = PeerManagerType; @@ -67,8 +68,10 @@ impl NodePeerManager { Self(Arc::new(peer_manager)) } +} - pub(crate) fn arc_inner(&self) -> Arc { +impl ArcInner for NodePeerManager { + fn arc_inner(&self) -> Arc { self.0.clone() } } diff --git a/node/src/persister.rs b/node/src/persister.rs index e58c62114..1b7332fe4 100644 --- a/node/src/persister.rs +++ b/node/src/persister.rs @@ -21,7 +21,7 @@ use lexe_ln::alias::{ use lexe_ln::channel_monitor::LxChannelMonitorUpdate; use lexe_ln::keys_manager::LexeKeysManager; use lexe_ln::logger::LexeTracingLogger; -use lexe_ln::traits::LexePersister; +use lexe_ln::traits::LexeInnerPersister; use lightning::chain::chainmonitor::{MonitorUpdateId, Persist}; use lightning::chain::channelmonitor::ChannelMonitorUpdate; use lightning::chain::transaction::OutPoint; @@ -52,7 +52,7 @@ const IMPORTANT_RETRIES: usize = 3; /// An Arc is held internally, so it is fine to clone and use directly. #[derive(Clone)] // TODO Try removing this -pub(crate) struct NodePersister { +pub struct NodePersister { inner: InnerPersister, } @@ -86,7 +86,7 @@ impl Deref for NodePersister { /// The thing that actually impls the Persist trait. LDK requires that /// NodePersister Derefs to it. #[derive(Clone)] -pub(crate) struct InnerPersister { +pub struct InnerPersister { api: ApiClientType, node_pk: PublicKey, measurement: Measurement, @@ -273,7 +273,7 @@ impl InnerPersister { } #[async_trait] -impl LexePersister for InnerPersister { +impl LexeInnerPersister for InnerPersister { async fn persist_manager( &self, channel_manager: &W, diff --git a/node/src/repl.rs b/node/src/repl.rs index 14c8eb29b..764e9ec7e 100644 --- a/node/src/repl.rs +++ b/node/src/repl.rs @@ -10,19 +10,22 @@ use anyhow::Context; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::secp256k1::PublicKey; +use common::api::command::GetInvoiceRequest; use common::cli::Network; use common::hex; use common::ln::peer::ChannelPeer; use lexe_ln::alias::{NetworkGraphType, PaymentInfoStorageType}; -use lexe_ln::channel; +use lexe_ln::invoice::{ + HTLCStatus, LxPaymentError, MillisatAmount, PaymentInfo, +}; use lexe_ln::keys_manager::LexeKeysManager; use lexe_ln::p2p::{self, ChannelPeerUpdate}; -use lexe_ln::types::{HTLCStatus, MillisatAmount, PaymentInfo}; +use lexe_ln::{channel, command}; use lightning::chain::keysinterface::{KeysInterface, Recipient}; use lightning::ln::{PaymentHash, PaymentPreimage}; use lightning::routing::gossip::NodeId; use lightning_invoice::payment::PaymentError; -use lightning_invoice::{utils, Currency, Invoice}; +use lightning_invoice::Invoice; use tokio::sync::mpsc; use tracing::{error, info}; @@ -73,31 +76,17 @@ pub(crate) async fn poll_for_user_input( ) .await; if let Err(e) = res { - // Print the entire error chain on one line - info!("{:#}", e); + error!("{e:#}"); } } "sendpayment" => { - let invoice_str = words.next(); - if invoice_str.is_none() { - info!("ERROR: sendpayment requires an invoice: `sendpayment `"); - continue; - } - - let invoice = match Invoice::from_str(invoice_str.unwrap()) - { - Ok(inv) => inv, - Err(e) => { - info!("ERROR: invalid invoice: {:?}", e); - continue; - } - }; - - send_payment( + if let Err(e) = send_payment( + words, &invoice_payer, - &invoice, outbound_payments.clone(), - ); + ) { + error!("{e:#}"); + } } "keysend" => { let dest_pk = match words.next() { @@ -136,40 +125,15 @@ pub(crate) async fn poll_for_user_input( ); } "getinvoice" => { - let amt_str = words.next(); - if amt_str.is_none() { - info!("ERROR: getinvoice requires an amount in millisatoshis"); - continue; - } - - let amt_msat: Result = amt_str.unwrap().parse(); - if amt_msat.is_err() { - info!("ERROR: getinvoice provided payment amount was not a number"); - continue; - } - let expiry_secs_str = words.next(); - if expiry_secs_str.is_none() { - info!( - "ERROR: getinvoice requires an expiry in seconds" - ); - continue; - } - - let expiry_secs: Result = - expiry_secs_str.unwrap().parse(); - if expiry_secs.is_err() { - info!("ERROR: getinvoice provided expiry was not a number"); - continue; - } - - get_invoice( - amt_msat.unwrap(), + if let Err(e) = get_invoice( + words, inbound_payments.clone(), channel_manager.clone(), keys_manager.clone(), network, - expiry_secs.unwrap(), - ); + ) { + error!("{e:#}"); + } } "connectpeer" => { if let Err(e) = connect_peer(words, &peer_manager).await { @@ -397,14 +361,7 @@ fn list_payments( info!("\t\tamount_millisatoshis: {},", payment_info.amt_msat); info!("\t\tpayment_hash: {},", hex::encode(&payment_hash.0)); info!("\t\thtlc_direction: inbound,"); - info!( - "\t\thtlc_status: {},", - match payment_info.status { - HTLCStatus::Pending => "pending", - HTLCStatus::Succeeded => "succeeded", - HTLCStatus::Failed => "failed", - } - ); + info!("\t\thtlc_status: {},", payment_info.status); info!("\t}},"); } @@ -416,14 +373,7 @@ fn list_payments( info!("\t\tamount_millisatoshis: {},", payment_info.amt_msat); info!("\t\tpayment_hash: {},", hex::encode(&payment_hash.0)); info!("\t\thtlc_direction: outbound,"); - info!( - "\t\thtlc_status: {},", - match payment_info.status { - HTLCStatus::Pending => "pending", - HTLCStatus::Succeeded => "succeeded", - HTLCStatus::Failed => "failed", - } - ); + info!("\t\thtlc_status: {},", payment_info.status); info!("\t}},"); } @@ -441,7 +391,7 @@ async fn connect_peer<'a, I: Iterator>( .context("Could not parse ChannelPeer")?; p2p::connect_channel_peer_if_necessary( - peer_manager.arc_inner(), + peer_manager.clone(), channel_peer.clone(), ) .await @@ -452,51 +402,48 @@ async fn connect_peer<'a, I: Iterator>( Ok(()) } -fn send_payment( +fn send_payment<'a, I: Iterator>( + mut words: I, invoice_payer: &InvoicePayerType, - invoice: &Invoice, - payment_storage: PaymentInfoStorageType, -) { - let status = match invoice_payer.pay_invoice(invoice) { - Ok(_payment_id) => { - let payee_pk = invoice.recover_payee_pub_key(); - let amt_msat = invoice.amount_milli_satoshis().unwrap(); - info!( - "EVENT: initiated sending {} msats to {}", - amt_msat, payee_pk - ); - print!("> "); - HTLCStatus::Pending - } - Err(PaymentError::Invoice(e)) => { - info!("ERROR: invalid invoice: {}", e); - print!("> "); - return; - } - Err(PaymentError::Routing(e)) => { - info!("ERROR: failed to find route: {}", e.err); - print!("> "); - return; - } - Err(PaymentError::Sending(e)) => { - info!("ERROR: failed to send payment: {:?}", e); - print!("> "); - HTLCStatus::Failed - } - }; - let payment_hash = PaymentHash(invoice.payment_hash().into_inner()); - let payment_secret = Some(*invoice.payment_secret()); + outbound_payments: PaymentInfoStorageType, +) -> anyhow::Result<()> { + let invoice = words + .next() + .map(Invoice::from_str) + .context("sendpayment requires an invoice: `sendpayment `")? + .context("Invalid invoice")?; - let mut payments = payment_storage.lock().unwrap(); - payments.insert( + let payment_result = invoice_payer + .pay_invoice(&invoice) + .map_err(LxPaymentError::from); + + // Store the payment in our outbound payments storage as pending or failed + // depending on the payment result + let payment_hash = PaymentHash(invoice.payment_hash().into_inner()); + let preimage = None; + let secret = Some(*invoice.payment_secret()); + let amt_msat = MillisatAmount(invoice.amount_milli_satoshis()); + let status = if payment_result.is_ok() { + HTLCStatus::Pending + } else { + HTLCStatus::Failed + }; + outbound_payments.lock().expect("Poisoned").insert( payment_hash, PaymentInfo { - preimage: None, - secret: payment_secret, + preimage, + secret, + amt_msat, status, - amt_msat: MillisatAmount(invoice.amount_milli_satoshis()), }, ); + + let _payment_id = payment_result.context("Couldn't initiate payment")?; + let amt_msat = MillisatAmount(invoice.amount_milli_satoshis()); + let payee_pk = invoice.recover_payee_pub_key(); + info!("Success: Initiated payment of {amt_msat} msats to {payee_pk}"); + + Ok(()) } fn keysend( @@ -551,44 +498,42 @@ fn keysend( ); } -fn get_invoice( - amt_msat: u64, - payment_storage: PaymentInfoStorageType, +fn get_invoice<'a, I: Iterator>( + mut words: I, + inbound_payments: PaymentInfoStorageType, channel_manager: NodeChannelManager, keys_manager: LexeKeysManager, network: Network, - expiry_secs: u32, -) { - let mut payments = payment_storage.lock().unwrap(); - let currency = Currency::from(network); - let invoice = match utils::create_invoice_from_channelmanager( - &channel_manager, - keys_manager, - currency, - Some(amt_msat), - "lexe-node".to_string(), +) -> anyhow::Result<()> { + let amt_msat_str = words + .next() + .context("getinvoice requires an amount in millisatoshis")?; + let amt_msat = u64::from_str(amt_msat_str) + .context("getinvoice: provided amount was not a number")?; + + let expiry_secs_str = words + .next() + .context("getinvoice requires an expiry in seconds")?; + let expiry_secs = u32::from_str(expiry_secs_str) + .context("getinvoice: provided expiry was not a number")?; + + let req = GetInvoiceRequest { + amt_msat: Some(amt_msat), expiry_secs, - ) { - Ok(inv) => { - info!("SUCCESS: generated invoice: {}", inv); - inv - } - Err(e) => { - info!("ERROR: failed to create invoice: {:?}", e); - return; - } }; - let payment_hash = PaymentHash(invoice.payment_hash().into_inner()); - payments.insert( - payment_hash, - PaymentInfo { - preimage: None, - secret: Some(*invoice.payment_secret()), - status: HTLCStatus::Pending, - amt_msat: MillisatAmount(Some(amt_msat)), - }, - ); + let invoice = command::get_invoice( + channel_manager, + keys_manager, + inbound_payments, + network, + req, + ) + .context("Could not generate invoice")?; + + info!("Success: Generated invoice {invoice}"); + + Ok(()) } /// Parses the channel peer and channel value and opens a channel. @@ -612,8 +557,8 @@ async fn open_channel<'a, I: Iterator>( .context("channel_value_sat must be a number")?; channel::open_channel( - channel_manager.deref(), - peer_manager.arc_inner(), + channel_manager.clone(), + peer_manager.clone(), persister, channel_peer, channel_value_sat, diff --git a/node/src/run.rs b/node/src/run.rs index 75b024edd..4ff521787 100644 --- a/node/src/run.rs +++ b/node/src/run.rs @@ -9,6 +9,7 @@ use common::api::provision::{Node, SealedSeedId}; use common::api::UserPk; use common::cli::RunArgs; use common::client::tls::node_run_tls_config; +use common::constants::DEFAULT_CHANNEL_SIZE; use common::enclave::{ self, MachineId, Measurement, MinCpusvn, MIN_SGX_CPUSVN, }; @@ -19,7 +20,8 @@ use common::task::LxTask; use futures::stream::{FuturesUnordered, StreamExt}; use lexe_ln::alias::{ BlockSourceType, BroadcasterType, ChannelMonitorType, FeeEstimatorType, - NetworkGraphType, P2PGossipSyncType, PaymentInfoStorageType, WalletType, + NetworkGraphType, OnionMessengerType, P2PGossipSyncType, + PaymentInfoStorageType, ProbabilisticScorerType, WalletType, }; use lexe_ln::background_processor::LexeBackgroundProcessor; use lexe_ln::bitcoind::LexeBitcoind; @@ -48,7 +50,6 @@ use crate::persister::NodePersister; use crate::{api, command}; // TODO(max): Move this to common::constants -pub(crate) const DEFAULT_CHANNEL_SIZE: usize = 256; /// The amount of time tasks have to finish after a graceful shutdown was /// initiated before the program exits. const SHUTDOWN_TIME_LIMIT: Duration = Duration::from_secs(15); @@ -62,19 +63,21 @@ pub struct UserNode { tasks: Vec<(&'static str, LxTask<()>)>, // --- Actors --- // - pub(crate) channel_manager: NodeChannelManager, - pub(crate) peer_manager: NodePeerManager, + logger: LexeTracingLogger, + pub persister: NodePersister, + pub wallet: Arc, + block_source: Arc, + fee_estimator: Arc, + broadcaster: Arc, pub(crate) keys_manager: LexeKeysManager, - pub(crate) persister: NodePersister, chain_monitor: Arc, pub(crate) network_graph: Arc, gossip_sync: Arc, + scorer: Arc>, + pub channel_manager: NodeChannelManager, + onion_messenger: Arc, + pub peer_manager: NodePeerManager, invoice_payer: Arc, - pub(crate) wallet: Arc, - block_source: Arc, - broadcaster: Arc, - fee_estimator: Arc, - logger: LexeTracingLogger, inactivity_timer: InactivityTimer, // --- Sync --- // @@ -249,7 +252,7 @@ impl UserNode { &keys_manager, channel_manager.clone(), gossip_sync.clone(), - onion_messenger, + onion_messenger.clone(), logger.clone(), ); @@ -268,7 +271,7 @@ impl UserNode { info!("Listening for LN P2P connections on port {peer_port}"); let p2p_listener_task = p2p::spawn_p2p_listener( listener, - peer_manager.arc_inner(), + peer_manager.clone(), shutdown.clone(), ); tasks.push(("p2p listener", p2p_listener_task)); @@ -290,14 +293,30 @@ impl UserNode { // Spawn the task to regularly reconnect to channel peers let p2p_reconnector_task = p2p::spawn_p2p_reconnector( - channel_manager.arc_inner(), - peer_manager.arc_inner(), + channel_manager.clone(), + peer_manager.clone(), initial_channel_peers, channel_peer_rx, shutdown.clone(), ); tasks.push(("p2p reconnectooor", p2p_reconnector_task)); + // Initialize the event handler + // TODO: persist payment info + let inbound_payments: PaymentInfoStorageType = + Arc::new(Mutex::new(HashMap::new())); + let outbound_payments: PaymentInfoStorageType = + Arc::new(Mutex::new(HashMap::new())); + let event_handler = NodeEventHandler::new( + args.network, + channel_manager.clone(), + keys_manager.clone(), + bitcoind.clone(), + network_graph.clone(), + inbound_payments.clone(), + outbound_payments.clone(), + ); + // Build owner service TLS config for authenticating owner let node_dns = args.node_dns_name.clone(); let owner_tls = node_run_tls_config(rng, &root_seed, vec![node_dns]) @@ -308,6 +327,9 @@ impl UserNode { channel_manager.clone(), peer_manager.clone(), network_graph.clone(), + keys_manager.clone(), + inbound_payments.clone(), + args.network, activity_tx, ); let mut owner_shutdown = shutdown.clone(); @@ -353,22 +375,6 @@ impl UserNode { .await .context("Could not notify runner of ready status")?; - // Initialize the event handler - // TODO: persist payment info - let inbound_payments: PaymentInfoStorageType = - Arc::new(Mutex::new(HashMap::new())); - let outbound_payments: PaymentInfoStorageType = - Arc::new(Mutex::new(HashMap::new())); - let event_handler = NodeEventHandler::new( - args.network, - channel_manager.clone(), - keys_manager.clone(), - bitcoind.clone(), - network_graph.clone(), - inbound_payments.clone(), - outbound_payments.clone(), - ); - // Initialize InvoicePayer let router = DefaultRouter::new( network_graph.clone(), @@ -387,11 +393,12 @@ impl UserNode { // Init background processor let bg_processor_task = LexeBackgroundProcessor::start::< NodeChannelManager, + NodePeerManager, NodePersister, NodeEventHandler, >( channel_manager.clone(), - peer_manager.arc_inner(), + peer_manager.clone(), persister.clone(), chain_monitor.clone(), invoice_payer.clone(), @@ -417,19 +424,21 @@ impl UserNode { tasks, // Actors - channel_manager, - peer_manager, - keys_manager, + logger, persister, - chain_monitor, - network_graph, - gossip_sync, - invoice_payer, wallet, block_source, fee_estimator, broadcaster, - logger, + keys_manager, + chain_monitor, + network_graph, + gossip_sync, + scorer, + channel_manager, + onion_messenger, + peer_manager, + invoice_payer, inactivity_timer, // Sync @@ -452,7 +461,7 @@ impl UserNode { // Sync channel manager and channel monitors to chain tip let synced_chain_listeners = SyncedChainListeners::init_and_sync( self.args.network, - self.channel_manager.arc_inner(), + self.channel_manager.clone(), self.channel_manager_blockhash, self.channel_monitors, self.block_source.clone(), @@ -487,16 +496,16 @@ impl UserNode { if self.args.repl { debug!("Starting REPL"); crate::repl::poll_for_user_input( - self.invoice_payer.clone(), + self.invoice_payer, self.peer_manager.clone(), - self.channel_manager.clone(), - self.keys_manager.clone(), - self.network_graph.clone(), + self.channel_manager, + self.keys_manager, + self.network_graph, self.inbound_payments, self.outbound_payments, - self.persister.clone(), + self.persister, self.args.network, - self.channel_peer_tx.clone(), + self.channel_peer_tx, ) .await; debug!("REPL complete.");