Skip to content

Nodes

AAct nodes are simply classes which inherit from Node and implements different ways of handling and sending messages.

aact.Node

Bases: BaseModel, Generic[InputType, OutputType]

Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters: InputType and OutputType. The InputType and OutputType is used not only for static type checking but also for runtime message type validation, so it is important that you pass the correct types.

Each of InputType and OutputType can be either: 1. a subclass of aact.messages.DataModel, or 2. a union of multiple aact.DataModel subclasses, or 3. aact.DataModel itself to allow any type of message (not recommended).[^1]

Any subclass of aact.Node must implement the event_handler method, which is the main computation logic of the node. The event_handler method takes two arguments: input_channel and input_message, and returns an async iterator of tuples of output channel and output message.

For example, the following code snippet shows a simple node that takes a aact.messages.Text message from the a channel and echo it to the b channel.

from aact import Node, Message
from aact.messages import Text

from typing import AsyncIterator

class EchoNode(Node[Text, Text]):
    def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[str, Message[Text]]:
        yield "b", Message[Text](data=input_message.data)

Built-in Nodes

aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes:

  • aact.nodes.listener.ListenerNode: A node that listens to the audio input from the microphone.
  • aact.nodes.speaker.SpeakerNode: A node that plays the audio output to the speaker.
  • aact.nodes.record.RecordNode: A node that records the messages to a file.
  • aact.nodes.print.PrintNode: A node that prints the messages to the console.
  • aact.nodes.tick.TickNode: A node that sends a tick message at a fixed interval.
  • aact.nodes.random.RandomNode: A node that sends a random number message.
  • aact.nodes.transcriber.TranscriberNode: A node that transcribes the audio messages to text.
  • aact.nodes.tts.TTSNode: A node that converts the text messages to audio.

Common usage

The usage of nodes is in the quick start guide.

Advanced usage

Send messages on your own

The default behavior of sending messages in the base Node class is handled in the event_loop method. If you want to send messages on your own, you can directly use the Redis instance r to publish messages to the output channels.


class YourNode(Node[InputType, OutputType]):

    async def func_where_you_send_messages(self):
        await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json())

Customize set up and tear down

You can customize the set up and tear down of the node by overriding the __aenter__ and __aexit__ methods. For example, you can open a file in the __aenter__ method and close it in the __aexit__ method.


class YourNode(Node[InputType, OutputType]):

    async def __aenter__(self) -> Self:
        self.file = open("your_file.txt", "w")
        return await super().__aenter__()

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.file.close()
        return await super().__aexit__(exc_type, exc_value, traceback)

This will ensure the file is closed properly even if an exception is raised.

Background tasks

You can run background tasks in the node by creating a task in the __aenter__ method and cancelling it in the __aexit__ method.


class YourNode(Node[InputType, OutputType]):

    async def __aenter__(self) -> Self:

        self.task = asyncio.create_task(self.background_task())
        return await super().__aenter__()

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.task.cancel()

        try:
            await self.task
        except asyncio.CancelledError:
            pass

[^1]: Only if you know what you are doing. For example, in the aact.nodes.record.RecordNode, the InputType is aact.messages.DataModel because it can accept any type of message. But in most cases, you should specify the InputType and OutputType to be a specific subclass of aact.messages.DataModel.

