Skip to content

Commit

Permalink
Make stream and config IDs configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
adewes committed Nov 4, 2021
1 parent 23efbfe commit 21aac55
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 79 deletions.
2 changes: 1 addition & 1 deletion api/decorators/valid_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func ValidObject(settings kodex.Settings, objectType string, objectRoles []strin
object, roleObject, err = adaptor.Get(apiController, c, objectID)

if err != nil {
if cErr, ok := err.(errors.ChainableError); ok && cErr.Code() == "NOT-FOUND" {
if err == kodex.NotFound {
api.HandleError(c, 404, fmt.Errorf("object not found"))
} else {
api.HandleError(c, 500, err)
Expand Down
2 changes: 1 addition & 1 deletion api/definitions/adaptor_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,5 @@ func (a ConfigAdaptor) MakeObject(c *gin.Context) kodex.Model {
return nil
}

return stream.MakeConfig()
return stream.MakeConfig(nil)
}
2 changes: 1 addition & 1 deletion api/definitions/adaptor_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,5 @@ func (a StreamAdaptor) MakeObject(c *gin.Context) kodex.Model {
return nil
}

return project.MakeStream()
return project.MakeStream(nil)
}
3 changes: 3 additions & 0 deletions api/v1/resources/transform_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func TransformConfigEndpoint(meter kodex.Meter) func(*gin.Context) {
for k, v := range writer.Items {
channels[k] = v
}
channels["errors"] = writer.Errors
channels["messages"] = writer.Messages
channels["warnings"] = writer.Warnings
data := map[string]interface{}{
"data": channels,
}
Expand Down
203 changes: 140 additions & 63 deletions blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,108 @@ import (
"strings"
)

var BlueprintProjectForm = forms.Form{
Fields: []forms.Field{
{
Name: "id",
Validators: []forms.Validator{
forms.IsBytes{Encoding: "hex"},
},
},
{
Name: "name",
Validators: []forms.Validator{
forms.IsString{},
},
},
{
Name: "description",
Validators: []forms.Validator{
forms.IsOptional{Default: ""},
forms.IsString{},
},
},
},
}

var BlueprintStreamForm = forms.Form{
Fields: []forms.Field{
{
Name: "id",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsBytes{Encoding: "hex"},
},
},
{
Name: "name",
Validators: []forms.Validator{
forms.IsString{},
},
},
{
Name: "description",
Validators: []forms.Validator{
forms.IsOptional{Default: ""},
forms.IsString{},
},
},
{
Name: "data",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsStringMap{},
},
},
{
Name: "status",
Validators: []forms.Validator{
forms.IsOptional{Default: "active"},
forms.IsIn{Choices: []interface{}{"active", "inactive", "testing"}},
},
},
},
}

var BlueprintConfigForm = forms.Form{
Fields: []forms.Field{
{
Name: "id",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsBytes{Encoding: "hex"},
},
},
{
Name: "name",
Validators: []forms.Validator{
forms.IsString{},
},
},
{
Name: "description",
Validators: []forms.Validator{
forms.IsOptional{Default: ""},
forms.IsString{},
},
},
{
Name: "status",
Validators: []forms.Validator{
forms.IsOptional{Default: "active"},
forms.IsIn{Choices: []interface{}{"active", "inactive", "testing"}},
},
},
{
Name: "data",
Validators: []forms.Validator{
forms.IsOptional{},
forms.IsStringMap{},
},
},
},
}

type Blueprint struct {
config map[string]interface{}
}
Expand Down Expand Up @@ -316,29 +418,31 @@ func initStreams(project Project, config map[string]interface{}) error {
if !ok {
return fmt.Errorf("stream config missing")
}
Log.Debugf("Creating stream: %s", name)
stream := project.MakeStream()
values := map[string]interface{}{
"name": name,
"description": "",
"status": string(ActiveStream),
}

if err := stream.Create(values); err != nil {
if params, err := BlueprintStreamForm.Validate(streamConfigMap); err != nil {
return err
}
} else {
Log.Debugf("Creating stream: %s", name)
streamID, _ := streamConfigMap["id"].([]byte)
stream := project.MakeStream(streamID)

if err := stream.Save(); err != nil {
return err
}
if err := stream.Create(params); err != nil {
return err
}

if err := initStreamSources(stream, streamConfigMap); err != nil {
return err
}
if err := stream.Save(); err != nil {
return err
}

if err := initStreamConfigs(stream, streamConfigMap); err != nil {
return err
if err := initStreamSources(stream, streamConfigMap); err != nil {
return err
}

if err := initStreamConfigs(stream, streamConfigMap); err != nil {
return err
}
}

}
return nil
}
Expand Down Expand Up @@ -400,36 +504,33 @@ func initStreamConfigs(stream Stream, config map[string]interface{}) error {
if !ok {
return fmt.Errorf("invalid config: %s", name)
}
config := stream.MakeConfig()

status, ok := mapConfigConfig["status"]
if params, err := BlueprintConfigForm.Validate(mapConfigConfig); err != nil {
return err
} else {

if !ok {
status = "active"
}
id, _ := params["id"].([]byte)

values := map[string]interface{}{
"name": name,
"status": status,
}
config := stream.MakeConfig(id)

if err := config.Create(values); err != nil {
return err
}
if err := config.Create(params); err != nil {
return err
}

if err := config.Save(); err != nil {
return err
}
if err := config.Save(); err != nil {
return err
}

if err := initConfigDestinations(config, mapConfigConfig); err != nil {
return err
}
if err := initConfigDestinations(config, mapConfigConfig); err != nil {
return err
}

if err := initConfigActions(config, mapConfigConfig); err != nil {
return err
}
if err := initConfigActions(config, mapConfigConfig); err != nil {
return err
}

Log.Debugf("Created config '%s'", name)
Log.Debugf("Created config '%s'", name)
}
}

