Skip to content

Commit

Permalink
Merge pull request #374 from chamil321/cc
Browse files Browse the repository at this point in the history
Fix built messge datasource streaming exception
  • Loading branch information
chamil321 authored May 10, 2021
2 parents eb954d1 + 51b8bf3 commit 8d60679
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 1 deletion.
Binary file not shown.
124 changes: 124 additions & 0 deletions http-ballerina-tests/tests/http_streaming_file_test.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/mime;
import ballerina/test;
import ballerina/file;

http:Client streamTestClient = check new ("http://localhost:" + streamTest1.toString());
http:Client streamBackendClient = check new ("http://localhost:" + streamTest2.toString());

service /'stream on new http:Listener(streamTest1) {
resource function get fileupload(http:Caller caller) {
http:Request request = new;
request.setFileAsPayload("tests/datafiles/BallerinaLang.pdf", contentType = mime:APPLICATION_PDF);
var clientResponse = streamBackendClient->post("/streamBack/receiver", request);

http:Response res = new;
if (clientResponse is http:Response) {
res = clientResponse;
} else {
log:printError("Error occurred while sending data to the client ", 'error = clientResponse);
setError(res, clientResponse);
}
var result = caller->respond(res);
if (result is error) {
log:printError("Error while while sending response to the caller", 'error = result);
}
}

resource function get cacheFileupload(http:Caller caller) {
http:Response res = new;
res.setFileAsPayload("tests/datafiles/BallerinaLang.pdf", contentType = mime:APPLICATION_PDF);
var result = caller->respond(res);
if (result is error) {
log:printError("Error while while sending response to the caller", 'error = result);
}
}
}

service /streamBack on new http:Listener(streamTest2) {
resource function post receiver(http:Caller caller, http:Request request) {
http:Response res = new;
stream<byte[], io:Error?>|error streamer = request.getByteStream();

if (streamer is stream<byte[], io:Error?>) {
io:Error? result = io:fileWriteBlocksFromStream("tests/tempfiles/ReceivedFile.pdf", streamer);

if (result is error) {
log:printError("error occurred while writing ", 'error = result);
setError(res, result);
} else {
res.setPayload("File Received!");
error? removeResults = file:remove("tests/tempfiles", file:RECURSIVE); // Removes file.
}
var cr = streamer.close();
if (cr is error) {
log:printError("Error occurred while closing the stream: ", 'error = cr);
}
} else {
setError(res, streamer);
}
var result = caller->respond(res);
if (result is error) {
log:printError("Error occurred while sending response", 'error = result);
}
}
}

function setError(http:Response res, error err) {
res.statusCode = 500;
res.setPayload(<@untainted>err.message());
}

@test:Config {}
function testStreamingLargeFile() {
var response = streamTestClient->get("/stream/fileupload");
if (response is http:Response) {
test:assertEquals(response.statusCode, 200, msg = "Found unexpected output");
assertHeaderValue(checkpanic response.getHeader(CONTENT_TYPE), TEXT_PLAIN);
assertTextPayload(response.getTextPayload(), "File Received!");
} else {
test:assertFail(msg = "Found unexpected output type: " + response.message());
}
}

@test:Config {}
function testConsumedStream() returns error? {
string msg = "Error occurred while retrieving the byte stream from the response";
string cMsg = "Byte stream is not available but payload can be obtain either as xml, json, string or byte[] type";
var response = streamTestClient->get("/stream/cacheFileupload");
if (response is http:Response) {
byte[] bytes = check response.getBinaryPayload();
stream<byte[], io:Error?>|error streamer = response.getByteStream();
if (streamer is stream<byte[], io:Error?>) {
test:assertFail(msg = "Found unexpected output type");
} else {
test:assertEquals(streamer.message(), msg, msg = "Found unexpected output");
error? cause = streamer.cause();
if (cause is error) {
test:assertEquals(cause.message(), cMsg, msg = "Found unexpected output");
} else {
test:assertFail(msg = "Found unexpected output type");
}
}
} else {
test:assertFail(msg = "Found unexpected output type: " + response.message());
}
}
2 changes: 2 additions & 0 deletions http-ballerina-tests/tests/test_service_ports.bal
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const int entityTest = 9097;
const int mimeTest = 9096;
const int proxyTest1 = 9019;
const int proxyTest2 = 9020;
const int streamTest1 = 9021;
const int streamTest2 = 9022;

// Integration test ports
// HTTP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Objects;

import static org.ballerinalang.mime.util.EntityBodyHandler.constructBlobDataSource;
import static org.ballerinalang.mime.util.EntityBodyHandler.constructJsonDataSource;
import static org.ballerinalang.mime.util.EntityBodyHandler.constructStringDataSource;
import static org.ballerinalang.mime.util.EntityBodyHandler.constructXmlDataSource;
import static org.ballerinalang.mime.util.EntityBodyHandler.isStreamingRequired;
import static org.ballerinalang.mime.util.MimeConstants.ENTITY_BYTE_CHANNEL;
import static org.ballerinalang.mime.util.MimeConstants.MESSAGE_DATA_SOURCE;
import static org.ballerinalang.mime.util.MimeConstants.NO_CONTENT_ERROR;
import static org.ballerinalang.mime.util.MimeConstants.PARSER_ERROR;
import static org.ballerinalang.mime.util.MimeConstants.TRANSPORT_MESSAGE;
Expand Down Expand Up @@ -146,8 +148,12 @@ public static Object getByteChannel(BObject entityObj) {
}

public static void populateInputStream(BObject entityObj) {
Object dataSource = entityObj.getNativeData(MESSAGE_DATA_SOURCE);
if (Objects.nonNull(dataSource)) {
return;
}
HttpCarbonMessage httpCarbonMessage = (HttpCarbonMessage) entityObj.getNativeData(TRANSPORT_MESSAGE);
if (httpCarbonMessage != null) {
if (Objects.nonNull(httpCarbonMessage)) {
HttpMessageDataStreamer httpMessageDataStreamer = new HttpMessageDataStreamer(httpCarbonMessage);

long contentLength = HttpUtil.extractContentLength(httpCarbonMessage);
Expand Down

0 comments on commit 8d60679

Please sign in to comment.