Skip to content

Commit

Permalink
feat(go): add nats keyvalue server
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesstocktonj1 authored and rvolosatovs committed Aug 31, 2024
1 parent 09b2b3a commit c99c348
Show file tree
Hide file tree
Showing 15 changed files with 2,101 additions and 0 deletions.

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions examples/go/wasi-keyvalue-nats-server/bindings/server.wrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Generated by `wit-bindgen-wrpc-go` 0.8.0. DO NOT EDIT!
// server package contains wRPC bindings for `server` world
package server

import (
exports__wasi__keyvalue__store "wrpc.io/examples/go/wasi-keyvalue-nats-server/bindings/exports/wasi/keyvalue/store"
wrpc "wrpc.io/go"
)

func Serve(s wrpc.Server, h0 exports__wasi__keyvalue__store.Handler) (stop func() error, err error) {
stops := make([]func() error, 0, 1)
stop = func() error {
for _, stop := range stops {
if err := stop(); err != nil {
return err
}
}
return nil
}
stop0, err := exports__wasi__keyvalue__store.ServeInterface(s, h0)
if err != nil {
return
}
stops = append(stops, stop0)
stop = func() error {
if err := stop0(); err != nil {
return err
}
return nil
}
return
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package main

import (
"context"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"

"github.com/nats-io/nats.go"
server "wrpc.io/examples/go/wasi-keyvalue-nats-server/bindings"
"wrpc.io/examples/go/wasi-keyvalue-nats-server/bindings/exports/wasi/keyvalue/store"
wrpc "wrpc.io/go"
wrpcnats "wrpc.io/go/nats"
)

var _ store.Handler = &Handler{}

type Handler struct {
data map[Bucket]map[string][]uint8
lock sync.Mutex
}

type Bucket string

func (h *Handler) Open(ctx context.Context, identifier string) (*wrpc.Result[wrpc.Own[store.Bucket], store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

if h.data == nil {
h.data = make(map[Bucket]map[string][]uint8)
}
bucket := (Bucket)(identifier)
if _, ok := h.data[bucket]; !ok {
h.data[bucket] = make(map[string][]uint8)
}
return wrpc.Ok[store.Error, wrpc.Own[store.Bucket]](wrpc.Own[store.Bucket](bucket)), nil
}

func (h *Handler) Bucket_Get(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[[]uint8, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[[]uint8, store.Error](*err), err
}
value, ok := h.data[bucket][key]
if !ok {
return wrpc.Ok[store.Error, []uint8](nil), nil
}
return wrpc.Ok[store.Error, []uint8](value), nil
}

func (h *Handler) Bucket_Set(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string, value []uint8) (*wrpc.Result[struct{}, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[struct{}, store.Error](*err), err
}
h.data[bucket][key] = value
return wrpc.Ok[store.Error, struct{}](struct{}{}), nil
}

func (h *Handler) Bucket_Delete(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[struct{}, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[struct{}, store.Error](*err), err
}
delete(h.data[bucket], key)

return wrpc.Ok[store.Error, struct{}](struct{}{}), nil
}

func (h *Handler) Bucket_Exists(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[bool, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[bool, store.Error](*err), err
}
_, ok := h.data[bucket][key]
return wrpc.Ok[store.Error, bool](ok), nil
}

func (h *Handler) Bucket_ListKeys(ctx__ context.Context, self wrpc.Borrow[store.Bucket], cursor *uint64) (*wrpc.Result[store.KeyResponse, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[store.KeyResponse, store.Error](*err), err
}
keyResponse := store.KeyResponse{Keys: []string{}}
for k := range h.data[bucket] {
keyResponse.Keys = append(keyResponse.Keys, k)
}
return wrpc.Ok[store.Error, store.KeyResponse](keyResponse), nil
}

func run() error {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return fmt.Errorf("failed to connect to NATS.io: %w", err)
}
defer nc.Close()
defer func() {
if dErr := nc.Drain(); dErr != nil {
if err == nil {
err = fmt.Errorf("failed to drain NATS.io connection: %w", dErr)
} else {
slog.Error("failed to drain NATS.io connection", "err", dErr)
}
}
}()

client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
stop, err := server.Serve(client, &Handler{})
if err != nil {
return fmt.Errorf("failed to serve `keyvalue` world: %w", err)
}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT)
<-signalCh

if err = stop(); err != nil {
return fmt.Errorf("failed to stop `keyvalue` world: %w", err)
}
return nil
}

func init() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelInfo, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.TimeKey {
return slog.Attr{}
}
return a
},
})))
}

func main() {
if err := run(); err != nil {
log.Fatal(err)
}
}
18 changes: 18 additions & 0 deletions examples/go/wasi-keyvalue-nats-server/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module wrpc.io/examples/go/wasi-keyvalue-nats-server

go 1.22.2

replace wrpc.io/go v0.0.3 => ../../../go

require (
github.com/nats-io/nats.go v1.37.0
wrpc.io/go v0.0.3
)

