Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #735: Event Time support in c++ client #736

Merged
merged 3 commits into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class Message {
*/
uint64_t getPublishTimestamp() const;

/**
* Get the event timestamp associated with this message. It is set by the client producer.
*/
uint64_t getEventTimestamp() const;

private:
typedef boost::shared_ptr<MessageImpl> MessageImplPtr;
MessageImplPtr impl_;
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class MessageBuilder {
*/
MessageBuilder& setPartitionKey(const std::string& partitionKey);

/**
* Set the event timestamp for the message.
*/
MessageBuilder& setEventTimestamp(uint64_t eventTimestamp);

/**
* override namespace replication clusters. note that it is the
* caller's responsibility to provide valid cluster names, and that
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ uint64_t Message::getPublishTimestamp() const {
return impl_ ? impl_->getPublishTimestamp() : 0ull;
}

uint64_t Message::getEventTimestamp() const {
return impl_ ? impl_->getEventTimestamp() : 0ull;
}

#pragma GCC visibility push(default)

std::ostream& operator<<(std::ostream& s, const Message::StringMap& map) {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ MessageBuilder& MessageBuilder::setPartitionKey(const std::string& partitionKey)
return *this;
}

MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) {
checkMetadata();
impl_->metadata.set_event_time(eventTimestamp);
return *this;
}

MessageBuilder& MessageBuilder::setReplicationClusters(const std::vector<std::string>& clusters) {
checkMetadata();
google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end());
Expand Down
12 changes: 12 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ namespace pulsar {
}
}

uint64_t MessageImpl::getEventTimestamp() const {
if (metadata.has_event_time()) {
return metadata.event_time();
} else {
return 0ull;
}
}

void MessageImpl::setReplicationClusters(const std::vector<std::string>& clusters) {
google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end());
r.Swap(metadata.mutable_replicate_to());
Expand All @@ -77,4 +85,8 @@ namespace pulsar {
void MessageImpl::setPartitionKey(const std::string& partitionKey) {
metadata.set_partition_key(partitionKey);
}

void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) {
metadata.set_event_time(eventTimestamp);
}
}
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class MessageImpl {
bool hasPartitionKey() const;

uint64_t getPublishTimestamp() const;
uint64_t getEventTimestamp() const;

friend class PulsarWrapper;
friend class MessageBuilder;
Expand All @@ -57,6 +58,7 @@ class MessageImpl {
void setProperty(const std::string& name, const std::string& value);
void disableReplication(bool flag);
void setPartitionKey(const std::string& partitionKey);
void setEventTimestamp(uint64_t eventTimestamp);
Message::StringMap properties_;
};

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void export_message() {
.def("property", &MessageBuilder::setProperty, return_self<>())
.def("properties", &MessageBuilder::setProperties, return_self<>())
.def("partition_key", &MessageBuilder::setPartitionKey, return_self<>())
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>())
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>())
.def("disable_replication", &MessageBuilder::disableReplication, return_self<>())
.def("build", &MessageBuilder::build)
Expand All @@ -78,6 +79,7 @@ void export_message() {
.def("length", &Message::getLength)
.def("partition_key", &Message::getPartitionKey, return_value_policy<copy_const_reference>())
.def("publish_timestamp", &Message::getPublishTimestamp)
.def("event_timestamp", &Message::getEventTimestamp)
.def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>())
.def("__str__", &Message_str)
;
Expand Down