Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick] drainer,pump: Add support for enabling gzip grpc compression #530

Merged
merged 1 commit into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ data-dir = "data.drainer"
# a comma separated list of PD endpoints
pd-urls = "http://127.0.0.1:2379"

# Use the specified compressor to compress payload between pump and drainer
compressor = ""

#[security]
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
# ssl-ca = "/path/to/ca.pem"
Expand Down
17 changes: 17 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
var (
maxBinlogItemCount int
defaultBinlogItemCount = 16 << 12
supportedCompressors = [...]string{"gzip"}
)

// SyncerConfig is the Syncer's configuration.
Expand Down Expand Up @@ -73,6 +74,7 @@ type Config struct {
SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"`
Security security.Config `toml:"security" json:"security"`
SyncedCheckTime int `toml:"synced-check-time" json:"synced-check-time"`
Compressor string `toml:"compressor" json:"compressor"`
EtcdTimeout time.Duration
MetricsAddr string
MetricsInterval int
Expand Down Expand Up @@ -106,6 +108,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day")
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint")
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
Expand Down Expand Up @@ -245,6 +248,20 @@ func (cfg *Config) validate() error {
}
}

if cfg.Compressor != "" {
found := false
for _, c := range supportedCompressors {
if cfg.Compressor == c {
found = true
break
}
}
if !found {
return errors.Errorf(
"Invalid compressor: %v, must be one of these: %v", cfg.Compressor, supportedCompressors)
}
}

return nil
}

Expand Down
10 changes: 9 additions & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -177,7 +178,14 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
p.grpcConn.Close()
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)}

if compressor, ok := ctx.Value(drainerKeyType("compressor")).(string); ok {
log.Infof("[pump %s] grpc compression enabled", p.nodeID)
callOpts = append(callOpts, grpc.UseCompressor(compressor))
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(callOpts...))
if err != nil {
log.Errorf("[pump %s] create grpc dial error %v", p.nodeID, err)
p.pullCli = nil
Expand Down
4 changes: 4 additions & 0 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
pdReconnTimes = 30
)

type drainerKeyType string

// Server implements the gRPC interface,
// and maintains the runtime status
type Server struct {
Expand Down Expand Up @@ -86,6 +88,8 @@ func NewServer(cfg *Config) (*Server, error) {
}

ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, drainerKeyType("compressor"), cfg.Compressor)

clusterID = pdCli.GetClusterID(ctx)
// update latestTS and latestTime
latestTS, err := util.GetTSO(pdCli)
Expand Down
3 changes: 2 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/gorilla/mux"
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/pd/client"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/util"
Expand All @@ -33,6 +33,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip"
)

var notifyDrainerTimeout = time.Second * 10
Expand Down