Skip to content

Commit

Permalink
Eudat ipfs cache is added.
Browse files Browse the repository at this point in the history
  • Loading branch information
avatar-lavventura committed Oct 2, 2018
1 parent a6d0acd commit a0bf6a5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 44 deletions.
13 changes: 11 additions & 2 deletions Driver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python

import owncloud
from subprocess import call
import sys, os, time, subprocess, string, driverFunc, lib, _thread
from colored import stylize
Expand All @@ -25,7 +26,7 @@

web3 = getWeb3()
eBlocBroker = connectEblocBroker(web3)

oc = None
# cmd: ps aux | grep \'[d]riverCancel\' | grep \'python3\' | wc -l
p1 = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)
#-----------
Expand Down Expand Up @@ -324,10 +325,18 @@ def isDriverOn(): #{
str(loggedJobs[i].args['storageID']), hashlib.md5(userID.encode('utf-8')).hexdigest(),
eBlocBroker, web3)
elif str(loggedJobs[i].args['storageID']) == '1':
if oc is None:
log("Login into owncloud")
with open(lib.EBLOCPATH + '/eudatPassword.txt', 'r') as content_file:
password = content_file.read().strip()
oc = owncloud.Client('https://b2drop.eudat.eu/')
oc.login('5f0db7e4-3078-4988-8fa5-f066984a8a97', password) # Unlocks EUDAT account
password = None

log("New job has been received. EUDAT call |" + time.ctime(), "green")
driverEudat(loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']), userInfo[4],
hashlib.md5(userID.encode('utf-8')).hexdigest(),
eBlocBroker, web3)
eBlocBroker, web3, oc)
#thread.start_new_thread(driverFunc.driverEudat, (loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index'])))
elif str(loggedJobs[i].args['storageID']) == '2':
log("New job has been received. IPFS with miniLock call |" + time.ctime(), "green")
Expand Down
83 changes: 41 additions & 42 deletions driverEudat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

import owncloud, hashlib, getpass, sys, os, time, subprocess, lib
import hashlib, getpass, sys, os, time, subprocess, lib
from subprocess import call
import os.path
from colored import stylize
Expand All @@ -14,6 +14,8 @@
storageIDGlobal = None
cacheTypeGlobal = None
shareTokenGlobal = '-1'
eudatFolderType = None


# Paths===================================================
ipfsHashes = lib.PROGRAM_PATH
Expand Down Expand Up @@ -88,27 +90,15 @@ def cache(userID): #{
else:
eudatDownloadFolder(globalCacheFolder, cachedFolder)
elif cacheTypeGlobal is 'ipfs':
log('Adding from owncloud mount point into IPFS...', 'blue')
ipfsHash = subprocess.check_output(['ipfs', 'add', lib.OC + '/' + jobKeyGlobal + '/' + jobKeyGlobal + '.tar.gz']).decode('utf-8').strip()
return True, ipfsHash.split()[1]
return True, ''
#}

# Assume job is sent and .tar.gz file
def eudatDownloadFolder(resultsFolderPrev, resultsFolder): #{
'''
# cmd: unzip -l $resultsFolder/output.zip | grep $eudatFolderName/run.sh
# Checks does zip contains run.sh file
p1 = subprocess.Popen(['unzip', '-l', resultsFolder + '/output.zip'], stdout=subprocess.PIPE)
#-----------
p2 = subprocess.Popen(['grep', eudatFolderName + '/run.sh'], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
#-----------
out = p2.communicate()[0].decode('utf-8').strip()
if not '/run.sh' in out:
log("Error: Folder does not contain run.sh file.")
return
'''

