Execution

class laminar.components.Execution(id: str | NoneType, flow: 'Flow', retry: bool = False)

Bases: object

Methods

execute

Execute a single layer of the flow.

layer

Get a registered flow layer.

next

Chain multiple flow executions together.

parameters

Configure parameters for a flow execution

resume

Resume a flow execution from where it failed.

schedule

Schedule layers to run in sequence in the flow execution.

Attributes

finished

Flow execution is finished.

retry

True if the flow execution is being retried, else False.

running

Flow execution is currently running.

id

ID of the flow execution

flow

Flow being executed

__init__(id: str | None, flow: Flow, retry: bool = False)
execute(*, layer: Layer) Execution

Execute a single layer of the flow.

Usage:

class ExecutionFlow(Flow): ...

@ExecutionFlow.register
class A(Layer): ...

flow = ExecutionFlow()
flow.execution(...).execute(layer=flow.layer(A, index=0, splits=2))
Parameters
layer: Layer

Layer of the flow to execute.

property finished : bool

Flow execution is finished.

flow : Flow

Flow being executed

id : str | None

ID of the flow execution

layer(layer: str, **atributes: Any) Layer
layer(layer: type[LayerT], **attributes: Any) LayerT
layer(layer: LayerT, **attributes: Any) LayerT

Get a registered flow layer.

Usage:

flow.execution(...).layer("A")
flow.execution(...).layer(A)
flow.execution(...).layer(A())
flow.execution(...).layer(A(), index=0, splits=2)
Parameters
layer: str
layer: type[LayerT]
layer: LayerT

Layer to get.

**attributes

Keyword attributes to add to the Layer.

Returns

Layer that is registered to the flow.

next(*, flow: Flow, linker: Callable[[Execution], Parameters]) Execution

Chain multiple flow executions together.

Usage:

class Flow1(Flow): ...
class Flow2(Flow): ...

@Flow1.register
class A(Layer):
    foo: str

flow1 = Flow1()

flow_1().next(
    flow=Flow2(),
    linker=lambda execution: Parameters(foo=execution.layer(A).foo)
)
Parameters
flow: Flow

Flow to execute next.

linker: Callable[[Execution], Parameters]

Function for passing parameters to the next flow.

parameters(**artifacts: Any) Execution

Configure parameters for a flow execution

Usage:

flow.execution(...).parameters(foo="bar")
Parameters
**artifacts: Any

Key/value pairs of parameters to add to the flow.

Returns

ID of the execution the parameters were added to.

resume() Execution

Resume a flow execution from where it failed.

Notes

Resuming a flow execution will skip all layers that finished on the previous attempt.

Usage:

flow.execution(...).resume()
retry : bool = False

True if the flow execution is being retried, else False.

property running : bool

Flow execution is currently running.

schedule(*, dependencies: dict[str, set[str]]) Execution

Schedule layers to run in sequence in the flow execution.

Parameters
dependencies: dict[str, set[str]]

Mapping of layers to layers it depends on.