Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Add param to endpoint #1065

Merged
merged 13 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions doc/api.apib
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ Resources related to the Workflows registered in Styx.
+ version: `v3` (enum[string]) - API version
+ Members
+ `v3`
+ full: `true` (optional, boolean) - Return the state of the workflow

#### Without `full`
+ Response 200 (application/json)

[{
Expand All @@ -72,15 +74,7 @@ Resources related to the Workflows registered in Styx.
}
}]


## Workflows [/{version}/workflows/full]

+ Parameters
+ version: `v3` (enum[string]) - API version
+ Members
+ `v3`

### Get Workflows with State [GET]
#### With full=true

+ Response 200 (application/json)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static com.spotify.styx.api.Api.Version.V3;
import static com.spotify.styx.api.Middlewares.json;
import static com.spotify.styx.api.util.QueryParams.INCLUDE_STATES;
import static com.spotify.styx.api.util.WorkflowFiltering.filterWorkflows;
import static com.spotify.styx.serialization.Json.OBJECT_MAPPER;

Expand Down Expand Up @@ -107,10 +108,7 @@ public Stream<Route<AsyncHandler<Response<ByteString>>>> routes(
json(), "GET", BASE + "/<cid>/<wfid>/full",
rc -> workflowWithState(arg("cid", rc), arg("wfid", rc))),
Route.with(
json(), "GET", BASE + "/full",
rc -> workflowsWithState()),
Route.with(
json(), "GET", BASE,
json(), "GET", BASE + "",
rc -> workflows(rc.request())),
Route.with(
json(), "GET", BASE + "/<cid>",
Expand Down Expand Up @@ -219,31 +217,32 @@ private WorkflowConfiguration readFromJsonWithDefaults(ByteString payload)
return WorkflowConfigurationBuilder.from(workflowConfig).deploymentTime(time.get()).build();
}

private Response<Collection<WorkflowWithState>> workflowsWithState() {
try {
Collection<WorkflowWithState> workflows = storage.workflowsWithState().values();
return Response.forPayload(workflows);
} catch (IOException e) {
throw new RuntimeException("Failed to get workflows", e);
}
}

