Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Registre arrets: première automatisation #4428

Merged
merged 5 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions apps/transport/lib/S3/aggregates_uploader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
defmodule Transport.S3.AggregatesUploader do
  @moduledoc """
  Helpers to upload a file, compute its SHA256 checksum, and update a "latest" copy.
  """

  @doc """
  Takes a local `file` and uploads 4 different files to our S3 `aggregates` bucket (the bucket is expected to exist):
  - the `remote_path` and `remote_latest_path` containing the data from `file`
  - two companion files with a `.sha256sum` extension appended (the SHA256 sum is computed on the fly)

  Example

      with_tmp_file(fn file ->
        File.write!(file, "some relevant data")
        upload_aggregate!(file, "aggregate-20250127193035.csv", "aggregate-latest.csv")
      end)
  """
  @spec upload_aggregate!(Path.t(), String.t(), String.t()) :: :ok
  def upload_aggregate!(file, remote_path, remote_latest_path) do
    with_tmp_file(fn checksum_file ->
      sha256!(file, checksum_file)

      upload_files!(file, checksum_file, remote_path)
      |> update_latest_files!(remote_latest_path)
    end)
  end

  @doc """
  Creates an empty temporary file, passes its path to `cb`, and removes the file
  afterwards — even if `cb` raises. Returns whatever `cb` returns.
  """
  @spec with_tmp_file((Path.t() -> any())) :: any()
  def with_tmp_file(cb) do
    file = mk_tmp_file()

    try do
      cb.(file)
    after
      # assertive: a failure to clean up should be loud, not silent
      :ok = File.rm(file)
    end
  end

  # Creates an empty, randomly-named file in the system temporary directory
  # and returns its path.
  defp mk_tmp_file do
    path = System.tmp_dir!() |> Path.join(Ecto.UUID.generate())

    File.touch!(path)

    path
  end

  # Streams `file` in 2048-byte chunks through an incremental SHA256 hash
  # (avoids loading the whole file in memory) and writes the lowercase hex
  # digest into `checksum_file`.
  defp sha256!(file, checksum_file) do
    hash_state = :crypto.hash_init(:sha256)

    hash =
      File.stream!(file, 2048)
      |> Enum.reduce(hash_state, fn chunk, prev_state ->
        :crypto.hash_update(prev_state, chunk)
      end)
      |> :crypto.hash_final()
      |> Base.encode16()
      |> String.downcase()

    File.write!(checksum_file, hash)
  end

  # Uploads the data file and its checksum companion; returns both remote
  # paths so the "latest" copies can be derived from them.
  defp upload_files!(file, checksum_file, remote_path) do
    remote_checksum_path = checksum_filename(remote_path)

    stream_upload!(file, remote_path)
    stream_upload!(checksum_file, remote_checksum_path)

    {remote_path, remote_checksum_path}
  end

  # Server-side copies the freshly uploaded objects over the stable
  # "latest" names (no re-upload involved).
  defp update_latest_files!({remote_path, remote_checksum_path}, remote_latest_path) do
    remote_latest_checksum_path = checksum_filename(remote_latest_path)

    copy!(remote_path, remote_latest_path)
    copy!(remote_checksum_path, remote_latest_checksum_path)

    :ok
  end

  # Naming convention for the checksum companion of a given file.
  defp checksum_filename(base_filename) do
    "#{base_filename}.sha256sum"
  end

  defp stream_upload!(file, filename) do
    Transport.S3.stream_to_s3!(:aggregates, file, filename)
  end

  defp copy!(s3_path, filename) do
    Transport.S3.remote_copy_file!(:aggregates, s3_path, filename)
  end
