Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TIS-471/netex testing fixes #228

Merged
merged 4 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import fi.digitraffic.tis.vaco.caching.mapper.CacheStatsMapper;
import fi.digitraffic.tis.vaco.caching.model.CacheSummaryStatistics;
import fi.digitraffic.tis.vaco.queuehandler.model.Entry;
import fi.digitraffic.tis.vaco.ruleset.model.Ruleset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -32,13 +33,15 @@ public class CachingService {
private final Cache<String, Ruleset> rulesetCache;
private final Cache<String, String> sqsQueueUrlCache;
private final Cache<Path, Path> localPathCache;
private final Cache<String, Entry> entryCache;
private final CacheStatsMapper cacheStatsMapper;

public CachingService(CacheStatsMapper cacheStatsMapper) {
this.cacheStatsMapper = Objects.requireNonNull(cacheStatsMapper);
this.rulesetCache = rulesetNameCache();
this.sqsQueueUrlCache = sqsQueueUrlCache();
this.localPathCache = localPathCache();
this.entryCache = entryCache();
}

public Optional<Ruleset> cacheRuleset(String key, Function<String, Ruleset> loader) {
Expand All @@ -65,6 +68,20 @@ public void invalidateLocalTemporaryPath(Path key) {
localPathCache.invalidate(key);
}


public String keyForEntry(String publicId, boolean skipErrorsField) {
return publicId + " (full=" + skipErrorsField + ")";
}

public Optional<Entry> cacheEntry(String key, Function<String, Entry> loader) {
return Optional.ofNullable(entryCache.get(key, loader));
}

public void invalidateEntry(Entry entry) {
entryCache.invalidate(keyForEntry(entry.publicId(), true));
entryCache.invalidate(keyForEntry(entry.publicId(), false));
}

private static Cache<String, Ruleset> rulesetNameCache() {
return Caffeine.newBuilder()
.recordStats()
Expand Down Expand Up @@ -98,10 +115,20 @@ private Cache<Path, Path> localPathCache() {
.build();
}

private Cache<String, Entry> entryCache() {
return Caffeine.newBuilder()
.recordStats()
.maximumSize(500)
.expireAfterWrite(Duration.ofDays(1))
.build();
}

public Map<String, CacheSummaryStatistics> getStats() {
return Map.of(
"rulesets", cacheStatsMapper.toCacheSummaryStatistics(rulesetCache),
"SQS queue URLs", cacheStatsMapper.toCacheSummaryStatistics(sqsQueueUrlCache),
"local temporary file paths", cacheStatsMapper.toCacheSummaryStatistics(localPathCache));
"local temporary file paths", cacheStatsMapper.toCacheSummaryStatistics(localPathCache),
"entries", cacheStatsMapper.toCacheSummaryStatistics(entryCache));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

@RestController
@RequestMapping("/company")
@PreAuthorize("hasAuthority('vaco.apiuser') and hasAuthority('vaco.company_admin')")
@PreAuthorize("hasAuthority('vaco.apiuser') and hasAnyAuthority('vaco.company_admin')")
public class CompanyController {

private final VacoProperties vacoProperties;
Expand Down
24 changes: 16 additions & 8 deletions src/main/java/fi/digitraffic/tis/vaco/entries/EntryService.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package fi.digitraffic.tis.vaco.entries;

import fi.digitraffic.tis.utilities.Streams;
import fi.digitraffic.tis.vaco.caching.CachingService;
import fi.digitraffic.tis.vaco.entries.model.Status;
import fi.digitraffic.tis.vaco.findings.FindingService;
import fi.digitraffic.tis.vaco.process.TaskService;
import fi.digitraffic.tis.vaco.process.model.Task;
import fi.digitraffic.tis.vaco.queuehandler.QueueHandlerService;
import fi.digitraffic.tis.vaco.queuehandler.model.Entry;
import org.springframework.stereotype.Service;

Expand All @@ -13,21 +14,24 @@

@Service
public class EntryService {

private final EntryRepository entryRepository;
private final CachingService cachingService;
private final QueueHandlerService queueHandlerService;
private final TaskService taskService;
private final FindingService findingService;

public EntryService(TaskService taskService,
FindingService findingService,
EntryRepository entryRepository) {
public EntryService(EntryRepository entryRepository,
CachingService cachingService,
QueueHandlerService queueHandlerService,
TaskService taskService) {
this.queueHandlerService = Objects.requireNonNull(queueHandlerService);
this.taskService = Objects.requireNonNull(taskService);
this.findingService = Objects.requireNonNull(findingService);
this.entryRepository = Objects.requireNonNull(entryRepository);
this.cachingService = Objects.requireNonNull(cachingService);
}

public Optional<Status> getStatus(String publicId) {
return entryRepository.findByPublicId(publicId, false)
.map(Entry::status);
return queueHandlerService.findEntry(publicId).map(Entry::status);
}

public Optional<Status> getStatus(String publicId, String taskName) {
Expand All @@ -39,15 +43,18 @@ public Optional<Status> getStatus(String publicId, String taskName) {

public void markComplete(Entry entry) {
entryRepository.completeEntryProcessing(entry);
cachingService.invalidateEntry(entry);
}

public void markStarted(Entry entry) {
entryRepository.startEntryProcessing(entry);
entryRepository.markStatus(entry, Status.PROCESSING);
cachingService.invalidateEntry(entry);
}

public void markUpdated(Entry entry) {
entryRepository.updateEntryProcessing(entry);
cachingService.invalidateEntry(entry);
}

/**
Expand All @@ -59,6 +66,7 @@ public void markUpdated(Entry entry) {
public void updateStatus(Entry entry) {
Status status = resolveStatus(entry);
entryRepository.markStatus(entry, status);
cachingService.invalidateEntry(entry);
}

private Status resolveStatus(Entry entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public MessagingService(SqsClient sqsClient,
}

public <P> CompletableFuture<P> sendMessage(String queueName, P payload) {
if (payload == null) {
logger.warn("send {} !! Tried to send null payload, ignoring", queueName);
return CompletableFuture.completedFuture(payload);
}
try {
logger.debug("send {} <- {}", queueName, payload);
return sqsTemplate.sendAsync(queueName, payload).thenApply(sr -> sr.message().getPayload());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import fi.digitraffic.tis.utilities.Streams;
import fi.digitraffic.tis.vaco.caching.CachingService;
import fi.digitraffic.tis.vaco.company.model.Company;
import fi.digitraffic.tis.vaco.company.model.ImmutableCompany;
import fi.digitraffic.tis.vaco.company.model.PartnershipType;
Expand Down Expand Up @@ -32,19 +33,22 @@ public class QueueHandlerService {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final CachingService cachingService;
private final MeService meService;
private final MessagingService messagingService;
private final CompanyService companyService;
private final EntryRepository entryRepository;
private final EntryRequestMapper entryRequestMapper;
private final PartnershipService partnershipService;

public QueueHandlerService(MeService meService,
public QueueHandlerService(CachingService cachingService,
MeService meService,
EntryRequestMapper entryRequestMapper,
MessagingService messagingService,
CompanyService companyService,
EntryRepository entryRepository,
PartnershipService partnershipService) {
this.cachingService = Objects.requireNonNull(cachingService);
this.meService = Objects.requireNonNull(meService);
this.entryRequestMapper = Objects.requireNonNull(entryRequestMapper);
this.messagingService = Objects.requireNonNull(messagingService);
Expand All @@ -69,7 +73,10 @@ public Entry processQueueEntry(EntryRequest entryRequest) {
.build();
messagingService.submitProcessingJob(job);

return result;
return cachingService.cacheEntry(
cachingService.keyForEntry(result.publicId(), true),
key -> result)
.get();
}

/**
Expand Down Expand Up @@ -112,20 +119,33 @@ public Optional<Entry> findEntry(String publicId) {
}

public Optional<Entry> findEntry(String publicId, boolean skipErrorsField) {
return entryRepository.findByPublicId(publicId, skipErrorsField);
return cachingService.cacheEntry(
cachingService.keyForEntry(publicId, skipErrorsField),
key -> entryRepository.findByPublicId(publicId, skipErrorsField).orElse(null));
}

public Entry getEntry(String publicId, boolean skipErrorsField) {
return entryRepository.findByPublicId(publicId, skipErrorsField)
.orElseThrow(() -> new UnknownEntityException(publicId, "Entry not found"));
return cachingService.cacheEntry(
cachingService.keyForEntry(publicId, skipErrorsField),
key -> entryRepository.findByPublicId(publicId, skipErrorsField)
.orElseThrow(() -> new UnknownEntityException(publicId, "Entry not found"))
).get();
}

public List<Entry> getAllQueueEntriesFor(String businessId, boolean full) {
return entryRepository.findAllByBusinessId(businessId, full);
List<Entry> entries = entryRepository.findAllByBusinessId(businessId, full);
entries.forEach(entry -> cachingService.cacheEntry(
cachingService.keyForEntry(entry.publicId(), full),
key -> entry));
return entries;
}

public List<Entry> getAllEntriesVisibleForCurrentUser(boolean full) {
return Streams.flatten(meService.findCompanies(), c -> entryRepository.findAllByBusinessId(c.businessId(), full))
List<Entry> entries = Streams.flatten(meService.findCompanies(), c -> entryRepository.findAllByBusinessId(c.businessId(), full))
.toList();
entries.forEach(entry -> cachingService.cacheEntry(
cachingService.keyForEntry(entry.publicId(), full),
key -> entry));
return entries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import fi.digitraffic.tis.vaco.rules.Rule;
import fi.digitraffic.tis.vaco.rules.model.ImmutableResultMessage;
import fi.digitraffic.tis.vaco.rules.model.ResultMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;
Expand All @@ -28,6 +30,7 @@

@Component
public class DownloadRule implements Rule<Entry, ResultMessage> {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static final String DOWNLOAD_SUBTASK = "prepare.download";
private final TaskService taskService;
private final VacoProperties vacoProperties;
Expand Down Expand Up @@ -78,6 +81,10 @@ public CompletableFuture<ResultMessage> execute(Entry entry) {
.outputs(ruleS3Output.asUri(vacoProperties.s3ProcessingBucket()))
.uploadedFiles(Map.of(result.asUri(vacoProperties.s3ProcessingBucket()), List.of(downloadedFilePackage)))
.build();
} catch (Exception e) {
logger.warn("Caught unrecoverable exception during file download", e);
taskService.markStatus(tracked, Status.FAILED);
return null;
} finally {
try {
Files.deleteIfExists(tempFilePath);
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<logger name="org.mobilitydata" level="WARN" />
<logger name="org.xnio" level="WARN" />
<logger name="org.thymeleaf" level="WARN" />
<logger name="org.apache.commons.beanutils" level="WARN" />

<root level="DEBUG">
<appender-ref ref="STDOUT" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ void canListCacheSummaryStatistics() throws Exception {
assertThat(stats,
equalTo(Map.of("rulesets", emptyStatistics,
"SQS queue URLs", emptyStatistics,
"local temporary file paths", emptyStatistics)));
"local temporary file paths", emptyStatistics,
"entries", emptyStatistics)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Authorization: Bearer {{tis_token}}

###

POST {{serviceRootUri}}/api/admin-tasks/group-ids/DGWe8Qi1KpRGFwBaUMUo2/assign?businessId=2942108-7
POST {{serviceRootUri}}/api/admin-tasks/group-ids/2Y9AulYbZLLenONMr9fyq/assign?businessId=2942108-7
Content-Type: application/json
Authorization: Bearer {{tis_token}}

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/fi/digitraffic/tis/vaco/e2e/me_local.http
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ client_id={{clientId}}&grant_type=client_credentials&scope={{clientId}}/.default

###

GET http://localhost:8080/api/me
GET {{serviceRootUri}}/api/me
Content-Type: application/json
Authorization: Bearer {{tis_token}}
4 changes: 2 additions & 2 deletions src/test/java/fi/digitraffic/tis/vaco/e2e/queue/gtfs.http
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ Content-Type: application/json
Authorization: Bearer {{tis_token}}

{
"url": "https://tvv.fra1.digitaloceanspaces.com/231.zip",
"url": "https://s3.eu-north-1.amazonaws.com/perftesting-manual-v1.0/gtfs_all.zip?response-content-disposition=inline&X-Amz-Security-Token=IQoJb3JpZ2luX2VjEPv%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCmV1LW5vcnRoLTEiRjBEAiAR%2Fjsdl41ftrnJDHqiC0wX9DBs3VNqpAquIawoK6C64QIgeW4LvVlnL0iPhZbyVb8SIaYh8wR5x6VA%2BiO5WRlmi9Yq9AMIpP%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FARAAGgw5MjE1NTYwMjc1MTQiDMhrpiPuqhyqhaxXsyrIA%2BFyYQ%2FVigQeNfOLgTP1xIhZap%2BjgNaDUqUOFEnb5Nv78NCMDwNXGtgYg%2BbOqMdL3Yi8ya1RbAZXkkF36%2F3DejSVHmSJ4a6OLpN5bcOcgFILVdbv6w9v7ITggQb7qVHQ1kIf6x8yuYOzAplZOE4RgBYRNLmV7osJlb1b8wpUiKquBg9%2FFQK%2BtPBND%2FZWcTRr7rU%2FqTiZkC4tf6aWNYLARmwvj4pym7hbaLk7KNI%2BqW%2BOeoIvp3y91uv0zWeVkFLtIEGSkKtgJyP%2F%2BEMn7C53uI%2BQleNtGHndk3ufwax35aGsuoQkUDEjxL4d9uIB0okdXhXIuj193NPlllIJcwNJjWXdCqMvlKyD3uSo%2BiGY%2FiIRwgLX1GD2MvpBtf0UzenlzLuXPTB2xBv5gxisoz5sSUSKNugiVDKodWAm0LUbGDVXCCVA7eSLsOsksWU4YFvv3mp6Fd%2B4yIMNpN1dnc8D%2FIv%2F55rjYScZmdyE1JabmNygSVzFSeWCEx69giHXCgAb%2BHaAkycbaORK%2BIl1SrBloEjweGOs4PBbU0XMTt2lp6YBZcCA%2BG1lLfmnBLKRq7tm%2FyF19UhHmRxEX%2Bl0ab53cx%2BPhXObwTwkizDZyZmtBjqVAiGcfPF%2Bff8bjYYNFS%2BJ2Ic1YlZc1b9gjSGEOOoOUgzrMNnIihnp1yzKbOWHtkymdfkXShbr1Liw41ABKTyN987GRX0kuCQc%2FfM%2BVR6BmqhUOZiWeKfXObxgpw5BBu8qh2v3R%2Bx3tXs7K5%2FNfCSoajkpA0Q7TMFkjy89esPz7ozTzr8JwNwP%2FN69K4imnHQf5ox5okcvE2vXJudBY3118iGq7y83hjbCy7ZdGWb17W%2BCW7EEjNeuUQfRy7445NBy0Mqb3FYmcPMLAsFKzWbEJWGwcjDYynqZnLWe4se%2BXegz3Sv3h7dZaitiT2PDoBbsI%2F9y8yYgsH0y%2BFVuqJZbWvlcMoheVs0x8R7GUCCV5cY129Bcz%2Fs%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20240116T111724Z&X-Amz-SignedHeaders=host&X-Amz-Expires=43200&X-Amz-Credential=ASIA5NEIFBB5BO5QQRN5%2F20240116%2Feu-north-1%2Fs3%2Faws4_request&X-Amz-Signature=3a67dcc84a93a09109b9f5c33a96c69081a0db12c8c75cefedf3fc7cf544b908",
"format": "gtfs",
"businessId": "2942108-7",
"etag": "etagg",
"name": "GTFS Pori local",
"name": "loadtest - GTFS - all",
"validations": [
{
"name": "gtfs.canonical.v4_1_0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ Content-Type: application/json
Authorization: Bearer {{tis_token}}

{
"url": "https://tvv.fra1.digitaloceanspaces.com/237_netex.zip",
"url": "https://s3.eu-north-1.amazonaws.com/perftesting-manual-v1.0/netex_all.zip?response-content-disposition=inline&X-Amz-Security-Token=IQoJb3JpZ2luX2VjEPv%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCmV1LW5vcnRoLTEiRjBEAiAR%2Fjsdl41ftrnJDHqiC0wX9DBs3VNqpAquIawoK6C64QIgeW4LvVlnL0iPhZbyVb8SIaYh8wR5x6VA%2BiO5WRlmi9Yq9AMIpP%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FARAAGgw5MjE1NTYwMjc1MTQiDMhrpiPuqhyqhaxXsyrIA%2BFyYQ%2FVigQeNfOLgTP1xIhZap%2BjgNaDUqUOFEnb5Nv78NCMDwNXGtgYg%2BbOqMdL3Yi8ya1RbAZXkkF36%2F3DejSVHmSJ4a6OLpN5bcOcgFILVdbv6w9v7ITggQb7qVHQ1kIf6x8yuYOzAplZOE4RgBYRNLmV7osJlb1b8wpUiKquBg9%2FFQK%2BtPBND%2FZWcTRr7rU%2FqTiZkC4tf6aWNYLARmwvj4pym7hbaLk7KNI%2BqW%2BOeoIvp3y91uv0zWeVkFLtIEGSkKtgJyP%2F%2BEMn7C53uI%2BQleNtGHndk3ufwax35aGsuoQkUDEjxL4d9uIB0okdXhXIuj193NPlllIJcwNJjWXdCqMvlKyD3uSo%2BiGY%2FiIRwgLX1GD2MvpBtf0UzenlzLuXPTB2xBv5gxisoz5sSUSKNugiVDKodWAm0LUbGDVXCCVA7eSLsOsksWU4YFvv3mp6Fd%2B4yIMNpN1dnc8D%2FIv%2F55rjYScZmdyE1JabmNygSVzFSeWCEx69giHXCgAb%2BHaAkycbaORK%2BIl1SrBloEjweGOs4PBbU0XMTt2lp6YBZcCA%2BG1lLfmnBLKRq7tm%2FyF19UhHmRxEX%2Bl0ab53cx%2BPhXObwTwkizDZyZmtBjqVAiGcfPF%2Bff8bjYYNFS%2BJ2Ic1YlZc1b9gjSGEOOoOUgzrMNnIihnp1yzKbOWHtkymdfkXShbr1Liw41ABKTyN987GRX0kuCQc%2FfM%2BVR6BmqhUOZiWeKfXObxgpw5BBu8qh2v3R%2Bx3tXs7K5%2FNfCSoajkpA0Q7TMFkjy89esPz7ozTzr8JwNwP%2FN69K4imnHQf5ox5okcvE2vXJudBY3118iGq7y83hjbCy7ZdGWb17W%2BCW7EEjNeuUQfRy7445NBy0Mqb3FYmcPMLAsFKzWbEJWGwcjDYynqZnLWe4se%2BXegz3Sv3h7dZaitiT2PDoBbsI%2F9y8yYgsH0y%2BFVuqJZbWvlcMoheVs0x8R7GUCCV5cY129Bcz%2Fs%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20240116T111639Z&X-Amz-SignedHeaders=host&X-Amz-Expires=43200&X-Amz-Credential=ASIA5NEIFBB5BO5QQRN5%2F20240116%2Feu-north-1%2Fs3%2Faws4_request&X-Amz-Signature=c3b4f80e50f748087ce9adaf82c0565228936257cb5ff4f0058fd0d8422e2438",
"format": "netex",
"businessId": "2942108-7",
"etag": "etagg",
"name": "NeTEx custom local",
"name": "loadtest - NeTEx - all",
"validations": [
{
"name": "netex.entur.v1_0_1",
Expand Down
Loading
Loading