API Reference#

Core#

Bus#

Classes and exceptions needed to operate a Data Bus

class blockcrawler.core.bus.Consumer#

Data bus consumer ABC

exception blockcrawler.core.bus.ConsumerError#

The default base error class for a data bus.

class blockcrawler.core.bus.DataBus#

Data bus abstract base class

abstract async register(consumer: Consumer)#

Register a consumer with the data bus. The data bus will forward every data package sent to it to each registered consumer.

abstract async send(data_package: DataPackage)#

Send data to the data bus. The data will be received from the data bus by consumers.

class blockcrawler.core.bus.DataPackage#

The base data package for a data bus to send and a consumer to receive.

class blockcrawler.core.bus.DebugConsumer(filter_function: ~typing.Callable[[~blockcrawler.core.bus.DataPackage], bool] = <function DebugConsumer.<lambda>>)#

Consumer to use for testing your composed crawler. It will print all data received for which the filter function returns True.

This consumer can help understand which data is being passed around asynchronously on the data bus when standard debugging becomes too cumbersome.
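As a rough illustration of the filtering behaviour described above, the following standalone sketch mimics a debug consumer. The names `SketchDebugConsumer`, `receive`, and `output` are assumptions made for this example and are not taken from blockcrawler; only the idea of a `filter_function` comes from the reference.

```python
import asyncio
from typing import Any, Callable, List


class SketchDebugConsumer:
    """Stand-in consumer (hypothetical, not blockcrawler's DebugConsumer)."""

    def __init__(
        self,
        filter_function: Callable[[Any], bool] = lambda _: True,
        output: Callable[[str], None] = print,
    ) -> None:
        self._filter_function = filter_function
        self._output = output

    async def receive(self, data_package: Any) -> None:
        # Only report packages for which the filter function returns True.
        if self._filter_function(data_package):
            self._output(repr(data_package))


async def demo() -> List[str]:
    lines: List[str] = []
    consumer = SketchDebugConsumer(
        filter_function=lambda package: isinstance(package, int),
        output=lines.append,  # collect output instead of printing
    )
    for package in (1, "skipped", 2):
        await consumer.receive(package)
    return lines
```

Narrowing the filter to a single package type is usually the quickest way to keep the output readable on a busy bus.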

class blockcrawler.core.bus.ParallelDataBus(logger: Logger, *, raise_on_exception=False)#

Data Bus implementation which will send the data packages to consumers in parallel.

async register(consumer: Consumer)#

Register a consumer with the data bus. The data bus will forward every data package sent to it to each registered consumer.

async send(data_package: DataPackage)#

Send data to the data bus. The data will be received from the data bus by consumers.
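The register/send flow described above can be sketched with plain asyncio. This is a minimal stand-in, not blockcrawler's implementation: `SketchConsumer`, its `receive` method, `SketchParallelBus`, and `Collector` are hypothetical names, and delivering in parallel via `asyncio.gather` is an assumption about how such a bus could work.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class SketchConsumer(ABC):
    """Hypothetical consumer interface used only in this sketch."""

    @abstractmethod
    async def receive(self, data_package: Any) -> None:
        ...


class SketchParallelBus:
    """Stand-in bus that delivers each package to all consumers concurrently."""

    def __init__(self) -> None:
        self._consumers: List[SketchConsumer] = []

    async def register(self, consumer: SketchConsumer) -> None:
        self._consumers.append(consumer)

    async def send(self, data_package: Any) -> None:
        # Schedule every consumer's receive() at once and wait for all of them.
        await asyncio.gather(
            *(consumer.receive(data_package) for consumer in self._consumers)
        )


class Collector(SketchConsumer):
    """Consumer that simply remembers what it received."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    async def receive(self, data_package: Any) -> None:
        self.received.append(data_package)


async def demo() -> Tuple[List[Any], List[Any]]:
    bus = SketchParallelBus()
    first, second = Collector(), Collector()
    await bus.register(first)
    await bus.register(second)
    await bus.send("block-1")
    await bus.send("block-2")
    return first.received, second.received
```

Every registered consumer sees every package, so consumers are expected to ignore package types they do not handle.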

class blockcrawler.core.bus.Producer#

Data Producer abstract base class

class blockcrawler.core.bus.SignalManager#

Context manager to manage signals and allow for graceful shutdown. Instantiating the SignalManager will register its own internal signal handler for all signals in the SIG_NAMES attribute that are supported by the operating system. Entering the context will register the internal signal handler; exiting the manager will restore the original handler if the internal handler is still registered. The internal handler will alter the state of the manager such that interrupted will be True and interrupting_signal will return the name of the interrupting signal.

Usage:

from time import sleep

from blockcrawler.core.bus import SignalManager

with SignalManager() as signal_manager:
    i: int = 0
    while not signal_manager.interrupted:
        i += 1
        print(i)
        sleep(1.0)
    print(signal_manager.interrupting_signal)

The above code will print an incrementing value every second until a signal occurs. Once the signal occurs, it will exit the while loop and print which signal it was.
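To make the handler swap more concrete, here is a simplified sketch built on the standard `signal` module. It is not blockcrawler's code: `SketchSignalManager` is a hypothetical stand-in, the `SIG_NAMES` values are assumed, and for simplicity it registers handlers only when the context is entered. Like any use of `signal.signal`, it must run in the main thread.

```python
import signal
from types import FrameType
from typing import Dict, Optional


class SketchSignalManager:
    """Stand-in for illustration only (not blockcrawler's SignalManager)."""

    # Assumed signal names; names missing on the current OS are skipped.
    SIG_NAMES = ("SIGINT", "SIGTERM")

    def __init__(self) -> None:
        self.interrupted = False
        self.interrupting_signal: Optional[str] = None
        self._original_handlers: Dict[int, object] = {}

    def _handle(self, signum: int, frame: Optional[FrameType]) -> None:
        # Record the interruption instead of terminating the process.
        self.interrupted = True
        self.interrupting_signal = signal.Signals(signum).name

    def __enter__(self) -> "SketchSignalManager":
        for name in self.SIG_NAMES:
            signum = getattr(signal, name, None)
            if signum is not None:
                # signal.signal returns the previously installed handler.
                self._original_handlers[signum] = signal.signal(signum, self._handle)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        for signum, original in self._original_handlers.items():
            # Restore only if our handler is still the registered one.
            if original is not None and signal.getsignal(signum) == self._handle:
                signal.signal(signum, original)
        self._original_handlers.clear()
```

Because the handler only flips a flag, the loop body always finishes its current iteration before the program shuts down.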

class blockcrawler.core.bus.Transformer(data_bus: DataBus)#

A data bus consumer that places data on the data bus based on the data transformations it makes.

_get_data_bus() DataBus#

Get the data bus instance which will receive transformed data

RPC#

Remote Procedure Call Client classes and exceptions

class blockcrawler.core.rpc.RpcClient(provider_url: str, stats_service: StatsService, requests_per_second: Optional[int] = None, max_concurrent_requests: Optional[int] = None)#

High performance RPC Client

Parameters:
  • provider_url – URI of the websocket capable endpoint of an RPC server

  • stats_service – Stats service for recording connection and request metrics

  • requests_per_second – The maximum number of requests allowed in a second

  • max_concurrent_requests – The maximum number of requests that can be awaiting a response at any given time. The default will equal the max_concurrent_requests value.
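One common way to cap the number of requests awaiting a response is a semaphore. The sketch below shows that idea in isolation; `ConcurrencyLimiter`, `run`, and `fake_request` are hypothetical names, and nothing here is claimed to match how RpcClient enforces `max_concurrent_requests` internally.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class ConcurrencyLimiter:
    """Hypothetical helper that caps the number of in-flight requests."""

    def __init__(self, max_concurrent_requests: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent_requests)
        self.in_flight = 0
        self.peak_in_flight = 0

    async def run(self, request: Callable[[], Awaitable[T]]) -> T:
        # Wait here until one of the permitted slots is free.
        async with self._semaphore:
            self.in_flight += 1
            self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
            try:
                return await request()
            finally:
                self.in_flight -= 1


async def demo() -> int:
    limiter = ConcurrencyLimiter(max_concurrent_requests=2)

    async def fake_request() -> str:
        await asyncio.sleep(0.01)  # simulate waiting for a response
        return "ok"

    await asyncio.gather(*(limiter.run(fake_request) for _ in range(6)))
    return limiter.peak_in_flight
```

In this sketch six requests are issued but no more than two are ever waiting on a response at the same time.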

STAT_CONNECT = 'rpc.connect'#

Stat name for the number of times the client connected to the provider

STAT_CONNECTION_RESET = 'rpc.connection-reset'#

Stat name for the number of connection resets received from the provider

STAT_ORPHANED_REQUESTS = 'rpc.orphaned-requests'#

Stat name for the number of requests that were orphaned by being unable to receive a response from the provider.

STAT_RECONNECT = 'rpc.reconnect'#

Stat name for the number of times the client reconnected to the provider

STAT_REQUEST_DELAYED = 'rpc-request-delayed'#

Stat name for the number of requests that were delayed due to exceeding the requests per second limit or due to a pause caused by receiving a “Too Many Requests” error from the provider.

STAT_REQUEST_MS = 'rpc.request-ms'#

Stat name for the total number of milliseconds all requests spent on round trips

STAT_REQUEST_SENT = 'rpc.request-sent'#

Stat name for the number of requests sent to the provider

STAT_RESPONSE_NO_ID = 'rpc.response-without-id'#

Stat name for the number of responses received that had no request ID

STAT_RESPONSE_RECEIVED = 'rpc.response-received'#

Stat name for the number of responses received from the provider

STAT_RESPONSE_TOO_MANY_REQUESTS = 'rpc.response-too-many-requests'#

Stat name for the number of “Too Many Requests” errors that were received from the provider.

STAT_RESPONSE_UNKNOWN_FORMAT = 'rpc.response-unknown-format'#

Stat name for the number of responses received that were in an unknown format

STAT_RESPONSE_UNKNOWN_ID = 'rpc.response-unknown-id'#

Stat name for the number of responses received that could not be accounted for in the outbound requests.

async send(method: str, *params: Any) Any#

Send the RPC request for the method with the provided params and return the result. The parameters are analogous to those in the JSON-RPC 2.0 specification.

Parameters:
  • method – RPC method

  • params – Parameters for the RPC method
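For orientation, a JSON-RPC 2.0 request object carries the members `jsonrpc`, `id`, `method`, and `params`. The sketch below shows how a `method` and positional `params` could map onto that shape; `build_request` and the module-level ID counter are hypothetical and only illustrate the wire format, not RpcClient's internals.

```python
import itertools
import json
from typing import Any

# Hypothetical request ID source; each request needs a distinct ID so that
# responses can be matched to the requests that caused them.
_request_ids = itertools.count(1)


def build_request(method: str, *params: Any) -> str:
    """Serialize a JSON-RPC 2.0 request with positional parameters."""
    payload = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
        "params": list(params),
    }
    return json.dumps(payload)
```

Calling `build_request("eth_getBlockByNumber", "0x1", False)` yields a JSON string whose `params` member is the list `["0x1", false]`.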

exception blockcrawler.core.rpc.RpcClientError#

Exception thrown when something goes awry within the client itself

exception blockcrawler.core.rpc.RpcDecodeError#

An error occurring when the RPC client cannot decode the response

exception blockcrawler.core.rpc.RpcError#

Base exception for all RPC client errors

exception blockcrawler.core.rpc.RpcServerError(rpc_version, request_id, error_code, error_message)#

An error raised when the RPC server returns an error response

property error_code#

The error code returned by the RPC server

property error_message#

The error message returned by the server

property request_id#

The request ID for which the error is a result

property rpc_version#

The version of the RPC response

exception blockcrawler.core.rpc.RpcTransportError#

Exception thrown when the network transport used by the RPC client errors

Stats#

Class for tracking performance statistics

class blockcrawler.core.stats.StatsService#

Service for tracking and retrieving statistics stored in memory

get_count(stat: str) int#

Get the current count of a counter stat

Parameters:

stat – Counter stat name to retrieve

get_timings(stat: str) List[int]#

Get the recorded timings for the provided stat

Parameters:

stat – Name of stat for which you wish to retrieve the timings

Returns:

A list of the number of nanoseconds recorded for the stat

increment(stat: str, quantity: int = 1) None#

Increment a counter stat by a quantity

Parameters:
  • stat – Counter stat name to be incremented

  • quantity – Value by which the stat will be incremented

ms_counter(stat: str)#

Return a context manager that will determine the number of milliseconds spent within the context and add that value to the counter stat. This method differs from the timer() method as it will not store a record for each time it is called and can be utilized for long-running operations and high frequency code execution without concern for memory usage.

Parameters:

stat – Counter stat name to which the number of milliseconds will be added

Example:

with stats_service.ms_counter("timer"):
    sleep(1.0)

The above example should add approximately 1,000 to the counter stat “timer”, since one second is 1,000 milliseconds.

reset() None#

Reset all counter and timer stats to 0 and an empty list respectively.

timer(stat: str)#

Return a context manager that will determine the number of nanoseconds spent within the context and add an item to a list of timings associated with stat. This method differs from the ms_counter() method as it will store an item for each time it is called which can raise memory concerns for high-volume and long-running processes.

Parameters:

stat – Timing stat name under which the number of nanoseconds will be recorded

Example:

with stats_service.timer("timer"):
    sleep(0.001)

The above example should add an item with a value of approximately 1,000,000 to the timing stat “timer”.
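The memory trade-off between ms_counter() and timer() is easier to see side by side. The following is a minimal sketch assuming a dictionary-backed store; `SketchStats` is a hypothetical stand-in and is not the StatsService implementation.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import DefaultDict, Iterator, List, Tuple


class SketchStats:
    """Stand-in stats store used only to contrast the two approaches."""

    def __init__(self) -> None:
        self._counters: DefaultDict[str, int] = defaultdict(int)
        self._timings: DefaultDict[str, List[int]] = defaultdict(list)

    @contextmanager
    def ms_counter(self, stat: str) -> Iterator[None]:
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            # One running total per stat: memory use stays constant.
            self._counters[stat] += (time.perf_counter_ns() - start) // 1_000_000

    @contextmanager
    def timer(self, stat: str) -> Iterator[None]:
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            # One list item per call: memory use grows with call volume.
            self._timings[stat].append(time.perf_counter_ns() - start)

    def get_count(self, stat: str) -> int:
        return self._counters[stat]

    def get_timings(self, stat: str) -> List[int]:
        return list(self._timings[stat])


def demo() -> Tuple[int, List[int]]:
    stats = SketchStats()
    for _ in range(3):
        with stats.ms_counter("work-ms"):
            time.sleep(0.002)
        with stats.timer("work-ns"):
            time.sleep(0.002)
    return stats.get_count("work-ms"), stats.get_timings("work-ns")
```

After three iterations the counter is still a single integer, while the timing list holds three separate nanosecond values.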

Entities#

Core entities

class blockcrawler.core.entities.BlockChain(value)#

Enum to identify a blockchain/network combination

ETHEREUM_MAINNET = 'ethereum-mainnet'#

Ethereum blockchain and mainnet network

POLYGON_MAINNET = 'polygon-mainnet'#

Polygon blockchain and mainnet network

class blockcrawler.core.entities.Entity#

Base class from which all entities are derived

Types#

Core types

blockcrawler.core.types.Address#

An address type for explicitly identifying an address in usage

alias of str

class blockcrawler.core.types.HexInt(value: Union[str, int])#

A representation of an integer that can be easily translated between a hexadecimal string and an integer. It will evaluate in most forms as an integer representation.

property hex_value: str#

Get the hexadecimal string representation of the object

property int_value: int#

Get the integer representation of the object

padded_hex(length: int)#

Get a zero-padded hexadecimal string of the object. For example:

HexInt(1).padded_hex(4)

will return the hexadecimal string 0x00001.
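The translation between hexadecimal strings and integers that HexInt provides can be approximated with Python built-ins. The class below is a hypothetical stand-in named `SketchHexInt`; it covers only the `hex_value` and `int_value` idea and deliberately leaves out padding, whose exact semantics are not assumed here.

```python
from typing import Union


class SketchHexInt:
    """Stand-in for illustration only (not blockcrawler's HexInt)."""

    def __init__(self, value: Union[str, int]) -> None:
        # int(..., 16) accepts hexadecimal strings with or without a 0x prefix.
        self._int_value = int(value, 16) if isinstance(value, str) else value

    @property
    def hex_value(self) -> str:
        return hex(self._int_value)

    @property
    def int_value(self) -> int:
        return self._int_value

    def __int__(self) -> int:
        return self._int_value

    def __eq__(self, other: object) -> bool:
        if isinstance(other, SketchHexInt):
            return self._int_value == other._int_value
        if isinstance(other, int):
            return self._int_value == other
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self._int_value)
```

Storing the integer once and deriving the hexadecimal form on demand keeps both representations consistent with each other.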

Click#

Classes to integrate with the click library

class blockcrawler.core.click.AddressParamType#

Click param type to parse input data and produce Address instances

convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#

Convert the value to the correct type. This is not called if the value is None (the missing value).

This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.

The param and ctx arguments may be None in certain situations, such as when converting prompt input.

If the value cannot be converted, call fail() with a descriptive message.

Parameters:
  • value – The value to convert.

  • param – The parameter that is using this type to convert its value. May be None.

  • ctx – The current context that arrived at this value. May be None.

name: str = 'Address'#

the descriptive name of this type

class blockcrawler.core.click.BlockChainParamType#

Click param type to parse input data and produce BlockChain enums

convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#

Convert the value to the correct type. This is not called if the value is None (the missing value).

This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.

The param and ctx arguments may be None in certain situations, such as when converting prompt input.

If the value cannot be converted, call fail() with a descriptive message.

Parameters:
  • value – The value to convert.

  • param – The parameter that is using this type to convert its value. May be None.

  • ctx – The current context that arrived at this value. May be None.

name: str = 'BlockChain'#

the descriptive name of this type

class blockcrawler.core.click.EthereumCollectionTypeParamType#

Click param type to parse input data and produce CollectionType instances

convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#

Convert the value to the correct type. This is not called if the value is None (the missing value).

This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.

The param and ctx arguments may be None in certain situations, such as when converting prompt input.

If the value cannot be converted, call fail() with a descriptive message.

Parameters:
  • value – The value to convert.

  • param – The parameter that is using this type to convert its value. May be None.

  • ctx – The current context that arrived at this value. May be None.

name: str = 'EthereumCollectionType'#

the descriptive name of this type

class blockcrawler.core.click.HexBytesParamType#

Click param type to parse input data and produce HexBytes instances

convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#

Convert the value to the correct type. This is not called if the value is None (the missing value).

This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.

The param and ctx arguments may be None in certain situations, such as when converting prompt input.

If the value cannot be converted, call fail() with a descriptive message.

Parameters:
  • value – The value to convert.

  • param – The parameter that is using this type to convert its value. May be None.

  • ctx – The current context that arrived at this value. May be None.

name: str = 'HexBytes'#

the descriptive name of this type

class blockcrawler.core.click.HexIntParamType#

Click param type to parse input data and produce HexInt instances

convert(value: Any, param: Optional[Parameter], ctx: Optional[Context]) Any#

Convert the value to the correct type. This is not called if the value is None (the missing value).

This must accept string values from the command line, as well as values that are already the correct type. It may also convert other compatible types.

The param and ctx arguments may be None in certain situations, such as when converting prompt input.

If the value cannot be converted, call fail() with a descriptive message.

Parameters:
  • value – The value to convert.

  • param – The parameter that is using this type to convert its value. May be None.

  • ctx – The current context that arrived at this value. May be None.

name: str = 'HexInt'#

the descriptive name of this type

EVM#

Data Packages#

Data packages for sending data to the data bus

class blockcrawler.evm.data_packages.EvmBlockDataPackage(blockchain: BlockChain, block: EvmBlock)#

Data package for EVM blocks

block: EvmBlock#

Block

blockchain: BlockChain#

Blockchain to which the block belongs

class blockcrawler.evm.data_packages.EvmBlockIDDataPackage(blockchain: BlockChain, block_id: HexInt)#

Data package for placing Block IDs on the Data Bus

block_id: HexInt#

Block ID

blockchain: BlockChain#

Blockchain to which the block ID belongs

class blockcrawler.evm.data_packages.EvmLogDataPackage(blockchain: BlockChain, log: EvmLog, transaction_receipt: EvmTransactionReceipt, block: EvmBlock)#

Data package for EVM logs

block: EvmBlock#

Block from which the transaction containing the log originated

blockchain: BlockChain#

Blockchain to which the block belongs

transaction_receipt: EvmTransactionReceipt#

Transaction receipt from which the log originated

class blockcrawler.evm.data_packages.EvmTransactionDataPackage(blockchain: BlockChain, transaction: EvmTransaction, block: EvmBlock)#

Data package for EVM transactions

block: EvmBlock#

Block from which the transaction originated

blockchain: BlockChain#

Blockchain to which the block belongs

transaction: EvmTransaction#

Transaction

class blockcrawler.evm.data_packages.EvmTransactionHashDataPackage(blockchain: BlockChain, hash: HexBytes, block: EvmBlock)#

Data package for EVM transaction hashes

block: EvmBlock#

Block from which the transaction hash originated

blockchain: BlockChain#

Blockchain to which the block belongs

hash: HexBytes#

Transaction hash

class blockcrawler.evm.data_packages.EvmTransactionReceiptDataPackage(blockchain: BlockChain, transaction_receipt: EvmTransactionReceipt, block: EvmBlock)#

Data package for EVM transaction receipts

block: EvmBlock#

Block from which the transaction receipt originated

blockchain: BlockChain#

Blockchain to which the block belongs

transaction_receipt: EvmTransactionReceipt#

Transaction receipt

RPC#

EVM specific RPC Clients

exception blockcrawler.evm.rpc.CallError#

Exception for issues arising while calling the call method

with_traceback()#

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class blockcrawler.evm.rpc.ConnectionPoolingEvmRpcClient(pool: List[EvmRpcClient])#

Pooled EVM RPC Client. This client takes a list of EVM RPC clients to use in a round-robin strategy for sending RPC requests. The client is for high-volume applications that would exceed their request-per-second limit with a single connection or even a single provider.

Parameters:

pool – List of RPC clients
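A round-robin strategy simply hands out the pooled clients in a repeating order. The sketch below shows that selection logic with `itertools.cycle`; `RoundRobinPool` and `next_client` are hypothetical names, and the strings stand in for real client objects.

```python
import itertools
from typing import Generic, Iterator, List, TypeVar

T = TypeVar("T")


class RoundRobinPool(Generic[T]):
    """Hypothetical helper that rotates through a fixed list of clients."""

    def __init__(self, pool: List[T]) -> None:
        if not pool:
            raise ValueError("pool must contain at least one client")
        # cycle() repeats the pool endlessly in its original order.
        self._cycle: Iterator[T] = itertools.cycle(pool)

    def next_client(self) -> T:
        return next(self._cycle)


clients = RoundRobinPool(["client-a", "client-b", "client-c"])
order = [clients.next_client() for _ in range(5)]
```

With three clients, the fourth request goes back to the first client, which spreads requests evenly across connections or providers.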

STAT_CALL = 'rpc.eth.call'#

Stat name for counts of eth_call RPC calls

STAT_CONNECT = 'rpc.connect'#

Stat name for the number of times the client connected to the provider

STAT_CONNECTION_RESET = 'rpc.connection-reset'#

Stat name for the number of connection resets received from the provider

STAT_GET_BLOCK = 'rpc.eth.get_block_by_number'#

Stat name for counts of eth_getBlockByNumber RPC calls

STAT_GET_BLOCK_NUMBER = 'rpc.eth.block_number'#

Stat name for counts of eth_blockNumber RPC calls

STAT_GET_LOGS = 'rpc.eth.get_logs'#

Stat name for counts of eth_getLogs RPC calls

STAT_GET_TRANSACTION_RECEIPT = 'rpc.eth.get_transaction_receipt'#

Stat name for counts of eth_getTransactionReceipt RPC calls

STAT_ORPHANED_REQUESTS = 'rpc.orphaned-requests'#

Stat name for the number of requests that were orphaned by being unable to receive a response from the provider.

STAT_RECONNECT = 'rpc.reconnect'#

Stat name for the number of times the client reconnected to the provider

STAT_REQUEST_DELAYED = 'rpc-request-delayed'#

Stat name for the number of requests that were delayed due to exceeding the requests per second limit or due to a pause caused by receiving a “Too Many Requests” error from the provider.

STAT_REQUEST_MS = 'rpc.request-ms'#

Stat name for the total number of milliseconds all requests spent on round trips

STAT_REQUEST_SENT = 'rpc.request-sent'#

Stat name for the number of requests sent to the provider

STAT_RESPONSE_NO_ID = 'rpc.response-without-id'#

Stat name for the number of responses received that had no request ID

STAT_RESPONSE_RECEIVED = 'rpc.response-received'#

Stat name for the number of responses received from the provider

STAT_RESPONSE_TOO_MANY_REQUESTS = 'rpc.response-too-many-requests'#

Stat name for the number of “Too Many Requests” errors that were received from the provider.

STAT_RESPONSE_UNKNOWN_FORMAT = 'rpc.response-unknown-format'#

Stat name for the number of responses received that were in an unknown format

STAT_RESPONSE_UNKNOWN_ID = 'rpc.response-unknown-id'#

Stat name for the number of responses received that could not be accounted for in the outbound requests.

async call(request: EthCall) Any#

Call a function on a smart contract via eth_call

Parameters:

request – Object representation of the call

Returns:

Decoded values returned by the smart contract function. If there is no return type for the function, the result will be None. Otherwise, it will be a tuple of response types as functions can return multiple values. For example:

(result,) = await rpc_client.call(request)

Raises:

CallError

async get_block(block_num: HexInt, full_transactions: bool = False) EvmBlock#

Get a block via eth_getBlockByNumber

Parameters:
  • block_num – The block number for the block you wish to get.

  • full_transactions – Return full transactions flag. If True, the transactions attribute on the returned block will contain EvmTransaction objects and the transaction_hashes attribute will contain transaction hashes. If False, the transaction_hashes attribute will contain transaction hashes and the transactions attribute will be None.

async get_block_number() HexInt#

Get the current block height via eth_blockNumber

async get_logs(topics: List[Union[str, List[str]]], from_block: HexInt, to_block: HexInt, address: Address, starting_block_range_size: Optional[int] = None) AsyncIterable[EvmLog]#

Get logs matching the provided topics for an address between two blocks via eth_getLogs, optionally starting with a given block range size.

If the endpoint returns an error because the block range or result is too large, this method will react by querying smaller block ranges and iterate for as many calls as is necessary to return all the logs.

Parameters:
  • topics – List of topics that are ABI encoded. The list is dependent on the log type you wish to search. Explaining how to search logs takes more room than this document could provide.

  • from_block – The lowest block from which to search for logs

  • to_block – The highest block from which to search for logs

  • address – The log address to filter.

  • starting_block_range_size – The starting size for the block range to query for the logs. The default size is the entire range. This may not be optimal if the range will house a large number of logs and the method will have to react. Optimizing this size can reduce time spent on RPC calls that will always error.

Returns:

An asynchronous iterator to iterate through the logs.

Here’s an example for retrieving all the URI events for a contract:

async for log in await rpc_client.get_logs(
    [Erc1155Events.URI.event_signature_hash.hex()],
    HexInt(0),
    HexInt(16_734_967),
    Address("0x19c8a3f0b290a36de59a50b4c70a23f9c045ec74"),
):
    print(log)

async get_transaction_receipt(tx_hash: HexBytes) EvmTransactionReceipt#

Get a transaction receipt by hash via eth_getTransactionReceipt

Parameters:

tx_hash – Transaction hash of transaction you wish to retrieve.

async send(method, *params) Any#

Send the RPC request for the method with the provided params and return the result. The parameters are analogous to those in the JSON-RPC 2.0 specification.

Parameters:
  • method – RPC method

  • params – Parameters for the RPC method

class blockcrawler.evm.rpc.EthCall(from_: Optional[str], to: str, function: Function, parameters: Optional[list] = None, block: Optional[Union[HexInt, Literal['latest'], Literal['earliest'], Literal['pending']]] = 'latest')#

Python representation of the properties of an eth_call to execute a function for a smart contract on an Ethereum Virtual Machine (EVM)

Parameters:
  • from_ – Address from which a transaction would originate. This is optional for view function calls.

  • to – Address of the contract whose function you will be calling.

  • function – The function class representation of the contract function

  • parameters – The list of ordered function parameters to send

  • block – The block height at which to execute the function

class blockcrawler.evm.rpc.EvmRpcClient(provider_url: str, stats_service: StatsService, requests_per_second: Optional[int] = None, max_concurrent_requests: Optional[int] = None)#

RPC Client for EVM RPC calls

STAT_CALL = 'rpc.eth.call'#

Stat name for counts of eth_call RPC calls

STAT_CONNECT = 'rpc.connect'#

Stat name for the number of times the client connected to the provider

STAT_CONNECTION_RESET = 'rpc.connection-reset'#

Stat name for the number of connection resets received from the provider

STAT_GET_BLOCK = 'rpc.eth.get_block_by_number'#

Stat name for counts of eth_getBlockByNumber RPC calls

STAT_GET_BLOCK_NUMBER = 'rpc.eth.block_number'#

Stat name for counts of eth_blockNumber RPC calls

STAT_GET_LOGS = 'rpc.eth.get_logs'#

Stat name for counts of eth_getLogs RPC calls

STAT_GET_TRANSACTION_RECEIPT = 'rpc.eth.get_transaction_receipt'#

Stat name for counts of eth_getTransactionReceipt RPC calls

STAT_ORPHANED_REQUESTS = 'rpc.orphaned-requests'#

Stat name for the number of requests that were orphaned by being unable to receive a response from the provider.

STAT_RECONNECT = 'rpc.reconnect'#

Stat name for the number of times the client reconnected to the provider

STAT_REQUEST_DELAYED = 'rpc-request-delayed'#

Stat name for the number of requests that were delayed due to exceeding the requests per second limit or due to a pause caused by receiving a “Too Many Requests” error from the provider.

STAT_REQUEST_MS = 'rpc.request-ms'#

Stat name for the total number of milliseconds all requests spent on round trips

STAT_REQUEST_SENT = 'rpc.request-sent'#

Stat name for the number of requests sent to the provider

STAT_RESPONSE_NO_ID = 'rpc.response-without-id'#

Stat name for the number of responses received that had no request ID

STAT_RESPONSE_RECEIVED = 'rpc.response-received'#

Stat name for the number of responses received from the provider

STAT_RESPONSE_TOO_MANY_REQUESTS = 'rpc.response-too-many-requests'#

Stat name for the number of “Too Many Requests” errors that were received from the provider.

STAT_RESPONSE_UNKNOWN_FORMAT = 'rpc.response-unknown-format'#

Stat name for the number of responses received that were in an unknown format

STAT_RESPONSE_UNKNOWN_ID = 'rpc.response-unknown-id'#

Stat name for the number of responses received that could not be accounted for in the outbound requests.

async call(request: EthCall) Any#

Call a function on a smart contract via eth_call

Parameters:

request – Object representation of the call

Returns:

Decoded values returned by the smart contract function. If there is no return type for the function, the result will be None. Otherwise, it will be a tuple of response types as functions can return multiple values. For example:

(result,) = await rpc_client.call(request)

Raises:

CallError

async get_block(block_num: HexInt, full_transactions: bool = False) EvmBlock#

Get a block via eth_getBlockByNumber

Parameters:
  • block_num – The block number for the block you wish to get.

  • full_transactions – Return full transactions flag. If True, the transactions attribute on the returned block will contain EvmTransaction objects and the transaction_hashes attribute will contain transaction hashes. If False, the transaction_hashes attribute will contain transaction hashes and the transactions attribute will be None.

async get_block_number() HexInt#

Get the current block height via eth_blockNumber

async get_logs(topics: List[Union[str, List[str]]], from_block: HexInt, to_block: HexInt, address: Address, starting_block_range_size: Optional[int] = None) AsyncIterable[EvmLog]#

Get logs matching the provided topics for an address between two blocks via eth_getLogs, optionally starting with a given block range size.

If the endpoint returns an error because the block range or result is too large, this method will react by querying smaller block ranges and iterate for as many calls as is necessary to return all the logs.
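One plausible way to react to a "range too large" error is to halve the block range and retry. The synchronous sketch below illustrates that idea only; `RangeTooLargeError`, `fetch_all_logs`, and the `fetch` callback are hypothetical, and the halving strategy is an assumption rather than a description of what EvmRpcClient actually does.

```python
from typing import Callable, Iterator, List, Optional


class RangeTooLargeError(Exception):
    """Hypothetical stand-in for a provider's 'range too large' error."""


def fetch_all_logs(
    fetch: Callable[[int, int], List[str]],
    from_block: int,
    to_block: int,
    starting_block_range_size: Optional[int] = None,
) -> Iterator[str]:
    """Yield logs for from_block..to_block, shrinking the range on errors."""
    # Default to the entire range, as the parameter description states.
    range_size = starting_block_range_size or (to_block - from_block + 1)
    current = from_block
    while current <= to_block:
        end = min(current + range_size - 1, to_block)
        try:
            logs = fetch(current, end)
        except RangeTooLargeError:
            if range_size == 1:
                raise  # a single block cannot be split any further
            range_size = max(1, range_size // 2)  # assumed strategy: halve
            continue
        yield from logs
        current = end + 1
```

Choosing a realistic `starting_block_range_size` avoids the initial failing calls that this loop would otherwise spend discovering a workable range.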

Parameters:
  • topics – List of topics that are ABI encoded. The list is dependent on the log type you wish to search. Explaining how to search logs takes more room than this document could provide.

  • from_block – The lowest block from which to search for logs

  • to_block – The highest block from which to search for logs

  • address – The log address to filter.

  • starting_block_range_size – The starting size for the block range to query for the logs. The default size is the entire range. This may not be optimal if the range will house a large number of logs and the method will have to react. Optimizing this size can reduce time spent on RPC calls that will always error.

Returns:

An asynchronous iterator to iterate through the logs.

Here’s an example for retrieving all the URI events for a contract:

async for log in await rpc_client.get_logs(
    [Erc1155Events.URI.event_signature_hash.hex()],
    HexInt(0),
    HexInt(16_734_967),
    Address("0x19c8a3f0b290a36de59a50b4c70a23f9c045ec74"),
):
    print(log)

async get_transaction_receipt(tx_hash: HexBytes) EvmTransactionReceipt#

Get a transaction receipt by hash via eth_getTransactionReceipt

Parameters:

tx_hash – Transaction hash of transaction you wish to retrieve.

async send(method: str, *params: Any) Any#

Send the RPC request for the method with the provided params and return the result. The parameters are analogous to those in the JSON-RPC 2.0 specification.

Parameters:
  • method – RPC method

  • params – Parameters for the RPC method