Skip to content

Commit

Permalink
Update FlightSqlProducer to conform to new design.
Browse files Browse the repository at this point in the history
Rename files for SQL -> Sql.
Correct compilation errors in client code, but design needs to be updated.

Tests do not yet compile.
  • Loading branch information
kylep-dremio authored and rafael-telles committed Jan 27, 2022
1 parent 427f192 commit 4c02d2b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

Expand All @@ -29,11 +28,9 @@
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.sql.impl.FlightSQL;
import org.apache.arrow.flight.sql.impl.FlightSQL.ActionGetPreparedStatementResult;
import org.apache.arrow.flight.sql.impl.FlightSQL.ActionGetTablesRequest;
import org.apache.arrow.flight.sql.impl.FlightSQL.ActionGetTablesResult;
import org.apache.arrow.flight.sql.impl.FlightSQL.CommandPreparedStatementQuery;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.protobuf.Any;
Expand All @@ -44,7 +41,7 @@
/**
* Client side utilities to work with Flight SQL semantics.
*/
public final class FlightSQLClientUtils {
public final class FlightSqlClientUtils {

/**
* Helper method to request a list of tables from a Flight SQL enabled endpoint.
Expand All @@ -55,42 +52,32 @@ public final class FlightSQLClientUtils {
* @param tableFilterPattern The table filter pattern.
* @param tableTypes The table types to include.
* @param includeSchema True to include the schema upon return, false to not include the schema.
* @return A list of tables matching the criteria.
* @return a FlightInfo object representing the stream(s) to fetch.
*/
public static List<ActionGetTablesResult> getTables(FlightClient client, String catalog, String schemaFilterPattern,
public static FlightInfo getTables(FlightClient client, String catalog, String schemaFilterPattern,
String tableFilterPattern, List<String> tableTypes, boolean includeSchema) {

final ActionGetTablesRequest.Builder requestBuilder = ActionGetTablesRequest
.newBuilder()
.setIncludeSchema(includeSchema);
final FlightSql.CommandGetTables.Builder builder = FlightSql.CommandGetTables.newBuilder();

if (catalog != null) {
requestBuilder.setCatalog(catalog);
builder.setCatalog(catalog);
}

if (schemaFilterPattern != null) {
requestBuilder.setSchemaFilterPattern(schemaFilterPattern);
builder.setSchemaFilterPattern(schemaFilterPattern);
}

if (tableFilterPattern != null) {
requestBuilder.setTableNameFilterPattern(tableFilterPattern);
builder.setTableNameFilterPattern(tableFilterPattern);
}

if (tableTypes != null) {
requestBuilder.addAllTableTypes(tableTypes);
builder.addAllTableTypes(tableTypes);
}
builder.setIncludeSchema(includeSchema);

final Iterator<Result> results = client.doAction(new Action(
"GetTables", Any.pack(requestBuilder.build()).toByteArray()));

final List<ActionGetTablesResult> getTablesResults = new ArrayList<>();
results.forEachRemaining(result -> {
ActionGetTablesResult actual = FlightSQLUtils.unpackAndParseOrThrow(result.getBody(),
ActionGetTablesResult.class);
getTablesResults.add(actual);
});

return getTablesResults;
final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray());
return client.getInfo(descriptor);
}

/**
Expand All @@ -100,16 +87,16 @@ public static List<ActionGetTablesResult> getTables(FlightClient client, String
* @param query The query to prepare.
* @return Metadata and handles to the prepared statement which exists on the server.
*/
public static FlightSQLPreparedStatement getPreparedStatement(FlightClient client, String query) {
return new FlightSQLPreparedStatement(client, query);
public static FlightSqlPreparedStatement getPreparedStatement(FlightClient client, String query) {
return new FlightSqlPreparedStatement(client, query);
}

/**
* Helper class to encapsulate Flight SQL prepared statement logic.
*/
public static class FlightSQLPreparedStatement implements Closeable {
public static class FlightSqlPreparedStatement implements Closeable {
private final FlightClient client;
private final ActionGetPreparedStatementResult preparedStatementResult;
private final ActionCreatePreparedStatementResult preparedStatementResult;
private long invocationCount;
private boolean isClosed;
private Schema resultSetSchema = null;
Expand All @@ -118,22 +105,22 @@ public static class FlightSQLPreparedStatement implements Closeable {
/**
* Constructor.
*
* @param client The client. FlightSQLPreparedStatement does not maintain this resource.
* @param client The client. FlightSqlPreparedStatement does not maintain this resource.
* @param sql The query.
*/
public FlightSQLPreparedStatement(FlightClient client, String sql) {
public FlightSqlPreparedStatement(FlightClient client, String sql) {
this.client = client;

final Iterator<Result> preparedStatementResults = client.doAction(new Action("GetPreparedStatement",
Any.pack(FlightSQL.ActionGetPreparedStatementRequest
Any.pack(FlightSql.ActionCreatePreparedStatementRequest
.newBuilder()
.setQuery(sql)
.build())
.toByteArray()));

preparedStatementResult = FlightSQLUtils.unpackAndParseOrThrow(
preparedStatementResult = FlightSqlUtils.unpackAndParseOrThrow(
preparedStatementResults.next().getBody(),
ActionGetPreparedStatementResult.class);
ActionCreatePreparedStatementResult.class);

invocationCount = 0;
isClosed = false;
Expand Down Expand Up @@ -198,7 +185,7 @@ public long executeUpdate() {
public void close() {
isClosed = true;
final Iterator<Result> closePreparedStatementResults = client.doAction(new Action("ClosePreparedStatement",
Any.pack(FlightSQL.ActionClosePreparedStatementRequest
Any.pack(FlightSql.ActionClosePreparedStatementRequest
.newBuilder()
.setPreparedStatementHandleBytes(preparedStatementResult.getPreparedStatementHandle())
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.arrow.flight.sql;

import static org.apache.arrow.flight.sql.FlightSQLUtils.getArrowTypeFromJDBCType;
import static org.apache.arrow.flight.sql.FlightSqlUtils.getArrowTypeFromJDBCType;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -259,7 +259,7 @@ private Schema buildSchema(String catalog, String schema, String table) throws S

final int precision = columns.getInt("DECIMAL_DIGITS");
final int scale = columns.getInt("COLUMN_SIZE");
final ArrowType arrowType = FlightSQLUtils.getArrowTypeFromJDBCType(jdbcDataType, precision, scale);
final ArrowType arrowType = FlightSqlUtils.getArrowTypeFromJDBCType(jdbcDataType, precision, scale);

final FieldType fieldType = new FieldType(arrowIsNullable, arrowType, /*dictionary=*/null);
fields.add(new Field(columnName, fieldType, null));
Expand Down

0 comments on commit 4c02d2b

Please sign in to comment.