diff --git a/Cargo.lock b/Cargo.lock index c216364..96f8a6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -210,6 +210,20 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "actix-ws" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3a1fb4f9f2794b0aadaf2ba5f14a6f034c7e86957b458c506a8cb75953f2d99" +dependencies = [ + "actix-codec", + "actix-http", + "actix-web", + "bytestring", + "futures-core", + "tokio", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -721,6 +735,12 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "atty" version = "0.2.14" @@ -732,6 +752,18 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "auto_enums" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781" +dependencies = [ + "derive_utils 0.15.0", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -1514,6 +1546,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cookie" version = "0.16.2" @@ -1861,13 +1902,35 @@ version = "0.99.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version", "syn 2.0.96", ] +[[package]] +name = "derive_utils" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532b4c15dccee12c7044f1fcad956e98410860b22231e44a3b827464797ca7bf" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "derive_utils" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "dialoguer" version = "0.10.4" @@ -2369,6 +2432,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.1.31" @@ -2406,6 +2478,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-enum" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3422d14de7903a52e9dbc10ae05a7e14445ec61890100e098754e120b2bd7b1e" +dependencies = [ + "derive_utils 0.11.2", + "quote", + "syn 1.0.109", +] + [[package]] name = "futures-executor" version = "0.3.31" @@ -2592,6 +2675,16 @@ dependencies = [ "spinning_top", ] +[[package]] +name = "graphql-parser" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a818c0d883d7c0801df27be910917750932be279c7bc82dc541b8769425f409" +dependencies = [ + "combine 4.6.7", + "thiserror 1.0.69", +] + [[package]] name = "h2" version = "0.3.26" @@ -2668,7 +2761,7 @@ dependencies = [ "fnv", "hcl-primitives", "vecmap-rs", - "winnow 0.7.1", + "winnow 0.7.2", ] [[package]] @@ -3074,7 +3167,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -3337,6 +3430,26 @@ version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.8.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.3" @@ -3368,6 +3481,25 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "ipc-channel" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb8251fb7bcd9ccd3725ed8deae9fe7db8e586495c9eb5b0c52e6233e5e75ea" +dependencies = [ + "bincode", + "crossbeam-channel", + "fnv", + "lazy_static", + "libc", + "mio", + "rand 0.8.5", + "serde", + "tempfile", + "uuid", + "windows", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -3621,6 +3753,76 @@ dependencies = [ "unicase", ] +[[package]] +name = "juniper" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943306315b1a7a03d27af9dfb0c288d9f4da8830c17df4bceb7d50a47da0982c" +dependencies = [ + "async-trait", + "auto_enums", + "fnv", + "futures 0.3.31", + "graphql-parser", + "indexmap 2.7.1", + "juniper_codegen", + "serde", + "smartstring", + "static_assertions", + "void", +] + +[[package]] +name = "juniper_actix" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "457190a70f2a8d7c6d9ae8a5cfc134909c498c335dbb751a147eec7a513faced" +dependencies = [ + "actix-http", + "actix-web", + "actix-ws", + "anyhow", + "futures 0.3.31", + "juniper", + "juniper_graphql_ws", + "serde", + "serde_json", +] + +[[package]] +name = "juniper_codegen" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "760dbe46660494d469023d661e8d268f413b2cb68c999975dcc237407096a693" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", + "url 2.5.4", +] + +[[package]] +name = "juniper_graphql_ws" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709eb11c716072f5c9fcbfa705dd684bd3c070943102f9fc56ccb812a36ba017" +dependencies = [ + "juniper", + "juniper_subscriptions", + "serde", + "tokio", +] + +[[package]] +name = "juniper_subscriptions" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6208a839bd4ca2131924a238311d088d6604ea267c0917903392bad7b70a92c" +dependencies = [ + "futures 0.3.31", + "juniper", +] + [[package]] name = "keccak" version = "0.1.5" @@ -3650,6 +3852,26 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -4229,6 +4451,31 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "notify" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" +dependencies = [ + "bitflags 2.8.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log 0.4.25", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.59.0", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "num" version = "0.2.1" @@ -6033,6 +6280,17 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "smpl_jwt" version = "0.7.1" @@ -8985,30 +9243,38 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "surfpool-cli" -version = "0.1.7" +version = "0.1.8" dependencies = [ "actix-cors", "actix-web", "ansi_term", "anyhow", "atty", + "bincode", "chrono", "clap 4.5.27", "clap_complete 4.5.44", "clap_generate", + "convert_case 0.7.1", "crossbeam", "crossterm", "ctrlc", "dialoguer 0.11.0", "dirs 6.0.0", "hiro-system-kit", + "ipc-channel", + "juniper", + "juniper_actix", + "juniper_graphql_ws", "mime_guess", "mustache", + "notify", "ratatui", "rust-embed", "serde", "serde_json", "surfpool-core", + "surfpool-gql", "tokio", "toml 0.8.19", "txtx-addon-network-svm", @@ -9018,7 +9284,7 @@ dependencies = [ [[package]] name = "surfpool-core" -version = "0.1.7" +version = "0.1.8" dependencies = [ "agave-geyser-plugin-interface", "base64 0.22.1", @@ -9029,6 +9295,7 @@ dependencies = [ "crossbeam-channel", "hiro-system-kit", "indexmap 2.7.1", + "ipc-channel", "itertools 0.14.0", "json5", "jsonrpc-core", @@ -9079,9 +9346,41 @@ dependencies = [ "test-case", "tokio", "tokio-util 0.7.13", + "txtx-addon-network-svm", + "uuid", "zstd", ] +[[package]] +name = "surfpool-gql" +version = "0.1.8" +dependencies = [ + "async-stream", + "convert_case 0.7.1", + "futures 0.3.31", + "futures-enum", + "juniper", + "juniper_codegen", + "surfpool-core", + "tokio", + "uuid", +] + +[[package]] +name = "surfpool-subgraph" +version = "0.1.8" +dependencies = [ + "agave-geyser-plugin-interface", + "bincode", + "ipc-channel", + "serde", + "serde_json", + "solana-program", + "surfpool-core", + "txtx-addon-network-svm", + "uuid", +] + [[package]] name = "symlink" version = "0.1.0" @@ -9832,9 +10131,9 @@ dependencies = [ [[package]] name = "txtx-addon-kit" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349567e68d1724bec1eec4ccf766232be8cdfd257f5453063e766b5b978eb90e" +checksum = "75191c35eec95ea5087cbf74ebfdbff8ebf1496083afa0392db3061f580d6fd1" dependencies = [ "crossbeam-channel", "dirs 5.0.1", @@ -9865,9 +10164,9 @@ dependencies = [ [[package]] name = "txtx-addon-network-svm" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ce3a54125dcded77c3e27cc3891cffbdd12aa1f1340dfc289d6c9d31e1d5353" +checksum = "e0d9ba755577d4b8906d8bac43c82e072ab0aeeb19767aa6fdad8609dec5a62f" dependencies = [ "anchor-lang-idl", "async-recursion", @@ -9888,9 +10187,9 @@ dependencies = [ [[package]] name = "txtx-core" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db49fab87668ddb4b4af2e0661566c23bbba269d3b36f4cc067dfc2863bfe1a0" +checksum = "4e853497b765475d05d5bccefdf3eeda9fe87685632a415ebb842fd4764b7dd9" dependencies = [ "base64 0.22.1", "better-debug", @@ -10095,6 +10394,7 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" dependencies = [ + "atomic", "getrandom 0.2.15", "serde", ] @@ -10345,6 +10645,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -10354,6 +10664,41 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "windows-registry" version = "0.2.0" @@ -10543,9 +10888,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86e376c75f4f43f44db463cf729e0d3acbf954d13e22c51e26e4c264b4ab545f" +checksum = "59690dea168f2198d1a3b0cac23b8063efcd11012f10ae4698f284808c8ef603" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 32bc079..33d6761 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace.package] -version = "0.1.7" +version = "0.1.8" edition = "2021" +description = "Surfpool is the best place to train before surfing Solana." license = "Apache-2.0" readme = "README.md" repository = "https://github.com/txtx/surfpool" @@ -10,10 +11,55 @@ categories = ["cryptography"] [workspace] members = [ "crates/cli", - "crates/core" + "crates/core", + "crates/gql", + "crates/subgraph" ] default-members = ["crates/cli"] resolver = "2" [workspace.dependencies] surfpool-core = { path = "crates/core", default-features = false } +surfpool-gql = { path = "crates/gql", default-features = false } +agave-geyser-plugin-interface = "2.1.10" +solana-sdk = "=2.1.10" +solana-program = "2.1.10" +solana-program-test = "2.1.10" +solana-rpc-client = "2.1.10" +solana-account = "2.1.10" +solana-account-decoder = "2.1.10" +solana-accounts-db = "2.1.10" +solana-client = "2.1.10" +solana-entry = "2.1.10" +solana-faucet = "2.1.10" +solana-feature-set = "2.1.10" +solana-gossip = "2.1.10" +solana-inline-spl = "2.1.10" +solana-ledger = "2.1.10" +solana-metrics = "2.1.10" +solana-perf = "2.1.10" +solana-rpc-client-api = "2.1.10" +solana-rpc = "2.1.10" +solana-runtime = "2.1.10" +solana-runtime-transaction = "2.1.10" +solana-send-transaction-service = "2.1.10" +solana-stake-program = "2.1.10" +solana-storage-bigtable = "2.1.10" +solana-transaction-status = "2.1.10" +solana-vote-program = "2.1.10" +solana-version = "2.1.10" +solana-poh = "2.1.10" +solana-svm = "2.1.10" +solana-program-runtime = "2.1.10" +solana-geyser-plugin-manager = "2.1.10" +solana-streamer = "2.1.10" +ipc-channel = "0.19.0" +serde = "1.0.217" +serde_derive = "1.0.217" # must match the serde version, see https://github.com/serde-rs/serde/issues/2584#issuecomment-1685252251 +serde_json = "1.0.135" +# txtx-core = { path = "../txtx/crates/txtx-core" } +# txtx-addon-network-svm = { package = "txtx-addon-network-svm", path = "../txtx/addons/svm" } +txtx-core = { version = "0.2.3" } +txtx-addon-network-svm = { version = "0.1.4" } +bincode = "1.3.3" +uuid = "1.7.0" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 06287c2..8a08fe4 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -15,11 +15,10 @@ path = "src/main.rs" [dependencies] surfpool-core = { workspace = true } +surfpool-gql = { workspace = true } # surfpool-core = { version = "0.1" } -# txtx-core = { path = "../../../txtx/crates/txtx-core" } -# txtx-addon-network-svm = { package = "txtx-addon-network-svm", path = "../../../txtx/addons/svm" } -txtx-core = { version = "0.2.2" } -txtx-addon-network-svm = { version = "0.1.3" } +txtx-core = { workspace = true } +txtx-addon-network-svm = { workspace = true } hiro-system-kit = "0.3.1" atty = "0.2.13" ansi_term = "0.12.1" @@ -44,6 +43,13 @@ actix-web = "4" actix-cors = "0.7.0" rust-embed="8.2.0" mime_guess = "2.0.4" +notify = { version = "8.0.0" } +juniper_actix = {version = "0.6.0", features = ["subscriptions"] } +juniper_graphql_ws = { version = "0.4.0", features = ["graphql-transport-ws"] } +juniper = { version = "0.16.1", features = ["schema-language"] } +ipc-channel = { workspace = true } +bincode = { workspace = true } +convert_case = "0.7.1" [features] default = ["cli"] diff --git a/crates/cli/src/cli/mod.rs b/crates/cli/src/cli/mod.rs index b29c5d5..64b85be 100644 --- a/crates/cli/src/cli/mod.rs +++ b/crates/cli/src/cli/mod.rs @@ -106,6 +106,9 @@ pub struct StartSimnet { /// Disable explorer (default: false) #[clap(long = "no-explorer")] pub no_explorer: bool, + /// Watch programs (default: false) + #[clap(long = "watch", action=ArgAction::SetTrue)] + pub watch: bool, /// List of geyser plugins to load #[arg(long = "geyser-plugin-config", short = 'g')] pub plugin_config_path: Vec, diff --git a/crates/cli/src/cli/simnet/mod.rs b/crates/cli/src/cli/simnet/mod.rs index e25ff0a..cd20cbe 100644 --- a/crates/cli/src/cli/simnet/mod.rs +++ b/crates/cli/src/cli/simnet/mod.rs @@ -1,9 +1,9 @@ use std::{ - path::PathBuf, + path::{Path, PathBuf}, str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + mpsc, Arc, }, thread::sleep, time::Duration, @@ -18,15 +18,21 @@ use crate::{ use super::{Context, StartSimnet, DEFAULT_EXPLORER_PORT}; use crossbeam::channel::Select; +use notify::{ + event::{CreateKind, DataChange, ModifyKind}, + Config, Event, EventKind, RecursiveMode, Result as NotifyResult, Watcher, +}; use surfpool_core::{ - simnet::SimnetEvent, solana_sdk::{ pubkey::Pubkey, signature::Keypair, signer::{EncodableKey, Signer}, }, start_simnet, - types::{RpcConfig, RunloopTriggerMode, SimnetConfig, SurfpoolConfig}, + types::{ + RpcConfig, RunloopTriggerMode, SimnetConfig, SimnetEvent, SubgraphConfig, SubgraphEvent, + SurfpoolConfig, + }, }; use txtx_core::kit::{ channel::Receiver, futures::future::join_all, helpers::fs::FileLocation, @@ -79,6 +85,7 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re airdrop_addresses, airdrop_token_amount: cmd.airdrop_token_amount, }, + subgraph: SubgraphConfig {}, plugin_config_path: cmd .plugin_config_path .iter() @@ -91,12 +98,34 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re // We start the simnet as soon as possible, as it needs to be ready for deployments let (simnet_commands_tx, simnet_commands_rx) = crossbeam::channel::unbounded(); let (simnet_events_tx, simnet_events_rx) = crossbeam::channel::unbounded(); - let ctx_cloned = ctx.clone(); + let (subgraph_commands_tx, subgraph_commands_rx) = crossbeam::channel::unbounded(); + let (subgraph_events_tx, subgraph_events_rx) = crossbeam::channel::unbounded(); + + let network_binding = format!("{}:{}", cmd.network_host, DEFAULT_EXPLORER_PORT); + let explorer_handle = start_server( + network_binding, + config.clone(), + subgraph_events_tx.clone(), + subgraph_commands_rx, + &ctx.clone(), + ) + .await + .map_err(|e| format!("{}", e.to_string()))?; + + let ctx_copy = ctx.clone(); + let simnet_commands_tx_copy = simnet_commands_tx.clone(); + let config_copy = config.clone(); let _handle = hiro_system_kit::thread_named("simnet") .spawn(move || { - let future = start_simnet(config, simnet_events_tx, simnet_commands_rx); + let future = start_simnet( + config_copy, + subgraph_commands_tx, + simnet_events_tx, + simnet_commands_tx_copy, + simnet_commands_rx, + ); if let Err(e) = hiro_system_kit::nestable_block_on(future) { - error!(ctx_cloned.expect_logger(), "{e}"); + error!(ctx_copy.expect_logger(), "{e}"); sleep(Duration::from_millis(500)); std::process::exit(1); } @@ -113,17 +142,6 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re } } - let explorer_handle = if !cmd.no_explorer { - let ctx_cloned = ctx.clone(); - let network_binding = format!("{}:{}", cmd.network_host, DEFAULT_EXPLORER_PORT); - let future = start_server(&network_binding, &ctx_cloned) - .await - .map_err(|e| format!("{}", e.to_string()))?; - Some(future) - } else { - None - }; - let mut deploy_progress_rx = vec![]; if !cmd.no_deploy { // Are we in a project directory? @@ -135,6 +153,9 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re Ok(deployment) => deployment, }; + let (progress_tx, progress_rx) = crossbeam::channel::unbounded(); + deploy_progress_rx.push(progress_rx); + if let Some((_framework, programs)) = deployment { // Is infrastructure-as-code (IaC) already setup? let base_location = @@ -147,14 +168,13 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re } let mut futures = vec![]; - for runbook_id in cmd.runbooks.iter() { - let (progress_tx, progress_rx) = crossbeam::channel::unbounded(); + let runbooks_ids_to_execute = cmd.runbooks.clone(); + for runbook_id in runbooks_ids_to_execute.iter() { futures.push(execute_runbook( runbook_id.clone(), - progress_tx, + progress_tx.clone(), txtx_manifest_location.clone(), )); - deploy_progress_rx.push(progress_rx); } let _handle = hiro_system_kit::thread_named("simnet") @@ -163,12 +183,83 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re Ok::<(), String>(()) }) .map_err(|e| format!("{}", e))?; + + if cmd.watch { + let _handle = hiro_system_kit::thread_named("watch filesystem") + .spawn(move || { + let mut target_path = base_location.clone(); + let _ = target_path.append_path("target"); + let _ = target_path.append_path("deploy"); + let (tx, rx) = mpsc::channel::>(); + let mut watcher = + notify::recommended_watcher(tx).map_err(|e| e.to_string())?; + watcher + .watch( + Path::new(&target_path.to_string()), + RecursiveMode::NonRecursive, + ) + .map_err(|e| e.to_string())?; + let _ = watcher.configure( + Config::default() + .with_poll_interval(Duration::from_secs(1)) + .with_compare_contents(true), + ); + for res in rx { + // Disregard any event that would not create or modify a .so file + let mut found_candidates = false; + match res { + Ok(Event { + kind: EventKind::Modify(ModifyKind::Data(DataChange::Content)), + paths, + attrs: _, + }) + | Ok(Event { + kind: EventKind::Create(CreateKind::File), + paths, + attrs: _, + }) => { + for path in paths.iter() { + if path.to_string_lossy().ends_with(".so") { + found_candidates = true; + } + } + } + _ => continue, + } + + if !found_candidates { + continue; + } + + let mut futures = vec![]; + for runbook_id in runbooks_ids_to_execute.iter() { + futures.push(execute_runbook( + runbook_id.clone(), + progress_tx.clone(), + txtx_manifest_location.clone(), + )); + } + let _ = hiro_system_kit::nestable_block_on(join_all(futures)); + } + Ok::<(), String>(()) + }) + .map_err(|e| format!("{}", e)) + .unwrap(); + } } + + // clean-up state }; // Start frontend - kept on main thread if cmd.no_tui { - log_events(simnet_events_rx, cmd.debug, deploy_progress_rx, ctx)?; + log_events( + simnet_events_rx, + subgraph_events_rx, + cmd.debug, + deploy_progress_rx, + ctx, + )?; } else { tui::simnet::start_app( simnet_events_rx, @@ -181,14 +272,13 @@ pub async fn handle_start_simnet_command(cmd: &StartSimnet, ctx: &Context) -> Re ) .map_err(|e| format!("{}", e))?; } - if let Some(explorer_handle) = explorer_handle { - let _ = explorer_handle.stop(true); - } + let _ = explorer_handle.stop(true); Ok(()) } fn log_events( simnet_events_rx: Receiver, + subgraph_events_rx: Receiver, include_debug_logs: bool, deploy_progress_rx: Vec>, ctx: &Context, @@ -209,6 +299,7 @@ fn log_events( let mut handles = vec![]; selector.recv(&simnet_events_rx); + selector.recv(&subgraph_events_rx); if !deployment_completed { for rx in deploy_progress_rx.iter() { @@ -227,6 +318,12 @@ fn log_events( "Account {} retrieved from Mainnet", account ); } + SimnetEvent::PluginLoaded(plugin_name) => { + info!( + ctx.expect_logger(), + "Plugin {} successfully loaded", plugin_name + ); + } SimnetEvent::EpochInfoUpdate(epoch_info) => { info!( ctx.expect_logger(), @@ -287,10 +384,35 @@ fn log_events( break; } }, - i => match oper.recv(&deploy_progress_rx[i - 1]) { + 1 => match oper.recv(&subgraph_events_rx) { + Ok(event) => match event { + SubgraphEvent::ErrorLog(_dt, log) => { + error!(ctx.expect_logger(), "{}", log); + } + SubgraphEvent::InfoLog(_dt, log) => { + info!(ctx.expect_logger(), "{}", log); + } + SubgraphEvent::DebugLog(_dt, log) => { + if include_debug_logs { + info!(ctx.expect_logger(), "{}", log); + } + } + SubgraphEvent::WarnLog(_dt, log) => { + warn!(ctx.expect_logger(), "{}", log); + } + SubgraphEvent::EndpointReady => {} + SubgraphEvent::Shutdown => { + break; + } + }, + Err(_e) => { + break; + } + }, + i => match oper.recv(&deploy_progress_rx[i - 2]) { Ok(event) => match event { BlockEvent::UpdateProgressBarStatus(update) => { - info!( + debug!( ctx.expect_logger(), "{}", format!( @@ -299,6 +421,9 @@ fn log_events( ) ); } + BlockEvent::RunbookCompleted => { + info!(ctx.expect_logger(), "{}", format!("Deployment executed",)); + } _ => {} }, Err(_e) => { diff --git a/crates/cli/src/http/mod.rs b/crates/cli/src/http/mod.rs index 288f6e2..2a687d7 100644 --- a/crates/cli/src/http/mod.rs +++ b/crates/cli/src/http/mod.rs @@ -2,10 +2,24 @@ use crate::cli::Context; use actix_cors::Cors; use actix_web::dev::ServerHandle; use actix_web::http::header::{self}; -use actix_web::web; -use actix_web::Responder; +use actix_web::web::{self, Data}; +use actix_web::HttpRequest; use actix_web::{middleware, App, HttpResponse, HttpServer}; +use actix_web::{Error, Responder}; +use crossbeam::channel::{Receiver, Select, Sender}; +use juniper_actix::{graphiql_handler, graphql_handler, playground_handler, subscriptions}; +use juniper_graphql_ws::ConnectionConfig; +use std::collections::HashMap; use std::error::Error as StdError; +use std::sync::RwLock; +use std::time::Duration; +use surfpool_core::types::{ + Entry, SchemaDatasourceingEvent, SubgraphCommand, SubgraphEvent, SurfpoolConfig, +}; +use surfpool_gql::query::{SchemaDatasource, SchemaDatasourceEntry}; +use surfpool_gql::subscription::EntryData; +use surfpool_gql::{new_dynamic_schema, Context as GqlContext, GqlDynamicSchema as GqlSchema}; +use txtx_core::kit::uuid::Uuid; #[cfg(feature = "explorer")] use rust_embed::RustEmbed; @@ -16,11 +30,110 @@ use rust_embed::RustEmbed; pub struct Asset; pub async fn start_server( - network_binding: &str, + network_binding: String, + _config: SurfpoolConfig, + subgraph_events_tx: Sender, + subgraph_commands_rx: Receiver, _ctx: &Context, ) -> Result> { + let context = GqlContext::new(); + let mut schema_datasource = SchemaDatasource::new(); + let schema = RwLock::new(Some(new_dynamic_schema(schema_datasource.clone()))); + let schema_wrapped = Data::new(schema); + let context_wrapped = Data::new(RwLock::new(context)); + + let gql_context_copy = context_wrapped.clone(); + let gql_schema_copy = schema_wrapped.clone(); + + let _handle = hiro_system_kit::thread_named("subgraph") + .spawn(move || { + let mut observers = vec![]; + loop { + let mut selector = Select::new(); + let mut handles = vec![]; + selector.recv(&subgraph_commands_rx); + for rx in observers.iter() { + handles.push(selector.recv(rx)); + } + let oper = selector.select(); + match oper.index() { + 0 => match oper.recv(&subgraph_commands_rx) { + Err(_e) => { + // todo + } + Ok(cmd) => match cmd { + SubgraphCommand::CreateSubgraph(uuid, config, sender) => { + let mut gql_schema = gql_schema_copy.write().unwrap(); + let subgraph_uuid = uuid; + let subgraph_name = config.subgraph_name.clone(); + let mut schema = + SchemaDatasourceEntry::new(&subgraph_uuid, &subgraph_name); + for fields in config.fields.iter() { + schema.fields.push(fields.display_name.clone()); + } + schema_datasource.add_entry(schema); + gql_schema.replace(new_dynamic_schema(schema_datasource.clone())); + use convert_case::{Case, Casing}; + + let gql_context = gql_context_copy.write().unwrap(); + let mut entries_store = gql_context.entries_store.write().unwrap(); + let mut lookup = gql_context.uuid_lookup.write().unwrap(); + lookup.insert( + subgraph_uuid.clone(), + subgraph_name.to_case(Case::Camel), + ); + entries_store.insert( + subgraph_name.to_case(Case::Camel), + (subgraph_uuid.clone(), vec![]), + ); + let _ = sender.send("http://127.0.0.1:8900/graphql".into()); + } + SubgraphCommand::ObserveSubgraph(subgraph_observer_rx) => { + observers.push(subgraph_observer_rx); + } + SubgraphCommand::Shutdown => { + let _ = subgraph_events_tx.send(SubgraphEvent::Shutdown); + } + }, + }, + i => match oper.recv(&observers[i - 1]) { + Ok(cmd) => match cmd { + SchemaDatasourceingEvent::ApplyEntry( + uuid, + value, /* , request, slot*/ + ) => { + let gql_context = gql_context_copy.write().unwrap(); + let uuid_lookup = gql_context.uuid_lookup.read().unwrap(); + let subgraph_name = uuid_lookup.get(&uuid).unwrap(); + let mut entries_store = gql_context.entries_store.write().unwrap(); + let (_uuid, entries) = + entries_store.get_mut(subgraph_name).unwrap(); + let mut values = HashMap::new(); + values.insert("message".to_string(), value); + let entry_uuid = Uuid::new_v4(); + entries.push(EntryData { + name: subgraph_name.clone(), + entry: Entry { + uuid: entry_uuid, + values, + }, + }); + } + SchemaDatasourceingEvent::Rountrip(_uuid) => {} + }, + Err(_e) => {} + }, + } + } + #[allow(unreachable_code)] + Ok::<(), String>(()) + }) + .map_err(|e| format!("{}", e))?; + let server = HttpServer::new(move || { App::new() + .app_data(schema_wrapped.clone()) + .app_data(context_wrapped.clone()) .wrap( Cors::default() .allow_any_origin() @@ -32,6 +145,14 @@ pub async fn start_server( ) .wrap(middleware::Compress::default()) .wrap(middleware::Logger::default()) + .service( + web::scope("/gql/v1") + .route("/graphql?", web::get().to(get_graphql)) + .route("/graphql", web::post().to(post_graphql)) + .route("/subscriptions", web::get().to(subscriptions)), + ) + .service(web::resource("/playground").route(web::get().to(playground))) + .service(web::resource("/graphiql").route(web::get().to(graphiql))) .service(dist) }) .workers(5) @@ -74,3 +195,60 @@ async fn dist(path: web::Path) -> impl Responder { }; handle_embedded_file(path_str) } + +async fn post_graphql( + req: HttpRequest, + payload: web::Payload, + schema: Data>>, + context: Data>, +) -> Result { + let context = context + .read() + .map_err(|_| actix_web::error::ErrorInternalServerError("Failed to read context"))?; + let schema = schema + .read() + .map_err(|_| actix_web::error::ErrorInternalServerError("Failed to read context"))?; + graphql_handler(&(schema.as_ref().unwrap()), &context, req, payload).await +} + +async fn get_graphql( + req: HttpRequest, + payload: web::Payload, + schema: Data>>, + context: Data>, +) -> Result { + let context = context + .read() + .map_err(|_| actix_web::error::ErrorInternalServerError("Failed to read context"))?; + let schema = schema + .read() + .map_err(|_| actix_web::error::ErrorInternalServerError("Failed to read context"))?; + graphql_handler(&(schema.as_ref().unwrap()), &context, req, payload).await +} + +async fn subscriptions( + req: HttpRequest, + stream: web::Payload, + schema: Data, + context: Data>, +) -> Result { + let context = context + .read() + .map_err(|_| actix_web::error::ErrorInternalServerError("Failed to read context"))?; + let ctx = GqlContext { + uuid_lookup: context.uuid_lookup.clone(), + entries_store: context.entries_store.clone(), + entries_broadcaster: context.entries_broadcaster.clone(), + }; + let config = ConnectionConfig::new(ctx); + let config = config.with_keep_alive_interval(Duration::from_secs(15)); + subscriptions::ws_handler(req, stream, schema.into_inner(), config).await +} + +async fn playground() -> Result { + playground_handler("/gql/v1/graphql", Some("/gql/v1/subscriptions")).await +} + +async fn graphiql() -> Result { + graphiql_handler("/gql/v1/graphql", Some("/gql/v1/subscriptions")).await +} diff --git a/crates/cli/src/runbook/mod.rs b/crates/cli/src/runbook/mod.rs index 5ed4b22..c275e39 100644 --- a/crates/cli/src/runbook/mod.rs +++ b/crates/cli/src/runbook/mod.rs @@ -1,3 +1,4 @@ +use dialoguer::{console::Style, theme::ColorfulTheme, Confirm}; use txtx_addon_network_svm::SvmNetworkAddon; use txtx_core::{ kit::{ @@ -7,8 +8,10 @@ use txtx_core::{ Addon, }, manifest::{file::read_runbooks_from_manifest, WorkspaceManifest}, + runbook::{ConsolidatedChanges, SynthesizedChange}, start_unsupervised_runbook_runloop, std::StdAddon, + types::RunbookSnapshotContext, }; pub fn get_addon_by_namespace(namespace: &str) -> Option> { @@ -34,7 +37,8 @@ pub async fn execute_runbook( let top_level_inputs_map = manifest.get_runbook_inputs(&Some("localnet".into()), &vec![], None)?; - let Some((mut runbook, runbook_sources, _state, _smt)) = runbooks.swap_remove(&runbook_id) + let Some((mut runbook, runbook_sources, _, runbook_state_location)) = + runbooks.swap_remove(&runbook_id) else { return Err(format!("Deployment {} not found", runbook_id)); }; @@ -54,6 +58,109 @@ pub async fn execute_runbook( runbook.enable_full_execution_mode(); + let previous_state_opt = if let Some(state_file_location) = runbook_state_location.clone() { + match state_file_location.load_execution_snapshot( + true, + &runbook.runbook_id.name, + &runbook.top_level_inputs_map.current_top_level_input_name(), + ) { + Ok(snapshot) => Some(snapshot), + Err(e) => { + println!("{} {}", red!("x"), e); + None + } + } + } else { + None + }; + + if let Some(old) = previous_state_opt { + let ctx = RunbookSnapshotContext::new(); + + let execution_context_backups = runbook.backup_execution_contexts(); + let new = runbook.simulate_and_snapshot_flows(&old).await?; + + for flow_context in runbook.flow_contexts.iter() { + if old.flows.get(&flow_context.name).is_none() { + println!( + "{} Previous snapshot not found for flow {}", + yellow!("!"), + flow_context.name + ); + }; + } + + let consolidated_changes = ctx.diff(old, new); + + let Some(consolidated_changes) = display_snapshot_diffing(consolidated_changes) else { + return Ok(()); + }; + + runbook.prepare_flows_for_new_plans( + &consolidated_changes.new_plans_to_add, + execution_context_backups, + ); + + let (actions_to_re_execute, actions_to_execute) = + runbook.prepared_flows_for_updated_plans(&consolidated_changes.plans_to_update); + + let has_actions = actions_to_re_execute + .iter() + .filter(|(_, actions)| !actions.is_empty()) + .count(); + if has_actions > 0 { + println!("The following actions will be re-executed:"); + for (context, actions) in actions_to_re_execute.iter() { + let documentation_missing = black!(""); + println!("\n{}", yellow!(format!("{}", context))); + for (action_name, documentation) in actions.into_iter() { + println!( + "- {}: {}", + action_name, + documentation.as_ref().unwrap_or(&documentation_missing) + ); + } + } + println!("\n"); + } + + let has_actions = actions_to_execute + .iter() + .filter(|(_, actions)| !actions.is_empty()) + .count(); + if has_actions > 0 { + println!( + "The following actions have been added and will be executed for the first time:" + ); + for (context, actions) in actions_to_execute.iter() { + let documentation_missing = black!(""); + println!("\n{}", green!(format!("{}", context))); + for (action_name, documentation) in actions.into_iter() { + println!( + "- {}: {}", + action_name, + documentation.as_ref().unwrap_or(&documentation_missing) + ); + } + } + println!("\n"); + } + + let theme = ColorfulTheme { + values_style: Style::new().green(), + ..ColorfulTheme::default() + }; + + let confirm = Confirm::with_theme(&theme) + .with_prompt("Do you want to continue?") + .interact() + .unwrap(); + + if !confirm { + return Ok(()); + } + } + let res = start_unsupervised_runbook_runloop(&mut runbook, &progress_tx).await; if let Err(diags) = res { println!("{} Execution aborted", red!("x")); @@ -71,5 +178,96 @@ pub async fn execute_runbook( std::process::exit(1); } + match runbook.write_runbook_state(runbook_state_location) { + Ok(Some(location)) => { + println!("\n{} Saved execution state to {}", green!("✓"), location); + } + Ok(None) => {} + Err(e) => { + println!("{} Failed to write runbook state: {}", red!("x"), e); + } + }; + Ok(()) } + +pub fn display_snapshot_diffing( + consolidated_changes: ConsolidatedChanges, +) -> Option { + let synthesized_changes = consolidated_changes.get_synthesized_changes(); + + if synthesized_changes.is_empty() && consolidated_changes.new_plans_to_add.is_empty() { + println!( + "{} Latest snapshot in sync with latest runbook updates\n", + green!("✓") + ); + return None; + } + + if !consolidated_changes.new_plans_to_add.is_empty() { + println!("\n{}", yellow!("New chain to synchronize:")); + println!("{}\n", consolidated_changes.new_plans_to_add.join(", ")); + } + + let has_critical_changes = synthesized_changes + .iter() + .filter(|(c, _)| match c { + SynthesizedChange::Edition(_, _) => true, + SynthesizedChange::FormerFailure(_, _) => false, + SynthesizedChange::Addition(_) => false, + }) + .count(); + if has_critical_changes > 0 { + println!("\n{}\n", yellow!("Changes detected:")); + for (i, (change, _impacted)) in synthesized_changes.iter().enumerate() { + match change { + SynthesizedChange::Edition(change, _) => { + let formatted_change = change + .iter() + .map(|c| { + if c.starts_with("-") { + red!(c) + } else { + green!(c) + } + }) + .collect::>() + .join(""); + println!("{}. The following edits:\n-------------------------\n{}\n-------------------------", i + 1, formatted_change); + println!("will introduce breaking changes.\n\n"); + } + SynthesizedChange::FormerFailure(_construct_to_run, command_name) => { + println!("{}. The action error:\n-------------------------\n{}\n-------------------------", i + 1, command_name); + println!("will be re-executed.\n\n"); + } + SynthesizedChange::Addition(_new_construct_did) => {} + } + } + } + + let unexecuted = synthesized_changes + .iter() + .filter(|(c, _)| match c { + SynthesizedChange::Edition(_, _) => false, + SynthesizedChange::FormerFailure(_, _) => true, + SynthesizedChange::Addition(_) => false, + }) + .count(); + if unexecuted > 0 { + println!("\n{}", yellow!("Runbook Recovery Plan")); + println!("The previous runbook execution was interrupted before completion, causing the following actions to be aborted:"); + + for (_i, (change, _impacted)) in synthesized_changes.iter().enumerate() { + match change { + SynthesizedChange::Edition(_, _) => {} + SynthesizedChange::FormerFailure(_construct_to_run, command_name) => { + println!("- {}", command_name); + } + SynthesizedChange::Addition(_new_construct_did) => {} + } + } + println!("These actions will be re-executed in the next run.\n"); + } + + Some(consolidated_changes) +} diff --git a/crates/cli/src/tui/simnet.rs b/crates/cli/src/tui/simnet.rs index 23c83dd..ca86241 100644 --- a/crates/cli/src/tui/simnet.rs +++ b/crates/cli/src/tui/simnet.rs @@ -12,14 +12,13 @@ use ratatui::{ }; use std::{collections::VecDeque, error::Error, io, time::Duration}; use surfpool_core::{ - simnet::{ClockCommand, SimnetCommand, SimnetEvent}, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ clock::Clock, commitment_config::CommitmentConfig, epoch_info::EpochInfo, message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, }, - types::RunloopTriggerMode, + types::{ClockCommand, RunloopTriggerMode, SimnetCommand, SimnetEvent}, }; use txtx_core::kit::types::frontend::BlockEvent; use txtx_core::kit::{channel::Receiver, types::frontend::ProgressBarStatusColor}; @@ -234,6 +233,13 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( format!("Account {} retrieved from Mainnet", account), )); } + SimnetEvent::PluginLoaded(plugin_name) => { + app.events.push_front(( + EventType::Success, + Local::now(), + format!("Plugin {} successfully loaded", plugin_name), + )); + } SimnetEvent::EpochInfoUpdate(epoch_info) => { app.epoch_info = epoch_info; app.events.push_front(( diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ba22ed1..5e8d01b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -15,12 +15,12 @@ path = "src/lib.rs" [dependencies] base64 = "0.22.1" -bincode = "1.3.3" +bincode = { workspace = true } crossbeam-channel = "0.5.14" log = "0.4.22" -serde = "1.0.217" -serde_derive = "1.0.217" # must match the serde version, see https://github.com/serde-rs/serde/issues/2584#issuecomment-1685252251 -serde_json = "1.0.135" +serde = { workspace = true } +serde_derive = { workspace = true } # must match the serde version, see https://github.com/serde-rs/serde/issues/2584#issuecomment-1685252251 +serde_json = { workspace = true } itertools = "0.14.0" symlink = "0.1.0" tokio = "1.43.0" @@ -38,42 +38,45 @@ regex = "1.11.1" litesvm = { version = "0.5.0", features = ["nodejs-internal"] } # litesvm = { path = "../../../litesvm/crates/litesvm" } crossbeam = "0.8.4" -solana-sdk = "=2.1.10" -solana-program-test = "2.1.10" -solana-rpc-client = "2.1.10" -solana-account = "2.1.10" -solana-account-decoder = "2.1.10" -solana-accounts-db = "2.1.10" -solana-client = "2.1.10" -solana-entry = "2.1.10" -solana-faucet = "2.1.10" -solana-feature-set = "2.1.10" -solana-gossip = "2.1.10" -solana-inline-spl = "2.1.10" -solana-ledger = "2.1.10" -solana-metrics = "2.1.10" -solana-perf = "2.1.10" -solana-rpc-client-api = "2.1.10" -solana-rpc = "2.1.10" -solana-runtime = "2.1.10" -solana-runtime-transaction = "2.1.10" -solana-send-transaction-service = "2.1.10" -solana-stake-program = "2.1.10" -solana-storage-bigtable = "2.1.10" -solana-transaction-status = "2.1.10" -solana-vote-program = "2.1.10" -solana-version = "2.1.10" -solana-poh = "2.1.10" -solana-svm = "2.1.10" -solana-program-runtime = "2.1.10" +agave-geyser-plugin-interface = { workspace = true } +solana-sdk = { workspace = true } +solana-program-test = { workspace = true } +solana-rpc-client = { workspace = true } +solana-account = { workspace = true } +solana-account-decoder = { workspace = true } +solana-accounts-db = { workspace = true } +solana-client = { workspace = true } +solana-entry = { workspace = true } +solana-faucet = { workspace = true } +solana-feature-set = { workspace = true } +solana-gossip = { workspace = true } +solana-inline-spl = { workspace = true } +solana-ledger = { workspace = true } +solana-metrics = { workspace = true } +solana-perf = { workspace = true } +solana-rpc-client-api = { workspace = true } +solana-rpc = { workspace = true } +solana-runtime = { workspace = true } +solana-runtime-transaction = { workspace = true } +solana-send-transaction-service = { workspace = true } +solana-stake-program = { workspace = true } +solana-storage-bigtable = { workspace = true } +solana-transaction-status = { workspace = true } +solana-vote-program = { workspace = true } +solana-version = { workspace = true } +solana-poh = { workspace = true } +solana-svm = { workspace = true } +solana-program-runtime = { workspace = true } +solana-geyser-plugin-manager = { workspace = true } +solana-streamer = { workspace = true } spl-token-2022 = "6.0.0" spl-token = "7.0.0" -solana-streamer = "2.1.10" zstd = "0.13.2" -agave-geyser-plugin-interface = "2.1.10" -solana-geyser-plugin-manager = "2.1.10" libloading = "0.7.4" json5 = "0.4.1" +txtx-addon-network-svm = { workspace = true } +uuid = "1.7.0" +ipc-channel = { workspace = true } [dev-dependencies] -test-case = "^3.3.1" \ No newline at end of file +test-case = "^3.3.1" diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 160ad9c..ab4ff5a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -12,22 +12,32 @@ extern crate serde_json; pub mod rpc; pub mod simnet; -mod test_helpers; pub mod types; use crossbeam_channel::{Receiver, Sender}; pub use jsonrpc_core; pub use jsonrpc_http_server; pub use litesvm; -use simnet::{SimnetCommand, SimnetEvent}; pub use solana_rpc_client; pub use solana_sdk; -use types::SurfpoolConfig; +use types::{SimnetCommand, SimnetEvent, SubgraphCommand, SurfpoolConfig}; pub async fn start_simnet( config: SurfpoolConfig, + subgraph_commands_tx: Sender, simnet_events_tx: Sender, + simnet_commands_tx: Sender, simnet_commands_rx: Receiver, ) -> Result<(), Box> { - simnet::start(config, simnet_events_tx, simnet_commands_rx).await + simnet::start( + config, + subgraph_commands_tx, + simnet_events_tx, + simnet_commands_tx, + simnet_commands_rx, + ) + .await } + +#[cfg(test)] +mod test_helpers; diff --git a/crates/core/src/rpc/admin.rs b/crates/core/src/rpc/admin.rs new file mode 100644 index 0000000..ca664b0 --- /dev/null +++ b/crates/core/src/rpc/admin.rs @@ -0,0 +1,247 @@ +use jsonrpc_core::BoxFuture; +use jsonrpc_core::Result; +use jsonrpc_derive::rpc; +use solana_client::rpc_config::RpcAccountIndex; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::Duration; +use std::time::SystemTime; +use txtx_addon_network_svm::codec::subgraph::PluginConfig; +use uuid::Uuid; + +use super::RunloopContext; +use crate::types::PluginManagerCommand; + +#[rpc] +pub trait AdminRpc { + type Metadata; + + #[rpc(meta, name = "exit")] + fn exit(&self, meta: Self::Metadata) -> Result<()>; + + #[rpc(meta, name = "reloadPlugin")] + fn reload_plugin( + &self, + meta: Self::Metadata, + name: String, + config_file: String, + ) -> BoxFuture>; + + #[rpc(meta, name = "unloadPlugin")] + fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture>; + + #[rpc(meta, name = "loadPlugin")] + fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture>; + + #[rpc(meta, name = "listPlugins")] + fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture>>; + + #[rpc(meta, name = "rpcAddress")] + fn rpc_addr(&self, meta: Self::Metadata) -> Result>; + + #[rpc(name = "setLogFilter")] + fn set_log_filter(&self, filter: String) -> Result<()>; + + #[rpc(meta, name = "startTime")] + fn start_time(&self, meta: Self::Metadata) -> Result; + + // #[rpc(meta, name = "startProgress")] + // fn start_progress(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "addAuthorizedVoter")] + fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>; + + #[rpc(meta, name = "addAuthorizedVoterFromBytes")] + fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec) + -> Result<()>; + + #[rpc(meta, name = "removeAllAuthorizedVoters")] + fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>; + + #[rpc(meta, name = "setIdentity")] + fn set_identity( + &self, + meta: Self::Metadata, + keypair_file: String, + require_tower: bool, + ) -> Result<()>; + + #[rpc(meta, name = "setIdentityFromBytes")] + fn set_identity_from_bytes( + &self, + meta: Self::Metadata, + identity_keypair: Vec, + require_tower: bool, + ) -> Result<()>; + + #[rpc(meta, name = "setStakedNodesOverrides")] + fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>; + + // #[rpc(meta, name = "contactInfo")] + // fn contact_info(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "repairShredFromPeer")] + fn repair_shred_from_peer( + &self, + meta: Self::Metadata, + pubkey: Option, + slot: u64, + shred_index: u64, + ) -> Result<()>; + + // #[rpc(meta, name = "repairWhitelist")] + // fn repair_whitelist(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "setRepairWhitelist")] + fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec) -> Result<()>; + + #[rpc(meta, name = "getSecondaryIndexKeySize")] + fn get_secondary_index_key_size( + &self, + meta: Self::Metadata, + pubkey_str: String, + ) -> Result>; + + #[rpc(meta, name = "setPublicTpuAddress")] + fn set_public_tpu_address( + &self, + meta: Self::Metadata, + public_tpu_addr: SocketAddr, + ) -> Result<()>; + + #[rpc(meta, name = "setPublicTpuForwardsAddress")] + fn set_public_tpu_forwards_address( + &self, + meta: Self::Metadata, + public_tpu_forwards_addr: SocketAddr, + ) -> Result<()>; +} + +pub struct SurfpoolAdminRpc; +impl AdminRpc for SurfpoolAdminRpc { + type Metadata = Option; + + fn exit(&self, _meta: Self::Metadata) -> Result<()> { + unimplemented!() + } + + fn reload_plugin( + &self, + _meta: Self::Metadata, + _name: String, + _config_file: String, + ) -> BoxFuture> { + unimplemented!() + } + + fn unload_plugin(&self, _meta: Self::Metadata, _name: String) -> BoxFuture> { + unimplemented!() + } + + fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture> { + let config = serde_json::from_str::(&config_file).unwrap(); + let ctx = meta.unwrap(); + let uuid = Uuid::new_v4(); + let (tx, rx) = crossbeam_channel::bounded(1); + let _ = ctx + .plugin_manager_commands_tx + .send(PluginManagerCommand::LoadConfig(uuid, config, tx)); + let Ok(endpoint_url) = rx.recv_timeout(Duration::from_secs(10)) else { + unimplemented!() + }; + Box::pin(async move { Ok(endpoint_url) }) + } + + fn list_plugins(&self, _meta: Self::Metadata) -> BoxFuture>> { + unimplemented!() + } + + fn rpc_addr(&self, _meta: Self::Metadata) -> Result> { + unimplemented!() + } + + fn set_log_filter(&self, _filter: String) -> Result<()> { + unimplemented!() + } + + fn start_time(&self, _meta: Self::Metadata) -> Result { + unimplemented!() + } + + fn add_authorized_voter(&self, _meta: Self::Metadata, _keypair_file: String) -> Result<()> { + unimplemented!() + } + + fn add_authorized_voter_from_bytes( + &self, + _meta: Self::Metadata, + _keypair: Vec, + ) -> Result<()> { + unimplemented!() + } + + fn remove_all_authorized_voters(&self, _meta: Self::Metadata) -> Result<()> { + unimplemented!() + } + + fn set_identity( + &self, + _meta: Self::Metadata, + _keypair_file: String, + _require_tower: bool, + ) -> Result<()> { + unimplemented!() + } + + fn set_identity_from_bytes( + &self, + _meta: Self::Metadata, + _identity_keypair: Vec, + _require_tower: bool, + ) -> Result<()> { + unimplemented!() + } + + fn set_staked_nodes_overrides(&self, _meta: Self::Metadata, _path: String) -> Result<()> { + unimplemented!() + } + + fn repair_shred_from_peer( + &self, + _meta: Self::Metadata, + _pubkey: Option, + _slot: u64, + _shred_index: u64, + ) -> Result<()> { + unimplemented!() + } + + fn set_repair_whitelist(&self, _meta: Self::Metadata, _whitelist: Vec) -> Result<()> { + unimplemented!() + } + + fn get_secondary_index_key_size( + &self, + _meta: Self::Metadata, + _pubkey_str: String, + ) -> Result> { + unimplemented!() + } + + fn set_public_tpu_address( + &self, + _meta: Self::Metadata, + _public_tpu_addr: SocketAddr, + ) -> Result<()> { + unimplemented!() + } + + fn set_public_tpu_forwards_address( + &self, + _meta: Self::Metadata, + _public_tpu_forwards_addr: SocketAddr, + ) -> Result<()> { + unimplemented!() + } +} diff --git a/crates/core/src/rpc/full.rs b/crates/core/src/rpc/full.rs index d3d3e77..bbd83dd 100644 --- a/crates/core/src/rpc/full.rs +++ b/crates/core/src/rpc/full.rs @@ -1,6 +1,7 @@ -use crate::simnet::{EntryStatus, TransactionWithStatusMeta}; - use super::utils::{decode_and_deserialize, transform_tx_metadata_to_ui_accounts}; +use crate::simnet::{EntryStatus, TransactionWithStatusMeta}; +use crate::types::TransactionStatusEvent; +use itertools::Itertools; use jsonrpc_core::futures::future::{self, join_all}; use jsonrpc_core::BoxFuture; use jsonrpc_core::{Error, Result}; @@ -374,20 +375,49 @@ impl Full for SurfpoolFullRpc { let signature = signatures[0]; let (status_update_tx, status_uptate_rx) = crossbeam_channel::bounded(1); let _ = ctx - .mempool_tx - .send((ctx.id.clone(), unsanitized_tx, status_update_tx)); + .simnet_commands_tx + .send(SimnetCommand::TransactionReceived( + ctx.id.clone(), + unsanitized_tx, + status_update_tx, + config, + )); loop { match (status_uptate_rx.recv(), config.preflight_commitment) { + (Ok(TransactionStatusEvent::SimulationFailure(e)), _) => { + return Err(Error { + data: None, + message: format!( + "Transaction simulation failed: {}: {} log messages:\n{}", + e.0.to_string(), + e.1.logs.len(), + e.1.logs.iter().map(|l| l.to_string()).join("\n") + ), + code: jsonrpc_core::ErrorCode::ServerError(-32002), + }) + } + (Ok(TransactionStatusEvent::ExecutionFailure(e)), _) => { + return Err(Error { + data: None, + message: format!( + "Transaction execution failed: {}: {} log messages:\n{}", + e.0.to_string(), + e.1.logs.len(), + e.1.logs.iter().map(|l| l.to_string()).join("\n") + ), + code: jsonrpc_core::ErrorCode::ServerError(-32002), + }) + } ( - Ok(TransactionConfirmationStatus::Processed), + Ok(TransactionStatusEvent::Success(TransactionConfirmationStatus::Processed)), Some(CommitmentLevel::Processed), ) => break, ( - Ok(TransactionConfirmationStatus::Confirmed), + Ok(TransactionStatusEvent::Success(TransactionConfirmationStatus::Confirmed)), None | Some(CommitmentLevel::Confirmed), ) => break, ( - Ok(TransactionConfirmationStatus::Finalized), + Ok(TransactionStatusEvent::Success(TransactionConfirmationStatus::Finalized)), Some(CommitmentLevel::Finalized), ) => break, (Err(_), _) => break, @@ -736,9 +766,8 @@ mod tests { transaction::{Legacy, TransactionVersion}, }; use solana_transaction_status::{ - option_serializer::OptionSerializer, EncodedTransaction, EncodedTransactionWithStatusMeta, - UiCompiledInstruction, UiMessage, UiRawMessage, UiReturnDataEncoding, UiTransaction, - UiTransactionReturnData, UiTransactionStatusMeta, + EncodedTransaction, EncodedTransactionWithStatusMeta, UiCompiledInstruction, UiMessage, + UiRawMessage, UiTransaction, }; use test_case::test_case; @@ -908,12 +937,14 @@ mod tests { .unwrap(); match mempool_rx.recv() { - Ok((_hash, _tx, status_tx)) => { + Ok(SimnetCommand::TransactionReceived(_, _, status_tx, _)) => { status_tx - .send(TransactionConfirmationStatus::Confirmed) + .send(TransactionStatusEvent::Success( + TransactionConfirmationStatus::Confirmed, + )) .unwrap(); } - Err(_) => panic!("failed to receive transaction from mempool"), + _ => panic!("failed to receive transaction from mempool"), } assert_eq!( diff --git a/crates/core/src/rpc/mod.rs b/crates/core/src/rpc/mod.rs index c675bdd..97cc422 100644 --- a/crates/core/src/rpc/mod.rs +++ b/crates/core/src/rpc/mod.rs @@ -5,11 +5,11 @@ use jsonrpc_core::{ futures::future::Either, middleware, FutureResponse, Metadata, Middleware, Request, Response, }; use solana_client::rpc_custom_error::RpcCustomError; -use solana_sdk::{blake3::Hash, clock::Slot, transaction::VersionedTransaction}; -use solana_transaction_status::TransactionConfirmationStatus; +use solana_sdk::{blake3::Hash, clock::Slot}; pub mod accounts_data; pub mod accounts_scan; +pub mod admin; pub mod bank_data; pub mod full; pub mod minimal; @@ -28,11 +28,8 @@ pub struct SurfpoolRpc; pub struct RunloopContext { pub id: Hash, pub state: Arc>, - pub mempool_tx: Sender<( - Hash, - VersionedTransaction, - Sender, - )>, + pub simnet_commands_tx: Sender, + pub plugin_manager_commands_tx: Sender, } trait State { @@ -76,18 +73,18 @@ impl State for Option { impl Metadata for RunloopContext {} -use crate::{simnet::GlobalState, types::RpcConfig}; +use crate::{ + simnet::GlobalState, + types::{PluginManagerCommand, RpcConfig, SimnetCommand}, +}; use jsonrpc_core::futures::FutureExt; use std::future::Future; #[derive(Clone)] pub struct SurfpoolMiddleware { pub context: Arc>, - pub mempool_tx: Sender<( - Hash, - VersionedTransaction, - Sender, - )>, + pub simnet_commands_tx: Sender, + pub plugin_manager_commands_tx: Sender, pub config: RpcConfig, } @@ -108,7 +105,8 @@ impl Middleware> for SurfpoolMiddleware { let meta = Some(RunloopContext { id: Hash::new_unique(), state: self.context.clone(), - mempool_tx: self.mempool_tx.clone(), + simnet_commands_tx: self.simnet_commands_tx.clone(), + plugin_manager_commands_tx: self.plugin_manager_commands_tx.clone(), }); // println!("Processing request {:?}", request); Either::Left(Box::pin(next(request, meta).map(move |res| { diff --git a/crates/core/src/simnet/mod.rs b/crates/core/src/simnet/mod.rs index 5e933a5..fdf962a 100644 --- a/crates/core/src/simnet/mod.rs +++ b/crates/core/src/simnet/mod.rs @@ -2,9 +2,13 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions, }; use base64::prelude::{Engine, BASE64_STANDARD}; -use chrono::{DateTime, Local, Utc}; +use chrono::{Local, Utc}; use crossbeam::select; use crossbeam_channel::{unbounded, Receiver, Sender}; +use ipc_channel::{ + ipc::{IpcOneShotServer, IpcReceiver}, + router::RouterProxy, +}; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{DomainsValidation, ServerBuilder}; use litesvm::{types::TransactionMetadata, LiteSVM}; @@ -16,12 +20,8 @@ use solana_sdk::{ clock::Clock, epoch_info::EpochInfo, message::v0::LoadedAddresses, - pubkey::Pubkey, signature::Signature, - transaction::{ - SanitizedTransaction, Transaction, TransactionError, TransactionVersion, - VersionedTransaction, - }, + transaction::{SanitizedTransaction, Transaction, TransactionError, TransactionVersion}, }; use solana_transaction_status::{ option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, @@ -30,6 +30,7 @@ use solana_transaction_status::{ UiInstruction, UiMessage, UiRawMessage, UiReturnDataEncoding, UiTransaction, UiTransactionReturnData, UiTransactionStatusMeta, }; +use std::path::PathBuf; use std::{ collections::{HashMap, HashSet, VecDeque}, net::SocketAddr, @@ -40,10 +41,14 @@ use std::{ use crate::{ rpc::{ - self, accounts_data::AccountsData, accounts_scan::AccountsScan, bank_data::BankData, - full::Full, minimal::Minimal, SurfpoolMiddleware, + self, accounts_data::AccountsData, accounts_scan::AccountsScan, admin::AdminRpc, + bank_data::BankData, full::Full, minimal::Minimal, SurfpoolMiddleware, + }, + types::{ + ClockCommand, ClockEvent, PluginManagerCommand, RunloopTriggerMode, + SchemaDatasourceingEvent, SimnetCommand, SimnetEvent, SubgraphCommand, + SubgraphPluginConfig, SurfpoolConfig, TransactionStatusEvent, }, - types::{RunloopTriggerMode, SurfpoolConfig}, }; const BLOCKHASH_SLOT_TTL: u64 = 75; @@ -181,53 +186,14 @@ impl EntryStatus { } } -#[derive(Debug)] -pub enum SimnetEvent { - Ready, - Aborted(String), - Shutdown, - ClockUpdate(Clock), - EpochInfoUpdate(EpochInfo), - BlockHashExpired, - InfoLog(DateTime, String), - ErrorLog(DateTime, String), - WarnLog(DateTime, String), - DebugLog(DateTime, String), - TransactionReceived(DateTime, VersionedTransaction), - TransactionProcessed( - DateTime, - TransactionMetadata, - Option, - ), - AccountUpdate(DateTime, Pubkey), -} - -pub enum SimnetCommand { - SlotForward, - SlotBackward, - UpdateClock(ClockCommand), - UpdateRunloopMode(RunloopTriggerMode), -} - -pub enum ClockCommand { - Pause, - Resume, - Toggle, - UpdateSlotInterval(u64), -} - -pub enum ClockEvent { - Tick, - ExpireBlockHash, -} - -use std::{fs::File, io::Read, path::PathBuf}; type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin; use libloading::{Library, Symbol}; pub async fn start( config: SurfpoolConfig, + subgraph_commands_tx: Sender, simnet_events_tx: Sender, + simnet_commands_tx: Sender, simnet_commands_rx: Receiver, ) -> Result<(), Box> { let mut svm = LiteSVM::new(); @@ -246,6 +212,15 @@ pub async fn start( // Todo: should check config first let rpc_client = Arc::new(RpcClient::new(config.simnet.remote_rpc_url.clone())); let epoch_info = rpc_client.get_epoch_info().await?; + // let epoch_info = EpochInfo { + // epoch: 0, + // slot_index: 0, + // slots_in_epoch: 0, + // absolute_slot: 0, + // block_height: 0, + // transaction_count: None, + // }; + // Question: can the value `slots_in_epoch` fluctuate over time? let slots_in_epoch = epoch_info.slots_in_epoch; @@ -258,11 +233,13 @@ pub async fn start( rpc_client: rpc_client.clone(), }; + let (plugin_manager_commands_tx, plugin_manager_commands_rx) = unbounded(); + let context = Arc::new(RwLock::new(context)); - let (mempool_tx, mempool_rx) = unbounded(); let middleware = SurfpoolMiddleware { context: context.clone(), - mempool_tx, + simnet_commands_tx: simnet_commands_tx.clone(), + plugin_manager_commands_tx, config: config.rpc.clone(), }; @@ -275,6 +252,7 @@ pub async fn start( io.extend_with(rpc::accounts_data::SurfpoolAccountsDataRpc.to_delegate()); io.extend_with(rpc::accounts_scan::SurfpoolAccountsScanRpc.to_delegate()); io.extend_with(rpc::bank_data::SurfpoolBankDataRpc.to_delegate()); + io.extend_with(rpc::admin::SurfpoolAdminRpc.to_delegate()); let _handle = hiro_system_kit::thread_named("rpc handler").spawn(move || { let server = ServerBuilder::new(io) @@ -289,103 +267,147 @@ pub async fn start( }); let simnet_config = config.simnet.clone(); + let simnet_events_tx_copy = simnet_events_tx.clone(); let (plugins_data_tx, plugins_data_rx) = unbounded::<(Transaction, TransactionMetadata)>(); + + let ipc_router = RouterProxy::new(); + if !config.plugin_config_path.is_empty() { let _handle = hiro_system_kit::thread_named("geyser plugins handler").spawn(move || { let mut plugin_manager = GeyserPluginManager::new(); - for geyser_plugin_config_file in config.plugin_config_path.iter() { - let mut file = match File::open(geyser_plugin_config_file) { - Ok(file) => file, - Err(err) => { - return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!( - "Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}" - ))); - } - }; - - let mut contents = String::new(); - if let Err(err) = file.read_to_string(&mut contents) { - return Err(GeyserPluginManagerError::CannotReadConfigFile(format!( - "Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}" - ))); - } - - let result: serde_json::Value = match json5::from_str(&contents) { - Ok(value) => value, - Err(err) => { - return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!( - "The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}" - ))); - } - }; + // for (i, geyser_plugin_config_file) in config.plugin_config_path.iter().enumerate() { + // let mut file = match File::open(geyser_plugin_config_file) { + // Ok(file) => file, + // Err(err) => { + // return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!( + // "Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}" + // ))); + // } + // }; + // let mut contents = String::new(); + // if let Err(err) = file.read_to_string(&mut contents) { + // return Err(GeyserPluginManagerError::CannotReadConfigFile(format!( + // "Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}" + // ))); + // } + // let result: serde_json::Value = match json5::from_str(&contents) { + // Ok(value) => value, + // Err(err) => { + // return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!( + // "The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}" + // ))); + // } + // }; + + // let _config_file = geyser_plugin_config_file + // .as_os_str() + // .to_str() + // .ok_or(GeyserPluginManagerError::InvalidPluginPath)?; + + + let geyser_plugin_config_file = PathBuf::from("../../surfpool_subgraph_plugin.json"); + + let contents = "{\"name\": \"surfpool-subgraph\", \"libpath\": \"target/release/libsurfpool_subgraph.dylib\"}"; + let result: serde_json::Value = json5::from_str(&contents).unwrap(); let libpath = result["libpath"] .as_str() - .ok_or(GeyserPluginManagerError::LibPathNotSet)?; + .unwrap(); let mut libpath = PathBuf::from(libpath); if libpath.is_relative() { let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| { GeyserPluginManagerError::CannotOpenConfigFile(format!( "Failed to resolve parent of {geyser_plugin_config_file:?}", )) - })?; + }).unwrap(); libpath = config_dir.join(libpath); } - let plugin_name = result["name"].as_str().map(|s| s.to_owned()); - - let _config_file = geyser_plugin_config_file - .as_os_str() - .to_str() - .ok_or(GeyserPluginManagerError::InvalidPluginPath)?; - - let (plugin, lib) = unsafe { - let lib = Library::new(libpath) - .map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?; - let constructor: Symbol = lib - .get(b"_create_plugin") - .map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?; - let plugin_raw = constructor(); - (Box::from_raw(plugin_raw), lib) - }; - plugin_manager.plugins.push(LoadedGeyserPlugin::new(lib, plugin, plugin_name)); - } - - while let Ok((transaction, transaction_metadata)) = plugins_data_rx.recv() { - let transaction_status_meta = TransactionStatusMeta { - status: Ok(()), - fee: 0, - pre_balances: vec![], - post_balances: vec![], - inner_instructions: None, - log_messages: Some(transaction_metadata.logs.clone()), - pre_token_balances: None, - post_token_balances: None, - rewards: None, - loaded_addresses: LoadedAddresses { - writable: vec![], - readonly: vec![], - }, - return_data: Some(transaction_metadata.return_data.clone()), - compute_units_consumed: Some(transaction_metadata.compute_units_consumed), - }; - - let transaction = SanitizedTransaction::try_from_legacy_transaction(transaction, &HashSet::new()) - .unwrap(); - - let transaction_replica = ReplicaTransactionInfoV2 { - signature: &transaction_metadata.signature, - is_vote: false, - transaction: &transaction, - transaction_status_meta: &transaction_status_meta, - index: 0 - }; - for plugin in plugin_manager.plugins.iter() { - plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), 0).unwrap(); + let plugin_name = result["name"].as_str().map(|s| s.to_owned()).unwrap_or(format!("surfpool-subgraph")); + + loop { + select! { + recv(plugin_manager_commands_rx) -> msg => match msg { + Ok(event) => { + match event { + PluginManagerCommand::LoadConfig(uuid, config, notifier) => { + let _ = subgraph_commands_tx.send(SubgraphCommand::CreateSubgraph(uuid.clone(), config.data.clone(), notifier)); + + let (mut plugin, lib) = unsafe { + let lib = match Library::new(&libpath) { + Ok(lib) => lib, + Err(e) => { + let _ = simnet_events_tx_copy.send(SimnetEvent::ErrorLog(Local::now(), format!("Unable to load plugin {}: {}", plugin_name, e.to_string()))); + continue; + } + }; + let constructor: Symbol = lib + .get(b"_create_plugin") + .map_err(|e| format!("{}", e.to_string()))?; + let plugin_raw = constructor(); + (Box::from_raw(plugin_raw), lib) + }; + + let (server, ipc_token) = IpcOneShotServer::>::new().expect("Failed to create IPC one-shot server."); + let subgraph_plugin_config = SubgraphPluginConfig { + uuid, + ipc_token, + subgraph_request: config.data.clone() + }; + let config_file = serde_json::to_string(&subgraph_plugin_config).unwrap(); + let _res = plugin.on_load(&config_file, false); + if let Ok((_, rx)) = server.accept() { + let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver(rx); + let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveSubgraph(subgraph_rx)); + }; + plugin_manager.plugins.push(LoadedGeyserPlugin::new(lib, plugin, Some(plugin_name.clone()))); + let _ = simnet_events_tx_copy.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into())); + } + } + }, + Err(_) => {}, + }, + recv(plugins_data_rx) -> msg => match msg { + Err(_) => unreachable!(), + Ok((transaction, transaction_metadata)) => { + let transaction_status_meta = TransactionStatusMeta { + status: Ok(()), + fee: 0, + pre_balances: vec![], + post_balances: vec![], + inner_instructions: None, + log_messages: Some(transaction_metadata.logs.clone()), + pre_token_balances: None, + post_token_balances: None, + rewards: None, + loaded_addresses: LoadedAddresses { + writable: vec![], + readonly: vec![], + }, + return_data: Some(transaction_metadata.return_data.clone()), + compute_units_consumed: Some(transaction_metadata.compute_units_consumed), + }; + + let transaction = SanitizedTransaction::try_from_legacy_transaction(transaction, &HashSet::new()) + .unwrap(); + + let transaction_replica = ReplicaTransactionInfoV2 { + signature: &transaction_metadata.signature, + is_vote: false, + transaction: &transaction, + transaction_status_meta: &transaction_status_meta, + index: 0 + }; + for plugin in plugin_manager.plugins.iter() { + plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_2(&transaction_replica), 0).unwrap(); + } + } + } + } } - } - Ok(()) + #[allow(unreachable_code)] + Ok::<(), String>(()) }); } @@ -470,19 +492,16 @@ pub async fn start( runloop_trigger_mode = update; continue } - } - }, - Err(_) => {}, - }, - recv(mempool_rx) -> msg => match msg { - Ok((key, transaction, status_tx)) => { - let signature = transaction.signatures[0].clone(); - transactions_to_process.push((key, transaction, status_tx)); - if let Ok(mut ctx) = context.write() { - ctx.transactions.insert( - signature, - EntryStatus::Received, - ); + SimnetCommand::TransactionReceived(key, transaction, status_tx, config) => { + let signature = transaction.signatures[0].clone(); + transactions_to_process.push((key, transaction, status_tx, config)); + if let Ok(mut ctx) = context.write() { + ctx.transactions.insert( + signature, + EntryStatus::Received, + ); + } + } } }, Err(_) => {}, @@ -496,12 +515,7 @@ pub async fn start( }; // Handle the transactions accumulated - for (key, transaction, status_tx) in transactions_to_process.drain(..) { - let _ = simnet_events_tx.try_send(SimnetEvent::TransactionReceived( - Local::now(), - transaction.clone(), - )); - + for (key, transaction, status_tx, tx_config) in transactions_to_process.drain(..) { transaction.verify_with_results(); let transaction = transaction.into_legacy_transaction().unwrap(); let message = &transaction.message; @@ -528,6 +542,25 @@ pub async fn start( } } + if !tx_config.skip_preflight { + let (meta, err) = match ctx.svm.simulate_transaction(transaction.clone()) { + Ok(res) => (res.meta, None), + Err(e) => { + let _ = simnet_events_tx.try_send(SimnetEvent::ErrorLog( + Local::now(), + format!("Transaction simulation failed: {}", e.err.to_string()), + )); + (e.meta, Some(e.err)) + } + }; + + if let Some(e) = &err { + let _ = status_tx + .try_send(TransactionStatusEvent::SimulationFailure((e.clone(), meta))); + continue; + } + } + let (meta, err) = match ctx.svm.send_transaction(transaction.clone()) { Ok(res) => { let _ = plugins_data_tx.try_send((transaction.clone(), res.clone())); @@ -536,7 +569,7 @@ pub async fn start( Err(e) => { let _ = simnet_events_tx.try_send(SimnetEvent::ErrorLog( Local::now(), - format!("Error processing transaction: {}", e.err.to_string()), + format!("Transaction execution failed: {}", e.err.to_string()), )); (e.meta, Some(e.err)) } @@ -552,12 +585,19 @@ pub async fn start( err.clone(), )), ); - let _ = status_tx.try_send(TransactionConfirmationStatus::Processed); - let _ = simnet_events_tx.try_send(SimnetEvent::TransactionProcessed( - Local::now(), - meta, - err, - )); + if let Some(e) = &err { + let _ = + status_tx.try_send(TransactionStatusEvent::ExecutionFailure((e.clone(), meta))); + } else { + let _ = status_tx.try_send(TransactionStatusEvent::Success( + TransactionConfirmationStatus::Processed, + )); + let _ = simnet_events_tx.try_send(SimnetEvent::TransactionProcessed( + Local::now(), + meta, + err, + )); + } transactions_processed.push((key, transaction, status_tx)); num_transactions += 1; } @@ -567,7 +607,9 @@ pub async fn start( } for (_key, _transaction, status_tx) in transactions_processed.iter() { - let _ = status_tx.try_send(TransactionConfirmationStatus::Confirmed); + let _ = status_tx.try_send(TransactionStatusEvent::Success( + TransactionConfirmationStatus::Confirmed, + )); } ctx.epoch_info.slot_index += 1; diff --git a/crates/core/src/test_helpers.rs b/crates/core/src/test_helpers.rs index c6f39f4..c4fd850 100644 --- a/crates/core/src/test_helpers.rs +++ b/crates/core/src/test_helpers.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::{ collections::{HashMap, VecDeque}, sync::{Arc, RwLock}, @@ -6,17 +8,12 @@ use std::{ use crossbeam_channel::Sender; use litesvm::LiteSVM; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::{ - blake3::Hash, - clock::Clock, - epoch_info::EpochInfo, - transaction::{Transaction, VersionedTransaction}, -}; -use solana_transaction_status::TransactionConfirmationStatus; +use solana_sdk::{blake3::Hash, clock::Clock, epoch_info::EpochInfo, transaction::Transaction}; use crate::{ rpc::RunloopContext, simnet::{EntryStatus, GlobalState, TransactionWithStatusMeta}, + types::SimnetCommand, }; pub struct TestSetup { @@ -26,7 +23,8 @@ pub struct TestSetup { impl TestSetup { pub fn new(rpc: T) -> Self { - let (mempool_tx, _rx) = crossbeam_channel::unbounded(); + let (simnet_commands_tx, _rx) = crossbeam_channel::unbounded(); + let (plugin_manager_commands_tx, _rx) = crossbeam_channel::unbounded(); let mut svm = LiteSVM::new(); let clock = Clock { @@ -40,7 +38,8 @@ impl TestSetup { TestSetup { context: RunloopContext { - mempool_tx, + simnet_commands_tx, + plugin_manager_commands_tx, id: Hash::new_unique(), state: Arc::new(RwLock::new(GlobalState { svm, @@ -74,16 +73,9 @@ impl TestSetup { setup } - pub fn new_with_mempool( - rpc: T, - mempool_tx: Sender<( - Hash, - VersionedTransaction, - Sender, - )>, - ) -> Self { + pub fn new_with_mempool(rpc: T, simnet_commands_tx: Sender) -> Self { let mut setup = TestSetup::new(rpc); - setup.context.mempool_tx = mempool_tx; + setup.context.simnet_commands_tx = simnet_commands_tx; setup } diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 44bcb36..72ab6a2 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1,5 +1,20 @@ -use solana_sdk::pubkey::Pubkey; -use std::path::PathBuf; +use chrono::{DateTime, Local}; +use crossbeam_channel::{Receiver, Sender}; +use litesvm::types::TransactionMetadata; +use solana_client::rpc_config::RpcSendTransactionConfig; +use solana_sdk::{ + blake3::Hash, + clock::Clock, + epoch_info::EpochInfo, + pubkey::Pubkey, + transaction::{TransactionError, VersionedTransaction}, +}; +use solana_transaction_status::TransactionConfirmationStatus; +use std::{collections::HashMap, path::PathBuf}; +use txtx_addon_network_svm::codec::subgraph::{PluginConfig, SubgraphRequest}; +use uuid::Uuid; + +pub const DEFAULT_RPC_URL: &str = "https://api.mainnet-beta.solana.com"; #[derive(Clone, Debug, PartialEq, Eq, Default)] pub enum RunloopTriggerMode { @@ -9,10 +24,104 @@ pub enum RunloopTriggerMode { Transaction, } +#[derive(Debug, Clone)] +pub struct Collection { + pub uuid: Uuid, + pub name: String, + pub entries: Vec, +} + +#[derive(Debug, Clone)] +pub struct Entry { + pub uuid: Uuid, + pub values: HashMap, +} + +#[derive(Debug)] +pub enum SubgraphEvent { + EndpointReady, + InfoLog(DateTime, String), + ErrorLog(DateTime, String), + WarnLog(DateTime, String), + DebugLog(DateTime, String), + Shutdown, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub enum SchemaDatasourceingEvent { + Rountrip(Uuid), + ApplyEntry(Uuid, String), //, SubgraphRequest, u64), +} + +#[derive(Debug, Clone)] +pub enum SubgraphCommand { + CreateSubgraph(Uuid, SubgraphRequest, Sender), + ObserveSubgraph(Receiver), + Shutdown, +} + +#[derive(Debug)] +pub enum SimnetEvent { + Ready, + Aborted(String), + Shutdown, + ClockUpdate(Clock), + EpochInfoUpdate(EpochInfo), + BlockHashExpired, + InfoLog(DateTime, String), + ErrorLog(DateTime, String), + WarnLog(DateTime, String), + DebugLog(DateTime, String), + PluginLoaded(String), + TransactionReceived(DateTime, VersionedTransaction), + TransactionProcessed( + DateTime, + TransactionMetadata, + Option, + ), + AccountUpdate(DateTime, Pubkey), +} + +pub enum TransactionStatusEvent { + Success(TransactionConfirmationStatus), + SimulationFailure((TransactionError, TransactionMetadata)), + ExecutionFailure((TransactionError, TransactionMetadata)), +} + +pub enum SimnetCommand { + SlotForward, + SlotBackward, + UpdateClock(ClockCommand), + UpdateRunloopMode(RunloopTriggerMode), + TransactionReceived( + Hash, + VersionedTransaction, + Sender, + RpcSendTransactionConfig, + ), +} + +pub enum PluginManagerCommand { + LoadConfig(Uuid, PluginConfig, Sender), +} + +pub enum ClockCommand { + Pause, + Resume, + Toggle, + UpdateSlotInterval(u64), +} + +pub enum ClockEvent { + Tick, + ExpireBlockHash, +} + #[derive(Clone, Debug, Default)] pub struct SurfpoolConfig { pub simnet: SimnetConfig, pub rpc: RpcConfig, + pub subgraph: SubgraphConfig, pub plugin_config_path: Vec, } @@ -28,7 +137,7 @@ pub struct SimnetConfig { impl Default for SimnetConfig { fn default() -> Self { Self { - remote_rpc_url: "https://api.mainnet-beta.solana.com".to_string(), + remote_rpc_url: DEFAULT_RPC_URL.to_string(), slot_time: 0, runloop_trigger_mode: RunloopTriggerMode::Clock, airdrop_addresses: vec![], @@ -37,6 +146,9 @@ impl Default for SimnetConfig { } } +#[derive(Clone, Debug, Default)] +pub struct SubgraphConfig {} + #[derive(Clone, Debug)] pub struct RpcConfig { pub bind_host: String, @@ -50,10 +162,17 @@ impl RpcConfig { } } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SubgraphPluginConfig { + pub uuid: Uuid, + pub ipc_token: String, + pub subgraph_request: SubgraphRequest, +} + impl Default for RpcConfig { fn default() -> Self { Self { - remote_rpc_url: "https://api.mainnet-beta.solana.com".to_string(), + remote_rpc_url: DEFAULT_RPC_URL.to_string(), bind_host: "127.0.0.1".to_string(), bind_port: 8899, } diff --git a/crates/core/tests/integration.rs b/crates/core/tests/integration.rs index 3d7bacc..0a42f44 100644 --- a/crates/core/tests/integration.rs +++ b/crates/core/tests/integration.rs @@ -18,10 +18,10 @@ use solana_sdk::{ use std::{str::FromStr, time::Duration}; use surfpool_core::{ rpc::{full::FullClient, minimal::MinimalClient}, - simnet::{start, SimnetEvent}, - types::{RpcConfig, RunloopTriggerMode, SimnetConfig, SurfpoolConfig}, + simnet::start, + types::{RpcConfig, RunloopTriggerMode, SimnetConfig, SimnetEvent, SurfpoolConfig}, }; -use tokio::{task, time::sleep}; +use tokio::task; mod helpers; @@ -36,10 +36,17 @@ async fn test_simnet_ready() { }; let (simnet_events_tx, simnet_events_rx) = unbounded(); - let (_simnet_commands_tx, simnet_commands_rx) = unbounded(); + let (simnet_commands_tx, simnet_commands_rx) = unbounded(); + let (subgraph_commands_tx, _subgraph_commands_rx) = unbounded(); let _handle = hiro_system_kit::thread_named("test").spawn(move || { - let future = start(config, simnet_events_tx, simnet_commands_rx); + let future = start( + config, + subgraph_commands_tx, + simnet_events_tx, + simnet_commands_tx, + simnet_commands_rx, + ); if let Err(e) = hiro_system_kit::nestable_block_on(future) { panic!("{e:?}"); } @@ -62,11 +69,18 @@ async fn test_simnet_ticks() { }; let (simnet_events_tx, simnet_events_rx) = unbounded(); - let (_simnet_commands_tx, simnet_commands_rx) = unbounded(); + let (simnet_commands_tx, simnet_commands_rx) = unbounded(); + let (subgraph_commands_tx, _subgraph_commands_rx) = unbounded(); let (test_tx, test_rx) = unbounded(); let _handle = hiro_system_kit::thread_named("test").spawn(move || { - let future = start(config, simnet_events_tx, simnet_commands_rx); + let future = start( + config, + subgraph_commands_tx, + simnet_events_tx, + simnet_commands_tx, + simnet_commands_rx, + ); if let Err(e) = hiro_system_kit::nestable_block_on(future) { panic!("{e:?}"); } @@ -116,10 +130,17 @@ async fn test_simnet_some_sol_transfers() { }; let (simnet_events_tx, simnet_events_rx) = unbounded(); - let (_simnet_commands_tx, simnet_commands_rx) = unbounded(); + let (simnet_commands_tx, simnet_commands_rx) = unbounded(); + let (subgraph_commands_tx, _subgraph_commands_rx) = unbounded(); let _handle = hiro_system_kit::thread_named("test").spawn(move || { - let future = start(config, simnet_events_tx, simnet_commands_rx); + let future = start( + config, + subgraph_commands_tx, + simnet_events_tx, + simnet_commands_tx, + simnet_commands_rx, + ); if let Err(e) = hiro_system_kit::nestable_block_on(future) { panic!("{e:?}"); } diff --git a/crates/gql/Cargo.toml b/crates/gql/Cargo.toml new file mode 100644 index 0000000..52a3ce1 --- /dev/null +++ b/crates/gql/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "surfpool-gql" +description = { workspace = true } +version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +keywords = { workspace = true } +categories = { workspace = true } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# txtx-addon-kit = { version = "0.2.1" } +futures = { version = "0.3.22", features = ["alloc"], default-features = false } +futures-enum = { version = "0.1.12", default-features = false } +uuid = { version = "1.6.1", features = ["serde", "v7"] } +juniper = { version = "0.16.1" } +juniper_codegen = { version = "0.16.0" } +# juniper_codegen = { git = "https://github.com/graphql-rust/juniper", rev = "c0e1b3e" } +async-stream = "0.3.5" +tokio = "1.37.0" +surfpool-core = { workspace = true } +convert_case = "0.7.1" \ No newline at end of file diff --git a/crates/gql/src/lib.rs b/crates/gql/src/lib.rs new file mode 100644 index 0000000..33c74c7 --- /dev/null +++ b/crates/gql/src/lib.rs @@ -0,0 +1,45 @@ +use juniper::RootNode; +use mutation::Mutation; +use query::{Query, SchemaDatasource}; +use std::sync::RwLock; +use std::{collections::BTreeMap, sync::Arc}; +use subscription::{DynamicSubscription, EntryData, EntryUpdate}; +use uuid::Uuid; + +pub mod mutation; +pub mod query; +pub mod subscription; +// pub mod types; + +#[derive(Clone, Debug)] +pub struct Context { + pub uuid_lookup: Arc>>, + pub entries_store: Arc)>>>, + pub entries_broadcaster: tokio::sync::broadcast::Sender, +} + +impl Context { + pub fn new() -> Context { + let (entries_broadcaster, _) = tokio::sync::broadcast::channel(128); + Context { + uuid_lookup: Arc::new(RwLock::new(BTreeMap::new())), + entries_store: Arc::new(RwLock::new(BTreeMap::new())), + entries_broadcaster, + } + } +} + +impl juniper::Context for Context {} + +pub type GqlDynamicSchema = RootNode<'static, Query, Mutation, DynamicSubscription>; + +pub fn new_dynamic_schema(subgraph_index: SchemaDatasource) -> GqlDynamicSchema { + GqlDynamicSchema::new_with_info( + Query::new(), + Mutation, + DynamicSubscription, + subgraph_index, + (), + (), + ) +} diff --git a/crates/gql/src/mutation.rs b/crates/gql/src/mutation.rs new file mode 100644 index 0000000..8ceb4b7 --- /dev/null +++ b/crates/gql/src/mutation.rs @@ -0,0 +1,24 @@ +use crate::Context; +use juniper_codegen::graphql_object; + +pub struct Mutation; + +#[graphql_object( + context = Context, +)] +impl Mutation { + fn api_version() -> &'static str { + "1.0" + } +} + +pub struct DynamicMutation; + +#[graphql_object( + context = Context, +)] +impl DynamicMutation { + fn api_version() -> &'static str { + "1.0" + } +} diff --git a/crates/gql/src/query.rs b/crates/gql/src/query.rs new file mode 100644 index 0000000..7f8f98d --- /dev/null +++ b/crates/gql/src/query.rs @@ -0,0 +1,136 @@ +use std::collections::HashMap; + +use crate::Context; +use convert_case::{Case, Casing}; +use juniper::{ + meta::{Field, MetaType}, + Arguments, BoxFuture, DefaultScalarValue, ExecutionResult, Executor, GraphQLType, GraphQLValue, + GraphQLValueAsync, Registry, +}; +use uuid::Uuid; + +#[derive(Debug)] +pub struct Query {} + +#[derive(Clone, Debug)] +pub struct SchemaDatasource { + pub entries: HashMap, +} + +impl SchemaDatasource { + pub fn new() -> Self { + Self { + entries: HashMap::new(), + } + } + + pub fn add_entry(&mut self, entry: SchemaDatasourceEntry) { + self.entries.insert(entry.name.to_case(Case::Camel), entry); + } +} + +#[derive(Clone, Debug)] +pub struct SchemaDatasourceEntry { + pub name: String, + pub subgraph_uuid: Uuid, + pub description: Option, + pub fields: Vec, +} + +impl SchemaDatasourceEntry { + pub fn new(uuid: &Uuid, name: &str) -> Self { + Self { + name: name.to_case(Case::Pascal), + subgraph_uuid: uuid.clone(), + description: None, + fields: vec![], + } + } +} + +impl GraphQLType for SchemaDatasourceEntry { + fn name(spec: &SchemaDatasourceEntry) -> Option<&str> { + Some(spec.name.as_str()) + } + + fn meta<'r>(spec: &SchemaDatasourceEntry, registry: &mut Registry<'r>) -> MetaType<'r> + where + DefaultScalarValue: 'r, + { + let mut fields: Vec> = vec![]; + fields.push(registry.field::<&String>("uuid", &())); + for field in spec.fields.iter() { + fields.push(registry.field::<&String>(field, &())); + } + registry + .build_object_type::(&spec, &fields) + .into_meta() + } +} + +impl GraphQLValue for SchemaDatasourceEntry { + type Context = Context; + type TypeInfo = SchemaDatasourceEntry; + + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { + ::name(info) + } +} + +impl Query { + pub fn new() -> Self { + Self {} + } +} + +impl GraphQLType for Query { + fn name(_spec: &SchemaDatasource) -> Option<&str> { + Some("Query") + } + + fn meta<'r>(spec: &SchemaDatasource, registry: &mut Registry<'r>) -> MetaType<'r> + where + DefaultScalarValue: 'r, + { + let mut fields = vec![]; + fields.push(registry.field::<&String>("apiVersion", &())); + + for (name, entry) in spec.entries.iter() { + fields.push(registry.field::<&[SchemaDatasourceEntry]>(name, &entry)); + } + registry + .build_object_type::(&spec, &fields) + .into_meta() + } +} + +impl GraphQLValue for Query { + type Context = Context; + type TypeInfo = SchemaDatasource; + + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { + ::name(&info) + } +} + +impl GraphQLValueAsync for Query { + fn resolve_field_async( + &self, + info: &SchemaDatasource, + field_name: &str, + _arguments: &Arguments, + executor: &Executor, + ) -> BoxFuture { + let res = match field_name { + "apiVersion" => executor.resolve_with_ctx(&(), "1.0"), + subgraph_name => { + let database = executor.context(); + let subgraph_db = database.entries_store.read().unwrap(); + let entry = info.entries.get(subgraph_name).unwrap(); + let (_, entries) = subgraph_db.get(subgraph_name).unwrap(); + executor.resolve_with_ctx(entry, &entries[..]) + } + }; + Box::pin(async { res }) + } +} diff --git a/crates/gql/src/subscription.rs b/crates/gql/src/subscription.rs new file mode 100644 index 0000000..5f19606 --- /dev/null +++ b/crates/gql/src/subscription.rs @@ -0,0 +1,137 @@ +use std::pin::Pin; + +use crate::{query::SchemaDatasourceEntry, Context}; +use futures::Stream; +use juniper::{ + graphql_subscription, + meta::{Field, MetaType}, + Arguments, DefaultScalarValue, ExecutionResult, Executor, FieldError, GraphQLType, + GraphQLValue, Registry, +}; +use juniper_codegen::graphql_object; +use surfpool_core::types::Entry; + +#[derive(Debug, Clone)] +pub struct EntryData { + pub entry: Entry, + pub name: String, +} + +impl EntryData { + pub fn new(name: &String, entry: &Entry) -> Self { + Self { + entry: entry.clone(), + name: name.clone(), + } + } +} + +impl GraphQLType for EntryData { + fn name(spec: &SchemaDatasourceEntry) -> Option<&str> { + Some(spec.name.as_str()) + } + + fn meta<'r>(spec: &SchemaDatasourceEntry, registry: &mut Registry<'r>) -> MetaType<'r> + where + DefaultScalarValue: 'r, + { + let mut fields: Vec> = vec![]; + fields.push(registry.field::<&String>("uuid", &())); + for field in spec.fields.iter() { + fields.push(registry.field::<&String>(field, &())); + } + registry + .build_object_type::<[EntryData]>(&spec, &fields) + .into_meta() + } +} + +impl GraphQLValue for EntryData { + type Context = Context; + type TypeInfo = SchemaDatasourceEntry; + + fn type_name<'i>(&self, info: &'i Self::TypeInfo) -> Option<&'i str> { + ::name(info) + } + + fn resolve_field( + &self, + _info: &SchemaDatasourceEntry, + field_name: &str, + _args: &Arguments, + executor: &Executor, + ) -> ExecutionResult { + match field_name { + "uuid" => executor.resolve_with_ctx(&(), &self.entry.uuid.to_string()), + field_name => { + let value = self.entry.values.get(field_name).unwrap(); + executor.resolve_with_ctx(&(), value) + } + } + } +} + +#[derive(Debug, Clone)] +pub struct EntryUpdate { + pub entry: Entry, + pub name: String, +} + +impl EntryUpdate { + pub fn new(name: &String, entry: &Entry) -> Self { + Self { + entry: entry.clone(), + name: name.clone(), + } + } +} + +#[graphql_object(context = Context)] +impl EntryUpdate { + pub fn uuid(&self) -> String { + self.entry.uuid.to_string() + } +} + +pub struct Subscription; + +type GqlEntriesStream = Pin> + Send>>; + +#[graphql_subscription( + context = Context, +)] +impl Subscription { + async fn entries_event(context: &Context) -> GqlEntriesStream { + let entries_tx = context.entries_broadcaster.clone(); + let mut entries_tx = entries_tx.subscribe(); + let stream = async_stream::stream! { + loop { + if let Ok(entry_event) = entries_tx.recv().await { + yield Ok(entry_event) + } + } + }; + Box::pin(stream) + } +} + +pub struct DynamicSubscription; + +#[graphql_subscription( + context = Context, + +)] +impl DynamicSubscription { + async fn entries_event(context: &Context) -> GqlEntriesStream { + let entries_tx = context.entries_broadcaster.clone(); + let mut entries_tx = entries_tx.subscribe(); + let stream = async_stream::stream! { + loop { + if let Ok(entry_event) = entries_tx.recv().await { + yield Ok(entry_event) + } + } + }; + Box::pin(stream) + } +} diff --git a/crates/subgraph/Cargo.toml b/crates/subgraph/Cargo.toml new file mode 100644 index 0000000..70dd69d --- /dev/null +++ b/crates/subgraph/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "surfpool-subgraph" +version = { workspace = true } +readme = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +keywords = { workspace = true } +categories = { workspace = true } + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +agave-geyser-plugin-interface = { workspace = true } +solana-program = { workspace = true } +txtx-addon-network-svm = { workspace = true } +ipc-channel = { workspace = true } +surfpool-core = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } +bincode = { workspace = true } +uuid = { workspace = true } diff --git a/crates/subgraph/src/lib.rs b/crates/subgraph/src/lib.rs new file mode 100644 index 0000000..c9ba442 --- /dev/null +++ b/crates/subgraph/src/lib.rs @@ -0,0 +1,184 @@ +use { + agave_geyser_plugin_interface::geyser_plugin_interface::{ + GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, + ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, + SlotStatus, + }, + ipc_channel::ipc::IpcSender, + solana_program::clock::Slot, + std::sync::Mutex, + surfpool_core::{ + solana_sdk::bs58, + types::{SchemaDatasourceingEvent, SubgraphPluginConfig}, + }, + txtx_addon_network_svm::codec::subgraph::{IndexedSubgraphSourceType, SubgraphRequest}, + uuid::Uuid, +}; + +#[derive(Default, Debug)] +pub struct SurfpoolSubgraph { + pub uuid: Uuid, + subgraph_indexing_event_tx: Mutex>>, + subgraph_request: Option, +} + +impl GeyserPlugin for SurfpoolSubgraph { + fn name(&self) -> &'static str { + "surfpool-subgraph" + } + + fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> { + let config = serde_json::from_str::(&config_file).unwrap(); + let oneshot_tx = IpcSender::connect(config.ipc_token).unwrap(); + let (tx, rx) = ipc_channel::ipc::channel().unwrap(); + let _ = tx.send(SchemaDatasourceingEvent::Rountrip(config.uuid.clone())); + let _ = oneshot_tx.send(rx); + self.uuid = config.uuid.clone(); + self.subgraph_indexing_event_tx = Mutex::new(Some(tx)); + self.subgraph_request = Some(config.subgraph_request); + Ok(()) + } + + fn on_unload(&mut self) {} + + fn notify_end_of_startup(&self) -> PluginResult<()> { + Ok(()) + } + + fn update_account( + &self, + account: ReplicaAccountInfoVersions, + _slot: Slot, + _is_startup: bool, + ) -> PluginResult<()> { + let account_info = match account { + ReplicaAccountInfoVersions::V0_0_1(_info) => { + unreachable!("ReplicaAccountInfoVersions::V0_0_1 is not supported") + } + ReplicaAccountInfoVersions::V0_0_2(_info) => { + unreachable!("ReplicaAccountInfoVersions::V0_0_2 is not supported") + } + ReplicaAccountInfoVersions::V0_0_3(info) => info, + }; + + println!("lamports: {}", account_info.lamports); + + Ok(()) + } + + fn update_slot_status( + &self, + _slot: Slot, + _parent: Option, + _status: &SlotStatus, + ) -> PluginResult<()> { + Ok(()) + } + + fn notify_transaction( + &self, + transaction: ReplicaTransactionInfoVersions, + _slot: Slot, + ) -> PluginResult<()> { + let Ok(tx) = self.subgraph_indexing_event_tx.lock() else { + return Ok(()); + }; + let tx = tx.as_ref().unwrap(); + let Some(ref subgraph_request) = self.subgraph_request else { + return Ok(()); + }; + match transaction { + ReplicaTransactionInfoVersions::V0_0_2(data) => { + let _ = tx.send(SchemaDatasourceingEvent::ApplyEntry( + self.uuid, + data.signature.to_string(), + // subgraph_request.clone(), + // slot, + )); + if data.is_vote { + return Ok(()); + } + let Some(ref inner_instructions) = data.transaction_status_meta.inner_instructions + else { + return Ok(()); + }; + for inner_instructions in inner_instructions.iter() { + for instruction in inner_instructions.instructions.iter() { + let instruction = &instruction.instruction; + let decoded_data = bs58::decode(&instruction.data) + .into_vec() + .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; + // it's not valid cpi event data if there isn't an 8-byte signature + if decoded_data.len() < 8 { + continue; + } + let eight_bytes = decoded_data[0..8].to_vec(); + let decoded_signature = bs58::decode(eight_bytes) + .into_vec() + .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; + for field in subgraph_request.fields.iter() { + match &field.data_source { + IndexedSubgraphSourceType::Instruction( + _instruction_subgraph_source, + ) => { + continue; + } + IndexedSubgraphSourceType::Event(event_subgraph_source) => { + if event_subgraph_source + .event + .discriminator + .eq(decoded_signature.as_slice()) + { + println!( + "found event with match!!!: {:?}", + event_subgraph_source.event.name + ); + let _ = tx.send(SchemaDatasourceingEvent::ApplyEntry( + self.uuid, + data.signature.to_string(), + // subgraph_request.clone(), + // slot, + )); + } + } + } + } + } + } + } + ReplicaTransactionInfoVersions::V0_0_1(_) => {} + } + Ok(()) + } + + fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> { + Ok(()) + } + + fn notify_block_metadata(&self, _blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> { + Ok(()) + } + + fn account_data_notifications_enabled(&self) -> bool { + true + } + + fn transaction_notifications_enabled(&self) -> bool { + false + } + + fn entry_notifications_enabled(&self) -> bool { + false + } +} + +#[no_mangle] +#[allow(improper_ctypes_definitions)] + +/// # Safety +/// +/// This function returns the Plugin pointer as trait GeyserPlugin. +pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin { + let plugin: Box = Box::::default(); + Box::into_raw(plugin) +} diff --git a/surfpool_subgraph_plugin.json b/surfpool_subgraph_plugin.json new file mode 100644 index 0000000..d31dc3e --- /dev/null +++ b/surfpool_subgraph_plugin.json @@ -0,0 +1,4 @@ +{ + "name": "surfpool-subgraph", + "libpath": "target/release/libsurfpool_subgraph.dylib" +} \ No newline at end of file