Skip to content

Commit

Permalink
Feature: enable TLS/SSL through local filepath in http source (#359)
Browse files Browse the repository at this point in the history
* ENH: ssl config parameters and tests

Signed-off-by: qchea <[email protected]>

* MAINT: comment string

Signed-off-by: qchea <[email protected]>

* REF: server SSLCertProvider into common plugin

Signed-off-by: qchea <[email protected]>

* TST: ssl enabled server

Signed-off-by: qchea <[email protected]>

* MAINT: TODO comment

Signed-off-by: qchea <[email protected]>

* MAINT: update README

Signed-off-by: qchea <[email protected]>

* STY: rename ssl related parameters and variables

Signed-off-by: qchea <[email protected]>

* ENH: ssl config parameters and tests

Signed-off-by: qchea <[email protected]>

* MAINT: comment string

Signed-off-by: qchea <[email protected]>

* REF: server SSLCertProvider into common plugin

Signed-off-by: qchea <[email protected]>

* TST: ssl enabled server

Signed-off-by: qchea <[email protected]>

* MAINT: TODO comment

Signed-off-by: qchea <[email protected]>

* MAINT: update README

Signed-off-by: qchea <[email protected]>

* STY: rename ssl related parameters and variables

Signed-off-by: qchea <[email protected]>
  • Loading branch information
chenqi0805 authored Oct 6, 2021
1 parent 6124118 commit ba0d9f1
Show file tree
Hide file tree
Showing 30 changed files with 484 additions and 19 deletions.
7 changes: 7 additions & 0 deletions data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ dependencies {
api project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation "commons-io:commons-io:2.11.0"
implementation 'com.amazonaws:aws-java-sdk-s3'
implementation 'com.amazonaws:aws-java-sdk-acm'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation "org.bouncycastle:bcprov-jdk15on:1.69"
implementation "org.bouncycastle:bcpkix-jdk15on:1.69"
implementation 'org.reflections:reflections:0.9.12'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.hamcrest:hamcrest:2.2"
}

jacocoTestCoverageVerification {
Expand Down
14 changes: 14 additions & 0 deletions data-prepper-plugins/common/data/certificate/test_cert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u
MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw
MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB
dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w
DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq
HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ
O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo
Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb
uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC
FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg
/GAIzJwiZfXiaevQHRk79qI=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd
5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC
4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB
AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq
H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU
zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn
ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445
KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS
aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk
thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI
xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD
jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4
yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0=
-----END RSA PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIICojAcBgoqhkiG9w0BDAEDMA4ECAd2FKZw2oGwAgIIAASCAoDTgiaXkazaotc7
SxQK3bX34sEvdkLXg/O4ZpHTb0f4gLPxNhDe7ZPKrAS2TdywpSHT0189MVl+PIvw
4YQDaGVHL1SM5ukJu+PQkfQAMigdCJ+bUsG6hkrUDC74qYhHZHj1yVGavL6I4KHT
Ixh9IV2GMRS4m6HGJ17nYsdiTFFNdK++WcTMxbVWv3SNdKGZG79T32pjMxuIUPWr
3dB+ZXM+FSqwlBLZxPvvjlP6ETw7QXrlBHcQh1tHSh10bM+q0c5CktZoXLwpc+gv
ZsGXzpVjqFrAw4Vw0ikJl1mUCoGOtqqP0P6QWwbIJZBxNoO0MvWcXW+U3AGNFFze
nMR8UTXdga2l1Lx7pokQkWUpp48SDRjDx/RdZTRXCgtRcKuBcm0x2lxNILdwOzjJ
5GlKMvvc2OXXTnYqSCTqdfbuR3XBYmWgFki92D6JnVIYq+QJr5qj8IJDJ7mADQ1i
Za6PEJnrT641fLeSKRq7QiTydMQ3JXa9DFqUPwdZPPHLr/hC19sWHrq7gxvhkcLI
wrTtTIi8/iV4IVaiHk7YU83IM6sGExabQ3BRXcHMr+7i1vVxtEsFNC6ycTfJ8gpJ
YsnpXUQe912l5sk7GRSP1InNRF7kzMD0QeOAQ0UVfmsbHOPSXvD7fXkJWIb6N+zW
qCQYRmBwc7Bz2KZein5MLsMeNz2AWj/DcA2fMC+4+QtI0nF5BFtaR0V0npWhsbPu
3rj+AXipnvVhDIkfl8495j7ybCBj7HAZk8Ux8GmiZ3PGFO1C7XCQaLPWJ4Aw4Kb3
EUqtVtpbwsCov5PDmMDXgz8qOxWMdQsP/dPF1HnVAg7SoFG9xA4nHdZ2WAFZwYtf
rRxEd7br
-----END ENCRYPTED PRIVATE KEY-----
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.Objects.requireNonNull;

// TODO: accommodate encrypted private key with password
public class Certificate {
/**
* The base64 PEM-encoded certificate.
Expand Down
6 changes: 6 additions & 0 deletions data-prepper-plugins/http-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ source:
* max_connection_count(Optional) => An `int` larger than 0 represents the maximum allowed number of open connections. Default is `500`.
* max_pending_requests(Optional) => An `int` larger than 0 represents the maximum allowed number of tasks in the ScheduledThreadPool work queue. Default is `1024`.

### SSL

* ssl(Optional) => A `boolean` enables TLS/SSL. Default is ```true```.
* ssl_certificate_file(Optional) => A `String` represents the SSL certificate chain file path. Required if ```ssl``` is set to ```true```.
* ssl_key_file(Optional) => A `String` represents the SSL key file path. Only decrypted key file is supported. Required if ```ssl``` is set to ```true```.

## Metrics

TBD
Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/http-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:common')
implementation "com.linecorp.armeria:armeria:1.9.2"
implementation "commons-io:commons-io:2.11.0"
testImplementation 'org.assertj:assertj-core:3.20.2'
testImplementation "org.hamcrest:hamcrest:2.2"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.amazon.dataprepper.plugins.certificate.CertificateProvider;
import com.amazon.dataprepper.plugins.certificate.model.Certificate;
import com.amazon.dataprepper.plugins.source.loghttp.certificate.CertificateProviderFactory;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.throttling.ThrottlingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -32,10 +37,12 @@ public class HTTPSource implements Source<Record<String>> {
private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);

private final HTTPSourceConfig sourceConfig;
private final CertificateProviderFactory certificateProviderFactory;
private Server server;

public HTTPSource(final PluginSetting pluginSetting) {
sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
}

@Override
Expand All @@ -45,8 +52,19 @@ public void start(Buffer<Record<String>> buffer) {
}
if (server == null) {
final ServerBuilder sb = Server.builder();
// TODO: allow tls/ssl
sb.http(sourceConfig.getPort());
if (sourceConfig.isSsl()) {
LOG.info("SSL/TLS is enabled.");
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
// TODO: enable encrypted key with password
sb.https(sourceConfig.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
} else {
sb.http(sourceConfig.getPort());
}
sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
final int requestTimeoutInMillis = sourceConfig.getRequestTimeoutInMillis();
// Allow 2*requestTimeoutInMillis to accommodate non-blocking operations other than buffer writing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.util.StringUtils;

import java.nio.file.Files;
import java.nio.file.Path;

public class HTTPSourceConfig {
static final String PORT = "port";
Expand All @@ -21,6 +25,10 @@ public class HTTPSourceConfig {
static final String MAX_CONNECTION_COUNT = "max_connection_count";
static final String MAX_PENDING_REQUESTS = "max_pending_requests";
static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
static final String SSL = "ssl";
static final String SSL_CERTIFICATE_FILE = "ssl_certificate_file";
static final String SSL_KEY_FILE = "ssl_key_file";
static final String SSL_KEY_PASSWORD = "ssl_key_password";
static final int DEFAULT_PORT = 2021;
static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000;
static final int DEFAULT_THREAD_COUNT = 200;
Expand All @@ -32,22 +40,38 @@ public class HTTPSourceConfig {
private final int threadCount;
private final int maxConnectionCount;
private final int maxPendingRequests;
private final boolean ssl;
private final String sslCertificateFile;
private final String sslKeyFile;
private final String sslKeyPassword;

private HTTPSourceConfig(final int port,
final int requestTimeoutInMillis,
final int threadCount,
final int maxConnectionCount,
final int maxPendingRequests) {
final int maxPendingRequests,
final boolean ssl,
final String sslCertificateFile,
final String sslKeyFile,
final String sslKeyPassword) {
Preconditions.checkArgument(port >= 0 && port < 65535, "port must be between 0 and 65535.");
Preconditions.checkArgument(requestTimeoutInMillis > 0, "request_timeout must be greater than 0.");
Preconditions.checkArgument(threadCount > 0, "thread_count must be greater than 0.");
Preconditions.checkArgument(maxConnectionCount > 0, "max_connection_count must be greater than 0.");
Preconditions.checkArgument(maxPendingRequests > 0, "max_pending_requests must be greater than 0.");
if (ssl) {
validateFilePath(String.format("%s is enabled", SSL), sslCertificateFile, SSL_CERTIFICATE_FILE);
validateFilePath(String.format("%s is enabled", SSL), sslKeyFile, SSL_KEY_FILE);
}
this.port = port;
this.requestTimeoutInMillis = requestTimeoutInMillis;
this.threadCount = threadCount;
this.maxConnectionCount = maxConnectionCount;
this.maxPendingRequests = maxPendingRequests;
this.ssl = ssl;
this.sslCertificateFile = sslCertificateFile;
this.sslKeyFile = sslKeyFile;
this.sslKeyPassword = sslKeyPassword;
}

public static HTTPSourceConfig buildConfig(final PluginSetting pluginSetting) {
Expand All @@ -56,10 +80,20 @@ public static HTTPSourceConfig buildConfig(final PluginSetting pluginSetting) {
pluginSetting.getIntegerOrDefault(REQUEST_TIMEOUT, DEFAULT_REQUEST_TIMEOUT_MS),
pluginSetting.getIntegerOrDefault(THREAD_COUNT, DEFAULT_THREAD_COUNT),
pluginSetting.getIntegerOrDefault(MAX_CONNECTION_COUNT, DEFAULT_MAX_CONNECTION_COUNT),
pluginSetting.getIntegerOrDefault(MAX_PENDING_REQUESTS, DEFAULT_MAX_PENDING_REQUESTS)
pluginSetting.getIntegerOrDefault(MAX_PENDING_REQUESTS, DEFAULT_MAX_PENDING_REQUESTS),
pluginSetting.getBooleanOrDefault(SSL, false),
pluginSetting.getStringOrDefault(SSL_CERTIFICATE_FILE, null),
pluginSetting.getStringOrDefault(SSL_KEY_FILE, null),
pluginSetting.getStringOrDefault(SSL_KEY_PASSWORD, null)
);
}

private void validateFilePath(final String typeMessage, final String argument, final String argumentName) {
if (StringUtils.isEmpty(argument) || !Files.exists(Path.of(argument))) {
throw new IllegalArgumentException(String.format("%s, %s needs to be a valid file path.", typeMessage, argumentName));
}
}

public int getPort() {
return port;
}
Expand All @@ -79,4 +113,20 @@ public int getMaxConnectionCount() {
public int getMaxPendingRequests() {
return maxPendingRequests;
}

public boolean isSsl() {
return ssl;
}

public String getSslCertificateFile() {
return sslCertificateFile;
}

public String getSslKeyFile() {
return sslKeyFile;
}

public String getSslKeyPassword() {
return sslKeyPassword;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.dataprepper.plugins.source.loghttp.certificate;

import com.amazon.dataprepper.plugins.certificate.CertificateProvider;
import com.amazon.dataprepper.plugins.certificate.file.FileCertificateProvider;
import com.amazon.dataprepper.plugins.source.loghttp.HTTPSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CertificateProviderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class);

final HTTPSourceConfig httpSourceConfig;
public CertificateProviderFactory(final HTTPSourceConfig httpSourceConfig) {
this.httpSourceConfig = httpSourceConfig;
}

public CertificateProvider getCertificateProvider() {
// TODO: support more certificate providers
LOG.info("Using local file system to get certificate and private key for SSL/TLS.");
return new FileCertificateProvider(httpSourceConfig.getSslCertificateFile(), httpSourceConfig.getSslKeyFile());
}
}
14 changes: 14 additions & 0 deletions data-prepper-plugins/http-source/src/main/resources/test_cert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u
MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw
MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB
dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w
DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq
HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ
O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo
Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb
uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC
FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg
/GAIzJwiZfXiaevQHRk79qI=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd
5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC
4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB
AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq
H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU
zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn
ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445
KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS
aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk
thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI
xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD
jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4
yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0=
-----END RSA PRIVATE KEY-----
Loading

0 comments on commit ba0d9f1

Please sign in to comment.