Added UT for dynamic NAT notification
arvbb committed Nov 8, 2022
1 parent 177c3c3 commit b815d86
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions tests/test_nat.py
@@ -702,6 +702,88 @@ def test_AddDnaptDynamicEntry(self, dvs, testlog):
        # check the entry is not there in asic db
        self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_NAT_ENTRY", 0)

    def test_NaptDynamicEntryNotification(self, dvs, testlog):
        # initialize
        self.setup_db(dvs)
        dvs.nat_mode_set("enabled")
        dvs.nat_tcp_timeout_set("700")
        self.set_interfaces(dvs)
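        # the 700-second TCP NAT timeout configured above is expected to
        # surface as SAI_NAT_ENTRY_ATTR_AGING_TIME on the ASIC_DB entries
        # checked below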

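        # verify that the kernel conntrack table starts out empty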
        exitcode, output = dvs.runcmd("conntrack -L -j")
        assert exitcode == 0
        assert "0 flow entries have been shown" in output

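        # TCP flow 33.33.33.33:3000 -> 44.44.44.44:4000 is source-translated
        # to 10.10.10.10:7000, so natsyncd should learn one SNAPT entry and
        # derive the implicit DNAPT entry for the reverse direction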
        # create SNAPT entry in kernel
        exitcode, output = dvs.runcmd("conntrack -I -p tcp --state ESTABLISHED -t 700 --src 33.33.33.33 --sport 3000 --dst 44.44.44.44 --dport 4000 -u ASSURED -n 10.10.10.10:7000")
        assert exitcode == 0
        assert "1 flow entries have been created" in output

        # check the SNAPT entry and implicit DNAPT entry pushed to app db by Natsyncd
        self.app_db.wait_for_n_keys("NAPT_TABLE", 2)

        # check the entries in app db
        fvs = self.app_db.wait_for_entry("NAPT_TABLE", "TCP:33.33.33.33:3000")
        assert fvs == {
            "translated_ip": "10.10.10.10",
            "translated_l4_port": "7000",
            "nat_type": "snat",
            "entry_type": "dynamic"
        }
        fvs = self.app_db.wait_for_entry("NAPT_TABLE", "TCP:10.10.10.10:7000")
        assert fvs == {
            "translated_ip": "33.33.33.33",
            "translated_l4_port": "3000",
            "nat_type": "dnat",
            "entry_type": "dynamic"
        }

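        # ASIC_DB NAT entry keys are JSON blobs that embed the nat_type, so
        # match on the type string to tell the SNAT and DNAT entries apart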
        # check the entries in asic db
        keys = self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_NAT_ENTRY", 2)
        for key in keys:
            if '"nat_type":"SAI_NAT_TYPE_SOURCE_NAT"' in key:
                fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_NAT_ENTRY", key)
                assert fvs == {
                    'SAI_NAT_ENTRY_ATTR_SRC_IP': '10.10.10.10',
                    'SAI_NAT_ENTRY_ATTR_L4_SRC_PORT': '7000',
                    'SAI_NAT_ENTRY_ATTR_SRC_IP_MASK': '255.255.255.255',
                    'SAI_NAT_ENTRY_ATTR_ENABLE_PACKET_COUNT': 'true',
                    'SAI_NAT_ENTRY_ATTR_ENABLE_BYTE_COUNT': 'true',
                    'SAI_NAT_ENTRY_ATTR_AGING_TIME': '700'
                }
            elif '"nat_type":"SAI_NAT_TYPE_DESTINATION_NAT"' in key:
                fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_NAT_ENTRY", key)
                assert fvs == {
                    'SAI_NAT_ENTRY_ATTR_DST_IP': '33.33.33.33',
                    'SAI_NAT_ENTRY_ATTR_L4_DST_PORT': '3000',
                    'SAI_NAT_ENTRY_ATTR_DST_IP_MASK': '255.255.255.255',
                    'SAI_NAT_ENTRY_ATTR_ENABLE_PACKET_COUNT': 'true',
                    'SAI_NAT_ENTRY_ATTR_ENABLE_BYTE_COUNT': 'true',
                    'SAI_NAT_ENTRY_ATTR_AGING_TIME': '700'
                }
            else:
                assert False, "Unexpected NAT entry type in key: %s" % key

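        # the notification payload below carries the virtual router oid, so
        # fetch the single VR (the default VRF) present in ASIC_DB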
        # Obtain vr_id to send through the notification
        asic_db = swsscommon.DBConnector(swsscommon.ASIC_DB, dvs.redis_sock, 0)
        tbl = swsscommon.Table(asic_db, "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER")
        keys = tbl.getKeys()
        assert len(keys) == 1, "Wrong number of virtual routers found"
        vr_id = keys[0]

        # Send notification
        ntf = swsscommon.NotificationProducer(asic_db, "NOTIFICATIONS")
        fvp = swsscommon.FieldValuePairs()
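        # Decoded, the escaped "nat_entry" string below is a JSON object like:
        #   {"nat_data": {"key":  {"src_ip": "33.33.33.33", "l4_src_port": "3000",
        #                          "dst_ip": "0.0.0.0", "l4_dst_port": "0", "proto": "6"},
        #                 "mask": {"src_ip": "255.255.255.255", "l4_src_port": "65535",
        #                          "dst_ip": "0.0.0.0", "l4_dst_port": "0", "proto": "255"}},
        #    "nat_type": "SAI_NAT_TYPE_SOURCE_NAT",
        #    "switch_id": "oid:0x21000000000000", "vr": <vr_id>}
        # paired with "nat_event": "SAI_NAT_EVENT_AGED" to simulate the ASIC
        # aging out the SNAT entry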
        ntf_data = "[{\"nat_entry\":\"{\\\"nat_data\\\":{\\\"key\\\":{\\\"dst_ip\\\":\\\"0.0.0.0\\\",\\\"l4_dst_port\\\":\\\"0\\\",\\\"l4_src_port\\\":\\\"3000\\\",\\\"proto\\\":\\\"6\\\",\\\"src_ip\\\":\\\"33.33.33.33\\\"},\\\"mask\\\":{\\\"dst_ip\\\":\\\"0.0.0.0\\\",\\\"l4_dst_port\\\":\\\"0\\\",\\\"l4_src_port\\\":\\\"65535\\\",\\\"proto\\\":\\\"255\\\",\\\"src_ip\\\":\\\"255.255.255.255\\\"}},\\\"nat_type\\\":\\\"SAI_NAT_TYPE_SOURCE_NAT\\\",\\\"switch_id\\\":\\\"oid:0x21000000000000\\\",\\\"vr\\\":\\\"" + vr_id + "\\\"}\",\"nat_event\":\"SAI_NAT_EVENT_AGED\"}]"
        ntf.send("nat_event", ntf_data, fvp)
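        # on receiving the aged-out event, the entry should be flushed
        # everywhere; the checks below confirm it is gone from the kernel
        # conntrack table, APP_DB, and ASIC_DB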

        # check the entry is not there in kernel
        exitcode, output = dvs.runcmd("conntrack -L -j")
        assert exitcode == 0
        assert "0 flow entries have been shown" in output

        # check the entry is not there in app db
        self.app_db.wait_for_n_keys("NAPT_TABLE", 0)

        # check the entry is not there in asic db
        self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_NAT_ENTRY", 0)

# Add a dummy always-pass test at the end as a workaround for an issue where,
# if Flaky fails on the final test, it invokes module tear-down before retrying
def test_nonflaky_dummy():
    pass
