Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused tasks Import and Transcode #215

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,3 @@ func respondJson(rw http.ResponseWriter, status int, response interface{}) {
glog.Errorf("Error writing response. err=%q, response=%+v", err, response)
}
}

func nonNilErrs(errs ...error) []error {
var nonNil []error
for _, err := range errs {
if err != nil {
nonNil = append(nonNil, err)
}
}
return nonNil
}
185 changes: 86 additions & 99 deletions task/import.go → task/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,98 +4,24 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"mime"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync/atomic"

"github.com/golang/glog"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/go-api-client"
"github.com/livepeer/go-api-client/logs"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/task-runner/webcrypto"
)

const IPFS_PREFIX = "ipfs://"
const ARWEAVE_PREFIX = "ar://"

type ImportTaskConfig struct {
// Ordered list of IPFS gateways (includes /ipfs/ suffix) to import assets from
ImportIPFSGatewayURLs []*url.URL
}

func TaskImport(tctx *TaskContext) (*TaskHandlerOutput, error) {
var (
ctx = tctx.Context
playbackID = tctx.OutputAsset.PlaybackID
params = *tctx.Task.Params.Import
osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage)
vodDecryptPrivateKey = tctx.VodDecryptPrivateKey
)
filename, size, contents, err := getFile(ctx, osSess, tctx.ImportTaskConfig, params, vodDecryptPrivateKey)
if err != nil {
return nil, err
}
defer contents.Close()

// Download the file to local disk (or memory).
input := tctx.Progress.TrackReader(contents, size, 0.09)
sizeInt := int64(size)
sourceFile, err := readFile(filename, &sizeInt, input)
if err != nil {
return nil, err
}
defer sourceFile.Close()

// Probe metadata from the source file and save it to object store.
input = tctx.Progress.TrackReader(sourceFile, size, 0.11)
isRecording := params.RecordedSessionID != ""
metadata, err := Probe(ctx, tctx.OutputAsset.ID, filename, input, !isRecording)
if err != nil {
return nil, err
}
metadataFilePath, _, err := saveMetadataFile(ctx, osSess, playbackID, metadata)
if err != nil {
return nil, err
}

// Save source file to object store.
_, err = sourceFile.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("error seeking to start of source file: %w", err)
}
input = tctx.Progress.TrackReader(sourceFile, size, 0.2)
fullPath := videoFileName(playbackID)
videoFilePath, err := osSess.SaveData(ctx, fullPath, input, nil, fileUploadTimeout)
if err != nil {
return nil, fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err)
}
glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath)

_, err = sourceFile.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("error seeking to start of source file: %w", err)
}
playbackRecordingID, err := prepareImportedAsset(tctx, metadata, sourceFile)
if err != nil {
return nil, fmt.Errorf("error preparing asset: %w", err)
}
assetSpec := *metadata.AssetSpec
assetSpec.PlaybackRecordingID = playbackRecordingID
return &TaskHandlerOutput{
TaskOutput: &data.TaskOutput{Import: &data.UploadTaskOutput{
VideoFilePath: videoFilePath,
MetadataFilePath: metadataFilePath,
AssetSpec: assetSpec,
}}}, nil
}