private Response<Collection<Workflow>> workflows(Request request) {
private Response<? extends Collection<?>> workflows(Request request) {
try {
var paramFilters = Stream.of(QueryParams.values())
.map(e -> getFilterParams(request, e).map(param -> Map.entry(e, param)))
.flatMap(Optional::stream)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Collection<Workflow> workflows = storage.workflows().values();
Collection<WorkflowWithState> workflowsWithState = storage.workflowsWithState().values();
Collection<WorkflowWithState> filteredWorkflows = filterWorkflows(workflowsWithState, paramFilters);
Optional<String> includeStates = getFilterParams(request, INCLUDE_STATES);

return Response.forPayload(filterWorkflows(workflows, paramFilters));
if (shouldIncludeState(includeStates)) {
return Response.forPayload(filteredWorkflows);
}

List<Workflow> workflows = filteredWorkflows.stream().map(WorkflowWithState::workflow).collect(Collectors.toList());
return Response.forPayload(workflows);
} catch (IOException e) {
throw new RuntimeException("Failed to get workflows", e);
}
}

private boolean shouldIncludeState(Optional<String> includeStates) {
return includeStates.filter(Boolean::parseBoolean).isPresent();
}

private Optional<String> getFilterParams(Request request, QueryParams filter) {
return request.parameter(filter.getQueryName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.spotify.styx.api.util;

public enum QueryParams {
INCLUDE_STATES("full"),
DEPLOYMENT_TYPE("deployment_type"),
DEPLOYMENT_TIME_BEFORE("deployment_time_before"),
DEPLOYMENT_TIME_AFTER("deployment_time_after");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,25 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

import com.spotify.styx.model.WorkflowWithState;
import joptsimple.internal.Strings;

public final class WorkflowFiltering {

private WorkflowFiltering(){}

public static Collection<Workflow> filterWorkflows(
Collection<Workflow> workflows, Map<QueryParams, String> paramFilters){
public static Collection<WorkflowWithState> filterWorkflows(
Collection<WorkflowWithState> workflowWithStates, Map<QueryParams, String> paramFilters){
if (paramFilters.isEmpty()) {
// Nothing to filter on
return workflows;
return workflowWithStates;
}
List<Predicate> workflowFilters = createWorkflowFilters(paramFilters);

return workflows.stream()
return workflowWithStates.stream()
.filter(w -> workflowFilters.stream()
.allMatch(pre-> pre.test(w)))
.allMatch(pre-> pre.test(w.workflow())))
.collect(toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ public void shouldReturnWorkflowsWithState() throws Exception {
sinceVersion(Api.Version.V3);

Response<ByteString> response = awaitResponse(
serviceHelper.request("GET", path("/full")));
serviceHelper.request("GET", path("?full=true")));

var parsedResponse = Arrays.asList(deserialize(response.payload().orElseThrow(), WorkflowWithState[].class));
var expectedWF1 = WorkflowWithState.create(FLYTE_EXEC_WORKFLOW, WorkflowState.builder().enabled(false).build());
Expand All @@ -850,7 +850,7 @@ public void shouldReturnWorkflowsWithState() throws Exception {
public void shouldReturnFilteredDeploymentTypeWorkflow() throws Exception {
sinceVersion(Api.Version.V3);

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithType("id2", "not-remote-foo")
Expand All @@ -870,7 +870,7 @@ public void shouldReturnFilteredDeploymentTypeWorkflow() throws Exception {
public void shouldReturnFilteredDeploymentTimeBeforeWorkflow() throws Exception {
sinceVersion(Api.Version.V3);

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE),
Expand All @@ -890,7 +890,7 @@ public void shouldReturnFilteredDeploymentTimeBeforeWorkflow() throws Exception
public void shouldReturnFilteredDeploymentTimeAfterWorkflow() throws Exception {
sinceVersion(Api.Version.V3);

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE),
Expand All @@ -915,7 +915,7 @@ public void shouldReturnEmptyDeploymentTimeBeforeAndAfterWorkflow() throws Excep
var queryThresholdBefore = "2022-01-01T10:15:27.00Z";
var queryThresholdAfter = "2022-01-01T10:15:33.00Z";

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTime("id2", Instant.parse(queryThresholdBefore)),
Expand All @@ -938,7 +938,7 @@ public void shouldReturnFilteredDeploymentTimeBeforeAndAfterWorkflow() throws Ex
var deploymentTimeAfter = "2022-01-01T10:15:28.00Z";
var deploymentTimeBefore = "2022-01-01T10:15:32.00Z";

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE),
Expand All @@ -958,7 +958,7 @@ public void shouldReturnFilteredDeploymentTimeBeforeAndAfterWorkflow() throws Ex
public void shouldReturnFilteredDeploymentTypeTimeBeforeWorkflow() throws Exception {
sinceVersion(Api.Version.V3);

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTypeAndTime("id2", "remote-foo", QUERY_THRESHOLD_BEFORE),
Expand All @@ -978,7 +978,7 @@ public void shouldReturnFilteredDeploymentTypeTimeBeforeWorkflow() throws Except
public void shouldReturnFilteredDeploymentTypeTimeAfterWorkflow() throws Exception {
sinceVersion(Api.Version.V3);

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE),
Expand All @@ -1000,7 +1000,7 @@ public void shouldReturnFilteredDeploymentTypeTimeBeforeAndAfterWorkflow() throw
var deploymentTimeAfter = "2022-01-01T10:15:28.00Z";
var deploymentTimeBefore = "2022-01-01T10:15:32.00Z";

when(storage.workflows()).thenReturn(
when(storage.workflowsWithState()).thenReturn(
buildWorkflowMap(
createWorkflowWithType("id1", "remote-foo"),
createWorkflowWithTypeAndTime("id2", "remote-foo", QUERY_THRESHOLD_BEFORE),
Expand All @@ -1023,7 +1023,7 @@ public void shouldReturnFilteredDeploymentTypeTimeBeforeAndAfterWorkflow() throw
public void shouldFailedToReturnWorkflows() throws Exception {
sinceVersion(Api.Version.V3);

when(storage.workflows()).thenThrow(new IOException());
when(storage.workflowsWithState()).thenThrow(new IOException());

Response<ByteString> response = awaitResponse(
serviceHelper.request("GET", path("")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,44 @@
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowConfiguration;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowState;
import com.spotify.styx.model.WorkflowWithState;

import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class CreateWorkflowUtil {

public static WorkflowState STATE = WorkflowState.builder().enabled(true).build();
private CreateWorkflowUtil() {
// no instantiation
}

public static Workflow createWorkflowWithType(String id, String type) {
return Workflow.create(id, WorkflowConfiguration.builder().id(id).schedule(Schedule.DAYS)
.deploymentSource(DeploymentSource.builder().source(type).build()).build());

public static WorkflowWithState workflowWithState(Workflow workflow) {
return WorkflowWithState.create(workflow, STATE);
}

public static WorkflowWithState createWorkflowWithType(String id, String type) {
return workflowWithState(Workflow.create(id, WorkflowConfiguration.builder().id(id).schedule(Schedule.DAYS)
.deploymentSource(DeploymentSource.builder().source(type).build()).build()));
}

public static Workflow createWorkflowWithTime(String id, Instant instant) {
return Workflow.create(id,
WorkflowConfiguration.builder().id(id).schedule(Schedule.DAYS).deploymentTime(instant).build());
public static WorkflowWithState createWorkflowWithTime(String id, Instant instant) {
return workflowWithState(Workflow.create(id,
WorkflowConfiguration.builder().id(id).schedule(Schedule.DAYS).deploymentTime(instant).build()));
}

public static Workflow createWorkflowWithTypeAndTime(String id, String type, Instant instant) {
return Workflow.create(id,
public static WorkflowWithState createWorkflowWithTypeAndTime(String id, String type, Instant instant) {
return workflowWithState(Workflow.create(id,
WorkflowConfiguration.builder().id(id).schedule(Schedule.DAYS).deploymentTime(instant).deploymentSource(
DeploymentSource.builder().source(type).build()).build());
DeploymentSource.builder().source(type).build()).build()));
}

public static Map<WorkflowId, Workflow> buildWorkflowMap(Workflow... workflows){
return Arrays.stream(workflows).collect(Collectors.toMap(Workflow::id, w->w));
public static Map<WorkflowId, WorkflowWithState> buildWorkflowMap(WorkflowWithState... workflows){
return Arrays.stream(workflows).collect(Collectors.toMap(w-> w.workflow().id(), w->w));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;

import com.spotify.styx.model.Workflow;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import com.spotify.styx.model.WorkflowWithState;
import org.junit.Test;

public class WorkflowFilteringTest {
Expand Down Expand Up @@ -74,8 +75,8 @@ public void shouldNotReturnWorkflowsWithDeploymentType() {
@Test
public void shouldReturnWorkflowsWithDeploymentType() {

Workflow validWorkflow = createWorkflowWithType("id1", "remote-foo");
Workflow invalidWorkflow = createWorkflowWithType("id2", ""); // Empty type
WorkflowWithState validWorkflow = createWorkflowWithType("id1", "remote-foo");
WorkflowWithState invalidWorkflow = createWorkflowWithType("id2", ""); // Empty type

var workflowCollection = List.of(validWorkflow, invalidWorkflow);

Expand All @@ -91,8 +92,8 @@ public void shouldReturnWorkflowsWithDeploymentType() {
@Test
public void shouldReturnWorkflowsWithDeploymentTimeBefore() {

Workflow invalidWorkflow = createWorkflowWithType("id1", "remote-foo");
Workflow validWorkflow = createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE);
WorkflowWithState invalidWorkflow = createWorkflowWithType("id1", "remote-foo");
WorkflowWithState validWorkflow = createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE);

var workflowCollection = List.of(validWorkflow, invalidWorkflow);

Expand All @@ -107,8 +108,8 @@ public void shouldReturnWorkflowsWithDeploymentTimeBefore() {

@Test
public void shouldReturnWorkflowsWithDeploymentTimeAfter() {
Workflow validWorkflow = createWorkflowWithTime("id2", QUERY_THRESHOLD_AFTER);
Workflow invalidWorkflow = createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE);
WorkflowWithState validWorkflow = createWorkflowWithTime("id2", QUERY_THRESHOLD_AFTER);
WorkflowWithState invalidWorkflow = createWorkflowWithTime("id2", QUERY_THRESHOLD_BEFORE);

var workflowCollection = List.of(validWorkflow, invalidWorkflow);

Expand All @@ -127,8 +128,8 @@ public void shouldReturnWorkflowsWithDeploymentTypeDeploymentTimeBeforeAndAfter(
var deploymentTimeBefore = "2022-01-01T10:15:32.00Z";
var queryThresholdOutsideWindow = "2022-01-01T10:15:33.00Z";

Workflow validWorkflow = createWorkflowWithTypeAndTime("id1", "remote-foo", QUERY_THRESHOLD);
Workflow invalidWorkflow = createWorkflowWithTypeAndTime("id2", "remote-foo",
WorkflowWithState validWorkflow = createWorkflowWithTypeAndTime("id1", "remote-foo", QUERY_THRESHOLD);
WorkflowWithState invalidWorkflow = createWorkflowWithTypeAndTime("id2", "remote-foo",
Instant.parse(queryThresholdOutsideWindow));

var workflowCollection = List.of(validWorkflow, invalidWorkflow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.HashMap;

import org.apache.hadoop.hbase.client.Connection;

/**
Expand Down Expand Up @@ -198,7 +198,7 @@ public Map<WorkflowId, Workflow> workflows() throws IOException {
}

@Override
public HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
public Map<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
return datastoreStorage.workflowsWithState();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ Optional<WorkflowWithState> workflowWithState(WorkflowId workflowId) throws IOEx
return Optional.of(WorkflowWithState.create(workflow, workflowState));
}

HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
Map<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
HashMap<WorkflowId, WorkflowWithState> workflows = new HashMap<>();
var query = Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW).build();
datastore.query(query, entity -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.HashMap;

/**
* The interface to the persistence layer.
Expand Down Expand Up @@ -135,7 +134,7 @@ public interface Storage extends Closeable {
* Get all {@link WorkflowWithState}s.
* @return
*/
HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException;
Map<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException;

/** Get all {@link Workflow}s by doing strongly consistent batch fetch.
*
Expand Down