Workers¶
The workers module provides the API functions for starting, stopping and managing workers.
Methods¶
- lightflow.workers.list_workers(config, *, filter_by_queues=None)¶
  Return a list of all available workers.
  Parameters:
  - config (Config) – Reference to the configuration object from which the settings are retrieved.
  - filter_by_queues (list) – Restrict the returned workers to workers that listen to at least one of the queue names in this list.
  Returns: A list of WorkerStats objects.
  Return type: list
- lightflow.workers.start_worker(queues, config, *, name=None, celery_args=None, check_datastore=True)¶
  Start a worker process.
  Parameters:
  - queues (list) – List of queue names this worker accepts jobs from.
  - config (Config) – Reference to the configuration object from which the settings for the worker are retrieved.
  - name (str) – Unique name for the worker. The hostname template variables from Celery can be used. If not given, a unique name is created.
  - celery_args (list) – List of additional Celery worker command line arguments. Please note that these depend on the version of Celery used and might change. Use with caution.
  - check_datastore (bool) – Set to True to check whether the data store is available prior to starting the worker.
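A minimal usage sketch of the two functions above. It assumes a Lightflow installation with a reachable broker; the config file name lightflow.cfg and the custom queue name high_memory are placeholders, while 'task' is one of the built-in queue names. The ImportError guard lets the sketch degrade gracefully where Lightflow is not installed.

```python
try:
    from lightflow import Config
    from lightflow.workers import list_workers, start_worker
    HAVE_LIGHTFLOW = True
except ImportError:
    HAVE_LIGHTFLOW = False


def launch_worker(config):
    # Blocks while the worker runs. Listens on the built-in 'task' queue
    # plus a hypothetical custom 'high_memory' queue; '%h' is Celery's
    # hostname template variable.
    start_worker(['task', 'high_memory'], config, name='worker1@%h')


def show_high_memory_workers(config):
    # Only report workers that serve the 'high_memory' queue.
    for stats in list_workers(config, filter_by_queues=['high_memory']):
        print(stats.name, stats.concurrency)
```

start_worker does not return while the worker is alive, so listing workers is typically done from a separate process (for example, a monitoring script).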
Return Classes¶
- class lightflow.queue.models.WorkerStats(name, broker, pid, process_pids, concurrency, job_count, queues)¶
  Represents the worker information returned from Celery.
  Parameters:
  - name (str) – The name of the worker.
  - broker (BrokerStats) – A reference to the BrokerStats object the worker is using.
  - pid (int) – The PID of the worker.
  - process_pids (list) – The PIDs of the concurrent task processes.
  - concurrency (int) – The number of concurrent processes.
  - job_count (int) – The number of jobs this worker has processed so far.
  - queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
  - classmethod from_celery(name, worker_dict, queues)¶
    Create a WorkerStats object from the dictionary returned by Celery.
    Parameters:
    - name (str) – The name of the worker.
    - worker_dict (dict) – The dictionary as returned by Celery.
    - queues (list) – A list of QueueStats objects that represent the queues this worker is listening on.
    Returns: A fully initialized WorkerStats object.
    Return type: WorkerStats
- class lightflow.queue.models.QueueStats(name, routing_key)¶
  Represents the queue information returned from Celery.
  Parameters:
  - name (str) – The name of the queue.
  - routing_key (str) – The routing key of the queue.
  - classmethod from_celery(queue_dict)¶
    Create a QueueStats object from the dictionary returned by Celery.
    Parameters:
    - queue_dict (dict) – The dictionary as returned by Celery.
    Returns: A fully initialized QueueStats object.
    Return type: QueueStats
Workflows¶
The workflows module provides the API functions for starting, stopping and monitoring workflows.
Methods¶
- lightflow.workflows.events(config)¶
  Return a generator that yields workflow events.
  For every workflow event that is sent from Celery this generator yields an event object.
  Parameters:
  - config (Config) – Reference to the configuration object from which the settings are retrieved.
  Returns: A generator that yields workflow events.
  Return type: generator
- lightflow.workflows.list_jobs(config, *, status=0, filter_by_type=None, filter_by_worker=None)¶
  Return a list of Celery jobs.
  Parameters:
  - config (Config) – Reference to the configuration object from which the settings are retrieved.
  - status (JobStatus) – The status of the jobs that should be returned.
  - filter_by_type (list) – Restrict the returned jobs to the types in this list.
  - filter_by_worker (list) – Only return jobs that were registered, reserved or are running on the workers in this list of worker names. Using this option improves performance.
  Returns: A list of JobStats objects.
  Return type: list
- lightflow.workflows.list_workflows(config)¶
  List all available workflows.
  Returns a list of all workflows that are available from the paths specified in the config. A workflow is defined as a Python file with at least one DAG.
  Parameters:
  - config (Config) – Reference to the configuration object from which the settings are retrieved.
  Returns: A list of workflows.
  Return type: list
- lightflow.workflows.start_workflow(name, config, *, queue='workflow', clear_data_store=True, store_args=None)¶
  Start a single workflow by sending it to the workflow queue.
  Parameters:
  - name (str) – The name of the workflow that should be started. Refers to the name of the workflow file without the .py extension.
  - config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
  - queue (str) – Name of the queue the workflow should be scheduled to.
  - clear_data_store (bool) – Remove any documents created during the workflow run from the data store after the run.
  - store_args (dict) – Dictionary of additional arguments that are ingested into the data store prior to the execution of the workflow.
  Returns: The ID of the workflow job.
  Return type: str
  Raises:
  - WorkflowArgumentError – If the workflow requires arguments to be set in store_args that were not supplied to the workflow.
  - WorkflowImportError – If the import of the workflow fails.
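A sketch of starting a workflow through this API. The workflow name 'processing' and the store_args keys are placeholders; the sketch assumes a configured Lightflow installation and guards the import so it also loads where Lightflow is absent.

```python
try:
    from lightflow import Config
    from lightflow.workflows import start_workflow
    HAVE_LIGHTFLOW = True
except ImportError:
    HAVE_LIGHTFLOW = False


def run_processing(config):
    """Queue the workflow defined in processing.py and return its job id."""
    # store_args seeds the data store before the first task runs;
    # 'threshold' is a hypothetical workflow argument.
    return start_workflow('processing', config,
                          clear_data_store=False,
                          store_args={'threshold': 0.5})
```

Setting clear_data_store=False keeps the run's data store documents around for later inspection, at the cost of growing the MongoDB collection.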
- lightflow.workflows.stop_workflow(config, *, names=None)¶
  Stop one or more workflows.
  Parameters:
  - config (Config) – Reference to the configuration object from which the settings for the workflow are retrieved.
  - names (list) – List of workflow names, workflow ids or workflow job ids for the workflows that should be stopped. Set to None to stop all workflows.
  Returns: A tuple of the workflow jobs that were successfully stopped and the ones that could not be stopped.
  Return type: tuple
Return Classes¶
- class lightflow.queue.models.JobStats(name, job_id, job_type, queue, workflow_id, start_time, arguments, acknowledged, func_name, hostname, worker_name, worker_pid, routing_key)¶
  Represents the job (i.e. Celery task) information returned from Celery.
  Parameters:
  - name (str) – The name of the job.
  - job_id (str) – The internal ID of the job.
  - job_type (str) – The type of the job (workflow, dag, task).
  - queue (str) – The name of the queue the job was scheduled to.
  - workflow_id (str) – The id of the workflow that started this job.
  - start_time (datetime) – The time the job was started, in UTC.
  - arguments (dict) – The arguments provided to the workflow.
  - acknowledged (bool) – True if the job was acknowledged by the message system.
  - func_name (str) – The name of the function that represents this job.
  - hostname (str) – The name of the host this job runs on.
  - worker_name (str) – The name of the worker this job runs on.
  - worker_pid (int) – The PID of the process this job runs on.
  - routing_key (str) – The routing key for this job.
  - classmethod from_celery(worker_name, job_dict, celery_app)¶
    Create a JobStats object from the dictionary returned by Celery.
    Parameters:
    - worker_name (str) – The name of the worker this job runs on.
    - job_dict (dict) – The dictionary as returned by Celery.
    - celery_app – Reference to a Celery application object.
    Returns: A fully initialized JobStats object.
    Return type: JobStats
Config¶
The configuration of Lightflow is passed to the API functions via an instance of the Config class. The configuration is described as a YAML structure and can be loaded from a file. The Config class contains a default configuration, so you only need to specify the settings in the config file that you would like to change.
- class lightflow.Config¶
  Hosts the global configuration.
  The configuration is read from a structured YAML file or a dictionary. The location of the file can be specified directly, given in the environment variable LIGHTFLOW_CONFIG_ENV, or searched for in the current execution directory or in the home directory of the user.
  - celery¶
    Return the Celery settings.
  - cli¶
    Return the cli settings.
  - dag_polling_time¶
    Return the waiting time between status checks of the running tasks, in seconds.
  - data_store¶
    Return the data store settings.
  - extensions¶
    Return the custom settings of extensions.
  - classmethod from_file(filename, *, strict=True)¶
    Create a new Config object from a configuration file.
    Parameters:
    - filename (str) – The location and name of the configuration file.
    - strict (bool) – If True, raises a ConfigLoadError when the configuration cannot be found.
    Returns: An instance of the Config class.
    Raises: ConfigLoadError – If the configuration cannot be found.
  - load_from_dict(conf_dict=None)¶
    Load the configuration from a dictionary.
    Parameters:
    - conf_dict (dict) – Dictionary with the configuration.
  - load_from_file(filename=None, *, strict=True)¶
    Load the configuration from a file.
    The location of the configuration file can either be specified directly in the parameter filename or is searched for in the following order:
    - In the environment variable given by LIGHTFLOW_CONFIG_ENV
    - In the current execution directory
    - In the user’s home directory
    Parameters:
    - filename (str) – The location and name of the configuration file.
    - strict (bool) – If True, raises a ConfigLoadError when the configuration cannot be found.
    Raises: ConfigLoadError – If the configuration cannot be found.
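A short sketch of the two loading paths described above. The file name lightflow.cfg is a placeholder, and the import is guarded so the sketch also loads where Lightflow is not installed.

```python
try:
    from lightflow import Config
    HAVE_LIGHTFLOW = True
except ImportError:
    HAVE_LIGHTFLOW = False


def config_from_file(path='lightflow.cfg'):
    # strict=False suppresses the ConfigLoadError when the file cannot be
    # found, leaving the built-in default configuration in place.
    return Config.from_file(path, strict=False)


def config_from_dict(overrides):
    # Override only the settings that differ from the defaults; the
    # dictionary keys mirror the YAML structure of the config file.
    config = Config()
    config.load_from_dict(overrides)
    return config
```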
  - logging¶
    Return the logging settings.
  - signal¶
    Return the signal system settings.
  - workflow_polling_time¶
    Return the waiting time between status checks of the running dags, in seconds.
  - workflows¶
    Return the workflow folders.
Task Data¶
- class lightflow.models.MultiTaskData(*, dataset=None, aliases=None)¶
  Manages multiple TaskData datasets and their aliases.
  This class implements the data object that is passed between tasks. It consists of one or more TaskData datasets in order to accommodate multiple inputs to a single task. Each dataset can be accessed by its index or by one or more aliases. There is a default dataset, which is used whenever the user does not specify the exact dataset to work with.
  Parameters:
  - dataset (TaskData) – An initial TaskData dataset.
  - aliases (list) – A list of aliases for the initial dataset.
  - add_alias(alias, index)¶
    Add an alias pointing to the specified index.
    Parameters:
    - alias (str) – The alias that should point to the given index.
    - index (int) – The index of the dataset for which an alias should be added.
    Raises: DataInvalidIndex – If the index does not represent a valid dataset.
  - add_dataset(task_name, dataset=None, *, aliases=None)¶
    Add a new dataset to the MultiTaskData.
    Parameters:
    - task_name (str) – The name of the task from which the dataset was received.
    - dataset (TaskData) – The dataset that should be added.
    - aliases (list) – A list of aliases that should be registered with the dataset.
  - add_task_history(task_name)¶
    Add a task name to the list of tasks that have contributed to all datasets.
    Parameters:
    - task_name (str) – The name of the task that contributed.
  - default_dataset¶
    Return the default dataset.
    Returns: A reference to the default dataset.
    Return type: TaskData
  - default_index¶
    Return the index of the default dataset.
  - flatten(in_place=True)¶
    Merge all datasets into a single dataset.
    The default dataset is the last dataset to be merged, as it is considered to be the primary source of information and should overwrite all existing fields with the same key.
    Parameters:
    - in_place (bool) – Set to True to replace the existing datasets with the merged one. If set to False, a new MultiTaskData object containing the merged dataset is returned.
    Returns: A new MultiTaskData object containing the merged dataset, if the in_place flag is set to False.
    Return type: MultiTaskData
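The merge-order rule above can be sketched with plain dictionaries. This is an illustration of the precedence only, not Lightflow's implementation.

```python
# Illustration only: the default dataset is merged last, so its
# values win for duplicate keys.
datasets = [
    {'value': 1, 'source': 'task_a'},  # dataset 0
    {'value': 2, 'extra': 'task_b'},   # dataset 1 (the default)
]
default_index = 1

merged = {}
for index, dataset in enumerate(datasets):
    if index != default_index:
        merged.update(dataset)
merged.update(datasets[default_index])  # default merged last, overwrites

print(merged)  # {'value': 2, 'source': 'task_a', 'extra': 'task_b'}
```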
  - get_by_alias(alias)¶
    Return a dataset by its alias.
    Parameters:
    - alias (str) – The alias of the dataset that should be returned.
    Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
  - get_by_index(index)¶
    Return a dataset by its index.
    Parameters:
    - index (int) – The index of the dataset that should be returned.
    Raises: DataInvalidIndex – If the index does not represent a valid dataset.
  - set_default_by_alias(alias)¶
    Set the default dataset by its alias.
    After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.
    Parameters:
    - alias (str) – The alias of the dataset that should be made the default.
    Raises: DataInvalidAlias – If the alias does not represent a valid dataset.
  - set_default_by_index(index)¶
    Set the default dataset by its index.
    After changing the default dataset, all calls without explicitly specifying the dataset by index or alias will be redirected to this dataset.
    Parameters:
    - index (int) – The index of the dataset that should be made the default.
    Raises: DataInvalidIndex – If the index does not represent a valid dataset.
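A plain-Python sketch of the alias and default-index bookkeeping the methods above describe: aliases map names to dataset indices, and the default index decides which dataset unqualified accesses hit. All names here are illustrative, not Lightflow's code.

```python
# Two hypothetical datasets with one alias each.
datasets = [{'img': 'a.png'}, {'img': 'b.png'}]
aliases = {'camera_a': 0, 'camera_b': 1}
default_index = 0


def get_by_alias(alias):
    if alias not in aliases:
        raise KeyError(alias)  # Lightflow raises DataInvalidAlias here
    return datasets[aliases[alias]]


def set_default_by_alias(alias):
    global default_index
    default_index = aliases[alias]


set_default_by_alias('camera_b')
print(datasets[default_index]['img'])  # b.png
```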
Persistent Data Store¶
- class lightflow.models.DataStoreDocument(collection, grid_fs, workflow_id)¶
  A single data store document containing the data for a workflow run.
  The document provides methods to retrieve and set data in the persistent data store. It represents the data for a single workflow run.
  Parameters:
  - collection – A MongoDB collection object pointing to the data store collection.
  - grid_fs – A GridFS object used for splitting large, binary data into smaller chunks in order to avoid the 16 MB document limit of MongoDB.
  - workflow_id – The id of the workflow run this document is associated with.
  - extend(key, values, *, section='data')¶
    Extend a list in the data store with the elements of values.
    Parameters:
    - key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
    - values (list) – A list of the values that should be used to extend the list in the document.
    - section (DataStoreDocumentSection) – The section in which the list should be updated.
    Returns: True if the list in the database could be extended, otherwise False.
    Return type: bool
  - get(key, default=None, *, section='data')¶
    Return the field specified by its key from the specified section.
    This method accesses the specified section of the workflow document and returns the value for the given key.
    Parameters:
    - key (str) – The key pointing to the value that should be retrieved. It supports MongoDB’s dot notation for nested fields.
    - default – The default value that is returned if the key does not exist.
    - section (DataStoreDocumentSection) – The section from which the data should be retrieved.
    Returns: The value from the field that the specified key is pointing to. If the key does not exist, the default value is returned. If no default value is provided and the key does not exist, None is returned.
    Return type: object
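MongoDB's dot notation resolves nested fields one path segment at a time. A minimal Python equivalent of that lookup (an illustration only; the real method is backed by MongoDB):

```python
def get_nested(document, dotted_key, default=None):
    """Resolve 'a.b.c'-style keys against nested dictionaries."""
    node = document
    for part in dotted_key.split('.'):
        if isinstance(node, dict) and part in node:
            node = node[part]
        else:
            return default
    return node


# Hypothetical document content for a workflow run.
doc = {'camera': {'exposure': 0.5}}
print(get_nested(doc, 'camera.exposure'))   # 0.5
print(get_nested(doc, 'camera.gain', 1.0))  # 1.0 (key missing, default used)
```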
  - push(key, value, *, section='data')¶
    Append a value to a list in the specified section of the document.
    Parameters:
    - key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
    - value – The value that should be appended to a list in the data store.
    - section (DataStoreDocumentSection) – The section in which the value should be stored.
    Returns: True if the value could be appended, otherwise False.
    Return type: bool
  - set(key, value, *, section='data')¶
    Store a value under the specified key in the given section of the document.
    This method stores a value into the specified section of the workflow data store document. Any existing value is overridden. Before storing a value, any linked GridFS document under the specified key is deleted.
    Parameters:
    - key (str) – The key pointing to the value that should be stored/updated. It supports MongoDB’s dot notation for nested fields.
    - value – The value that should be stored/updated.
    - section (DataStoreDocumentSection) – The section in which the value should be stored.
    Returns: True if the value could be set/updated, otherwise False.
    Return type: bool
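The difference between push and extend mirrors Python's list.append versus list.extend. A plain-Python illustration of the semantics (the real methods issue MongoDB update operations):

```python
# Hypothetical 'data' section of a data store document.
section = {'files': ['a.dat']}

section['files'].append('b.dat')             # like push(key, 'b.dat')
section['files'].extend(['c.dat', 'd.dat'])  # like extend(key, ['c.dat', 'd.dat'])

print(section['files'])  # ['a.dat', 'b.dat', 'c.dat', 'd.dat']
```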