Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable healthcheck for sql connection #3521

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void loadNamespaces() {
final Collection<NamespaceStorage> namespaceStorages = getConfig().getStorage().discoverNamespaceStorages();
for (NamespaceStorage namespaceStorage : namespaceStorages) {
loaders.submit(() -> {
registry.createNamespace(namespaceStorage, getMetaStorage());
registry.createNamespace(namespaceStorage, getMetaStorage(), getEnvironment());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.bakdata.conquery.models.query.FilterSearch;
import com.bakdata.conquery.models.worker.Namespace;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.setup.Environment;

/**
* Handler of namespaces in a ConQuery instance.
Expand All @@ -23,7 +24,7 @@
*/
public interface NamespaceHandler<N extends Namespace> {

N createNamespace(NamespaceStorage storage, MetaStorage metaStorage, IndexService indexService);
N createNamespace(NamespaceStorage storage, MetaStorage metaStorage, IndexService indexService, Environment environment);

void removeNamespace(DatasetId id, N namespace);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
import com.bakdata.conquery.models.worker.WorkerHandler;
import io.dropwizard.core.setup.Environment;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
Expand All @@ -23,7 +24,7 @@ public class ClusterNamespaceHandler implements NamespaceHandler<DistributedName
private final InternalObjectMapperCreator mapperCreator;

@Override
public DistributedNamespace createNamespace(NamespaceStorage storage, final MetaStorage metaStorage, IndexService indexService) {
public DistributedNamespace createNamespace(NamespaceStorage storage, final MetaStorage metaStorage, IndexService indexService, Environment environment) {
NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(storage, config, mapperCreator, indexService);
DistributedExecutionManager executionManager = new DistributedExecutionManager(metaStorage, clusterState);
WorkerHandler workerHandler = new WorkerHandler(namespaceData.getCommunicationMapper(), storage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionResult;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import io.dropwizard.core.setup.Environment;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
Expand All @@ -37,15 +38,15 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {
private final SqlDialectFactory dialectFactory;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService) {
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService, Environment environment) {

NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, mapperCreator, indexService);

IdColumnConfig idColumns = config.getIdColumns();
SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig();
DatabaseConfig databaseConfig = sqlConnectorConfig.getDatabaseConfig(namespaceStorage.getDataset());

DSLContextWrapper dslContextWrapper = DslContextFactory.create(databaseConfig, sqlConnectorConfig);
DSLContextWrapper dslContextWrapper = DslContextFactory.create(databaseConfig, sqlConnectorConfig, environment.healthChecks());
DSLContext dslContext = dslContextWrapper.getDslContext();
SqlDialect sqlDialect = dialectFactory.createSqlDialect(databaseConfig.getDialect());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.bakdata.conquery.models.config;

import java.util.Map;
import jakarta.validation.Valid;

import com.bakdata.conquery.models.datasets.Dataset;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.dropwizard.util.Duration;
import io.dropwizard.validation.ValidationMethod;
import jakarta.validation.Valid;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -31,6 +32,11 @@ public class SqlConnectorConfig {
*/
private Map<String, @Valid DatabaseConfig> databaseConfigs;

/**
* Timeout duration after which a database connection is considered unhealthy (defaults to connection timeout)
*/
private Duration connectivityCheckTimeout;

public DatabaseConfig getDatabaseConfig(Dataset dataset) {
return databaseConfigs.get(dataset.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheStats;
import io.dropwizard.core.setup.Environment;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -50,7 +51,7 @@ public class DatasetRegistry<N extends Namespace> extends IdResolveContext imple

private final IndexService indexService;

public N createNamespace(Dataset dataset, MetaStorage metaStorage) throws IOException {
public N createNamespace(Dataset dataset, MetaStorage metaStorage, Environment environment) throws IOException {
// Prepare empty storage
NamespaceStorage datasetStorage = new NamespaceStorage(config.getStorage(), "dataset_" + dataset.getName());
final ObjectMapper persistenceMapper = internalObjectMapperCreator.createInternalObjectMapper(View.Persistence.Manager.class);
Expand All @@ -63,11 +64,11 @@ public N createNamespace(Dataset dataset, MetaStorage metaStorage) throws IOExce
datasetStorage.setPreviewConfig(new PreviewConfig());
datasetStorage.close();

return createNamespace(datasetStorage, metaStorage);
return createNamespace(datasetStorage, metaStorage, environment);
}

public N createNamespace(NamespaceStorage datasetStorage, MetaStorage metaStorage) {
final N namespace = namespaceHandler.createNamespace(datasetStorage, metaStorage, indexService);
public N createNamespace(NamespaceStorage datasetStorage, MetaStorage metaStorage, Environment environment) {
final N namespace = namespaceHandler.createNamespace(datasetStorage, metaStorage, indexService, environment);
add(namespace);
return namespace;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,28 @@
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.resources.admin.rest.*;
import com.bakdata.conquery.resources.admin.ui.*;
import com.bakdata.conquery.resources.admin.rest.AdminConceptsResource;
import com.bakdata.conquery.resources.admin.rest.AdminDatasetProcessor;
import com.bakdata.conquery.resources.admin.rest.AdminDatasetResource;
import com.bakdata.conquery.resources.admin.rest.AdminDatasetsResource;
import com.bakdata.conquery.resources.admin.rest.AdminProcessor;
import com.bakdata.conquery.resources.admin.rest.AdminResource;
import com.bakdata.conquery.resources.admin.rest.AdminTablesResource;
import com.bakdata.conquery.resources.admin.rest.AuthOverviewResource;
import com.bakdata.conquery.resources.admin.rest.GroupResource;
import com.bakdata.conquery.resources.admin.rest.PermissionResource;
import com.bakdata.conquery.resources.admin.rest.RoleResource;
import com.bakdata.conquery.resources.admin.rest.UIProcessor;
import com.bakdata.conquery.resources.admin.rest.UserResource;
import com.bakdata.conquery.resources.admin.ui.AdminUIResource;
import com.bakdata.conquery.resources.admin.ui.AuthOverviewUIResource;
import com.bakdata.conquery.resources.admin.ui.ConceptsUIResource;
import com.bakdata.conquery.resources.admin.ui.DatasetsUIResource;
import com.bakdata.conquery.resources.admin.ui.GroupUIResource;
import com.bakdata.conquery.resources.admin.ui.IndexServiceUIResource;
import com.bakdata.conquery.resources.admin.ui.RoleUIResource;
import com.bakdata.conquery.resources.admin.ui.TablesUIResource;
import com.bakdata.conquery.resources.admin.ui.UserUIResource;
import com.bakdata.conquery.resources.admin.ui.model.ConnectorUIResource;
import io.dropwizard.core.setup.AdminEnvironment;
import io.dropwizard.jersey.DropwizardResourceConfig;
Expand Down Expand Up @@ -79,12 +99,12 @@ public AdminServlet(ManagerNode manager) {

adminDatasetProcessor = new AdminDatasetProcessor(
manager.getConfig(),
manager.getValidator(),
manager.getDatasetRegistry(),
manager.getMetaStorage(),
manager.getJobManager(),
manager.getImportHandler(),
manager.getStorageListener()
manager.getStorageListener(),
manager.getEnvironment()
);

jerseyConfig.register(new AbstractBinder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import jakarta.inject.Inject;
import jakarta.validation.Validator;
import jakarta.ws.rs.ForbiddenException;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -42,6 +41,7 @@
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.univocity.parsers.csv.CsvParser;
import io.dropwizard.core.setup.Environment;
import lombok.Data;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -58,12 +58,13 @@ public class AdminDatasetProcessor {
private static final String ABBREVIATION_MARKER = "\u2026";

private final ConqueryConfig config;
private final Validator validator;
private final DatasetRegistry<? extends Namespace> datasetRegistry;
private final MetaStorage metaStorage;
private final JobManager jobManager;
private final ImportHandler importHandler;
private final StorageListener storageListener;
private final Environment environment;



/**
Expand All @@ -76,7 +77,7 @@ public synchronized Dataset addDataset(Dataset dataset) throws IOException {
throw new WebApplicationException("Dataset already exists", Response.Status.CONFLICT);
}

return datasetRegistry.createNamespace(dataset, metaStorage).getDataset();
return datasetRegistry.createNamespace(dataset, metaStorage, environment).getDataset();
}

/**
Expand Down Expand Up @@ -168,7 +169,7 @@ else if (!table.getDataset().equals(dataset)) {
throw new WebApplicationException("Table already exists", Response.Status.CONFLICT);
}

ValidatorHelper.failOnError(log, validator.validate(table));
ValidatorHelper.failOnError(log, environment.getValidator().validate(table));

namespace.getStorage().addTable(table);
storageListener.onAddTable(table);
Expand All @@ -194,7 +195,7 @@ public synchronized void updateConcept(@NonNull Dataset dataset, @NonNull Concep
*/
public synchronized void addConcept(@NonNull Dataset dataset, @NonNull Concept<?> concept, boolean force) {
concept.setDataset(dataset);
ValidatorHelper.failOnError(log, validator.validate(concept));
ValidatorHelper.failOnError(log, environment.getValidator().validate(concept));

if (datasetRegistry.get(dataset.getId()).getStorage().hasConcept(concept.getId())) {
if (!force) {
Expand All @@ -215,7 +216,7 @@ public synchronized void addConcept(@NonNull Dataset dataset, @NonNull Concept<?
public void setPreviewConfig(PreviewConfig previewConfig, Namespace namespace) {
log.info("Received new {}", previewConfig);

ValidatorHelper.failOnError(log, getValidator().validate(previewConfig));
ValidatorHelper.failOnError(log, environment.getValidator().validate(previewConfig));

namespace.getStorage().setPreviewConfig(previewConfig);
}
Expand Down Expand Up @@ -327,7 +328,7 @@ public EntityIdMap getIdMapping(Namespace namespace) {
}

public void addInternToExternMapping(Namespace namespace, InternToExternMapper internToExternMapper) {
ValidatorHelper.failOnError(log, validator.validate(internToExternMapper));
ValidatorHelper.failOnError(log, environment.getValidator().validate(internToExternMapper));

if (namespace.getStorage().getInternToExternMapper(internToExternMapper.getId()) != null) {
throw new WebApplicationException("InternToExternMapping already exists", Response.Status.CONFLICT);
Expand Down Expand Up @@ -371,7 +372,7 @@ public void clearIndexCache(Namespace namespace) {
public void addSearchIndex(Namespace namespace, SearchIndex searchIndex) {
searchIndex.setDataset(namespace.getDataset());

ValidatorHelper.failOnError(log, validator.validate(searchIndex));
ValidatorHelper.failOnError(log, environment.getValidator().validate(searchIndex));

if (namespace.getStorage().getSearchIndex(searchIndex.getId()) != null) {
throw new WebApplicationException("InternToExternMapping already exists", Response.Status.CONFLICT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.bakdata.conquery.sql;

import javax.annotation.Nullable;

import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.jooq.DSLContext;
Expand All @@ -12,13 +15,21 @@

public class DslContextFactory {

public static DSLContextWrapper create(DatabaseConfig config, SqlConnectorConfig connectorConfig) {
public static DSLContextWrapper create(DatabaseConfig config, SqlConnectorConfig connectorConfig, @Nullable HealthCheckRegistry healthCheckRegistry) {

HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(config.getJdbcConnectionUrl());
hikariConfig.setUsername(config.getDatabaseUsername());
hikariConfig.setPassword(config.getDatabasePassword());

if (healthCheckRegistry != null) {
hikariConfig.setHealthCheckRegistry(healthCheckRegistry);
if (connectorConfig.getConnectivityCheckTimeout() != null) {
long connectivityTimeoutMs = connectorConfig.getConnectivityCheckTimeout().toMilliseconds();
hikariConfig.addHealthCheckProperty("connectivityCheckTimeoutMs", Long.toString(connectivityTimeoutMs));
}
}

HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);

Settings settings = new Settings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public HanaTestcontainerContextProvider() {
.databasePassword(hanaContainer.getPassword())
.build();
this.sqlConnectorConfig = new TestSqlConnectorConfig(databaseConfig);
this.dslContextWrapper = DslContextFactory.create(this.databaseConfig, sqlConnectorConfig);
this.dslContextWrapper = DslContextFactory.create(this.databaseConfig, sqlConnectorConfig, null);
}

}
Expand All @@ -202,7 +202,7 @@ public RemoteHanaContextProvider() {
.databasePassword(PASSWORD)
.build();
this.sqlConnectorConfig = new TestSqlConnectorConfig(databaseConfig);
this.dslContextWrapper = DslContextFactory.create(databaseConfig, sqlConnectorConfig);
this.dslContextWrapper = DslContextFactory.create(databaseConfig, sqlConnectorConfig, null);
}

}
Expand Down
Loading
Loading