forked from hanguofeng/taiji
-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsequential_arbiter.go
125 lines (102 loc) · 2.9 KB
/
sequential_arbiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"sync/atomic"
"time"
"github.com/Shopify/sarama"
"github.com/golang/glog"
)
type SequentialArbiter struct {
*StartStopControl
// input/output
offsets chan int64
messages chan *sarama.ConsumerMessage
// parent
manager *PartitionManager
// config
config *CallbackItemConfig
arbiterConfig ArbiterConfig
// stat variables
processed uint64
startTime time.Time
}
func NewSequentialArbiter() Arbiter {
return &SequentialArbiter{
StartStopControl: NewStartStopControl(),
}
}
func (*SequentialArbiter) PreferredTransporterWorkerNum(workerNum int) int {
return 1
}
func (sa *SequentialArbiter) OffsetChannel() chan<- int64 {
return sa.offsets
}
func (sa *SequentialArbiter) MessageChannel() <-chan *sarama.ConsumerMessage {
return sa.messages
}
func (sa *SequentialArbiter) Init(config *CallbackItemConfig, arbiterConfig ArbiterConfig, manager *PartitionManager) error {
sa.manager = manager
sa.config = config
sa.arbiterConfig = arbiterConfig
return nil
}
func (sa *SequentialArbiter) Run() error {
if err := sa.ensureStart(); err != nil {
return err
}
defer sa.markStop()
// reset stat variables
atomic.StoreUint64(&sa.processed, 0)
sa.startTime = time.Now().Local()
consumer := sa.manager.GetKafkaPartitionConsumer()
// buffer only one message
sa.offsets = make(chan int64)
sa.messages = make(chan *sarama.ConsumerMessage)
// cold start trigger
go func() {
sa.offsets <- int64(-1)
}()
sa.markReady()
arbiterLoop:
for {
select {
case <-sa.WaitForCloseChannel():
glog.V(1).Infof("Stop event triggered [url:%s]", sa.config.Url)
break arbiterLoop
case offset := <-sa.offsets:
glog.V(1).Infof("Read offset from Transporter [topic:%s][partition:%d][url:%s][offset:%d]",
sa.manager.Topic, sa.manager.Partition, sa.config.Url, offset)
if offset >= 0 {
// trigger offset commit
sa.manager.GetCallbackManager().GetOffsetManager().CommitOffset(
sa.manager.Topic, sa.manager.Partition, offset)
}
select {
case <-sa.WaitForCloseChannel():
glog.V(1).Infof("Stop event triggered [url:%s]", sa.config.Url)
break arbiterLoop
case message := <-consumer.Messages():
glog.V(1).Infof("Read message from PartitionConsumer [topic:%s][partition:%d][url:%s][offset:%d]",
message.Topic, message.Partition, sa.config.Url, message.Offset)
// feed message to transporter
select {
case <-sa.WaitForCloseChannel():
glog.V(1).Infof("Stop event triggered [url:%s]", sa.config.Url)
break arbiterLoop
case sa.messages <- message:
}
atomic.AddUint64(&sa.processed, 1)
}
}
}
return nil
}
func (sa *SequentialArbiter) GetStat() interface{} {
result := make(map[string]interface{})
result["processed"] = atomic.LoadUint64(&sa.processed)
result["start_time"] = sa.startTime
return result
}
func init() {
RegisterArbiter("Sequential", NewSequentialArbiter)
RegisterArbiter("seq", NewSequentialArbiter)
}