Fetching Gas Prices Script
def __get_gas_from_metaswap(self, priority, gas_upper_bound):
gas_provider = f'https://gas-api.metaswap.codefi.network/networks/{self.chain_id}/suggestedGasFees'
resp = None
try:
resp = requests.get(gas_provider, timeout=RequestTimeout)
resp_json = resp.json()
max_fee_per_gas = Decimal(resp_json[priority]['suggestedMaxFeePerGas'])
max_priority_fee_per_gas = Decimal(resp_json[priority]['suggestedMaxPriorityFeePerGas'])
apm.span_label(max_fee_per_gas=max_fee_per_gas, max_priority_fee_per_gas=max_priority_fee_per_gas,
gas_price_provider=gas_provider)
if max_fee_per_gas > gas_upper_bound:
raise OutOfRangeTransactionFee(f'gas price exceeded. {gas_upper_bound=} but it is {max_fee_per_gas}')
gas_params = {
'maxFeePerGas': Web3.to_wei(max_fee_per_gas, 'GWei'),
'maxPriorityFeePerGas': Web3.to_wei(max_priority_fee_per_gas, 'GWei')
}
return gas_params
except (RequestException, JSONDecodeError, KeyError):
if not DevEnv:
logging.exception(f'Failed to get gas info from metaswap {resp.status_code=}')
raise FailedToGetGasPrice("Failed to get gas info from metaswap")
# fixme: do it by asyncio way
def __get_gas_from_rpc(self, priority, gas_upper_bound):
gas_price = None
for provider in self.providers:
provider_url = str(provider.provider.endpoint_uri) # noqa
try:
gas_price = provider.eth.gas_price
apm.span_label(gas_price_from_provider=str(gas_price / 1e9), gas_price_provider=provider_url)
except (ConnectionError, ReadTimeout, ValueError) as e:
logging.error(f'Failed to get gas price from {provider_url}, {e=}')
continue
break
if gas_price is None:
raise FailedToGetGasPrice("Non of RCP could provide gas price!")
if gas_price / 1e9 > gas_upper_bound:
raise OutOfRangeTransactionFee(f'gas price exceeded. {gas_upper_bound=} but it is {gas_price / 1e9}')
return dict(gasPrice=int(gas_price * TxPriorityMultiplier.get(priority, 1)))
def _get_gas_price(self, gas_upper_bound, priority) -> dict:
gas_params = {}
if self.chain_id == 97: # Test BNB Network
gas_params['gasPrice'] = Web3.to_wei(10.1, 'GWei')
elif DevEnv or self.chain_id in ChainIdsWithGasFromRPC:
gas_params = self.__get_gas_from_rpc(priority, gas_upper_bound)
else:
try:
gas_params = self.__get_gas_from_metaswap(priority, gas_upper_bound)
except FailedToGetGasPrice:
gas_params = self.__get_gas_from_rpc(priority, gas_upper_bound)
apm.span_label(**gas_params)
return gas_params__get_gas_from_metaswap
Gas Provider URL:
Fetching and Processing Data:
Extract Gas Fees:
Log Span Label:
Check Upper Bound:
Prepare Gas Parameters:
Exception Handling:
__get_gas_from_rpc
Loop Through Providers:
Check Gas Price:
Return Gas Parameters:
__get_gas_price
Function Definition and Parameters:
Initialize Gas Parameters:
Conditional Check for Test BNB Network
Conditional Check for Development Environment or Specific Chains
Attempt to Get Gas Price from Metamask Swap API
Logging the Gas Parameters
Returning the Gas Parameters
Last updated
