diff --git a/apps/transport/lib/S3/aggregates_uploader.ex b/apps/transport/lib/S3/aggregates_uploader.ex
new file mode 100644
index 0000000000..7d39b7fb63
--- /dev/null
+++ b/apps/transport/lib/S3/aggregates_uploader.ex
@@ -0,0 +1,91 @@
+defmodule Transport.S3.AggregatesUploader do
+  @moduledoc """
+  Helpers to upload a file, compute its SHA256 checksum, and update a "latest" file.
+  """
+
+  @spec upload_aggregate!(Path.t(), String.t(), String.t()) :: :ok
+  @doc """
+  This function takes a local `file` and uploads 4 different files to our S3 `aggregates` bucket (the bucket is expected to exist):
+  - the `remote_path` and `remote_latest_path` containing the data from `file`
+  - two companion files with a `.sha256sum` extension appended (the SHA256 sum is computed on the fly)
+
+  Example
+
+      with_tmp_file(fn file ->
+        File.write!(file, "some relevant data")
+        upload_aggregate!(file, "aggregate-20250127193035.csv", "aggregate-latest.csv")
+      end)
+  """
+  def upload_aggregate!(file, remote_path, remote_latest_path) do
+    with_tmp_file(fn checksum_file ->
+      sha256!(file, checksum_file)
+
+      upload_files!(file, checksum_file, remote_path)
+      |> update_latest_files!(remote_latest_path)
+    end)
+  end
+
+  @spec with_tmp_file((Path.t() -> any())) :: any()
+  def with_tmp_file(cb) do
+    file = mk_tmp_file()
+
+    try do
+      cb.(file)
+    after
+      :ok = File.rm(file)
+    end
+  end
+
+  defp mk_tmp_file do
+    path = System.tmp_dir!() |> Path.join(Ecto.UUID.generate())
+
+    File.touch!(path)
+
+    path
+  end
+
+  defp sha256!(file, checksum_file) do
+    hash_state = :crypto.hash_init(:sha256)
+
+    hash =
+      File.stream!(file, 2048)
+      |> Enum.reduce(hash_state, fn chunk, prev_state ->
+        :crypto.hash_update(prev_state, chunk)
+      end)
+      |> :crypto.hash_final()
+      |> Base.encode16()
+      |> String.downcase()
+
+    File.write!(checksum_file, hash)
+  end
+
+  defp upload_files!(file, checksum_file, remote_path) do
+    remote_checksum_path = checksum_filename(remote_path)
+
+    stream_upload!(file, remote_path)
+    stream_upload!(checksum_file, remote_checksum_path)
+
+    {remote_path, remote_checksum_path}
+  end
+
+  defp update_latest_files!({remote_path, remote_checksum_path}, remote_latest_path) do
+    remote_latest_checksum_path = checksum_filename(remote_latest_path)
+
+    copy!(remote_path, remote_latest_path)
+    copy!(remote_checksum_path, remote_latest_checksum_path)
+
+    :ok
+  end
+
+  defp checksum_filename(base_filename) do
+    "#{base_filename}.sha256sum"
+  end
+
+  defp stream_upload!(file, filename) do
+    Transport.S3.stream_to_s3!(:aggregates, file, filename)
+  end
+
+  defp copy!(s3_path, filename) do
+    Transport.S3.remote_copy_file!(:aggregates, s3_path, filename)
+  end
+end
diff --git a/apps/transport/lib/S3/s3.ex b/apps/transport/lib/S3/s3.ex
index 64c17a93af..0a3b5b14ec 100644
--- a/apps/transport/lib/S3/s3.ex
+++ b/apps/transport/lib/S3/s3.ex
@@ -3,7 +3,7 @@ defmodule Transport.S3 do
   This module contains common code related to S3 object storage.
""" require Logger - @type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos + @type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos | :aggregates @spec bucket_name(bucket_feature()) :: binary() def bucket_name(feature) do @@ -55,4 +55,12 @@ defmodule Transport.S3 do |> ExAws.S3.download_file(remote_path, local_path) |> Transport.Wrapper.ExAWS.impl().request!() end + + @spec remote_copy_file!(bucket_feature(), binary(), binary()) :: any() + def remote_copy_file!(feature, remote_path_src, remote_path_dest) do + bucket = Transport.S3.bucket_name(feature) + + ExAws.S3.put_object_copy(bucket, remote_path_dest, bucket, remote_path_src) + |> Transport.Wrapper.ExAWS.impl().request!() + end end diff --git a/apps/transport/lib/jobs/stops_registry_snapshot_job.ex b/apps/transport/lib/jobs/stops_registry_snapshot_job.ex new file mode 100644 index 0000000000..2d82739965 --- /dev/null +++ b/apps/transport/lib/jobs/stops_registry_snapshot_job.ex @@ -0,0 +1,26 @@ +defmodule Transport.Jobs.StopsRegistrySnapshotJob do + @moduledoc """ + Job in charge of building a snapshot of the stops registry. + """ + use Oban.Worker, unique: [period: {1, :days}], tags: ["registry"], max_attempts: 3 + require Logger + import Transport.S3.AggregatesUploader + + @impl Oban.Worker + def perform(%Oban.Job{}) do + with_tmp_file(fn file -> + :ok = Transport.Registry.Engine.execute(file) + + upload_aggregate!( + file, + "stops_registry_#{timestamp()}.csv", + "stops_registry_latest.csv" + ) + end) + end + + defp timestamp do + DateTime.utc_now() + |> Calendar.strftime("%Y%m%d.%H%M%S.%f") + end +end diff --git a/apps/transport/test/support/s3_test_utils.ex b/apps/transport/test/support/s3_test_utils.ex index 7f3bbcec26..3aaefcec76 100644 --- a/apps/transport/test/support/s3_test_utils.ex +++ b/apps/transport/test/support/s3_test_utils.ex @@ -32,6 +32,25 @@ defmodule Transport.Test.S3TestUtils do end) end + def s3_mock_stream_file( + path: expected_path, + bucket: expected_bucket, + acl: expected_acl, + file_content: expected_file_content + ) do + Transport.ExAWS.Mock + |> expect(:request!, fn %ExAws.S3.Upload{ + src: src = %File.Stream{}, + bucket: ^expected_bucket, + path: ^expected_path, + opts: [acl: ^expected_acl], + service: :s3 + } -> + assert src |> Enum.join("\n") == expected_file_content + :ok + end) + end + def s3_mocks_delete_object(expected_bucket, expected_path) do Transport.ExAWS.Mock |> expect(:request!, fn %ExAws.Operation.S3{ @@ -43,4 +62,18 @@ defmodule Transport.Test.S3TestUtils do :ok end) end + + def s3_mocks_remote_copy_file(expected_bucket, expected_src_path, expected_dest_path) do + Transport.ExAWS.Mock + |> expect(:request!, fn %ExAws.Operation.S3{ + bucket: ^expected_bucket, + path: ^expected_dest_path, + http_method: :put, + service: :s3, + headers: headers + } -> + assert Map.get(headers, "x-amz-copy-source") =~ "/#{expected_bucket}/#{expected_src_path}" + %{body: %{}} + end) + end end diff --git a/apps/transport/test/transport/S3/aggregates_uploader_test.exs b/apps/transport/test/transport/S3/aggregates_uploader_test.exs new file mode 100644 index 0000000000..94311806cc --- /dev/null +++ b/apps/transport/test/transport/S3/aggregates_uploader_test.exs @@ -0,0 +1,33 @@ +defmodule Transport.S3.AggregatesUploaderTest do + use ExUnit.Case, async: true + + alias Transport.S3.AggregatesUploader + alias Transport.Test.S3TestUtils + import Mox + setup :verify_on_exit! 
+
+  test "export to S3" do
+    aggregate = "aggregate-20250127193035.csv"
+    latest_aggregate = "aggregate-latest.csv"
+    checksum = "#{aggregate}.sha256sum"
+    latest_checksum = "#{latest_aggregate}.sha256sum"
+
+    bucket_name = Transport.S3.bucket_name(:aggregates)
+
+    test_data = "some relevant data"
+
+    # Compute this with sha256sum: echo -n "some relevant data" | sha256sum
+    expected_sha256 = "28d89bc28c02b0ed66f22b400b535e800e3a6b305e931c18dc01f8bf3582f1f9"
+
+    S3TestUtils.s3_mock_stream_file(path: aggregate, bucket: bucket_name, acl: :private, file_content: test_data)
+    S3TestUtils.s3_mock_stream_file(path: checksum, bucket: bucket_name, acl: :private, file_content: expected_sha256)
+    S3TestUtils.s3_mocks_remote_copy_file(bucket_name, aggregate, latest_aggregate)
+    S3TestUtils.s3_mocks_remote_copy_file(bucket_name, checksum, latest_checksum)
+
+    AggregatesUploader.with_tmp_file(fn file ->
+      File.write!(file, test_data)
+
+      :ok = AggregatesUploader.upload_aggregate!(file, aggregate, latest_aggregate)
+    end)
+  end
+end
diff --git a/config/dev.exs b/config/dev.exs
index 8b94631bdf..87b43ab8a1 100644
--- a/config/dev.exs
+++ b/config/dev.exs
@@ -72,7 +72,8 @@ config :transport,
     history: "resource-history-dev",
     on_demand_validation: "on-demand-validation-dev",
     gtfs_diff: "gtfs-diff-dev",
-    logos: "logos-dev"
+    logos: "logos-dev",
+    aggregates: "aggregates-dev"
   }
 
 config :oauth2, Datagouvfr.Authentication,
diff --git a/config/prod.exs b/config/prod.exs
index 86270024ca..ce7cd51bb1 100644
--- a/config/prod.exs
+++ b/config/prod.exs
@@ -9,7 +9,8 @@ config :transport,
     history: "resource-history-prod",
     on_demand_validation: "on-demand-validation-prod",
     gtfs_diff: "gtfs-diff-prod",
-    logos: "logos-prod"
+    logos: "logos-prod",
+    aggregates: "aggregates-prod"
   }
 
 # Configure Sentry for production and staging.
diff --git a/config/runtime.exs b/config/runtime.exs
index c377ae54f3..81b1b2a1a3 100644
--- a/config/runtime.exs
+++ b/config/runtime.exs
@@ -94,7 +94,8 @@ if app_env == :staging do
       history: "resource-history-staging",
       on_demand_validation: "on-demand-validation-staging",
       gtfs_diff: "gtfs-diff-staging",
-      logos: "logos-staging"
+      logos: "logos-staging",
+      aggregates: "aggregates-staging"
     }
 end
 
@@ -162,7 +163,8 @@ oban_prod_crontab = [
   {"30 5 * * *", Transport.Jobs.ImportDatasetMonthlyMetricsJob},
   {"45 5 * * *", Transport.Jobs.ImportResourceMonthlyMetricsJob},
   {"0 8 * * *", Transport.Jobs.WarnUserInactivityJob},
-  {"*/5 * * * *", Transport.Jobs.UpdateCounterCacheJob}
+  {"*/5 * * * *", Transport.Jobs.UpdateCounterCacheJob},
+  {"0 4 * * *", Transport.Jobs.StopsRegistrySnapshotJob}
 ]
 
 # Make sure that all modules exist
diff --git a/config/test.exs b/config/test.exs
index e5ee64d501..6b0a67fef3 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -45,7 +45,8 @@ config :transport,
     history: "resource-history-test",
     on_demand_validation: "on-demand-validation-test",
     gtfs_diff: "gtfs-diff-test",
-    logos: "logos-test"
+    logos: "logos-test",
+    aggregates: "aggregates-test"
   },
   workflow_notifier: Transport.Jobs.Workflow.ProcessNotifier,
   export_secret_key: "fake_export_secret_key",
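
Reviewer note: a minimal sketch of how the new pieces compose, for trying the change out locally. This is not part of the diff itself; the `iex -S mix` session, the CSV content and the `demo-*` object names are made up, and it assumes the environment's `aggregates` bucket (e.g. `aggregates-dev`) exists:

    alias Transport.S3.AggregatesUploader

    AggregatesUploader.with_tmp_file(fn file ->
      File.write!(file, "stop_id,stop_name\n1,Example stop")

      # Uploads 4 objects: "demo-20250127.csv" and "demo-20250127.csv.sha256sum",
      # then server-side copies of both under the "latest" names.
      :ok = AggregatesUploader.upload_aggregate!(file, "demo-20250127.csv", "demo-latest.csv")
    end)

    # The snapshot job can also be enqueued outside its daily 04:00 cron slot
    # (a second insert within a day is a no-op because of the `unique` option):
    Transport.Jobs.StopsRegistrySnapshotJob.new(%{}) |> Oban.insert()

Consumers can then download `demo-latest.csv` together with `demo-latest.csv.sha256sum` and compare the stored digest against their own `sha256sum` output, which is the point of publishing the checksum next to the data.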