Source code in src/aact/nodes/base.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class Node(BaseModel, Generic[InputType, OutputType]):
    """
    Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters:
    `InputType` and `OutputType`. The InputType and OutputType is used not only for static type checking but also for
    runtime message type validation, so it is important that you pass the correct types.

    Each of `InputType` and `OutputType` can be either:
    1. a subclass of `aact.messages.DataModel`, or
    2. a union of multiple `aact.DataModel` subclasses, or
    3. `aact.DataModel` itself to allow any type of message (not recommended).[^1]

    Any subclass of `aact.Node` must implement the `event_handler` method, which is the main computation logic of the
    node. The `event_handler` method takes two arguments: `input_channel` and `input_message`, and returns an async
    iterator of tuples of output channel and output message.

    For example, the following code snippet shows a simple node that takes a `aact.messages.Text` message from the `a`
    channel and echo it to the `b` channel.

    ```python
    from aact import Node, Message
    from aact.messages import Text

    from typing import AsyncIterator

    class EchoNode(Node[Text, Text]):
        def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[str, Message[Text]]:
            yield "b", Message[Text](data=input_message.data)
    ```

    ## Built-in Nodes

    aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes:

    - `aact.nodes.listener.ListenerNode`: A node that listens to the audio input from the microphone.
    - `aact.nodes.speaker.SpeakerNode`: A node that plays the audio output to the speaker.
    - `aact.nodes.record.RecordNode`: A node that records the messages to a file.
    - `aact.nodes.print.PrintNode`: A node that prints the messages to the console.
    - `aact.nodes.tick.TickNode`: A node that sends a tick message at a fixed interval.
    - `aact.nodes.random.RandomNode`: A node that sends a random number message.
    - `aact.nodes.transcriber.TranscriberNode`: A node that transcribes the audio messages to text.
    - `aact.nodes.tts.TTSNode`: A node that converts the text messages to audio.

    ## Common usage

    The usage of nodes is in the [quick start guide](aact.html/#usage).

    ## Advanced usage

    ### Send messages on your own

    The default behavior of sending messages in the base Node class is handled in the `event_loop` method. If you want to
    send messages on your own, you can directly use the Redis instance `r` to publish messages to the output channels.

    ```python

    class YourNode(Node[InputType, OutputType]):

        async def func_where_you_send_messages(self):
            await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json())

    ```

    ### Customize set up and tear down

    You can customize the set up and tear down of the node by overriding the `__aenter__` and `__aexit__` methods. For
    example, you can open a file in the `__aenter__` method and close it in the `__aexit__` method.

    ```python

    class YourNode(Node[InputType, OutputType]):

        async def __aenter__(self) -> Self:
            self.file = open("your_file.txt", "w")
            return await super().__aenter__()

        async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
            self.file.close()
            return await super().__aexit__(exc_type, exc_value, traceback)
    ```

    This will ensure the file is closed properly even if an exception is raised.

    ### Background tasks

    You can run background tasks in the node by creating a task in the `__aenter__` method and cancelling it in the
    `__aexit__` method.

    ```python

    class YourNode(Node[InputType, OutputType]):

        async def __aenter__(self) -> Self:

            self.task = asyncio.create_task(self.background_task())
            return await super().__aenter__()

        async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
            self.task.cancel()

            try:
                await self.task
            except asyncio.CancelledError:
                pass
    ```

    [^1]: Only if you know what you are doing. For example, in the `aact.nodes.record.RecordNode`, the `InputType` is
    `aact.messages.DataModel` because it can accept any type of message. But in most cases, you should specify the
    `InputType` and `OutputType` to be a specific subclass of `aact.messages.DataModel`.
    """

    input_channel_types: dict[str, Type[InputType]]
    """
    A dictionary that maps the input channel names to the corresponding input message types.
    """
    output_channel_types: dict[str, Type[OutputType]]
    """
    A dictionary that maps the output channel names to the corresponding output message types.
    """
    redis_url: str
    """
    The URL of the Redis server. It should be in the format of `redis://<host>:<port>/<db>`.
    """
    model_config = ConfigDict(extra="allow")
    """
    @private
    """

    def __init__(
        self,
        input_channel_types: list[tuple[str, Type[InputType]]],
        output_channel_types: list[tuple[str, Type[OutputType]]],
        redis_url: str = "redis://localhost:6379/0",
    ):
        try:
            super().__init__(
                input_channel_types=dict(input_channel_types),
                output_channel_types=dict(output_channel_types),
                redis_url=redis_url,
            )
        except ValidationError as _:
            raise NodeConfigurationError(
                "You passed an invalid configuration to the Node.\n"
                f"The required input channel types are: {self.model_fields['input_channel_types'].annotation}\n"
                f"The input channel types are: {input_channel_types}\n"
                f"The required output channel types are: {self.model_fields['output_channel_types'].annotation}\n"
                f"The output channel types are: {output_channel_types}\n"
            )

        self.r: Redis = Redis.from_url(redis_url)
        """
        @private
        """
        self.pubsub = self.r.pubsub()
        """
        @private
        """
        self.logger = logging.getLogger("aact.nodes.base.Node")
        """
        @private
        """

    async def __aenter__(self) -> Self:
        try:
            await self.r.ping()
        except ConnectionError:
            raise ValueError(
                f"Could not connect to Redis with the provided url. {self.redis_url}"
            )
        await self.pubsub.subscribe(*self.input_channel_types.keys())
        return self

    async def __aexit__(self, _: Any, __: Any, ___: Any) -> None:
        await self.pubsub.unsubscribe()
        await self.r.aclose()

    async def _wait_for_input(
        self,
    ) -> AsyncIterator[tuple[str, Message[InputType]]]:
        async for message in self.pubsub.listen():
            channel = message["channel"].decode("utf-8")
            if message["type"] == "message" and channel in self.input_channel_types:
                try:
                    data = Message[
                        self.input_channel_types[channel]  # type: ignore[name-defined]
                    ].model_validate_json(message["data"])
                except ValidationError as e:
                    self.logger.error(
                        f"Failed to validate message from {channel}: {message['data']}. Error: {e}"
                    )
                    raise e
                yield channel, data
        raise Exception("Input channel closed unexpectedly")

    async def event_loop(
        self,
    ) -> None:
        """
        The main event loop of the node.
        The default implementation of the event loop is to wait for input messages from the input channels and call the
        `event_handler` method for each input message, and send each output message to the corresponding output channel.
        """
        try:
            async for input_channel, input_message in self._wait_for_input():
                async for output_channel, output_message in self.event_handler(
                    input_channel, input_message
                ):
                    await self.r.publish(
                        output_channel, output_message.model_dump_json()
                    )
        except NodeExitSignal as e:
            self.logger.info(f"Event loop cancelled: {e}. Exiting gracefully.")
        except Exception as e:
            raise e

    @abstractmethod
    async def event_handler(
        self, _: str, __: Message[InputType]
    ) -> AsyncIterator[tuple[str, Message[OutputType]]]:
        """
        @private
        """
        raise NotImplementedError("event_handler must be implemented in a subclass.")
        yield "", self.output_type()  # unreachable: dummy return value

