From 4c5b32009bd970f7054e14ffaf82eb5f68f7abf4 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Wed, 21 Aug 2024 14:29:19 +0200 Subject: [PATCH] feat(go-bindgen): sync resource design with Rust Signed-off-by: Roman Volosatovs --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/wit-bindgen-go/Cargo.toml | 2 +- crates/wit-bindgen-go/src/interface.rs | 471 +++++------------- .../complex/resources/bindings.wrpc.go | 375 +++++++------- .../go/complex-server/bindings/server.wrpc.go | 2 +- .../cmd/complex-server-nats/main.go | 17 +- .../go/hello-client/bindings/client.wrpc.go | 2 +- .../hello/handler/bindings.wrpc.go | 4 +- .../hello/handler/bindings.wrpc.go | 42 +- .../go/hello-server/bindings/server.wrpc.go | 2 +- .../streams/handler/bindings.wrpc.go | 50 +- .../go/streams-server/bindings/server.wrpc.go | 2 +- go/nats/client.go | 263 +++++----- go/wrpc.go | 29 +- tests/go/resources.go | 14 +- tests/go/resources_test.go | 18 +- 17 files changed, 566 insertions(+), 731 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3c20396..5b773b4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3587,7 +3587,7 @@ dependencies = [ [[package]] name = "wit-bindgen-wrpc-go" -version = "0.5.0" +version = "0.6.0" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index be652df8..de37dfe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,7 +131,7 @@ wasmtime-wasi = { version = "23", default-features = false } wit-bindgen = { version = "0.30", default-features = false } wit-bindgen-core = { version = "0.30", default-features = false } wit-bindgen-wrpc = { version = "0.6.3", default-features = false, path = "./crates/wit-bindgen" } -wit-bindgen-wrpc-go = { version = "0.5", default-features = false, path = "./crates/wit-bindgen-go" } +wit-bindgen-wrpc-go = { version = "0.6", default-features = false, path = "./crates/wit-bindgen-go" } wit-bindgen-wrpc-rust = { version = "0.6.3", default-features = false, path = "./crates/wit-bindgen-rust" } wit-bindgen-wrpc-rust-macro = { version = "0.6.3", default-features = false, path = "./crates/wit-bindgen-rust-macro" } wit-component = { version = "0.215", default-features = false } diff --git a/crates/wit-bindgen-go/Cargo.toml b/crates/wit-bindgen-go/Cargo.toml index 9f0cf2b9..3fe9168a 100644 --- a/crates/wit-bindgen-go/Cargo.toml +++ b/crates/wit-bindgen-go/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wit-bindgen-wrpc-go" -version = "0.5.0" +version = "0.6.0" description = """ Go bindings generator for wRPC """ diff --git a/crates/wit-bindgen-go/src/interface.rs b/crates/wit-bindgen-go/src/interface.rs index 0f05b9e4..aea814db 100644 --- a/crates/wit-bindgen-go/src/interface.rs +++ b/crates/wit-bindgen-go/src/interface.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::fmt::Write as _; use std::mem; @@ -34,7 +33,18 @@ fn go_func_name(func: &Function) -> String { tail.to_upper_camel_case() ) } - FunctionKind::Method(..) => to_upper_camel_case(func.item_name()), + FunctionKind::Method(..) => { + let name = func + .name + .strip_prefix("[method]") + .expect("failed to strip `[method]` prefix"); + let (head, tail) = name.split_once('.').expect("failed to split on `.`"); + format!( + "{}_{}", + head.to_upper_camel_case(), + tail.to_upper_camel_case() + ) + } FunctionKind::Freestanding => to_upper_camel_case(&func.name), } } @@ -2541,106 +2551,53 @@ impl InterfaceGenerator<'_> { identifier: Identifier<'a>, funcs: impl Iterator, ) -> bool { - let mut traits = BTreeMap::new(); - let mut methods = BTreeMap::new(); + let mut methods = vec![]; let mut funcs_to_export = vec![]; - traits.insert(None, ("Handler".to_string(), vec![])); - - if let Identifier::Interface(id, ..) = identifier { - for (name, id) in &self.resolve.interfaces[id].types { - if let TypeDefKind::Resource = self.resolve.types[*id].kind { - let camel = to_upper_camel_case(name); - traits.insert(Some(*id), (camel, vec![])); - methods.insert(*id, vec![]); - } - } - } - for func in funcs { if self.gen.skip.contains(&func.name) { continue; } - let resource = if let FunctionKind::Method(id) = func.kind { - methods.get_mut(&id).unwrap().push(func); - Some(id) - } else { - funcs_to_export.push(func); - None - }; - let (_, handler_methods) = traits.get_mut(&resource).unwrap(); - + funcs_to_export.push(func); let prev = mem::take(&mut self.src); - self.print_docs_and_params(func, true); - if let FunctionKind::Constructor(id) = &func.kind { - let ty = &self.resolve.types[*id]; - let Some(name) = &ty.name else { - panic!("unnamed resources are not supported") - }; - let context = self.deps.context(); - let camel = name.to_upper_camel_case(); - let name = self.type_path_with_name(*id, format!("Handler{camel}")); - self.push_str(" ("); - self.push_str(&name); - self.push_str(", "); - self.push_str(context); - self.push_str(".Context, string, error)"); - } else { - self.src.push_str(" ("); - for ty in func.results.iter_types() { - self.print_opt_ty(ty, true); - self.src.push_str(", "); - } - self.push_str("error)"); + self.print_docs_and_params(func); + self.src.push_str(" ("); + for ty in func.results.iter_types() { + self.print_opt_ty(ty, true); + self.src.push_str(", "); } - self.push_str("\n"); + self.push_str("error)\n"); let trait_method = mem::replace(&mut self.src, prev); - handler_methods.push(trait_method); + methods.push(trait_method); } - // TODO: The method serving should be propagated into the `ServeInterface` - - let (name, interface_methods) = traits.remove(&None).unwrap(); - if interface_methods.is_empty() && traits.is_empty() { + if methods.is_empty() { return false; } - uwriteln!(self.src, "type {name} interface {{"); - for method in &interface_methods { + uwriteln!(self.src, "type Handler interface {{"); + for method in &methods { self.src.push_str(method); } - uwriteln!(self.src, "}}"); - - for (trait_name, handler_methods) in traits.values() { - uwriteln!(self.src, "type Handler{trait_name} interface {{"); - for method in handler_methods { - self.src.push_str(method); - } - uwriteln!(self.src, "}}"); - } - uwriteln!( self.src, - "func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {{", + " +}} + +func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {{ + stops := make([]func() error, 0, {}) + stop = func() error {{ + for _, stop := range stops {{ + if err := stop(); err != nil {{ + return err + }} + }} + return nil + }}", + funcs_to_export.len(), wrpc = self.deps.wrpc(), ); - uwriteln!( - self.src, - r#"stops := make([]func() error, 0, {})"#, - funcs_to_export.len() - ); - self.src.push_str( - r"stop = func() error { - for _, stop := range stops { - if err := stop(); err != nil { - return err - } - } - return nil - } -", - ); let instance = match identifier { Identifier::Interface(id, name) => { let interface = &self.resolve.interfaces[id]; @@ -2672,40 +2629,47 @@ impl InterfaceGenerator<'_> { for (i, func) in funcs_to_export.iter().enumerate() { let name = rpc_func_name(func); - let atomic = self.deps.atomic(); let bytes = self.deps.bytes(); let context = self.deps.context(); let fmt = self.deps.fmt(); let slog = self.deps.slog(); - let sync = self.deps.sync(); let wrpc = self.deps.wrpc(); - uwriteln!( + uwrite!( self.src, - r#"stop{i}, err := s.Serve("{instance}", "{name}", func(ctx {context}.Context, w {wrpc}.IndexWriter, r {wrpc}.IndexReadCloser) error {{"#, + r#" + stop{i}, err := s.Serve("{instance}", "{name}", func(ctx {context}.Context, w {wrpc}.IndexWriteCloser, r {wrpc}.IndexReadCloser) {{ + defer func() {{ + if err := w.Close(); err != nil {{ + {slog}.DebugContext(ctx, "failed to close writer", "instance", "{instance}", "name", "{name}", "err", err) + }} + }}()"#, ); for (i, (_, ty)) in func.params.iter().enumerate() { uwrite!( self.src, - r#"{slog}.DebugContext(ctx, "reading parameter", "i", {i}) + r#" + {slog}.DebugContext(ctx, "reading parameter", "i", {i}) p{i}, err := "# ); self.print_read_ty(ty, "r", &format!("[]uint32{{ {i} }}")); self.push_str("\n"); - uwriteln!( + uwrite!( self.src, - r#"if err != nil {{ return {fmt}.Errorf("failed to read parameter {i}: %w", err) }}"#, + r#" + if err != nil {{ + {slog}.WarnContext(ctx, "failed to read parameter", "i", {i}, "instance", "{instance}", "name", "{name}", "err", err) + if err := r.Close(); err != nil {{ + {slog}.ErrorContext(ctx, "failed to close reader", "instance", "{instance}", "name", "{name}", "err", err) + }} + return + }}"#, ); } uwriteln!( self.src, - r#"{slog}.DebugContext(ctx, "calling `{instance}.{name}` handler")"#, + r#" + {slog}.DebugContext(ctx, "calling `{instance}.{name}` handler")"#, ); - if let FunctionKind::Constructor(..) = func.kind { - self.push_str("ctx, cancel := "); - self.push_str(context); - self.push_str(".WithCancelCause(ctx)\n"); - self.push_str("res, ctx, "); - } for (i, _) in func.results.iter_types().enumerate() { uwrite!(self.src, "r{i}, "); } @@ -2719,252 +2683,66 @@ impl InterfaceGenerator<'_> { for (i, _) in func.params.iter().enumerate() { uwrite!(self.src, ", p{i}"); } - self.push_str(")\n"); - self.push_str("if err != nil {\n"); uwriteln!( self.src, - r#"return {fmt}.Errorf("failed to handle `{instance}.{name}` invocation: %w", err)"#, - ); - self.push_str("}\n"); - - if let FunctionKind::Constructor(id) = func.kind { - self.push_str("rx := string(r0)\n"); - uwriteln!( - self.src, - r#"stops := make([]func() error, 0, {})"#, - methods.len() + 1 - ); - self.src.push_str( - r"stop := func() error { - for _, stop := range stops { - if err := stop(); err != nil { - return err - } - } - return nil - } -", - ); - for (i, func) in methods[&id].iter().enumerate() { - let name = rpc_func_name(func); - uwriteln!( - self.src, - r#"stop{i}, err := s.Serve(rx, "{name}", func(ctx {context}.Context, w {wrpc}.IndexWriter, r {wrpc}.IndexReadCloser) error {{"#, - ); - for (i, (_, ty)) in func.params.iter().enumerate().skip(1) { - uwrite!( - self.src, - r#"{slog}.DebugContext(ctx, "reading method parameter", "i", {i}) - p{i}, err := "# - ); - self.print_read_ty(ty, "r", &format!("[]uint32{{ {i} }}")); - self.push_str("\n"); - uwriteln!( - self.src, - r#"if err != nil {{ return {fmt}.Errorf("failed to read method parameter {i}: %w", err) }}"#, - ); - } - uwriteln!( - self.src, - r#"{slog}.DebugContext(ctx, "calling `{name}` handler", "resource", rx)"#, - ); - for (i, _) in func.results.iter_types().enumerate() { - uwrite!(self.src, "r{i}, "); - } - self.push_str("err "); - if func.results.len() > 0 { - self.push_str(":"); - } - self.push_str("= res."); - self.push_str(&go_func_name(func)); - self.push_str("(ctx"); - for (i, _) in func.params.iter().enumerate().skip(1) { - uwrite!(self.src, ", p{i}"); - } - self.push_str(")\n"); - self.push_str("if err != nil {\n"); - uwriteln!( - self.src, - r#"return {fmt}.Errorf("failed to handle `%s.{name}` invocation: %w", rx, err)"#, - ); - self.push_str("}\n"); - uwriteln!( - self.src, - r" - var buf {bytes}.Buffer - writes := make(map[uint32]func({wrpc}.IndexWriter) error, {})", - func.results.len() - ); - for (i, ty) in func.results.iter_types().enumerate() { - uwrite!(self.src, "write{i}, err :="); - self.print_write_ty(ty, &format!("r{i}"), "&buf"); - self.push_str("\n"); - self.push_str("if err != nil {\n"); - uwriteln!( - self.src, - r#"return {fmt}.Errorf("failed to write result value {i}: %w", err)"#, - ); - self.src.push_str("}\n"); - uwriteln!( - self.src, - r#"if write{i} != nil {{ - writes[{i}] = write{i} - }}"#, - ); - } - uwrite!( - self.src, - r#"{slog}.DebugContext(ctx, "transmitting `{instance}.{name}` result") - _, err = w.Write(buf.Bytes()) - if err != nil {{ - return {fmt}.Errorf("failed to write result: %w", err) - }} - if len(writes) > 0 {{ - var wg {sync}.WaitGroup - var wgErr {atomic}.Value - for index, write := range writes {{ - wg.Add(1) - w, err := w.Index(index) - if err != nil {{ - return {fmt}.Errorf("failed to index writer: %w", err) - }} - write := write - go func() {{ - defer wg.Done() - if err := write(w); err != nil {{ - wgErr.Store(err) - }} - }}() - }} - wg.Wait() - err := wgErr.Load() - if err == nil {{ - return nil - }} - return err.(error) - }} - return nil - }}, "#, - ); - for (i, (_, ty)) in func.params.iter().enumerate() { - let (nested, fut) = async_paths_ty(self.resolve, ty); - for path in nested { - self.push_str(wrpc); - self.push_str(".NewSubscribePath().Index("); - uwrite!(self.src, "{i})"); - for p in path { - if let Some(p) = p { - uwrite!(self.src, ".Index({p})"); - } else { - self.push_str(".Wildcard()"); - } - } - self.push_str(", "); - } - if fut { - uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i}), "); - } - } - uwriteln!( - self.src, - r#") - if err != nil {{ - err = {fmt}.Errorf("failed to serve `%s.{name}`: %w", rx, err) - if sErr := stop(); sErr != nil {{ - {slog}.ErrorContext(ctx, "failed to stop serving resource methods", "err", err) - }} - cancel(err) - return err - }} - stops = append(stops, stop{i})"#, - ); - } - uwriteln!( - self.src, - r#"stopDrop, err := s.Serve(rx, "drop", func(_ {context}.Context, w {wrpc}.IndexWriter, _ {wrpc}.IndexReadCloser) error {{ - defer cancel(nil) - _, err := w.Write(nil) - if err != nil {{ - return {fmt}.Errorf("failed to write empty result: %w", err) - }} - return nil - }}) - if err != nil {{ - err = {fmt}.Errorf("failed to serve `%s.drop`: %w", rx, err) - if sErr := stop(); sErr != nil {{ - {slog}.ErrorContext(ctx, "failed to stop serving resource methods", "err", sErr) - }} - cancel(err) - return err - }} - stops = append(stops, stopDrop) - go func() {{ - <-ctx.Done() - if sErr := stop(); sErr != nil {{ - {slog}.ErrorContext(ctx, "failed to stop serving resource methods", "err", sErr) - }} - cancel(ctx.Err()) - }}()"#, - ); - } + r#") + if cErr := r.Close(); cErr != nil {{ + {slog}.ErrorContext(ctx, "failed to close reader", "instance", "{instance}", "name", "{name}", "err", err) + }} + if err != nil {{ + {slog}.WarnContext(ctx, "failed to handle invocation", "instance", "{instance}", "name", "{name}", "err", err) + return + }} - uwriteln!( - self.src, - r" - var buf {bytes}.Buffer - writes := make(map[uint32]func({wrpc}.IndexWriter) error, {})", + var buf {bytes}.Buffer + writes := make(map[uint32]func({wrpc}.IndexWriter) error, {})"#, func.results.len() ); for (i, ty) in func.results.iter_types().enumerate() { - uwrite!(self.src, "write{i}, err :="); - self.print_write_ty(ty, &format!("r{i}"), "&buf"); - self.push_str("\n"); - self.push_str("if err != nil {\n"); - uwriteln!( + uwrite!( self.src, - r#"return {fmt}.Errorf("failed to write result value {i}: %w", err)"#, + r#" + write{i}, err := "# ); - self.src.push_str("}\n"); - uwriteln!( + self.print_write_ty(ty, &format!("r{i}"), "&buf"); + uwrite!( self.src, - r#"if write{i} != nil {{ - writes[{i}] = write{i} - }}"#, + r#" + if err != nil {{ + {slog}.WarnContext(ctx, "failed to write result value", "i", {i}, "{instance}", "name", "{name}", "err", err) + return + }} + if write{i} != nil {{ + writes[{i}] = write{i} + }}"#, ); } uwrite!( self.src, - r#"{slog}.DebugContext(ctx, "transmitting `{instance}.{name}` result") - _, err = w.Write(buf.Bytes()) + r#" + {slog}.DebugContext(ctx, "transmitting `{instance}.{name}` result") + _, err = w.Write(buf.Bytes()) + if err != nil {{ + {slog}.WarnContext(ctx, "failed to write result", "{instance}", "name", "{name}", "err", err) + return + }} + if len(writes) > 0 {{ + for index, write := range writes {{ + w, err := w.Index(index) if err != nil {{ - return {fmt}.Errorf("failed to write result: %w", err) + {slog}.ErrorContext(ctx, "failed to index writer", "index", index, "{instance}", "name", "{name}", "err", err) + return }} - if len(writes) > 0 {{ - var wg {sync}.WaitGroup - var wgErr {atomic}.Value - for index, write := range writes {{ - wg.Add(1) - w, err := w.Index(index) - if err != nil {{ - return {fmt}.Errorf("failed to index writer: %w", err) - }} - write := write - go func() {{ - defer wg.Done() - if err := write(w); err != nil {{ - wgErr.Store(err) - }} - }}() - }} - wg.Wait() - err := wgErr.Load() - if err == nil {{ - return nil + index := index + write := write + go func() {{ + if err := write(w); err != nil {{ + {slog}.WarnContext(ctx, "failed to write nested result value", "index", index, "{instance}", "name", "{name}", "err", err) }} - return err.(error) - }} - return nil - }}, "#, + }}() + }} + }} + }}, "#, ); for (i, (_, ty)) in func.params.iter().enumerate() { let (nested, fut) = async_paths_ty(self.resolve, ty); @@ -3013,7 +2791,7 @@ impl InterfaceGenerator<'_> { let fmt = self.deps.fmt(); let wrpc = self.deps.wrpc(); - self.print_docs_and_params(func, false); + self.print_docs_and_params(func); if let FunctionKind::Constructor(id) = &func.kind { self.push_str(" (r0__ "); self.print_own(*id); @@ -3027,25 +2805,17 @@ impl InterfaceGenerator<'_> { } } self.push_str("close__ func() error, err__ error) "); - self.src.push_str("{\n"); - self.src.push_str("if err__ = wrpc__.Invoke(ctx__, "); - match func.kind { - FunctionKind::Freestanding - | FunctionKind::Static(..) - | FunctionKind::Constructor(..) => { - uwrite!(self.src, r#""{instance}", ""#); - } - FunctionKind::Method(..) => { - self.src.push_str("string(self), \""); - } - } + uwrite!( + self.src, + r#"{{ + if err__ = wrpc__.Invoke(ctx__, "{instance}", ""# + ); self.src.push_str(rpc_func_name(func)); - self.src.push_str("\", "); uwriteln!( self.src, - "func(w__ {wrpc}.IndexWriter, r__ {wrpc}.IndexReadCloser) error {{" + r#"", func(w__ {wrpc}.IndexWriteCloser, r__ {wrpc}.IndexReadCloser) error {{ + close__ = r__.Close"# ); - self.push_str("close__ = r__.Close\n"); if !func.params.is_empty() { let bytes = self.deps.bytes(); uwriteln!( @@ -3213,22 +2983,14 @@ impl InterfaceGenerator<'_> { // } } - fn print_docs_and_params(&mut self, func: &Function, interface: bool) { + fn print_docs_and_params(&mut self, func: &Function) { self.godoc(&func.docs); self.godoc_params(&func.params, "Parameters"); // TODO: re-add this when docs are back // self.godoc_params(&func.results, "Return"); - if !interface { + if self.in_import { self.push_str("func "); - if let FunctionKind::Method(..) = func.kind { - let name = func - .name - .strip_prefix("[method]") - .expect("failed to strip `[method]` prefix"); - let (head, _) = name.split_once('.').expect("failed to split on `.`"); - self.push_str(&format!("{}_", head.to_upper_camel_case())); - } } if self.in_import && matches!(func.kind, FunctionKind::Constructor(..)) { self.push_str("New"); @@ -3240,12 +3002,7 @@ impl InterfaceGenerator<'_> { let wrpc = self.deps.wrpc(); uwrite!(self.src, "wrpc__ {wrpc}.Invoker, "); } - for (i, (name, param)) in func.params.iter().enumerate() { - if let FunctionKind::Method(..) = &func.kind { - if i == 0 && interface { - continue; - } - } + for (name, param) in &func.params { self.push_str(&to_go_ident(name)); self.push_str(" "); self.print_opt_ty(param, true); diff --git a/examples/go/complex-server/bindings/exports/wrpc_examples/complex/resources/bindings.wrpc.go b/examples/go/complex-server/bindings/exports/wrpc_examples/complex/resources/bindings.wrpc.go index 302a7959..a6a40dff 100644 --- a/examples/go/complex-server/bindings/exports/wrpc_examples/complex/resources/bindings.wrpc.go +++ b/examples/go/complex-server/bindings/exports/wrpc_examples/complex/resources/bindings.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.1.1. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! package resources import ( @@ -11,25 +11,19 @@ import ( io "io" slog "log/slog" math "math" - sync "sync" - atomic "sync/atomic" utf8 "unicode/utf8" ) +type Foo interface{} type Handler interface { - Foo(ctx__ context.Context) (HandlerFoo, context.Context, string, error) + Foo(ctx__ context.Context) (wrpc.Own[Foo], error) Foo_Foo(ctx__ context.Context, v wrpc.Own[Foo]) (string, error) + Foo_Bar(ctx__ context.Context, self wrpc.Borrow[Foo]) (string, error) Bar(ctx__ context.Context, v wrpc.Borrow[Foo]) (string, error) } -type HandlerFoo interface { - Bar(ctx__ context.Context) (string, error) -} -type Foo interface { - Bar(ctx__ context.Context) (string, func() error, error) -} func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { - stops := make([]func() error, 0, 3) + stops := make([]func() error, 0, 4) stop = func() error { for _, stop := range stops { if err := stop(); err != nil { @@ -38,126 +32,26 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return nil } - stop0, err := s.Serve("wrpc-examples:complex/resources", "foo", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { - slog.DebugContext(ctx, "calling `wrpc-examples:complex/resources.foo` handler") - ctx, cancel := context.WithCancelCause(ctx) - res, ctx, r0, err := h.Foo(ctx) - if err != nil { - return fmt.Errorf("failed to handle `wrpc-examples:complex/resources.foo` invocation: %w", err) - } - rx := string(r0) - stops := make([]func() error, 0, 2) - stop := func() error { - for _, stop := range stops { - if err := stop(); err != nil { - return err - } - } - return nil - } - stop0, err := s.Serve(rx, "bar", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { - slog.DebugContext(ctx, "calling `bar` handler", "resource", rx) - r0, err := res.Bar(ctx) - if err != nil { - return fmt.Errorf("failed to handle `%s.bar` invocation: %w", rx, err) - } - var buf bytes.Buffer - writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) - write0, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { - n := len(v) - if n > math.MaxUint32 { - return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) - } - if err = func(v int, w io.Writer) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing string byte length", "len", n) - _, err = w.Write(b[:i]) - return err - }(n, w); err != nil { - return fmt.Errorf("failed to write string byte length of %d: %w", n, err) - } - slog.Debug("writing string bytes") - _, err = w.Write([]byte(v)) - if err != nil { - return fmt.Errorf("failed to write string bytes: %w", err) - } - return nil - }(r0, &buf) - if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) - } - if write0 != nil { - writes[0] = write0 + stop0, err := s.Serve("wrpc-examples:complex/resources", "foo", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc-examples:complex/resources", "name", "foo", "err", err) } - slog.DebugContext(ctx, "transmitting `wrpc-examples:complex/resources.bar` result") - _, err = w.Write(buf.Bytes()) - if err != nil { - return fmt.Errorf("failed to write result: %w", err) - } - if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value - for index, write := range writes { - wg.Add(1) - w, err := w.Index(index) - if err != nil { - return fmt.Errorf("failed to index writer: %w", err) - } - write := write - go func() { - defer wg.Done() - if err := write(w); err != nil { - wgErr.Store(err) - } - }() - } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) - } - return nil - }) - if err != nil { - err = fmt.Errorf("failed to serve `%s.bar`: %w", rx, err) - if sErr := stop(); sErr != nil { - slog.ErrorContext(ctx, "failed to stop serving resource methods", "err", err) - } - cancel(err) - return err + }() + slog.DebugContext(ctx, "calling `wrpc-examples:complex/resources.foo` handler") + r0, err := h.Foo(ctx) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:complex/resources", "name", "foo", "err", err) } - stops = append(stops, stop0) - stopDrop, err := s.Serve(rx, "drop", func(_ context.Context, w wrpc.IndexWriter, _ wrpc.IndexReadCloser) error { - defer cancel(nil) - _, err := w.Write(nil) - if err != nil { - return fmt.Errorf("failed to write empty result: %w", err) - } - return nil - }) if err != nil { - err = fmt.Errorf("failed to serve `%s.drop`: %w", rx, err) - if sErr := stop(); sErr != nil { - slog.ErrorContext(ctx, "failed to stop serving resource methods", "err", sErr) - } - cancel(err) - return err + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc-examples:complex/resources", "name", "foo", "err", err) + return } - stops = append(stops, stopDrop) - go func() { - <-ctx.Done() - if sErr := stop(); sErr != nil { - slog.ErrorContext(ctx, "failed to stop serving resource methods", "err", sErr) - } - cancel(ctx.Err()) - }() var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { n := len(v) if n > math.MaxUint32 { @@ -180,7 +74,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil }(string(r0), &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:complex/resources", "name", "foo", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -188,39 +83,37 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc-examples:complex/resources.foo` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc-examples:complex/resources", "name", "foo", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc-examples:complex/resources", "name", "foo", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc-examples:complex/resources", "name", "foo", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc-examples:complex/resources.foo`: %w", err) } stops = append(stops, stop0) - stop1, err := s.Serve("wrpc-examples:complex/resources", "foo.foo", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop1, err := s.Serve("wrpc-examples:complex/resources", "foo.foo", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -258,17 +151,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("owned resource ID length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc-examples:complex/resources.foo.foo` handler") r0, err := h.Foo_Foo(ctx, p0) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc-examples:complex/resources.foo.foo` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { n := len(v) if n > math.MaxUint32 { @@ -291,7 +194,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -299,39 +203,37 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc-examples:complex/resources.foo.foo` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc-examples:complex/resources", "name", "foo.foo", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc-examples:complex/resources.foo.foo`: %w", err) } stops = append(stops, stop1) - stop2, err := s.Serve("wrpc-examples:complex/resources", "bar", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop2, err := s.Serve("wrpc-examples:complex/resources", "foo.bar", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -369,17 +271,147 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("borrowed resource ID length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + } + return + } + slog.DebugContext(ctx, "calling `wrpc-examples:complex/resources.foo.bar` handler") + r0, err := h.Foo_Bar(ctx, p0) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + } + if err != nil { + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + return + } + + var buf bytes.Buffer + writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + + write0, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { + n := len(v) + if n > math.MaxUint32 { + return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) + } + if err = func(v int, w io.Writer) error { + b := make([]byte, binary.MaxVarintLen32) + i := binary.PutUvarint(b, uint64(v)) + slog.Debug("writing string byte length", "len", n) + _, err = w.Write(b[:i]) + return err + }(n, w); err != nil { + return fmt.Errorf("failed to write string byte length of %d: %w", n, err) + } + slog.Debug("writing string bytes") + _, err = w.Write([]byte(v)) + if err != nil { + return fmt.Errorf("failed to write string bytes: %w", err) + } + return nil + }(r0, &buf) + if err != nil { + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + return + } + if write0 != nil { + writes[0] = write0 + } + slog.DebugContext(ctx, "transmitting `wrpc-examples:complex/resources.foo.bar` result") + _, err = w.Write(buf.Bytes()) + if err != nil { + slog.WarnContext(ctx, "failed to write result", "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + return + } + if len(writes) > 0 { + for index, write := range writes { + w, err := w.Index(index) + if err != nil { + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + return + } + index := index + write := write + go func() { + if err := write(w); err != nil { + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc-examples:complex/resources", "name", "foo.bar", "err", err) + } + }() + } + } + }) + if err != nil { + return nil, fmt.Errorf("failed to serve `wrpc-examples:complex/resources.foo.bar`: %w", err) + } + stops = append(stops, stop2) + + stop3, err := s.Serve("wrpc-examples:complex/resources", "bar", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc-examples:complex/resources", "name", "bar", "err", err) + } + }() + slog.DebugContext(ctx, "reading parameter", "i", 0) + p0, err := func(r interface { + io.ByteReader + io.Reader + }) (wrpc.Borrow[Foo], error) { + var x uint32 + var s uint + for i := 0; i < 5; i++ { + slog.Debug("reading borrowed resource ID length byte", "i", i) + b, err := r.ReadByte() + if err != nil { + if i > 0 && err == io.EOF { + err = io.ErrUnexpectedEOF + } + return "", fmt.Errorf("failed to read borrowed resource ID length byte: %w", err) + } + if b < 0x80 { + if i == 4 && b > 1 { + return "", errors.New("borrowed resource ID length overflows a 32-bit integer") + } + x = x | uint32(b)< math.MaxUint32 { @@ -402,7 +434,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:complex/resources", "name", "bar", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -410,37 +443,29 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc-examples:complex/resources.bar` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc-examples:complex/resources", "name", "bar", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc-examples:complex/resources", "name", "bar", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc-examples:complex/resources", "name", "bar", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc-examples:complex/resources.bar`: %w", err) } - stops = append(stops, stop2) + stops = append(stops, stop3) return stop, nil } diff --git a/examples/go/complex-server/bindings/server.wrpc.go b/examples/go/complex-server/bindings/server.wrpc.go index a53c63b7..d91fe622 100644 --- a/examples/go/complex-server/bindings/server.wrpc.go +++ b/examples/go/complex-server/bindings/server.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.1.1. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! // server package contains wRPC bindings for `server` world package server diff --git a/examples/go/complex-server/cmd/complex-server-nats/main.go b/examples/go/complex-server/cmd/complex-server-nats/main.go index ab28d526..46bd0faa 100644 --- a/examples/go/complex-server/cmd/complex-server-nats/main.go +++ b/examples/go/complex-server/cmd/complex-server-nats/main.go @@ -31,19 +31,18 @@ type ResourcesHandler struct { sync.Map } -func (h *ResourcesHandler) Foo(ctx context.Context) (resources.HandlerFoo, context.Context, string, error) { +func (h *ResourcesHandler) Foo(ctx context.Context) (wrpc.Own[resources.Foo], error) { id, err := uuid.NewV7() if err != nil { - return nil, nil, "", fmt.Errorf("failed to generate UUIDv7: %w", err) + return "", fmt.Errorf("failed to generate UUIDv7: %w", err) } ctx, cancel := context.WithCancel(ctx) - v := Foo{id: id, cancel: cancel} - h.Store(id.String(), v) + h.Store(id.String(), Foo{id: id, cancel: cancel}) go func() { <-ctx.Done() h.Delete(id) }() - return v, ctx, id.String(), nil + return wrpc.Own[resources.Foo](id.String()), nil } func (h *ResourcesHandler) Foo_Foo(ctx context.Context, v wrpc.Own[resources.Foo]) (string, error) { @@ -56,6 +55,14 @@ func (h *ResourcesHandler) Foo_Foo(ctx context.Context, v wrpc.Own[resources.Foo return "foo", nil } +func (h *ResourcesHandler) Foo_Bar(ctx context.Context, v wrpc.Borrow[resources.Foo]) (string, error) { + stored, ok := h.Load(string(v)) + if !ok { + return "", fmt.Errorf("unknown resource ID `%s`", string(v)) + } + return stored.(Foo).Bar(ctx) +} + func (h *ResourcesHandler) Bar(ctx context.Context, v wrpc.Borrow[resources.Foo]) (string, error) { stored, ok := h.Load(string(v)) if !ok { diff --git a/examples/go/hello-client/bindings/client.wrpc.go b/examples/go/hello-client/bindings/client.wrpc.go index e1f9257a..54c69d59 100644 --- a/examples/go/hello-client/bindings/client.wrpc.go +++ b/examples/go/hello-client/bindings/client.wrpc.go @@ -1,3 +1,3 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! // client package contains wRPC bindings for `client` world package client diff --git a/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go b/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go index 2ac02377..4e9e9556 100644 --- a/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go +++ b/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! package handler import ( @@ -12,7 +12,7 @@ import ( ) func Hello(ctx__ context.Context, wrpc__ wrpc.Invoker) (r0__ string, close__ func() error, err__ error) { - if err__ = wrpc__.Invoke(ctx__, "wrpc-examples:hello/handler", "hello", func(w__ wrpc.IndexWriter, r__ wrpc.IndexReadCloser) error { + if err__ = wrpc__.Invoke(ctx__, "wrpc-examples:hello/handler", "hello", func(w__ wrpc.IndexWriteCloser, r__ wrpc.IndexReadCloser) error { close__ = r__.Close _, err__ = w__.Write(nil) if err__ != nil { diff --git a/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go b/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go index 19990684..33bfb892 100644 --- a/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go +++ b/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! package handler import ( @@ -10,8 +10,6 @@ import ( io "io" slog "log/slog" math "math" - sync "sync" - atomic "sync/atomic" ) type Handler interface { @@ -28,15 +26,26 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return nil } - stop0, err := s.Serve("wrpc-examples:hello/handler", "hello", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop0, err := s.Serve("wrpc-examples:hello/handler", "hello", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", err) + } + }() slog.DebugContext(ctx, "calling `wrpc-examples:hello/handler.hello` handler") r0, err := h.Hello(ctx) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc-examples:hello/handler.hello` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc-examples:hello/handler", "name", "hello", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { n := len(v) if n > math.MaxUint32 { @@ -59,7 +68,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:hello/handler", "name", "hello", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -67,33 +77,25 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc-examples:hello/handler.hello` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc-examples:hello/handler", "name", "hello", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc-examples:hello/handler", "name", "hello", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc-examples:hello/handler", "name", "hello", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc-examples:hello/handler.hello`: %w", err) diff --git a/examples/go/hello-server/bindings/server.wrpc.go b/examples/go/hello-server/bindings/server.wrpc.go index 890164d2..85f7995c 100644 --- a/examples/go/hello-server/bindings/server.wrpc.go +++ b/examples/go/hello-server/bindings/server.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! // server package contains wRPC bindings for `server` world package server diff --git a/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go b/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go index c365aea1..469e1596 100644 --- a/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go +++ b/examples/go/streams-server/bindings/exports/wrpc_examples/streams/handler/bindings.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! package handler import ( @@ -356,7 +356,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return nil } - stop0, err := s.Serve("wrpc-examples:streams/handler", "echo", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop0, err := s.Serve("wrpc-examples:streams/handler", "echo", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r wrpc.IndexReader, path ...uint32) (*Req, error) { v := &Req{} @@ -580,17 +586,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return v, nil }(r, []uint32{0}...) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc-examples:streams/handler", "name", "echo", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc-examples:streams/handler.echo` handler") r0, r1, err := h.Echo(ctx, p0) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc-examples:streams/handler.echo` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) + write0, err := func(v wrpc.ReceiveCompleter[[]uint64], w interface { io.ByteWriter io.Writer @@ -783,7 +799,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc-examples:streams/handler", "name", "echo", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -872,7 +889,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r1, &buf) if err != nil { - return fmt.Errorf("failed to write result value 1: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 1, "wrpc-examples:streams/handler", "name", "echo", "err", err) + return } if write1 != nil { writes[1] = write1 @@ -880,33 +898,25 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc-examples:streams/handler.echo` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc-examples:streams/handler", "name", "echo", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc-examples:streams/handler", "name", "echo", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc-examples:streams/handler", "name", "echo", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }, wrpc.NewSubscribePath().Index(0).Index(0), wrpc.NewSubscribePath().Index(0).Index(1)) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc-examples:streams/handler.echo`: %w", err) diff --git a/examples/go/streams-server/bindings/server.wrpc.go b/examples/go/streams-server/bindings/server.wrpc.go index eb9cf418..e7040122 100644 --- a/examples/go/streams-server/bindings/server.wrpc.go +++ b/examples/go/streams-server/bindings/server.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.6.0. DO NOT EDIT! // server package contains wRPC bindings for `server` world package server diff --git a/go/nats/client.go b/go/nats/client.go index 7c6aadbf..85f236d0 100644 --- a/go/nats/client.go +++ b/go/nats/client.go @@ -81,86 +81,71 @@ func subscribe(conn *nats.Conn, prefix string, f func(context.Context, []byte), type Client struct { conn *nats.Conn prefix string - queueGroup *string + queueGroup string } func NewClient(conn *nats.Conn, prefix string) *Client { - return &Client{conn: conn, prefix: prefix, queueGroup: nil} + return &Client{conn: conn, prefix: prefix, queueGroup: ""} } func NewClientWithQueueGroup(conn *nats.Conn, prefix string, queueGroup string) *Client { - return &Client{conn, prefix, &queueGroup} + return &Client{conn, prefix, queueGroup} } type paramWriter struct { - ctx context.Context nc *nats.Conn - rx string - tx string - init bool + init func() (*initState, error) + path string } -func (w *paramWriter) publish(p []byte) (int, error) { - maxPayload := w.nc.MaxPayload() - pn := len(p) - if !w.init { - header, hasHeader := HeaderFromContext(w.ctx) - m := nats.NewMsg(w.tx) - m.Reply = w.rx - if hasHeader { - m.Header = header - } - mSize := int64(m.Size()) - if mSize > maxPayload { - return 0, fmt.Errorf("message size %d is larger than maximum allowed payload size %d", mSize, maxPayload) - } - maxPayload -= mSize - maxPayload = min(maxPayload, int64(len(p))) - m.Data, p = p[:maxPayload], p[maxPayload:] - - sub, err := w.nc.SubscribeSync(w.rx) - if err != nil { - return 0, fmt.Errorf("failed to subscribe on Rx subject: %w", err) - } - defer func() { - if err := sub.Unsubscribe(); err != nil { - slog.Error("failed to unsubscribe from Rx subject", "err", err) - } - }() - - slog.DebugContext(w.ctx, "publishing handshake", "rx", m.Reply) - if err := w.nc.PublishMsg(m); err != nil { - return 0, fmt.Errorf("failed to send initial payload chunk: %w", err) - } - n := len(m.Data) +type initState struct { + tx string + buf []byte +} - m, err = sub.NextMsgWithContext(w.ctx) - if err != nil { - return n, fmt.Errorf("failed to receive handshake: %w", err) - } - if m.Reply == "" { - return n, errors.New("peer did not specify a reply subject") - } - w.tx = paramSubject(m.Reply) - w.init = true +func (w *paramWriter) Write(p []byte) (int, error) { + init, err := w.init() + if err != nil { + return 0, fmt.Errorf("failed to perform handshake") } + tx := init.tx + if w.path != "" { + tx = fmt.Sprintf("%s.%s", tx, w.path) + } + buf := p + if w.path == "" && len(init.buf) > 0 { + buf = append(init.buf, p...) + } + maxPayload := w.nc.MaxPayload() + pn := len(p) + n := 0 for len(buf) > 0 { maxPayload = min(maxPayload, int64(len(buf))) p, buf = buf[:maxPayload], buf[maxPayload:] - if err := w.nc.Publish(w.tx, p); err != nil { - return 0, fmt.Errorf("failed to send payload chunk: %w", err) + if err := w.nc.Publish(tx, p); err != nil { + if w.path == "" { + bn := len(init.buf) + if n < bn { + init.buf = init.buf[n:] + n = 0 + } else { + n -= bn + init.buf = nil + } + } + return n, fmt.Errorf("failed to send payload chunk: %w", err) } + n += len(p) + } + if w.path == "" { + init.buf = nil } return pn, nil } -func (w *paramWriter) Write(p []byte) (int, error) { - return w.publish(p) -} - func (w *paramWriter) WriteByte(b byte) error { - _, err := w.publish([]byte{b}) + _, err := w.Write([]byte{b}) if err != nil { return err } @@ -168,12 +153,24 @@ func (w *paramWriter) WriteByte(b byte) error { } func (w *paramWriter) Index(path ...uint32) (wrpc.IndexWriter, error) { - return nil, errors.New("indexing not supported yet") + return ¶mWriter{ + nc: w.nc, + init: w.init, + path: indexPath(w.path, path...), + }, nil } func (w *paramWriter) Close() error { - if err := w.nc.Publish(w.tx, nil); err != nil { - return fmt.Errorf("failed to send shutdown message: %w", err) + init, err := w.init() + if err != nil { + return fmt.Errorf("failed to perform handshake") + } + tx := init.tx + if w.path != "" { + tx = fmt.Sprintf("%s.%s", tx, w.path) + } + if err := w.nc.Publish(tx, nil); err != nil { + return fmt.Errorf("failed to send empty message to shut down stream") } return nil } @@ -270,18 +267,21 @@ func (r *streamReader) ReadByte() (byte, error) { } func (r *streamReader) Close() (err error) { + var errs []error + if err := r.sub.Unsubscribe(); err != nil { + errs = append(errs, fmt.Errorf("failed to unsubscribe: %w", err)) + } refs := r.nestRef.Add(-1) if refs == 0 { // since this is the only reference to `nest`, no need to lock the mutex - var errs []error for path, sub := range r.nest { if err := sub.Unsubscribe(); err != nil { errs = append(errs, fmt.Errorf("failed to unsubscribe from nested path `%s`: %w", path, err)) } } - if len(errs) > 0 { - return fmt.Errorf("%v", errs) - } + } + if len(errs) > 0 { + return fmt.Errorf("%v", errs) } return nil } @@ -306,56 +306,97 @@ func (r *streamReader) Index(path ...uint32) (wrpc.IndexReader, error) { }, nil } -func (c *Client) Invoke(ctx context.Context, instance string, name string, f func(wrpc.IndexWriter, wrpc.IndexReadCloser) error, subs ...wrpc.SubscribePath) (err error) { +func (c *Client) Invoke(ctx context.Context, instance string, name string, f func(wrpc.IndexWriteCloser, wrpc.IndexReadCloser) error, paths ...wrpc.SubscribePath) (err error) { + w, r, err := c.InvokeConn(ctx, instance, name, nil, paths...) + if err != nil { + return err + } + return f(w, r) +} + +func (c *Client) InvokeConn(ctx context.Context, instance string, name string, buf []byte, paths ...wrpc.SubscribePath) (wrpc.IndexWriteCloser, wrpc.IndexReadCloser, error) { rx := nats.NewInbox() resultRx := resultSubject(rx) slog.Debug("subscribing on result subject", "subject", resultRx) resultSub, err := c.conn.SubscribeSync(resultRx) if err != nil { - return fmt.Errorf("failed to subscribe on result subject `%s`: %w", resultRx, err) + return nil, nil, fmt.Errorf("failed to subscribe on result subject `%s`: %w", resultRx, err) } - defer func() { - if sErr := resultSub.Unsubscribe(); sErr != nil { - if err == nil { - err = fmt.Errorf("failed to unsubscribe from result subject: %w", sErr) - } else { - slog.Error("failed to unsubscribe from result subject", "err", sErr) - } - } - }() - nest := make(map[string]*nats.Subscription, len(subs)) - for _, path := range subs { + nest := make(map[string]*nats.Subscription, len(paths)) + for _, path := range paths { s := subscribePath(resultRx, path) slog.Debug("subscribing on nested result subject", "subject", s) sub, err := c.conn.SubscribeSync(s) if err != nil { - return fmt.Errorf("failed to subscribe on nested result subject `%s`: %w", s, err) + return nil, nil, fmt.Errorf("failed to subscribe on nested result subject `%s`: %w", s, err) } nest[subscribePath("", path)] = sub } - - slog.Debug("calling client handler") - w := ¶mWriter{ - ctx: ctx, - nc: c.conn, - rx: rx, - tx: invocationSubject(c.prefix, instance, name), - } nestRef := &atomic.Int64{} nestRef.Add(1) - return f(w, &streamReader{ - ctx: ctx, - sub: resultSub, - nestMu: &sync.Mutex{}, - nestRef: nestRef, - nest: nest, - }) + return ¶mWriter{ + nc: c.conn, + init: sync.OnceValues(func() (init *initState, err error) { + header, hasHeader := HeaderFromContext(ctx) + + m := nats.NewMsg(invocationSubject(c.prefix, instance, name)) + m.Reply = rx + if hasHeader { + m.Header = header + } + + maxPayload := c.conn.MaxPayload() + mSize := int64(m.Size()) + if mSize > maxPayload { + return nil, fmt.Errorf("message size %d is larger than maximum allowed payload size %d", mSize, maxPayload) + } + maxPayload -= mSize + maxPayload = min(maxPayload, int64(len(buf))) + m.Data, buf = buf[:maxPayload], buf[maxPayload:] + + sub, err := c.conn.SubscribeSync(rx) + if err != nil { + return nil, fmt.Errorf("failed to subscribe on Rx subject: %w", err) + } + defer func() { + if sErr := sub.Unsubscribe(); sErr != nil { + if err == nil { + err = fmt.Errorf("failed to unsubscribe from handshake subject: %w", sErr) + } else { + slog.Error("failed to unsubscribe from handshake subject", "err", sErr) + } + } + }() + + slog.DebugContext(ctx, "publishing handshake", "rx", m.Reply) + if err := c.conn.PublishMsg(m); err != nil { + return nil, fmt.Errorf("failed to send initial payload chunk: %w", err) + } + m, err = sub.NextMsgWithContext(ctx) + if err != nil { + return nil, fmt.Errorf("failed to receive handshake: %w", err) + } + if m.Reply == "" { + return nil, errors.New("peer did not specify a reply subject") + } + return &initState{ + buf: buf, + tx: paramSubject(m.Reply), + }, nil + }), + }, &streamReader{ + ctx: ctx, + sub: resultSub, + nestMu: &sync.Mutex{}, + nestRef: nestRef, + nest: nest, + }, nil } -func (c *Client) Serve(instance string, name string, f func(context.Context, wrpc.IndexWriter, wrpc.IndexReadCloser) error, subs ...wrpc.SubscribePath) (stop func() error, err error) { - serveFunction := func(m *nats.Msg) { +func (c *Client) handleMessage(instance string, name string, f func(context.Context, wrpc.IndexWriteCloser, wrpc.IndexReadCloser), paths ...wrpc.SubscribePath) func(m *nats.Msg) { + return func(m *nats.Msg) { ctx := context.Background() ctx = ContextWithHeader(ctx, m.Header) @@ -374,14 +415,9 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp slog.Warn("failed to subscribe on parameter subject", "subject", paramRx, "err", err) return } - defer func() { - if err := paramSub.Unsubscribe(); err != nil { - slog.Error("failed to unsubscribe from parameter subject", "subject", paramRx, "err", err) - } - }() - nest := make(map[string]*nats.Subscription, len(subs)) - for _, path := range subs { + nest := make(map[string]*nats.Subscription, len(paths)) + for _, path := range paths { s := subscribePath(paramRx, path) slog.Debug("subscribing on nested parameter subject", "subject", s) sub, err := c.conn.SubscribeSync(s) @@ -403,7 +439,7 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp slog.Debug("calling server handler") nestRef := &atomic.Int64{} nestRef.Add(1) - if err := f(ctx, &resultWriter{ + f(ctx, &resultWriter{ nc: c.conn, tx: resultSubject(m.Reply), }, &streamReader{ @@ -413,21 +449,22 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp nestMu: &sync.Mutex{}, nestRef: nestRef, nest: nest, - }); err != nil { - slog.Warn("failed to handle invocation", "err", err) - return - } - slog.Debug("successfully finished serving invocation") + }) + slog.Debug("finished serving invocation") } +} + +func (c *Client) Serve(instance string, name string, f func(context.Context, wrpc.IndexWriteCloser, wrpc.IndexReadCloser), paths ...wrpc.SubscribePath) (stop func() error, err error) { + slog.Debug("serving", "instance", instance, "name", name, "group", c.queueGroup) + + subject := invocationSubject(c.prefix, instance, name) + handle := c.handleMessage(instance, name, f, paths...) var sub *nats.Subscription - if c.queueGroup != nil { - slog.Debug("serving with queue group", "instance", instance, "name", name, "group", *c.queueGroup) - sub, err = c.conn.QueueSubscribe(invocationSubject(c.prefix, instance, name), *c.queueGroup, serveFunction) + if c.queueGroup != "" { + sub, err = c.conn.QueueSubscribe(subject, c.queueGroup, handle) } else { - slog.Debug("serving", "instance", instance, "name", name) - sub, err = c.conn.Subscribe(invocationSubject(c.prefix, instance, name), serveFunction) + sub, err = c.conn.Subscribe(subject, handle) } - if err != nil { return nil, fmt.Errorf("failed to serve `%s` for instance `%s`: %w", name, instance, err) } diff --git a/go/wrpc.go b/go/wrpc.go index aed73951..136af287 100644 --- a/go/wrpc.go +++ b/go/wrpc.go @@ -2,39 +2,21 @@ package wrpc import ( "context" - "errors" - "fmt" "io" ) type Invoker interface { - Invoke(ctx context.Context, instance string, name string, f func(IndexWriter, IndexReadCloser) error, subs ...SubscribePath) error + Invoke(ctx context.Context, instance string, name string, f func(IndexWriteCloser, IndexReadCloser) error, subs ...SubscribePath) error + InvokeConn(ctx context.Context, instance string, name string, b []byte, paths ...SubscribePath) (IndexWriteCloser, IndexReadCloser, error) } type Server interface { - Serve(instance string, name string, f func(context.Context, IndexWriter, IndexReadCloser) error, subs ...SubscribePath) (func() error, error) + Serve(instance string, name string, f func(context.Context, IndexWriteCloser, IndexReadCloser), paths ...SubscribePath) (func() error, error) } // Own is an owned resource handle type Own[T any] string -func (v Own[T]) Drop(ctx context.Context, c Invoker) error { - if v == "" { - return errors.New("cannot drop a resource without an ID") - } - return c.Invoke(ctx, string(v), "drop", func(w IndexWriter, r IndexReadCloser) error { - _, err := w.Write(nil) - if err != nil { - return fmt.Errorf("failed to write empty `drop` parameters") - } - _, err = r.Read(nil) - if err != nil { - return fmt.Errorf("failed to read empty `drop` result") - } - return nil - }) -} - func (v Own[T]) Borrow() Borrow[T] { return Borrow[T](v) } @@ -91,6 +73,11 @@ type IndexReadCloser interface { io.Closer } +type IndexWriteCloser interface { + IndexWriter + io.Closer +} + type ByteWriter interface { io.ByteWriter io.Writer diff --git a/tests/go/resources.go b/tests/go/resources.go index be04a981..bb432990 100644 --- a/tests/go/resources.go +++ b/tests/go/resources.go @@ -25,10 +25,10 @@ type ResourcesHandler struct { sync.Map } -func (h *ResourcesHandler) Foo(ctx context.Context) (resources.HandlerFoo, context.Context, string, error) { +func (h *ResourcesHandler) Foo(ctx context.Context) (wrpc.Own[resources.Foo], error) { id, err := uuid.NewV7() if err != nil { - return nil, nil, "", fmt.Errorf("failed to generate UUIDv7: %w", err) + return "", fmt.Errorf("failed to generate UUIDv7: %w", err) } ctx, cancel := context.WithCancel(ctx) v := Foo{id: id, cancel: cancel} @@ -37,7 +37,7 @@ func (h *ResourcesHandler) Foo(ctx context.Context) (resources.HandlerFoo, conte <-ctx.Done() h.Delete(id) }() - return v, ctx, id.String(), nil + return wrpc.Own[resources.Foo](id.String()), nil } func (h *ResourcesHandler) Foo_Foo(ctx context.Context, v wrpc.Own[resources.Foo]) (string, error) { @@ -50,6 +50,14 @@ func (h *ResourcesHandler) Foo_Foo(ctx context.Context, v wrpc.Own[resources.Foo return "foo", nil } +func (h *ResourcesHandler) Foo_Bar(ctx context.Context, v wrpc.Borrow[resources.Foo]) (string, error) { + stored, ok := h.Load(string(v)) + if !ok { + return "", fmt.Errorf("unknown resource ID `%s`", string(v)) + } + return stored.(Foo).Bar(ctx) +} + func (h *ResourcesHandler) Bar(ctx context.Context, v wrpc.Borrow[resources.Foo]) (string, error) { stored, ok := h.Load(string(v)) if !ok { diff --git a/tests/go/resources_test.go b/tests/go/resources_test.go index 61c90092..8e6d9acf 100644 --- a/tests/go/resources_test.go +++ b/tests/go/resources_test.go @@ -64,10 +64,11 @@ func TestResources(t *testing.T) { } foo = v } - if err := foo.Drop(ctx, client); err != nil { - t.Errorf("failed to drop `foo` resource: %s", err) - return - } + // TODO: Reenable once resource dropping is supported + //if err := foo.Drop(ctx, client); err != nil { + // t.Errorf("failed to drop `foo` resource: %s", err) + // return + //} { slog.DebugContext(ctx, "calling `wrpc-test:integration/resources.[constructor]foo`") @@ -151,10 +152,11 @@ func TestResources(t *testing.T) { } } - if err := foo.Drop(ctx, client); err == nil { - t.Errorf("`foo` resource did not get dropped by moving") - return - } + // TODO: Reenable once resource dropping is supported + //if err := foo.Drop(ctx, client); err == nil { + // t.Errorf("`foo` resource did not get dropped by moving") + // return + //} if err = stop(); err != nil { t.Errorf("failed to stop serving `resources-server` world: %s", err)