Skip to content

Commit

Permalink
Refactor broker/state config keys. (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed Mar 13, 2024
1 parent 91e3026 commit f08a1a8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
13 changes: 8 additions & 5 deletions cmd/config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# The broker that manages job queuing.
# Currently, only "redis" is supported.
[job_queue.broker]
type = "redis"
addresses = ["localhost:6379"]
password = ""
db = 1
Expand All @@ -8,11 +11,11 @@ dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"

# This is where Tasqueue stores it's job states.
# In Tasqueue's terminology, this is its "result backend"
# and is not to be confused with the DungBeetle's 'result_backend'
# where DungBeetle stores results of jobs, bypassing tasqueue.
[job_queue.results]
# The state store where the state of the jobs in the queue and their
# metadata are maintained.
# Currently, only "redis" is supported.
[job_queue.state]
type = "redis"
addresses = ["localhost:6379"]
password = ""
db = 1
Expand Down
33 changes: 19 additions & 14 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"embed"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
Expand Down Expand Up @@ -96,13 +95,13 @@ func initCore(ko *koanf.Koanf) *core.Core {
// Connect to source DBs.
srcPool, err := dbpool.New(srcDBs)
if err != nil {
log.Fatal(err)
lo.Fatal(err)
}

// Connect to result DBs.
resPool, err := dbpool.New(resDBs)
if err != nil {
log.Fatal(err)
lo.Fatal(err)
}

// Initialize the result backend controller for every backend.
Expand All @@ -116,14 +115,20 @@ func initCore(ko *koanf.Koanf) *core.Core {

backend, err := sqldb.NewSQLBackend(db, opt, lo)
if err != nil {
log.Fatalf("error initializing result backend: %v", err)
lo.Fatalf("error initializing result backend: %v", err)
}

backends[name] = backend
}

lo := slog.Default()
if v := ko.MustString("job_queue.broker.type"); v != "redis" {
lo.Fatalf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported.", v)
}
if v := ko.MustString("job_queue.state.type"); v != "redis" {
lo.Fatalf("unsupported job_queue.state.type '%s'. Only 'redis' is supported.", v)
}

lo := slog.Default()
rBroker := bredis.New(bredis.Options{
PollPeriod: bredis.DefaultPollPeriod,
Addrs: ko.MustStrings("job_queue.broker.addresses"),
Expand All @@ -136,15 +141,15 @@ func initCore(ko *koanf.Koanf) *core.Core {
}, lo)

rResult := rredis.New(rredis.Options{
Addrs: ko.MustStrings("job_queue.results.addresses"),
Password: ko.String("job_queue.results.password"),
DB: ko.Int("job_queue.results.db"),
MinIdleConns: ko.MustInt("job_queue.results.max_idle"),
DialTimeout: ko.MustDuration("job_queue.results.dial_timeout"),
ReadTimeout: ko.MustDuration("job_queue.results.read_timeout"),
WriteTimeout: ko.MustDuration("job_queue.results.write_timeout"),
Expiry: ko.Duration("job_queue.results.expiry"),
MetaExpiry: ko.Duration("job_queue.results.meta_expiry"),
Addrs: ko.MustStrings("job_queue.state.addresses"),
Password: ko.String("job_queue.state.password"),
DB: ko.Int("job_queue.state.db"),
MinIdleConns: ko.MustInt("job_queue.state.max_idle"),
DialTimeout: ko.MustDuration("job_queue.state.dial_timeout"),
ReadTimeout: ko.MustDuration("job_queue.state.read_timeout"),
WriteTimeout: ko.MustDuration("job_queue.state.write_timeout"),
Expiry: ko.Duration("job_queue.state.expiry"),
MetaExpiry: ko.Duration("job_queue.state.meta_expiry"),
}, lo)

// Initialize the server and load SQL tasks.
Expand Down

0 comments on commit f08a1a8

Please sign in to comment.