Scheduler

class laminar.configurations.schedulers.Scheduler

Bases: object

Base scheduler

Methods

compile

Compile an intermediate representation of the Flow.

create

Create a delegated scheduler to schedule the Flow.

invoke

Invoke the delegated scheduler to start a Flow execution.

loop

runnable

Find all runnable layers.

running

Schedule runnable layers.

schedule

Schedule layers for execution.

skippable

Find all skippable layers.

wait

Wait on the completion of running layers.

Attributes

__init__()
compile(*, execution: Execution) dict[str, Any]

Compile an intermediate representation of the Flow.

create(*, ir: dict[str, Any]) None

Create a delegated scheduler to schedule the Flow.

invoke() None

Invoke the delegated scheduler to start a Flow execution.

runnable(*, dependencies: dict[str, set[str]], pending: set[str], finished: set[str]) tuple[set[str], set[str]]

Find all runnable layers.

Parameters
dependencies: dict[str, set[str]]

Layer dependencies

pending: set[str]

Pending layers

finished: set[str]

Finished layers

Returns

  • Remaining pending layers

  • Runnable layers

running(*, execution: Execution, runnable: set[str], running: set[Task[list[Layer]]]) set[Task[list[Layer]]]

Schedule runnable layers.

Parameters
execution: Execution

Execution that the layers are being run in.

runnalbe

Runnable layers.

running: set[Task[list[Layer]]]

Currently running layers.

Returns

Async tasks for new and existing running layers.

async schedule(*, layer: Layer, attempt: int = 1) list[Layer]

Schedule layers for execution.

Parameters
execution

Flow execution ID

layer: Layer

Layer to execute

attempt: int = 1

Scheduling attempt for this layer

Returns

Layer splits that were executed

skippable(*, execution: Execution, runnable: set[str], finished: set[str]) tuple[set[str], set[str]]

Find all skippable layers.

Parameters
execution: Execution

Execution being schedule.

runnable: set[str]

Runnable layers.

finished: set[str]

Finished layers.

Returns

  • Runnable layers

  • Finished layers

async wait(*, running: set[Task[list[Layer]]], finished: set[str], condition: str) tuple[set[Task[list[Layer]]], set[str]]

Wait on the completion of running layers.

Parameters
running: set[Task[list[Layer]]]

Running layers.

finished: set[str]

Finished layers

condition: str

Condition to wait on.

Returns

  • Remaining running layers

  • Finished layers