Flow

class laminar.components.Flow(*, datastore: DataStore = Local(root='/home/runner/work/laminar/laminar/.laminar', cache={}, protocols={'laminar.configurations.datastores.ArchiveProtocol': ArchiveProtocol(), 'laminar.configurations.datastores.RecordProtocol': RecordProtocol()}), executor: Executor = Docker(concurrency=1, timeout=86400), scheduler: Scheduler = Scheduler())

Bases: object

Collection of tasks that execute in a specific order.

Usage:

from laminar import Flow, Layer

class HelloFlow(Flow): ...

Methods

register

Add a layer to the flow.

Attributes

dependencies

A mapping of each layer and the layers it depends on.

dependents

A mapping of each layer and the layers that depend on it.

name

execution

Execution of the flow

configuration

Flow configuration

registry

Layers registered with the flow

__init__(*, datastore: DataStore = Local(root='/home/runner/work/laminar/laminar/.laminar', cache={}, protocols={'laminar.configurations.datastores.ArchiveProtocol': ArchiveProtocol(), 'laminar.configurations.datastores.RecordProtocol': RecordProtocol()}), executor: Executor = Docker(concurrency=1, timeout=86400), scheduler: Scheduler = Scheduler())
Parameters

datastore: DataStore = Local(root='/home/runner/work/laminar/laminar/.laminar', cache={}, protocols={'laminar.configurations.datastores.ArchiveProtocol': ArchiveProtocol(), 'laminar.configurations.datastores.RecordProtocol': RecordProtocol()})

Datastore to execute the flow with. Optional; defaults to datastores.Local().

executor: Executor = Docker(concurrency=1, timeout=86400)

Executor to run layers with. Optional; defaults to executors.Docker().

scheduler: Scheduler = Scheduler()

Scheduler to manage the flow with. Optional; defaults to schedulers.Scheduler().

Raises

FlowError – If the flow is configured with the Memory datastore but without a Thread executor.

configuration : Configuration

Flow configuration

property dependencies : dict[str, set[str]]

A mapping of each layer and the layers it depends on.

property dependents : dict[str, set[str]]

A mapping of each layer and the layers that depend on it.
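The two mappings describe the same graph from opposite directions. The sketch below uses plain dictionaries of the documented type, dict[str, set[str]], to show that relationship. The layer names and the invert helper are hypothetical and are not part of laminar; this is not how the library computes either property.

```python
from __future__ import annotations

# Hypothetical layer names standing in for layers registered with a flow.
dependencies: dict[str, set[str]] = {
    "Extract": set(),
    "Transform": {"Extract"},
    "Load": {"Transform"},
    "Report": {"Extract", "Transform"},
}


def invert(dependencies: dict[str, set[str]]) -> dict[str, set[str]]:
    """Build a dependents mapping by reversing every edge in dependencies."""
    # Start every layer with an empty set so leaf layers still appear as keys.
    dependents: dict[str, set[str]] = {layer: set() for layer in dependencies}
    for layer, parents in dependencies.items():
        for parent in parents:
            dependents.setdefault(parent, set()).add(layer)
    return dependents


dependents = invert(dependencies)
```

Here "Report" depends on "Extract" and "Transform", so it appears in the dependents set of both, while "Load" and "Report" have no dependents of their own.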

execution : Execution

Execution of the flow

classmethod register(layer: type[LayerT]) -> type[LayerT]
classmethod register(*, catch: Catch = layers.Catch(), container: Container = layers.Container(), foreach: ForEach = layers.ForEach(), retry: Retry = layers.Retry()) -> Callable[[type[LayerT]], type[LayerT]]

Add a layer to the flow.

Usage:

@Flow.register
class Task(Layer): ...

@Flow.register(...)
class Task(Layer): ...

registry : dict[str, Layer]

Layers registered with the flow
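The two call forms of register, bare and with keyword arguments, follow a common decorator pattern. The sketch below reproduces that pattern with stand-in classes so it runs without laminar or Docker. MiniFlow, its options attribute, and the retry=3 keyword are hypothetical. The stand-in stores layer classes in its registry, whereas the documented registry type is dict[str, Layer]. This is not laminar's implementation.

```python
from __future__ import annotations

from typing import Any, Optional


class Layer:
    """Stand-in for a layer base class."""


class MiniFlow:
    """Stand-in flow that records registered layers by class name."""

    registry: dict[str, type] = {}
    options: dict[str, dict[str, Any]] = {}

    @classmethod
    def register(cls, layer: Optional[type] = None, **options: Any):
        def wrapper(layer_cls: type) -> type:
            # Record the layer and any options, then return the class unchanged.
            cls.registry[layer_cls.__name__] = layer_cls
            cls.options[layer_cls.__name__] = options
            return layer_cls

        # Bare form: @MiniFlow.register passes the class in directly.
        if layer is not None:
            return wrapper(layer)
        # Called form: @MiniFlow.register(...) returns the decorator itself.
        return wrapper


@MiniFlow.register
class Extract(Layer): ...


@MiniFlow.register(retry=3)  # hypothetical option, for illustration only
class Load(Layer): ...
```

Both forms return the decorated class unchanged, so Extract and Load remain usable as ordinary classes after registration.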