Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rds_instance_transport_encrypted): add new check #1963

Merged
merged 6 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "rds_instance_transport_encrypted",
"CheckTitle": "Check if RDS instances client connections are encrypted.",
"CheckType": [],
"ServiceName": "rds",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:rds:region:account-id:db-instance",
"Severity": "high",
"ResourceType": "AwsRdsDbInstance",
"Description": "Check if RDS instances client connections are encrypted.",
"Risk": "If not enabled sensitive information at transit is not protected.",
"RelatedUrl": "https://aws.amazon.com/premiumsupport/knowledge-center/rds-connect-ssl-connection/",
"Remediation": {
"Code": {
"CLI": "aws rds modify-db-parameter-group --region <REGION_NAME> --db-parameter-group-name <PARAMETER_GROUP_NAME> --parameters ParameterName='rds.force_ssl',ParameterValue='1',ApplyMethod='pending-reboot'",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/RDS/transport-encryption.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that Microsoft SQL Server and PostgreSQL instances provisioned with Amazon RDS have Transport Encryption feature enabled in order to meet security and compliance requirements.",
"Url": "https://aws.amazon.com/premiumsupport/knowledge-center/rds-connect-ssl-connection/"
}
},
"Categories": [ "encryption" ],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.rds.rds_client import rds_client


class rds_instance_transport_encrypted(Check):
def execute(self):
findings = []
supported_engines = ["sqlserver", "postgres"]
for db_instance in rds_client.db_instances:
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.status = "FAIL"
report.status_extended = (
f"RDS Instance {db_instance.id} connections are not encrypted."
)
# Check only RDS SQL Server or PostgreSQL engines
if any(engine in db_instance.engine for engine in supported_engines):
for parameter in db_instance.parameters:
if (
parameter["ParameterName"] == "rds.force_ssl"
and parameter["ParameterValue"] == "1"
):
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} connections use SSL encryption."

findings.append(report)

return findings
27 changes: 27 additions & 0 deletions prowler/providers/aws/services/rds/rds_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self, audit_info):
self.db_snapshots = []
self.db_cluster_snapshots = []
self.__threading_call__(self.__describe_db_instances__)
self.__threading_call__(self.__describe_db_parameters__)
self.__threading_call__(self.__describe_db_snapshots__)
self.__threading_call__(self.__describe_db_snapshot_attributes__)
self.__threading_call__(self.__describe_db_cluster_snapshots__)
Expand Down Expand Up @@ -72,6 +73,10 @@ def __describe_db_instances__(self, regional_client):
enhanced_monitoring_arn=instance.get(
"EnhancedMonitoringResourceArn"
),
parameter_groups=[
item["DBParameterGroupName"]
for item in instance["DBParameterGroups"]
],
multi_az=instance["MultiAZ"],
region=regional_client.region,
)
Expand All @@ -81,6 +86,26 @@ def __describe_db_instances__(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __describe_db_parameters__(self, regional_client):
logger.info("RDS - Describe DB Parameters...")
try:
for instance in self.db_instances:
if instance.region == regional_client.region:
for parameter_group in instance.parameter_groups:
describe_db_parameters_paginator = (
regional_client.get_paginator("describe_db_parameters")
)
for page in describe_db_parameters_paginator.paginate(
DBParameterGroupName=parameter_group
):
for parameter in page["Parameters"]:
instance.parameters.append(parameter)

except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __describe_db_snapshots__(self, regional_client):
logger.info("RDS - Describe Snapshots...")
try:
Expand Down Expand Up @@ -182,6 +207,8 @@ class DBInstance(BaseModel):
auto_minor_version_upgrade: bool
enhanced_monitoring_arn: Optional[str]
multi_az: bool
parameter_groups: list[str] = []
parameters: list[dict] = []
region: str


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from re import search
from unittest import mock

from boto3 import client
from moto import mock_rds

AWS_REGION = "us-east-1"


class Test_rds_instance_transport_encrypted:
@mock_rds
def test_rds_no_instances(self):
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.rds.rds_service import RDS

current_audit_info.audited_partition = "aws"

with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client",
new=RDS(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import (
rds_instance_transport_encrypted,
)

check = rds_instance_transport_encrypted()
result = check.execute()

assert len(result) == 0

@mock_rds
def test_rds_instance_no_ssl(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
)

conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "rds.force_ssl",
"ParameterValue": "0",
"ApplyMethod": "immediate",
},
],
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.rds.rds_service import RDS

current_audit_info.audited_partition = "aws"

with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client",
new=RDS(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import (
rds_instance_transport_encrypted,
)

check = rds_instance_transport_encrypted()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"connections are not encrypted",
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"

@mock_rds
def test_rds_instance_with_ssl(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
)

conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "rds.force_ssl",
"ParameterValue": "1",
"ApplyMethod": "immediate",
},
],
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.rds.rds_service import RDS

current_audit_info.audited_partition = "aws"

with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client",
new=RDS(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import (
rds_instance_transport_encrypted,
)

check = rds_instance_transport_encrypted()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"connections use SSL encryption",
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
44 changes: 44 additions & 0 deletions tests/providers/aws/services/rds/rds_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def test_audited_account(self):
@mock_rds
def test__describe_db_instances__(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Expand All @@ -82,6 +87,7 @@ def test__describe_db_instances__(self):
BackupRetentionPeriod=10,
EnableCloudwatchLogsExports=["audit", "error"],
MultiAZ=True,
DBParameterGroupName="test",
)
# RDS client for this test class
audit_info = self.set_mocked_audit_info()
Expand All @@ -101,6 +107,44 @@ def test__describe_db_instances__(self):
assert rds.db_instances[0].deletion_protection
assert rds.db_instances[0].auto_minor_version_upgrade
assert rds.db_instances[0].multi_az
assert "test" in rds.db_instances[0].parameter_groups

@mock_rds
def test__describe_db_parameters__(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
)

conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "rds.force_ssl",
"ParameterValue": "1",
"ApplyMethod": "immediate",
},
],
)
# RDS client for this test class
audit_info = self.set_mocked_audit_info()
rds = RDS(audit_info)
assert len(rds.db_instances) == 1
assert rds.db_instances[0].id == "db-master-1"
assert rds.db_instances[0].region == AWS_REGION
for parameter in rds.db_instances[0].parameters:
if parameter["ParameterName"] == "rds.force_ssl":
assert parameter["ParameterValue"] == "1"

# Test RDS Describe DB Snapshots
@mock_rds
Expand Down