Skip to content

Commit

Permalink
refactor(vpc_endpoint_connections_trust_boundaries) (prowler-cloud#2667)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfagoagas authored and MrCloudSec committed Aug 3, 2023
1 parent 03fc2fe commit d7dcf2e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,24 @@ def execute(self):
if not access_from_trusted_accounts:
break
if "*" == statement["Principal"]:
access_from_trusted_accounts = False
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags

for account_id in trusted_account_ids:
if (
"Condition" in statement
and is_account_only_allowed_in_condition(
if "Condition" in statement:
for account_id in trusted_account_ids:
if is_account_only_allowed_in_condition(
statement["Condition"], account_id
)
):
access_from_trusted_accounts = True
else:
access_from_trusted_accounts = False
break
):
access_from_trusted_accounts = True
else:
access_from_trusted_accounts = False
break

if (
not access_from_trusted_accounts
or len(trusted_account_ids) == 0
):
access_from_trusted_accounts = False
if not access_from_trusted_accounts:
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else:
Expand All @@ -63,30 +58,25 @@ def execute(self):
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if principal_arn == "*":
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags

for account_id in trusted_account_ids:
if (
"Condition" in statement
and is_account_only_allowed_in_condition(
if principal_arn == "*":
access_from_trusted_accounts = False
if "Condition" in statement:
for account_id in trusted_account_ids:
if is_account_only_allowed_in_condition(
statement["Condition"], account_id
)
):
access_from_trusted_accounts = True
else:
access_from_trusted_accounts = False
break
):
access_from_trusted_accounts = True
else:
access_from_trusted_accounts = False
break

if (
not access_from_trusted_accounts
or len(trusted_account_ids) == 0
):
access_from_trusted_accounts = False
if not access_from_trusted_accounts:
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else:
Expand All @@ -104,50 +94,29 @@ def execute(self):
account_id = principal_arn.split(":")[4]
else:
account_id = match.string
if (
account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags
findings.append(report)
else:
report = Check_Report_AWS(self.metadata())
report.region = endpoint.region
report.resource_id = endpoint.id
report.resource_arn = endpoint.arn
report.resource_tags = endpoint.tags

if account_id not in trusted_account_ids:
access_from_trusted_accounts = False

if "Condition" in statement:
for account_id in trusted_account_ids:
if (
"Condition" in statement
and is_account_only_allowed_in_condition(
statement["Condition"], account_id
)
if is_account_only_allowed_in_condition(
statement["Condition"], account_id
):
access_from_trusted_accounts = True
else:
access_from_trusted_accounts = False
break

if (
not access_from_trusted_accounts
or len(trusted_account_ids) == 0
):
access_from_trusted_accounts = False
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else:
report.status = "PASS"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts."
if not access_from_trusted_accounts:
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can be accessed from non-trusted accounts."
else:
report.status = "PASS"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} can only be accessed from trusted accounts."

findings.append(report)
if not access_from_trusted_accounts:
break
findings.append(report)
if not access_from_trusted_accounts:
break

return findings
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def test_vpc_endpoint_with_trusted_account_arn(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
)
assert (
result[0].resource_id
Expand Down Expand Up @@ -244,7 +244,7 @@ def test_vpc_endpoint_with_trusted_account_id(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
)
assert (
result[0].resource_id
Expand Down Expand Up @@ -368,7 +368,7 @@ def test_vpc_endpoint_with_config_trusted_account_with_arn(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {TRUSTED_AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
)
assert (
result[0].resource_id
Expand Down Expand Up @@ -430,7 +430,7 @@ def test_vpc_endpoint_with_config_trusted_account(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {TRUSTED_AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} can only be accessed from trusted accounts."
)
assert (
result[0].resource_id
Expand Down

0 comments on commit d7dcf2e

Please sign in to comment.