func getFile(ctx context.Context, osSess drivers.OSSession, cfg ImportTaskConfig, params api.UploadTaskParams, vodDecryptPrivateKey string) (name string, size uint64, content io.ReadCloser, err error) {
func getFile(ctx context.Context, osSess drivers.OSSession, cfg UploadTaskConfig, params api.UploadTaskParams, vodDecryptPrivateKey string) (name string, size uint64, content io.ReadCloser, err error) {
name, size, content, err = getFileRaw(ctx, osSess, cfg, params)
if err != nil || !isEncryptionEnabled(params) {
return
Expand All @@ -116,7 +42,7 @@ func getFile(ctx context.Context, osSess drivers.OSSession, cfg ImportTaskConfig
return name, size, decrypted, nil
}

func getFileRaw(ctx context.Context, osSess drivers.OSSession, cfg ImportTaskConfig, params api.UploadTaskParams) (name string, size uint64, content io.ReadCloser, err error) {
func getFileRaw(ctx context.Context, osSess drivers.OSSession, cfg UploadTaskConfig, params api.UploadTaskParams) (name string, size uint64, content io.ReadCloser, err error) {
if upedObjKey := params.UploadedObjectKey; upedObjKey != "" {
// TODO: We should simply "move" the file in case of direct import since we
// know the file is already in the object store. Independently, we also have
Expand Down Expand Up @@ -185,19 +111,6 @@ func getFileWithUrl(ctx context.Context, url string) (name string, size uint64,
return filename(req, resp), size, resp.Body, nil
}

func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, sourceFile io.ReadSeekCloser) (string, error) {
if sessID := tctx.Params.Import.RecordedSessionID; sessID != "" {
return sessID, nil
}

playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, sourceFile)
if err != nil {
glog.Errorf("Error preparing file assetId=%s taskType=import err=%q", tctx.OutputAsset.ID, err)
return "", err
}
return playbackRecordingID, nil
}

func filename(req *http.Request, resp *http.Response) string {
contentDisposition := resp.Header.Get("Content-Disposition")
_, params, _ := mime.ParseMediaType(contentDisposition)
Expand All @@ -210,15 +123,89 @@ func filename(req *http.Request, resp *http.Response) string {
return ""
}

func saveMetadataFile(ctx context.Context, osSess drivers.OSSession, playbackID string, metadata interface{}) (string, string, error) {
path := metadataFileName(playbackID)
raw, err := json.Marshal(metadata)
func readFileToMemory(r io.Reader) (io.ReadSeekCloser, error) {
fileInMem, err := io.ReadAll(r)
if err != nil {
return "", "", fmt.Errorf("error marshaling file metadat: %w", err)
return nil, err
}
fullPath, err := osSess.SaveData(ctx, path, bytes.NewReader(raw), nil, fileUploadTimeout)
return nopCloser{bytes.NewReader(fileInMem)}, nil
}

type autodeletingFile struct {
*os.File
}

func (adf *autodeletingFile) Reader() io.ReadSeekCloser {
adf.Seek(0, io.SeekStart)
return adf
}

func (adf *autodeletingFile) Close() error {
err := adf.File.Close()
os.Remove(adf.File.Name())
return err
}

func getTempFile(size int64) (*os.File, error) {
file, err := os.CreateTemp("", "transcode")
if err != nil {
return "", "", fmt.Errorf("error saving metadata file: %w", err)
glog.Errorf("Error creating temporary file err=%v", err)
return nil, err
}
glog.Infof("Created temporary file name=%s", file.Name())
if size > 0 {
offset, err := file.Seek(size, io.SeekStart)
if err != nil || offset != size {
os.Remove(file.Name())
glog.Errorf("Error creating temporary file name=%s with size=%d offset=%d err=%v", file.Name(), size, offset, err)
return nil, err
}
file.Seek(0, io.SeekStart)
}
return fullPath, path, nil
return file, nil
}

func readFile(name string, sizePtr *int64, content io.Reader) (io.ReadSeekCloser, error) {
var size int64
if sizePtr != nil {
size = *sizePtr
}
glog.Infof("Source file name=%s size=%d", name, size)
if size > 0 && size < maxFileSizeForMemory {
// use memory
return readFileToMemory(content)
}
if file, err := getTempFile(size); err != nil {
return readFileToMemory(content)
} else {
if _, err = file.ReadFrom(content); err != nil {
file.Close()
os.Remove(file.Name())
return nil, err
}
file.Seek(0, io.SeekStart)
return &autodeletingFile{file}, nil
}
}

type Accumulator struct {
size uint64
}

func NewAccumulator() *Accumulator {
return &Accumulator{}
}

func (a *Accumulator) Size() uint64 {
return atomic.LoadUint64(&a.size)
}

func (a *Accumulator) Accumulate(size uint64) {
atomic.AddUint64(&a.size, size)
}

type nopCloser struct {
*bytes.Reader
}

func (nopCloser) Close() error { return nil }
4 changes: 0 additions & 4 deletions task/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ func videoFileName(playbackID string) string {
return path.Join(playbackID, "video")
}

func metadataFileName(playbackID string) string {
return path.Join(playbackID, "video.json")
}

func extractOSUriFilePath(osUri, playbackID string) (string, error) {
u, err := url.Parse(osUri)
if err != nil {
Expand Down
Loading