diff --git a/.semver b/.semver index 307ef8e2..b0e259a0 100644 --- a/.semver +++ b/.semver @@ -1,5 +1,5 @@ --- :major: 2 :minor: 3 -:patch: 5 +:patch: 7 :special: '' diff --git a/broker/Driver.py b/broker/Driver.py index 0e7051a1..79c916f1 100644 --- a/broker/Driver.py +++ b/broker/Driver.py @@ -138,7 +138,7 @@ def tools(bn): check_output, gdrive_gmail = gdrive.check_gdrive_about(gmail) if not check_output: log( - f"E: provider's registered gmail=[m]{gmail}[/m] does not match with the set gdrive's gmail=[m]{gdrive_gmail}[/m]", + f"E: provider's registered gmail=[m]{gmail}[/m] does not match with the already set gdrive's gmail=[m]{gdrive_gmail}[/m]", is_code=True, h=False, ) diff --git a/broker/_utils/tools.py b/broker/_utils/tools.py index 1dd3daad..48ed155a 100644 --- a/broker/_utils/tools.py +++ b/broker/_utils/tools.py @@ -696,3 +696,12 @@ def gdrive_about_user() -> str: return ret[1] raise Exception() + + +def get_online_idle_core(ip) -> int: + while True: + with suppress(Exception): + return int(run(["curl", "-s", f"http://{ip}:5000"])) + + log("|", end="") + time.sleep(2) diff --git a/broker/cfg.py b/broker/cfg.py index b0c9fd90..0d021ecf 100644 --- a/broker/cfg.py +++ b/broker/cfg.py @@ -17,8 +17,9 @@ IS_FULL_TEST = False # check whether the full-long test is applied IS_FIRST_CYCLE = True BERG_CMPE_IP = "79.123.176.66" # "berg-cmpe-boun.duckdns.org" // may be down once in a while -NETWORK_ID = "bloxberg_core" # "bloxberg" -# NETWORK_ID = "bloxberg" # "bloxberg" +NETWORK_ID = "bloxberg" +# NETWORK_ID = "bloxberg_boun" +# NETWORK_ID = "bloxberg_core" # NETWORK_ID = "sepolia" TEST_JOB_COUNTER = 0 diff --git a/broker/eblocbroker_scripts/Contract.py b/broker/eblocbroker_scripts/Contract.py index 9183b999..9a240d2a 100644 --- a/broker/eblocbroker_scripts/Contract.py +++ b/broker/eblocbroker_scripts/Contract.py @@ -26,9 +26,9 @@ from brownie.network.transaction import TransactionReceipt if cfg.NETWORK_ID == "sepolia": - GAS_PRICE = 5 + GAS_PRICE = 5.0 else: #: for bloxberg - GAS_PRICE = 0.25 # was 1.21 Gwei + GAS_PRICE = 0.004 # was 1.21 Gwei EXIT_AFTER = 1000 # seconds @@ -168,6 +168,8 @@ def _wait_for_transaction_receipt(self, tx_hash, compact=False, is_verbose=False tx_receipt = cfg.w3.eth.get_transaction_receipt(tx_hash) except TransactionNotFound as e: log(f"warning: TransactionNotFound tx={tx_hash} {e}") + if attempt == 5: + raise Exception("TransactionNotFound") except Exception as e: print_tb(e) tx_receipt = None @@ -439,7 +441,6 @@ def timeout_wrapper(self, method, contract, *args): log(f"warning: Tx: {e}", is_code=True) else: log(f"E: Tx: {e}") - breakpoint() # DEBUG if ("Try increasing the gas price" in str(e)) or ( "Transaction with the same hash was already imported." in str(e) @@ -512,6 +513,7 @@ def _submit_job(self, required_confs, requester, job_price, *args) -> "Transacti except Exception as e: raise Exception(f"Approve transaction is failed, {e}") + time.sleep(2) self.gas_price = GAS_PRICE method_name = "submitJob" idx = 0 @@ -566,6 +568,7 @@ def _submit_job(self, required_confs, requester, job_price, *args) -> "Transacti continue + time.sleep(2) idx += 1 raise Exception("No valid Tx receipt for 'submitJob' is generated") diff --git a/broker/eblocbroker_scripts/contract.yaml b/broker/eblocbroker_scripts/contract.yaml index 6b6f2802..5751c4f8 100644 --- a/broker/eblocbroker_scripts/contract.yaml +++ b/broker/eblocbroker_scripts/contract.yaml @@ -1,5 +1,5 @@ networks: - active_network: bloxberg # sepolia # bloxberg change me at cfg.py file as well + active_network: bloxberg # sepolia | change me at cfg.py file as well bloxberg: project_dir: ~/ebloc-broker/contract eBlocBroker: @@ -9,11 +9,11 @@ networks: address: "0xbCc68aEa77cf128A7ae9eB7588f363Efc76349a7" tx_hash: "0x99004818f181cac56c49d502f871db1a7ecb03a6a6f714a7072ee79f8ad025a6" ResearchCertificate: - address: "0xD6397bf1A42f01C753dcD638C6eD3486963A09eD" - tx_hash: "0x8cf7bab8993c192c0de1b56aa8f9f0b706cbd341b35b425d500803509026c653" + address: '0xB17dE8C64DC6BB678043457F1A7d992F583A37A0' + tx_hash: '0x22dc1725d46c0844e6fc198e37c3c72d55e151aa192adad959c05848a86bb760' AutonomousSoftwareOrg: - address: "0x06EE74fa579C771f182624a3f71845697270A9fF" - tx_hash: "0xe76315a96d47584b7da739a3a4d96e1bf64fbe913f35ed85ac4a1e7c4ca7c14e" + address: '0x450eb7EC90418E9D768f85Ffa1b29187AD57251a' + tx_hash: '0xc07dc41d0e435728a5e405a8697470c57db19db26a16aac6c97bdb1e81110359' sepolia: project_dir: ~/ebloc-broker/contract name: sepolia @@ -31,18 +31,18 @@ networks: tx_hash: "0x596adaa251729551525850d913424946f3096821fc8a12e6bb5de0c6094cd29a" bloxberg-docker: project_dir: /workspace/ebloc-broker/contract - # eBlocBroker: - # address: '0x35F8ecb17a9EB63cB12cc0D15404A41946371F00' - # tx_hash: '0xa3b58df370bce393762ed92ec2d7dcd6d54e53381d42f76c6a18f9d4d1b2662c' - # USDTmy: - # address: '0x499331123D9861A8A812ccFB81552a4A00786ee2' - # tx_hash: '0x88780cb86701fab9df24c3d1808f21bc2395d70338a527b767942d353fe9299e' + # eBlocBroker: + # address: '0x35F8ecb17a9EB63cB12cc0D15404A41946371F00' + # tx_hash: '0xa3b58df370bce393762ed92ec2d7dcd6d54e53381d42f76c6a18f9d4d1b2662c' + # USDTmy: + # address: '0x499331123D9861A8A812ccFB81552a4A00786ee2' + # tx_hash: '0x88780cb86701fab9df24c3d1808f21bc2395d70338a527b767942d353fe9299e' networks_old: bloxberg: project_dir: ~/ebloc-broker/contract eBlocBroker: - address: "0x81aF3aeACbE068e7513443572d998bC2A20A83C1" - tx_hash: "0x39b3fb95d0258e42f71c5da835a32e7a197cab4eee2cea9373d627d2f5593d9d" - USDTmy: # dummy - address: "0x56B59aFC0Df1fcc1B42cf6135f94604cf7F7eb77" - tx_hash: "0x3a47a7971f80bf1a320a8409951a7830bf86bffb5005a65df4973c8943ec45f5" + address: '0x81aF3aeACbE068e7513443572d998bC2A20A83C1' + tx_hash: '0x39b3fb95d0258e42f71c5da835a32e7a197cab4eee2cea9373d627d2f5593d9d' + USDTmy: # dummy + address: '0x56B59aFC0Df1fcc1B42cf6135f94604cf7F7eb77' + tx_hash: '0x3a47a7971f80bf1a320a8409951a7830bf86bffb5005a65df4973c8943ec45f5' diff --git a/broker/eblocbroker_scripts/g.png b/broker/eblocbroker_scripts/g.png new file mode 100644 index 00000000..55146798 Binary files /dev/null and b/broker/eblocbroker_scripts/g.png differ diff --git a/broker/eblocbroker_scripts/original_from_bloxberg.gv b/broker/eblocbroker_scripts/original_from_bloxberg.gv new file mode 100644 index 00000000..4eb19d11 --- /dev/null +++ b/broker/eblocbroker_scripts/original_from_bloxberg.gv @@ -0,0 +1,149 @@ +digraph G { + size ="40"; + // nodesep=.05; + rankdir=LR; + compound=true; + forcelabels=true; + 2; + "1.1"; + 3; + 4; + 6; + "5.2"; + 8; + "7.3"; + 9; + 11; + "10.4"; + 12; + 13; + 14; + "15.5"; + 16; + 17; + "18.6"; + 19; + "20.7"; + 21; + "22.8"; + 23; + 24; + "25.9"; + 26; + 27; + "28.10"; + 29; + 30; + 31; + "32.11"; + 33; + 34; + "35.12"; + "36.13"; + 37; + "38.14"; + 39; + "40.15"; + 41; + 42; + "43.16"; + 44; + 45; + 46; + "47.17"; + 48; + 49; + "50.18"; + 51; + 52; + 53; + "54.19"; + 55; + "56.20"; + 57; + 2 -> "1.1"; + "1.1" -> 3; + "1.1" -> 4; + 3 -> "15.5"; + 4 -> "5.2"; + 4 -> "15.5"; + 4 -> "28.10"; + 6 -> "5.2"; + 8 -> "7.3"; + 8 -> "18.6"; + 8 -> "28.10"; + "7.3" -> 9; + 9 -> "18.6"; + 11 -> "10.4"; + "10.4" -> 12; + "10.4" -> 13; + "10.4" -> 14; + 13 -> "20.7"; + 13 -> "22.8"; + 14 -> "22.8"; + "15.5" -> 16; + "15.5" -> 17; + 16 -> "36.13"; + 16 -> "50.18"; + 17 -> "28.10"; + 17 -> "36.13"; + "18.6" -> 19; + 19 -> "20.7"; + 19 -> "25.9"; + 19 -> "32.11"; + "20.7" -> 21; + 21 -> "25.9"; + 21 -> "32.11"; + 21 -> "35.12"; + "22.8" -> 23; + "22.8" -> 24; + 23 -> "38.14"; + 24 -> "38.14"; + "25.9" -> 26; + "25.9" -> 27; + 26 -> "32.11"; + 27 -> "32.11"; + 27 -> "36.13"; + "28.10" -> 29; + "28.10" -> 30; + "28.10" -> 31; + 29 -> "43.16"; + 30 -> "43.16"; + 31 -> "43.16"; + "32.11" -> 33; + "32.11" -> 34; + 33 -> "38.14"; + 33 -> "40.15"; + 34 -> "35.12"; + 34 -> "36.13"; + 34 -> "40.15"; + 34 -> "50.18"; + 34 -> "56.20"; + "36.13" -> 37; + 37 -> "50.18"; + "38.14" -> 39; + 39 -> "40.15"; + "40.15" -> 41; + "40.15" -> 42; + 42 -> "54.19"; + 42 -> "56.20"; + "43.16" -> 44; + "43.16" -> 45; + "43.16" -> 46; + 44 -> "47.17"; + 45 -> "47.17"; + 46 -> "47.17"; + "47.17" -> 48; + "47.17" -> 49; + 49 -> "50.18"; + 49 -> "54.19"; + "50.18" -> 51; + "50.18" -> 52; + "50.18" -> 53; + 51 -> "54.19"; + 51 -> "56.20"; + 52 -> "54.19"; + 53 -> "54.19"; + "54.19" -> 55; + "56.20" -> 57; +} diff --git a/broker/eblocbroker_scripts/roc.py b/broker/eblocbroker_scripts/roc.py index baa27913..3c986431 100755 --- a/broker/eblocbroker_scripts/roc.py +++ b/broker/eblocbroker_scripts/roc.py @@ -1,13 +1,60 @@ #!/usr/bin/env python3 +import networkx as nx from broker._utils._log import log from broker import cfg, config from broker.config import env, setup_logger from broker.utils import print_tb + Ebb = cfg.Ebb logging = setup_logger() +G = nx.Graph() + + +# G.add_edge("2", "1.1") + +""" +""" + + +def roc_log(): + output = Ebb.get_block_number() + event_filter = config.auto.events.LogSoftwareExecRecord.createFilter( + # argument_filters={"to": str(provider)}, + fromBlock=23066710, + toBlock="latest", + ) + for logged_receipt in event_filter.get_all_entries(): + log(logged_receipt.args) + sw = logged_receipt.args.sourceCodeHash + sw = sw.hex()[32:64] + token_index = config.roc.getTokenIndex(sw) + index = logged_receipt.args.index + sw_str = f"{token_index}.{index}" + # + input_hash_bytes = logged_receipt.args.inputHash + output_hash_bytes = logged_receipt.args.outputHash + + input_hash_token_index = [] + for inp in input_hash_bytes: + _hash = inp.hex()[32:64] + input_hash_token_index.append(config.roc.getTokenIndex(_hash)) + + output_hash_token_index = [] + for out in output_hash_bytes: + _hash = out.hex()[32:64] + output_hash_token_index.append(config.roc.getTokenIndex(_hash)) + + for inp in input_hash_token_index: + G.add_edge(inp, sw_str) + + for out in output_hash_token_index: + G.add_edge(sw_str, out) + + nx.nx_pydot.write_dot(G, "original_from_bloxberg.gv") + def roc(): output = Ebb.get_block_number() @@ -21,11 +68,10 @@ def roc(): log(logged_receipt.args) token_id = logged_receipt.args["tokenId"] - breakpoint() # DEBUG - def main(): - roc() + # roc() + roc_log() if __name__ == "__main__": diff --git a/broker/flask/README.org b/broker/flask/README.org index a81bebca..1dcda3ab 100644 --- a/broker/flask/README.org +++ b/broker/flask/README.org @@ -14,3 +14,13 @@ hypercorn app_ebb:app -b 127.0.0.1:8000 --reload curl -v 127.0.0.1:8000 curl -X POST http://127.0.0.1:8000/webhook -d "0x29e613b04125c16db3f3613563bfdd0ba24cb629 0000-0001-7642-0552" #+end_src + +--------------------------------------------- + +#+begin_src bash +export FLASK_APP=hello +export FLASK_ENV=development +flask run --host=0.0.0.0 + +curl http://192.168.1.117:5000/ +#+end_src diff --git a/broker/flask/hello.py b/broker/flask/hello.py new file mode 100644 index 00000000..ca94fea5 --- /dev/null +++ b/broker/flask/hello.py @@ -0,0 +1,16 @@ +from flask import Flask +from broker.utils import run + +app = Flask(__name__) + + +@app.route("/") +def hello(): + core_info = run(["sinfo", "-h", "-o%C"]).split("/") + + # allocated_cores = int(core_info[0]) + idle_cores = int(core_info[1]) + # other_cores = int(core_info[2]) + # total_cores = int(core_info[3]) + + return str(idle_cores) diff --git a/broker/flask/run.sh b/broker/flask/run.sh new file mode 100755 index 00000000..6be90ecc --- /dev/null +++ b/broker/flask/run.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +export FLASK_APP=hello +export FLASK_ENV=development +flask run --host=0.0.0.0 diff --git a/broker/flask/run_app.sh b/broker/flask/run_app.sh index 36b164cf..0d605feb 100755 --- a/broker/flask/run_app.sh +++ b/broker/flask/run_app.sh @@ -11,6 +11,7 @@ countdown () { num=$(ps axuww | grep -E "[h]ypercorn app_ebb:app" | \ grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l) + if [ $num -ge 1 ]; then echo "warning: app_ebb is already running" exit diff --git a/broker/imports.py b/broker/imports.py index 05412c67..a60c0589 100644 --- a/broker/imports.py +++ b/broker/imports.py @@ -133,6 +133,7 @@ def connect_to_eblocbroker() -> None: env.ROC_CONTRACT_ADDRESS, abi=read_abi_file(env.EBB_SCRIPTS / "abi_ResearchCertificate.json") ) # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + """ from brownie import project as pro_auto _project = pro_auto.load("/home/alper/git/AutonomousSoftwareOrg") # TODO: add as sub-module @@ -141,6 +142,8 @@ def connect_to_eblocbroker() -> None: config._auto = cfg.w3.eth.contract( env.AUTO_CONTRACT_ADDRESS, abi=read_abi_file(env.EBB_SCRIPTS / "abi_AutonomousSoftwareOrg.json") ) + """ + """ depreciated from brownie import project as pro diff --git a/broker/ipfs/job_workflow.yaml b/broker/ipfs/job_workflow.yaml index b4726d43..c2853bc0 100644 --- a/broker/ipfs/job_workflow.yaml +++ b/broker/ipfs/job_workflow.yaml @@ -1,20 +1,23 @@ config: requester_address: '0x72c1a89ff3606aa29686ba8d29e28dccff06430a' - provider_address: '0xe2e146d6B456760150d78819af7d276a1223A6d4' + provider_address: '0x29e613B04125c16db3f3613563bFdd0BA24Cb629' search_cheapest_provider: false source_code: cache_type: public - path: /home/alper/test_eblocbroker/workflow/256_448 + path: /home/alper/test_eblocbroker/workflow/32_56 storage_hours: 0 storage_id: ipfs - dt_in: 372 ## - data_transfer_out: 250 # sum of output weights + dt_in: 0 ## + data_transfer_out: 124 # sum of output weights jobs: job1: cores: 1 - run_time: 4 + run_time: 5 + job2: + cores: 1 + run_time: 5 costs: - '0x29e613B04125c16db3f3613563bFdd0BA24Cb629': 88154 - '0x4934a70Ba8c1C3aCFA72E809118BDd9048563A24': 84600 - '0xe2e146d6B456760150d78819af7d276a1223A6d4': 93120 + '0x29e613B04125c16db3f3613563bFdd0BA24Cb629': 105490 + '0x4934a70Ba8c1C3aCFA72E809118BDd9048563A24': 101000 + '0xe2e146d6B456760150d78819af7d276a1223A6d4': 110700 path: {} diff --git a/broker/ipfs/submit.py b/broker/ipfs/submit.py index cf0c4f13..cb911ab0 100755 --- a/broker/ipfs/submit.py +++ b/broker/ipfs/submit.py @@ -4,7 +4,7 @@ from pathlib import Path from sys import platform from web3.logs import DISCARD - +import time from broker import cfg from broker._utils.tools import _remove, log from broker._utils.web3_tools import get_tx_status @@ -97,7 +97,14 @@ def _ipfs_add(job, target, idx, is_verbose=False): def _submit(provider_addr, job, requester, targets, required_confs): tx_hash = Ebb.submit_job(provider_addr, job.key, job, requester, required_confs) if required_confs >= 1: - tx_receipt = get_tx_status(tx_hash) + while True: + try: + tx_receipt = get_tx_status(tx_hash) + break + except: + time.sleep(2) + tx_hash = Ebb.submit_job(provider_addr, job.key, job, requester, required_confs) + if tx_receipt["status"] == 1: processed_logs = Ebb._eblocbroker.events.LogJob().processReceipt(tx_receipt, errors=DISCARD) try: diff --git a/broker/results/eblocbroker_wf_test-results_October_2023.xlsx b/broker/results/eblocbroker_wf_test-results_October_2023.xlsx index 7107d241..eea21b8c 100644 --- a/broker/results/eblocbroker_wf_test-results_October_2023.xlsx +++ b/broker/results/eblocbroker_wf_test-results_October_2023.xlsx @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:8a145dd06bd1c583b453c5859ba05c22ef710c4f0241432c7fb93e008cc56483 -size 970784 +oid sha256:730c03312bf209ed645d22500423626deb5a26f0c53794f22413b27aa980db55 +size 951949 diff --git a/broker/roc/commit.py b/broker/roc/commit.py index 5837c7ad..99cc703a 100755 --- a/broker/roc/commit.py +++ b/broker/roc/commit.py @@ -10,35 +10,32 @@ import networkx as nx from typing import List -fn = "/home/alper/git/AutonomousSoftwareOrg/graph/original.gv" +fn = "/home/alper/git/AutonomousSoftwareOrg/graph-tools/original.gv" G = nx.drawing.nx_pydot.read_dot(fn) -""" -Ebb = cfg.Ebb -Ebb.get_block_number() -""" data_map = { "17.2": "9ea971a966ec0f612b268d7e089b1f9e", - "10.0": "110e00c9266bf7cb964cd68f5e0a6b96", - "7.0": "3930b695da4c9f46aa0ef0153d8ca288", - "7.1": "8d7506d81cf589cc7603093272b68bef", - "7.2": "b04e7dd0ef6ecd43eb3973b0d6952733", - "7.3": "7080d03c288977b88812b865b761bffa", - "7.4": "84f310b307a08955e01d9c2347adf77e", - "17.1": "0f2290337f9cc909e62e4375a0353d7d", - "17.4": "059dc553c65d19c917e94291016b6714", - "14.0": "468e6561b82c811bb3c2324e2ca0865f", - "14.1": "0e975e19d841b2d5e7facc93eaf3db2e", - "4.0": "c69dc79f17e20e4ba5014f56492cddaf", - "4.1": "c2fee597bc585140bb2f214c6f19d89e", - "17.3": "42742ab7bfc995a7fc3490e663705590", - "17.0": "5a259600f0c0a7984f380cd00b89d1fe", - "22.3": "feb2c4b74f898c2064707843e0585fbe", - "19.0": "be293b48fa60443023e308bea4ec4df8", - "22.0": "a7e8fe2dcfeb2f4ddce864682133b60e", - "22.1": "a715e0a320e144335bb0974acdbe3847", - "22.2": "be038e12f93a275eb013192ab896c8a2", + "10.1": "110e00c9266bf7cb964cd68f5e0a6b96", + "7.16": "3930b695da4c9f46aa0ef0153d8ca288", + "7.9": "8d7506d81cf589cc7603093272b68bef", + "7.3": "b04e7dd0ef6ecd43eb3973b0d6952733", + "7.4": "7080d03c288977b88812b865b761bffa", + "7.5": "84f310b307a08955e01d9c2347adf77e", + "17.17": "0f2290337f9cc909e62e4375a0353d7d", + "17.14": "059dc553c65d19c917e94291016b6714", + "14.6": "468e6561b82c811bb3c2324e2ca0865f", + "14.10": "0e975e19d841b2d5e7facc93eaf3db2e", + "4.7": "b69dc79f17e20e4ba5014f56492cddaf", # c + "4.8": "c2fee597bc585140bb2f214c6f19d89e", + "17.12": "42742ab7bfc995a7fc3490e663705590", + "17.11": "5a259600f0c0a7984f380cd00b89d1fe", + "22.15": "feb2c4b74f898c2064707843e0585fbe", + "19.13": "be293b48fa60443023e308bea4ec4df8", + "22.18": "a7e8fe2dcfeb2f4ddce864682133b60e", + "22.19": "a715e0a320e144335bb0974acdbe3847", + "22.20": "be038e12f93a275eb013192ab896c8a2", + # "1": "7dca945940426af6fb934a8ffb78cc8d", "2": "44f8657617b4d88473d19c3265b8aaa3", "3": "6e006040b06789a03549d4d383ad7d12", @@ -95,7 +92,10 @@ completed_sw = [] -def add_software_exec_record(sw, index, input_hashes, output_hashes): +def add_software_exec_record(sw, input_hashes, output_hashes): + Ebb = cfg.Ebb + Ebb.get_block_number() + fn = env.PROVIDER_ID.lower().replace("0x", "") + ".json" Ebb.brownie_load_account(fn) @@ -103,9 +103,9 @@ def add_software_exec_record(sw, index, input_hashes, output_hashes): log(f"=> sw({sw}) is already created") else: log(f"=> [blue]sw({sw}) to be registered") - config.auto.addSoftwareExecRecord( - sw, index, input_hashes, output_hashes, {"from": env.PROVIDER_ID, "gas": 9900000, "allow_revert": True} - ) + args = {"from": env.PROVIDER_ID, "gas": 9900000, "allow_revert": True} + # tx = config.auto.setNextCounter(sw, args) + tx = config.auto.addSoftwareExecRecord(sw, 0, input_hashes, output_hashes, args) def commit_software(): @@ -129,8 +129,8 @@ def commit_software(): completed_sw.append(sw_node) - #: save to blockchain - add_software_exec_record(data_map[sw_node], int(sw_node.split(".")[1]), input_hashes, output_hashes) + #: save to the blockchain + add_software_exec_record(data_map[sw_node], input_hashes, output_hashes) sw_nodes = [] @@ -138,7 +138,7 @@ def commit_software(): if "." in node: sw_nodes.append(node) -# commit_software() +commit_software() """ order = {} for sw_node in sw_nodes: @@ -149,27 +149,28 @@ def commit_software(): log(sorted_order) """ +#: this will be change in new smart contract sorted_order = { - "10.0": 13, + "10.1": 13, "17.2": 14, - "7.2": 19, - "7.3": 21, - "7.4": 23, - "14.0": 26, - "4.0": 29, - "4.1": 33, - "7.1": 35, - "14.1": 38, - "17.0": 42, - "17.3": 45, - "19.0": 46, - "17.4": 48, - "22.3": 50, - "7.0": 53, - "17.1": 57, - "22.0": 60, - "22.1": 64, - "22.2": 66, + "7.3": 19, + "7.4": 21, + "7.5": 23, + "14.6": 26, + "4.7": 29, + "4.8": 33, + "7.9": 35, + "14.10": 38, + "17.11": 42, + "17.12": 45, + "19.13": 46, + "17.14": 48, + "22.15": 50, + "7.16": 53, + "17.17": 57, + "22.18": 60, + "22.19": 64, + "22.20": 66, } hit_data = {} @@ -183,7 +184,7 @@ def commit_software(): hit_data[succ] = True log(f"{node} => {owned_data}") - # breakpoint() # DEBUG + breakpoint() # DEBUG # set(G.predecessors("17.2")) @@ -196,6 +197,7 @@ def md5_hash(): def commit_hash(_hash): + Ebb = cfg.Ebb roc_num = config.roc.getTokenIndex(_hash) if roc_num == 0: fn = env.PROVIDER_ID.lower().replace("0x", "") + ".json" diff --git a/broker/test_setup_w/README.org b/broker/test_setup_w/README.org index 2bf0a3ef..eae58500 100644 --- a/broker/test_setup_w/README.org +++ b/broker/test_setup_w/README.org @@ -1,13 +1,15 @@ * Random Workflow Generator To run: #+begin_src bash -./generate_w.py +./generate_w.py 6 10 ~/ebloc-broker/broker/ipfs/submit_w.py + +./my_layer_heft.py 6 10 #+end_src -On provider +** On the provider #+begin_src bash -while ~/.ebloc-broker/start.sh; do sleep 15; done +~/.ebloc-broker/broker/start.sh # -while ~/.ebloc-broker/end.sh; do sleep 15; done +~/.ebloc-broker/broker/end.sh #+end_src diff --git a/broker/test_setup_w/generate_w.py b/broker/test_setup_w/generate_w.py index d3d4aebb..821c82e8 100755 --- a/broker/test_setup_w/generate_w.py +++ b/broker/test_setup_w/generate_w.py @@ -49,7 +49,7 @@ def main(): except: pass - print(f"Generate random DAG for {n} {edges}...") + print(f"Generating random DAG for (node, edge) {n} {edges} ...") wf = Workflow() while True: #: to be sure nodes are generated with the exact given node number @@ -66,7 +66,7 @@ def main(): nx.draw_spring(wf.G, with_labels=True) plt.savefig(BASE / "job.png") - base_size = 200 + base_dt_in_size = 200 base_dt_out_size = 250 for i in range(1, job_num + 1): sleep_dur = random.randint(2, 5) # 2 <= x <= 5 @@ -75,7 +75,7 @@ def main(): _job = yaml["config"]["jobs"][f"job{i}"] _job["run_time"] = sleep_dur _job["cores"] = 1 - _job["dt_in"] = base_size + _job["dt_in"] = base_dt_in_size _job["dt_out"] = 0 dt_out = 0 for edge in wf.out_edges(i): diff --git a/broker/test_setup_w/job_workflow.yaml b/broker/test_setup_w/job_workflow.yaml index 0924ea40..7aa096aa 100644 --- a/broker/test_setup_w/job_workflow.yaml +++ b/broker/test_setup_w/job_workflow.yaml @@ -1,19 +1,29 @@ config: - # requester_address: "0x0636278CBD420368b1238ab204b1073df9cC1c5c" - provider_address: '0x29e613B04125c16db3f3613563bFdd0BA24Cb629' + # requester_address: "0x0636278CBD420368b1238ab204b1073df9cC1c5c" + provider_address: '0x4934a70Ba8c1C3aCFA72E809118BDd9048563A24' search_cheapest_provider: false source_code: cache_type: public - path: /home/alper/test_eblocbroker/workflow/256_448 + path: /home/alper/test_eblocbroker/workflow/128_224 + # path: /home/alper/test_eblocbroker/workflow/256_448 storage_hours: 0 storage_id: ipfs - dt_in: 94 ## - data_transfer_out: 250 # sum of output weights + dt_in: 456 ## + data_transfer_out: 250 # sum of output weights jobs: - job154: - cores: 1 - run_time: 2 - job179: + job128: cores: 1 run_time: 4 + job95: + cores: 1 + run_time: 3 + job127: + cores: 1 + run_time: 3 + job101: + cores: 1 + run_time: 3 + job48: + cores: 1 + run_time: 3 dt_out: {} diff --git a/broker/test_setup_w/my_layer_heft.py b/broker/test_setup_w/my_layer_heft.py index b2f25e0b..3c1e0a54 100755 --- a/broker/test_setup_w/my_layer_heft.py +++ b/broker/test_setup_w/my_layer_heft.py @@ -13,7 +13,7 @@ from broker import cfg from broker._utils import _log from broker._utils._log import log -from broker._utils.tools import print_tb +from broker._utils.tools import print_tb, get_online_idle_core from broker._utils.yaml import Yaml from broker.eblocbroker_scripts import Contract from broker.eblocbroker_scripts.job import Job @@ -35,6 +35,13 @@ provider_id["a"] = "0x29e613B04125c16db3f3613563bFdd0BA24Cb629" provider_id["b"] = "0x4934a70Ba8c1C3aCFA72E809118BDd9048563A24" provider_id["c"] = "0xe2e146d6B456760150d78819af7d276a1223A6d4" + +provider_ip = {} +provider_ip["a"] = "192.168.1.117" +provider_ip["b"] = "192.168.1.21" +provider_ip["c"] = "192.168.1.104" + + if len(sys.argv) == 3: n = int(sys.argv[1]) edges = int(sys.argv[2]) @@ -109,6 +116,7 @@ def update_job_stats(self): self.refunded.append(key) + #: save operation is done with open(BASE / "layer_submitted_dict.pkl", "wb") as f: pickle.dump(self.submitted_node_dict, f) @@ -376,7 +384,23 @@ def submit_layering(): continue_flag = True if not continue_flag: - yaml_original["config"]["provider_address"] = provider_id[provider_char] + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + idle_code = get_online_idle_core(provider_ip[provider_char]) + log(f"provider {provider_char} idle cores: {idle_code}") + if idle_code > 0: + yaml_original["config"]["provider_address"] = provider_id[provider_char] + else: + yaml_original["config"]["provider_address"] = provider_id[provider_char] + for pr in provider_ip: + idle_code = get_online_idle_core(provider_ip[pr]) + if idle_code > 0: + print( + f"#: Load changed to provider={pr} #########################################################################" + ) + yaml_original["config"]["provider_address"] = provider_id[pr] + break + + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- yaml_original["config"]["source_code"]["path"] = str(BASE) log( f"* w{_idx} => {sorted(partial_layer)} | provider to submit => [bold cyan]{provider_char}[/bold cyan] " diff --git a/broker/test_setup_w/my_scheduler.py b/broker/test_setup_w/my_scheduler.py index c19ebbf9..1e385d5c 100755 --- a/broker/test_setup_w/my_scheduler.py +++ b/broker/test_setup_w/my_scheduler.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -# >10 running jobs should be carried to FAILED import datetime import networkx as nx import pickle @@ -15,7 +14,7 @@ from broker import cfg from broker._utils import _log from broker._utils._log import log -from broker._utils.tools import print_tb +from broker._utils.tools import print_tb, get_online_idle_core from broker._utils.yaml import Yaml from broker.eblocbroker_scripts import Contract from broker.eblocbroker_scripts.job import Job @@ -24,6 +23,8 @@ from broker.lib import state from broker.workflow.Workflow import Workflow +#: >10 running jobs should be carried to FAILED + wf = Workflow() _log.ll.LOG_FILENAME = Path.home() / ".ebloc-broker" / "test.log" @@ -50,6 +51,11 @@ provider_id["b"] = "0x4934a70Ba8c1C3aCFA72E809118BDd9048563A24" provider_id["c"] = "0xe2e146d6B456760150d78819af7d276a1223A6d4" +provider_ip = {} +provider_ip["a"] = "192.168.1.117" +provider_ip["b"] = "192.168.1.21" +provider_ip["c"] = "192.168.1.104" + try: with open(BASE / "heft_submitted_dict.pkl", "rb") as f: loaded_dict = pickle.load(f) @@ -151,7 +157,23 @@ def batch_submit(self): G_copy = wf.G.copy() yaml_cfg = Yaml(yaml_fn_wf) yaml_cfg["config"]["source_code"]["path"] = str(BASE) - yaml_cfg["config"]["provider_address"] = provider_id[batch_key] + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + idle_code = get_online_idle_core(provider_ip[batch_key]) + log(f"provider {batch_key} idle cores: {idle_code}") + if idle_code > 0: + yaml_cfg["config"]["provider_address"] = provider_id[batch_key] + else: + yaml_cfg["config"]["provider_address"] = provider_id[batch_key] + for pr in provider_ip: + idle_code = get_online_idle_core(provider_ip[pr]) + if idle_code > 0: + print( + f"#: Load changed to provider={pr} #########################################################################" + ) + yaml_cfg["config"]["provider_address"] = provider_id[pr] + break + + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- g_list = [] for node in list(wf.G.nodes): if node != "\\n" and int(node) not in self.batch_to_submit[batch_key]: @@ -239,6 +261,7 @@ def batch_submit(self): except: self.submitted_dict[key] = [int(node)] + #: save operation is done with open(BASE / "heft_submitted_dict.pkl", "wb") as f: pickle.dump(self.submitted_dict, f) diff --git a/broker/test_setup_w/read_txs.py b/broker/test_setup_w/read_txs.py index ac00d1be..4f6bd14b 100755 --- a/broker/test_setup_w/read_txs.py +++ b/broker/test_setup_w/read_txs.py @@ -181,7 +181,7 @@ def read_txs(n, edges, fn): def main(): test = [(16, 28), (32, 56), (64, 112), (128, 224), (256, 448)] - # test = [(16, 28)] + test = [(16, 28)] # test = [(32, 56)] # test = [(64, 112)] # test = [(128, 224)] diff --git a/broker/test_setup_w/read_txs_final_test.py b/broker/test_setup_w/read_txs_final_test.py new file mode 100755 index 00000000..916f5fca --- /dev/null +++ b/broker/test_setup_w/read_txs_final_test.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 + +from broker.eblocbroker_scripts import Contract +from broker._utils._log import log +from broker._utils.tools import print_tb +from broker.errors import QuietExit +import pickle +from pathlib import Path +from broker import cfg +from broker.eblocbroker_scripts.utils import Cent + +Ebb: "Contract.Contract" = cfg.Ebb + +is_excel = False +if is_excel: + log( + "type,provider_label,id,sourceCodeHash,index,job_id,received_bn,w_type,elapsed_time,processPayment_tx_hash," + "processPayment_gasUsed,submitJob_tx_hash,submitJob_gas_used,data_transfer_in_to_download,data_transfer_out," + "job_price" + ) + +provider_id = {} +provider_id["0x29e613B04125c16db3f3613563bFdd0BA24Cb629".lower()] = "a" +provider_id["0x4934a70Ba8c1C3aCFA72E809118BDd9048563A24".lower()] = "b" +provider_id["0xe2e146d6B456760150d78819af7d276a1223A6d4".lower()] = "c" + + +def read_txs_layer(n, edges, fn): + BASE = Path.home() / "test_eblocbroker" / "workflow" / f"{n}_{edges}" + try: + with open(BASE / fn, "rb") as f: + loaded_dict = pickle.load(f) + except Exception as e: + print_tb(e) + + name = fn.replace(".pkl", "").replace("submitted_dict_", "") + job_price_sum = 0 + sum_received = 0 + sum_refunded = 0 + total_processpayment_gas = 0 + idx = 0 + total_submitjob_gas = 0 + total_processpayment_gas = 0 + total_refunded = 0 + # for k, v in loaded_dict.items(): + # log(f"{v} => {k}") + + for k, v in loaded_dict.items(): + keys = v.split("_") + provider = keys[0] + job_key = keys[1] + index = keys[2] + received_bn = keys[3] + job_id = keys[4] + event_filter = Ebb._eblocbroker.events.LogProcessPayment.createFilter( + argument_filters={"provider": str(provider)}, + fromBlock=int(received_bn), + toBlock="latest", + ) + if int(job_id) == 0: + output_g = Ebb.get_job_info(keys[0], keys[1], keys[2], 0, keys[3], is_print=False) + _job_price = output_g["submitJob_received_job_price"] + jp = float(Cent(_job_price)._to()) + job_price_sum += jp + total_submitjob_gas += output_g["submitJob_gas_used"] + total_refunded += sum_refunded # output_g["refunded_cent"] + + for logged_receipt in event_filter.get_all_entries(): + if ( + logged_receipt.args["jobKey"] == job_key + and logged_receipt.args["index"] == int(index) + and logged_receipt.args["jobID"] == int(job_id) + ): + output = Ebb.get_job_info(keys[0], keys[1], keys[2], job_id, keys[3], is_print=False) + idx += 1 + recv = logged_receipt.args["receivedCent"] + ref = logged_receipt.args["refundedCent"] + sum_received += float(Cent(recv)._to()) + sum_refunded += float(Cent(ref)._to()) + tx_receipt = Ebb.get_transaction_receipt(logged_receipt["transactionHash"].hex()) + total_processpayment_gas += int(tx_receipt["gasUsed"]) + if is_excel: + if int(job_id) == 0: + log( + f"{name},{provider_id[provider.lower()]},j{idx},{job_key},{index},{job_id},{received_bn},{n}_{edges},{output['run_time'][int(job_id)]}," + f"{tx_receipt['transactionHash'].hex()},{tx_receipt['gasUsed']},{output_g['submitJob_tx_hash']},{output_g['submitJob_gas_used']}," + f"{output_g['data_transfer_in_to_download']},{output_g['data_transfer_out']},{jp}" + ) + else: + log( + f"{name},{provider_id[provider.lower()]},j{idx},{job_key},{index},{job_id},{received_bn},{n}_{edges},{output['run_time'][int(job_id)]}," + f"{tx_receipt['transactionHash'].hex()},{tx_receipt['gasUsed']}" + ) + + if not is_excel: + log(f"LAYER {n} {edges}") + log(f"total_submitjob_gas={total_submitjob_gas}") + log(f"total_processpayment_gas={total_processpayment_gas} idx={idx}") + log(f"total_received={sum_received} [pink]USDmy") + log(f"total_refunded={Cent(total_refunded)._to()} [pink]USDmy") + log(f"job_price_sum={job_price_sum}") + log("--------------------------------------------------------") + + +def read_txs(n, edges, fn): + BASE = Path.home() / "test_eblocbroker" / "workflow" / f"{n}_{edges}" + try: + with open(BASE / fn, "rb") as f: + loaded_dict = pickle.load(f) + except Exception as e: + print_tb(e) + + name = fn.replace(".pkl", "").replace("submitted_dict_", "") + job_price_sum = 0 + total_submitjob_gas = 0 + total_refunded = 0 + sum_received = 0 + sum_refunded = 0 + total_processpayment_gas = 0 + idx = 0 + for k, v in loaded_dict.items(): + # log(f"{k} => {v}") + keys = k.split("_") + provider = keys[0] + job_key = keys[1] + index = keys[2] + received_bn = keys[3] + event_filter = Ebb._eblocbroker.events.LogProcessPayment.createFilter( + argument_filters={"provider": str(provider)}, + fromBlock=int(received_bn), + toBlock="latest", + ) + output_g = Ebb.get_job_info(keys[0], keys[1], keys[2], 0, keys[3], is_print=False) + _job_price = output_g["submitJob_received_job_price"] + jp = float(Cent(_job_price)._to()) + job_price_sum += jp + total_submitjob_gas += output_g["submitJob_gas_used"] + total_refunded += sum_refunded # output_g["refunded_cent"] + # log(f"{sum_received} {_job_price}") + # log(f"{k} => ", end="") + # log(f"{v}") + for logged_receipt in event_filter.get_all_entries(): + if logged_receipt.args["jobKey"] == job_key and logged_receipt.args["index"] == int(index): + idx += 1 + job_id = logged_receipt.args["jobID"] + output = Ebb.get_job_info(keys[0], keys[1], keys[2], job_id, keys[3], is_print=False) + recv = logged_receipt.args["receivedCent"] + # log(f"{Cent(recv)._to()} [pink]USDmy") + sum_received += float(Cent(recv)._to()) + sum_refunded += logged_receipt.args["refundedCent"] + tx_receipt = Ebb.get_transaction_receipt(logged_receipt["transactionHash"].hex()) + total_processpayment_gas += int(tx_receipt["gasUsed"]) + if is_excel: + if job_id == 0: + log( + f"{name},{provider_id[provider.lower()]},j{idx},{job_key},{index},{job_id},{received_bn},{n}_{edges},{output['run_time'][job_id]}," + f"{tx_receipt['transactionHash'].hex()},{tx_receipt['gasUsed']},{output_g['submitJob_tx_hash']},{output_g['submitJob_gas_used']}," + f"{output_g['data_transfer_in_to_download']},{output_g['data_transfer_out']},{jp}" + ) + else: + log( + f"{name},{provider_id[provider.lower()]},j{idx},{job_key},{index},{job_id},{received_bn},{n}_{edges},{output['run_time'][job_id]}," + f"{tx_receipt['transactionHash'].hex()},{tx_receipt['gasUsed']}" + ) + + # print(logged_receipt.args["receivedCent"]) + # print(int(tx_receipt["gasUsed"])) + # log(logged_receipt.args) + # log() + + if not is_excel: + log() + log(f"* HEFT {n} {edges}") + log(f"total_submitjob_gas={total_submitjob_gas}") + log(f"total_processpayment_gas={total_processpayment_gas} idx={idx}") + log(f"total_received={sum_received} [pink]USDmy") + log(f"total_refunded={Cent(total_refunded)._to()} [pink]USDmy") + log(f"job_price_sum={job_price_sum}") + log("--------------------------------------------------------") + + +def main(): + test = [(16, 28), (32, 56), (64, 112), (128, 224), (256, 448)] + # test = [(16, 28)] + # test = [(32, 56)] + # test = [(64, 112)] + test = [(128, 224)] + # test = [(256, 448)] + test = dict(test) + for n, edges in test.items(): + read_txs(n, edges, "heft_submitted_dict.pkl") + """ + log("\n\n\n") + read_txs(n, edges, "heft_submitted_dict_2.pkl") + log("\n\n\n") + read_txs(n, edges, "heft_submitted_dict_3.pkl") + # ----------------------------------------------------- + read_txs_layer(n, edges, "layer_submitted_dict_1.pkl") + log("\n\n\n") + read_txs_layer(n, edges, "layer_submitted_dict_2.pkl") + log("\n\n\n") + read_txs_layer(n, edges, "layer_submitted_dict_3.pkl") + if not is_excel: + log("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-", "yellow") + """ + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + pass + except QuietExit as e: + print(f"#> {e}") + except Exception as e: + print_tb(str(e)) diff --git a/research_certificate/deploy.sh b/research_certificate/deploy.sh index cc3915d9..ea1f9acd 100755 --- a/research_certificate/deploy.sh +++ b/research_certificate/deploy.sh @@ -1,8 +1,8 @@ #!/bin/bash main () { - network="sepolia" - # network="bloxberg_core" + # network="sepolia" + network="bloxberg_core" printf "## network=$network\n" rm -rf build/ brownie compile diff --git a/research_certificate/deploy_output.txt b/research_certificate/deploy_output.txt index 4ab29d4d..13bee6a3 100644 --- a/research_certificate/deploy_output.txt +++ b/research_certificate/deploy_output.txt @@ -1,4 +1,4 @@ -## network=sepolia +## network=bloxberg_core Brownie v1.19.3 - Python development framework for Ethereum New compatible solc version available: 0.8.22 @@ -31,9 +31,9 @@ ResearchCertificateProject is the active project. Running 'scripts/ResearchCertificate.py::main'... from=0xD118b6EF83ccF11b34331F1E7285542dDf70Bc49 - Transaction sent: 0x5b7eb179c6a54cf86ccee595de71a9f2d31c55b0e4bb7f57a46a5a19f5e66a07 - Gas price: 1.138964003 gwei Gas limit: 1232479 Nonce: 8 - Waiting for confirmation... - Waiting for confirmation... \ Waiting for confirmation... | Waiting for confirmation... / Waiting for confirmation... - Waiting for confirmation... \ ResearchCertificate.constructor confirmed Block: 4936991 Gas used: 1120436 (90.91%) - ResearchCertificate deployed at: 0x17e85EF468e5e085659d0443e29856a9054f0E7A + Transaction sent: 0x22dc1725d46c0844e6fc198e37c3c72d55e151aa192adad959c05848a86bb760 + Gas price: 0.002 gwei Gas limit: 1229258 Nonce: 4042 + Waiting for confirmation... - Waiting for confirmation... \ Waiting for confirmation... | Waiting for confirmation... / Waiting for confirmation... - Waiting for confirmation... \ Waiting for confirmation... | Waiting for confirmation... / Waiting for confirmation... - ResearchCertificate.constructor confirmed Block: 24136402 Gas used: 1117508 (90.91%) + ResearchCertificate deployed at: 0xB17dE8C64DC6BB678043457F1A7d992F583A37A0 ## setting abi... done diff --git a/setup.cfg b/setup.cfg index 247b35f5..9bcc153d 100755 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.3.5 +current_version = 2.3.6 [flake8] exclude = .old_work/*,docs/*,broker/_utils/colorer.py,webapp/ex.py diff --git a/setup.py b/setup.py index 41212b88..3ebcd0b9 100755 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ name="ebloc-broker", packages=find_packages(), setup_requires=["wheel", "eth-brownie", "ipdb", "rich"], # "dbus-python" - version="2.3.5", # don't change this manually, use bumpversion instead + version="2.3.6", # don't change this manually, use bumpversion instead license="MIT", description=( # noqa: E501 "A Python framework to communicate with ebloc-broker that is "