Skip to content

Commit

Permalink
Refactor resultbackend and related models to separate packages.
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed Mar 13, 2024
1 parent 9933f69 commit 0a5b18c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 31 deletions.
6 changes: 3 additions & 3 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/knadh/koanf/v2"
"github.com/zerodha/dungbeetle/backends"
"github.com/zerodha/dungbeetle/internal/resultbackends/sqldb"
)

// dbConfig represents an SQL database's configuration.
Expand Down Expand Up @@ -83,14 +83,14 @@ func initDBs(server *Server, ko *koanf.Koanf) {
log.Fatal(err)
}

opt := backends.Opt{
opt := sqldb.Opt{
DBType: cfg.Type,
ResultsTable: ko.MustString(fmt.Sprintf("results.%s.results_table", dbName)),
UnloggedTables: cfg.Unlogged,
}

// Create a new backend instance.
backend, err := backends.NewSQLBackend(conn, opt, lo)
backend, err := sqldb.NewSQLBackend(conn, opt, lo)
if err != nil {
log.Fatalf("error initializing result backend: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"math/rand"
"strings"

"github.com/zerodha/dungbeetle/backends"
"github.com/zerodha/dungbeetle/models"
)

// ResultBackends represents a map of *sql.DB connections.
type ResultBackends map[string]backends.ResultBackend
type ResultBackends map[string]models.ResultBackend

// Get returns an *sql.DB from the DBs map by name.
func (r ResultBackends) Get(name string) backends.ResultBackend {
func (r ResultBackends) Get(name string) models.ResultBackend {
return r[name]
}

Expand All @@ -27,7 +27,7 @@ func (r ResultBackends) GetNames() []string {
}

// GetRandom returns a random *sql.DB from the DBs map.
func (r ResultBackends) GetRandom() (string, backends.ResultBackend) {
func (r ResultBackends) GetRandom() (string, models.ResultBackend) {
stop := 0
if len(r) > 1 {
stop = rand.Intn(len(r))
Expand Down
43 changes: 24 additions & 19 deletions backends/sqldb.go → internal/resultbackends/sqldb/sqldb.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package backends
// Package sqldb is a general SQL DB backend implementation that takes an stdlib
// sql.DB connection and creates tables and writes results to it.
// It has explicit support for MySQL and PostGres for handling differences in
// SQL dialects, but should ideally work with any standard SQL backend.
package sqldb

import (
"database/sql"
Expand All @@ -8,6 +12,8 @@ import (
"strings"
"sync"
"time"

"github.com/zerodha/dungbeetle/models"
)

const (
Expand All @@ -22,8 +28,8 @@ type Opt struct {
UnloggedTables bool
}

// sqlDB represents the sqlDB backend.
type sqlDB struct {
// SqlDB represents the SqlDB backend.
type SqlDB struct {
db *sql.DB
opt Opt
logger *log.Logger
Expand All @@ -35,9 +41,9 @@ type sqlDB struct {
schemaMutex sync.RWMutex
}

// sqlDBWriter represents a writer that saves results
// SQLDBResultSet represents a writer that saves results
// to a sqlDB backend.
type sqlDBWriter struct {
type SQLDBResultSet struct {
jobID string
taskName string
colsWritten bool
Expand All @@ -46,7 +52,7 @@ type sqlDBWriter struct {
tx *sql.Tx
tbl string

backend *sqlDB
backend *SqlDB
}

// insertSchema contains the generated SQL for creating tables
Expand All @@ -58,14 +64,13 @@ type insertSchema struct {
}

// NewSQLBackend returns a new sqlDB result backend instance.
// It accepts an *sql.DB connection
func NewSQLBackend(db *sql.DB, opt Opt, l *log.Logger) (ResultBackend, error) {
s := sqlDB{
func NewSQLBackend(db *sql.DB, opt Opt, lo *log.Logger) (*SqlDB, error) {
s := SqlDB{
db: db,
opt: opt,
resTableSchemas: make(map[string]insertSchema),
schemaMutex: sync.RWMutex{},
logger: l,
logger: lo,
}

// Config.
Expand All @@ -81,13 +86,13 @@ func NewSQLBackend(db *sql.DB, opt Opt, l *log.Logger) (ResultBackend, error) {
// NewResultSet returns a new instance of an sqlDB result writer.
// A new instance should be acquired for every individual job result
// to be written to the backend and then thrown away.
func (s *sqlDB) NewResultSet(jobID, taskName string, ttl time.Duration) (ResultSet, error) {
func (s *SqlDB) NewResultSet(jobID, taskName string, ttl time.Duration) (models.ResultSet, error) {
tx, err := s.db.Begin()
if err != nil {
return nil, err
}

return &sqlDBWriter{
return &SQLDBResultSet{
jobID: jobID,
taskName: taskName,
backend: s,
Expand All @@ -101,7 +106,7 @@ func (s *sqlDB) NewResultSet(jobID, taskName string, ttl time.Duration) (ResultS
// creates a CREATE TABLE() schema for the results table with the structure of the
// particular taskName, and caches it be used for every subsequent result db creation
// and population. This should only be called once for each kind of taskName.
func (w *sqlDBWriter) RegisterColTypes(cols []string, colTypes []*sql.ColumnType) error {
func (w *SQLDBResultSet) RegisterColTypes(cols []string, colTypes []*sql.ColumnType) error {
if w.IsColTypesRegistered() {
return errors.New("column types are already registered")
}
Expand Down Expand Up @@ -139,7 +144,7 @@ func (w *sqlDBWriter) RegisterColTypes(cols []string, colTypes []*sql.ColumnType

// IsColTypesRegistered checks whether the column types for a particular taskName's
// structure is registered in the backend.
func (w *sqlDBWriter) IsColTypesRegistered() bool {
func (w *SQLDBResultSet) IsColTypesRegistered() bool {
w.backend.schemaMutex.RLock()
_, ok := w.backend.resTableSchemas[w.taskName]
w.backend.schemaMutex.RUnlock()
Expand All @@ -151,7 +156,7 @@ func (w *sqlDBWriter) IsColTypesRegistered() bool {
// Internally, it creates a sqlDB database and creates a results table
// based on the schema RegisterColTypes() would've created and cached.
// This should only be called once on a ResultWriter instance.
func (w *sqlDBWriter) WriteCols(cols []string) error {
func (w *SQLDBResultSet) WriteCols(cols []string) error {
if w.colsWritten {
return fmt.Errorf("columns for '%s' are already written", w.taskName)
}
Expand Down Expand Up @@ -187,7 +192,7 @@ func (w *sqlDBWriter) WriteCols(cols []string) error {

// WriteRow writes an individual row from a result set to the backend.
// Internally, it INSERT()s the given row into the sqlDB results table.
func (w *sqlDBWriter) WriteRow(row []interface{}) error {
func (w *SQLDBResultSet) WriteRow(row []interface{}) error {
w.backend.schemaMutex.RLock()
rSchema, ok := w.backend.resTableSchemas[w.taskName]
w.backend.schemaMutex.RUnlock()
Expand All @@ -202,7 +207,7 @@ func (w *sqlDBWriter) WriteRow(row []interface{}) error {
}

// Flush flushes the rows written into the sqlDB pipe.
func (w *sqlDBWriter) Flush() error {
func (w *SQLDBResultSet) Flush() error {
err := w.tx.Commit()
if err != nil {
return err
Expand All @@ -212,7 +217,7 @@ func (w *sqlDBWriter) Flush() error {
}

// Close closes the active sqlDB connection.
func (w *sqlDBWriter) Close() error {
func (w *SQLDBResultSet) Close() error {
if w.tx != nil {
return w.tx.Rollback()
}
Expand All @@ -222,7 +227,7 @@ func (w *sqlDBWriter) Close() error {

// createTableSchema takes an SQL query results, gets its column names and types,
// and generates a sqlDB CREATE TABLE() schema for the results.
func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) insertSchema {
func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) insertSchema {
var (
colNameHolder = make([]string, len(cols))
colValHolder = make([]string, len(cols))
Expand Down
7 changes: 2 additions & 5 deletions backends/backends.go → models/resultbackends.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Package backends provides interfaces to write backends for
// DungBeetle that take and store results from executed SQL jobs.
package backends
package models

import (
"database/sql"
Expand All @@ -13,8 +11,7 @@ type ResultBackend interface {
NewResultSet(dbName, taskName string, ttl time.Duration) (ResultSet, error)
}

// ResultSet represents the set of results from an individual
// job that's executed.
// ResultSet represents the set of results from an individual job that's executed.
type ResultSet interface {
RegisterColTypes([]string, []*sql.ColumnType) error
IsColTypesRegistered() bool
Expand Down

0 comments on commit 0a5b18c

Please sign in to comment.