Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the Supervisor endpoint to not restart the Supervisor if the spec was unmodified #17707

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
57 changes: 57 additions & 0 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2353,6 +2353,63 @@ Content-Length: 1359
</TabItem>
</Tabs>

#### Sample request with skipRestartIfUnmodified
The following example sets the skipRestartIfUnmodified flag to true. With this flag set to true, the Supervisor will only restart if there has been a modification to the SupervisorSpec.
```shell
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor?skipRestartIfUnmodified=true" \
--header 'Content-Type: application/json' \
--data '{
"type": "kafka",
"spec": {
"ioConfig": {
"type": "kafka",
"consumerProperties": {
"bootstrap.servers": "localhost:9094"
},
"topic": "social_media",
"inputFormat": {
"type": "json"
},
"useEarliestOffset": true
},
"tuningConfig": {
"type": "kafka"
},
"dataSchema": {
"dataSource": "social_media",
"timestampSpec": {
"column": "__time",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"username",
"post_title",
{
"type": "long",
"name": "views"
},
{
"type": "long",
"name": "upvotes"
},
{
"type": "long",
"name": "comments"
},
"edited"
]
},
"granularitySpec": {
"queryGranularity": "none",
"rollup": false,
"segmentGranularity": "hour"
}
}
}
}'
```

#### Sample response

<details>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

package org.apache.druid.indexing.overlord.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
Expand All @@ -42,6 +45,7 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -62,10 +66,12 @@ public class SupervisorManager
private final Object lock = new Object();

private volatile boolean started = false;
private final ObjectMapper jsonMapper;

@Inject
public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager)
public SupervisorManager(@Json ObjectMapper jsonMapper, MetadataSupervisorManager metadataSupervisorManager)
{
this.jsonMapper = jsonMapper;
this.metadataSupervisorManager = metadataSupervisorManager;
}

Expand Down Expand Up @@ -166,6 +172,36 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
}
}

/**
* Checks whether the submitted SupervisorSpec differs from the current spec in SupervisorManager's supervisor list.
* This is used in SupervisorResource specPost to determine whether the Supervisor needs to be restarted
* @param spec The spec submitted
* @return boolean - false if the spec is unchanged, else true
*/
public boolean shouldUpdateSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(spec, "spec");
Preconditions.checkNotNull(spec.getId(), "spec.getId()");
Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
try {
byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec);
Pair<Supervisor, SupervisorSpec> currentSupervisor = supervisors.get(spec.getId());
if (currentSupervisor != null &&
Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))
) {
return false;
}
}
catch (JsonProcessingException ex) {
log.warn("Failed to write spec as bytes for spec_id[%s]", spec.getId());
}
}
return true;
}

public boolean stopAndRemoveSupervisor(String id)
{
Preconditions.checkState(started, "SupervisorManager not started");
Expand Down Expand Up @@ -363,8 +399,12 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
return true;
}
catch (Exception e) {
log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].",
upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId);
log.error(e,
"Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].",
upgradedPendingSegment.getUpgradedFromSegmentId(),
upgradedPendingSegment.getId().getVersion(),
supervisorId
);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ public SupervisorResource(
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req)
public Response specPost(
final SupervisorSpec spec,
@Context final HttpServletRequest req,
@QueryParam("skipRestartIfUnmodified") boolean skipRestartIfUnmodified
)
{
return asLeaderWithSupervisorManager(
manager -> {
Expand Down Expand Up @@ -151,6 +155,9 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe
if (!authResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(authResult.getErrorMessage());
}
if (skipRestartIfUnmodified && !manager.shouldUpdateSupervisor(spec)) {
return Response.ok(ImmutableMap.of("id", spec.getId())).build();
}

manager.createOrUpdateAndStartSupervisor(spec);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.overlord.supervisor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -33,6 +34,7 @@
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataSupervisorManager;
Expand Down Expand Up @@ -60,6 +62,7 @@
@RunWith(EasyMockRunner.class)
public class SupervisorManagerTest extends EasyMockSupport
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Mock
private MetadataSupervisorManager metadataSupervisorManager;

Expand All @@ -80,7 +83,7 @@ public class SupervisorManagerTest extends EasyMockSupport
@Before
public void setUp()
{
manager = new SupervisorManager(metadataSupervisorManager);
manager = new SupervisorManager(MAPPER, metadataSupervisorManager);
}

@Test
Expand Down Expand Up @@ -175,6 +178,21 @@ public void testCreateOrUpdateAndStartSupervisorNullSpecId()
verifyAll();
}

@Test
public void testShouldUpdateSupervisor()
{
SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1);
SupervisorSpec spec2 = new TestSupervisorSpec("id2", supervisor2);
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
"id1", spec
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
replayAll();
manager.start();
Assert.assertFalse(manager.shouldUpdateSupervisor(spec));
Assert.assertTrue(manager.shouldUpdateSupervisor(spec2));
}
@Test
public void testStopAndRemoveSupervisorNotStarted()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public List<String> getDataSources()

replayAll();

Response response = supervisorResource.specPost(spec, request);
Response response = supervisorResource.specPost(spec, request, false);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Expand All @@ -171,12 +171,61 @@ public List<String> getDataSources()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();

response = supervisorResource.specPost(spec, request);
response = supervisorResource.specPost(spec, request, false);
verifyAll();

Assert.assertEquals(503, response.getStatus());
}

@Test
public void testSpecPostskipRestartIfUnmodifiedTrue()
{
SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null)
{

@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}
};

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.shouldUpdateSupervisor(spec)).andReturn(false);

setupMockRequest();

EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
replayAll();

Response response = supervisorResource.specPost(spec, request, true);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());

resetAll();

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.shouldUpdateSupervisor(spec)).andReturn(true);
EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(spec)).andReturn(true);

setupMockRequest();
setupMockRequestForAudit();

EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
auditManager.doAudit(EasyMock.anyObject());
EasyMock.expectLastCall().once();

replayAll();

response = supervisorResource.specPost(spec, request, true);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
}

@Test
public void testSpecPostWithInputSourceSecurityEnabledAuthorized()
{
Expand All @@ -201,7 +250,7 @@ public List<String> getDataSources()
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
replayAll();

Response response = supervisorResource.specPost(spec, request);
Response response = supervisorResource.specPost(spec, request, false);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Expand All @@ -211,7 +260,7 @@ public List<String> getDataSources()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();

response = supervisorResource.specPost(spec, request);
response = supervisorResource.specPost(spec, request, false);
verifyAll();

Assert.assertEquals(503, response.getStatus());
Expand Down Expand Up @@ -241,7 +290,7 @@ public List<String> getDataSources()
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
replayAll();

Assert.assertThrows(ForbiddenException.class, () -> supervisorResource.specPost(spec, request));
Assert.assertThrows(ForbiddenException.class, () -> supervisorResource.specPost(spec, request, false));
verifyAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b
taskStorage,
metadataStorageCoordinator,
emitter,
new SupervisorManager(null)
new SupervisorManager(OBJECT_MAPPER, null)
{
@Override
public boolean checkPointDataSourceMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ public boolean overrideRule(final String dataSource, final List<Rule> newRules,
final String ruleString;
try {
ruleString = jsonMapper.writeValueAsString(newRules);
if (ruleString.equals(jsonMapper.writeValueAsString(rules.get().get(dataSource)))) {
log.info("Retention rules unchanged for datasource[%s] with rules[%s]", dataSource, ruleString);
return true;
}
log.info("Updating datasource[%s] with rules[%s] as per [%s]", dataSource, ruleString, auditInfo);
}
catch (JsonProcessingException e) {
Expand Down
Loading
Loading