Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Change-Id: Ib028780e726a9480e6049514bc776ccf27b84496
  • Loading branch information
mint570 committed May 2, 2024
1 parent fa0efe5 commit f3af0da
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 33 deletions.
42 changes: 10 additions & 32 deletions orchagent/response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ ResponsePublisher::ResponsePublisher(const std::string &dbName, bool buffered, b
if (m_buffered)
{
m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
m_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
m_db_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
}
else
{
m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
m_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
m_db_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
}
if (db_write_thread)
{
Expand All @@ -89,15 +89,8 @@ ResponsePublisher::~ResponsePublisher()
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.push(entry{
.table = "",
.key = "",
.values = std::vector<swss::FieldValueTuple>{},
.op = "",
.replace = false,
.flush = false,
.shutdown = true,
});
m_queue.emplace(/*table=*/"", /*key=*/"", /*values =*/std::vector<swss::FieldValueTuple>{}, /*op=*/"",
/*replace=*/false, /*flush=*/false, /*shutdown=*/true);
}
m_signal.notify_one();
m_update_thread->join();
Expand Down Expand Up @@ -151,15 +144,7 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.push(entry{
.table = table,
.key = key,
.values = values,
.op = op,
.replace = replace,
.flush = false,
.shutdown = false,
});
m_queue.emplace(table, key, values, op, replace, /*flush=*/false, /*shutdown=*/false);
}
m_signal.notify_one();
}
Expand All @@ -174,7 +159,7 @@ void ResponsePublisher::writeToDBInternal(const std::string &table, const std::s
const std::vector<swss::FieldValueTuple> &values, const std::string &op,
bool replace)
{
swss::Table applStateTable{m_pipe.get(), table, m_buffered};
swss::Table applStateTable{m_db_pipe.get(), table, m_buffered};

auto attrs = values;
if (op == SET_COMMAND)
Expand Down Expand Up @@ -225,21 +210,14 @@ void ResponsePublisher::flush()
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.push(entry{
.table = "",
.key = "",
.values = std::vector<swss::FieldValueTuple>{},
.op = "",
.replace = false,
.flush = true,
.shutdown = false,
});
m_queue.emplace(/*table=*/"", /*key=*/"", /*values =*/std::vector<swss::FieldValueTuple>{}, /*op=*/"",
/*replace=*/false, /*flush=*/true, /*shutdown=*/false);
}
m_signal.notify_one();
}
else
{
m_pipe->flush();
m_db_pipe->flush();
}
}

Expand Down Expand Up @@ -269,7 +247,7 @@ void ResponsePublisher::dbUpdateThread()
}
if (e.flush)
{
m_pipe->flush();
m_db_pipe->flush();
}
else
{
Expand Down
8 changes: 7 additions & 1 deletion orchagent/response_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class ResponsePublisher : public ResponsePublisherInterface
bool replace;
bool flush;
bool shutdown;

entry(const std::string &table, const std::string &key, const std::vector<swss::FieldValueTuple> &values,
const std::string &op, bool replace, bool flush, bool shutdown)
: table(table), key(key), values(values), op(op), replace(replace), flush(flush), shutdown(shutdown)
{
}
};

void dbUpdateThread();
Expand All @@ -78,7 +84,7 @@ class ResponsePublisher : public ResponsePublisherInterface

std::unique_ptr<swss::DBConnector> m_db;
std::unique_ptr<swss::RedisPipeline> m_ntf_pipe;
std::unique_ptr<swss::RedisPipeline> m_pipe;
std::unique_ptr<swss::RedisPipeline> m_db_pipe;

bool m_buffered{false};
// Thread to write to DB.
Expand Down

0 comments on commit f3af0da

Please sign in to comment.