global eudatFolderType
# Downloads shared file as .zip, much faster.
# cmd: wget -4 -o /dev/stdout https://b2drop.eudat.eu/s/$shareToken/download --output-document=$resultsFolderPrev/output.zip
ret = subprocess.check_output(['wget', '-4', '-o', '/dev/stdout', 'https://b2drop.eudat.eu/s/' + shareTokenGlobal +
Expand All @@ -121,10 +111,22 @@ def eudatDownloadFolder(resultsFolderPrev, resultsFolder): #{
log(ret)

time.sleep(0.25)
if os.path.isfile(resultsFolderPrev + '/output.zip'): #{
if os.path.isfile(resultsFolderPrev + '/output.zip'):
'''
# cmd: unzip -l $resultsFolder/output.zip | grep $eudatFolderName/run.sh
# Checks does zip contains run.sh file
p1 = subprocess.Popen(['unzip', '-l', resultsFolderPrev + '/output.zip'], stdout=subprocess.PIPE)
#-----------
p2 = subprocess.Popen(['grep', jobKeyGlobal + '.tar.gz'], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
#-----------
out = p2.communicate()[0].decode('utf-8').strip()
if jobKeyGlobal + '.tar.gz' in out:
eudatFolderType = 'tar.gz'
'''
subprocess.run(['unzip', '-jo', resultsFolderPrev + '/output.zip', '-d', resultsFolderPrev, '-x', '*result-*.tar.gz'])
subprocess.run(['rm', '-f', resultsFolderPrev + '/output.zip'])
#}

'''
if glob.glob(resultsFolder + '/*.tar.gz'): #{ check file ending in .tar.gz exist
Expand All @@ -141,22 +143,17 @@ def eudatDownloadFolder(resultsFolderPrev, resultsFolder): #{
'''
#}


def eudatGetShareToken(fID): #{
global cacheTypeGlobal
# Checks already shared or not
# TODO: store shareToken id with jobKey in some file, later do: oc.decline_remote_share(<share_id>) to cancel shared folder
if cacheTypeGlobal is 'ipfs' and os.path.isdir(lib.OC + '/' + jobKeyGlobal):
log('Eudat share is already accepted...', 'green')
# TODO: store shareToken id with jobKey in some file, later do: oc.decline_remote_share(int(<share_id>)) to cancel shared folder at endCode or after some time later
if os.path.isdir(lib.OC + '/' + jobKeyGlobal): # and cacheTypeGlobal is 'ipfs'
log('Eudat shared folder is already accepted and exist on Eudat mounted folder...', 'green')
if cacheTypeGlobal is 'local':
cacheTypeGlobal = 'ipfs'
return True

global shareTokenGlobal
with open(lib.EBLOCPATH + '/eudatPassword.txt', 'r') as content_file:
password = content_file.read().strip()

log("Login into owncloud")
oc = owncloud.Client('https://b2drop.eudat.eu/')
oc.login('5f0db7e4-3078-4988-8fa5-f066984a8a97', password) # Unlocks EUDAT account
password = None
global oc, shareTokenGlobal
shareList = oc.list_open_remote_share()

acceptFlag = 0
Expand All @@ -175,18 +172,19 @@ def eudatGetShareToken(fID): #{
log("Found. InputId=" + inputID + " |ShareToken: " + shareTokenGlobal)
if cacheTypeGlobal is 'ipfs': #{
val = oc.accept_remote_share(int(inputID));
print(val) #delete
log('Sleeping 3 seconds for accepted folder to emerger on mounted Eudat folder...')
time.sleep(3)

tryCount = 0;
while True: #{
if tryCount is 5:
log('Mounted Eudat does not see shared folder\'s path.', 'red')
return False
if os.path.isdir(lib.OC + '/' + jobKeyGlobal): # Checking is shared file emerged on mounted owncloud
break
time.sleep(10)
try:
oc.list(jobKeyGlobal) # jobKey is valid folder under owncloud cloud
break
except:
print('Remote share did not accepted yet...')
tryCount += 1
log('Sleeping 10 seconds...')
time.sleep(10)
#}
#}
break
Expand All @@ -199,18 +197,20 @@ def eudatGetShareToken(fID): #{
return True
#}

def driverEudat(jobKey, index, fID, userID, eBlocBroker, web3): #{
def driverEudat(jobKey, index, fID, userID, eBlocBroker, web3, ocIn): #{
global jobKeyGlobal
global indexGlobal
global storageIDGlobal
global shareTokenGlobal
global cacheTypeGlobal

global oc

jobKeyGlobal = jobKey
indexGlobal = index
storageIDGlobal = '1'
cacheTypeGlobal = 'ipfs'

cacheTypeGlobal = 'local'
oc = ocIn

log("key: " + jobKey)
log("index: " + index)

Expand All @@ -227,8 +227,7 @@ def driverEudat(jobKey, index, fID, userID, eBlocBroker, web3): #{
if not os.path.isdir(resultsFolderPrev): # If folder does not exist
os.makedirs(resultsFolderPrev)
os.makedirs(resultsFolder)



if cacheTypeGlobal is 'local':
# Untar cached tar file into local directory
subprocess.run(['tar', '-xf', lib.PROGRAM_PATH + '/' + userID + '/cache' + '/' + jobKeyGlobal + '.tar.gz', '--strip-components=1', '-C', resultsFolder])
Expand Down

0 comments on commit a0bf6a5

Please sign in to comment.