Skip to content

Commit

Permalink
Add support for multiple databases
Browse files Browse the repository at this point in the history
With this commit, we will support multiple logical databases within a
single dp3 instance. Databases are top-level folders under the storage
root. The default database is called "default". The client can switch
connected databases by using the new ".connect <database>" dot command.
No directory or file is created until first write - there is no explicit
database creation step.
  • Loading branch information
wkalt committed Apr 17, 2024
1 parent ad9a468 commit 54707ef
Show file tree
Hide file tree
Showing 26 changed files with 380 additions and 190 deletions.
51 changes: 32 additions & 19 deletions client/dp3/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ func withPaging(pager string, f func(io.Writer) error) error {
}
}

func executeQuery(s string) error {
func executeQuery(database string, query string) error {
req := &routes.QueryRequest{
Query: s,
Database: database,
Query: query,
}
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(req); err != nil {
Expand Down Expand Up @@ -139,7 +140,7 @@ func printError(s string) {

func run() error {
l, err := readline.NewEx(&readline.Config{
Prompt: "dp3 # ",
Prompt: "dp3:[default] # ",
HistoryFile: "/tmp/dp3-history.tmp",
InterruptPrompt: "^C",
EOFPrompt: "exit",
Expand All @@ -156,11 +157,12 @@ func run() error {
log.SetOutput(l.Stderr())

lines := []string{}
database := "default"
for {
line, err := l.Readline()
if err != nil {
if errors.Is(err, readline.ErrInterrupt) {
l.SetPrompt("dp3 # ")
l.SetPrompt(fmt.Sprintf("dp3:[%s] # ", database))
continue
}
if errors.Is(err, io.EOF) {
Expand All @@ -177,23 +179,32 @@ func run() error {
_, topic, _ := strings.Cut(line, " ")
fmt.Println(help[topic])
continue
case strings.HasPrefix(line, ".connect"):
parts := strings.Split(line, " ")[1:]
if len(parts) != 1 {
printError("usage: .connect <database>")
continue
}
database = parts[0]
l.SetPrompt(fmt.Sprintf("dp3:[%s] # ", database))
continue
case strings.HasPrefix(line, ".statrange"):
if err := handleStatRange(line); err != nil {
printError(err.Error())
}
continue
case strings.HasPrefix(line, ".import"):
if err := handleImport(line); err != nil {
if err := handleImport(database, line); err != nil {
printError(err.Error())
}
continue
case strings.HasPrefix(line, ".delete"):
if err := handleDelete(line); err != nil {
if err := handleDelete(database, line); err != nil {
printError(err.Error())
}
continue
case strings.HasPrefix(line, ".tables"):
if err := handleTables(line); err != nil {
if err := handleTables(database, line); err != nil {
printError(err.Error())
}
continue
Expand All @@ -209,17 +220,17 @@ func run() error {
}
query := strings.Join(lines, " ")
lines = lines[:0]
l.SetPrompt("dp3 # ")
l.SetPrompt(fmt.Sprintf("dp3:[%s] # ", database))
l.SaveHistory(query)
if err := executeQuery(strings.TrimSuffix(query, ";")); err != nil {
if err := executeQuery(database, strings.TrimSuffix(query, ";")); err != nil {
printError(err.Error())
}
}

return nil
}

func handleDelete(line string) error {
func handleDelete(database string, line string) error {
parts := strings.Split(line, " ")[1:]
if len(parts) < 4 {
return errors.New("not enough arguments")
Expand All @@ -234,11 +245,12 @@ func handleDelete(line string) error {
if err != nil {
return fmt.Errorf("failed to parse end time: %w", err)
}
return doDelete(producer, topic, starttime.UnixNano(), endtime.UnixNano())
return doDelete(database, producer, topic, starttime.UnixNano(), endtime.UnixNano())
}

func doDelete(producer, topic string, start, end int64) error {
func doDelete(database, producer, topic string, start, end int64) error {
req := &routes.DeleteRequest{
Database: database,
ProducerID: producer,
Topic: topic,
Start: uint64(start),
Expand All @@ -262,7 +274,7 @@ func doDelete(producer, topic string, start, end int64) error {
return nil
}

func handleImport(line string) error {
func handleImport(database string, line string) error {
parts := strings.Split(line, " ")[1:]
if len(parts) < 2 {
return errors.New("not enough arguments")
Expand All @@ -277,7 +289,7 @@ func handleImport(line string) error {
return fmt.Errorf("no files found matching %s", pattern)
}
workers := runtime.NumCPU() / 2
return doImport(producer, paths, workers)
return doImport(database, producer, paths, workers)
}

func handleStatRange(line string) error {
Expand Down Expand Up @@ -315,12 +327,13 @@ func handleStatRange(line string) error {
})
}

func printTables(w io.Writer, producerID string, topic string) error {
func printTables(w io.Writer, database string, producerID string, topic string) error {
var historical bool
if producerID != "" && topic != "" {
historical = true
}
req := &routes.TablesRequest{
Database: database,
Producer: producerID,
Topic: topic,
Historical: historical,
Expand Down Expand Up @@ -445,19 +458,19 @@ func printTables(w io.Writer, producerID string, topic string) error {
return nil
}

func handleTables(line string) error {
func handleTables(database string, line string) error {
parts := strings.Split(line, " ")[1:]

switch len(parts) {
case 0:
// all topics, all producers
return printTables(os.Stdout, "", "")
return printTables(os.Stdout, database, "", "")
case 1:
topic := parts[0]
return printTables(os.Stdout, "", topic)
return printTables(os.Stdout, database, "", topic)
case 2:
topic, producer := parts[0], parts[1]
return printTables(os.Stdout, producer, topic)
return printTables(os.Stdout, database, producer, topic)
default:
return errors.New("too many arguments")
}
Expand Down
7 changes: 5 additions & 2 deletions client/dp3/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (

var (
importProducerID string
importDatabase string
importWorkerCount int
)

func doImport(producer string, paths []string, workers int) error {
func doImport(database string, producer string, paths []string, workers int) error {
g := &errgroup.Group{}
g.SetLimit(workers)
for _, path := range paths {
Expand All @@ -31,6 +32,7 @@ func doImport(producer string, paths []string, workers int) error {
return fmt.Errorf("error getting absolute path: %w", err)
}
req := &routes.ImportRequest{
Database: database,
ProducerID: producer,
Path: abs,
}
Expand Down Expand Up @@ -67,7 +69,7 @@ var importCmd = &cobra.Command{
cmd.Usage()
return
}
if err := doImport(importProducerID, paths, importWorkerCount); err != nil {
if err := doImport(importDatabase, importProducerID, paths, importWorkerCount); err != nil {
bailf("Import error: %s", err)
}
},
Expand All @@ -76,6 +78,7 @@ var importCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(importCmd)
importCmd.PersistentFlags().StringVarP(&importProducerID, "producer", "p", "", "Producer ID")
importCmd.PersistentFlags().StringVarP(&importDatabase, "database", "d", "", "Database")
importCmd.PersistentFlags().IntVarP(&importWorkerCount, "workers", "w", 1, "Worker count")
importCmd.MarkFlagRequired("producer")
}
28 changes: 19 additions & 9 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Run(
ctx context.Context,
w io.Writer,
node *plan.Node,
scanFactory func(ctx context.Context, producer string, table string, start, end uint64) (*tree.Iterator, error),
scanFactory ScanFactory,
) error {
root, err := CompilePlan(ctx, node, scanFactory)
if err != nil {
Expand Down Expand Up @@ -79,7 +79,13 @@ func Run(
return nil
}

type ScanFactory func(ctx context.Context, producer string, table string, start, end uint64) (*tree.Iterator, error)
type ScanFactory func(
ctx context.Context,
database string,
producer string,
table string,
start, end uint64,
) (*tree.Iterator, error)

// CompilePlan compiles a "plan tree" -- a tree of plan nodes -- to a tree of
// executor nodes.
Expand Down Expand Up @@ -191,26 +197,30 @@ func compileScan(ctx context.Context, node *plan.Node, sf ScanFactory) (Node, er
if !ok {
return nil, fmt.Errorf("expected string alias, got %T", node.Args[1])
}
producer, ok := node.Args[2].(string)
database, ok := node.Args[2].(string)
if !ok {
return nil, fmt.Errorf("expected string database, got %T", node.Args[2])
}
producer, ok := node.Args[3].(string)
if !ok {
return nil, fmt.Errorf("expected string producer, got %T", node.Args[1])
}
var err error
var start, end uint64
if node.Args[3] == "all-time" {
if node.Args[4] == "all-time" {
start = 0
end = ^uint64(0)
} else {
start, ok = node.Args[3].(uint64)
start, ok = node.Args[4].(uint64)
if !ok {
return nil, fmt.Errorf("expected uint64 start time, got %T", node.Args[2])
return nil, fmt.Errorf("expected uint64 start time, got %T", node.Args[4])
}
end, ok = node.Args[4].(uint64)
end, ok = node.Args[5].(uint64)
if !ok {
return nil, fmt.Errorf("expected uint64 end time, got %T", node.Args[3])
return nil, fmt.Errorf("expected uint64 end time, got %T", node.Args[5])
}
}
it, err := sf(ctx, producer, table, start, end)
it, err := sf(ctx, database, producer, table, start, end)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestQueryExecution(t *testing.T) {
parser := ql.NewParser()
ast, err := parser.ParseString("", c.query)
require.NoError(t, err)
qp, err := plan.CompileQuery(*ast)
qp, err := plan.CompileQuery("db", *ast)
require.NoError(t, err)
actual, err := executor.CompilePlan(ctx, qp, tmgr.NewTreeIterator)
require.NoError(t, err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestQueryExecution(t *testing.T) {
query := "from device topic-0 where topic-0.s " + query
ast, err := parser.ParseString("", query)
require.NoError(t, err)
qp, err := plan.CompileQuery(*ast)
qp, err := plan.CompileQuery("db", *ast)
require.NoError(t, err)
actual, err := executor.CompilePlan(ctx, qp, tmgr.NewTreeIterator)
require.NoError(t, err)
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestQueryExecution(t *testing.T) {
}
ast, err := parser.ParseString("", query)
require.NoError(t, err)
qp, err := plan.CompileQuery(*ast)
qp, err := plan.CompileQuery("db", *ast)
require.NoError(t, err)
actual, err := executor.CompilePlan(ctx, qp, tmgr.NewTreeIterator)
require.NoError(t, err)
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCompilePlan(t *testing.T) {
ast, err := parser.ParseString("", c.query)
require.NoError(t, err)

qp, err := plan.CompileQuery(*ast)
qp, err := plan.CompileQuery("db", *ast)
require.NoError(t, err)

actual, err := executor.CompilePlan(ctx, qp, tmgr.NewTreeIterator)
Expand All @@ -294,7 +294,7 @@ func prepTmgr(t *testing.T, ctx context.Context, tmgr *treemgr.TreeManager) {
t.Helper()
buf := &bytes.Buffer{}
mcap.WriteFile(t, buf, [][]int64{{1, 3, 5}, {2, 4, 6}}...)
require.NoError(t, tmgr.Receive(ctx, "device", buf))
require.NoError(t, tmgr.Receive(ctx, "db", "device", buf))
require.NoError(t, tmgr.ForceFlush(ctx))
}

Expand Down Expand Up @@ -356,7 +356,7 @@ func prepTmgr2(t *testing.T, ctx context.Context, tmgr *treemgr.TreeManager) {
c++
}
require.NoError(t, w.Close())
require.NoError(t, tmgr.Receive(ctx, "device", buf))
require.NoError(t, tmgr.Receive(ctx, "db", "device", buf))
require.NoError(t, tmgr.ForceFlush(ctx))
buf.Reset()
}
Expand Down
8 changes: 4 additions & 4 deletions plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func compileOr(ast []ql.OrClause) *Node {
}

// CompileQuery compiles an AST query to a plan node.
func CompileQuery(ast ql.Query) (*Node, error) {
func CompileQuery(database string, ast ql.Query) (*Node, error) {
start := int64(0)
end := int64(math.MaxInt64)
var err error
Expand All @@ -268,7 +268,7 @@ func CompileQuery(ast ql.Query) (*Node, error) {
traverse(
base,
pullUpMergeJoins,
func(n *Node) { pushDownFilters(n, where, producer, uint64(start), uint64(end)) })
func(n *Node) { pushDownFilters(n, where, database, producer, uint64(start), uint64(end)) })
if len(ast.PagingClause) > 0 {
base = wrapWithPaging(base, ast.PagingClause)
}
Expand All @@ -279,14 +279,14 @@ func CompileQuery(ast ql.Query) (*Node, error) {
// since we don't know about schemas here. The executor will resolve
// it according to the schema of the data and error if nonsense is
// submitted.
func pushDownFilters(n *Node, where *Node, producer string, start, end uint64) {
func pushDownFilters(n *Node, where *Node, database string, producer string, start, end uint64) {
if n.Type != Scan {
return
}
if len(where.Children) > 0 {
n.Children = append(n.Children, where)
}
n.Args = append(n.Args, producer)
n.Args = append(n.Args, database, producer)
if start == 0 && end == math.MaxInt64 {
n.Args = append(n.Args, "all-time")
return
Expand Down
Loading

0 comments on commit 54707ef

Please sign in to comment.