Source code for lightflow.tasks.bash_task

import os
import sys
from time import sleep
from threading import Thread
from functools import partial
from subprocess import Popen, PIPE
from tempfile import TemporaryFile

from lightflow.queue import DefaultJobQueueName
from lightflow.logger import get_logger
from lightflow.models import BaseTask, TaskParameters, Action, StopTask, AbortWorkflow


logger = get_logger(__name__)


class BashTaskOutputReader(Thread):
    """ Helper class to read the output of the process. """
    def __init__(self, process, stdout_file, stderr_file,
                 callback_stdout, callback_stderr, refresh_time,
                 data, store, signal, context):
        """ Initializes the reader for the process output.

        Args:
            process: Reference to a Popen object representing the running process.
            stdout_file: The file object for the standard output of the process.
            stderr_file: The file object for the standard error of the process.
            callback_stdout: The callback that should be called for every line of the
                standard output.
            callback_stderr: The callback that should be called for every line of the
                standard error.
            refresh_time (float): The time in seconds before checking for new output
                from the process.
            data (:class:`.MultiTaskData`): The data object that has been passed from the
                predecessor task.
            store (:class:`.DataStoreDocument`): The persistent data store object that allows the
                task to store data for access across the current workflow run.
            signal (TaskSignal): The signal object for tasks. It wraps the construction
                and sending of signals into easy to use methods.
            context (TaskContext): The context in which the tasks runs.
        """
        super().__init__()

        self._process = process
        self._stdout_file = stdout_file
        self._stderr_file = stderr_file
        self._callback_stdout = callback_stdout
        self._callback_stderr = callback_stderr
        self._refresh_time = refresh_time
        self._data = data
        self._store = store
        self._signal = signal
        self._context = context
        self._exc_obj = None

    @property
    def data(self):
        return self._data

    @property
    def exc_obj(self):
        return self._exc_obj

    def run(self):
        """ Drain the process output streams. """
        read_stdout = partial(self._read_output, stream=self._process.stdout,
                              callback=self._callback_stdout,
                              output_file=self._stdout_file)

        read_stderr = partial(self._read_output, stream=self._process.stderr,
                              callback=self._callback_stderr,
                              output_file=self._stderr_file)

        # capture the process output as long as the process is active
        try:
            while self._process.poll() is None:
                result_stdout = read_stdout()
                result_stderr = read_stderr()

                if not result_stdout and not result_stderr:
                    sleep(self._refresh_time)

            # read remaining lines
            while read_stdout():
                pass

            while read_stderr():
                pass

        except (StopTask, AbortWorkflow) as exc:
            self._exc_obj = exc

    def _read_output(self, stream, callback, output_file):
        """ Read the output of the process, executed the callback and save the output.

        Args:
            stream: A file object pointing to the output stream that should be read.
            callback(callable, None): A callback function that is called for each new
                line of output.
            output_file: A file object to which the full output is written.

        Returns:
            bool: True if a line was read from the output, otherwise False.
        """
        if (callback is None and output_file is None) or stream.closed:
            return False

        line = stream.readline()
        if line:
            if callback is not None:
                callback(line.decode(),
                         self._data, self._store, self._signal, self._context)

            if output_file is not None:
                output_file.write(line)

            return True
        else:
            return False


