-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(kinesisanalytics-flink): VPC support for Flink applications #24442
Changes from 8 commits
04464cb
88f03af
985d40a
c0a1318
a6acadc
ed87d70
e5110f1
6432960
82e453e
5cf2e59
8b6ed6f
6042760
4f3db20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,4 +1,5 @@ | ||||||
import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; | ||||||
import * as ec2 from '@aws-cdk/aws-ec2'; | ||||||
import * as iam from '@aws-cdk/aws-iam'; | ||||||
import { CfnApplicationCloudWatchLoggingOptionV2, CfnApplicationV2 } from '@aws-cdk/aws-kinesisanalytics'; | ||||||
import * as logs from '@aws-cdk/aws-logs'; | ||||||
|
@@ -14,7 +15,7 @@ import { LogLevel, MetricsLevel, PropertyGroups, Runtime } from './types'; | |||||
* An interface expressing the public properties on both an imported and | ||||||
* CDK-created Flink application. | ||||||
*/ | ||||||
export interface IApplication extends core.IResource, iam.IGrantable { | ||||||
export interface IApplication extends core.IResource, ec2.IConnectable, iam.IGrantable { | ||||||
/** | ||||||
* The application ARN. | ||||||
* | ||||||
|
@@ -351,6 +352,13 @@ abstract class ApplicationBase extends core.Resource implements IApplication { | |||||
// Implement iam.IGrantable interface | ||||||
public abstract readonly grantPrincipal: iam.IPrincipal; | ||||||
|
||||||
/** | ||||||
* The underlying connections object for the connections getter. | ||||||
* | ||||||
* @internal | ||||||
*/ | ||||||
protected _connections?: ec2.Connections; | ||||||
|
||||||
/** Implement the convenience `IApplication.addToPrincipalPolicy` method. */ | ||||||
public addToRolePolicy(policyStatement: iam.PolicyStatement): boolean { | ||||||
if (this.role) { | ||||||
|
@@ -361,6 +369,13 @@ abstract class ApplicationBase extends core.Resource implements IApplication { | |||||
return false; | ||||||
} | ||||||
|
||||||
public get connections() { | ||||||
if (!this._connections) { | ||||||
throw new Error('This Application isn\'t associated with a VPC. Provide a "vpc" prop when creating the Application or "securityGroups" when importing it.'); | ||||||
} | ||||||
return this._connections; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Return a CloudWatch metric associated with this Flink application. | ||||||
* | ||||||
|
@@ -720,6 +735,25 @@ abstract class ApplicationBase extends core.Resource implements IApplication { | |||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Attributes used for importing an Application with Application.fromApplicationAttributes. | ||||||
*/ | ||||||
export interface ApplicationAttributes { | ||||||
/** | ||||||
* The ARN of the Flink application. | ||||||
* | ||||||
* Format: arn:<partition>:kinesisanalytics:<region>:<account-id>:application/<application-name> | ||||||
*/ | ||||||
readonly applicationArn: string; | ||||||
|
||||||
/** | ||||||
* The security groups for this Flink application if deployed in a VPC. | ||||||
* | ||||||
* @default no security groups | ||||||
*/ | ||||||
readonly securityGroups?: ec2.ISecurityGroup[]; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Props for creating an Application construct. | ||||||
*/ | ||||||
|
@@ -840,6 +874,27 @@ export interface ApplicationProps { | |||||
* @default CDK's default LogGroup | ||||||
*/ | ||||||
readonly logGroup?: logs.ILogGroup; | ||||||
|
||||||
/** | ||||||
* Deploy the Flink application in a VPC. | ||||||
* | ||||||
* @default no VPC | ||||||
*/ | ||||||
readonly vpc?: ec2.IVpc; | ||||||
|
||||||
/** | ||||||
* Choose which VPC subnets to use. | ||||||
* | ||||||
* @default SubnetType.PRIVATE_WITH_EGRESS subnets | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
*/ | ||||||
readonly vpcSubnets?: ec2.SubnetSelection; | ||||||
|
||||||
/** | ||||||
* Security groups to use with a provided VPC. | ||||||
* | ||||||
* @default a new security group is created for this application. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
*/ | ||||||
readonly securityGroups?: ec2.ISecurityGroup[]; | ||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -851,15 +906,24 @@ class Import extends ApplicationBase { | |||||
public readonly applicationName: string; | ||||||
public readonly applicationArn: string; | ||||||
|
||||||
constructor(scope: Construct, id: string, attrs: { applicationArn: string, applicationName: string }) { | ||||||
constructor(scope: Construct, id: string, attrs: { applicationArn: string, securityGroups?: ec2.ISecurityGroup[] }) { | ||||||
Comment on lines
-854
to
+909
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why has I'd rather avoid removing this arg entirely, as doing so is a breaking change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahh I see, yep I'd missed that. Thanks |
||||||
super(scope, id); | ||||||
|
||||||
// Imported applications have no associated role or grantPrincipal | ||||||
this.grantPrincipal = new iam.UnknownPrincipal({ resource: this }); | ||||||
this.role = undefined; | ||||||
|
||||||
this.applicationArn = attrs.applicationArn; | ||||||
this.applicationName = attrs.applicationName; | ||||||
const applicationName = core.Stack.of(scope).splitArn(attrs.applicationArn, core.ArnFormat.SLASH_RESOURCE_NAME).resourceName; | ||||||
if (!applicationName) { | ||||||
throw new Error(`applicationArn for fromApplicationArn (${attrs.applicationArn}) must include resource name`); | ||||||
} | ||||||
this.applicationName = applicationName; | ||||||
Comment on lines
-862
to
+921
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we getting this from the arn now? This worked when it was in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before this code was in the Also I have tests for |
||||||
|
||||||
const securityGroups = attrs.securityGroups ?? []; | ||||||
if (securityGroups.length > 0) { | ||||||
this._connections = new ec2.Connections({ securityGroups }); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -877,20 +941,25 @@ export class Application extends ApplicationBase { | |||||
public static fromApplicationName(scope: Construct, id: string, applicationName: string): IApplication { | ||||||
const applicationArn = core.Stack.of(scope).formatArn(applicationArnComponents(applicationName)); | ||||||
|
||||||
return new Import(scope, id, { applicationArn, applicationName }); | ||||||
return new Import(scope, id, { applicationArn }); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Import an existing application defined outside of CDK code by | ||||||
* applicationArn. | ||||||
*/ | ||||||
public static fromApplicationArn(scope: Construct, id: string, applicationArn: string): IApplication { | ||||||
const applicationName = core.Stack.of(scope).splitArn(applicationArn, core.ArnFormat.SLASH_RESOURCE_NAME).resourceName; | ||||||
if (!applicationName) { | ||||||
throw new Error(`applicationArn for fromApplicationArn (${applicationArn}) must include resource name`); | ||||||
} | ||||||
return new Import(scope, id, { applicationArn }); | ||||||
} | ||||||
|
||||||
return new Import(scope, id, { applicationArn, applicationName }); | ||||||
/** | ||||||
* Import an existing application defined outside of CDK code. | ||||||
*/ | ||||||
public static fromApplicationAttributes(scope: Construct, id: string, attrs: ApplicationAttributes): IApplication { | ||||||
return new Import(scope, id, { | ||||||
applicationArn: attrs.applicationArn, | ||||||
securityGroups: attrs.securityGroups, | ||||||
}); | ||||||
} | ||||||
|
||||||
public readonly applicationArn: string; | ||||||
|
@@ -919,6 +988,23 @@ export class Application extends ApplicationBase { | |||||
const code = props.code.bind(this); | ||||||
code.bucket.grantRead(this); | ||||||
|
||||||
let vpcConfigurations; | ||||||
if (props.vpc) { | ||||||
const securityGroups = props.securityGroups ?? [ | ||||||
new ec2.SecurityGroup(this, 'SecurityGroup', { | ||||||
vpc: props.vpc, | ||||||
}), | ||||||
]; | ||||||
this._connections = new ec2.Connections({ securityGroups }); | ||||||
const subnetSelection = props.vpcSubnets ?? { | ||||||
subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS, | ||||||
}; | ||||||
vpcConfigurations = [{ | ||||||
securityGroupIds: securityGroups.map(sg => sg.securityGroupId), | ||||||
subnetIds: props.vpc.selectSubnets(subnetSelection).subnetIds, | ||||||
}]; | ||||||
} | ||||||
|
||||||
const resource = new CfnApplicationV2(this, 'Resource', { | ||||||
applicationName: props.applicationName, | ||||||
runtimeEnvironment: props.runtime.value, | ||||||
|
@@ -939,6 +1025,7 @@ export class Application extends ApplicationBase { | |||||
applicationSnapshotConfiguration: { | ||||||
snapshotsEnabled: props.snapshotsEnabled ?? true, | ||||||
}, | ||||||
vpcConfigurations, | ||||||
}, | ||||||
}); | ||||||
resource.node.addDependency(this.role); | ||||||
|
@@ -978,6 +1065,24 @@ export class Application extends ApplicationBase { | |||||
}, | ||||||
}); | ||||||
|
||||||
// Permissions required for VPC usage per: | ||||||
// https://docs.aws.amazon.com/kinesisanalytics/latest/java/vpc-permissions.html | ||||||
if (props.vpc) { | ||||||
this.role.addToPrincipalPolicy(new iam.PolicyStatement({ | ||||||
actions: [ | ||||||
'ec2:DescribeVpcs', | ||||||
'ec2:DescribeSubnets', | ||||||
'ec2:DescribeSecurityGroups', | ||||||
'ec2:DescribeDhcpOptions', | ||||||
'ec2:CreateNetworkInterface', | ||||||
'ec2:CreateNetworkInterfacePermission', | ||||||
'ec2:DescribeNetworkInterfaces', | ||||||
'ec2:DeleteNetworkInterface', | ||||||
], | ||||||
resources: ['*'], | ||||||
})); | ||||||
} | ||||||
|
||||||
this.applicationName = this.getResourceNameAttribute(resource.ref); | ||||||
this.applicationArn = this.getResourceArnAttribute( | ||||||
core.Stack.of(this).formatArn(applicationArnComponents(resource.ref)), | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just pushed a change which adds hyphens for these kinds of @default phrases to the entire file.