forked from hanguofeng/taiji
-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathoffset_storage.go
40 lines (31 loc) · 1 KB
/
offset_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main
import (
"errors"
"fmt"
"strings"
)
// OffsetStorage is the contract a storage backend must satisfy to be
// registered with RegisterOffsetStorage and created by NewOffsetStorage.
// It groups lifecycle control, per-partition offset management and a
// statistics accessor.
type OffsetStorage interface {
// Lifecycle control (start / stop).

// Init hands the backend its configuration and a callback manager.
// NOTE(review): presumably called once before Run — confirm against callers.
Init(config OffsetStorageConfig, manager *CallbackManager) error
// Run starts the backend.
// NOTE(review): whether Run blocks until Close is not visible in this file.
Run() error
// Close stops the backend.
Close() error

// Offset management, addressed by (topic, partition).

// InitializePartition prepares the given partition and returns an int64.
// NOTE(review): presumably the offset to resume from — confirm with implementations.
InitializePartition(topic string, partition int32) (int64, error)
// ReadOffset returns the offset recorded for the given partition.
ReadOffset(topic string, partition int32) (int64, error)
// WriteOffset records offset for the given partition.
WriteOffset(topic string, partition int32, offset int64) error
// FinalizePartition is the counterpart of InitializePartition.
// NOTE(review): presumably releases per-partition state — confirm with implementations.
FinalizePartition(topic string, partition int32) error

// Statistics.

// GetStat returns backend statistics; the concrete type is
// implementation-defined and not visible in this file.
GetStat() interface{}
}
// OffsetStorageCreator is a zero-argument factory that returns an
// OffsetStorage. NewOffsetStorage invokes the registered factory on
// every call.
type OffsetStorageCreator func() OffsetStorage
// registeredOffsetStorageMap is the package-level registry of storage
// factories, keyed by lower-cased storage name. It is written by
// RegisterOffsetStorage and read by NewOffsetStorage.
var registeredOffsetStorageMap = map[string]OffsetStorageCreator{}
// RegisterOffsetStorage stores factory in the package-level registry under
// name. The name is lower-cased first, so lookups are case-insensitive.
// Registering the same name again replaces the earlier factory.
func RegisterOffsetStorage(name string, factory OffsetStorageCreator) {
	key := strings.ToLower(name)
	registeredOffsetStorageMap[key] = factory
}
// NewOffsetStorage builds a new OffsetStorage using the factory registered
// under name. The lookup is case-insensitive because name is lower-cased
// before the registry is consulted.
//
// It returns an error when no factory is registered under that name, or
// when the registered factory is nil. The lower-cased name appears in the
// error message.
func NewOffsetStorage(name string) (OffsetStorage, error) {
	key := strings.ToLower(name)

	// A missing key yields the nil zero value, so one lookup covers both
	// "never registered" and "registered as nil".
	creator := registeredOffsetStorageMap[key]
	if creator == nil {
		msg := fmt.Sprintf("OffsetStorage %s not exists", key)
		return nil, errors.New(msg)
	}

	return creator(), nil
}