Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(server): add UploadAssetFromURL on gcs file implementation #1379

Merged
merged 6 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/ci_server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ jobs:
version: v1.63.4
args: --timeout=10m
working-directory: server

ci-server-test:
runs-on: ubuntu-latest
services:
Expand All @@ -31,6 +30,13 @@ jobs:
ports:
- 27017:27017
steps:
- name: Start Fake GCS Server
run: |
docker run -d --name fake-gcs-server \
-p 4443:4443 \
-v /tmp/gcs:/storage \
fsouza/fake-gcs-server:1.52.1 \
-scheme http
- name: checkout
uses: actions/checkout@v3
- name: set up
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
volumes:
- ./mongo:/data/db
gcs:
image: fsouza/fake-gcs-server
image: fsouza/fake-gcs-server:1.52.1
ports:
- 4443:4443
volumes:
Expand Down
2 changes: 1 addition & 1 deletion server/internal/app/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func initFile(ctx context.Context, conf *config.Config) (fileRepo gateway.File)
var err error
if conf.GCS.IsConfigured() {
log.Infofc(ctx, "file: GCS storage is used: %s\n", conf.GCS.BucketName)
fileRepo, err = gcs.NewFile(conf.GCS.BucketName, conf.AssetBaseURL, conf.GCS.PublicationCacheControl)
fileRepo, err = gcs.NewFile(conf.Dev, conf.GCS.BucketName, conf.AssetBaseURL, conf.GCS.PublicationCacheControl)
if err != nil {
log.Warnf("file: failed to init GCS storage: %s\n", err.Error())
}
Expand Down
5 changes: 5 additions & 0 deletions server/internal/infrastructure/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (f *fileRepo) UploadAsset(ctx context.Context, file *file.File) (*url.URL,
return getAssetFileURL(f.urlBase, filename), size, nil
}

func (f *fileRepo) UploadAssetFromURL(ctx context.Context, u *url.URL) (*url.URL, int64, error) {
// Note: not implemented
return nil, 0, errors.New("UploadAssetFromURL: not implemented for local file storage")
}

func (f *fileRepo) RemoveAsset(ctx context.Context, u *url.URL) error {
if u == nil {
return nil
Expand Down
72 changes: 70 additions & 2 deletions server/internal/infrastructure/gcs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"time"

"cloud.google.com/go/storage"
"github.com/kennygrant/sanitize"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/reearth/reearthx/rerror"
"github.com/spf13/afero"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

const (
Expand All @@ -30,12 +33,17 @@ const (
)

type fileRepo struct {
isDev bool
bucketName string
base *url.URL
cacheControl string
}

func NewFile(bucketName, base string, cacheControl string) (gateway.File, error) {
const (
devBaseURL = "http://localhost:4443/storage/v1/b"
)

func NewFile(isDev bool, bucketName, base string, cacheControl string) (gateway.File, error) {
if bucketName == "" {
return nil, errors.New("bucket name is empty")
}
Expand All @@ -52,6 +60,7 @@ func NewFile(bucketName, base string, cacheControl string) (gateway.File, error)
}

return &fileRepo{
isDev: isDev,
bucketName: bucketName,
base: u,
cacheControl: cacheControl,
Expand Down Expand Up @@ -104,6 +113,61 @@ func (f *fileRepo) RemoveAsset(ctx context.Context, u *url.URL) error {
return f.delete(ctx, sn)
}

func (f *fileRepo) UploadAssetFromURL(ctx context.Context, u *url.URL) (*url.URL, int64, error) {
if u == nil {
return nil, 0, errors.New("invalid URL")
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctxWithTimeout, http.MethodGet, u.String(), nil)
if err != nil {
return nil, 0, fmt.Errorf("failed to create request: %w", err)
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Errorfc(ctx, "gcs: failed to fetch URL: %v", err)
return nil, 0, errors.New("failed to fetch URL")
}

if resp.StatusCode != http.StatusOK {
log.Errorfc(ctx, "gcs: failed to fetch URL, status: %d", resp.StatusCode)
return nil, 0, errors.New("failed to fetch URL")
}

if resp.ContentLength > 0 && resp.ContentLength >= fileSizeLimit {
return nil, 0, gateway.ErrFileTooLarge
}

defer func() {
if err := resp.Body.Close(); err != nil {
log.Errorfc(ctx, "gcs: failed to close response body: %v", err)
}
}()

fileName := path.Base(u.Path)
if fileName == "" {
return nil, 0, gateway.ErrInvalidFile
}
fileName = sanitize.Path(newAssetID() + path.Ext(fileName))
filename := path.Join(gcsAssetBasePath, fileName)

size, err := f.upload(ctx, filename, resp.Body)
if err != nil {
log.Errorfc(ctx, "gcs: upload from URL failed: %v", err)
return nil, 0, err
}

gcsURL := getGCSObjectURL(f.base, filename)
if gcsURL == nil {
return nil, 0, gateway.ErrInvalidFile
}

return gcsURL, size, nil
}

// plugin

func (f *fileRepo) ReadPluginFile(ctx context.Context, pid id.PluginID, filename string) (io.ReadCloser, error) {
Expand Down Expand Up @@ -231,7 +295,11 @@ func (f *fileRepo) RemoveExportProjectZip(ctx context.Context, filename string)
// helpers

func (f *fileRepo) bucket(ctx context.Context) (*storage.BucketHandle, error) {
client, err := storage.NewClient(ctx)
opts := []option.ClientOption{}
if f.isDev {
opts = append(opts, option.WithoutAuthentication(), option.WithEndpoint(devBaseURL))
}
client, err := storage.NewClient(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down
110 changes: 110 additions & 0 deletions server/internal/infrastructure/gcs/file_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,88 @@
package gcs

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"path"
"strings"
"testing"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/reearth/reearthx/log"
"github.com/stretchr/testify/assert"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

func uploadTestData(client *storage.Client, bucketName string, testFileName string) {
ctx := context.Background()

bucket := client.Bucket(bucketName)
err := bucket.Create(ctx, "", nil)
if err != nil {
panic(err)
}

object := bucket.Object(testFileName)
if err := object.Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
panic(err)
}
writer := object.NewWriter(ctx)

content, err := os.Open("testdata/geojson.json")
if err != nil {
panic(err)
}

_, err = io.Copy(writer, content)
if err != nil {
panic(err)
}

if err := writer.Close(); err != nil {
panic(err)
}
}

func createBucket(client *storage.Client, bucketName string) {
ctx := context.Background()
bucket := client.Bucket(bucketName)
err := bucket.Create(ctx, "", nil)
if err != nil {
panic(err)
}
}

func deleteBucketWithObjects(client *storage.Client, bucketName string) {
ctx := context.Background()
bucket := client.Bucket(bucketName)

it := bucket.Objects(ctx, nil)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
panic(err)
}
if err := bucket.Object(objAttrs.Name).Delete(ctx); err != nil {
panic(err)
}
}

if err := bucket.Delete(ctx); err != nil {
panic(err)
}

log.Printf("Bucket %s deleted successfully", bucketName)
}

func TestGetGCSObjectURL(t *testing.T) {
e, _ := url.Parse("https://hoge.com/assets/xxx.yyy")
b, _ := url.Parse("https://hoge.com/assets")
Expand All @@ -22,3 +98,37 @@ func TestGetGCSObjectNameFromURL(t *testing.T) {
assert.Equal(t, "", getGCSObjectNameFromURL(nil, u))
assert.Equal(t, "", getGCSObjectNameFromURL(b, nil))
}

func TestUploadAssetFromURL(t *testing.T) {
ctx := context.Background()

// Mock fileRepo
baseURL, _ := url.Parse("http://0.0.0.0:4443/download/storage/v1/b")

distBucketName := strings.ToLower(uuid.New().String())
srcBucketName := fmt.Sprintf("test-bucket-%s", distBucketName)
client, _ := storage.NewClient(ctx, option.WithoutAuthentication(), option.WithEndpoint(devBaseURL))

defer func() {
deleteBucketWithObjects(client, distBucketName)
deleteBucketWithObjects(client, srcBucketName)
err := client.Close()
if err != nil {
t.Fatalf("failed to close client: %v", err)
}
}()

createBucket(client, distBucketName)

testFileName := uuid.New().String()
uploadTestData(client, srcBucketName, testFileName)

newFileRepo, err := NewFile(true, distBucketName, baseURL.String(), "")
assert.NoError(t, err)

srcURL, _ := url.Parse(fmt.Sprintf("%s/%s/o/%s", baseURL.String(), srcBucketName, testFileName))
uploadedURL, _, err := newFileRepo.UploadAssetFromURL(ctx, srcURL)

assert.NoError(t, err)
assert.Equal(t, fmt.Sprintf("%s/assets/%s", baseURL.String(), path.Base(uploadedURL.Path)), uploadedURL.String())
}
34 changes: 34 additions & 0 deletions server/internal/infrastructure/gcs/testdata/geojson.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{ "type": "FeatureCollection",
"features": [
{ "type": "Feature",
"geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
"properties": {"prop0": "value0"}
},
{ "type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
]
},
"properties": {
"prop0": "value0",
"prop1": 0.0
}
},
{ "type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
[100.0, 1.0], [100.0, 0.0] ]
]

},
"properties": {
"prop0": "value0",
"prop1": {"this": "that"}
}
}
]
}
5 changes: 5 additions & 0 deletions server/internal/infrastructure/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (f *fileRepo) UploadAsset(ctx context.Context, file *file.File) (*url.URL,
return u, s, nil
}

func (f *fileRepo) UploadAssetFromURL(ctx context.Context, u *url.URL) (*url.URL, int64, error) {
// Note: not implemented
return nil, 0, errors.New("UploadAssetFromURL: not implemented for local file storage")
}

func (f *fileRepo) RemoveAsset(ctx context.Context, u *url.URL) error {
log.Infofc(ctx, "s3: asset deleted: %s", u)

Expand Down
1 change: 1 addition & 0 deletions server/internal/usecase/gateway/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
type File interface {
ReadAsset(context.Context, string) (io.ReadCloser, error)
UploadAsset(context.Context, *file.File) (*url.URL, int64, error)
UploadAssetFromURL(context.Context, *url.URL) (*url.URL, int64, error)
RemoveAsset(context.Context, *url.URL) error

ReadPluginFile(context.Context, id.PluginID, string) (io.ReadCloser, error)
Expand Down
Loading