Workers

The workers module provides the API functions for starting, stopping and managing workers.

Methods

lightflow.workers.list_workers(config, *, filter_by_queues=None)[source]

Return a list of all available workers.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings are retrieved.
  • filter_by_queues (list) – Restrict the returned workers to workers that listen to at least one of the queue names in this list.
Returns: A list of WorkerStats objects.
Return type: list
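
Example (a minimal sketch; the configuration file name is a placeholder and the printed attributes follow the WorkerStats fields documented below):

    from lightflow import Config
    from lightflow.workers import list_workers

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Only show workers that consume from the 'workflow' queue.
    for worker in list_workers(config, filter_by_queues=['workflow']):
        print(worker.name, worker.pid, worker.concurrency)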

lightflow.workers.start_worker(queues, config, *, name=None, celery_args=None, check_datastore=True)[source]

Start a worker process.

Parameters:
  • queues (list) – List of queue names this worker accepts jobs from.
  • config (Config) – Reference to the configuration object from which the settings for the worker are retrieved.
  • name (string) – Unique name for the worker. The hostname template variables from Celery can be used. If not given, a unique name is created.
  • celery_args (list) – List of additional Celery worker command line arguments. Please note that this depends on the version of Celery used and might change. Use with caution.
  • check_datastore (bool) – Set to True to check whether the data store is available prior to starting the worker.
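
Example (a minimal sketch; the configuration file name is a placeholder, and the queue names mirror the job types listed for JobStats below rather than being prescribed here):

    from lightflow import Config
    from lightflow.workers import start_worker

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Consume jobs from the three assumed queues; the call runs the worker and
    # typically blocks until the worker process exits. The extra Celery argument
    # is only an example and depends on the installed Celery version.
    start_worker(['workflow', 'dag', 'task'], config,
                 name='worker-1@%h',
                 celery_args=['--loglevel=INFO'])
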
lightflow.workers.stop_worker(config, *, worker_ids=None)[source]

Stop a worker process.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings for the worker are retrieved.
  • worker_ids (list) – An optional list of IDs of the workers that should be stopped.
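
Example (a minimal sketch; the configuration file name and the worker names are placeholders):

    from lightflow import Config
    from lightflow.workers import stop_worker

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Stop two specific workers; pass worker_ids=None to stop all workers.
    stop_worker(config, worker_ids=['worker-1@host-a', 'worker-2@host-b'])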

Return Classes

class lightflow.queue.models.WorkerStats(name, broker, pid, process_pids, concurrency, job_count, queues)[source]

Represents the worker information returned from Celery.

Parameters:
  • name (str) – The name of the worker.
  • broker (BrokerStats) – A reference to a BrokerStats object the worker is using.
  • pid (int) – The PID of the worker.
  • process_pids (list) – The PIDs of the concurrent task processes.
  • concurrency (int) – The number of concurrent processes.
  • job_count (int) – The number of jobs this worker has processed so far.
  • queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
classmethod from_celery(name, worker_dict, queues)[source]

Create a WorkerStats object from the dictionary returned by Celery.

Parameters:
  • name (str) – The name of the worker.
  • worker_dict (dict) – The dictionary as returned by Celery.
  • queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
Returns: A fully initialized WorkerStats object.
Return type: WorkerStats

to_dict()[source]

Return a dictionary of the worker stats.

Returns: Dictionary of the stats.
Return type: dict
class lightflow.queue.models.QueueStats(name, routing_key)[source]

Represents the queue information returned from Celery.

Parameters:
  • name (str) – The name of the queue.
  • routing_key (str) – The routing key of the queue.
classmethod from_celery(queue_dict)[source]

Create a QueueStats object from the dictionary returned by Celery.

Parameters: queue_dict (dict) – The dictionary as returned by Celery.
Returns: A fully initialized QueueStats object.
Return type: QueueStats
to_dict()[source]

Return a dictionary of the queue stats.

Returns: Dictionary of the stats.
Return type: dict

Workflows

The workflows module provides the API functions for starting, stopping and monitoring workflows.

Methods

lightflow.workflows.events(config)[source]

Return a generator that yields workflow events.

For every workflow event that is sent from Celery, this generator yields an event object.

Parameters: config (Config) – Reference to the configuration object from which the settings are retrieved.
Returns: A generator that yields workflow events.
Return type: generator
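
Example (a minimal sketch; the configuration file name is a placeholder and the printed representation of the event object is an assumption):

    from lightflow import Config
    from lightflow.workflows import events

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Blocks while waiting for events and yields one event object per workflow event.
    for event in events(config):
        print(event)
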
lightflow.workflows.list_jobs(config, *, status=0, filter_by_type=None, filter_by_worker=None)[source]

Return a list of Celery jobs.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings are retrieved.
  • status (JobStatus) – The status of the jobs that should be returned.
  • filter_by_type (list) – Restrict the returned jobs to the types in this list.
  • filter_by_worker (list) – Only return jobs that were registered, reserved or are running on the workers given in this list of worker names. Using this option improves performance.
Returns: A list of JobStats.
Return type: list
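
Example (a minimal sketch; the configuration file name is a placeholder and the printed attributes follow the JobStats fields documented below):

    from lightflow import Config
    from lightflow.workflows import list_jobs

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Restrict the result to workflow jobs; 'dag' and 'task' are the other job types.
    for job in list_jobs(config, filter_by_type=['workflow']):
        print(job.name, job.workflow_id, job.worker_name, job.start_time)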

lightflow.workflows.list_workflows(config)[source]

List all available workflows.

Returns a list of all workflows that are available from the paths specified in the config. A workflow is defined as a Python file with at least one DAG.

Parameters: config (Config) – Reference to the configuration object from which the settings are retrieved.
Returns: A list of workflows.
Return type: list
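
Example (a minimal sketch; the configuration file name is a placeholder and the exact content of each list entry is an assumption):

    from lightflow import Config
    from lightflow.workflows import list_workflows

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Each entry describes a workflow found in the configured workflow folders.
    for workflow in list_workflows(config):
        print(workflow)
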
lightflow.workflows.start_workflow(name, config, *, queue='workflow', clear_data_store=True, store_args=None)[source]

Start a single workflow by sending it to the workflow queue.

Parameters:
  • name (str) – The name of the workflow that should be started. Refers to the name of the workflow file without the .py extension.
  • config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
  • queue (str) – Name of the queue the workflow should be scheduled to.
  • clear_data_store (bool) – Remove any documents created in the data store during the workflow run once the run has finished.
  • store_args (dict) – Dictionary of additional arguments that are ingested into the data store prior to the execution of the workflow.
Returns: The ID of the workflow job.
Return type: str

Raises:
  • WorkflowArgumentError – If the workflow requires arguments to be set in store_args that were not supplied to the workflow.
  • WorkflowImportError – If the import of the workflow fails.
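
Example (a minimal sketch; the configuration file name, the workflow name and the store_args keys are placeholders):

    from lightflow import Config
    from lightflow.workflows import start_workflow

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # 'my_workflow' refers to my_workflow.py in one of the configured workflow folders.
    job_id = start_workflow('my_workflow', config,
                            queue='workflow',
                            clear_data_store=False,
                            store_args={'iterations': 10})
    print('Started workflow job', job_id)
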
lightflow.workflows.stop_workflow(config, *, names=None)[source]

Stop one or more workflows.

Parameters:
  • config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
  • names (list) – List of workflow names, workflow IDs or workflow job IDs for the workflows that should be stopped. Set to None to stop all workflows.
Returns: A tuple of the workflow jobs that were successfully stopped and the ones that could not be stopped.
Return type: tuple
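
Example (a minimal sketch; the configuration file name and the workflow name are placeholders):

    from lightflow import Config
    from lightflow.workflows import stop_workflow

    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Stop a single workflow by name; names=None would stop all running workflows.
    stopped, not_stopped = stop_workflow(config, names=['my_workflow'])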

Return Classes

class lightflow.queue.models.JobStats(name, job_id, job_type, queue, workflow_id, start_time, arguments, acknowledged, func_name, hostname, worker_name, worker_pid, routing_key)[source]

Represents the job (i.e. Celery task) information returned from Celery.

Parameters:
  • name (str) – The name of the job.
  • job_id (str) – The internal ID of the job.
  • job_type (str) – The type of the job (workflow, dag, task).
  • queue (str) – The name of the queue the job was scheduled to.
  • workflow_id (str) – The id of the workflow that started this job.
  • start_time (datetime) – The time the job was started in UTC.
  • arguments (dict) – The provided arguments to a workflow.
  • acknowledged (bool) – True if the job was acknowledged by the message system.
  • func_name (str) – The name of the function that represents this job.
  • hostname (str) – The name of the host this job runs on.
  • worker_name (str) – The name of the worker this job runs on.
  • worker_pid (int) – The PID of the process this job runs on.
  • routing_key (str) – The routing key for this job.
classmethod from_celery(worker_name, job_dict, celery_app)[source]

Create a JobStats object from the dictionary returned by Celery.

Parameters:
  • worker_name (str) – The name of the worker this job runs on.
  • job_dict (dict) – The dictionary as returned by Celery.
  • celery_app – Reference to a Celery application object.
Returns: A fully initialized JobStats object.
Return type: JobStats

to_dict()[source]

Return a dictionary of the job stats.

Returns: Dictionary of the stats.
Return type: dict

Config

The configuration of Lightflow is passed to the API functions via an instance of the Config class. The configuration is described as a YAML structure and can be loaded from a file. The Config class contains a default configuration, which means that you only need to specify the settings in the config file that you would like to change.
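
Example (a minimal sketch; the file name and the overridden workflow folder are placeholders, and it is assumed that settings missing from the dictionary fall back to the defaults, mirroring the documented behaviour for files):

    from lightflow import Config

    # Load the settings from a YAML file; unspecified settings keep their default values.
    config = Config.from_file('lightflow.cfg')  # placeholder file name

    # Alternatively, start from a fresh Config and override selected settings from a dictionary.
    config = Config()
    config.load_from_dict({'workflows': ['./my_workflows']})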

class lightflow.Config[source]

Hosts the global configuration.

The configuration is read from a structured YAML file or from a dictionary. The location of the file can either be specified directly or taken from the environment variable LIGHTFLOW_CONFIG_ENV; otherwise the file is looked for in the current execution directory and in the home directory of the user.

celery

Return the celery settings

cli

Return the cli settings

dag_polling_time

Return the waiting time between status checks of the running tasks (sec)

data_store

Return the data store settings

static default()[source]

Returns the default configuration.

extensions

Return the custom settings of extensions

classmethod from_file(filename, *, strict=True)[source]

Create a new Config object from a configuration file.

Parameters:
  • filename (str) – The location and name of the configuration file.
  • strict (bool) – If True, raise a ConfigLoadError when the configuration cannot be found.
Returns: An instance of the Config class.
Raises: ConfigLoadError – If the configuration cannot be found.

load_from_dict(conf_dict=None)[source]

Load the configuration from a dictionary.

Parameters: conf_dict (dict) – Dictionary with the configuration.
load_from_file(filename=None, *, strict=True)[source]

Load the configuration from a file.

The location of the configuration file can either be specified directly in the parameter filename or is searched for in the following order:

  1. In the environment variable given by LIGHTFLOW_CONFIG_ENV
  2. In the current execution directory
  3. In the user’s home directory
Parameters:
  • filename (str) – The location and name of the configuration file.
  • strict (bool) – If True, raise a ConfigLoadError when the configuration cannot be found.
Raises: ConfigLoadError – If the configuration cannot be found.

logging

Return the logging settings

set_to_default()[source]

Overwrite the configuration with the default configuration.

signal

Return the signal system settings

to_dict()[source]

Returns a copy of the internal configuration as a dictionary.

workflow_polling_time

Return the waiting time between status checks of the running dags (sec)

workflows

Return the workflow folders

Task Data

class lightflow.models.MultiTaskData(*, dataset=None, aliases=None)[source]

Manages multiple TaskData datasets and their aliases.

This class implements the data object that is being passed between tasks. It consists of one or more TaskData datasets in order to accommodate multiple inputs to a single task. Each dataset can be accessed by its index or by one or more aliases. There is a default dataset, which is used whenever the user does not specify the exact dataset to work with.

Parameters:
  • dataset (TaskData) – An initial TaskData dataset.
  • aliases (list) – A list of aliases for the initial dataset.
add_alias(alias, index)[source]

Add an alias pointing to the specified index.

Parameters:
  • alias (str) – The alias that should point to the given index.
  • index (int) – The index of the dataset for which an alias should be added.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.

add_dataset(task_name, dataset=None, *, aliases=None)[source]

Add a new dataset to the MultiTaskData.

Parameters:
  • task_name (str) – The name of the task from which the dataset was received.
  • dataset (TaskData) – The dataset that should be added.
  • aliases (list) – A list of aliases that should be registered with the dataset.
add_task_history(task_name)[source]

Add a task name to the list of tasks that have contributed to all datasets.

Parameters: task_name (str) – The name of the task that contributed.
default_dataset

Return the default dataset.

Returns: A reference to the default dataset.
Return type: TaskData
default_index

Return the index of the default dataset.

flatten(in_place=True)[source]

Merge all datasets into a single dataset.

The default dataset is the last dataset to be merged, as it is considered to be the primary source of information and should overwrite all existing fields with the same key.

Parameters: in_place (bool) – Set to True to replace the existing datasets with the merged one. If set to False, a new MultiTaskData object containing the merged dataset is returned.
Returns: The new MultiTaskData object containing the merged dataset, if in_place is set to False.
Return type: MultiTaskData
get_by_alias(alias)[source]

Return a dataset by its alias.

Parameters: alias (str) – The alias of the dataset that should be returned.
Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
get_by_index(index)[source]

Return a dataset by its index.

Parameters: index (int) – The index of the dataset that should be returned.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
set_default_by_alias(alias)[source]

Set the default dataset by its alias.

After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.

Parameters: alias (str) – The alias of the dataset that should be made the default.
Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
set_default_by_index(index)[source]

Set the default dataset by its index.

After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.

Parameters: index (int) – The index of the dataset that should be made the default.
Raises: DataInvalidIndex – If the index does not represent a valid dataset.
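
Example (a minimal sketch using only the methods documented above; whether omitting the dataset argument creates an empty dataset is an assumption, and filling the individual TaskData datasets is not shown since TaskData is not documented here):

    from lightflow.models import MultiTaskData

    data = MultiTaskData()

    # Register one dataset per upstream task, each with an alias.
    data.add_dataset('task_a', aliases=['raw'])
    data.add_dataset('task_b', aliases=['cleaned'])

    # Route all further unqualified access to the 'cleaned' dataset.
    data.set_default_by_alias('cleaned')

    # Merge everything into a single dataset; the default dataset wins on key clashes.
    merged = data.flatten(in_place=False)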

Persistent Data Store

class lightflow.models.DataStoreDocument(collection, grid_fs, workflow_id)[source]

A single data store document containing the data for a workflow run.

The document provides methods in order to retrieve and set data in the persistent data store. It represents the data for a single workflow run.

Parameters:
  • collection – A MongoDB collection object pointing to the data store collection.
  • grid_fs – A GridFS object used for splitting large, binary data into smaller chunks in order to avoid the 16MB document limit of MongoDB.
  • workflow_id – The id of the workflow run this document is associated with.
extend(key, values, *, section='data')[source]

Extends a list in the data store with the elements of values.

Parameters:
  • key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
  • values (list) – A list of the values that should be used to extend the list in the document.
  • section (DataStoreDocumentSection) – The section of the document in which the list should be extended.
Returns: True if the list in the database could be extended, otherwise False.
Return type: bool

get(key, default=None, *, section='data')[source]

Return the field specified by its key from the specified section.

This method accesses the specified section of the workflow document and returns the value for the given key.

Parameters:
  • key (str) – The key pointing to the value that should be retrieved. It supports MongoDB’s dot notation for nested fields.
  • default – The default value that is returned if the key does not exist.
  • section (DataStoreDocumentSection) – The section from which the data should be retrieved.
Returns: The value from the field that the specified key is pointing to. If the key does not exist, the default value is returned. If no default value is provided and the key does not exist, None is returned.
Return type: object

push(key, value, *, section='data')[source]

Appends a value to a list in the specified section of the document.

Parameters:
  • key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
  • value – The value that should be appended to a list in the data store.
  • section (DataStoreDocumentSection) – The section of the document to which the value should be appended.
Returns: True if the value could be appended, otherwise False.
Return type: bool

set(key, value, *, section='data')[source]

Store a value under the specified key in the given section of the document.

This method stores a value into the specified section of the workflow data store document. Any existing value is overridden. Before storing a value, any linked GridFS document under the specified key is deleted.

Parameters:
  • key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
  • value – The value that should be stored/updated.
  • section (DataStoreDocumentSection) – The section of the document in which the value should be stored.
Returns: True if the value could be set/updated, otherwise False.
Return type: bool
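
Example (a minimal sketch of typical usage; how the DataStoreDocument instance doc is obtained, for example through the data store object handed to a running task, is not covered by this section and is an assumption, as are the keys and values):

    # 'doc' is assumed to be a DataStoreDocument for the current workflow run.
    doc.set('camera.exposure_time', 0.5)              # dot notation addresses nested fields
    doc.push('camera.frames', 'frame_0001.tif')       # append a single value to a list
    doc.extend('camera.frames', ['frame_0002.tif',    # append several values at once
                                 'frame_0003.tif'])

    exposure = doc.get('camera.exposure_time', default=1.0)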