require (
github.com/klauspost/compress v1.17.8 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
)
12 changes: 12 additions & 0 deletions examples/go/wasi-keyvalue-nats-server/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//go:generate cargo run --bin wit-bindgen-wrpc go --out-dir bindings --package wrpc.io/examples/go/wasi-keyvalue-nats-server/bindings wit

package wasi_keyvalue_nats_server
4 changes: 4 additions & 0 deletions examples/go/wasi-keyvalue-nats-server/wit/deps.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[keyvalue]
url = "https://github.com/WebAssembly/wasi-keyvalue/archive/main.tar.gz"
sha256 = "d2de617fe31ec0abc6072f75f97dd22bf95b3231d5b3111471d73871df9081cd"
sha512 = "6f0b4e44c684d760c54552e2bde9bc976e0a4f6525fc1d47acb98625e030847276436242f42a41f4da1bb9169fb2968c53d659d61af9b2f709f4eb6f9880e2c7"
1 change: 1 addition & 0 deletions examples/go/wasi-keyvalue-nats-server/wit/deps.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
keyvalue = "https://github.com/WebAssembly/wasi-keyvalue/archive/main.tar.gz"
22 changes: 22 additions & 0 deletions examples/go/wasi-keyvalue-nats-server/wit/deps/keyvalue/atomic.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/// A keyvalue interface that provides atomic operations.
///
/// Atomic operations are single, indivisible operations. When a fault causes an atomic operation to
/// fail, it will appear to the invoker of the atomic operation that the action either completed
/// successfully or did nothing at all.
///
/// Please note that this interface is bare functions that take a reference to a bucket. This is to
/// get around the current lack of a way to "extend" a resource with additional methods inside of
/// wit. Future version of the interface will instead extend these methods on the base `bucket`
/// resource.
interface atomics {
use store.{bucket, error};

/// Atomically increment the value associated with the key in the store by the given delta. It
/// returns the new value.
///
/// If the key does not exist in the store, it creates a new key-value pair with the value set
/// to the given delta.
///
/// If any other error occurs, it returns an `Err(error)`.
increment: func(bucket: borrow<bucket>, key: string, delta: u64) -> result<u64, error>;
}
63 changes: 63 additions & 0 deletions examples/go/wasi-keyvalue-nats-server/wit/deps/keyvalue/batch.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/// A keyvalue interface that provides batch operations.
///
/// A batch operation is an operation that operates on multiple keys at once.
///
/// Batch operations are useful for reducing network round-trip time. For example, if you want to
/// get the values associated with 100 keys, you can either do 100 get operations or you can do 1
/// batch get operation. The batch operation is faster because it only needs to make 1 network call
/// instead of 100.
///
/// A batch operation does not guarantee atomicity, meaning that if the batch operation fails, some
/// of the keys may have been modified and some may not.
///
/// This interface does has the same consistency guarantees as the `store` interface, meaning that
/// you should be able to "read your writes."
///
/// Please note that this interface is bare functions that take a reference to a bucket. This is to
/// get around the current lack of a way to "extend" a resource with additional methods inside of
/// wit. Future version of the interface will instead extend these methods on the base `bucket`
/// resource.
interface batch {
use store.{bucket, error};

/// Get the key-value pairs associated with the keys in the store. It returns a list of
/// key-value pairs.
///
/// If any of the keys do not exist in the store, it returns a `none` value for that pair in the
/// list.
///
/// MAY show an out-of-date value if there are concurrent writes to the store.
///
/// If any other error occurs, it returns an `Err(error)`.
get-many: func(bucket: borrow<bucket>, keys: list<string>) -> result<list<option<tuple<string, list<u8>>>>, error>;

/// Set the values associated with the keys in the store. If the key already exists in the
/// store, it overwrites the value.
///
/// Note that the key-value pairs are not guaranteed to be set in the order they are provided.
///
/// If any of the keys do not exist in the store, it creates a new key-value pair.
///
/// If any other error occurs, it returns an `Err(error)`. When an error occurs, it does not
/// rollback the key-value pairs that were already set. Thus, this batch operation does not
/// guarantee atomicity, implying that some key-value pairs could be set while others might
/// fail.
///
/// Other concurrent operations may also be able to see the partial results.
set-many: func(bucket: borrow<bucket>, key-values: list<tuple<string, list<u8>>>) -> result<_, error>;

/// Delete the key-value pairs associated with the keys in the store.
///
/// Note that the key-value pairs are not guaranteed to be deleted in the order they are
/// provided.
///
/// If any of the keys do not exist in the store, it skips the key.
///
/// If any other error occurs, it returns an `Err(error)`. When an error occurs, it does not
/// rollback the key-value pairs that were already deleted. Thus, this batch operation does not
/// guarantee atomicity, implying that some key-value pairs could be deleted while others might
/// fail.
///
/// Other concurrent operations may also be able to see the partial results.
delete-many: func(bucket: borrow<bucket>, keys: list<string>) -> result<_, error>;
}
Loading

0 comments on commit c99c348

Please sign in to comment.