Recovery

Dealing with failures is a natural part of developing workflows. For a variety of reasons, workflows may fail and need to be recovered from.

Catch

It’s possible that sometimes you expect an error to be raised and want to recover from it. Catch allows the user to configure layers to catch specific errors and subclasses of those errors.

from laminar import Flow, Layer
from laminar.configurations import layers

class CatchFlow(Flow):
    ...

@CatchFlow.register(catch=Catch(TypeError, IOError))
class A(Layer):
    def __call__(self) -> None:
        raise ...

Layers that catch errors exit immediately with their state written to the datastore. As a result, the layer state may be incomplete and users are responsible for handling any resulting inconsistencies.

Retry

Maybe requests to other services can be flaky or maybe you want your Flow to be tolerant of AWS EC2 Spot failures. Whatever the reason, there are situations where it is useful for layers to be retried on failure.

Retry allows the user to configure the retry policy per layer.

from laminar import Flow, Layer
from laminar.configurations import layers

class RetryFlow(Flow):
    ...

@RetryFlow.register(retry=layers.Retry(attempts=3))
class A(Layer):
    ...

Retry performs a jittered exponential backoff as the number of attempts increase. Each input to the retry backoff calculation can also be modified.

Retry(attempts=3, delay=0.1, backoff=2, jitter=0.1)

Resume

Sometimes a workflow execution fails and the entire flow needs to be re-run. You could choose to run the flow from the beginning, retracing through the graph. However if you want to recover work that has already been performed you would instead want to resume the flow from where it stopped.

execution_id = ...

flow.execution(execution_id).resume()

Flow.Execution.resume functions very similarly to Flow.__call__. It will also start at the beginning of the flow execution, but preemptively skips any layer that has already finished.