[docs]class BashTask(BaseTask): """ The Bash task executes a user-defined bash command or bash file. All task parameters except the name, callbacks, queue, force_run and propagate_skip can either be their native type or a callable returning the native type. Args: name (str): The name of the task. command (function, str): The command or bash file that should be executed. cwd (function, str, None): The working directory for the command. env (function, dict, None): A dictionary of environment variables. user (function, int, None): The user ID of the user with which the command should be executed. group (function, int, None): The group ID of the group with which the command should be executed. stdin (function, str, None): An input string that should be passed on to the process. refresh_time (function, float): The time in seconds the internal output handling waits before checking for new output from the process. capture_stdout (function, bool): Set to ``True`` to capture all standard output in a temporary file. capture_stderr (function, bool): Set to ``True`` to capture all standard errors in a temporary file. callback_process (callable): A callable that is called after the process started. The definition is:: (pid, data, store, signal, context) -> None with the parameters: - **pid** (*int*): The process PID. - **data** (:class:`.MultiTaskData`): The data object that has been passed\ from the predecessor task. - **store** (:class:`.DataStoreDocument`): The persistent data store object\ that allows the task to store data for access across the current\ workflow run. - **signal** (*TaskSignal*): The signal object for tasks. It wraps\ the construction and sending of signals into easy to use methods. - **context** (*TaskContext*): The context in which the tasks runs. callback_end (callable): A callable that is called after the process completed. The definition is:: (returncode, stdout_file, stderr_file, data, store, signal, context) -> None with the parameters: - **returncode** (*int*): The return code of the process. - **stdout_file**: A file object with the standard output\ if the flag ``capture_stdout`` was set to ``True``,\ otherwise ``None``. - **stderr_file**: A file object with the error output\ if the flag ``capture_stderr`` was set to ``True`` otherwise ``None.`` - **data** (:class:`.MultiTaskData`): The data object that has been passed\ from the predecessor task. - **store** (:class:`.DataStoreDocument`): The persistent data store object\ that allows the task to store data for access across the current\ workflow run. - **signal** (*TaskSignal*): The signal object for tasks. It wraps\ the construction and sending of signals into easy to use methods. - **context** (*TaskContext*): The context in which the tasks runs. callback_stdout (callable): A callable that is called for every line of output the process sends to stdout. The definition is:: (line, data, store, signal, context) -> None with the parameters: - **line** (*str*): Single line of the process output as a string. - **data** (:class:`.MultiTaskData`): The data object that has been passed\ from the predecessor task. - **store** (:class:`.DataStoreDocument`): The persistent data store object\ that allows the task to store data for access across the current\ workflow run. - **signal** (*TaskSignal*): The signal object for tasks. It wraps\ the construction and sending of signals into easy to use methods. - **context** (*TaskContext*): The context in which the tasks runs. callback_stderr (callable): A callable that is called for every line of output the process sends to stderr. The definition is:: (line, data, store, signal, context) -> None with the parameters: - **line** (*str*): Single line of the process output as a string. - **data** (:class:`.MultiTaskData`): The data object that has been passed\ from the predecessor task. - **store** (:class:`.DataStoreDocument`): The persistent data store object\ that allows the task to store data for access across the current\ workflow run. - **signal** (*TaskSignal*): The signal object for tasks. It wraps\ the construction and sending of signals into easy to use methods. - **context** (*TaskContext*): The context in which the tasks runs. queue (str): Name of the queue the task should be scheduled to. Defaults to the general task queue. callback_init (callable): An optional callable that is called shortly before the task is run. The definition is:: (data, store, signal, context) -> None with the parameters: - **data** (:class:`.MultiTaskData`): The data object that has been passed\ from the predecessor task. - **store** (:class:`.DataStoreDocument`): The persistent data store object\ that allows the task to store data for access across the current\ workflow run. - **signal** (*TaskSignal*): The signal object for tasks. It wraps\ the construction and sending of signals into easy to use methods. - **context** (*TaskContext*): The context in which the tasks runs. callback_finally (callable): An optional callable that is always called at the end of a task, regardless whether it completed successfully, was stopped or was aborted. The definition is:: (status, data, store, signal, context) -> None with the parameters: - **status** (*TaskStatus*): The current status of the task. It can\ be one of the following: - ``TaskStatus.Success`` -- task was successful - ``TaskStatus.Stopped`` -- task was stopped - ``TaskStatus.Aborted`` -- task was aborted - ``TaskStatus.Error`` -- task raised an exception - **data** (:class:`.MultiTaskData`): The data object that has been passed\ from the predecessor task. - **store** (:class:`.DataStoreDocument`): The persistent data store object\ that allows the task to store data for access across the current\ workflow run. - **signal** (*TaskSignal*): The signal object for tasks. It wraps\ the construction and sending of signals into easy to use methods. - **context** (*TaskContext*): The context in which the tasks runs. force_run (bool): Run the task even if it is flagged to be skipped. propagate_skip (bool): Propagate the skip flag to the next task. """ def __init__(self, name, command, cwd=None, env=None, user=None, group=None, stdin=None, refresh_time=0.1, capture_stdout=False, capture_stderr=False, callback_process=None, callback_end=None, callback_stdout=None, callback_stderr=None, *, queue=DefaultJobQueueName.Task, callback_init=None, callback_finally=None, force_run=False, propagate_skip=True): super().__init__(name, queue=queue, callback_init=callback_init, callback_finally=callback_finally, force_run=force_run, propagate_skip=propagate_skip) self.params = TaskParameters( command=command, cwd=cwd, env=env, user=user, group=group, stdin=stdin, refresh_time=refresh_time, capture_stdout=capture_stdout, capture_stderr=capture_stderr ) self._callback_process = callback_process self._callback_end = callback_end self._callback_stdout = callback_stdout self._callback_stderr = callback_stderr def run(self, data, store, signal, context, **kwargs): """ The main run method of the Python task. Args: data (:class:`.MultiTaskData`): The data object that has been passed from the predecessor task. store (:class:`.DataStoreDocument`): The persistent data store object that allows the task to store data for access across the current workflow run. signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods. context (TaskContext): The context in which the tasks runs. Returns: Action (Action): An Action object containing the data that should be passed on to the next task and optionally a list of successor tasks that should be executed. """ params = self.params.eval(data, store, exclude=['command']) capture_stdout = self._callback_stdout is not None or params.capture_stdout capture_stderr = self._callback_stderr is not None or params.capture_stderr stdout_file = TemporaryFile() if params.capture_stdout else None stderr_file = TemporaryFile() if params.capture_stderr else None stdout = PIPE if capture_stdout else None stderr = PIPE if capture_stderr else None # change the user or group under which the process should run if params.user is not None or params.group is not None: pre_exec = self._run_as(params.user, params.group) else: pre_exec = None # call the command proc = Popen(self.params.eval_single('command', data, store), cwd=params.cwd, shell=True, env=params.env, preexec_fn=pre_exec, stdout=stdout, stderr=stderr, stdin=PIPE if params.stdin is not None else None) # if input is available, send it to the process if params.stdin is not None: proc.stdin.write(params.stdin.encode(sys.getfilesystemencoding())) # send a notification that the process has been started try: if self._callback_process is not None: self._callback_process(proc.pid, data, store, signal, context) except (StopTask, AbortWorkflow): proc.terminate() raise # send the output handling to a thread if capture_stdout or capture_stderr: output_reader = BashTaskOutputReader(proc, stdout_file, stderr_file, self._callback_stdout, self._callback_stderr, params.refresh_time, data, store, signal, context) output_reader.start() else: output_reader = None # wait for the process to complete and watch for a stop signal while proc.poll() is None or\ (output_reader is not None and output_reader.is_alive()): sleep(params.refresh_time) if signal.is_stopped: proc.terminate() if output_reader is not None: output_reader.join() data = output_reader.data # if a stop or abort exception was raised, stop the bash process and re-raise if output_reader.exc_obj is not None: if proc.poll() is None: proc.terminate() raise output_reader.exc_obj # send a notification that the process has completed if self._callback_end is not None: if stdout_file is not None: stdout_file.seek(0) if stderr_file is not None: stderr_file.seek(0) self._callback_end(proc.returncode, stdout_file, stderr_file, data, store, signal, context) if stdout_file is not None: stdout_file.close() if stderr_file is not None: stderr_file.close() return Action(data) @staticmethod def _run_as(user, group): """ Function wrapper that sets the user and group for the process """ def wrapper(): if user is not None: os.setuid(user) if group is not None: os.setgid(group) return wrapper