Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: enable TLS/SSL through local filepath in http source #359

Merged
merged 15 commits into from
Oct 6, 2021
Merged
7 changes: 7 additions & 0 deletions data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ dependencies {
api project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation "commons-io:commons-io:2.11.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing you have to fix now, but going forward, please use single quotes for Gradle strings. Double quotes are GStrings which are used for string interpolation.

implementation 'com.amazonaws:aws-java-sdk-s3'
implementation 'com.amazonaws:aws-java-sdk-acm'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation "org.bouncycastle:bcprov-jdk15on:1.69"
implementation "org.bouncycastle:bcpkix-jdk15on:1.69"
implementation 'org.reflections:reflections:0.9.12'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.hamcrest:hamcrest:2.2"
}

jacocoTestCoverageVerification {
Expand Down
14 changes: 14 additions & 0 deletions data-prepper-plugins/common/data/certificate/test_cert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u
MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw
MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB
dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w
DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq
HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ
O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo
Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb
uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC
FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg
/GAIzJwiZfXiaevQHRk79qI=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd
5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC
4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB
AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq
H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU
zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn
ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445
KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS
aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk
thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI
xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD
jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4
yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0=
-----END RSA PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIICojAcBgoqhkiG9w0BDAEDMA4ECAd2FKZw2oGwAgIIAASCAoDTgiaXkazaotc7
SxQK3bX34sEvdkLXg/O4ZpHTb0f4gLPxNhDe7ZPKrAS2TdywpSHT0189MVl+PIvw
4YQDaGVHL1SM5ukJu+PQkfQAMigdCJ+bUsG6hkrUDC74qYhHZHj1yVGavL6I4KHT
Ixh9IV2GMRS4m6HGJ17nYsdiTFFNdK++WcTMxbVWv3SNdKGZG79T32pjMxuIUPWr
3dB+ZXM+FSqwlBLZxPvvjlP6ETw7QXrlBHcQh1tHSh10bM+q0c5CktZoXLwpc+gv
ZsGXzpVjqFrAw4Vw0ikJl1mUCoGOtqqP0P6QWwbIJZBxNoO0MvWcXW+U3AGNFFze
nMR8UTXdga2l1Lx7pokQkWUpp48SDRjDx/RdZTRXCgtRcKuBcm0x2lxNILdwOzjJ
5GlKMvvc2OXXTnYqSCTqdfbuR3XBYmWgFki92D6JnVIYq+QJr5qj8IJDJ7mADQ1i
Za6PEJnrT641fLeSKRq7QiTydMQ3JXa9DFqUPwdZPPHLr/hC19sWHrq7gxvhkcLI
wrTtTIi8/iV4IVaiHk7YU83IM6sGExabQ3BRXcHMr+7i1vVxtEsFNC6ycTfJ8gpJ
YsnpXUQe912l5sk7GRSP1InNRF7kzMD0QeOAQ0UVfmsbHOPSXvD7fXkJWIb6N+zW
qCQYRmBwc7Bz2KZein5MLsMeNz2AWj/DcA2fMC+4+QtI0nF5BFtaR0V0npWhsbPu
3rj+AXipnvVhDIkfl8495j7ybCBj7HAZk8Ux8GmiZ3PGFO1C7XCQaLPWJ4Aw4Kb3
EUqtVtpbwsCov5PDmMDXgz8qOxWMdQsP/dPF1HnVAg7SoFG9xA4nHdZ2WAFZwYtf
rRxEd7br
-----END ENCRYPTED PRIVATE KEY-----
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.Objects.requireNonNull;

// TODO: accommodate encrypted private key with password
public class Certificate {
/**
* The base64 PEM-encoded certificate.
Expand Down
6 changes: 6 additions & 0 deletions data-prepper-plugins/http-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ source:
* max_connection_count(Optional) => An `int` larger than 0 represents the maximum allowed number of open connections. Default is `500`.
* max_pending_requests(Optional) => An `int` larger than 0 represents the maximum allowed number of tasks in the ScheduledThreadPool work queue. Default is `1024`.

### SSL

* ssl(Optional) => A `boolean` enables TLS/SSL. Default is ```true```.
* ssl_certificate_file(Optional) => A `String` represents the SSL certificate chain file path. Required if ```ssl``` is set to ```true```.
* ssl_key_file(Optional) => A `String` represents the SSL key file path. Only decrypted key file is supported. Required if ```ssl``` is set to ```true```.

## Metrics

TBD
Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/http-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:common')
implementation "com.linecorp.armeria:armeria:1.9.2"
implementation "commons-io:commons-io:2.11.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We ought to move the versions of dependencies we use in multiple places out to the external file with the others that we've already defined.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we have a versionsMap in our build-resources.gradle to provide this consistency.

Gradle 7 should help us improve this, but it is not possible to update presently due to the OpenSearch plugin.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add commons-io version into versionMap. There might be other common packages. We should do that in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a separate PR for this.

testImplementation 'org.assertj:assertj-core:3.20.2'
testImplementation "org.hamcrest:hamcrest:2.2"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.amazon.dataprepper.plugins.certificate.CertificateProvider;
import com.amazon.dataprepper.plugins.certificate.model.Certificate;
import com.amazon.dataprepper.plugins.source.loghttp.certificate.CertificateProviderFactory;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.throttling.ThrottlingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -32,10 +37,12 @@ public class HTTPSource implements Source<Record<String>> {
private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);

private final HTTPSourceConfig sourceConfig;
private final CertificateProviderFactory certificateProviderFactory;
private Server server;

public HTTPSource(final PluginSetting pluginSetting) {
sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
}

@Override
Expand All @@ -45,8 +52,19 @@ public void start(Buffer<Record<String>> buffer) {
}
if (server == null) {
final ServerBuilder sb = Server.builder();
// TODO: allow tls/ssl
sb.http(sourceConfig.getPort());
if (sourceConfig.isSsl()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: do we want to have isSsl or isSSL given that SSL is an acronym and not a proper word?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think isSSL makes more sense. This getter is autogenerated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should stick with isSsl. Making initials lowercase is a common convention. For example, java.net.http.HttpRequest.

LOG.info("SSL/TLS is enabled.");
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
// TODO: enable encrypted key with password
sb.https(sourceConfig.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
} else {
sb.http(sourceConfig.getPort());
}
sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
final int requestTimeoutInMillis = sourceConfig.getRequestTimeoutInMillis();
// Allow 2*requestTimeoutInMillis to accommodate non-blocking operations other than buffer writing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.util.StringUtils;

import java.nio.file.Files;
import java.nio.file.Path;

public class HTTPSourceConfig {
static final String PORT = "port";
Expand All @@ -21,6 +25,10 @@ public class HTTPSourceConfig {
static final String MAX_CONNECTION_COUNT = "max_connection_count";
static final String MAX_PENDING_REQUESTS = "max_pending_requests";
static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
static final String SSL = "ssl";
static final String SSL_CERTIFICATE_FILE = "ssl_certificate_file";
static final String SSL_KEY_FILE = "ssl_key_file";
static final String SSL_KEY_PASSWORD = "ssl_key_password";
static final int DEFAULT_PORT = 2021;
static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000;
static final int DEFAULT_THREAD_COUNT = 200;
Expand All @@ -32,22 +40,38 @@ public class HTTPSourceConfig {
private final int threadCount;
private final int maxConnectionCount;
private final int maxPendingRequests;
private final boolean ssl;
private final String sslCertificateFile;
private final String sslKeyFile;
private final String sslKeyPassword;

private HTTPSourceConfig(final int port,
final int requestTimeoutInMillis,
final int threadCount,
final int maxConnectionCount,
final int maxPendingRequests) {
final int maxPendingRequests,
final boolean ssl,
final String sslCertificateFile,
final String sslKeyFile,
final String sslKeyPassword) {
Preconditions.checkArgument(port >= 0 && port < 65535, "port must be between 0 and 65535.");
Preconditions.checkArgument(requestTimeoutInMillis > 0, "request_timeout must be greater than 0.");
Preconditions.checkArgument(threadCount > 0, "thread_count must be greater than 0.");
Preconditions.checkArgument(maxConnectionCount > 0, "max_connection_count must be greater than 0.");
Preconditions.checkArgument(maxPendingRequests > 0, "max_pending_requests must be greater than 0.");
if (ssl) {
validateFilePath(String.format("%s is enabled", SSL), sslCertificateFile, SSL_CERTIFICATE_FILE);
validateFilePath(String.format("%s is enabled", SSL), sslKeyFile, SSL_KEY_FILE);
}
this.port = port;
this.requestTimeoutInMillis = requestTimeoutInMillis;
this.threadCount = threadCount;
this.maxConnectionCount = maxConnectionCount;
this.maxPendingRequests = maxPendingRequests;
this.ssl = ssl;
this.sslCertificateFile = sslCertificateFile;
this.sslKeyFile = sslKeyFile;
this.sslKeyPassword = sslKeyPassword;
}

public static HTTPSourceConfig buildConfig(final PluginSetting pluginSetting) {
Expand All @@ -56,10 +80,20 @@ public static HTTPSourceConfig buildConfig(final PluginSetting pluginSetting) {
pluginSetting.getIntegerOrDefault(REQUEST_TIMEOUT, DEFAULT_REQUEST_TIMEOUT_MS),
pluginSetting.getIntegerOrDefault(THREAD_COUNT, DEFAULT_THREAD_COUNT),
pluginSetting.getIntegerOrDefault(MAX_CONNECTION_COUNT, DEFAULT_MAX_CONNECTION_COUNT),
pluginSetting.getIntegerOrDefault(MAX_PENDING_REQUESTS, DEFAULT_MAX_PENDING_REQUESTS)
pluginSetting.getIntegerOrDefault(MAX_PENDING_REQUESTS, DEFAULT_MAX_PENDING_REQUESTS),
pluginSetting.getBooleanOrDefault(SSL, false),
pluginSetting.getStringOrDefault(SSL_CERTIFICATE_FILE, null),
pluginSetting.getStringOrDefault(SSL_KEY_FILE, null),
pluginSetting.getStringOrDefault(SSL_KEY_PASSWORD, null)
);
}

private void validateFilePath(final String typeMessage, final String argument, final String argumentName) {
if (StringUtils.isEmpty(argument) || !Files.exists(Path.of(argument))) {
throw new IllegalArgumentException(String.format("%s, %s needs to be a valid file path.", typeMessage, argumentName));
}
}

public int getPort() {
return port;
}
Expand All @@ -79,4 +113,20 @@ public int getMaxConnectionCount() {
public int getMaxPendingRequests() {
return maxPendingRequests;
}

public boolean isSsl() {
return ssl;
}

public String getSslCertificateFile() {
return sslCertificateFile;
}

public String getSslKeyFile() {
return sslKeyFile;
}

public String getSslKeyPassword() {
return sslKeyPassword;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.dataprepper.plugins.source.loghttp.certificate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for now, but do we want to centralize the SSL certificate logic so we don't have these CertificateProviderFactories all over the codebase?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I did not centralize this class in the PR is b/c right now it takes the source plugin config as input arg. We will need to refactor out a common SSLConfig with all possible relevant parameters in order to do this centralization. The SSLConfig will be reused by the existing source plugins. I will reserve it for future iteration and open an issue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


import com.amazon.dataprepper.plugins.certificate.CertificateProvider;
import com.amazon.dataprepper.plugins.certificate.file.FileCertificateProvider;
import com.amazon.dataprepper.plugins.source.loghttp.HTTPSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CertificateProviderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class);

final HTTPSourceConfig httpSourceConfig;
public CertificateProviderFactory(final HTTPSourceConfig httpSourceConfig) {
this.httpSourceConfig = httpSourceConfig;
}

public CertificateProvider getCertificateProvider() {
// TODO: support more certificate providers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make an issue for this, please.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG.info("Using local file system to get certificate and private key for SSL/TLS.");
return new FileCertificateProvider(httpSourceConfig.getSslCertificateFile(), httpSourceConfig.getSslKeyFile());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u
MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw
MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB
dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w
DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq
HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ
O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo
Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb
uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC
FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg
/GAIzJwiZfXiaevQHRk79qI=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd
5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC
4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB
AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq
H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU
zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn
ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445
KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS
aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk
thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI
xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD
jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4
yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0=
-----END RSA PRIVATE KEY-----
Loading