Source code for lightflow.tasks.python_task

from lightflow.models import BaseTask, Action
from lightflow.queue import DefaultJobQueueName


class PythonTask(BaseTask):
    """ The Python task executes a user-defined python method.

    Args:
        name (str): The name of the task.
        callback (callable): A reference to the Python method that should be called
            by the task as soon as it is run. Defaults to ``None``, in which case the
            task simply passes its input data on to its successors. The callable has
            to have the following definition::

                (data, store, signal, context) -> None, Action

            with the parameters:

            - **data** (:class:`.MultiTaskData`): The data object that has been
              passed from the predecessor task.
            - **store** (:class:`.DataStoreDocument`): The persistent data store
              object that allows the task to store data for access across the
              current workflow run.
            - **signal** (*TaskSignal*): The signal object for tasks. It wraps
              the construction and sending of signals into easy to use methods.
            - **context** (*TaskContext*): The context in which the task runs.

        queue (str): Name of the queue the task should be scheduled to. Defaults to
            the general task queue.
        callback_init (callable): An optional callable that is called shortly
            before the task is run. The definition is::

                (data, store, signal, context) -> None

            with the parameters:

            - **data** (:class:`.MultiTaskData`): The data object that has been
              passed from the predecessor task.
            - **store** (:class:`.DataStoreDocument`): The persistent data store
              object that allows the task to store data for access across the
              current workflow run.
            - **signal** (*TaskSignal*): The signal object for tasks. It wraps
              the construction and sending of signals into easy to use methods.
            - **context** (*TaskContext*): The context in which the task runs.

        callback_finally (callable): An optional callable that is always called
            at the end of a task, regardless of whether it completed successfully,
            was stopped or was aborted. The definition is::

                (status, data, store, signal, context) -> None

            with the parameters:

            - **status** (*TaskStatus*): The current status of the task. It can
              be one of the following:

              - ``TaskStatus.Success`` -- task was successful
              - ``TaskStatus.Stopped`` -- task was stopped
              - ``TaskStatus.Aborted`` -- task was aborted
              - ``TaskStatus.Error`` -- task raised an exception

            - **data** (:class:`.MultiTaskData`): The data object that has been
              passed from the predecessor task.
            - **store** (:class:`.DataStoreDocument`): The persistent data store
              object that allows the task to store data for access across the
              current workflow run.
            - **signal** (*TaskSignal*): The signal object for tasks. It wraps
              the construction and sending of signals into easy to use methods.
            - **context** (*TaskContext*): The context in which the task runs.

        force_run (bool): Run the task even if it is flagged to be skipped.
        propagate_skip (bool): Propagate the skip flag to the next task.
    """
    def __init__(self, name, callback=None, *, queue=DefaultJobQueueName.Task,
                 callback_init=None, callback_finally=None,
                 force_run=False, propagate_skip=True):
        # Everything except the main callback is handled by the base task.
        super().__init__(name, queue=queue,
                         callback_init=callback_init,
                         callback_finally=callback_finally,
                         force_run=force_run,
                         propagate_skip=propagate_skip)
        self._callback = callback

    def run(self, data, store, signal, context, **kwargs):
        """ The main run method of the Python task.

        Calls the user-defined callback, if one was given, and forwards any
        additional keyword arguments to it unchanged.

        Args:
            data (:class:`.MultiTaskData`): The data object that has been passed
                from the predecessor task.
            store (:class:`.DataStoreDocument`): The persistent data store object
                that allows the task to store data for access across the current
                workflow run.
            signal (TaskSignal): The signal object for tasks. It wraps the
                construction and sending of signals into easy to use methods.
            context (TaskContext): The context in which the task runs.
            **kwargs: Additional keyword arguments passed on to the callback.

        Returns:
            Action: An Action object containing the data that should be passed on
                to the next task and optionally a list of successor tasks that
                should be executed. If no callback is set, or the callback
                returns ``None``, an Action wrapping the unmodified ``data``
                object is returned.
        """
        if self._callback is None:
            # A task without a callback behaves like one whose callback returns
            # None: the incoming data is handed on as-is. Returning the Action
            # explicitly keeps the return type in line with the documentation
            # instead of implicitly returning None.
            return Action(data)

        result = self._callback(data, store, signal, context, **kwargs)
        return Action(data) if result is None else result