Skip to content

Commit

Permalink
Merge pull request #76 from lasanthaS/master
Browse files Browse the repository at this point in the history
Fix build failure
  • Loading branch information
AnuGayan authored Jun 15, 2021
2 parents 3568bed + b687bc5 commit 1e07c59
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
if (operationList.contains(CDCSourceConstants.INSERT)) {
transportProperties.add(CDCSourceConstants.INSERT);
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA).get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA)
.get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.get(CDCSourceConstants.EVENT_TIMESTAMP));
detailsMap.put(CDCSourceConstants.TRANSPORT_PROPERTIES, transportProperties);
rawDetails = (Struct) record.get(CDCSourceConstants.AFTER);
Expand All @@ -84,7 +85,8 @@ Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
if (operationList.contains(CDCSourceConstants.UPDATE)) {
transportProperties.add(CDCSourceConstants.UPDATE);
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA).get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA)
.get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.get(CDCSourceConstants.EVENT_TIMESTAMP));
detailsMap.put(CDCSourceConstants.TRANSPORT_PROPERTIES, transportProperties);
rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
Expand All @@ -107,7 +109,8 @@ Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
if (operationList.contains(CDCSourceConstants.DELETE)) {
transportProperties.add(CDCSourceConstants.DELETE);
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA).get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA)
.get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.get(CDCSourceConstants.EVENT_TIMESTAMP));
detailsMap.put(CDCSourceConstants.TRANSPORT_PROPERTIES, transportProperties);
rawDetails = (Struct) record.get(CDCSourceConstants.BEFORE);
Expand Down Expand Up @@ -135,7 +138,8 @@ Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
switch (op) {
case CDCSourceConstants.CONNECT_RECORD_INSERT_OPERATION:
transportProperties.add(CDCSourceConstants.INSERT);
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA).get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA)
.get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.get(CDCSourceConstants.EVENT_TIMESTAMP));
detailsMap.put(CDCSourceConstants.TRANSPORT_PROPERTIES, transportProperties);
//append row details after insert.
Expand All @@ -148,7 +152,8 @@ Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
break;
case CDCSourceConstants.CONNECT_RECORD_DELETE_OPERATION:
transportProperties.add(CDCSourceConstants.DELETE);
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA).get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA)
.get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.get(CDCSourceConstants.EVENT_TIMESTAMP));
detailsMap.put(CDCSourceConstants.TRANSPORT_PROPERTIES, transportProperties);
//append row details before delete.
Expand All @@ -162,7 +167,8 @@ Map<String, Object> createMap(ConnectRecord connectRecord, String operation) {
break;
case CDCSourceConstants.CONNECT_RECORD_UPDATE_OPERATION:
transportProperties.add(CDCSourceConstants.UPDATE);
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA).get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.getStruct(CDCSourceConstants.SOURCE_SCHEMA)
.get(CDCSourceConstants.EVENT_TIMESTAMP));
transportProperties.add(record.get(CDCSourceConstants.EVENT_TIMESTAMP));
detailsMap.put(CDCSourceConstants.TRANSPORT_PROPERTIES, transportProperties);
//append row details before update.
Expand Down

0 comments on commit 1e07c59

Please sign in to comment.