Welcome to Lightflow!

Lightflow is a Python 3.5+ library and command-line tool for executing workflows, composed of individual tasks, in a distributed fashion. It is based on Celery and provides task dependencies, data exchange between tasks and an intuitive description of workflows.

User’s Guide

Installation

One of the key goals when developing Lightflow was to keep the infrastructure dependencies as small as possible. Lightflow does not require special file systems, job scheduling systems or special hardware. It runs on most Linux distributions and is known to work on macOS as well as on the Ubuntu subsystem of Windows 10. Apart from Python 3.5+, the only dependencies of Lightflow are running redis and MongoDB databases.

Python

Lightflow requires Python 3.5 or higher. It has been developed and tested with both a native Python installation as well as Miniconda/Anaconda.

Lightflow’s main Python dependencies are:

  • Celery - for queuing and managing jobs and running workers
  • NetworkX - for building and interrogating the directed acyclic graphs of a workflow
  • cloudpickle - for exchanging data between tasks running on distributed hosts
  • Click - for the command line client
  • ruamel.yaml - for reading the configuration file

These dependencies are installed automatically when Lightflow is installed.

redis

Redis is an in-memory key-value database and is required by Lightflow as a communication broker between tasks. It is also used as the default broker for the Celery queuing system, but could be replaced with any other supported Celery broker.

You can either download redis from the official redis website or install it via the package manager of your distribution. By default, the redis server runs on localhost and port 6379. The Quickstart as well as the Tutorial assume you are running redis using these defaults.

MongoDB

MongoDB is a popular document-oriented database and is used by Lightflow for storing data that should persist during a workflow run.

You can either download MongoDB from the official MongoDB website or install it via the package manager of your distribution.

By default, MongoDB runs on localhost and port 27017. The Quickstart as well as the Tutorial assume you are running MongoDB using these defaults.

Lightflow

With redis and MongoDB running, installing Lightflow is a breeze. It is available from PyPI and can be installed with:

pip install lightflow

This will install the Lightflow libraries, command line client and example workflows.

Quickstart

Can’t wait to start using Lightflow, or have no time to follow the tutorial? No problem: just spend a few minutes with this quickstart guide and you will be on your way to using Lightflow.

This quickstart guide assumes that you have a redis database running on localhost and port 6379, a MongoDB database running on localhost and port 27017, and Lightflow installed on your system. If you haven’t installed the databases and Lightflow yet, just follow the Installation guide.

Configuration and examples

Create an empty directory in your preferred location. We will use this directory to store the configuration file and the example workflows. Lightflow has no restrictions on where this directory is located or what it is called.

The first step is to create the global configuration file for Lightflow. This file contains, among other settings, the connection information for redis and MongoDB. The quickest and easiest way to generate a default configuration file is to use the Lightflow command line interface. Make sure you are located in the directory you created earlier and enter:

$ lightflow config default .

This will create a configuration file called lightflow.cfg containing a default configuration. If you are running redis or MongoDB on a host other than localhost or on a non-default port, edit the appropriate settings in the configuration file. You can find more information about the configuration file in the section Configuration.

Lightflow ships with a number of examples that demonstrate various features of the system. We will copy these examples into a subfolder called examples inside your current directory. This will allow you to modify the examples as you see fit or use them as a starting point for your own workflows. The command line interface offers a quick and easy way to copy the examples:

$ lightflow config examples .

Now you will find a subfolder examples in your directory containing all example workflows. If you like, you can list all available example workflows together with a short description. Make sure you are located in the folder containing the configuration file, then enter:

$ lightflow workflow list

Start the workers

Lightflow uses a worker-based scheme. This means a workflow adds jobs onto a central queue from which a number of workers consume and execute them. In order for Lightflow to run a workflow, it needs at least one running worker (obviously). You can start a worker with:

$ lightflow worker start

This will start a worker, which then waits for the first job to be added to the queue. You can start as many workers as you like, but for the quickstart guide one worker is enough.

Run a workflow

With at least one worker running, we are ready to run our first workflow. You can pick any example workflow you like and run it. In the following we will run the most basic of all workflows, the simple workflow. You might need a second terminal in order to run the workflow as the first one is occupied running our worker. In your second terminal enter:

$ lightflow workflow start simple

This will send the workflow simple to the queue. Our worker will pick up the workflow and run it. The default logging level is very verbose so you will see the worker print out a lot of information as it executes the workflow.

Where to go from here

Congratulations, you have finished the quickstart guide. A good place to continue is to have a look at the documented example workflows. They are a great starting point for exploring the features of Lightflow. Alternatively, head over to the tutorial section for a more structured introduction to Lightflow.

Tutorial

Welcome to the Lightflow tutorial! This tutorial will guide you step by step through the development of a workflow. The emphasis is on showing you how to model and implement typical workflow elements with Lightflow and to demonstrate the features Lightflow has to offer. Therefore, the workflow we are going to create does nothing particularly sophisticated. So don’t expect too much. We will push numbers around and perform some basic math operations on them.

Let’s go! While we recommend that you follow the tutorial steps in order, you don’t have to. Each tutorial step introduces a specific concept or feature of Lightflow, so feel free to jump directly to the step that interests you most.

Step 0: Setup

In this step we will set up the environment for the tutorial and create an empty workflow. We assume that you followed the Installation guide and have a redis database running on localhost and port 6379, a MongoDB database running on localhost and port 27017 as well as Lightflow installed on your system.

To test whether you have installed Lightflow correctly, enter the following into a terminal:

$ lightflow

This calls the command line interface of Lightflow and will print the available commands and options.

Configuration file

Start by creating an empty directory with your preferred name (e.g. lightflow_tutorial) in a location of your choice (e.g. your home directory). This will be our working directory for the tutorial, containing the Lightflow configuration file and our workflow file. Lightflow has no restrictions on where this directory is located or what it is called.

Next, we will create a configuration file. Lightflow uses the configuration file for storing settings such as the connection information for redis and MongoDB, and the location of your workflow files. To make things easier, we equipped the command line interface with a command to generate a default configuration file for you. Make sure you are located in the directory you created earlier, then enter:

$ lightflow config default .

This creates a configuration file called lightflow.cfg, containing the default settings, in the current directory.

Let’s have a look into the configuration file. Open lightflow.cfg with your editor of choice. The configuration file uses the YAML format and is broken up into several sections.

Non-default redis or MongoDB

If you are running redis on a different host or port from the default mentioned above, change the host and port settings in the celery as well as signal sections in the configuration file. If your MongoDB configuration deviates from the default, edit the host and port fields in the store section.

We will focus on the first field labelled workflows. This field contains a list of paths where Lightflow should look for workflow files. The paths can either be relative to the configuration file or absolute paths. By default, Lightflow expects to find workflow files in a sub-directory called examples, located in the same directory as your configuration file. However, we would like our tutorial workflow file to live in its own directory called tutorial. Therefore, edit the configuration file by changing examples to tutorial:

workflows:
  - ./tutorial

Save the file and exit your editor.

Tutorial folder

Before we can move on, we have to create the tutorial folder for our workflow file. In the same directory as your configuration file, create a sub-directory called tutorial:

$ mkdir tutorial

Now you are ready to write your first workflow! Head over to Step 1: A simple workflow in our tutorial and learn how to write your first workflow.

Step 1: A simple workflow

In this section of the tutorial we will write our first workflow. It will consist of two tasks that are executed in order. Each task will print a message so you can track the execution of the tasks. At the end of this section you will have learned how to create tasks, arrange their execution order and run a workflow.

Workflow file

In Lightflow, workflows are defined using Python. This means you don’t have to learn another language and you can use your favorite Python libraries and modules. Typically you would have a single Python file describing the entire workflow, but for complex workflows you can, of course, split the workflow definition into multiple files. For this tutorial, we will only have a single workflow file.

Change into the tutorial directory and create an empty file called tutorial01.py. This file will contain the workflow for this step of the tutorial. Your directory structure should look like this:

/lightflow_tutorial
    lightflow.cfg
    /tutorial
        tutorial01.py

Create two tasks

Let’s get started with our workflow. First, we will create the two tasks for our small workflow. Open the workflow file you just created with your editor of choice. At the top of the file import the PythonTask class:

from lightflow.tasks import PythonTask

Lightflow ships with two task classes: the PythonTask and the BashTask. The PythonTask allows you to execute Python code in your task, while the BashTask provides an easy-to-use task for executing bash commands. In this tutorial we will use the PythonTask for all our tasks, as it is the most flexible type of task. You can pretty much do whatever you like during the execution of a PythonTask.
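
As an aside (not needed for this tutorial), a BashTask is created in much the same way. A minimal, hedged sketch with an illustrative command:

from lightflow.tasks import BashTask

# runs a shell command on a worker when the task is executed
list_files = BashTask(name='list_files', command='ls -l')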

Next, create the two tasks for our workflow. We are going to be boring here and call the first task first_task and the second task second_task:

first_task = PythonTask(name='first_task',
                        callback=print_first)

second_task = PythonTask(name='second_task',
                         callback=print_second)

The first argument name defines a name for the task so you can track the task more easily. We are using the name of the object here, but you can name the task whatever you think is appropriate. The second argument callback is a callable that is run when the task is executed. This is the ‘body’ of the task and you are free to execute your own Python code here. In the spirit of boring names for our tutorial, we have named the callables print_first and print_second. Of course, we haven’t defined the callables yet, so let’s do this next.

Implement the callables

We will use functions as the callables for our PythonTask objects. The functions take a specific form and look like this:

def print_first(data, store, signal, context):
    print('This is the first task')

Add this code above your task instantiations. A callable for a PythonTask has four arguments. We will cover all four arguments in more detail in the following tutorial steps. So for now, you can safely ignore them. All we do in the body of the function is to print a simple string.

The callable for the second task is pretty much the same; we only change the name and the string that is printed:

def print_second(data, store, signal, context):
    print('This is the second task')

At this point we have the task objects that should be run and the code that should be executed for each task. We haven’t defined the order in which we want the tasks to be run yet. This will happen in the next step.

Arrange the tasks in a sequence

In Lightflow, tasks are arranged in a Directed Acyclic Graph, or ‘DAG’ for short. While this might sound complicated, all it means is that you define the dependencies between the tasks, thereby building a network (also called a graph) of tasks. The word ‘directed’ captures the fact that the dependencies impose a direction on the graph. In our case, we want the first_task to be run before the second_task. The word ‘acyclic’ means that Lightflow does not allow loops in the task graph. For example, you are not allowed to set up a graph in which you start with first_task, then run second_task, followed by running first_task again.

In Lightflow the Dag class takes care of running the tasks in the correct order. Import the Dag class at the top of your workflow file with:

from lightflow.models import Dag

Next, below your task object instantiations at the bottom of your workflow, create an object of the Dag class:

d = Dag('main_dag')

You have to provide a single argument, which is the name you would like to give to the Dag.

The Dag class provides the function define() for setting up the task graph. This is where the magic happens. Lightflow uses a Python dictionary in order to specify the arrangement of the tasks. The key:value relationship of a dictionary is mapped to a parent:child relationship for tasks, thereby defining the dependencies between tasks. For our simple, two task workflow the graph definition looks like this:

d.define({
    first_task: second_task
})
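
If define() also accepts a list of tasks as the value, which is how one task would feed several successors, a fan-out could be sketched like this (third_task is a hypothetical additional task, not part of this tutorial):

d.define({
    first_task: [second_task, third_task]
})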

That’s it! You have defined your first workflow and are now ready to run it.

The complete workflow

Here is the complete workflow for this tutorial including a few comments:

from lightflow.models import Dag
from lightflow.tasks import PythonTask


# the callback functions for the task
def print_first(data, store, signal, context):
    print('This is the first task')

def print_second(data, store, signal, context):
    print('This is the second task')


# create the two task objects
first_task = PythonTask(name='first_task',
                        callback=print_first)

second_task = PythonTask(name='second_task',
                         callback=print_second)

# create the main DAG
d = Dag('main_dag')

# set up the graph of the DAG, in which the first_task has
# to be executed first, followed by the second_task.
d.define({
    first_task: second_task
})

Document the workflow

This step is optional, but highly recommended as it will help you remember what the workflow does. We will add a title and a short description to the workflow. At the top of your workflow file, add the following docstring:

""" Tutorial 1: a sequence of two tasks

This workflow uses two tasks in order to demonstrate
the basics of a workflow definition in Lightflow.
"""

Lightflow uses the first line of the docstring when listing all available workflows. Give it a go by changing to the directory where the configuration file is located and entering:

$ lightflow workflow list
tutorial01      Tutorial 1: a sequence of two tasks

Lightflow will list your workflow together with the short description you gave it.

Start a worker

Lightflow uses a worker-based scheme. This means a workflow adds jobs onto a central queue from which a number of workers consume and execute them. In order for Lightflow to run our workflow, it needs at least one running worker. Start a worker with:

$ lightflow worker start

This will start a worker, which then waits for the first job to be added to the queue. You can start as many workers as you like, but for now one worker is enough.

Run the workflow

With at least one worker running, we are ready to run our first workflow. You might need a second terminal in order to run the workflow as the first one is occupied running our worker. In your second terminal enter:

$ lightflow workflow start tutorial01

This will send our workflow to the queue. The worker will pick up the workflow and run it. The default logging level is very verbose so you will see the worker print out a lot of information as it executes the workflow.

You will see the first_task being executed first, printing the string “This is the first task”, followed by the second_task printing the string “This is the second task”.

Congratulations! You completed the first tutorial successfully.

Tasks

Python task

The PythonTask is the most basic and most flexible task in Lightflow. It allows you to execute almost arbitrary Python code in your task. The only requirement is that the Python code can be serialised and deserialised safely.

class lightflow.tasks.PythonTask(name, callback=None, *, queue='task', callback_init=None, callback_finally=None, force_run=False, propagate_skip=True)[source]

The Python task executes a user-defined python method.

Parameters:
  • name (str) – The name of the task.
  • callback (callable) –

    A reference to the Python method that should be called by the task as soon as it is run. It has to have the following definition:

    (data, store, signal, context) -> None, Action
    

    with the parameters:

    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • queue (str) – Name of the queue the task should be scheduled to. Defaults to the general task queue.
  • callback_init (callable) –

    An optional callable that is called shortly before the task is run. The definition is:

    (data, store, signal, context) -> None
    

    with the parameters:

    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • callback_finally (callable) –

    An optional callable that is always called at the end of a task, regardless of whether it completed successfully, was stopped or was aborted. The definition is:

    (status, data, store, signal, context) -> None
    

    with the parameters:

    • status (TaskStatus): The current status of the task. It can be one of the following:
      • TaskStatus.Success – task was successful
      • TaskStatus.Stopped – task was stopped
      • TaskStatus.Aborted – task was aborted
      • TaskStatus.Error – task raised an exception
    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • force_run (bool) – Run the task even if it is flagged to be skipped.
  • propagate_skip (bool) – Propagate the skip flag to the next task.
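
To tie the parameters together, here is a small, hedged sketch of a PythonTask that uses the callback, callback_init and callback_finally parameters described above; the callable names and messages are purely illustrative:

from lightflow.tasks import PythonTask


def setup(data, store, signal, context):
    # called shortly before the task is run
    print('preparing the task')


def work(data, store, signal, context):
    # the main body of the task
    print('doing the actual work')


def teardown(status, data, store, signal, context):
    # always called at the end of the task, regardless of its outcome
    print('task ended with status {}'.format(status))


work_task = PythonTask(name='work_task',
                       callback=work,
                       callback_init=setup,
                       callback_finally=teardown)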

Bash task

The BashTask provides an easy-to-use task for executing bash commands. It allows you to capture and process the standard and error output of the bash command, either line by line in ‘real time’ or as a file object once the process has completed.

class lightflow.tasks.BashTask(name, command, cwd=None, env=None, user=None, group=None, stdin=None, refresh_time=0.1, capture_stdout=False, capture_stderr=False, callback_process=None, callback_end=None, callback_stdout=None, callback_stderr=None, *, queue='task', callback_init=None, callback_finally=None, force_run=False, propagate_skip=True)[source]

The Bash task executes a user-defined bash command or bash file.

All task parameters except the name, callbacks, queue, force_run and propagate_skip can either be their native type or a callable returning the native type.

Parameters:
  • name (str) – The name of the task.
  • command (function, str) – The command or bash file that should be executed.
  • cwd (function, str, None) – The working directory for the command.
  • env (function, dict, None) – A dictionary of environment variables.
  • user (function, int, None) – The user ID of the user with which the command should be executed.
  • group (function, int, None) – The group ID of the group with which the command should be executed.
  • stdin (function, str, None) – An input string that should be passed on to the process.
  • refresh_time (function, float) – The time in seconds the internal output handling waits before checking for new output from the process.
  • capture_stdout (function, bool) – Set to True to capture all standard output in a temporary file.
  • capture_stderr (function, bool) – Set to True to capture all standard errors in a temporary file.
  • callback_process (callable) –

    A callable that is called after the process started. The definition is:

    (pid, data, store, signal, context) -> None
    

    with the parameters:

    • pid (int): The process PID.
    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • callback_end (callable) –

    A callable that is called after the process completed. The definition is:

    (returncode, stdout_file, stderr_file,
     data, store, signal, context) -> None
    

    with the parameters:

    • returncode (int): The return code of the process.
    • stdout_file: A file object with the standard output if the flag capture_stdout was set to True, otherwise None.
    • stderr_file: A file object with the error output if the flag capture_stderr was set to True, otherwise None.
    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • callback_stdout (callable) –

    A callable that is called for every line of output the process sends to stdout. The definition is:

    (line, data, store, signal, context) -> None
    
    with the parameters:
    • line (str): Single line of the process output as a string.
    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • callback_stderr (callable) –

    A callable that is called for every line of output the process sends to stderr. The definition is:

    (line, data, store, signal, context) -> None
    
    with the parameters:
    • line (str): Single line of the process output as a string.
    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • queue (str) – Name of the queue the task should be scheduled to. Defaults to the general task queue.
  • callback_init (callable) –

    An optional callable that is called shortly before the task is run. The definition is:

    (data, store, signal, context) -> None
    

    with the parameters:

    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • callback_finally (callable) –

    An optional callable that is always called at the end of a task, regardless of whether it completed successfully, was stopped or was aborted. The definition is:

    (status, data, store, signal, context) -> None
    

    with the parameters:

    • status (TaskStatus): The current status of the task. It can be one of the following:
      • TaskStatus.Success – task was successful
      • TaskStatus.Stopped – task was stopped
      • TaskStatus.Aborted – task was aborted
      • TaskStatus.Error – task raised an exception
    • data (MultiTaskData): The data object that has been passed from the predecessor task.
    • store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
    • signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
    • context (TaskContext): The context in which the task runs.
  • force_run (bool) – Run the task even if it is flagged to be skipped.
  • propagate_skip (bool) – Propagate the skip flag to the next task.
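
A hedged sketch of a BashTask that captures and processes its output, using only the parameters documented above; the command and callable names are illustrative. As noted above, the command could also be given as a callable returning the string:

from lightflow.tasks import BashTask


def on_line(line, data, store, signal, context):
    # called for every line the process writes to stdout
    print('stdout: {}'.format(line.rstrip()))


def on_end(returncode, stdout_file, stderr_file, data, store, signal, context):
    # called once the process has completed; stdout_file is a file object
    # because capture_stdout is set to True below, stderr_file is None
    print('process finished with return code {}'.format(returncode))


list_task = BashTask(name='list_task',
                     command='ls -l',
                     capture_stdout=True,
                     callback_stdout=on_line,
                     callback_end=on_end)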

API Reference

Workers

The workers module provides the API functions for starting, stopping and managing workers.

Methods

lightflow.workers.list_workers(config, *, filter_by_queues=None)[source]

Return a list of all available workers.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings are retrieved.
  • filter_by_queues (list) – Restrict the returned workers to workers that listen to at least one of the queue names in this list.
Returns:

A list of WorkerStats objects.

Return type:

list

lightflow.workers.start_worker(queues, config, *, name=None, celery_args=None, check_datastore=True)[source]

Start a worker process.

Parameters:
  • queues (list) – List of queue names this worker accepts jobs from.
  • config (Config) – Reference to the configuration object from which the settings for the worker are retrieved.
  • name (string) – Unique name for the worker. The hostname template variables from Celery can be used. If not given, a unique name is created.
  • celery_args (list) – List of additional Celery worker command line arguments. Please note that this depends on the version of Celery used and might change. Use with caution.
  • check_datastore (bool) – Set to True to check whether the data store is available prior to starting the worker.
lightflow.workers.stop_worker(config, *, worker_ids=None)[source]

Stop a worker process.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings for the worker are retrieved.
  • worker_ids (list) – An optional list of ids for the worker that should be stopped.
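
A minimal, hedged sketch of how these functions might be used, assuming the WorkerStats fields listed below are available as attributes; the configuration file name, worker name and queue name are illustrative:

from lightflow import Config
from lightflow.workers import list_workers, start_worker

config = Config.from_file('lightflow.cfg')  # file name is an assumption

# inspect the currently running workers and the queues they listen on
for worker in list_workers(config):
    print(worker.name, [queue.name for queue in worker.queues])

# start an additional worker for the general 'task' queue;
# this call runs the worker, so it is typically issued in its own process
start_worker(['task'], config, name='extra-worker')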

Return Classes

class lightflow.queue.models.WorkerStats(name, broker, pid, process_pids, concurrency, job_count, queues)[source]

Represents the worker information returned from celery.

Parameters:
  • name (str) – The name of the worker.
  • broker (BrokerStats) – A reference to a BrokerStats Object the worker is using.
  • pid (int) – The PID of the worker.
  • process_pids (int) – The PIDs of the concurrent task processes.
  • concurrency (int) – The number of concurrent processes.
  • job_count (int) – The number of jobs this worker has processed so far.
  • queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
classmethod from_celery(name, worker_dict, queues)[source]

Create a WorkerStats object from the dictionary returned by celery.

Parameters:
  • name (str) – The name of the worker.
  • worker_dict (dict) – The dictionary as returned by celery.
  • queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
Returns:

A fully initialized WorkerStats object.

Return type:

WorkerStats

to_dict()[source]

Return a dictionary of the worker stats.

Returns: Dictionary of the stats.
Return type: dict
class lightflow.queue.models.QueueStats(name, routing_key)[source]

Represents the queue information returned from celery.

Parameters:
  • name (str) – The name of the queue.
  • routing_key (str) – The routing key of the queue.
classmethod from_celery(queue_dict)[source]

Create a QueueStats object from the dictionary returned by celery.

Parameters: queue_dict (dict) – The dictionary as returned by celery.
Returns: A fully initialized QueueStats object.
Return type: QueueStats
to_dict()[source]

Return a dictionary of the queue stats.

Returns: Dictionary of the stats.
Return type: dict

Workflows

The workflows module provides the API functions for starting, stopping and monitoring workflows.

Methods

lightflow.workflows.events(config)[source]

Return a generator that yields workflow events.

For every workflow event that is sent from celery this generator yields an event object.

Parameters: config (Config) – Reference to the configuration object from which the settings are retrieved.
Returns: A generator that returns workflow events.
Return type: generator
lightflow.workflows.list_jobs(config, *, status=0, filter_by_type=None, filter_by_worker=None)[source]

Return a list of Celery jobs.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings are retrieved.
  • status (JobStatus) – The status of the jobs that should be returned.
  • filter_by_type (list) – Restrict the returned jobs to the types in this list.
  • filter_by_worker (list) – Only return jobs that were registered, reserved or are running on the workers given in this list of worker names. Using this option will increase the performance.
Returns:

A list of JobStats.

Return type:

list

lightflow.workflows.list_workflows(config)[source]

List all available workflows.

Returns a list of all workflows that are available from the paths specified in the config. A workflow is defined as a Python file with at least one DAG.

Parameters: config (Config) – Reference to the configuration object from which the settings are retrieved.
Returns: A list of workflows.
Return type: list
lightflow.workflows.start_workflow(name, config, *, queue='workflow', clear_data_store=True, store_args=None)[source]

Start a single workflow by sending it to the workflow queue.

Parameters:
  • name (str) – The name of the workflow that should be started. Refers to the name of the workflow file without the .py extension.
  • config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
  • queue (str) – Name of the queue the workflow should be scheduled to.
  • clear_data_store (bool) – Remove any documents created in the data store during the workflow run once the run has completed.
  • store_args (dict) – Dictionary of additional arguments that are ingested into the data store prior to the execution of the workflow.
Returns:

The ID of the workflow job.

Return type:

str

Raises:
  • WorkflowArgumentError – If the workflow requires arguments to be set in store_args that were not supplied to the workflow.
  • WorkflowImportError – If the import of the workflow fails.
lightflow.workflows.stop_workflow(config, *, names=None)[source]

Stop one or more workflows.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
  • names (list) – List of workflow names, workflow ids or workflow job ids for the workflows that should be stopped. If all workflows should be stopped, set it to None.
Returns:

A tuple of the workflow jobs that were successfully stopped and the ones that could not be stopped.

Return type:

tuple
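
A hedged sketch combining the functions above; the configuration file name and the store_args contents are illustrative assumptions:

from lightflow import Config
from lightflow.workflows import list_workflows, start_workflow, stop_workflow

config = Config.from_file('lightflow.cfg')  # file name is an assumption

# list the workflows found in the configured workflow paths
for workflow in list_workflows(config):
    print(workflow)

# queue the 'simple' example workflow and keep its job id
job_id = start_workflow('simple', config,
                        store_args={'iterations': 3})  # hypothetical argument

# stop the workflow again, addressing it by its job id
stop_workflow(config, names=[job_id])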

Return Classes

class lightflow.queue.models.JobStats(name, job_id, job_type, queue, workflow_id, start_time, arguments, acknowledged, func_name, hostname, worker_name, worker_pid, routing_key)[source]

Represents the job (=celery task) information returned from celery.

Parameters:
  • name (str) – The name of the job.
  • job_id (str) – The internal ID of the job.
  • job_type (str) – The type of the job (workflow, dag, task).
  • queue (str) – The name of the queue the job was scheduled to.
  • workflow_id (str) – The id of the workflow that started this job.
  • start_time (datetime) – The time the job was started in UTC.
  • arguments (dict) – The provided arguments to a workflow.
  • acknowledged (bool) – True if the job was acknowledged by the message system.
  • func_name (str) – The name of the function that represents this job.
  • hostname (str) – The name of the host this job runs on.
  • worker_name (str) – The name of the worker this job runs on.
  • worker_pid (int) – The PID of the process this job runs on.
  • routing_key (str) – The routing key for this job.
classmethod from_celery(worker_name, job_dict, celery_app)[source]

Create a JobStats object from the dictionary returned by celery.

Parameters:
  • worker_name (str) – The name of the worker this job runs on.
  • job_dict (dict) – The dictionary as returned by celery.
  • celery_app – Reference to a celery application object.
Returns:

A fully initialized JobStats object.

Return type:

JobStats

to_dict()[source]

Return a dictionary of the job stats.

Returns: Dictionary of the stats.
Return type: dict

Config

The configuration of Lightflow is passed to the API functions via an instance of the Config class. The configuration is described as a YAML structure and can be loaded from a file. The Config class contains a default configuration, which means that you only need to specify the settings in the config file that you would like to change.
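
A small, hedged sketch of loading a configuration and inspecting it; the file name is an assumption:

from lightflow import Config

# load the settings from a file; anything not specified there
# falls back to the built-in default configuration
config = Config.from_file('lightflow.cfg')

print(config.workflows)  # the configured workflow folders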

class lightflow.Config[source]

Hosts the global configuration.

The configuration is read from a structured YAML file or a dictionary. The location of the file can either be specified directly or given via the environment variable LIGHTFLOW_CONFIG_ENV; otherwise the file is looked for in the current execution directory or in the home directory of the user.

celery

Return the celery settings

cli

Return the cli settings

dag_polling_time

Return the waiting time between status checks of the running tasks (sec)

data_store

Return the data store settings

static default()[source]

Returns the default configuration.

extensions

Return the custom settings of extensions

classmethod from_file(filename, *, strict=True)[source]

Create a new Config object from a configuration file.

Parameters:
  • filename (str) – The location and name of the configuration file.
  • strict (bool) – If True, raise a ConfigLoadError when the configuration cannot be found.
Returns:

An instance of the Config class.

Raises:

ConfigLoadError – If the configuration cannot be found.

load_from_dict(conf_dict=None)[source]

Load the configuration from a dictionary.

Parameters: conf_dict (dict) – Dictionary with the configuration.
load_from_file(filename=None, *, strict=True)[source]

Load the configuration from a file.

The location of the configuration file can either be specified directly in the parameter filename or is searched for in the following order:

  1. In the environment variable given by LIGHTFLOW_CONFIG_ENV
  2. In the current execution directory
  3. In the user’s home directory
Parameters:
  • filename (str) – The location and name of the configuration file.
  • strict (bool) – If True, raise a ConfigLoadError when the configuration cannot be found.
Raises:

ConfigLoadError – If the configuration cannot be found.

logging

Return the logging settings

set_to_default()[source]

Overwrite the configuration with the default configuration.

signal

Return the signal system settings

to_dict()[source]

Returns a copy of the internal configuration as a dictionary.

workflow_polling_time

Return the waiting time between status checks of the running dags (sec)

workflows

Return the workflow folders

Task Data

class lightflow.models.MultiTaskData(*, dataset=None, aliases=None)[source]

Manages multiple TaskData datasets and their aliases.

This class implements the data object that is being passed between tasks. It consists of one or more TaskData datasets in order to accommodate multiple inputs to a single task. Each dataset can be accessed by its index or by one or more aliases. There is a default dataset, which is used whenever the user does not specify the exact dataset to work with.

Parameters:
  • dataset (TaskData) – An initial TaskData dataset.
  • aliases (list) – A list of aliases for the initial dataset.
add_alias(alias, index)[source]

Add an alias pointing to the specified index.

Parameters:
  • alias (str) – The alias that should point to the given index.
  • index (int) – The index of the dataset for which an alias should be added.
Raises:

DataInvalidIndex – If the index does not represent a valid dataset.

add_dataset(task_name, dataset=None, *, aliases=None)[source]

Add a new dataset to the MultiTaskData.

Parameters:
  • task_name (str) – The name of the task from which the dataset was received.
  • dataset (TaskData) – The dataset that should be added.
  • aliases (list) – A list of aliases that should be registered with the dataset.
add_task_history(task_name)[source]

Add a task name to the list of tasks that have contributed to all datasets.

Parameters: task_name (str) – The name of the task that contributed.
default_dataset

Return the default dataset.

Returns: A reference to the default dataset.
Return type: TaskData
default_index

Return the index of the default dataset.

flatten(in_place=True)[source]

Merge all datasets into a single dataset.

The default dataset is the last dataset to be merged, as it is considered to be the primary source of information and should overwrite all existing fields with the same key.

Parameters: in_place (bool) – Set to True to replace the existing datasets with the merged one. If set to False, a new MultiTaskData object containing the merged dataset is returned.
Returns: A new MultiTaskData object containing the merged dataset, if the in_place flag is set to False.
Return type: MultiTaskData
get_by_alias(alias)[source]

Return a dataset by its alias.

Parameters: alias (str) – The alias of the dataset that should be returned.
Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
get_by_index(index)[source]

Return a dataset by its index.

Parameters: index (int) – The index of the dataset that should be returned.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
set_default_by_alias(alias)[source]

Set the default dataset by its alias.

After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.

Parameters: alias (str) – The alias of the dataset that should be made the default.
Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
set_default_by_index(index)[source]

Set the default dataset by its index.

After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.

Parameters: index (int) – The index of the dataset that should be made the default.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
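
Inside a task callable, the data argument is a MultiTaskData object. A hedged sketch of passing a value from one task to the next, assuming that dictionary-style access is forwarded to the default dataset:

def produce(data, store, signal, context):
    # write a value into the default dataset
    data['number'] = 5


def consume(data, store, signal, context):
    # read the value written by the predecessor task and update it
    data['number'] = data['number'] * 2
    print('result: {}'.format(data['number']))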

Persistent Data Store

class lightflow.models.DataStoreDocument(collection, grid_fs, workflow_id)[source]

A single data store document containing the data for a workflow run.

The document provides methods in order to retrieve and set data in the persistent data store. It represents the data for a single workflow run.

Parameters:
  • collection – A MongoDB collection object pointing to the data store collection.
  • grid_fs – A GridFS object used for splitting large, binary data into smaller chunks in order to avoid the 16MB document limit of MongoDB.
  • workflow_id – The id of the workflow run this document is associated with.
extend(key, values, *, section='data')[source]

Extends a list in the data store with the elements of values.

Parameters:
  • key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
  • values (list) – A list of the values that should be used to extend the list in the document.
  • section (DataStoreDocumentSection) – The section of the document in which the data should be stored.
Returns:

True if the list in the database could be extended, otherwise False.

Return type:

bool

get(key, default=None, *, section='data')[source]

Return the field specified by its key from the specified section.

This method accesses the specified section of the workflow document and returns the value for the given key.

Parameters:
  • key (str) – The key pointing to the value that should be retrieved. It supports MongoDB’s dot notation for nested fields.
  • default – The default value that is returned if the key does not exist.
  • section (DataStoreDocumentSection) – The section from which the data should be retrieved.
Returns:

The value from the field that the specified key is pointing to. If the key does not exist, the default value is returned. If no default value is provided and the key does not exist, None is returned.

Return type:

object

push(key, value, *, section='data')[source]

Appends a value to a list in the specified section of the document.

Parameters:
  • key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
  • value – The value that should be appended to a list in the data store.
  • section (DataStoreDocumentSection) – The section of the document in which the data should be stored.
Returns:

True if the value could be appended, otherwise False.

Return type:

bool

set(key, value, *, section='data')[source]

Store a value under the specified key in the given section of the document.

This method stores a value into the specified section of the workflow data store document. Any existing value is overridden. Before storing a value, any linked GridFS document under the specified key is deleted.

Parameters:
  • key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
  • value – The value that should be stored/updated.
  • section (DataStoreDocumentSection) – The section of the document in which the data should be stored.
Returns:

True if the value could be set/updated, otherwise False.

Return type:

bool
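
Within a task callable, the store argument is a DataStoreDocument for the current workflow run. A hedged sketch of storing and retrieving values, with keys chosen purely for illustration:

def record(data, store, signal, context):
    # store a single value and append a reading to a list
    store.set('sample.temperature', 21.5)
    store.push('sample.readings', 21.5)


def report(data, store, signal, context):
    # retrieve the values in a later task of the same workflow run
    temperature = store.get('sample.temperature', default=0.0)
    readings = store.get('sample.readings', default=[])
    print(temperature, readings)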