-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat:Stream base commands #1955
Conversation
你这个实现有没有可能hash key和xstream key冲突? |
好的,我加到 replication_test 里面 |
有两个前缀会冲突,我在 hash 中禁止掉了这两个前缀 // pika_hash.cc
// check the conflict of stream used prefix, see details in defination of STREAM_TREE_PREFIX
if (key_.compare(0, STERAM_TREE_PREFIX.size(), STERAM_TREE_PREFIX) == 0 ||
key_.compare(0, STREAM_DATA_HASH_PREFIX.size(), STREAM_DATA_HASH_PREFIX) == 0) {
res_.SetRes(CmdRes::kErrOther, "hash key can't start with " + STERAM_TREE_PREFIX + " or " + STREAM_META_HASH_KEY);
return;
} //pika_stream_base.h
// each abstracted tree has a prefix,
// prefix + treeID will be the key of the hash to store the tree,
// notice: we need to ban the use of this prefix when using "HSET".
static const std::string STERAM_TREE_PREFIX = "STR_TREE";
// key of the hash to store stream meta,
// notice: we need to ban the use of this key when using "HSET".
static const std::string STREAM_META_HASH_KEY = "STREAM";
// each stream's data (messages) are stored in a hash,
// to avoid stream key confilict with hash key, we add a prefix to each key for stream data,
// it's ok to use the same string as STREAM_META_HASH_KEY to be the prefix.
// notice: we need to ban the use of this prefix when using "HSET".
static const std::string STREAM_DATA_HASH_PREFIX = STREAM_META_HASH_KEY; |
The XADD command in Redis has the capability to generate IDs based on timestamps. To ensure that repeated execution of a command results in the same timestamp, it is necessary to include the generated ID in the command when generating the binlog. This way, when replaying or replicating, the same ID will be obtained.
这样搞也行吧。为什么当初不选择单独整个db类型来支持stream |
那有个疑问, stream的数据会不会 也能被 hash操作了, 比如通过 hash相关的命令 增删改查, 还有 在scan的时候 会不会也把 Steam的数据也 拿到了 |
一开始是 db 的方案,后来被换掉了,因为blackwidow本身有一些缺陷可能要改,如果往里面加类型会增大后续改 blackwidow 的工作量。 |
我想了下,理论上如果 hset 不能调用成功,db 里面的 hash 就没有对应 key 的元数据,hash 的其它操作也不会执行成功。 |
突然想到了, 能不能给 hash的db 再加一个 |
是可以的,但要搞个方法判断是不是要存入这个特定的 column family,要么新增接口,要么搞个 flag,感觉都不太优雅。我建议既然选择了不在 blackwidow 做单独的类型,就尽量不侵入 blackwidow 层。 |
我觉得还是单独搞个db 设计cf编码合理,你和hash捏在一块,可能影响hash结构的性能 当然这么搞我也没啥意见,得雨哥谦祥排一下 |
这个方案最早是我的意见,可以先实现一个版本。然后等后续再改进一版,在 blackwidow 层面进行升级。还是希望 @KKorpse 在这个 PR 合并完毕后,能进一步完成整体的工作。 |
good idea, 可以现有功能, 可以 后续在 blackwidow 或者 Floyd 里再优化存储 |
好的,以后可以根据存储层的具体方案来优化,blackwidow或者floyd都可以。 |
在单机测试部分,建议加上针对同一个stream的,多个线程生产,多个线程消费同时进行的并发测试,这也是实际使用时经常出现的场景,同时也能对锁的正确性进行一个基础检验。 |
* [develop] finish StreamMetaValue, ParsedStreamMetaValue in stream_meta_value.h * [develop] add other cgroup, consumer and pel meta value * [develop] move stream metadata out of the storage layer and rewrite it. * [develop] add two class :TreeIDGenerator, StreamUtil * [develop] add some helper func * [develop] xadd cmd's most function finished * [develop] XREAD command parse * [temp] add xgroup cmmand parse * [develop] XGROUP commad parse, XGROUP CREATE, XGROUP CREATECONSUMER finished * [develop] XRANGE, XREAD finished, add some unit test * [develop] XREADGROUP finished * [develop] XLEN finished * [develop] XACK command finished * [develop] DEL command support, XGROUP DESTORY finished, registered some command. * [debug] fix the problem of CMakeFile in test/stream * Add xgroup help command * fix conflicts * [debug] fix the compile problem in debug mode * [develop] XTRIM command finished * [debug] fix some problem of XTRIM command * [develop] remove stream cpp utest, fix some bug and pass 23 unit test * [develop] ajust code structure, add 2 tests * [develop] XCLAIM command finished * [test] now pass 30 tests * [develop] ajust code structure, fix a return value bug of XRANGE * [develop] XREVRANGE cmd finished and passed tests * [develop] add 4 more tests of XADD and XDEL * [develop] Adjusting code structure * [develop] change const shared_ptr & to raw pointer * [develop] remove xgroup commands implementation * [fix] fix compile problem on mac and centos * [fix] add licence, fix compile problem * [fix] code format * [fix] fix compile problem on macos * [fix] fix macos compile problem again, add log to find ubuntu test problem * fix the change of config file * [fix] fix a bug found by sanitizer * [fix] add test: XADD large data, triggering flushing, XREAD, XLEN, XRANG, XTRIIM should work * "fix: ./stream_test.go:76:6: randomInt redeclared in this block" * fix: add replication test of stream * fix: a bug of XADD replication and add a stronger replication test. The XADD command in Redis has the capability to generate IDs based on timestamps. To ensure that repeated execution of a command results in the same timestamp, it is necessary to include the generated ID in the command when generating the binlog. This way, when replaying or replicating, the same ID will be obtained. * fix: compile bug. * fix: add concurrency test of stream. --------- Co-authored-by: Jinghui <[email protected]>
* [develop] finish StreamMetaValue, ParsedStreamMetaValue in stream_meta_value.h * [develop] add other cgroup, consumer and pel meta value * [develop] move stream metadata out of the storage layer and rewrite it. * [develop] add two class :TreeIDGenerator, StreamUtil * [develop] add some helper func * [develop] xadd cmd's most function finished * [develop] XREAD command parse * [temp] add xgroup cmmand parse * [develop] XGROUP commad parse, XGROUP CREATE, XGROUP CREATECONSUMER finished * [develop] XRANGE, XREAD finished, add some unit test * [develop] XREADGROUP finished * [develop] XLEN finished * [develop] XACK command finished * [develop] DEL command support, XGROUP DESTORY finished, registered some command. * [debug] fix the problem of CMakeFile in test/stream * Add xgroup help command * fix conflicts * [debug] fix the compile problem in debug mode * [develop] XTRIM command finished * [debug] fix some problem of XTRIM command * [develop] remove stream cpp utest, fix some bug and pass 23 unit test * [develop] ajust code structure, add 2 tests * [develop] XCLAIM command finished * [test] now pass 30 tests * [develop] ajust code structure, fix a return value bug of XRANGE * [develop] XREVRANGE cmd finished and passed tests * [develop] add 4 more tests of XADD and XDEL * [develop] Adjusting code structure * [develop] change const shared_ptr & to raw pointer * [develop] remove xgroup commands implementation * [fix] fix compile problem on mac and centos * [fix] add licence, fix compile problem * [fix] code format * [fix] fix compile problem on macos * [fix] fix macos compile problem again, add log to find ubuntu test problem * fix the change of config file * [fix] fix a bug found by sanitizer * [fix] add test: XADD large data, triggering flushing, XREAD, XLEN, XRANG, XTRIIM should work * "fix: ./stream_test.go:76:6: randomInt redeclared in this block" * fix: add replication test of stream * fix: a bug of XADD replication and add a stronger replication test. The XADD command in Redis has the capability to generate IDs based on timestamps. To ensure that repeated execution of a command results in the same timestamp, it is necessary to include the generated ID in the command when generating the binlog. This way, when replaying or replicating, the same ID will be obtained. * fix: compile bug. * fix: add concurrency test of stream. --------- Co-authored-by: Jinghui <[email protected]>
issue:#1584
实现文档:#1717
PR 概要
Redis 中的 stream 分为两部分指令,分别对应两个单测文件:
第一部分完全独立于第二部分的指令,拥有消息队列增删改查等基本功能,第二部分是第一部分基础上的更高级功能。
鉴于工作量巨大,我建议先把第一部分的指令,以及基础框架合并,第二部分中的指令作为单独的 issue 分批次补充(或者直接在迁移到 4.0 后补充)。
此 pr 为第一部分功能的实现,包含了第一部分的所有指令,以及第二部分所依赖的元数据,工具函数等。(代码是从原始pr中抽离出来的,原始完整 pr 见:#1673)
测试:单测使用的是 go 语言,内容在 stream.go 中,依附于 integration 已有的测试框架。其中的所有测试样例都是从 redis 官方的 stream.tcl 中翻译而来,不支持的单测在后续有解释。
该 PR 支持的指令:Stream 基本指令集
暂不支持的指令:Stream 消费组指令集
(实现代码未在当前 pr 中)
暂不支持的单测及理由:
AOF文件相关
Pika 无 AOF,无需支持
XADD,XTRIM: “~” 以及 “limit” 参数
~ 和 limit 是 redis 用于模糊裁剪以提高裁剪性能的参数。redis stream 的消息是存储在 radix tree 中,每个树节指向了一个 Listpack,内可容纳多条消息。而模糊裁剪是为了性能,以树节点为单位进行裁剪。
Pika 每条消息都是以独立 kv 对存储的,不需要模糊裁剪功能。对于 ~ 参数,pika 当作 = 参数使用。对于 limit 参数,pika 会返回参数错误。
XSETID
管理员指令,无实用意义,优先级在 xgroup 之后。
XREAD:block
block 参数功能和 blpop,brpop 类似,可以仿照其逻辑。但考虑到实现与 Pika 的线程模型有关,建议迁移到 Pika 4.0 以后再实现。
后续流程建议