Flow
class laminar.components.Flow(*, datastore: DataStore = Local(root='/home/runner/work/laminar/laminar/.laminar', cache={}, protocols={'laminar.configurations.datastores.ArchiveProtocol': ArchiveProtocol(), 'laminar.configurations.datastores.RecordProtocol': RecordProtocol()}), executor: Executor = Docker(concurrency=1, timeout=86400), scheduler: Scheduler = Scheduler())
Bases: object
Collection of tasks that execute in a specific order.
Usage:
    from laminar import Flow, Layer

    class HelloFlow(Flow): ...
Methods
register: Add a layer to the flow.
Attributes
dependencies: A mapping of each layer and the layers it depends on.
dependents: A mapping of each layer and the layers that depend on it.
name
Execution of the flow
configuration: Flow configuration
Layers registered with the flow
__init__(*, datastore: DataStore = Local(root='/home/runner/work/laminar/laminar/.laminar', cache={}, protocols={'laminar.configurations.datastores.ArchiveProtocol': ArchiveProtocol(), 'laminar.configurations.datastores.RecordProtocol': RecordProtocol()}), executor: Executor = Docker(concurrency=1, timeout=86400), scheduler: Scheduler = Scheduler())
Parameters
- datastore: DataStore = Local(root='/home/runner/work/laminar/laminar/.laminar', cache={}, protocols={'laminar.configurations.datastores.ArchiveProtocol': ArchiveProtocol(), 'laminar.configurations.datastores.RecordProtocol': RecordProtocol()})
  Datastore to execute the flow with. Optional; defaults to datastores.Local().
- executor: Executor = Docker(concurrency=1, timeout=86400)
  Executor to run layers with. Optional; defaults to executors.Docker().
- scheduler: Scheduler = Scheduler()
  Scheduler to manage the flow with. Optional; defaults to schedulers.Scheduler().
Raises
FlowError – If the flow is configured with the Memory datastore but without a Thread executor.
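The Raises condition above can be pictured as a constructor-time check. The following is only a sketch under stated assumptions: every class here (FlowError, Memory, Local, Thread, Docker) is an empty stand-in defined for illustration, and validate is a hypothetical helper, not a function provided by laminar.

```python
class FlowError(Exception):
    """Stand-in for the library's FlowError (hypothetical, for illustration)."""


# Empty stand-ins for the datastore and executor types named in the reference.
class Memory: ...
class Local: ...
class Thread: ...
class Docker: ...


def validate(datastore: object, executor: object) -> None:
    """Reject a Memory datastore unless it is paired with a Thread executor."""
    if isinstance(datastore, Memory) and not isinstance(executor, Thread):
        raise FlowError("The Memory datastore requires a Thread executor.")


validate(Memory(), Thread())  # accepted
validate(Local(), Docker())   # accepted
```

Calling validate(Memory(), Docker()) in this sketch raises FlowError, which mirrors the documented condition.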
configuration: Configuration
  Flow configuration
property dependencies: dict[str, set[str]]
  A mapping of each layer and the layers it depends on.
property dependents: dict[str, set[str]]
  A mapping of each layer and the layers that depend on it.
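The two mappings describe the same graph from opposite directions. As a rough illustration, one can be derived from the other by inverting it. The invert function and the layer names "A", "B", and "C" below are hypothetical; this is not how laminar computes the properties, only a sketch of how they relate.

```python
def invert(dependencies: dict[str, set[str]]) -> dict[str, set[str]]:
    """Build a dependents mapping from a dependencies mapping."""
    # Start every known layer with an empty set so leaf layers still appear.
    dependents: dict[str, set[str]] = {layer: set() for layer in dependencies}
    for layer, parents in dependencies.items():
        for parent in parents:
            dependents.setdefault(parent, set()).add(layer)
    return dependents


# Hypothetical flow: B depends on A, and C depends on both A and B.
dependencies = {"A": set(), "B": {"A"}, "C": {"A", "B"}}
dependents = invert(dependencies)
# dependents == {"A": {"B", "C"}, "B": {"C"}, "C": set()}
```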
classmethod register(layer: type[LayerT]) -> type[LayerT]
classmethod register(*, catch: Catch = layers.Catch(), container: Container = layers.Container(), foreach: ForEach = layers.ForEach(), retry: Retry = layers.Retry()) -> Callable[[type[LayerT]], type[LayerT]]
  Add a layer to the flow.
Usage:
    @Flow.register
    class Task(Layer): ...

    @Flow.register(...)
    class Task(Layer): ...
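The two register signatures correspond to a common decorator pattern: the same classmethod works both bare and called with keyword options. The sketch below shows that pattern in isolation. MiniFlow, its registry attribute, and the retry=3 option are hypothetical stand-ins, not laminar's implementation; the real method takes Catch, Container, ForEach, and Retry objects as listed above.

```python
from typing import Any, Callable, Optional, TypeVar, Union

T = TypeVar("T", bound=type)


class MiniFlow:
    """Hypothetical stand-in for a flow that records registered classes."""

    registry: dict[str, dict[str, Any]] = {}

    @classmethod
    def register(
        cls, layer: Optional[T] = None, **options: Any
    ) -> Union[T, Callable[[T], T]]:
        def wrapper(klass: T) -> T:
            # Record the class name with whatever options were supplied.
            cls.registry[klass.__name__] = options
            return klass

        # Bare form (@MiniFlow.register): the class arrives directly.
        if layer is not None:
            return wrapper(layer)
        # Called form (@MiniFlow.register(...)): return the real decorator.
        return wrapper


@MiniFlow.register
class First: ...


@MiniFlow.register(retry=3)
class Second: ...
```

In both forms the decorated class is returned unchanged, so it remains usable as an ordinary class after registration.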