Scaling Out¶
Sharded Artifacts¶
Workflows often involve processing large objects which needs to be handled in parts. laminar
provides Layer.shard()
to break apart large objects. In downstream steps, the attribute returns an Accessor
object. An Accessor
will lazily read sharded artifact values, can be iterated over, and supports direct and slice indexing.
# main.py
from laminar import Flow, Layer
class ShardedFlow(Flow): ...
@ShardedFlow.register
class Shard(Layer):
def __call__(self) -> None:
self.shard(foo=[1, 2, 3])
@ShardedFlow.register
class Process(Layer):
def __call__(self, shard: Shard) -> None:
print(list(shard.foo))
print(shard.foo[1])
print(shard.foo[1:])
if flow := ShardedFlow():
flow()
python main.py
>>> [1, 2, 3]
>>> 2
>>> [2, 3]
Note
A sharded object is expected to be Iterable
. Each value returned by __iter__
will be sharded separately.
ForEach Loops¶
Often it is better to break up a problem across many tasks instead of processing it all in one task. The ForEach
layer configuration combined with Layer.shard()
makes this a simple process.
from laminar import Flow, Layer
from laminar.configurations.layers import ForEach, Parameters
class ForeachFlow(Flow): ...
@ForeachFlow.register
class Shard(Layer):
def __call__(self) -> None:
self.shard(foo=[1, 2])
@ForeachFlow.register(
foreach=ForEach(
parameters=[Parameter(layer=Shard, attribute="foo")]
)
)
class Process(Layer):
def __call__(self, shard: Shard) -> None:
print(self.index, shard.foo)
if flow := ForeachFlow():
flow()
python main.py
>>> 0 1
>>> 1 2
laminar
will infer from the defined Parameter
the Layer
and attribute to fork over. The number of tasks to create in the Process
layer is determined by the size of Parameter
. Any Layer
attribute can be defined as a ForEach
parameter, including ones that have not been
sharded.
# main.py
from laminar import Flow, Layer
from laminar.configurations.layers import ForEach, Parameters
class ForeachFlow(Flow): ...
@ForeachFlow.register
class Shard(Layer):
def __call__(self) -> None:
self.bar = "a"
self.shard(foo=[1, 2])
@ForeachFlow.register(
foreach=ForEach(
parameters=[
Parameter(layer=Shard, attribute="foo"),
Parameter(layer=Shard, attribute="bar")
]
)
)
class Process(Layer):
def __call__(self, shard: Shard) -> None:
print(self.index, shard.foo, shard.bar)
if flow := ForeachFlow():
flow()
python main.py
>>> 0 1 "a"
>>> 1 2 "a"
laminar
will infer that an attribute is not sharded and supply that value to each ForEach
task.
Grid Search¶
ForEach
can handle arbitrary numbers of Parameter
inputs. When provided with more than one Parameter
, ForEach
will execute the Layer
for each permutation of the ForEach
parameters.
# main.py
from laminar import Flow, Layer
from laminar.configurations.layers import ForEach, Parameters
class GridFlow(Flow): ...
@GridFlow.register
class Shard(Layer):
def __call__(self) -> None:
self.shard(foo=[1, 2, 3], bar=["a", "b"])
@GridFlow.register(
foreach=ForEach(
parameters=[
Parameter(layer=Shard, attribute="foo"),
Parameter(layer=Shard, attribute="bar")
]
)
)
class Process(Layer):
def __call__(self, shard: Shard) -> None:
print(self.index, shard.foo, shard.bar)
if flow := GridFlow():
flow()
python main.py
>>> 0 1 "a"
>>> 1 2 "a"
>>> 2 3 "a"
>>> 3 1 "b"
>>> 4 2 "b"
>>> 5 3 "b"
laminar
infers that the ForEach
is iterating over two sharded attributes. It generates the cartesian product of each attribute and launches a ForEach
task to handle each resulting fork.
ForEach Joins¶
A ForEach
layer does not need a special join step in order to merge branch values back together. A ForEach
layer used as an input for a downstream layer will have attributes that follow the same rules as if it was created using Layer.shard()
by returning an Accessor
mapped to each ForEach
task.
# main.py
from laminar import Flow, Layer
from laminar.configurations.layers import ForEach, Parameters
class JoinFlow(Flow): ...
@JoinFlow.register
class Shard(Layer):
def __call__(self) -> None:
self.shard(foo=[1, 2])
@JoinFlow.register(
foreach=ForEach(
parameters=[Parameter(layer=Shard, attribute="foo")]
)
)
class Process(Layer):
def __call__(self, shard: Shard) -> None:
self.foo = shard.foo
@JoinFlow.layer
class Join(Layer):
def __call__(self, process: Process) -> None:
print(list(process.foo))
print(process.foo[1])
if flow := JoinFlow():
flow()
python main.py
>>> [1, 2]
>>> 2
Like sharded artifacts, the attribute accessor for ForEach
artifacts will lazily read sharded values, can be iterated over, indexed, and sliced as necessary.
Chained ForEach¶
It is common to performed multiple foreach loops in a row, where each value produced by a foreach task is passed to another foreach task. You can define Parameter(index=None)
in subsequent ForEach
to create a 1:1
mapping of one foreach to another.
# main.py
from laminar import Flow, Layer
from laminar.configurations.layers import ForEach, Parameters
class ChainedFlow(Flow): ...
@ChainedFlow.register
class Shard(Layer):
def __call__(self) -> None:
self.shard(foo=[1, 2, 3])
@ChainedFlow.register(foreach=ForEach(parameters=[Parameter(layer=Shard, attribute="foo")]))
class First(Layer):
def __call__(self, shard: Shard) -> None:
print(self.index, 'First', shard.foo)
self.foo = shard.foo
@ChainedFlow.register(
foreach=ForEach(parameters=[Parameter(layer=First, attribute="foo", index=None)])
)
class Second(Layer):
def __call__(self, first: First) -> None:
print(self.index, 'Second', first.foo)
if flow := ChainedFlow():
flow()
python main.py
>>> 0 'First' 1
>>> 1 'First' 2
>>> 2 'First' 3
>>> 0 'Second' 1
>>> 1 'Second' 2
>>> 2 'Second' 3
This allows us to perform a series of fan-out operations in a row without needing to join until it is necessary.