diff --git a/http-ballerina-tests/tests/datafiles/BallerinaLang.pdf b/http-ballerina-tests/tests/datafiles/BallerinaLang.pdf new file mode 100755 index 0000000000..f3ff1ec594 Binary files /dev/null and b/http-ballerina-tests/tests/datafiles/BallerinaLang.pdf differ diff --git a/http-ballerina-tests/tests/http_streaming_file_test.bal b/http-ballerina-tests/tests/http_streaming_file_test.bal new file mode 100644 index 0000000000..f24ae5cdbf --- /dev/null +++ b/http-ballerina-tests/tests/http_streaming_file_test.bal @@ -0,0 +1,124 @@ +// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/http; +import ballerina/io; +import ballerina/log; +import ballerina/mime; +import ballerina/test; +import ballerina/file; + +http:Client streamTestClient = check new ("http://localhost:" + streamTest1.toString()); +http:Client streamBackendClient = check new ("http://localhost:" + streamTest2.toString()); + +service /'stream on new http:Listener(streamTest1) { + resource function get fileupload(http:Caller caller) { + http:Request request = new; + request.setFileAsPayload("tests/datafiles/BallerinaLang.pdf", contentType = mime:APPLICATION_PDF); + var clientResponse = streamBackendClient->post("/streamBack/receiver", request); + + http:Response res = new; + if (clientResponse is http:Response) { + res = clientResponse; + } else { + log:printError("Error occurred while sending data to the client ", 'error = clientResponse); + setError(res, clientResponse); + } + var result = caller->respond(res); + if (result is error) { + log:printError("Error while while sending response to the caller", 'error = result); + } + } + + resource function get cacheFileupload(http:Caller caller) { + http:Response res = new; + res.setFileAsPayload("tests/datafiles/BallerinaLang.pdf", contentType = mime:APPLICATION_PDF); + var result = caller->respond(res); + if (result is error) { + log:printError("Error while while sending response to the caller", 'error = result); + } + } +} + +service /streamBack on new http:Listener(streamTest2) { + resource function post receiver(http:Caller caller, http:Request request) { + http:Response res = new; + stream|error streamer = request.getByteStream(); + + if (streamer is stream) { + io:Error? result = io:fileWriteBlocksFromStream("tests/tempfiles/ReceivedFile.pdf", streamer); + + if (result is error) { + log:printError("error occurred while writing ", 'error = result); + setError(res, result); + } else { + res.setPayload("File Received!"); + error? removeResults = file:remove("tests/tempfiles", file:RECURSIVE); // Removes file. + } + var cr = streamer.close(); + if (cr is error) { + log:printError("Error occurred while closing the stream: ", 'error = cr); + } + } else { + setError(res, streamer); + } + var result = caller->respond(res); + if (result is error) { + log:printError("Error occurred while sending response", 'error = result); + } + } +} + +function setError(http:Response res, error err) { + res.statusCode = 500; + res.setPayload(<@untainted>err.message()); +} + +@test:Config {} +function testStreamingLargeFile() { + var response = streamTestClient->get("/stream/fileupload"); + if (response is http:Response) { + test:assertEquals(response.statusCode, 200, msg = "Found unexpected output"); + assertHeaderValue(checkpanic response.getHeader(CONTENT_TYPE), TEXT_PLAIN); + assertTextPayload(response.getTextPayload(), "File Received!"); + } else { + test:assertFail(msg = "Found unexpected output type: " + response.message()); + } +} + +@test:Config {} +function testConsumedStream() returns error? { + string msg = "Error occurred while retrieving the byte stream from the response"; + string cMsg = "Byte stream is not available but payload can be obtain either as xml, json, string or byte[] type"; + var response = streamTestClient->get("/stream/cacheFileupload"); + if (response is http:Response) { + byte[] bytes = check response.getBinaryPayload(); + stream|error streamer = response.getByteStream(); + if (streamer is stream) { + test:assertFail(msg = "Found unexpected output type"); + } else { + test:assertEquals(streamer.message(), msg, msg = "Found unexpected output"); + error? cause = streamer.cause(); + if (cause is error) { + test:assertEquals(cause.message(), cMsg, msg = "Found unexpected output"); + } else { + test:assertFail(msg = "Found unexpected output type"); + } + } + } else { + test:assertFail(msg = "Found unexpected output type: " + response.message()); + } +} diff --git a/http-ballerina-tests/tests/test_service_ports.bal b/http-ballerina-tests/tests/test_service_ports.bal index 61fb67918c..a96dc52f8d 100644 --- a/http-ballerina-tests/tests/test_service_ports.bal +++ b/http-ballerina-tests/tests/test_service_ports.bal @@ -40,6 +40,8 @@ const int entityTest = 9097; const int mimeTest = 9096; const int proxyTest1 = 9019; const int proxyTest2 = 9020; +const int streamTest1 = 9021; +const int streamTest2 = 9022; // Integration test ports // HTTP diff --git a/http-native/src/main/java/org/ballerinalang/net/http/nativeimpl/ExternHttpDataSourceBuilder.java b/http-native/src/main/java/org/ballerinalang/net/http/nativeimpl/ExternHttpDataSourceBuilder.java index b483a22fd0..b0ddcdc64f 100644 --- a/http-native/src/main/java/org/ballerinalang/net/http/nativeimpl/ExternHttpDataSourceBuilder.java +++ b/http-native/src/main/java/org/ballerinalang/net/http/nativeimpl/ExternHttpDataSourceBuilder.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Locale; +import java.util.Objects; import static org.ballerinalang.mime.util.EntityBodyHandler.constructBlobDataSource; import static org.ballerinalang.mime.util.EntityBodyHandler.constructJsonDataSource; @@ -45,6 +46,7 @@ import static org.ballerinalang.mime.util.EntityBodyHandler.constructXmlDataSource; import static org.ballerinalang.mime.util.EntityBodyHandler.isStreamingRequired; import static org.ballerinalang.mime.util.MimeConstants.ENTITY_BYTE_CHANNEL; +import static org.ballerinalang.mime.util.MimeConstants.MESSAGE_DATA_SOURCE; import static org.ballerinalang.mime.util.MimeConstants.NO_CONTENT_ERROR; import static org.ballerinalang.mime.util.MimeConstants.PARSER_ERROR; import static org.ballerinalang.mime.util.MimeConstants.TRANSPORT_MESSAGE; @@ -146,8 +148,12 @@ public static Object getByteChannel(BObject entityObj) { } public static void populateInputStream(BObject entityObj) { + Object dataSource = entityObj.getNativeData(MESSAGE_DATA_SOURCE); + if (Objects.nonNull(dataSource)) { + return; + } HttpCarbonMessage httpCarbonMessage = (HttpCarbonMessage) entityObj.getNativeData(TRANSPORT_MESSAGE); - if (httpCarbonMessage != null) { + if (Objects.nonNull(httpCarbonMessage)) { HttpMessageDataStreamer httpMessageDataStreamer = new HttpMessageDataStreamer(httpCarbonMessage); long contentLength = HttpUtil.extractContentLength(httpCarbonMessage);