API Reference#
Core#
Bus#
Classes and exceptions needed to operate a Data Bus
- class blockcrawler.core.bus.Consumer#
Data bus consumer ABC
- exception blockcrawler.core.bus.ConsumerError#
The default base error class for a data bus.
- class blockcrawler.core.bus.DataBus#
Data bus abstract base class
- abstract async register(consumer: Consumer)#
Register a consumer with the data bus. The data bus will forward every data package it is sent to the registered consumer.
- abstract async send(data_package: DataPackage)#
Send data to the data bus. The data will be received from the data bus by consumers.
- class blockcrawler.core.bus.DataPackage#
The base data package for a data bus to send and a consumer to receive.
- class blockcrawler.core.bus.DebugConsumer(filter_function: ~typing.Callable[[~blockcrawler.core.bus.DataPackage], bool] = <function DebugConsumer.<lambda>>)#
Consumer to use for testing your composed crawler. It will print all data received for which the filter function returns True.
This consumer can help understand which data is being passed around asynchronously on the data bus when standard debugging becomes too cumbersome.
- class blockcrawler.core.bus.ParallelDataBus(logger: Logger, *, raise_on_exception=False)#
Data Bus implementation which will send the data packages to consumers in parallel.
- async register(consumer: Consumer)#
Register a consumer with the data bus. The data bus will forward every data package it is sent to the registered consumer.
- async send(data_package: DataPackage)#
Send data to the data bus. The data will be received from the data bus by consumers.
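The register/send flow described above can be illustrated with a small stand-in. This is only a sketch, not the library's implementation: `SketchParallelBus`, `ListConsumer`, and the consumer method name `receive` are assumptions made for illustration, since the Consumer interface is not spelled out in this reference.

```python
import asyncio
from typing import Any, List


class ListConsumer:
    """Hypothetical consumer that stores every package it receives."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    async def receive(self, data_package: Any) -> None:
        # Yield control once so other consumers can interleave.
        await asyncio.sleep(0)
        self.received.append(data_package)


class SketchParallelBus:
    """Stand-in bus that fans each package out to all consumers at once."""

    def __init__(self) -> None:
        self._consumers: List[ListConsumer] = []

    async def register(self, consumer: ListConsumer) -> None:
        self._consumers.append(consumer)

    async def send(self, data_package: Any) -> None:
        # gather() runs every consumer's receive() concurrently.
        await asyncio.gather(*(c.receive(data_package) for c in self._consumers))


async def bus_demo() -> List[List[Any]]:
    bus = SketchParallelBus()
    first, second = ListConsumer(), ListConsumer()
    await bus.register(first)
    await bus.register(second)
    await bus.send("block-1")
    await bus.send("block-2")
    return [first.received, second.received]


if __name__ == "__main__":
    print(asyncio.run(bus_demo()))
```

Each registered consumer sees every package that is sent, which is the behavior the register() and send() descriptions imply.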
- class blockcrawler.core.bus.Producer#
Data Producer abstract base class
- class blockcrawler.core.bus.SignalManager#
Context manager to manage signals and allow for graceful shutdown. Instantiating the SignalManager will register its own internal signal handler for all signals in the SIG_NAMES attribute that are supported by the operating system. Entering the context will register the internal signal handler; exiting the context will restore the original handler if the internal handler is still registered. The internal handler will alter the state of the manager such that interrupted will be True and interrupting_signal will return the name of the interrupting signal.
Usage:
    with SignalManager() as signal_manager:
        i: int = 0
        while not signal_manager.interrupted:
            i += 1
            print(i)
            sleep(1.0)
        print(signal_manager.interrupting_signal)
The above code will print an incrementing value every second until a signal occurs. Once the signal occurs, it will exit the while loop and print which signal it was.
RPC#
Remote Procedure Call Client classes and exceptions
- class blockcrawler.core.rpc.RpcClient(provider_url: str, stats_service: StatsService, requests_per_second: Optional[int] = None, max_concurrent_requests: Optional[int] = None)#
High performance RPC Client
- Parameters:
provider_url – URI of the websocket capable endpoint of an RPC server
stats_service – Stats service for recording connection and request metrics
requests_per_second – The maximum number of requests allowed in a second
max_concurrent_requests – The maximum number of requests that can be awaiting a response at any given time. The default will equal the max_concurrent_requests value.
- STAT_CONNECT = 'rpc.connect'#
Stat name for the number of times the client connected to the provider
- STAT_CONNECTION_RESET = 'rpc.connection-reset'#
Stat name for the number of connection resets received from the provider
- STAT_ORPHANED_REQUESTS = 'rpc.orphaned-requests'#
Stat name for the number of requests that were orphaned by being unable to receive a response from the provider.
- STAT_RECONNECT = 'rpc.reconnect'#
Stat name for the number of times the client reconnected to the provider
- STAT_REQUEST_DELAYED = 'rpc-request-delayed'#
Stat name for the number of requests that were delayed due to exceeding the requests per second limit or due to a pause caused by receiving a “Too Many Requests” error from the provider.
- STAT_REQUEST_MS = 'rpc.request-ms'#
Stat name for the total number of milliseconds all requests spent in round trips
- STAT_REQUEST_SENT = 'rpc.request-sent'#
Stat name for the number of requests sent to the provider
- STAT_RESPONSE_NO_ID = 'rpc.response-without-id'#
Stat name for the number of responses received that had no request ID
- STAT_RESPONSE_RECEIVED = 'rpc.response-received'#
Stat name for the number of responses received from the provider
- STAT_RESPONSE_TOO_MANY_REQUESTS = 'rpc.response-too-many-requests'#
Stat name for the number of “Too Many Requests” errors that were received from the provider.
- STAT_RESPONSE_UNKNOWN_FORMAT = 'rpc.response-unknown-format'#
Stat name for the number of responses received that were in an unknown format
- STAT_RESPONSE_UNKNOWN_ID = 'rpc.response-unknown-id'#
Stat name for the number of responses received that could not be accounted for in the outbound requests.
- async send(method: str, *params: Any) Any#
Send the RPC request for the method with the provided params and return the result. The parameters are analogous to those in the JSON-RPC 2.0 specification.
- Parameters:
method – RPC method
params – Parameters for the RPC method
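To make the mapping to JSON-RPC 2.0 concrete, the sketch below shows how a method name and positional params could be turned into a request object. The helper names `build_request` and `encode_request` and the ID counter are hypothetical; how RpcClient assigns request IDs internally is not documented here.

```python
import itertools
import json
from typing import Any, Dict

# Hypothetical ID source for this sketch only.
_request_ids = itertools.count(1)


def build_request(method: str, *params: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request object with positional params."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
        "params": list(params),
    }


def encode_request(method: str, *params: Any) -> str:
    """Serialize the request as it might be written to a websocket."""
    return json.dumps(build_request(method, *params))


if __name__ == "__main__":
    print(encode_request("eth_getBlockByNumber", "0x1", False))
```

The `id` field is what lets a client match a response to its outbound request, which is why the stats above distinguish responses with no ID from responses with an unknown ID.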
- exception blockcrawler.core.rpc.RpcClientError#
Exception thrown when something goes awry within the client itself
- exception blockcrawler.core.rpc.RpcDecodeError#
An error occurring when the RPC client cannot decode the response
- exception blockcrawler.core.rpc.RpcError#
Base exception for all RPC client errors
- exception blockcrawler.core.rpc.RpcServerError(rpc_version, request_id, error_code, error_message)#
An error raised when the RPC server returns an error response
- property error_code#
The error code returned by the RPC server
- property error_message#
The error message returned by the server
- property request_id#
The request ID for which the error is a result
- property rpc_version#
The version of the RPC response
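As an illustration of where the four properties come from, the sketch below maps a JSON-RPC 2.0 error response onto a stand-in exception. `SketchRpcServerError` and `result_or_raise` are hypothetical names used only here; the library's own parsing code may differ.

```python
from typing import Any, Dict


class SketchRpcServerError(Exception):
    """Stand-in exposing the same four fields as the documented properties."""

    def __init__(self, rpc_version, request_id, error_code, error_message):
        super().__init__(f"RPC error {error_code}: {error_message}")
        self.rpc_version = rpc_version
        self.request_id = request_id
        self.error_code = error_code
        self.error_message = error_message


def result_or_raise(response: Dict[str, Any]) -> Any:
    """Return the result of a JSON-RPC response or raise on an error object."""
    error = response.get("error")
    if error is not None:
        raise SketchRpcServerError(
            response.get("jsonrpc"),
            response.get("id"),
            error.get("code"),
            error.get("message"),
        )
    return response["result"]


if __name__ == "__main__":
    print(result_or_raise({"jsonrpc": "2.0", "id": 1, "result": "0x10"}))
```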
- exception blockcrawler.core.rpc.RpcTransportError#
Exception thrown when the network transport used by the RPC client errors
Stats#
Class for tracking performance statistics
- class blockcrawler.core.stats.StatsService#
Service for tracking and retrieving statistics stored in memory
- get_count(stat: str) int#
Get the current count of a counter stat
- Parameters:
stat – Counter stat name to retrieve
- get_timings(stat: str) List[int]#
Get the recorded timings for the provided stat
- Parameters:
stat – Name of stat for which you wish to retrieve the timings
- Returns:
A list of the nanosecond timings recorded for the stat
- increment(stat: str, quantity: int = 1) None#
Increment a counter stat by a quantity
- Parameters:
stat – Counter stat name to be incremented
quantity – Value by which the stat will be incremented
- ms_counter(stat: str)#
Return a context manager that will determine the number of milliseconds spent within the context and add that value to the counter stat. This method differs from the timer() method as it will not store a record for each time it is called and can be utilized for long-running operations and high frequency code execution without concern for memory usage.
- Parameters:
stat – Counter stat name to which the number of milliseconds will be added
Example:
    with stats_service.ms_counter("timer"):
        sleep(1.0)
The above example should add approximately 1,000 to the counter stat “timer”.
- reset() None#
Reset all counter and timer stats to 0 and an empty list respectively.
- timer(stat: str)#
Return a context manager that will determine the number of nanoseconds spent within the context and add an item to a list of timings associated with stat. This method differs from the ms_counter() method as it will store an item for each time it is called which can raise memory concerns for high-volume and long-running processes.
- Parameters:
stat – Timer stat name under which the number of nanoseconds will be recorded
Example:
    with stats_service.timer("timer"):
        sleep(0.001)
The above example should add an item with a value of approximately 1,000,000 to the timing stat “timer”.
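The memory trade-off between ms_counter() and timer() can be seen in a small stand-in. This is a sketch under assumptions: `SketchStats` is a hypothetical class built on `time.perf_counter_ns`, and the real StatsService may measure time differently.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict, Iterator, List


class SketchStats:
    """Stand-in contrasting a running counter with a per-call timing list."""

    def __init__(self) -> None:
        self._counters: Dict[str, int] = defaultdict(int)
        self._timings: Dict[str, List[int]] = defaultdict(list)

    @contextmanager
    def ms_counter(self, stat: str) -> Iterator[None]:
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            # One integer per stat, no matter how many times this runs.
            elapsed_ns = time.perf_counter_ns() - start
            self._counters[stat] += elapsed_ns // 1_000_000

    @contextmanager
    def timer(self, stat: str) -> Iterator[None]:
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            # One list item per call, so memory grows with call volume.
            self._timings[stat].append(time.perf_counter_ns() - start)

    def get_count(self, stat: str) -> int:
        return self._counters[stat]

    def get_timings(self, stat: str) -> List[int]:
        return list(self._timings[stat])


if __name__ == "__main__":
    stats = SketchStats()
    with stats.timer("example"):
        time.sleep(0.001)
    print(stats.get_timings("example"))
```

After three timed calls the timing list holds three entries, while the counter stat is still a single integer.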
Entities#
Core entities
- class blockcrawler.core.entities.BlockChain(value)#
Enum to identify a blockchain/network combination
- ETHEREUM_MAINNET = 'ethereum-mainnet'#
Ethereum blockchain and mainnet network
- POLYGON_MAINNET = 'polygon-mainnet'#
Polygon blockchain and mainnet network
- class blockcrawler.core.entities.Entity#
Base class from which all entities are derived
Types#
Core types
- blockcrawler.core.types.Address#
An address type for explicitly identifying an address in usage
alias of str
- class blockcrawler.core.types.HexInt(value: Union[str, int])#
A representation of an integer that can be easily translated between a hexadecimal string and an integer. It will evaluate in most forms as an integer representation.
- property hex_value: str#
Get the hexadecimal string representation of the object
- property int_value: int#
Get the integer representation of the object
- padded_hex(length: int)#
Get a zero-padded hexadecimal string of the object. For example:
    HexInt(1).padded_hex(4)
will return the hexadecimal string 0x0001.
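The translation between hexadecimal strings and integers can be sketched with a stand-in type. `SketchHexInt` is hypothetical and covers only the two value properties; it does not reproduce padded_hex() or the full integer behavior of HexInt.

```python
from typing import Union


class SketchHexInt:
    """Stand-in accepting a hex string or an int, as the signature suggests."""

    def __init__(self, value: Union[str, int]) -> None:
        # int(text, 16) accepts an optional "0x" prefix.
        self._value = int(value, 16) if isinstance(value, str) else int(value)

    @property
    def int_value(self) -> int:
        return self._value

    @property
    def hex_value(self) -> str:
        return hex(self._value)

    def __int__(self) -> int:
        return self._value


if __name__ == "__main__":
    print(SketchHexInt("0x10").int_value, SketchHexInt(255).hex_value)
```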
Click#
Classes to integrate with the Click library
- class blockcrawler.core.click.AddressParamType#
Click param type to parse input data and produce Address instances
- convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#
Convert the value to the correct type. This is not called if the value is None (the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param and ctx arguments may be None in certain situations, such as when converting prompt input.
If the value cannot be converted, call fail() with a descriptive message.
- Parameters:
value – The value to convert.
param – The parameter that is using this type to convert its value. May be None.
ctx – The current context that arrived at this value. May be None.
- name: str = 'Address'#
the descriptive name of this type
- class blockcrawler.core.click.BlockChainParamType#
Click param type to parse input data and produce BlockChain enums
- convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#
Convert the value to the correct type. This is not called if the value is None (the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param and ctx arguments may be None in certain situations, such as when converting prompt input.
If the value cannot be converted, call fail() with a descriptive message.
- Parameters:
value – The value to convert.
param – The parameter that is using this type to convert its value. May be None.
ctx – The current context that arrived at this value. May be None.
- name: str = 'BlockChain'#
the descriptive name of this type
- class blockcrawler.core.click.EthereumCollectionTypeParamType#
Click param type to parse input data and produce CollectionType instances
- convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#
Convert the value to the correct type. This is not called if the value is None (the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param and ctx arguments may be None in certain situations, such as when converting prompt input.
If the value cannot be converted, call fail() with a descriptive message.
- Parameters:
value – The value to convert.
param – The parameter that is using this type to convert its value. May be None.
ctx – The current context that arrived at this value. May be None.
- name: str = 'EthereumCollectionType'#
the descriptive name of this type
- class blockcrawler.core.click.HexBytesParamType#
Click param type to parse input data and produce HexBytes instances
- convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#
Convert the value to the correct type. This is not called if the value is None (the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param and ctx arguments may be None in certain situations, such as when converting prompt input.
If the value cannot be converted, call fail() with a descriptive message.
- Parameters:
value – The value to convert.
param – The parameter that is using this type to convert its value. May be None.
ctx – The current context that arrived at this value. May be None.
- name: str = 'HexBytes'#
the descriptive name of this type
- class blockcrawler.core.click.HexIntParamType#
Click param type to parse input data and produce HexInt instances
- convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#
Convert the value to the correct type. This is not called if the value is None (the missing value).
This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.
The param and ctx arguments may be None in certain situations, such as when converting prompt input.
If the value cannot be converted, call fail() with a descriptive message.
- Parameters:
value – The value to convert.
param – The parameter that is using this type to convert its value. May be None.
ctx – The current context that arrived at this value. May be None.
- name: str = 'HexInt'#
the descriptive name of this type
EVM#
Data Packages#
Data packages for sending data to the data bus
- class blockcrawler.evm.data_packages.EvmBlockDataPackage(blockchain: BlockChain, block: EvmBlock)#
Data package for EVM blocks
- block: EvmBlock#
Block
- blockchain: BlockChain#
Blockchain to which the block belongs
- class blockcrawler.evm.data_packages.EvmBlockIDDataPackage(blockchain: BlockChain, block_id: HexInt)#
Data package for placing Block IDs on the Data Bus
- blockchain: BlockChain#
Blockchain to which the block ID belongs
- class blockcrawler.evm.data_packages.EvmLogDataPackage(blockchain: BlockChain, log: EvmLog, transaction_receipt: EvmTransactionReceipt, block: EvmBlock)#
Data package for EVM logs
- block: EvmBlock#
Block from which the transaction containing the log originated
- blockchain: BlockChain#
Blockchain to which the block belongs
- transaction_receipt: EvmTransactionReceipt#
Transaction receipt from which the log originated
- class blockcrawler.evm.data_packages.EvmTransactionDataPackage(blockchain: BlockChain, transaction: EvmTransaction, block: EvmBlock)#
Data package for EVM transactions
- block: EvmBlock#
Block from which the transaction originated
- blockchain: BlockChain#
Blockchain to which the block belongs
- transaction: EvmTransaction#
Transaction
- class blockcrawler.evm.data_packages.EvmTransactionHashDataPackage(blockchain: BlockChain, hash: HexBytes, block: EvmBlock)#
Data package for EVM transaction hashes
- block: EvmBlock#
Block from which the transaction hash originated
- blockchain: BlockChain#
Blockchain to which the block belongs
- hash: HexBytes#
Transaction hash
- class blockcrawler.evm.data_packages.EvmTransactionReceiptDataPackage(blockchain: BlockChain, transaction_receipt: EvmTransactionReceipt, block: EvmBlock)#
Data package for EVM transaction receipts
- block: EvmBlock#
Block from which the transaction receipt originated
- blockchain: BlockChain#
Blockchain to which the block belongs
- transaction_receipt: EvmTransactionReceipt#
Transaction receipt
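Because every package type travels over the same bus, a consumer typically has to pick out the types it cares about. The sketch below shows one way to do that with `isinstance`. The dataclasses are simplified stand-ins for the packages above, and the consumer method name `receive` is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class SketchBlockPackage:
    """Simplified stand-in for a block data package."""
    blockchain: str
    block: Any


@dataclass(frozen=True)
class SketchTransactionPackage:
    """Simplified stand-in for a transaction data package."""
    blockchain: str
    transaction: Any
    block: Any


class TransactionCollector:
    """Hypothetical consumer that ignores everything except transactions."""

    def __init__(self) -> None:
        self.transactions: List[Any] = []

    async def receive(self, data_package: Any) -> None:
        if isinstance(data_package, SketchTransactionPackage):
            self.transactions.append(data_package.transaction)


async def package_demo() -> List[Any]:
    collector = TransactionCollector()
    block = {"number": 1}
    for package in (
        SketchBlockPackage("ethereum-mainnet", block),
        SketchTransactionPackage("ethereum-mainnet", "0xabc", block),
    ):
        await collector.receive(package)
    return collector.transactions


if __name__ == "__main__":
    print(asyncio.run(package_demo()))
```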
RPC#
EVM specific RPC Clients
- exception blockcrawler.evm.rpc.CallError#
Exception for issues arising while calling the call method
- with_traceback()#
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class blockcrawler.evm.rpc.ConnectionPoolingEvmRpcClient(pool: List[EvmRpcClient])#
Pooled EVM RPC Client. This client takes a list of EVM RPC clients to use in a round-robin strategy for sending RPC requests. The client is for high-volume applications that would exceed their request-per-second limit with a single connection or even a single provider.
- Parameters:
pool – List of RPC clients
- STAT_CALL = 'rpc.eth.call'#
Stat name for counts of eth_call RPC calls
- STAT_CONNECT = 'rpc.connect'#
Stat name for the number of times the client connected to the provider
- STAT_CONNECTION_RESET = 'rpc.connection-reset'#
Stat name for the number of connection resets received from the provider
- STAT_GET_BLOCK = 'rpc.eth.get_block_by_number'#
Stat name for counts of eth_getBlockByNumber RPC calls
- STAT_GET_BLOCK_NUMBER = 'rpc.eth.block_number'#
Stat name for counts of eth_blockNumber RPC calls
- STAT_GET_LOGS = 'rpc.eth.get_logs'#
Stat name for counts of eth_getLogs RPC calls
- STAT_GET_TRANSACTION_RECEIPT = 'rpc.eth.get_transaction_receipt'#
Stat name for counts of eth_getTransactionReceipt RPC calls
- STAT_ORPHANED_REQUESTS = 'rpc.orphaned-requests'#
Stat name for the number of requests that were orphaned by being unable to receive a response from the provider.
- STAT_RECONNECT = 'rpc.reconnect'#
Stat name for the number of times the client reconnected to the provider
- STAT_REQUEST_DELAYED = 'rpc-request-delayed'#
Stat name for the number of requests that were delayed due to exceeding the requests per second limit or due to a pause caused by receiving a “Too Many Requests” error from the provider.
- STAT_REQUEST_MS = 'rpc.request-ms'#
Stat name for the total number of milliseconds all requests spent in round trips
- STAT_REQUEST_SENT = 'rpc.request-sent'#
Stat name for the number of requests sent to the provider
- STAT_RESPONSE_NO_ID = 'rpc.response-without-id'#
Stat name for the number of responses received that had no request ID
- STAT_RESPONSE_RECEIVED = 'rpc.response-received'#
Stat name for the number of responses received from the provider
- STAT_RESPONSE_TOO_MANY_REQUESTS = 'rpc.response-too-many-requests'#
Stat name for the number of “Too Many Requests” errors that were received from the provider.
- STAT_RESPONSE_UNKNOWN_FORMAT = 'rpc.response-unknown-format'#
Stat name for the number of responses received that were in an unknown format
- STAT_RESPONSE_UNKNOWN_ID = 'rpc.response-unknown-id'#
Stat name for the number of responses received that could not be accounted for in the outbound requests.
- async call(request: EthCall) Any#
Call a function on a smart contract via eth_call
- Parameters:
request – Object representation of the call
- Returns:
Decoded values returned by the smart contract function. If the function has no return type, the result will be None. Otherwise, it will be a tuple of response values, as functions can return multiple values. For example:
    (result,) = await rpc_client.call(request)
- Raises:
CallError
- async get_block(block_num: HexInt, full_transactions: bool = False) EvmBlock#
Get a block via eth_getBlockByNumber
- Parameters:
block_num – The block number for the block you wish to get.
full_transactions – Return full transactions flag. If True, the transactions attribute on the returned block will contain EvmTransaction objects and the transaction_hashes attribute will contain transaction hashes. If False, the transaction_hashes attribute will contain transaction hashes and the transactions attribute will be None.
- async get_block_number() HexInt#
Get the current block height via eth_blockNumber
- async get_logs(topics: List[Union[str, List[str]]], from_block: HexInt, to_block: HexInt, address: Address, starting_block_range_size: Optional[int] = None) AsyncIterable[EvmLog]#
Get logs matching the provided topics for an address between two blocks via eth_getLogs, optionally starting with a given block range size.
If the endpoint returns an error because the block range or result is too large, this method will react and make as many calls as necessary to return all the logs.
- Parameters:
topics – List of topics that are ABI encoded. The list is dependent on the log type you wish to search. Explaining how to search logs takes more room than this document could provide.
from_block – The lowest block from which to search for logs
to_block – The highest block from which to search for logs
address – The log address to filter.
starting_block_range_size – The starting size for the block range to query for the logs. The default size is the entire range. This may not be optimal if the range houses a large number of logs, as the method will then have to react. Optimizing this size can reduce time spent on RPC calls that will always error.
- Returns:
An asynchronous iterator to iterate through the logs.
Here’s an example for retrieving all the URI events for a contract:
    async for log in await rpc_client.get_logs(
        [Erc1155Events.URI.event_signature_hash.hex()],
        HexInt(0),
        HexInt(16_734_967),
        Address("0x19c8a3f0b290a36de59a50b4c70a23f9c045ec74"),
    ):
        print(log)
- async get_transaction_receipt(tx_hash: HexBytes) EvmTransactionReceipt#
Get a transaction receipt by hash via eth_getTransactionReceipt
- Parameters:
tx_hash – Hash of the transaction whose receipt you wish to retrieve.
- async send(method, *params) Any#
Send the RPC request for the method with the provided params and return the result. The parameters are analogous to those in the JSON-RPC 2.0 specification.
- Parameters:
method – RPC method
params – Parameters for the RPC method
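The round-robin strategy described for ConnectionPoolingEvmRpcClient can be sketched with `itertools.cycle`. This is an illustration only: `FakeClient` and `SketchPoolingClient` are hypothetical stand-ins, and the real client may select pool members differently.

```python
import asyncio
import itertools
from typing import Any, List, Sequence


class FakeClient:
    """Hypothetical client that tags each result with its own name."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def send(self, method: str, *params: Any) -> str:
        return f"{self.name}:{method}"


class SketchPoolingClient:
    """Stand-in that hands each request to the next client in the pool."""

    def __init__(self, pool: Sequence[FakeClient]) -> None:
        if not pool:
            raise ValueError("pool must contain at least one client")
        self._clients = itertools.cycle(pool)

    async def send(self, method: str, *params: Any) -> str:
        client = next(self._clients)
        return await client.send(method, *params)


async def pool_demo() -> List[str]:
    pooled = SketchPoolingClient([FakeClient("a"), FakeClient("b")])
    return [await pooled.send("eth_blockNumber") for _ in range(4)]


if __name__ == "__main__":
    print(asyncio.run(pool_demo()))
```

Alternating between clients spreads requests evenly, so each underlying connection sees only a fraction of the total request rate.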
- class blockcrawler.evm.rpc.EthCall(from_: Optional[str], to: str, function: Function, parameters: Optional[list] = None, block: Optional[Union[HexInt, Literal['latest'], Literal['earliest'], Literal['pending']]] = 'latest')#
Python representation of the properties of an eth_call to execute a function for a smart contract on an Ethereum Virtual Machine (EVM)
- Parameters:
from_ – Address from which a transaction would originate. This is optional for view function calls.
to – Address of the contract whose function you will be calling.
function – The function class representation of the contract function
parameters – The list of ordered function parameters to send
block – The block height at which to execute the function
- class blockcrawler.evm.rpc.EvmRpcClient(provider_url: str, stats_service: StatsService, requests_per_second: Optional[int] = None, max_concurrent_requests: Optional[int] = None)#
RPC Client for EVM RPC calls
- STAT_CALL = 'rpc.eth.call'#
Stat name for counts of eth_call RPC calls
- STAT_CONNECT = 'rpc.connect'#
Stat name for the number of times the client connected to the provider
- STAT_CONNECTION_RESET = 'rpc.connection-reset'#
Stat name for the number of connection resets received from the provider
- STAT_GET_BLOCK = 'rpc.eth.get_block_by_number'#
Stat name for counts of eth_getBlockByNumber RPC calls
- STAT_GET_BLOCK_NUMBER = 'rpc.eth.block_number'#
Stat name for counts of eth_blockNumber RPC calls
- STAT_GET_LOGS = 'rpc.eth.get_logs'#
Stat name for counts of eth_getLogs RPC calls
- STAT_GET_TRANSACTION_RECEIPT = 'rpc.eth.get_transaction_receipt'#
Stat name for counts of eth_getTransactionReceipt RPC calls
- STAT_ORPHANED_REQUESTS = 'rpc.orphaned-requests'#
Stat name for the number of requests that were orphaned by being unable to receive a response from the provider.
- STAT_RECONNECT = 'rpc.reconnect'#
Stat name for the number of times the client reconnected to the provider
- STAT_REQUEST_DELAYED = 'rpc-request-delayed'#
Stat name for the number of requests that were delayed due to exceeding the requests per second limit or due to a pause caused by receiving a “Too Many Requests” error from the provider.
- STAT_REQUEST_MS = 'rpc.request-ms'#
Stat name for the total number of milliseconds all requests spent in round trips
- STAT_REQUEST_SENT = 'rpc.request-sent'#
Stat name for the number of requests sent to the provider
- STAT_RESPONSE_NO_ID = 'rpc.response-without-id'#
Stat name for the number of responses received that had no request ID
- STAT_RESPONSE_RECEIVED = 'rpc.response-received'#
Stat name for the number of responses received from the provider
- STAT_RESPONSE_TOO_MANY_REQUESTS = 'rpc.response-too-many-requests'#
Stat name for the number of “Too Many Requests” errors that were received from the provider.
- STAT_RESPONSE_UNKNOWN_FORMAT = 'rpc.response-unknown-format'#
Stat name for the number of responses received that were in an unknown format
- STAT_RESPONSE_UNKNOWN_ID = 'rpc.response-unknown-id'#
Stat name for the number of responses received that could not be accounted for in the outbound requests.
- async call(request: EthCall) Any#
Call a function on a smart contract via eth_call
- Parameters:
request – Object representation of the call
- Returns:
Decoded values returned by the smart contract function. If the function has no return type, the result will be None. Otherwise, it will be a tuple of response values, as functions can return multiple values. For example:
    (result,) = await rpc_client.call(request)
- Raises:
CallError
- async get_block(block_num: HexInt, full_transactions: bool = False) EvmBlock#
Get a block via eth_getBlockByNumber
- Parameters:
block_num – The block number for the block you wish to get.
full_transactions – Return full transactions flag. If True, the transactions attribute on the returned block will contain EvmTransaction objects and the transaction_hashes attribute will contain transaction hashes. If False, the transaction_hashes attribute will contain transaction hashes and the transactions attribute will be None.
- async get_block_number() HexInt#
Get the current block height via eth_blockNumber
- async get_logs(topics: List[Union[str, List[str]]], from_block: HexInt, to_block: HexInt, address: Address, starting_block_range_size: Optional[int] = None) AsyncIterable[EvmLog]#
Get logs matching the provided topics for an address between two blocks via eth_getLogs, optionally starting with a given block range size.
If the endpoint returns an error because the block range or result is too large, this method will react and make as many calls as necessary to return all the logs.
- Parameters:
topics – List of topics that are ABI encoded. The list is dependent on the log type you wish to search. Explaining how to search logs takes more room than this document could provide.
from_block – The lowest block from which to search for logs
to_block – The highest block from which to search for logs
address – The log address to filter.
starting_block_range_size – The starting size for the block range to query for the logs. The default size is the entire range. This may not be optimal if the range houses a large number of logs, as the method will then have to react. Optimizing this size can reduce time spent on RPC calls that will always error.
- Returns:
An asynchronous iterator to iterate through the logs.
Here’s an example for retrieving all the URI events for a contract:
    async for log in await rpc_client.get_logs(
        [Erc1155Events.URI.event_signature_hash.hex()],
        HexInt(0),
        HexInt(16_734_967),
        Address("0x19c8a3f0b290a36de59a50b4c70a23f9c045ec74"),
    ):
        print(log)
- async get_transaction_receipt(tx_hash: HexBytes) EvmTransactionReceipt#
Get a transaction receipt by hash via eth_getTransactionReceipt
- Parameters:
tx_hash – Hash of the transaction whose receipt you wish to retrieve.
- async send(method: str, *params: Any) Any#
Send the RPC request for the method with the provided params and return the result. The parameters are analogous to those in the JSON-RPC 2.0 specification.
- Parameters:
method – RPC method
params – Parameters for the RPC method
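The get_logs description says the method reacts when the provider rejects a block range as too large. One plausible way to do that is to halve the range size on each rejection, sketched below. The halving rule, `RangeTooLarge`, `get_logs_adaptive`, and `fake_fetch` are all assumptions for illustration; the library's actual strategy is not documented here. The sketch is an async generator, so it is iterated directly.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Optional


class RangeTooLarge(Exception):
    """Hypothetical error for a provider rejecting a block range."""


async def get_logs_adaptive(
    fetch: Callable[[int, int], Awaitable[List[str]]],
    from_block: int,
    to_block: int,
    starting_block_range_size: Optional[int] = None,
) -> AsyncIterator[str]:
    """Yield logs for the range, shrinking the chunk size on rejection."""
    size = starting_block_range_size or (to_block - from_block + 1)
    start = from_block
    while start <= to_block:
        end = min(start + size - 1, to_block)
        try:
            logs = await fetch(start, end)
        except RangeTooLarge:
            if size == 1:
                raise  # cannot shrink a single-block range any further
            size = max(1, size // 2)
            continue  # retry the same start with a smaller range
        for log in logs:
            yield log
        start = end + 1


async def fake_fetch(start: int, end: int) -> List[str]:
    """Pretend provider that accepts at most four blocks per call."""
    if end - start + 1 > 4:
        raise RangeTooLarge()
    return [f"log-{block}" for block in range(start, end + 1)]


async def collect_logs() -> List[str]:
    return [log async for log in get_logs_adaptive(fake_fetch, 0, 9)]


if __name__ == "__main__":
    print(asyncio.run(collect_logs()))
```

In this sketch a well-chosen starting size avoids the first rejected calls entirely, which matches the advice given for starting_block_range_size.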