return nil
Expand Down Expand Up @@ -491,30 +592,6 @@ func initConfigDestinations(config Config, configData map[string]interface{}) er
return nil
}

var BlueprintProjectForm = forms.Form{
Fields: []forms.Field{
{
Name: "id",
Validators: []forms.Validator{
forms.IsBytes{Encoding: "hex"},
},
},
{
Name: "name",
Validators: []forms.Validator{
forms.IsString{},
},
},
{
Name: "description",
Validators: []forms.Validator{
forms.IsOptional{Default: ""},
forms.IsString{},
},
},
},
}

func initProject(controller Controller, configData map[string]interface{}) (Project, error) {
projectConfigData, ok := configData["project"]
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions controllers/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (c *InMemoryController) Stream(streamID []byte) (kodex.Stream, error) {
}
}

return nil, fmt.Errorf("stream not found")
return nil, kodex.NotFound
}

func (c *InMemoryController) Config(configID []byte) (kodex.Config, error) {
Expand All @@ -216,7 +216,7 @@ func (c *InMemoryController) Config(configID []byte) (kodex.Config, error) {
}
}
}
return nil, fmt.Errorf("config not found")
return nil, kodex.NotFound
}

func (c *InMemoryController) ActionConfig(actionConfigID []byte) (kodex.ActionConfig, error) {
Expand All @@ -225,7 +225,7 @@ func (c *InMemoryController) ActionConfig(actionConfigID []byte) (kodex.ActionCo
return actionConfig, nil
}
}
return nil, fmt.Errorf("action config not found")
return nil, kodex.NotFound
}

/* Action Config Management */
Expand Down Expand Up @@ -305,7 +305,7 @@ func (c *InMemoryController) Source(sourceID []byte) (kodex.Source, error) {
return source, nil
}
}
return nil, fmt.Errorf("not found")
return nil, kodex.NotFound
}

/* Destination Management */
Expand Down Expand Up @@ -349,7 +349,7 @@ func (c *InMemoryController) Destination(destinationID []byte) (kodex.Destinatio
return destination, nil
}
}
return nil, fmt.Errorf("not found")
return nil, kodex.NotFound
}

func (c *InMemoryController) StreamsByUrgency(n int) ([]kodex.Stream, error) {
Expand Down
6 changes: 4 additions & 2 deletions controllers/in_memory_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ func (i *InMemoryProject) SetDescription(description string) error {
return nil
}

func (c *InMemoryProject) MakeStream() kodex.Stream {
id := kodex.RandomID()
func (c *InMemoryProject) MakeStream(id []byte) kodex.Stream {
if id == nil {
id = kodex.RandomID()
}
stream, err := MakeInMemoryStream(id, map[string]interface{}{
"configs": []map[string]interface{}{},
"params": []map[string]interface{}{},
Expand Down
7 changes: 5 additions & 2 deletions controllers/in_memory_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ func (c *InMemoryStream) DeleteConfig(dc *InMemoryConfig) error {
return nil
}

func (c *InMemoryStream) MakeConfig() kodex.Config {
config, err := MakeInMemoryConfig(c, kodex.RandomID(), map[string]interface{}{})
func (c *InMemoryStream) MakeConfig(id []byte) kodex.Config {
if id == nil {
id = kodex.RandomID()
}
config, err := MakeInMemoryConfig(c, id, map[string]interface{}{})
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions helpers/testing/fixtures/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c Stream) Setup(fixtures map[string]interface{}) (interface{}, error) {
return nil, fmt.Errorf("project missing")
}

stream := project.MakeStream()
stream := project.MakeStream(nil)

values := map[string]interface{}{
"name": c.Name,
Expand Down Expand Up @@ -96,7 +96,7 @@ func (c Config) Setup(fixtures map[string]interface{}) (interface{}, error) {
return nil, fmt.Errorf("not a stream")
}

config := stream.MakeConfig()
config := stream.MakeConfig(nil)

values := map[string]interface{}{
"status": string(c.Status),
Expand Down
2 changes: 1 addition & 1 deletion project.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Project interface {
MakeActionConfig() ActionConfig
MakeDestination() Destination
MakeSource() Source
MakeStream() Stream
MakeStream(id []byte) Stream

Controller() Controller
}
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Stream interface {
PriorityModel
Configs() ([]Config, error)
Config([]byte) (Config, error)
MakeConfig() Config
MakeConfig(id []byte) Config

AddSource(Source, SourceStatus) error
RemoveSource(Source) error
Expand Down

0 comments on commit 21aac55

Please sign in to comment.