Hooks

laminar provides a hook system that lets users extend its functionality and adjust a flow dynamically in response to changes that occur at execution time.

Hooks are defined by decorating methods with one of the decorators in laminar.configurations.hooks, such as hooks.execution.

# main.py

from typing import Generator

from laminar import Flow, Layer
from laminar.configurations import hooks

class HookFlow(Flow):
    ...

@HookFlow.register
class A(Layer):
    def __call__(self) -> None:
        print("in call")

    @hooks.execution
    def configure_before_after(self) -> Generator[None, None, None]:
        print("before call")
        yield
        print("after call")

Dependencies

Hooks, like other Layer functions, can use other layers as dependencies. Layers that hooks depend on are added to the dependency graph and used to determine evaluation order.

@hooks.submission
def configure_container(self, a: A) -> None:
    memory = {"a": 1000, "b": 1500, "c": 2000}
    self.configuration.container.memory = memory[a.foo]

The values from those Layer dependencies can be used to inform the business logic within each hook.
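As a rough illustration of how annotated parameters can feed a dependency graph, the sketch below inspects every method defined on a class, including one that plays the role of a hook, and sorts the classes topologically. It does not use laminar and is not how laminar is implemented; the dependencies helper and the plain classes A, B and C are hypothetical stand-ins. It requires Python 3.9+ for graphlib.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Dict, Set


class A:
    def __call__(self) -> None: ...


class B:
    def __call__(self, a: A) -> None: ...


class C:
    def __call__(self) -> None: ...

    # Plays the role of a hook: C depends on B only through this method.
    def configure_container(self, b: B) -> None: ...


def dependencies(layer: type) -> Set[type]:
    """Collect the classes used as parameter annotations on any method of `layer`."""
    found: Set[type] = set()
    for value in vars(layer).values():
        if not inspect.isfunction(value):
            continue
        for parameter in inspect.signature(value).parameters.values():
            if parameter.name != "self" and inspect.isclass(parameter.annotation):
                found.add(parameter.annotation)
    return found


# Map each class to the classes it must run after, then sort.
graph: Dict[type, Set[type]] = {layer: dependencies(layer) for layer in (C, B, A)}
order = [layer.__name__ for layer in TopologicalSorter(graph).static_order()]
print(order)
```

In this sketch C is ordered after B even though C.__call__ takes no parameters, because the hook-like method alone contributes the dependency.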

Multiple Hooks

Any number of each type of hook can be defined for a Layer. The following example reproduces the behavior of configure_before_after from the first example using two separate execution hooks:

from typing import Generator

from laminar import Flow, Layer
from laminar.configurations import hooks


class HookFlow(Flow): ...

@HookFlow.register
class A(Layer):
    def __call__(self) -> None:
        print("in call")

    # A function hook runs to completion before __call__.
    @hooks.execution
    def before_call(self) -> None:
        print("before call")

    # A generator hook resumes after __call__ has finished.
    @hooks.execution
    def after_call(self) -> Generator[None, None, None]:
        yield
        print("after call")

Hooks of the same type are executed in the order that they are defined in the Layer. Using multiple hooks allows for deep and dynamic customization of the entire Layer lifecycle.
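One way definition order can be recovered in plain Python is from the class namespace, which preserves the order in which attributes were written. The sketch below is only an illustration of that idea; the execution decorator and collect_hooks helper here are hypothetical stand-ins and not laminar's own.

```python
from typing import Callable, List


def execution(func: Callable) -> Callable:
    """Tag a method as a hook (hypothetical stand-in for a real decorator)."""
    setattr(func, "_is_hook", True)
    return func


def collect_hooks(cls: type) -> List[str]:
    # vars(cls) iterates attributes in the order they appear in the class body.
    return [name for name, value in vars(cls).items() if getattr(value, "_is_hook", False)]


class Example:
    @execution
    def zeta(self) -> None: ...

    def not_a_hook(self) -> None: ...

    @execution
    def alpha(self) -> None: ...


# Definition order, not alphabetical order.
print(collect_hooks(Example))
```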

Types

Event Hooks

Event hooks are Python functions or generators that perform actions around events within the flow. A hook defined as a generator runs up to its yield before the event, then resumes once the Layer has finished executing. A hook defined as a function runs to completion before the Layer is executed.
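The difference between the two forms can be sketched without laminar. The run_with_hooks helper below is hypothetical and only approximates the behavior described above: plain functions are called immediately, while generator functions are wrapped as context managers so that the code after yield runs once the event has finished.

```python
import inspect
from contextlib import ExitStack, contextmanager
from typing import Any, Callable, Iterator, List


def run_with_hooks(hooks: List[Callable[[], Any]], event: Callable[[], Any]) -> None:
    """Run `event` surrounded by `hooks` (illustrative helper, not laminar's API)."""
    with ExitStack() as stack:
        for hook in hooks:
            if inspect.isgeneratorfunction(hook):
                # Generator hook: runs up to `yield` now, the rest after the event.
                stack.enter_context(contextmanager(hook)())
            else:
                # Function hook: runs to completion before the event.
                hook()
        event()


log: List[str] = []


def function_hook() -> None:
    log.append("function hook")


def generator_hook() -> Iterator[None]:
    log.append("generator hook: before")
    yield
    log.append("generator hook: after")


run_with_hooks([function_hook, generator_hook], lambda: log.append("event"))
print(log)
```

Note that ExitStack resumes several generator hooks in reverse order of entry; whether laminar does the same is not stated in this document.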

Execution Hooks

Execution hooks run just prior to the invocation of Layer.__call__. This is useful when something needs to happen before a Layer starts.

For example, execution hooks can be used to open connections to remote databases.

from typing import Generator

import psycopg2

from laminar import Flow, Layer
from laminar.configurations import hooks

class HookFlow(Flow): ...

@HookFlow.register
class A(Layer):
    def __call__(self) -> None:
        self.cursor.execute("SELECT * FROM <table>")

    @hooks.execution
    def hello_world(self) -> Generator[None, None, None]:
        with psycopg2.connect("dbname=test user=postgres") as self.connection:
            with self.connection.cursor() as self.cursor:
                yield


if flow := HookFlow():
    flow()

Note

Execution hooks are invoked on the Layer executor.

Retry Hooks

Retry hooks run just prior to waiting for a Layer’s retry backoff. This is useful for situations where the Layer needs to be adjusted in response to a failure.

For example, here we double the requested memory every time the Layer needs to retry.

from laminar import Flow, Layer
from laminar.configurations import hooks

class HookFlow(Flow):
    ...


@HookFlow.register
class A(Layer):
    @hooks.retry
    def configure_container(self) -> None:
        self.configuration.container.memory *= 2

if flow := HookFlow():
    flow()

Note

Retry hooks are invoked on the Flow scheduler.
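To make the effect of doubling concrete, here is a small simulation that does not depend on laminar. The Container dataclass, the run_with_retries loop and the "required memory" success condition are all assumptions made for illustration; real retry behavior depends on the configured retry policy.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Container:
    memory: int  # same unit as the container configuration uses


def retry_hook(container: Container) -> None:
    # Mirrors the hook above: request twice as much memory on the next attempt.
    container.memory *= 2


def run_with_retries(container: Container, required: int, attempts: int) -> List[int]:
    """Return the memory requested on each attempt until one succeeds."""
    requested: List[int] = []
    for _ in range(attempts):
        requested.append(container.memory)
        if container.memory >= required:  # pretend the attempt succeeds
            return requested
        retry_hook(container)  # runs before the next attempt
    raise RuntimeError("all attempts failed")


requested = run_with_retries(Container(memory=1000), required=3000, attempts=3)
print(requested)
```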

Submission Hooks

Submission hooks run just prior to a Layer being submitted for execution. This is useful for situations where the Layer needs to be configured in a certain way.

For example, submission hooks can be used to dynamically adjust the resources allocated to a Layer.

from laminar import Flow, Layer
from laminar.configurations import hooks

class HookFlow(Flow):
    ...

@HookFlow.register
class A(Layer):
    @hooks.submission
    def configure_container(self) -> None:
        self.configuration.container.cpu = 4
        self.configuration.container.memory = 2000

if flow := HookFlow():
    flow()

Submission hooks are particularly powerful when combined with the ForEach configuration. Each ForEach split can be configured differently based on the input parameters.

from typing import List

from laminar import Flow, Layer
from laminar.configurations import hooks, layers
from laminar.types import unwrap

class HookFlow(Flow):
    ...

@HookFlow.register
class A(Layer):
    baz: List[str]

    def __call__(self) -> None:
        self.shard(baz=["a", "b", "c"])

@HookFlow.register(
    foreach=layers.ForEach(parameters=[layers.ForEachParameter(layer=A, attribute="baz")])
)
class B(Layer):
    baz: List[str]

    def __call__(self, a: A) -> None:
        print(a.baz, self.configuration.container.memory)

    @hooks.submission
    def configure_container(self, a: A) -> None:
        memory = {"a": 1000, "b": 1500, "c": 2000}
        self.configuration.container.memory = memory[a.baz[unwrap(self.index)]]

if flow := HookFlow():
    flow()

python main.py

>>> "a" 1000
>>> "b" 1500
>>> "c" 2000

Note

Submission hooks are invoked on the Flow scheduler.
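The per-split lookup in configure_container can be traced in isolation. In the sketch below, unwrap is a hypothetical stand-in that returns its argument or fails on None; it is assumed to behave like laminar.types.unwrap but is not copied from it. Each split index selects one element of baz, which in turn selects a memory value.

```python
from typing import Dict, List, Optional, TypeVar

T = TypeVar("T")


def unwrap(value: Optional[T]) -> T:
    """Stand-in helper (assumption): return the value, or fail if it is None."""
    if value is None:
        raise ValueError("expected a value, got None")
    return value


baz: List[str] = ["a", "b", "c"]
memory: Dict[str, int] = {"a": 1000, "b": 1500, "c": 2000}


def memory_for_split(index: Optional[int]) -> int:
    # Same lookup as in the hook: split index -> sharded value -> memory.
    return memory[baz[unwrap(index)]]


per_split = [memory_for_split(index) for index in range(len(baz))]
print(per_split)
```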

Condition Hooks

Condition hooks are Python functions whose return values are used to evaluate the state of the Flow at runtime and to decide what the Flow should do next.

Entry Hooks

Refer to the documentation on conditional branching for an explanation of how entry hooks work.

Flow Hooks

Hooks can also be added to a Flow instead of a Layer. These hooks behave the same way, except that they are invoked on every Layer within the Flow. This is useful when the same setup or teardown needs to occur on every Layer.

Hooks are defined on a Flow by adding decorated methods to a Flow subclass.

# main.py

from typing import Generator

from laminar import Flow, Layer
from laminar.configurations import hooks

class HelloFlow(Flow):
    @hooks.execution
    def hello_world(self) -> Generator[None, None, None]:
        print(f"before {self.name}")
        yield
        print(f"after {self.name}")

@HelloFlow.register
class A(Layer):
    def __call__(self) -> None:
        print("in A")

@HelloFlow.register
class B(Layer):
    def __call__(self, a: A) -> None:
        print("in B")

if flow := HelloFlow():
    flow()

python main.py

>>> "before A"
>>> "in A"
>>> "after A"
>>> "before B"
>>> "in B"
>>> "after B"
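The output above follows from wrapping the same hook around each Layer in turn. A minimal sketch of that pattern in plain Python, using a hypothetical flow_hook context manager in place of laminar's machinery:

```python
from contextlib import contextmanager
from typing import Iterator, List

log: List[str] = []


@contextmanager
def flow_hook(layer_name: str) -> Iterator[None]:
    # One hook definition, applied around every layer.
    log.append(f"before {layer_name}")
    yield
    log.append(f"after {layer_name}")


for layer_name in ("A", "B"):
    with flow_hook(layer_name):
        log.append(f"in {layer_name}")

print(log)
```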