From 7b341421a206ef96f05f41b45f61b94730546c9b Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Tue, 17 Dec 2024 11:59:43 +0100 Subject: [PATCH 01/11] Update controller.py update aiorpcx_version check --- server/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/controller.py b/server/controller.py index d2af523..a5ae7de 100644 --- a/server/controller.py +++ b/server/controller.py @@ -19,8 +19,8 @@ async def serve(self, shutdown_event): '''Start the RPC server and wait for the mempool to synchronize. Then start serving external clients. ''' - if not (0, 22, 0) <= aiorpcx_version < (0, 23): - raise RuntimeError('aiorpcX version 0.22.x is required') + if aiorpcx_version < (0, 22, 0): + raise RuntimeError(f'invalid aiorpcX version {aiorpcx_version}') env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() From fd91ffa26610f31469dcc5e91cd00a0be98b2d81 Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:01:51 +0100 Subject: [PATCH 02/11] Update requirements.txt remove aiorpcX, installed by electrumx package. add scrypt package. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index d3c39f9..0677380 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,8 +10,8 @@ uvloop==0.17.0 python-rapidjson==1.10 kawpow==0.9.4.4 msgpack~=1.0.2 -aiorpcX~=0.22.1 asyncio~=3.4.3 lbryschema==0.0.16 beaker==1.12.1 pylru~=1.2.0 +scrypt~=0.8.27 From c51651284556811573c583bead453704d062bf49 Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:11:52 +0100 Subject: [PATCH 03/11] Update Dockerfile remove scrypt from Dockerfile, add it to requirements.txt --- Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 0b56609..c8e20aa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,6 @@ WORKDIR /app/plugins RUN apt-get update RUN apt-get install -y build-essential cmake musl-dev gcc g++ libffi-dev libssl-dev python2 python2-dev python3-dev curl libkrb5-dev librocksdb-dev libleveldb-dev libsnappy-dev liblz4-dev \ - && pip install scrypt x11_hash \ && pip3 install -r /app/plugins/requirements.txt \ && rm -rf /var/cache/apk/* \ && rm -rf /usr/share/man \ From 504398a58129ff5f2e9da80d214b6ef6a1e9d309 Mon Sep 17 00:00:00 2001 From: tryiou Date: Thu, 19 Dec 2024 14:54:01 +0100 Subject: [PATCH 04/11] -refactor get_address_history call, break down logic into helpers functions, -introduce a new transaction type 'internal', for inputs/ouputs owned by user. --- server/session.py | 435 +++++++++++++++++++--------------------------- 1 file changed, 180 insertions(+), 255 deletions(-) diff --git a/server/session.py b/server/session.py index 218458f..69935a9 100644 --- a/server/session.py +++ b/server/session.py @@ -662,268 +662,193 @@ async def subscribe_blocks_result(self): return self.session_mgr.bsub_results async def get_address_history(self, addresses): - if type(addresses) is str: + if isinstance(addresses, str): # Normalize to a set addr_lookup = {addresses} else: addr_lookup = set(addresses) + spent = [] spent_ids = set() - processed_txs = set() # track transactions that have already been processed - for address in addr_lookup: - address = str(address) - #self.logger.debug(f'Processing address: {address}') + processed_txs = set() # Track already processed transactions + for address in addr_lookup: try: - hash_x = self.session_mgr._history_address_cache[address] - #self.logger.debug(f'Address found in cache: {hash_x}') - except KeyError: - #self.logger.debug(f'Address not found in cache, converting address to hashX') - try: - hash_x = self.coin.address_to_hashX(address) - self.session_mgr._history_address_cache[address] = hash_x - #self.logger.debug(f'Address converted to hashX: {hash_x}') - except Exception: - self.logger.exception('[GetAddressHistory] Exception while converting address') - continue + address = str(address) + # Retrieve hash_x from cache or convert address + hash_x = self.session_mgr._history_address_cache.get(address) or self.coin.address_to_hashX(address) + self.session_mgr._history_address_cache[address] = hash_x - try: + # Fetch history for the address history = await self.db.limited_history(hash_x, limit=100) - #self.logger.debug(f'History retrieved for address: {address}') for tx_hash, height in history: if tx_hash in processed_txs: - #self.logger.debug(f'Skipping transaction, already processed: {tx_hash}') - continue # skip, already processed - #self.logger.debug(f'Processing transaction: {tx_hash}') + continue # Skip already processed transactions + tx = await self.transaction_get(hash_to_hex_str(tx_hash), verbose=True) if not tx: - #self.logger.debug(f'Transaction not found: {tx_hash}') - continue - processed_txs.add(tx_hash) - - spends = [] - from_addresses = set() - total_send_amount = 0 - my_total_send_amount = 0 - for item in tx['vin']: - prev_tx = await self.transaction_get(item['txid'], verbose=True) - if not prev_tx: - #self.logger.debug(f'Previous transaction not found: {item["txid"]}') - continue - - prev_out_amount = prev_tx['vout'][item['vout']]['value'] - script_pub_key = prev_tx['vout'][item['vout']]['scriptPubKey'] - addrs = [] - - if 'addresses' in script_pub_key: - addrs = script_pub_key['addresses'] - elif 'address' in script_pub_key: - addrs = [script_pub_key['address']] - - # Record total sent coin if sent from one of our addresses - if len(addrs) > 0: - is_my_address = False - for addr in addrs: - if addr in addr_lookup: - my_total_send_amount += prev_out_amount - is_my_address = True - break - - if not is_my_address: - total_send_amount += prev_out_amount - - from_addresses.update(addrs) - - my_total_send_amount_running = my_total_send_amount # track how much sent coin is left to report - is_sending_coin = my_total_send_amount > 0 - - biggest_sent_amount_not_my_address = 0 - biggest_sent_address_not_my_address = '' - biggest_sent_amount_my_address = 0 - biggest_sent_address_my_address = '' - - fees = total_send_amount * -1 - - def valid_spend(p_spent_ids, p_address, p_amount, p_category, p_item, p_tx, p_from_addresses): - p_txid_n = (p_tx['txid'], p_item['n'], p_category) - if p_txid_n not in p_spent_ids: - return { - 'address': p_address, - 'amount': p_amount, - 'fee': 0.0, - 'vout': p_item['n'], - 'category': p_category, - 'confirmations': p_tx['confirmations'], - 'blockhash': p_tx['blockhash'], - 'blocktime': p_tx['blocktime'], - 'time': p_tx['blocktime'], - 'txid': p_tx['txid'], - 'from_addresses': p_from_addresses - }, p_txid_n - else: - return None, None - - def get_address(p_item): - #self.logger.debug(f"p_item['scriptPubKey']: {p_item['scriptPubKey']}") - #self.logger.debug(f"p_item['scriptPubKey']['address']: {p_item['scriptPubKey'].get('address')} {type(p_item['scriptPubKey'].get('address'))}") - - if 'type' not in p_item['scriptPubKey'] or p_item['scriptPubKey']['type'] == 'nonstandard': - #self.logger.debug("Incompatible vout address, skipping") - return None # skip incompatible vout - - if 'address' in p_item['scriptPubKey']: - if isinstance(p_item['scriptPubKey']['address'], str): - return p_item['scriptPubKey']['address'] - - elif 'addresses' in p_item['scriptPubKey']: - if isinstance(p_item['scriptPubKey']['addresses'], str): - return p_item['scriptPubKey']['addresses'] - - elif isinstance(p_item['scriptPubKey']['addresses'], list) and len(p_item['scriptPubKey']['addresses']) > 0: - return p_item['scriptPubKey']['addresses'][0] - - #self.logger.debug("No valid address found") - return None - + continue # Skip if transaction not found + processed_txs.add(tx_hash) + spends = await self._process_transaction(tx, addr_lookup, spent_ids) + spent.extend(spends) # Add processed spends + except Exception as e: + self.logger.exception(f'[GetAddressHistory] Error processing address {address}: {e}') - # First pass: Only process transactions sent to another address, record fees - for item in tx['vout']: - # Add in fees (fees = total_in - total_out) - amount = item['value'] - fees += amount - vout_address = get_address(item) - if not vout_address: - #self.logger.debug('Incompatible vout address, skipping') - continue # incompatible address, skip - - if vout_address in addr_lookup: - #self.logger.debug('Address is not ours, skipping') - continue # not our address, skip - - # Amount is negative for send and positive for receive - # Record sent coin to address if we have outstanding send amount. - # Note that my total sent amount is subtracted by any amounts - # previously marked sent. - # Compare with epsilon instead of 0 to avoid precision inaccuracies. - if my_total_send_amount_running > sys.float_info.epsilon: - if biggest_sent_amount_not_my_address < amount: - biggest_sent_amount_not_my_address = amount - biggest_sent_address_not_my_address = vout_address - # amount reported here cannot be larger than my total send amount - adjusted_amount = amount if my_total_send_amount_running > amount else my_total_send_amount_running - my_total_send_amount_running -= adjusted_amount # track what we've already recorded as sent - spend, txid_n = valid_spend(spent_ids, vout_address, -float(adjusted_amount), 'send', item, tx, - list(from_addresses)) - if spend: - spent_ids.add(txid_n) - spends.append(spend) - #self.logger.debug(f'Spent coin recorded: {spend}') - - # Second pass: Only process transactions for all our own addresses - for item in tx['vout']: - vout_address = get_address(item) - if not vout_address: - #self.logger.debug('Incompatible vout address, skipping') - continue # incompatible address, skip - if vout_address not in addr_lookup: - #self.logger.debug('Address already processed in a previous pass, skipping') - continue # skip, already processed in block above - - amount = item['value'] - - do_not_mark_send = False - if vout_address not in from_addresses: - do_not_mark_send = True - - # Record received coin if this vout address is mine - spend, txid_n = valid_spend(spent_ids, vout_address, float(amount), 'receive', item, tx, - list(from_addresses)) - if spend: - spent_ids.add(txid_n) - spends.append(spend) - #self.logger.debug(f'Received coin recorded: {spend}') - - # Amount is negative for send and positive for receive - # Record sent coin to address if we have outstanding send amount. - # Note that my total sent amount is subtracted by any amounts - # previously marked sent. - # Compare with epsilon instead of 0 to avoid precision inaccuracies. - if my_total_send_amount_running > sys.float_info.epsilon: - # amount reported here cannot be larger than my total send amount - adjusted_amount = amount if my_total_send_amount_running > amount else my_total_send_amount_running - my_total_send_amount_running -= adjusted_amount # track what we've already recorded as sent - if not do_not_mark_send: - spend, txid_n = valid_spend(spent_ids, vout_address, -float(adjusted_amount), 'send', item, tx, - list(from_addresses)) - if spend: - spent_ids.add(txid_n) - spends.append(spend) - #self.logger.debug(f'Spent coin recorded: {spend}') - - if biggest_sent_amount_my_address < amount: - biggest_sent_amount_my_address = amount - biggest_sent_address_my_address = vout_address - - # Assign fees on tx with largest sent amount. Assign fees to transactions - # sent to an address that is not our own. Otherwise assign fee to largest - # sent transaction on our own address if that applies. - if is_sending_coin and fees < 0: - for spend in spends: - biggest_sent_address = biggest_sent_address_not_my_address \ - if biggest_sent_amount_not_my_address > 0 else biggest_sent_address_my_address - if spend['address'] == biggest_sent_address and spend['category'] == 'send': - spend['fee'] = truncate(fees, 10) - #self.logger.debug(f'Assigned fee: {spend}') - break + return spent - # Consolidate spends to self - remove_these = [] - if len(spends) >= 2: # can only compare at least 2 spends - for spend in spends: - filtered_spends = list(filter(lambda sp: sp['address'] == spend['address'], spends)) - if not filtered_spends: - continue - sends = list(filter(lambda sp: sp['category'] == 'send', filtered_spends)) - receives = list(filter(lambda sp: sp['category'] == 'receive', filtered_spends)) - from_spend = None if len(sends) == 0 else sends[0] - from_receive = None if len(receives) == 0 else receives[0] - if not from_spend or not from_receive: - continue # skip if don't have both send and receive - if abs(from_spend['amount']) - from_receive['amount'] > -sys.float_info.epsilon: - from_spend['amount'] += from_receive['amount'] - from_spend['fee'] += from_receive['fee'] - remove_these.append(from_receive) - elif abs(from_spend['amount']) - from_receive['amount'] <= -sys.float_info.epsilon: - from_receive['amount'] += from_spend['amount'] - from_receive['fee'] += from_spend['fee'] - remove_these.append(from_spend) - if len(spends) - len(remove_these) < 2: # done processing if nothing left to compare - break - # Remove all the consolidated spends - if len(remove_these) > 0: - for spend in remove_these: - if spend in spends: - spends.remove(spend) + async def _process_transaction(self, tx, addr_lookup, spent_ids): + """ + Process a single transaction, determine 'send', 'receive', or 'internal' transactions, + and return the processed spends. + """ + spends = [] + from_addresses = set() + to_addresses = set() + total_input_amount = 0 + total_output_amount = 0 + owned_input_amount = 0 + owned_output_amount = 0 + non_owned_input_amount = 0 + non_owned_output_amount = 0 + + # Process transaction inputs (vin) + for item in tx['vin']: + prev_tx = await self.transaction_get(item['txid'], verbose=True) + if not prev_tx: + continue # Skip missing previous transactions + + prev_out = prev_tx['vout'][item['vout']] + script_pub_key = prev_out['scriptPubKey'] + prev_addresses = self._extract_addresses(script_pub_key) + + total_input_amount += prev_out['value'] + from_addresses.update(prev_addresses) + + if prev_addresses & addr_lookup: + owned_input_amount += prev_out['value'] + else: + non_owned_input_amount += prev_out['value'] - # Filter out spends that have zero amount after consolidation - spends = list(filter(lambda sp: abs(sp['amount']) > sys.float_info.epsilon, spends)) + # Process transaction outputs (vout) + for item in tx['vout']: + amount = item['value'] + vout_addresses = self._extract_addresses(item['scriptPubKey']) - # Add timestamp to spends - blocktime = tx.get('blocktime') - if blocktime: - for spend in spends: - spend['time'] = blocktime + if not vout_addresses: + continue # Skip incompatible outputs - spent.extend(spends) - #self.logger.debug(f'Spends recorded for transaction: {spends}') + total_output_amount += amount + to_addresses.update(vout_addresses) - except Exception: - self.logger.exception('[GetAddressHistory] Exception while retrieving history') + if vout_addresses & addr_lookup: + owned_output_amount += amount + else: + non_owned_output_amount += amount + + # Calculate fees + fees = total_input_amount - total_output_amount + + # Categorize the transaction + if owned_input_amount > 0 and non_owned_output_amount > 0: + # Send transaction: owned inputs, some outputs to external addresses + transaction_type = 'send' + for item in tx['vout']: + vout_addresses = self._extract_addresses(item['scriptPubKey']) + if not (vout_addresses & addr_lookup): + spend, txid_n = self._create_spend( + spent_ids, vout_addresses, -float(item['value']), + transaction_type, item, tx, list(from_addresses) + ) + if spend: + self._assign_transaction_fees([spend], fees, is_sending_coin=True) + spends.append(spend) + spends = self._consolidate_spends(spends) + + elif owned_input_amount == 0 and owned_output_amount > 0: + # Receive transaction: no owned inputs, outputs to owned addresses + transaction_type = 'receive' + for item in tx['vout']: + vout_addresses = self._extract_addresses(item['scriptPubKey']) + if vout_addresses & addr_lookup: + spend, txid_n = self._create_spend( + spent_ids, vout_addresses, float(item['value']), + transaction_type, item, tx, list(from_addresses) + ) + if spend: + spend['fee'] = fees + spends.append(spend) + spends = self._consolidate_spends(spends) + + elif owned_input_amount > 0 and owned_output_amount > 0 and non_owned_input_amount == 0 and non_owned_output_amount == 0: + # # Internal transaction: all inputs and outputs are owned addresses + temp_item = tx['vout'][0] # to fix later, picking first vout as placeholder, data not used anymore ... + internal_receive_spend, txid_n = self._create_spend( + spent_ids, + self._extract_addresses(temp_item['scriptPubKey']), + -float(fees), + 'internal', + temp_item, # Pass the actual output item + tx, + list(from_addresses) + ) + if internal_receive_spend: + internal_receive_spend['fee'] = fees + spends.append(internal_receive_spend) + spends = self._consolidate_spends(spends) + + return spends + + def _extract_addresses(self, script_pub_key): + """Extract addresses from scriptPubKey.""" + if 'addresses' in script_pub_key: + return set(script_pub_key['addresses']) + if 'address' in script_pub_key: + return {script_pub_key['address']} + return set() + + def _create_spend(self, spent_ids, addresses, amount, category, item, tx, from_addresses): + """Create a spend entry if not already spent.""" + txid_n = (tx['txid'], item['n'], category) + if txid_n in spent_ids: + return None, None + + spent_ids.add(txid_n) + return { + 'address': next(iter(addresses)), # Use the first address for simplicity + 'amount': float(amount), + 'fee': 0.0, + 'vout': item['n'], + 'category': category, + 'confirmations': tx['confirmations'], + 'blockhash': tx['blockhash'], + 'blocktime': tx['blocktime'], + 'time': tx['blocktime'], + 'txid': tx['txid'], + 'from_addresses': from_addresses, + }, txid_n + + def _assign_transaction_fees(self, spends, fees, is_sending_coin): + """Assign fees to the largest 'send' transaction.""" + if is_sending_coin and fees < 0: + for spend in spends: + if spend['category'] == 'send': + spend['fee'] = truncate(fees, 10) # Assign fees here + break - return spent + def _consolidate_spends(self, spends): + """Consolidate multiple send/receive from/to same address into one element.""" + consolidated = {} + for spend in spends: + key = (spend['address'], spend['category']) + if key not in consolidated: + consolidated[key] = spend + else: + consolidated[key]['amount'] += spend['amount'] + consolidated[key]['fee'] += spend['fee'] + return list(consolidated.values()) async def get_history(self, addresses): self.logger.info('get_history: {}'.format(addresses)) @@ -985,18 +910,18 @@ def valid_spend(p_spent_ids, p_address, p_amount, p_category, p_item, p_tx, p_fr p_txid_n = (p_tx['txid'], p_item['n'], p_category) if p_txid_n not in p_spent_ids: return { - 'address': p_address, - 'amount': p_amount, - 'fee': 0.0, - 'vout': p_item['n'], - 'category': p_category, - 'confirmations': p_tx['confirmations'], - 'blockhash': p_tx['blockhash'], - 'blocktime': p_tx['blocktime'], - 'time': p_tx['blocktime'], - 'txid': p_tx['txid'], - 'from_addresses': p_from_addresses - }, p_txid_n + 'address': p_address, + 'amount': p_amount, + 'fee': 0.0, + 'vout': p_item['n'], + 'category': p_category, + 'confirmations': p_tx['confirmations'], + 'blockhash': p_tx['blockhash'], + 'blocktime': p_tx['blocktime'], + 'time': p_tx['blocktime'], + 'txid': p_tx['txid'], + 'from_addresses': p_from_addresses + }, p_txid_n else: return None, None @@ -1756,7 +1681,7 @@ def format_claim_from_daemon(self, claim, name=None): claim_id = claim['claimId'] raw_claim_id = unhexlify(claim_id)[::-1] if not self.bp.get_claim_info(raw_claim_id): - #raise RPCError("Lbrycrd has {} but not lbryumx, please submit a bug report.".format(claim_id)) + # raise RPCError("Lbrycrd has {} but not lbryumx, please submit a bug report.".format(claim_id)) return {} address = self.bp.get_claim_info(raw_claim_id).address.decode() sequence = self.bp.get_claims_for_name(name.encode('ISO-8859-1')).get(raw_claim_id) @@ -1780,14 +1705,14 @@ def format_claim_from_daemon(self, claim, name=None): "value": hexlify(claim['value'].encode('ISO-8859-1')).decode(), "claim_sequence": sequence, # from index "address": address, # from index - "supports": supports, # fixme: to be included in lbrycrd#124 + "supports": supports, # fixme: to be included in lbrycrd#124 "effective_amount": effective_amount, "valid_at_height": valid_at_height # TODO PR lbrycrd to include it } def format_supports_from_daemon(self, supports): return [[support['txid'], support['n'], get_from_possible_keys(support, 'amount', 'nAmount')] for - support in supports] + support in supports] async def claimtrie_getclaimbyid(self, claim_id): self.assert_claim_id(claim_id) @@ -1922,10 +1847,10 @@ async def claimtrie_getvalueforuris(self, block_hash, *uris): return {uri: await self.claimtrie_getvalueforuri(block_hash, uri) for uri in uris} # TODO: get it all concurrently when lbrycrd pending changes goes into a stable release - #async def getvalue(uri): + # async def getvalue(uri): # value = await self.claimtrie_getvalueforuri(block_hash, uri) # return uri, value, - #return dict([await asyncio.gather(*tuple(getvalue(uri) for uri in uris))][0]) + # return dict([await asyncio.gather(*tuple(getvalue(uri) for uri in uris))][0]) def proof_has_winning_claim(proof): From 659cab386560ddf21d14bfbad3067af0d43a72a5 Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:50:09 +0100 Subject: [PATCH 05/11] Update session.py truncate fees to 10 digits max --- server/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/session.py b/server/session.py index 69935a9..40108b4 100644 --- a/server/session.py +++ b/server/session.py @@ -748,7 +748,7 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): non_owned_output_amount += amount # Calculate fees - fees = total_input_amount - total_output_amount + fees = truncate(total_input_amount - total_output_amount, 10) # Categorize the transaction if owned_input_amount > 0 and non_owned_output_amount > 0: From 6eed54ab2171f79f53e545df3f65cd3290c60136 Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:31:56 +0100 Subject: [PATCH 06/11] Update session.py mod get_address_history: add transaction fee to value for a 'send' entry. --- server/session.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/session.py b/server/session.py index 40108b4..214cd0c 100644 --- a/server/session.py +++ b/server/session.py @@ -757,10 +757,9 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): for item in tx['vout']: vout_addresses = self._extract_addresses(item['scriptPubKey']) if not (vout_addresses & addr_lookup): - spend, txid_n = self._create_spend( - spent_ids, vout_addresses, -float(item['value']), - transaction_type, item, tx, list(from_addresses) - ) + amount = float(item['value']) + fees + spend, txid_n = self._create_spend(spent_ids, vout_addresses, -amount, transaction_type, item, tx, + list(from_addresses)) if spend: self._assign_transaction_fees([spend], fees, is_sending_coin=True) spends.append(spend) From 6d77155471cf55db07878699c354ae217867e54f Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Thu, 19 Dec 2024 17:02:33 +0100 Subject: [PATCH 07/11] Update session.py update get_address_history, account tx fee to amount for a 'send' transaction. --- server/session.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/session.py b/server/session.py index 214cd0c..28d77d7 100644 --- a/server/session.py +++ b/server/session.py @@ -757,13 +757,14 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): for item in tx['vout']: vout_addresses = self._extract_addresses(item['scriptPubKey']) if not (vout_addresses & addr_lookup): - amount = float(item['value']) + fees - spend, txid_n = self._create_spend(spent_ids, vout_addresses, -amount, transaction_type, item, tx, + spend, txid_n = self._create_spend(spent_ids, vout_addresses, -float(item['value']), transaction_type, item, tx, list(from_addresses)) if spend: self._assign_transaction_fees([spend], fees, is_sending_coin=True) spends.append(spend) spends = self._consolidate_spends(spends) + # add fee to the amount exiting wallet + spends[0]['amount']-=fees elif owned_input_amount == 0 and owned_output_amount > 0: # Receive transaction: no owned inputs, outputs to owned addresses From e674c3fd9cccc506062866139d135d813d3c2a8a Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:10:27 +0100 Subject: [PATCH 08/11] Update session.py cleaning --- server/session.py | 39 +++++++++++++++++---------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/server/session.py b/server/session.py index 28d77d7..bf59330 100644 --- a/server/session.py +++ b/server/session.py @@ -757,14 +757,13 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): for item in tx['vout']: vout_addresses = self._extract_addresses(item['scriptPubKey']) if not (vout_addresses & addr_lookup): - spend, txid_n = self._create_spend(spent_ids, vout_addresses, -float(item['value']), transaction_type, item, tx, - list(from_addresses)) + spend = self._create_spend(spent_ids, vout_addresses, -float(item['value']), + transaction_type, item, tx, + list(from_addresses)) if spend: - self._assign_transaction_fees([spend], fees, is_sending_coin=True) spends.append(spend) spends = self._consolidate_spends(spends) - # add fee to the amount exiting wallet - spends[0]['amount']-=fees + spends = self._assign_transaction_fees(spends, fees) elif owned_input_amount == 0 and owned_output_amount > 0: # Receive transaction: no owned inputs, outputs to owned addresses @@ -772,31 +771,31 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): for item in tx['vout']: vout_addresses = self._extract_addresses(item['scriptPubKey']) if vout_addresses & addr_lookup: - spend, txid_n = self._create_spend( + spend = self._create_spend( spent_ids, vout_addresses, float(item['value']), transaction_type, item, tx, list(from_addresses) ) if spend: - spend['fee'] = fees spends.append(spend) spends = self._consolidate_spends(spends) + spends = self._assign_transaction_fees(spends, fees) elif owned_input_amount > 0 and owned_output_amount > 0 and non_owned_input_amount == 0 and non_owned_output_amount == 0: # # Internal transaction: all inputs and outputs are owned addresses temp_item = tx['vout'][0] # to fix later, picking first vout as placeholder, data not used anymore ... - internal_receive_spend, txid_n = self._create_spend( + internal_spend = self._create_spend( spent_ids, self._extract_addresses(temp_item['scriptPubKey']), - -float(fees), + 0, 'internal', temp_item, # Pass the actual output item tx, list(from_addresses) ) - if internal_receive_spend: - internal_receive_spend['fee'] = fees - spends.append(internal_receive_spend) + if internal_spend: + spends.append(internal_spend) spends = self._consolidate_spends(spends) + spends = self._assign_transaction_fees(spends, fees) return spends @@ -818,7 +817,7 @@ def _create_spend(self, spent_ids, addresses, amount, category, item, tx, from_a return { 'address': next(iter(addresses)), # Use the first address for simplicity 'amount': float(amount), - 'fee': 0.0, + 'fee': 0.0, # this value is set later with a helper 'vout': item['n'], 'category': category, 'confirmations': tx['confirmations'], @@ -827,15 +826,13 @@ def _create_spend(self, spent_ids, addresses, amount, category, item, tx, from_a 'time': tx['blocktime'], 'txid': tx['txid'], 'from_addresses': from_addresses, - }, txid_n + } - def _assign_transaction_fees(self, spends, fees, is_sending_coin): + def _assign_transaction_fees(self, spends, fees): """Assign fees to the largest 'send' transaction.""" - if is_sending_coin and fees < 0: - for spend in spends: - if spend['category'] == 'send': - spend['fee'] = truncate(fees, 10) # Assign fees here - break + for spend in spends: + spend['fee'] = truncate(fees, 10) # Assign fees here + return spends def _consolidate_spends(self, spends): """Consolidate multiple send/receive from/to same address into one element.""" @@ -846,8 +843,6 @@ def _consolidate_spends(self, spends): consolidated[key] = spend else: consolidated[key]['amount'] += spend['amount'] - consolidated[key]['fee'] += spend['fee'] - return list(consolidated.values()) async def get_history(self, addresses): From 446df4d789784c0da3a442bd4deedd256cd7c260 Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:14:44 +0100 Subject: [PATCH 09/11] Update session.py switch LRUCache to electrumx.lib.lrucache refactor get_address_history to use asyncio refactor transaction_type logic in _process_transaction --- server/session.py | 186 +++++++++++++++++++++------------------------- 1 file changed, 83 insertions(+), 103 deletions(-) diff --git a/server/session.py b/server/session.py index bf59330..1b64fcf 100644 --- a/server/session.py +++ b/server/session.py @@ -1,8 +1,8 @@ +import asyncio import codecs import datetime import sys import time -import pylru import electrumx import electrumx.lib.util as util from binascii import unhexlify, hexlify @@ -12,6 +12,7 @@ from electrumx.lib.hash import sha256, hash_to_hex_str, hex_str_to_hash from electrumx.server.daemon import DaemonError from electrumx.server.session import SessionBase, assert_tx_hash, scripthash_to_hashX, non_negative_integer +from electrumx.lib.lrucache import LRUCache from lbryschema.uri import parse_lbry_uri from lbryschema.error import URIParseError @@ -41,7 +42,7 @@ def __init__(self, *args, **kwargs): self.cost = 5.0 # Connection cost self.cached_gettxoutsetinfo = None self.cached_rawblocks = None - self.session_mgr._history_address_cache = pylru.lrucache(1000) + self.session_mgr._history_address_cache = LRUCache(1000) @classmethod def protocol_min_max_strings(cls): @@ -667,41 +668,55 @@ async def get_address_history(self, addresses): else: addr_lookup = set(addresses) - spent = [] - spent_ids = set() + spent = [] # This will hold the final list of spends processed_txs = set() # Track already processed transactions - for address in addr_lookup: - try: - address = str(address) - # Retrieve hash_x from cache or convert address - hash_x = self.session_mgr._history_address_cache.get(address) or self.coin.address_to_hashX(address) + # Create an async lock to synchronize access to `spent` and `processed_txs` + lock = asyncio.Lock() + + # Create tasks for each address and process them concurrently + tasks = [self._process_address_history(address, processed_txs, spent, addr_lookup, lock) for address in + addr_lookup] + + # Await all tasks concurrently + await asyncio.gather(*tasks) + spent = sorted(spent, key=lambda x: x['time'], reverse=True) + return spent # Return the accumulated list of spends + + async def _process_address_history(self, address, processed_txs, spent, addr_lookup, lock): + try: + address = str(address) + hash_x = self.session_mgr._history_address_cache.get(address) + if hash_x is None: + hash_x = self.coin.address_to_hashX(address) self.session_mgr._history_address_cache[address] = hash_x - # Fetch history for the address - history = await self.db.limited_history(hash_x, limit=100) + # Fetch history for the address + history = await self.db.limited_history(hash_x, limit=100) - for tx_hash, height in history: + for tx_hash, height in history: + async with lock: # Ensure that only one task can check/process `processed_txs` at a time if tx_hash in processed_txs: continue # Skip already processed transactions - tx = await self.transaction_get(hash_to_hex_str(tx_hash), verbose=True) - if not tx: - continue # Skip if transaction not found - processed_txs.add(tx_hash) - spends = await self._process_transaction(tx, addr_lookup, spent_ids) - spent.extend(spends) # Add processed spends - except Exception as e: - self.logger.exception(f'[GetAddressHistory] Error processing address {address}: {e}') + tx = await self.transaction_get(hash_to_hex_str(tx_hash), verbose=True) + if not tx: + continue # Skip if transaction not found - return spent + spends = await self._process_transaction(tx, addr_lookup) # Process the transaction - async def _process_transaction(self, tx, addr_lookup, spent_ids): + async with lock: # Ensure only one task can update `spent` at a time + spent.extend(spends) # Add processed spends to the final list + + except Exception as e: + self.logger.exception(f'[GetAddressHistory] Error processing address {address}: {e}') + + async def _process_transaction(self, tx, addr_lookup): """ - Process a single transaction, determine 'send', 'receive', or 'internal' transactions, - and return the processed spends. + Process a single transaction, compute net change for owned addresses, + and return a single spend row per transaction. """ spends = [] from_addresses = set() @@ -710,8 +725,6 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): total_output_amount = 0 owned_input_amount = 0 owned_output_amount = 0 - non_owned_input_amount = 0 - non_owned_output_amount = 0 # Process transaction inputs (vin) for item in tx['vin']: @@ -728,8 +741,6 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): if prev_addresses & addr_lookup: owned_input_amount += prev_out['value'] - else: - non_owned_input_amount += prev_out['value'] # Process transaction outputs (vout) for item in tx['vout']: @@ -744,58 +755,49 @@ async def _process_transaction(self, tx, addr_lookup, spent_ids): if vout_addresses & addr_lookup: owned_output_amount += amount + + # Calculate raw fees and raw net change + raw_fees = abs(total_input_amount - total_output_amount) # Ensure fees are positive + raw_net_change = owned_output_amount - owned_input_amount + + # self.logger.info(f"DEBUG: txid: {tx['txid']}") + # self.logger.info(f"DEBUG: owned_input_amount: {owned_input_amount}, owned_output_amount: {owned_output_amount}") + # self.logger.info(f"DEBUG: total_input_amount: {total_input_amount}, total_output_amount: {total_output_amount}") + + # Determine transaction type + if raw_net_change > 0: + transaction_type = 'receive' # Net gain of funds + elif raw_net_change < 0: + # Check if all input and output addresses are owned + if from_addresses.issubset(addr_lookup) and to_addresses.issubset(addr_lookup): + transaction_type = 'internal' # All inputs and outputs are internal else: - non_owned_output_amount += amount - - # Calculate fees - fees = truncate(total_input_amount - total_output_amount, 10) - - # Categorize the transaction - if owned_input_amount > 0 and non_owned_output_amount > 0: - # Send transaction: owned inputs, some outputs to external addresses - transaction_type = 'send' - for item in tx['vout']: - vout_addresses = self._extract_addresses(item['scriptPubKey']) - if not (vout_addresses & addr_lookup): - spend = self._create_spend(spent_ids, vout_addresses, -float(item['value']), - transaction_type, item, tx, - list(from_addresses)) - if spend: - spends.append(spend) - spends = self._consolidate_spends(spends) - spends = self._assign_transaction_fees(spends, fees) - - elif owned_input_amount == 0 and owned_output_amount > 0: - # Receive transaction: no owned inputs, outputs to owned addresses - transaction_type = 'receive' - for item in tx['vout']: - vout_addresses = self._extract_addresses(item['scriptPubKey']) - if vout_addresses & addr_lookup: - spend = self._create_spend( - spent_ids, vout_addresses, float(item['value']), - transaction_type, item, tx, list(from_addresses) - ) - if spend: - spends.append(spend) - spends = self._consolidate_spends(spends) - spends = self._assign_transaction_fees(spends, fees) - - elif owned_input_amount > 0 and owned_output_amount > 0 and non_owned_input_amount == 0 and non_owned_output_amount == 0: - # # Internal transaction: all inputs and outputs are owned addresses - temp_item = tx['vout'][0] # to fix later, picking first vout as placeholder, data not used anymore ... - internal_spend = self._create_spend( - spent_ids, - self._extract_addresses(temp_item['scriptPubKey']), - 0, - 'internal', - temp_item, # Pass the actual output item - tx, - list(from_addresses) - ) - if internal_spend: - spends.append(internal_spend) - spends = self._consolidate_spends(spends) - spends = self._assign_transaction_fees(spends, fees) + transaction_type = 'send' # Net loss of funds + raw_net_change += raw_fees + else: # net_change == 0 + transaction_type = 'unknown' # No change, assuming it's a transaction between owned addresses + self.logger.info(f"DEBUG: transaction_type unknown, tx: {tx}") + + # truncate + net_change = truncate(raw_net_change, 10) + fees = truncate(raw_fees, 10) + + # self.logger.info(f"DEBUG: Transaction Type: {transaction_type}, net_change: {net_change}, fees: {fees}") + + # Create a single spend entry for this transaction + spend = self._create_spend( + list(to_addresses), # Aggregate 'to' addresses + net_change, + transaction_type, + fees, + tx['vout'][0], # Use the first output as a placeholder for metadata + tx, + list(from_addresses) + ) + + if spend: + spend["fee"] = fees # Assign the calculated fee to the spend + spends.append(spend) return spends @@ -807,17 +809,12 @@ def _extract_addresses(self, script_pub_key): return {script_pub_key['address']} return set() - def _create_spend(self, spent_ids, addresses, amount, category, item, tx, from_addresses): + def _create_spend(self, addresses, amount, category, fees, item, tx, from_addresses): """Create a spend entry if not already spent.""" - txid_n = (tx['txid'], item['n'], category) - if txid_n in spent_ids: - return None, None - - spent_ids.add(txid_n) return { 'address': next(iter(addresses)), # Use the first address for simplicity 'amount': float(amount), - 'fee': 0.0, # this value is set later with a helper + 'fee': fees, 'vout': item['n'], 'category': category, 'confirmations': tx['confirmations'], @@ -828,23 +825,6 @@ def _create_spend(self, spent_ids, addresses, amount, category, item, tx, from_a 'from_addresses': from_addresses, } - def _assign_transaction_fees(self, spends, fees): - """Assign fees to the largest 'send' transaction.""" - for spend in spends: - spend['fee'] = truncate(fees, 10) # Assign fees here - return spends - - def _consolidate_spends(self, spends): - """Consolidate multiple send/receive from/to same address into one element.""" - consolidated = {} - for spend in spends: - key = (spend['address'], spend['category']) - if key not in consolidated: - consolidated[key] = spend - else: - consolidated[key]['amount'] += spend['amount'] - return list(consolidated.values()) - async def get_history(self, addresses): self.logger.info('get_history: {}'.format(addresses)) return await self.get_address_history(addresses) From 9f645aff113a222f7488762e9c4d0f9574b12fd9 Mon Sep 17 00:00:00 2001 From: tryiou <73646876+tryiou@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:15:40 +0100 Subject: [PATCH 10/11] Update requirements.txt remove pylru, newer versions of ElectrumX using own caching structure. --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 0677380..ad1fe4e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,5 +13,4 @@ msgpack~=1.0.2 asyncio~=3.4.3 lbryschema==0.0.16 beaker==1.12.1 -pylru~=1.2.0 scrypt~=0.8.27 From 26ac13ce66e4657f1e9d87f16a97b40ea5061611 Mon Sep 17 00:00:00 2001 From: tryiou Date: Fri, 20 Dec 2024 17:23:59 +0100 Subject: [PATCH 11/11] tx fee was assigned twice to the dict in function _process_transaction. --- server/session.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/server/session.py b/server/session.py index 1b64fcf..7fe6fb9 100644 --- a/server/session.py +++ b/server/session.py @@ -756,8 +756,8 @@ async def _process_transaction(self, tx, addr_lookup): if vout_addresses & addr_lookup: owned_output_amount += amount - # Calculate raw fees and raw net change - raw_fees = abs(total_input_amount - total_output_amount) # Ensure fees are positive + # Calculate raw fee and raw net change + raw_fee = abs(total_input_amount - total_output_amount) # Ensure fee are positive raw_net_change = owned_output_amount - owned_input_amount # self.logger.info(f"DEBUG: txid: {tx['txid']}") @@ -773,30 +773,29 @@ async def _process_transaction(self, tx, addr_lookup): transaction_type = 'internal' # All inputs and outputs are internal else: transaction_type = 'send' # Net loss of funds - raw_net_change += raw_fees + raw_net_change += raw_fee else: # net_change == 0 transaction_type = 'unknown' # No change, assuming it's a transaction between owned addresses self.logger.info(f"DEBUG: transaction_type unknown, tx: {tx}") # truncate net_change = truncate(raw_net_change, 10) - fees = truncate(raw_fees, 10) + fee = truncate(raw_fee, 10) - # self.logger.info(f"DEBUG: Transaction Type: {transaction_type}, net_change: {net_change}, fees: {fees}") + # self.logger.info(f"DEBUG: Transaction Type: {transaction_type}, net_change: {net_change}, fee: {fee}") # Create a single spend entry for this transaction spend = self._create_spend( list(to_addresses), # Aggregate 'to' addresses net_change, transaction_type, - fees, + fee, tx['vout'][0], # Use the first output as a placeholder for metadata tx, list(from_addresses) ) if spend: - spend["fee"] = fees # Assign the calculated fee to the spend spends.append(spend) return spends @@ -809,12 +808,12 @@ def _extract_addresses(self, script_pub_key): return {script_pub_key['address']} return set() - def _create_spend(self, addresses, amount, category, fees, item, tx, from_addresses): + def _create_spend(self, addresses, amount, category, fee, item, tx, from_addresses): """Create a spend entry if not already spent.""" return { 'address': next(iter(addresses)), # Use the first address for simplicity 'amount': float(amount), - 'fee': fees, + 'fee': float(fee), 'vout': item['n'], 'category': category, 'confirmations': tx['confirmations'],