input_channel_types instance-attribute

input_channel_types: dict[str, Type[InputType]]

A dictionary that maps the input channel names to the corresponding input message types.

output_channel_types instance-attribute

output_channel_types: dict[str, Type[OutputType]]

A dictionary that maps the output channel names to the corresponding output message types.

redis_url instance-attribute

redis_url: str

The URL of the Redis server. It should be in the format of redis://<host>:<port>/<db>.

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='allow')

@private

r instance-attribute

r: Redis = from_url(redis_url)

@private

pubsub instance-attribute

pubsub = pubsub()

@private

logger instance-attribute

logger = getLogger('aact.nodes.base.Node')

@private

event_loop async

event_loop() -> None

The main event loop of the node. The default implementation of the event loop is to wait for input messages from the input channels and call the event_handler method for each input message, and send each output message to the corresponding output channel.

Source code in src/aact/nodes/base.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
async def event_loop(
    self,
) -> None:
    """
    The main event loop of the node.
    The default implementation of the event loop is to wait for input messages from the input channels and call the
    `event_handler` method for each input message, and send each output message to the corresponding output channel.
    """
    try:
        async for input_channel, input_message in self._wait_for_input():
            async for output_channel, output_message in self.event_handler(
                input_channel, input_message
            ):
                await self.r.publish(
                    output_channel, output_message.model_dump_json()
                )
    except NodeExitSignal as e:
        self.logger.info(f"Event loop cancelled: {e}. Exiting gracefully.")
    except Exception as e:
        raise e

event_handler abstractmethod async

event_handler(_: str, __: Message[InputType]) -> AsyncIterator[tuple[str, Message[OutputType]]]

@private

Source code in src/aact/nodes/base.py
244
245
246
247
248
249
250
251
252
@abstractmethod
async def event_handler(
    self, _: str, __: Message[InputType]
) -> AsyncIterator[tuple[str, Message[OutputType]]]:
    """
    @private
    """
    raise NotImplementedError("event_handler must be implemented in a subclass.")
    yield "", self.output_type()  # unreachable: dummy return value

aact.NodeFactory

To use nodes in the dataflow, you need to register them in the NodeFactory before using them. The reason for this is to allow users write string names in toml files which can be converted to actual classes at runtime.

To register a node, you need to use the @NodeFactory.register decorator.

Example:

from aact import Node, NodeFactory

@NodeFactory.register("node_name")
class YourNode(Node[your_input_type, your_output_type]):
    # Your implementation of the node
For power users You can initialize a node using the `NodeFactory.make` method.
from aact import NodeFactory

node = NodeFactory.make("node_name", ...)# your arguments

registry class-attribute instance-attribute

registry: dict[str, type[Node[DataModel, DataModel]]] = {}

@private

register classmethod

register(name: str) -> Callable[[type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]]]

@private

Source code in src/aact/nodes/registry.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@classmethod
def register(
    cls, name: str
) -> Callable[
    [type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]]
]:
    """
    @private
    """

    def inner_wrapper(
        wrapped_class: type[Node[InputType, OutputType]],
    ) -> type[Node[InputType, OutputType]]:
        if name in cls.registry:
            logger.warning("Executor %s already exists. Will replace it", name)
        cls.registry[name] = wrapped_class
        return wrapped_class

    return inner_wrapper

make classmethod

make(name: str, **kwargs: Any) -> Node[DataModel, DataModel]

@private

Source code in src/aact/nodes/registry.py
67
68
69
70
71
72
73
74
@classmethod
def make(cls, name: str, **kwargs: Any) -> Node[DataModel, DataModel]:
    """
    @private
    """
    if name not in cls.registry:
        raise ValueError(f"Executor {name} not found in registry")
    return cls.registry[name](**kwargs)