Welcome to Lightflow!¶
Lightflow is a Python 3.5+ library and command-line tool for executing workflows, composed of individual tasks, in a distributed fashion. It is based on Celery and provides task dependencies, data exchange between tasks and an intuitive description of workflows.
User’s Guide¶
Installation¶
One of the key goals when developing Lightflow was to keep the infrastructure dependencies as small as possible. Lightflow does not require special file systems, job scheduler systems or special hardware. It runs on most Linux distributions and is known to work on macOS as well as on the Ubuntu subsystem of Windows 10. Apart from Python 3.5+, the only dependencies of Lightflow are running redis and MongoDB databases.
Python¶
Lightflow requires Python 3.5 or higher. It has been developed and tested with both a native Python installation as well as Miniconda/Anaconda.
Lightflow’s main Python dependencies are:
- Celery - for queuing and managing jobs and running workers
- NetworkX - for building and interrogating the directed acyclic graphs of a workflow
- cloudpickle - for exchanging data between tasks running on distributed hosts
- Click - for the command line client
- ruamel.yaml - for reading the configuration file
These dependencies are installed automatically during the installation of Lightflow.
redis¶
Redis is an in-memory key-value database and is required by Lightflow as a communication broker between tasks. It is also used as the default broker for the Celery queuing system, but could be replaced with any other supported Celery broker.
You can either download redis from the official redis website or install it via the package manager of your distribution. By default, the redis server runs on localhost and port 6379. The Quickstart as well as the Tutorial assume you are running redis using these defaults.
MongoDB¶
MongoDB is a popular document-oriented database and is used by Lightflow for storing data that should persist during a workflow run.
You can either download MongoDB from the official MongoDB website or install it via the package manager of your distribution. By default, MongoDB runs on localhost and port 27017. The Quickstart as well as the Tutorial assume you are running MongoDB using these defaults.
Lightflow¶
With redis and MongoDB running, installing Lightflow is a breeze. It is available from PyPI and can be installed with:
pip install lightflow
This will install the Lightflow libraries, command line client and example workflows.
Quickstart¶
You can’t wait to start using Lightflow or have no time to follow the tutorial? No problem, just spend a few minutes with this quickstart guide and you are on your way to using Lightflow.
This quickstart guide assumes that you have a redis database running on localhost and port 6379, a MongoDB database running on localhost and port 27017, as well as Lightflow installed on your system. If you haven’t installed the database systems and Lightflow yet, no problem, just follow the Installation guide.
Configuration and examples¶
Create an empty directory in your preferred location. We will use this directory in the following to store the configuration file and the example workflows. Lightflow has no restrictions on where this directory should be located and what its name should be.
The first step is to create the global configuration file for Lightflow. This file contains, among other settings, the connection information for redis and MongoDB. The quickest and easiest way to generate a default configuration file is to use the Lightflow command line interface. Make sure you are located in the directory you created earlier and enter:
$ lightflow config default .
This will create a configuration file called lightflow.cfg containing a default configuration. If you are running redis and MongoDB on hosts other than localhost or on non-default ports, edit the appropriate settings in the configuration file. You can find more information about the configuration file in the section Configuration.
Lightflow ships with a number of examples that demonstrate various features of the system. We will copy these examples into a subfolder called examples inside your current directory. This will allow you to modify the examples as you see fit or use them as a starting point for your own workflows.
The command line interface offers a quick and easy way to copy the examples:
$ lightflow config examples .
Now you will find a subfolder examples in your directory containing all example workflows. If you like, you can list all available example workflows together with a short description. Make sure you are located in the folder containing the configuration file, then enter:
$ lightflow workflow list
Start the workers¶
Lightflow uses a worker-based scheme. This means a workflow adds jobs onto a central queue from which a number of workers consume jobs and execute them. In order for Lightflow to run a workflow, it needs at least one running worker (obviously). You can start a worker with:
$ lightflow worker start
This will start a worker, which then waits for the first job to be added to the queue. You can start as many workers as you like, but for the quickstart guide one worker is enough.
A recommended setup for multiple workers
What is special about Lightflow, in comparison with other workflow systems, is that it also uses workers for running the workflow itself. This means there is no central daemon and thus no single point of failure. Lightflow uses three queues for running a workflow: two queues, labelled workflow and dag, for managing the workflows, and one queue, labelled task, for executing the individual tasks of a workflow. A typical setup of workers would consist of one worker dealing with workflow related jobs, thus consuming jobs only from the workflow and dag queues, and one or more workers executing the actual tasks.
You can use the -q argument in the command line interface in order to restrict the queues a worker consumes jobs from. For the recommended setup discussed above you would start one worker with:
$ lightflow worker start -q workflow,dag
and at least one more worker with:
$ lightflow worker start -q task
Run a workflow¶
With at least one worker running, we are ready to run our first workflow. You can pick any example workflow you like and run it. In the following we will run the most basic of all workflows, the simple workflow. You might need a second terminal in order to run the workflow, as the first one is occupied running our worker. In your second terminal enter:
$ lightflow workflow start simple
This will send the workflow simple to the queue. Our worker will pick up the workflow and run it. The default logging level is very verbose, so you will see the worker print out a lot of information as it executes the workflow.
Where to go from here¶
Congratulations, you have finished the quickstart guide. A good place to continue is to have a look at the documented example workflows. They are a great starting point for exploring the features of Lightflow. Alternatively, head over to the tutorial section for a more structured introduction to Lightflow.
Tutorial¶
Welcome to the Lightflow tutorial! This tutorial will guide you step by step through the development of a workflow. The emphasis is on showing you how to model and implement typical workflow elements with Lightflow and to demonstrate the features Lightflow has to offer. Therefore, the workflow we are going to create does nothing particularly sophisticated. So don’t expect too much. We will push numbers around and perform some basic math operations on them.
Let’s go! While we recommend that you follow the tutorial steps in order, you don’t have to. Each tutorial step introduces a specific concept or feature of Lightflow, so feel free to jump directly to the step that interests you most.
Step 0: Setup¶
In this step we will set up the environment for the tutorial and create an empty workflow. We assume that you followed the Installation guide and have a redis database running on localhost and port 6379, a MongoDB database running on localhost and port 27017, as well as Lightflow installed on your system.
To test whether you have installed Lightflow correctly, enter the following into a terminal:
$ lightflow
This calls the command line interface of Lightflow and will print the available commands and options.
Configuration file¶
Start by creating an empty directory with your preferred name (e.g. lightflow_tutorial) in a location of your choice (e.g. your home directory). This will be our working directory for the tutorial, containing the Lightflow configuration file and our workflow file. Lightflow has no restrictions on where this directory should be located and what it is called.
Next, we will create a configuration file. Lightflow uses the configuration file for storing settings such as the connection information for redis and MongoDB, and the location of your workflow files. To make things easier, we equipped the command line interface with a command to generate a default configuration file for you. Make sure you are located in the directory you created earlier, then enter:
$ lightflow config default .
This creates a configuration file called lightflow.cfg, containing the default settings, in the current directory.
Let’s have a look into the configuration file. Open lightflow.cfg with your editor of choice. The configuration file uses the YAML format and is broken up into several sections.
Non-default redis or MongoDB
If you are running redis on a different host or port from the default mentioned above, change the host and port settings in the celery as well as the signal sections of the configuration file. If your MongoDB configuration deviates from the default, edit the host and port fields in the store section.
We will focus on the first field, labelled workflows. This field contains a list of paths where Lightflow should look for workflow files. The paths can either be relative to the configuration file or absolute paths. By default, Lightflow expects to find workflow files in a sub-directory called examples, located in the same directory as your configuration file. However, we would like our tutorial workflow file to live in its own directory called tutorial. Therefore, edit the configuration file by changing examples to tutorial:
workflows:
- ./tutorial
Save the file and exit your editor.
Tutorial folder¶
Before we can move on, we of course have to create the tutorial folder for our tutorial workflow file. In the same directory as your configuration file, create a sub-directory called tutorial:
$ mkdir tutorial
Now you are ready to write your first workflow! Head over to Step 1: A simple workflow in our tutorial and learn how to write your first workflow.
Step 1: A simple workflow¶
In this section of the tutorial we will write our first workflow. It will consist of two tasks that are executed in order. Each task will print a message so you can track the execution of the tasks. At the end of this section you will have learned how to create tasks, arrange their execution order and run a workflow.
Workflow file¶
In Lightflow, workflows are defined using Python. This means you don’t have to learn another language and you can use your favorite Python libraries and modules. Typically you would have a single Python file describing the entire workflow, but for complex workflows you can, of course, split the workflow definition into multiple files. For this tutorial, we will only have a single workflow file.
Change into the tutorial directory and create an empty file called tutorial01.py. This file will contain the workflow for this step of the tutorial.
Your directory structure should look like this:
/lightflow_tutorial
    lightflow.cfg
    /tutorial
        tutorial01.py
Create two tasks¶
Let’s get started with our workflow. First, we will create the two tasks for our small workflow. Open the workflow file you just created with your editor of choice. At the top of the file import the PythonTask class:
from lightflow.tasks import PythonTask
Lightflow is shipped with two task classes: the PythonTask and the BashTask. The PythonTask allows you to execute Python code in your task, while the BashTask provides an easy to use task for executing bash commands. In this tutorial we will use the PythonTask for all our tasks as it is the most flexible type of task. You can pretty much do whatever you like during the execution of a PythonTask.
Next, create the two tasks for our workflow. We are going to be boring here and call the first task first_task and the second task second_task:
first_task = PythonTask(name='first_task',
                        callback=print_first)

second_task = PythonTask(name='second_task',
                         callback=print_second)
The first argument, name, defines a name for the task so you can track the task more easily. We are using the name of the object here, but you can name the task whatever you think is appropriate. The second argument, callback, is a callable that is run when the task is executed. This is the ‘body’ of the task and you are free to execute your own Python code here. In the spirit of boring names for our tutorial, we have named the callables print_first and print_second. Of course, we haven’t defined the callables yet, so let’s do this next.
Implement the callables¶
We will use functions as the callables for our PythonTask objects. The functions take a specific form and look like this:
def print_first(data, store, signal, context):
    print('This is the first task')
Add this code above your task instantiations. A callable for a PythonTask has four arguments. We will cover all four arguments in more detail in the following tutorial steps, so for now you can safely ignore them. All we do in the body of the function is to print a simple string.
The callable for the second task is pretty much the same, we only change the name and the string that is printed:
def print_second(data, store, signal, context):
    print('This is the second task')
At this point we have the task objects that should be run and the code that should be executed for each task. We haven’t defined the order in which we want the tasks to be run yet. This will happen in the next step.
Arrange the tasks in a sequence¶
In Lightflow, tasks are arranged in a Directed Acyclic Graph, or ‘DAG’ for short. While this might sound complicated, all it means is that you define the dependencies between the tasks, thereby building a network (also called a graph) of tasks. The word ‘directed’ captures the fact that the dependencies impose a direction on the graph. In our case, we want the first_task to be run before the second_task. The word ‘acyclic’ means that Lightflow does not allow loops in the task graph. For example, you are not allowed to set up a graph in which you start with first_task, then run second_task, followed by running first_task again.
In Lightflow the Dag class takes care of running the tasks in the correct order. Import the Dag class at the top of your workflow file with:
from lightflow.models import Dag
Next, below your task object instantiations at the bottom of your workflow, create an object of the Dag class:
d = Dag('main_dag')
You have to provide a single argument, which is the name you would like to give to the Dag.
The Dag class provides the function define() for setting up the task graph. This is where the magic happens. Lightflow uses a Python dictionary in order to specify the arrangement of the tasks. The key:value relationship of a dictionary is mapped to a parent:child relationship for tasks, thereby defining the dependencies between tasks. For our simple, two task workflow the graph definition looks like this:
d.define({
    first_task: second_task
})
That’s it! You have defined your first workflow and are now ready to run it.
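As an aside, longer sequences follow exactly the same pattern: each additional dependency is just another key:value pair in the dictionary. A minimal sketch, assuming a hypothetical third task (and its print_third callable) defined in the same way as the other two:

third_task = PythonTask(name='third_task',
                        callback=print_third)

d.define({
    first_task: second_task,
    second_task: third_task
})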
The complete workflow¶
Here is the complete workflow for this tutorial including a few comments:
from lightflow.models import Dag
from lightflow.tasks import PythonTask
# the callback functions for the task
def print_first(data, store, signal, context):
    print('This is the first task')


def print_second(data, store, signal, context):
    print('This is the second task')


# create the two task objects
first_task = PythonTask(name='first_task',
                        callback=print_first)

second_task = PythonTask(name='second_task',
                         callback=print_second)

# create the main DAG
d = Dag('main_dag')

# set up the graph of the DAG, in which the first_task has
# to be executed first, followed by the second_task.
d.define({
    first_task: second_task
})
Document the workflow¶
This step is optional, but highly recommended as it will help you remember what the workflow does. We will add a title and a short description to the workflow. At the top of your workflow file add the following docstring:
""" Tutorial 1: a sequence of two tasks
This workflow uses two tasks in order to demonstrate
the basics of a workflow definition in Lightflow.
"""
Lightflow uses the first line of the docstring when listing all available workflows. Give it a go by changing to the directory where the configuration file is located and enter:
$ lightflow workflow list
tutorial01 Tutorial 1: a sequence of two tasks
Lightflow will list your workflow together with the short description you gave it.
Start a worker¶
Lightflow uses a worker-based scheme. This means a workflow adds jobs onto a central queue from which a number of workers consume jobs and execute them. In order for Lightflow to run our workflow, it needs at least one running worker. Start a worker with:
$ lightflow worker start
This will start a worker, which then waits for the first job to be added to the queue. You can start as many workers as you like, but for now one worker is enough.
Run the workflow¶
With at least one worker running, we are ready to run our first workflow. You might need a second terminal in order to run the workflow as the first one is occupied running our worker. In your second terminal enter:
$ lightflow workflow start tutorial01
This will send our workflow to the queue. The worker will pick up the workflow and run it. The default logging level is very verbose so you will see the worker print out a lot of information as it executes the workflow.
You will see how the first_task is executed first and prints the string “This is the first task”, followed by the second_task and the string “This is the second task”.
Congratulations! You completed the first tutorial successfully.
Tasks¶
Python task¶
The PythonTask is the most basic and most flexible task in Lightflow. It allows you to execute almost arbitrary Python code in your task. The only requirement is that the Python code can be serialised and deserialised safely.
- class lightflow.tasks.PythonTask(name, callback=None, *, queue='task', callback_init=None, callback_finally=None, force_run=False, propagate_skip=True)[source]¶
The Python task executes a user-defined python method.
Parameters: - name (str) – The name of the task.
- callback (callable) – A reference to the Python method that should be called by the task as soon as it is run. It has to have the following definition:
  (data, store, signal, context) -> None, Action
  with the parameters:
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- queue (str) – Name of the queue the task should be scheduled to. Defaults to the general task queue.
- callback_init (callable) – An optional callable that is called shortly before the task is run. The definition is:
  (data, store, signal, context) -> None
  with the parameters:
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- callback_finally (callable) – An optional callable that is always called at the end of a task, regardless of whether it completed successfully, was stopped or was aborted. The definition is:
  (status, data, store, signal, context) -> None
  with the parameters:
  - status (TaskStatus): The current status of the task. It can be one of the following:
    - TaskStatus.Success – task was successful
    - TaskStatus.Stopped – task was stopped
    - TaskStatus.Aborted – task was aborted
    - TaskStatus.Error – task raised an exception
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- force_run (bool) – Run the task even if it is flagged to be skipped.
- propagate_skip (bool) – Propagate the skip flag to the next task.
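To make the parameter descriptions above more concrete, here is a minimal, hedged sketch of a PythonTask that uses both the callback and the callback_finally arguments; the function and task names are purely illustrative:

from lightflow.tasks import PythonTask


# callback: the body of the task
def do_work(data, store, signal, context):
    print('doing the actual work')


# callback_finally: always called at the end of the task,
# regardless of its final status
def clean_up(status, data, store, signal, context):
    print('task finished with status: {}'.format(status))


work_task = PythonTask(name='work_task',
                       callback=do_work,
                       callback_finally=clean_up)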
Bash task¶
The BashTask provides an easy to use task for executing bash commands. It allows you to capture and process the standard and error output of the bash command either in ‘real-time’ or, once the process has completed, as a file object.
- class lightflow.tasks.BashTask(name, command, cwd=None, env=None, user=None, group=None, stdin=None, refresh_time=0.1, capture_stdout=False, capture_stderr=False, callback_process=None, callback_end=None, callback_stdout=None, callback_stderr=None, *, queue='task', callback_init=None, callback_finally=None, force_run=False, propagate_skip=True)[source]¶
The Bash task executes a user-defined bash command or bash file.
All task parameters except the name, callbacks, queue, force_run and propagate_skip can either be their native type or a callable returning the native type.
Parameters: - name (str) – The name of the task.
- command (function, str) – The command or bash file that should be executed.
- cwd (function, str, None) – The working directory for the command.
- env (function, dict, None) – A dictionary of environment variables.
- user (function, int, None) – The user ID of the user with which the command should be executed.
- group (function, int, None) – The group ID of the group with which the command should be executed.
- stdin (function, str, None) – An input string that should be passed on to the process.
- refresh_time (function, float) – The time in seconds the internal output handling waits before checking for new output from the process.
- capture_stdout (function, bool) – Set to True to capture all standard output in a temporary file.
- capture_stderr (function, bool) – Set to True to capture all standard errors in a temporary file.
- callback_process (callable) – A callable that is called after the process started. The definition is:
  (pid, data, store, signal, context) -> None
  with the parameters:
  - pid (int): The process PID.
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- callback_end (callable) – A callable that is called after the process completed. The definition is:
  (returncode, stdout_file, stderr_file, data, store, signal, context) -> None
  with the parameters:
  - returncode (int): The return code of the process.
  - stdout_file: A file object with the standard output if the flag capture_stdout was set to True, otherwise None.
  - stderr_file: A file object with the error output if the flag capture_stderr was set to True, otherwise None.
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- callback_stdout (callable) – A callable that is called for every line of output the process sends to stdout. The definition is:
  (line, data, store, signal, context) -> None
  with the parameters:
  - line (str): Single line of the process output as a string.
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- callback_stderr (callable) – A callable that is called for every line of output the process sends to stderr. The definition is:
  (line, data, store, signal, context) -> None
  with the parameters:
  - line (str): Single line of the process output as a string.
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- queue (str) – Name of the queue the task should be scheduled to. Defaults to the general task queue.
- callback_init (callable) – An optional callable that is called shortly before the task is run. The definition is:
  (data, store, signal, context) -> None
  with the parameters:
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- callback_finally (callable) – An optional callable that is always called at the end of a task, regardless of whether it completed successfully, was stopped or was aborted. The definition is:
  (status, data, store, signal, context) -> None
  with the parameters:
  - status (TaskStatus): The current status of the task. It can be one of the following:
    - TaskStatus.Success – task was successful
    - TaskStatus.Stopped – task was stopped
    - TaskStatus.Aborted – task was aborted
    - TaskStatus.Error – task raised an exception
  - data (MultiTaskData): The data object that has been passed from the predecessor task.
  - store (DataStoreDocument): The persistent data store object that allows the task to store data for access across the current workflow run.
  - signal (TaskSignal): The signal object for tasks. It wraps the construction and sending of signals into easy to use methods.
  - context (TaskContext): The context in which the task runs.
- force_run (bool) – Run the task even if it is flagged to be skipped.
- propagate_skip (bool) – Propagate the skip flag to the next task.
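As a hedged illustration of the parameters above, the following sketch runs a short bash command, captures its standard output and processes it line by line; the command and callback names are chosen for this example only:

from lightflow.tasks import BashTask


# called for every line the command writes to stdout
def print_line(line, data, store, signal, context):
    print('got line: {}'.format(line.strip()))


# called once the process has completed
def summarise(returncode, stdout_file, stderr_file, data, store, signal, context):
    print('process finished with return code {}'.format(returncode))


list_task = BashTask(name='list_task',
                     command='ls -l',
                     capture_stdout=True,
                     callback_stdout=print_line,
                     callback_end=summarise)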
API Reference¶
Workers¶
The workers module provides the API functions for starting, stopping and managing workers.
Methods¶
- lightflow.workers.list_workers(config, *, filter_by_queues=None)[source]¶
Return a list of all available workers.
Parameters: - config (Config) – Reference to the configuration object from which the settings are retrieved.
- filter_by_queues (list) – Restrict the returned workers to workers that listen to at least one of the queue names in this list.
Returns: A list of WorkerStats objects.
Return type: list
- lightflow.workers.start_worker(queues, config, *, name=None, celery_args=None, check_datastore=True)[source]¶
Start a worker process.
Parameters: - queues (list) – List of queue names this worker accepts jobs from.
- config (Config) – Reference to the configuration object from which the settings for the worker are retrieved.
- name (string) – Unique name for the worker. The hostname template variables from Celery can be used. If not given, a unique name is created.
- celery_args (list) – List of additional Celery worker command line arguments. Please note that this depends on the version of Celery used and might change. Use with caution.
- check_datastore (bool) – Set to True to check whether the data store is available prior to starting the worker.
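A hedged sketch of how these functions might be used from Python, assuming the lightflow.cfg file from the quickstart is located in the current directory:

from lightflow import Config
from lightflow.workers import list_workers, start_worker

# load the configuration created with 'lightflow config default .'
config = Config.from_file('lightflow.cfg')

# print each running worker and the queues it listens on
for worker in list_workers(config):
    print(worker.name, [queue.name for queue in worker.queues])

# start a worker that only consumes jobs from the workflow and dag queues
start_worker(['workflow', 'dag'], config)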
Return Classes¶
- class lightflow.queue.models.WorkerStats(name, broker, pid, process_pids, concurrency, job_count, queues)[source]¶
Represents the worker information returned from celery.
Parameters: - name (str) – The name of the worker.
- broker (BrokerStats) – A reference to a BrokerStats Object the worker is using.
- pid (int) – The PID of the worker.
- process_pids (int) – The PIDs of the concurrent task processes.
- concurrency (int) – The number of concurrent processes.
- job_count (int) – The number of jobs this worker has processed so far.
- queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
- classmethod from_celery(name, worker_dict, queues)[source]¶
Create a WorkerStats object from the dictionary returned by celery.
Parameters: - name (str) – The name of the worker.
- worker_dict (dict) – The dictionary as returned by celery.
- queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
Returns: A fully initialized WorkerStats object.
Return type: WorkerStats
- class lightflow.queue.models.QueueStats(name, routing_key)[source]¶
Represents the queue information returned from celery.
Parameters: - name (str) – The name of the queue.
- routing_key (str) – The routing key of the queue.
- classmethod from_celery(queue_dict)[source]¶
Create a QueueStats object from the dictionary returned by celery.
Parameters: queue_dict (dict) – The dictionary as returned by celery.
Returns: A fully initialized QueueStats object.
Return type: QueueStats
Workflows¶
The workflows module provides the API functions for starting, stopping and monitoring workflows.
Methods¶
- lightflow.workflows.events(config)[source]¶
Return a generator that yields workflow events.
For every workflow event that is sent from celery this generator yields an event object.
Parameters: config (Config) – Reference to the configuration object from which the settings are retrieved.
Returns: A generator that returns workflow events.
Return type: generator
- lightflow.workflows.list_jobs(config, *, status=0, filter_by_type=None, filter_by_worker=None)[source]¶
Return a list of Celery jobs.
Parameters: - config (Config) – Reference to the configuration object from which the settings are retrieved.
- status (JobStatus) – The status of the jobs that should be returned.
- filter_by_type (list) – Restrict the returned jobs to the types in this list.
- filter_by_worker (list) – Only return jobs that were registered, reserved or are running on the workers given in this list of worker names. Using this option will increase the performance.
Returns: A list of JobStats.
Return type: list
- lightflow.workflows.list_workflows(config)[source]¶
List all available workflows.
Returns a list of all workflows that are available from the paths specified in the config. A workflow is defined as a Python file with at least one DAG.
Parameters: config (Config) – Reference to the configuration object from which the settings are retrieved.
Returns: A list of workflows.
Return type: list
- lightflow.workflows.start_workflow(name, config, *, queue='workflow', clear_data_store=True, store_args=None)[source]¶
Start a single workflow by sending it to the workflow queue.
Parameters: - name (str) – The name of the workflow that should be started. Refers to the name of the workflow file without the .py extension.
- config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
- queue (str) – Name of the queue the workflow should be scheduled to.
- clear_data_store (bool) – Remove any documents created during the workflow run in the data store after the run.
- store_args (dict) – Dictionary of additional arguments that are ingested into the data store prior to the execution of the workflow.
Returns: The ID of the workflow job.
Return type: str
Raises: - WorkflowArgumentError – If the workflow requires arguments to be set in store_args that were not supplied to the workflow.
- WorkflowImportError – If the import of the workflow fails.
- lightflow.workflows.stop_workflow(config, *, names=None)[source]¶
Stop one or more workflows.
Parameters: - config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
- names (list) – List of workflow names, workflow ids or workflow job ids for the workflows that should be stopped. If all workflows should be stopped, set it to None.
Returns: A tuple of the workflow jobs that were successfully stopped and the ones that could not be stopped.
Return type: tuple
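A hedged sketch combining the functions above; the configuration file location and the workflow name refer to the quickstart setup:

from lightflow import Config
from lightflow.workflows import list_workflows, start_workflow, stop_workflow

config = Config.from_file('lightflow.cfg')

# list all workflows found in the configured workflow paths
for workflow in list_workflows(config):
    print(workflow)

# send the 'simple' example workflow to the workflow queue
job_id = start_workflow('simple', config)

# stop it again, identified by the returned job id
stopped, not_stopped = stop_workflow(config, names=[job_id])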
Return Classes¶
- class lightflow.queue.models.JobStats(name, job_id, job_type, queue, workflow_id, start_time, arguments, acknowledged, func_name, hostname, worker_name, worker_pid, routing_key)[source]¶
Represents the job (=celery task) information returned from celery.
Parameters: - name (str) – The name of the job.
- job_id (str) – The internal ID of the job.
- job_type (str) – The type of the job (workflow, dag, task).
- queue (str) – The name of the queue the job was scheduled to.
- workflow_id (str) – The id of the workflow that started this job.
- start_time (datetime) – The time the job was started in UTC.
- arguments (dict) – The provided arguments to a workflow.
- acknowledged (bool) – True if the job was acknowledged by the message system.
- func_name (str) – The name of the function that represents this job.
- hostname (str) – The name of the host this job runs on.
- worker_name (str) – The name of the worker this job runs on.
- worker_pid (int) – The PID of the process this job runs on.
- routing_key (str) – The routing key for this job.
- classmethod from_celery(worker_name, job_dict, celery_app)[source]¶
Create a JobStats object from the dictionary returned by celery.
Parameters: - worker_name (str) – The name of the worker this job runs on.
- job_dict (dict) – The dictionary as returned by celery.
- celery_app – Reference to a celery application object.
Returns: A fully initialized JobStats object.
Return type: JobStats
Config¶
The configuration of Lightflow is passed to the API functions via an instance of the Config class. The configuration is described as a YAML structure and can be loaded from a file. The Config class contains a default configuration, which means that you only need to specify the settings in the config file that you would like to change.
- class lightflow.Config[source]¶
Hosts the global configuration.
The configuration is read from a structured YAML file or a dictionary. The location of the file can either be specified directly, is given in the environment variable LIGHTFLOW_CONFIG_ENV, is looked for in the current execution directory or in the home directory of the user.
- celery¶ Return the celery settings
- cli¶ Return the cli settings
- dag_polling_time¶ Return the waiting time between status checks of the running tasks (sec)
- data_store¶ Return the data store settings
- extensions¶ Return the custom settings of extensions
- classmethod from_file(filename, *, strict=True)[source]¶
Create a new Config object from a configuration file.
Parameters: - filename (str) – The location and name of the configuration file.
- strict (bool) – If true raises a ConfigLoadError when the configuration cannot be found.
Returns: An instance of the Config class.
Raises: ConfigLoadError – If the configuration cannot be found.
- load_from_dict(conf_dict=None)[source]¶
Load the configuration from a dictionary.
Parameters: conf_dict (dict) – Dictionary with the configuration.
- load_from_file(filename=None, *, strict=True)[source]¶
Load the configuration from a file.
The location of the configuration file can either be specified directly in the parameter filename or is searched for in the following order:
- In the environment variable given by LIGHTFLOW_CONFIG_ENV
- In the current execution directory
- In the user’s home directory
Parameters: - filename (str) – The location and name of the configuration file.
- strict (bool) – If true raises a ConfigLoadError when the configuration cannot be found.
Raises: ConfigLoadError – If the configuration cannot be found.
- logging¶ Return the logging settings
- signal¶ Return the signal system settings
- workflow_polling_time¶ Return the waiting time between status checks of the running dags (sec)
- workflows¶ Return the workflow folders
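A short, hedged sketch of loading a configuration and reading a few of the settings listed above; the file name assumes the default lightflow.cfg from the quickstart:

from lightflow import Config

# load the configuration from an explicit file ...
config = Config.from_file('lightflow.cfg')

# ... or let Lightflow search the LIGHTFLOW_CONFIG_ENV variable, the current
# directory and the home directory for it
auto_config = Config()
auto_config.load_from_file()

print(config.workflows)              # the workflow folders
print(config.workflow_polling_time)  # seconds between status checks of the running dags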
Task Data¶
- class lightflow.models.MultiTaskData(*, dataset=None, aliases=None)[source]¶
Manages multiple TaskData datasets and their aliases.
This class implements the data object that is being passed between tasks. It consists of one or more TaskData datasets in order to accommodate multiple inputs to a single task. Each dataset can be accessed by its index or by one or more aliases. There is a default dataset, which is used whenever the user does not specify the exact dataset to work with.
Parameters: - dataset (TaskData) – An initial TaskData dataset.
- aliases (list) – A list of aliases for the initial dataset.
- add_alias(alias, index)[source]¶
Add an alias pointing to the specified index.
Parameters: - alias (str) – The alias that should point to the given index.
- index (int) – The index of the dataset for which an alias should be added.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
- add_dataset(task_name, dataset=None, *, aliases=None)[source]¶
Add a new dataset to the MultiTaskData.
Parameters: - task_name (str) – The name of the task from which the dataset was received.
- dataset (TaskData) – The dataset that should be added.
- aliases (list) – A list of aliases that should be registered with the dataset.
- add_task_history(task_name)[source]¶
Add a task name to the list of tasks that have contributed to all datasets.
Parameters: task_name (str) – The name of the task that contributed.
- default_dataset¶ Return the default dataset.
Returns: A reference to the default dataset.
Return type: TaskData
- default_index¶ Return the index of the default dataset.
- flatten(in_place=True)[source]¶
Merge all datasets into a single dataset.
The default dataset is the last dataset to be merged, as it is considered to be the primary source of information and should overwrite all existing fields with the same key.
Parameters: in_place (bool) – Set to True to replace the existing datasets with the merged one. If set to False, will return a new MultiTaskData object containing the merged dataset.
Returns: A MultiTaskData object with the merged dataset, if the in_place flag is set to False.
Return type: MultiTaskData
- get_by_alias(alias)[source]¶
Return a dataset by its alias.
Parameters: alias (str) – The alias of the dataset that should be returned.
Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
- get_by_index(index)[source]¶
Return a dataset by its index.
Parameters: index (int) – The index of the dataset that should be returned.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
- set_default_by_alias(alias)[source]¶
Set the default dataset by its alias.
After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.
Parameters: alias (str) – The alias of the dataset that should be made the default.
Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
- set_default_by_index(index)[source]¶
Set the default dataset by its index.
After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.
Parameters: index (int) – The index of the dataset that should be made the default.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
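To illustrate how the data object is typically used inside task callbacks, here is a hedged sketch. It assumes that the default dataset supports dictionary-style access for reading and writing values; the task names and keys are made up for this example:

from lightflow.tasks import PythonTask


# store a value in the dataset that is passed on to successor tasks
def put_value(data, store, signal, context):
    data['value'] = 5


# merge all incoming datasets, then read the value in a downstream task
def use_value(data, store, signal, context):
    data.flatten(in_place=True)
    print('received value: {}'.format(data['value']))


put_task = PythonTask(name='put_task', callback=put_value)
use_task = PythonTask(name='use_task', callback=use_value)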
Persistent Data Store¶
- class lightflow.models.DataStoreDocument(collection, grid_fs, workflow_id)[source]¶
A single data store document containing the data for a workflow run.
The document provides methods in order to retrieve and set data in the persistent data store. It represents the data for a single workflow run.
Parameters: - collection – A MongoDB collection object pointing to the data store collection.
- grid_fs – A GridFS object used for splitting large, binary data into smaller chunks in order to avoid the 16MB document limit of MongoDB.
- workflow_id – The id of the workflow run this document is associated with.
- extend(key, values, *, section='data')[source]¶
Extends a list in the data store with the elements of values.
Parameters: - key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
- values (list) – A list of the values that should be used to extend the list in the document.
- section (DataStoreDocumentSection) – The section from which the data should be retrieved.
Returns: True if the list in the database could be extended, otherwise False.
Return type: bool
- get(key, default=None, *, section='data')[source]¶
Return the field specified by its key from the specified section.
This method accesses the specified section of the workflow document and returns the value for the given key.
Parameters: - key (str) – The key pointing to the value that should be retrieved. It supports MongoDB’s dot notation for nested fields.
- default – The default value that is returned if the key does not exist.
- section (DataStoreDocumentSection) – The section from which the data should be retrieved.
Returns: The value from the field that the specified key is pointing to. If the key does not exist, the default value is returned. If no default value is provided and the key does not exist, None is returned.
Return type: object
- push(key, value, *, section='data')[source]¶
Appends a value to a list in the specified section of the document.
Parameters: - key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
- value – The value that should be appended to a list in the data store.
- section (DataStoreDocumentSection) – The section from which the data should be retrieved.
Returns: True if the value could be appended, otherwise False.
Return type: bool
- set(key, value, *, section='data')[source]¶
Store a value under the specified key in the given section of the document.
This method stores a value into the specified section of the workflow data store document. Any existing value is overridden. Before storing a value, any linked GridFS document under the specified key is deleted.
Parameters: - key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
- value – The value that should be stored/updated.
- section (DataStoreDocumentSection) – The section from which the data should be retrieved.
Returns: True if the value could be set/updated, otherwise False.
Return type: bool