end
10 changes: 9 additions & 1 deletion apps/transport/lib/S3/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Transport.S3 do
This module contains common code related to S3 object storage.
"""
require Logger
@type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos
@type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos | :aggregates

@spec bucket_name(bucket_feature()) :: binary()
def bucket_name(feature) do
Expand Down Expand Up @@ -55,4 +55,12 @@ defmodule Transport.S3 do
|> ExAws.S3.download_file(remote_path, local_path)
|> Transport.Wrapper.ExAWS.impl().request!()
end

@spec remote_copy_file!(bucket_feature(), binary(), binary()) :: any()
def remote_copy_file!(feature, remote_path_src, remote_path_dest) do
bucket = Transport.S3.bucket_name(feature)

ExAws.S3.put_object_copy(bucket, remote_path_dest, bucket, remote_path_src)
|> Transport.Wrapper.ExAWS.impl().request!()
end
end
26 changes: 26 additions & 0 deletions apps/transport/lib/jobs/stops_registry_snapshot_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Transport.Jobs.StopsRegistrySnapshotJob do
  @moduledoc """
  Job in charge of building a snapshot of the stops registry.
  """
  use Oban.Worker, unique: [period: {1, :days}], tags: ["registry"], max_attempts: 3
  require Logger
  import Transport.S3.AggregatesUploader

  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    with_tmp_file(fn file ->
      # build the registry snapshot locally, then push it (and its checksum) to S3
      :ok = Transport.Registry.Engine.execute(file)

      upload_aggregate!(file, "stops_registry_#{timestamp()}.csv", "stops_registry_latest.csv")
    end)
  end

  # UTC timestamp embedded in the snapshot filename, e.g. "20250210.153045.123456".
  defp timestamp do
    Calendar.strftime(DateTime.utc_now(), "%Y%m%d.%H%M%S.%f")
  end
end
33 changes: 33 additions & 0 deletions apps/transport/test/support/s3_test_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ defmodule Transport.Test.S3TestUtils do
end)
end

def s3_mock_stream_file(
path: expected_path,
bucket: expected_bucket,
acl: expected_acl,
file_content: expected_file_content
) do
Transport.ExAWS.Mock
|> expect(:request!, fn %ExAws.S3.Upload{
src: src = %File.Stream{},
bucket: ^expected_bucket,
path: ^expected_path,
opts: [acl: ^expected_acl],
service: :s3
} ->
assert src |> Enum.join("\n") == expected_file_content
:ok
end)
end

def s3_mocks_delete_object(expected_bucket, expected_path) do
Transport.ExAWS.Mock
|> expect(:request!, fn %ExAws.Operation.S3{
Expand All @@ -43,4 +62,18 @@ defmodule Transport.Test.S3TestUtils do
:ok
end)
end

def s3_mocks_remote_copy_file(expected_bucket, expected_src_path, expected_dest_path) do
Transport.ExAWS.Mock
|> expect(:request!, fn %ExAws.Operation.S3{
bucket: ^expected_bucket,
path: ^expected_dest_path,
http_method: :put,
service: :s3,
headers: headers
} ->
assert Map.get(headers, "x-amz-copy-source") =~ "/#{expected_bucket}/#{expected_src_path}"
%{body: %{}}
end)
end
end
33 changes: 33 additions & 0 deletions apps/transport/test/transport/S3/aggregates_uploader_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Transport.S3.AggregatesUploaderTest do
  use ExUnit.Case, async: true

  alias Transport.S3.AggregatesUploader
  alias Transport.Test.S3TestUtils
  import Mox
  setup :verify_on_exit!

  test "export to S3" do
    aggregate = "aggregate-20250127193035.csv"
    latest_aggregate = "aggregate-latest.csv"
    checksum = "#{aggregate}.sha256sum"
    latest_checksum = "#{latest_aggregate}.sha256sum"

    bucket_name = Transport.S3.bucket_name(:aggregates)

    test_data = "some relevant data"

    # Compute this with sha256sum: echo -n "some relevant data" | sha256sum
    expected_sha256 = "28d89bc28c02b0ed66f22b400b535e800e3a6b305e931c18dc01f8bf3582f1f9"

    # Expect 2 streamed uploads (data + checksum), then 2 server-side copies
    # over the stable "latest" names.
    S3TestUtils.s3_mock_stream_file(path: aggregate, bucket: bucket_name, acl: :private, file_content: test_data)
    S3TestUtils.s3_mock_stream_file(path: checksum, bucket: bucket_name, acl: :private, file_content: expected_sha256)
    S3TestUtils.s3_mocks_remote_copy_file(bucket_name, aggregate, latest_aggregate)
    S3TestUtils.s3_mocks_remote_copy_file(bucket_name, checksum, latest_checksum)

    AggregatesUploader.with_tmp_file(fn file ->
      # write! raises on failure, so a broken test setup cannot silently pass
      File.write!(file, test_data)

      :ok = AggregatesUploader.upload_aggregate!(file, aggregate, latest_aggregate)
    end)
  end
end
3 changes: 2 additions & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ config :transport,
history: "resource-history-dev",
on_demand_validation: "on-demand-validation-dev",
gtfs_diff: "gtfs-diff-dev",
logos: "logos-dev"
logos: "logos-dev",
aggregates: "aggregates-dev"
}

config :oauth2, Datagouvfr.Authentication,
Expand Down
3 changes: 2 additions & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ config :transport,
history: "resource-history-prod",
on_demand_validation: "on-demand-validation-prod",
gtfs_diff: "gtfs-diff-prod",
logos: "logos-prod"
logos: "logos-prod",
aggregates: "aggregates-prod"
}

# Configure Sentry for production and staging.
Expand Down
6 changes: 4 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ if app_env == :staging do
history: "resource-history-staging",
on_demand_validation: "on-demand-validation-staging",
gtfs_diff: "gtfs-diff-staging",
logos: "logos-staging"
logos: "logos-staging",
aggregates: "aggregates-staging"
}
end

Expand Down Expand Up @@ -162,7 +163,8 @@ oban_prod_crontab = [
{"30 5 * * *", Transport.Jobs.ImportDatasetMonthlyMetricsJob},
{"45 5 * * *", Transport.Jobs.ImportResourceMonthlyMetricsJob},
{"0 8 * * *", Transport.Jobs.WarnUserInactivityJob},
{"*/5 * * * *", Transport.Jobs.UpdateCounterCacheJob}
{"*/5 * * * *", Transport.Jobs.UpdateCounterCacheJob},
{"0 4 * * *", Transport.Jobs.StopsRegistrySnapshotJob}
]

# Make sure that all modules exist
Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ config :transport,
history: "resource-history-test",
on_demand_validation: "on-demand-validation-test",
gtfs_diff: "gtfs-diff-test",
logos: "logos-test"
logos: "logos-test",
aggregates: "aggregates-test"
},
workflow_notifier: Transport.Jobs.Workflow.ProcessNotifier,
export_secret_key: "fake_export_secret_key",
Expand Down