Skip to content

Commit

Permalink
Issue #735: Event Time support in c++ client (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored and merlimat committed Sep 14, 2017
1 parent 2a04683 commit 8f15858
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class Message {
*/
uint64_t getPublishTimestamp() const;

/**
* Get the event timestamp associated with this message. It is set by the client producer.
*/
uint64_t getEventTimestamp() const;

private:
typedef boost::shared_ptr<MessageImpl> MessageImplPtr;
MessageImplPtr impl_;
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class MessageBuilder {
*/
MessageBuilder& setPartitionKey(const std::string& partitionKey);

/**
* Set the event timestamp for the message.
*/
MessageBuilder& setEventTimestamp(uint64_t eventTimestamp);

/**
* override namespace replication clusters. note that it is the
* caller's responsibility to provide valid cluster names, and that
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ uint64_t Message::getPublishTimestamp() const {
return impl_ ? impl_->getPublishTimestamp() : 0ull;
}

uint64_t Message::getEventTimestamp() const {
return impl_ ? impl_->getEventTimestamp() : 0ull;
}

#pragma GCC visibility push(default)

std::ostream& operator<<(std::ostream& s, const Message::StringMap& map) {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ MessageBuilder& MessageBuilder::setPartitionKey(const std::string& partitionKey)
return *this;
}

MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) {
checkMetadata();
impl_->metadata.set_event_time(eventTimestamp);
return *this;
}

MessageBuilder& MessageBuilder::setReplicationClusters(const std::vector<std::string>& clusters) {
checkMetadata();
google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end());
Expand Down
12 changes: 12 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ namespace pulsar {
}
}

uint64_t MessageImpl::getEventTimestamp() const {
if (metadata.has_event_time()) {
return metadata.event_time();
} else {
return 0ull;
}
}

void MessageImpl::setReplicationClusters(const std::vector<std::string>& clusters) {
google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end());
r.Swap(metadata.mutable_replicate_to());
Expand All @@ -77,4 +85,8 @@ namespace pulsar {
void MessageImpl::setPartitionKey(const std::string& partitionKey) {
metadata.set_partition_key(partitionKey);
}

void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) {
metadata.set_event_time(eventTimestamp);
}
}
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class MessageImpl {
bool hasPartitionKey() const;

uint64_t getPublishTimestamp() const;
uint64_t getEventTimestamp() const;

friend class PulsarWrapper;
friend class MessageBuilder;
Expand All @@ -57,6 +58,7 @@ class MessageImpl {
void setProperty(const std::string& name, const std::string& value);
void disableReplication(bool flag);
void setPartitionKey(const std::string& partitionKey);
void setEventTimestamp(uint64_t eventTimestamp);
Message::StringMap properties_;
};

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void export_message() {
.def("property", &MessageBuilder::setProperty, return_self<>())
.def("properties", &MessageBuilder::setProperties, return_self<>())
.def("partition_key", &MessageBuilder::setPartitionKey, return_self<>())
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>())
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>())
.def("disable_replication", &MessageBuilder::disableReplication, return_self<>())
.def("build", &MessageBuilder::build)
Expand All @@ -78,6 +79,7 @@ void export_message() {
.def("length", &Message::getLength)
.def("partition_key", &Message::getPartitionKey, return_value_policy<copy_const_reference>())
.def("publish_timestamp", &Message::getPublishTimestamp)
.def("event_timestamp", &Message::getEventTimestamp)
.def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>())
.def("__str__", &Message_str)
;
Expand Down

0 comments on commit 8f15858

Please sign in to comment.