Bus Components#
Data Bus#
The Data Bus is the heart of the Block Crawler: it is how data is passed between the crawler's components. It is also the name of the abstract base class that defines the interface for a Data Bus implementation. Data Producers place data on the Data Bus, and the Data Bus then sends it to every registered Data Consumer, each of which handles it as it sees fit. Below is an example of a simple Data Bus:
class SimpleDataBus(DataBus):
    def __init__(self) -> None:
        self.__consumers: List[Consumer] = []

    async def send(self, data_package: DataPackage):
        # Deliver the package to every registered consumer, in registration order
        for consumer in self.__consumers:
            await consumer.receive(data_package)

    async def register(self, consumer: Consumer):
        self.__consumers.append(consumer)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Nothing to clean up in this simple implementation
        pass
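To make the fan-out behaviour concrete without depending on the blockcrawler package, here is a minimal, self-contained sketch. The names `StubPackage`, `RecordingConsumer`, and `FanOutBus` are hypothetical stand-ins, not part of the Block Crawler; they only mirror the `send`/`register`/`receive` shape shown above.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class StubPackage:
    """Hypothetical stand-in for a DataPackage."""
    value: int


class RecordingConsumer:
    """Hypothetical consumer that remembers every package it receives."""

    def __init__(self) -> None:
        self.received: List[StubPackage] = []

    async def receive(self, data_package: StubPackage) -> None:
        self.received.append(data_package)


class FanOutBus:
    """Stand-in bus: delivers each package to every registered consumer."""

    def __init__(self) -> None:
        self._consumers: List[RecordingConsumer] = []

    async def register(self, consumer: RecordingConsumer) -> None:
        self._consumers.append(consumer)

    async def send(self, data_package: StubPackage) -> None:
        # Consumers are awaited one at a time, in registration order
        for consumer in self._consumers:
            await consumer.receive(data_package)

    async def __aenter__(self) -> "FanOutBus":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        return None


async def main() -> List[List[int]]:
    first, second = RecordingConsumer(), RecordingConsumer()
    async with FanOutBus() as bus:
        await bus.register(first)
        await bus.register(second)
        for value in (1, 2, 3):
            await bus.send(StubPackage(value))
    # Each consumer should have seen every package
    return [[p.value for p in c.received] for c in (first, second)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because `send` awaits each consumer in turn, a slow consumer delays the others in this sketch; that is a property of this simple design, not necessarily of every Data Bus implementation.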
Data Bus implementations are asynchronous context managers and must be used as such:
async def run():
    producer = SimpleProducer()
    consumer = SimpleConsumer()
    async with SimpleDataBus() as data_bus:
        await data_bus.register(consumer)
        transformer = SimpleTransformer(data_bus)
        await data_bus.register(transformer)
        await producer(data_bus)
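One reason a Data Bus might need to be a context manager is that an implementation can hold resources that must be started and shut down cleanly. The sketch below illustrates this with a hypothetical `QueuedBus` that dispatches from a background task; it is not taken from the Block Crawler and uses only the standard library. Error handling for failing consumers is deliberately left out to keep it short.

```python
import asyncio
from typing import Any, List, Optional


class CollectingConsumer:
    """Hypothetical consumer that stores whatever it receives."""

    def __init__(self) -> None:
        self.items: List[Any] = []

    async def receive(self, data_package: Any) -> None:
        self.items.append(data_package)


class QueuedBus:
    """Hypothetical bus that dispatches packages from a background task.

    __aenter__ starts the worker; __aexit__ drains the queue and stops it.
    """

    def __init__(self) -> None:
        self._consumers: List[CollectingConsumer] = []
        self._queue: asyncio.Queue = asyncio.Queue()
        self._worker: Optional[asyncio.Task] = None

    async def register(self, consumer: CollectingConsumer) -> None:
        self._consumers.append(consumer)

    async def send(self, data_package: Any) -> None:
        # Only enqueue here; delivery happens in the worker task
        await self._queue.put(data_package)

    async def _dispatch(self) -> None:
        while True:
            data_package = await self._queue.get()
            try:
                for consumer in self._consumers:
                    await consumer.receive(data_package)
            finally:
                self._queue.task_done()

    async def __aenter__(self) -> "QueuedBus":
        self._worker = asyncio.create_task(self._dispatch())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self._queue.join()  # wait until every queued package is handled
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass


async def main() -> List[Any]:
    consumer = CollectingConsumer()
    async with QueuedBus() as bus:
        await bus.register(consumer)
        for value in ("a", "b", "c"):
            await bus.send(value)
    # Leaving the `async with` block guarantees the queue has been drained
    return consumer.items


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the `async with` block, nothing would start the worker or wait for the queue to empty, so packages could be lost when the program exits.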
Data Package#
A Data Package is a construct for placing data on the Data Bus to be consumed. Passing typed data around makes it easier for Data Consumers to filter out data they do not wish to process. Below is an example of a simple Data Package:
@dataclass
class SimpleIntegerDataPackage(DataPackage):
    value: int
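The filtering benefit of typed packages can be sketched with plain dataclasses. `StubDataPackage`, `IntegerPackage`, `StringPackage`, and `integers_only` are hypothetical names used for illustration; the real `DataPackage` base class comes from the Block Crawler and may carry more behaviour.

```python
from dataclasses import dataclass
from typing import List


class StubDataPackage:
    """Hypothetical stand-in for the DataPackage base class."""


@dataclass
class IntegerPackage(StubDataPackage):
    value: int


@dataclass
class StringPackage(StubDataPackage):
    value: str


def integers_only(packages: List[StubDataPackage]) -> List[int]:
    """Keep only the payloads of IntegerPackage instances."""
    return [p.value for p in packages if isinstance(p, IntegerPackage)]


mixed = [IntegerPackage(1), StringPackage("two"), IntegerPackage(3)]
print(integers_only(mixed))  # [1, 3]
```

A consumer's `receive` method can apply the same `isinstance` check to ignore every package type it does not handle.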
Data Producer#
Data Producers place Data Packages on the Data Bus via the send method. The Data Producers included within the Block Crawler extend an abstract base class. Below is an example of a simple Data Producer:
class SimpleProducer(Producer):
    async def __call__(self, data_bus: DataBus):
        # Send 1,000 through 10,000 in steps of 1,000, pausing between sends
        for i in range(1_000, 11_000, 1_000):
            await data_bus.send(SimpleIntegerDataPackage(i))
            await asyncio.sleep(0.1)
Data Consumer#
Data Consumers receive Data Packages from the Data Bus via their receive methods. When a Data Consumer processes a Data Package, it is expected to catch exceptions and transform them into ConsumerError exceptions. Below is an example of a simple Data Consumer:
class SimpleConsumer(Consumer):
    async def receive(self, data_package: DataPackage):
        # Ignore every package type this consumer does not handle
        if isinstance(data_package, SimpleStringDataPackage):
            try:
                print(data_package.value)
            except Exception as e:
                raise ConsumerError(
                    f"Unable to print value {data_package.value}",
                    e,
                )
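The exception-wrapping convention can be tried in isolation with a stand-in error type. `StubConsumerError` and `ParsingConsumer` below are hypothetical and only imitate the pattern; the real `ConsumerError` is provided by the Block Crawler, and its constructor may differ from this sketch, which chains the cause with `raise ... from` instead of passing it as an argument.

```python
import asyncio
from typing import List


class StubConsumerError(Exception):
    """Hypothetical stand-in for ConsumerError."""


class ParsingConsumer:
    """Hypothetical consumer that parses string payloads as integers."""

    def __init__(self) -> None:
        self.parsed: List[int] = []

    async def receive(self, data_package: str) -> None:
        try:
            self.parsed.append(int(data_package))
        except ValueError as e:
            # Wrap the low-level error; `from e` keeps the original cause
            raise StubConsumerError(
                f"Unable to parse value {data_package!r}"
            ) from e


async def main() -> str:
    consumer = ParsingConsumer()
    await consumer.receive("42")
    try:
        await consumer.receive("not a number")
    except StubConsumerError as error:
        return f"{error} (caused by {type(error.__cause__).__name__})"
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Callers then need to handle only one exception type per consumer, while the original exception remains available for debugging.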
Data Consumers must register with the Data Bus via the register method:
async def run():
    producer = SimpleProducer()
    consumer = SimpleConsumer()
    async with SimpleDataBus() as data_bus:
        await data_bus.register(consumer)
        transformer = SimpleTransformer(data_bus)
        await data_bus.register(transformer)
        await producer(data_bus)
Data Transformer#
Data Transformers are a hybrid of a Data Consumer and a Data Producer. They receive data from the Data Bus, transform it in some way, and place the result back on the Data Bus to be consumed by another Data Consumer. Data Transformers can make up the majority of the logic in a crawler implementation. Below is an example of a simple Data Transformer:
class SimpleTransformer(Transformer):
    async def receive(self, data_package: DataPackage):
        if isinstance(data_package, SimpleIntegerDataPackage):
            # Format the integer with thousands separators, e.g. 1000 -> "1,000"
            str_value = f"{data_package.value:,}"
            await self._get_data_bus().send(SimpleStringDataPackage(str_value))
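A transformer's logic can be checked on its own by handing it a bus that records what is sent instead of dispatching it. The sketch below assumes nothing about the real `Transformer` base class: `RecordingBus`, `FormattingTransformer`, `IntPackage`, and `StrPackage` are hypothetical stand-ins that reproduce only the receive-transform-send flow described above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass
class IntPackage:
    value: int


@dataclass
class StrPackage:
    value: str


class RecordingBus:
    """Hypothetical bus that records what is sent instead of dispatching it."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    async def send(self, data_package: Any) -> None:
        self.sent.append(data_package)


class FormattingTransformer:
    """Stand-in transformer: consumes IntPackage, produces StrPackage."""

    def __init__(self, data_bus: RecordingBus) -> None:
        self._data_bus = data_bus

    async def receive(self, data_package: Any) -> None:
        if isinstance(data_package, IntPackage):
            formatted = f"{data_package.value:,}"
            await self._data_bus.send(StrPackage(formatted))


async def main() -> List[Any]:
    bus = RecordingBus()
    transformer = FormattingTransformer(bus)
    await transformer.receive(IntPackage(1_000))
    await transformer.receive(StrPackage("ignored"))  # wrong type: skipped
    return bus.sent


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Only the integer package produces output on the bus; the string package is ignored because it fails the `isinstance` check.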
Complete Simple Example#
import asyncio
from dataclasses import dataclass
from typing import List

from blockcrawler.core.bus import (
    Consumer,
    Producer,
    DataBus,
    ConsumerError,
    DataPackage,
    Transformer,
)


class SimpleDataBus(DataBus):
    def __init__(self) -> None:
        self.__consumers: List[Consumer] = []

    async def send(self, data_package: DataPackage):
        for consumer in self.__consumers:
            await consumer.receive(data_package)

    async def register(self, consumer: Consumer):
        self.__consumers.append(consumer)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass


@dataclass
class SimpleIntegerDataPackage(DataPackage):
    value: int


@dataclass
class SimpleStringDataPackage(DataPackage):
    value: str


class SimpleConsumer(Consumer):
    async def receive(self, data_package: DataPackage):
        if isinstance(data_package, SimpleStringDataPackage):
            try:
                print(data_package.value)
            except Exception as e:
                raise ConsumerError(
                    f"Unable to print value {data_package.value}",
                    e,
                )


class SimpleProducer(Producer):
    async def __call__(self, data_bus: DataBus):
        for i in range(1_000, 11_000, 1_000):
            await data_bus.send(SimpleIntegerDataPackage(i))
            await asyncio.sleep(0.1)


class SimpleTransformer(Transformer):
    async def receive(self, data_package: DataPackage):
        if isinstance(data_package, SimpleIntegerDataPackage):
            str_value = f"{data_package.value:,}"
            await self._get_data_bus().send(SimpleStringDataPackage(str_value))


async def run():
    producer = SimpleProducer()
    consumer = SimpleConsumer()
    async with SimpleDataBus() as data_bus:
        await data_bus.register(consumer)
        transformer = SimpleTransformer(data_bus)
        await data_bus.register(transformer)
        await producer(data_bus)


if __name__ == "__main__":
    asyncio.run(run())
Executing the above example prints the values 1,000 through 10,000 to the console in steps of 1,000, one per line, each formatted with a thousands separator.