Skip to content

Commit

Permalink
Add support for loading multiple --sql-directory sources
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed Nov 1, 2018
1 parent 4d5878a commit 70c16ab
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
28 changes: 20 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (

// Global Jobber container.
jobber = &Jobber{
Tasks: make(Tasks),
DBs: make(DBs),
ResultBackends: make(ResultBackends),
Logger: sysLog,
Expand All @@ -82,14 +83,14 @@ func init() {
viper.SetConfigType("toml")
viper.SetDefault("config", "config.toml")
viper.SetDefault("server", ":6060")
viper.SetDefault("sql-directory", "./sql")
viper.SetDefault("sql-directory", []string{"./sql"})
viper.SetDefault("worker-name", "sqljobber")
viper.SetDefault("worker-concurrency", 10)
viper.SetDefault("worker-only", false)

flagSet.String("config", "config.toml", "Path to the TOML configuration file")
flagSet.String("server", "127.0.0.1:6060", "Web server address")
flagSet.String("sql-directory", "./sql", "Path to the directory with .sql scripts")
flagSet.StringSlice("sql-directory", []string{"./sql"}, "Path to directory with .sql scripts. Can be specified multiple times")
flagSet.String("queue", "default_queue", "Name of the job queue to accept jobs from")
flagSet.String("worker-name", "sqljobber", "Name of this worker instance")
flagSet.Int("worker-concurrency", 10, "Number of concurrent worker threads to run")
Expand Down Expand Up @@ -173,14 +174,24 @@ func main() {
jobber.ResultBackends[dbName] = backend
}

var err error
// Parse and load SQL queries.
sysLog.Printf("loading SQL queries from %s", viper.GetString("sql-directory"))
if jobber.Tasks, err = loadSQLTasks(viper.GetString("sql-directory"),
jobber.DBs, jobber.ResultBackends, viper.GetString("queue")); err != nil {
sysLog.Fatal(err)
for _, d := range viper.GetStringSlice("sql-directory") {
sysLog.Printf("loading SQL queries from directory: %s", d)
tasks, err := loadSQLTasks(d, jobber.DBs, jobber.ResultBackends, viper.GetString("queue"))
if err != nil {
sysLog.Fatal(err)
}

for t, q := range tasks {
if _, ok := jobber.Tasks[t]; ok {
sysLog.Fatalf("duplicate task %s", t)
}

jobber.Tasks[t] = q
}
sysLog.Printf("loaded %d SQL queries from %s", len(tasks), d)
}
sysLog.Printf("loaded %d SQL queries", len(jobber.Tasks))
sysLog.Printf("loaded %d tasks in total", len(jobber.Tasks))

// Bind the server HTTP endpoints.
r := chi.NewRouter()
Expand All @@ -201,6 +212,7 @@ func main() {
r.Get("/groups/{groupID}", handleGetGroupStatus)

// Setup the job server.
var err error
jobber.Machinery, err = connectJobServer(jobber, &config.Config{
Broker: viper.GetString("machinery.broker_address"),
DefaultQueue: viper.GetString("queue"),
Expand Down
2 changes: 1 addition & 1 deletion tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func loadSQLTasks(dir string, dbs DBs, resBackends ResultBackends, defQueue stri
queue = strings.TrimSpace(v)
}

sysLog.Printf("-- loaded task %s (%s) (db = %v) (results = %v) (queue = %v)", name, typ,
sysLog.Printf("-- task %s (%s) (db = %v) (results = %v) (queue = %v)", name, typ,
dbsToAttach.GetNames(), resBackendsToAttach.GetNames(), queue)
tasks[name] = Task{
Name: name,
Expand Down

0 comments on commit 70c16ab

Please sign in to comment.