Source code for lightflow.workflows

import os
import glob

from .models import Workflow
from .models.signal import Client, Request, SignalConnection
from .models.exceptions import (WorkflowImportError,
                                JobEventTypeUnsupported, JobStatInvalid,
                                DirectedAcyclicGraphInvalid, WorkflowDefinitionError)

from .queue.app import create_app
from .queue.models import JobStats
from .queue.event import event_stream, create_event_model
from .queue.const import JobExecPath, JobStatus, JobType, DefaultJobQueueName


def start_workflow(name, config, *, queue=DefaultJobQueueName.Workflow,
                   clear_data_store=True, store_args=None):
    """ Start a single workflow by sending it to the workflow queue.

    The workflow is first built from its name and then submitted as a
    Celery task to the given queue.

    Args:
        name (str): The name of the workflow that should be started. Refers to the
            name of the workflow file without the .py extension.
        config (Config): Reference to the configuration object from which the
            settings for the workflow are retrieved.
        queue (str): Name of the queue the workflow should be scheduled to. It is
            used both as the Celery queue and as the routing key.
        clear_data_store (bool): Remove any documents created during the workflow
            run in the data store after the run.
        store_args (dict): Dictionary of additional arguments that are ingested
            into the data store prior to the execution of the workflow.

    Returns:
        str: The ID of the workflow job.

    Raises:
        WorkflowArgumentError: If the workflow requires arguments to be set in
            store_args that were not supplied to the workflow.
        WorkflowImportError: If the import of the workflow fails.
        WorkflowDefinitionError: If one of the workflow's graphs is not a valid
            directed acyclic graph. The original DirectedAcyclicGraphInvalid
            exception is attached as the cause.
    """
    try:
        wf = Workflow.from_name(name,
                                queue=queue,
                                clear_data_store=clear_data_store,
                                arguments=store_args)
    except DirectedAcyclicGraphInvalid as e:
        # Translate the low-level graph error into a workflow-level error, but
        # keep the original exception as the explicit cause for debugging.
        raise WorkflowDefinitionError(workflow_name=name,
                                      graph_name=e.graph_name) from e

    celery_app = create_app(config)
    result = celery_app.send_task(JobExecPath.Workflow,
                                  args=(wf,),
                                  queue=queue,
                                  routing_key=queue)
    return result.id
def stop_workflow(config, *, names=None):
    """ Request the termination of one or more running workflows.

    The currently listed workflow jobs are looked up, optionally narrowed down
    to the ones matching ``names``, and each of them is sent a 'stop_workflow'
    request over the signal connection.

    Args:
        config (Config): Reference to the configuration object from which the
            settings for the workflow are retrieved.
        names (list): List of workflow names, workflow ids or workflow job ids for
            the workflows that should be stopped. If all workflows should be
            stopped, set it to None.

    Returns:
        tuple: A tuple of the workflow jobs that were successfully stopped and the
            ones that could not be stopped.
    """
    workflow_jobs = list_jobs(config, filter_by_type=JobType.Workflow)

    if names is None:
        # no restriction given: every listed workflow job is a target
        targets = workflow_jobs
    else:
        # a job matches if any of its three identifiers was requested
        targets = [
            job for job in workflow_jobs
            if (job.id in names)
            or (job.name in names)
            or (job.workflow_id in names)
        ]

    stopped = []
    not_stopped = []
    for job in targets:
        # a fresh signal connection and client is created per workflow job
        connection = SignalConnection(**config.signal, auto_connect=True)
        client = Client(connection, request_key=job.workflow_id)
        response = client.send(Request(action='stop_workflow'))

        bucket = stopped if response.success else not_stopped
        bucket.append(job)

    return stopped, not_stopped
def list_workflows(config):
    """ List all available workflows.

    Returns a list of all workflows that are available from the paths specified
    in the config. A workflow is defined as a Python file with at least one DAG.

    Every ``*.py`` file directly inside each configured path is loaded as a
    workflow. Files whose import fails with a WorkflowImportError are skipped.

    Args:
        config (Config): Reference to the configuration object from which the
            settings are retrieved. Its ``workflows`` attribute is iterated as
            the list of directories to search.

    Returns:
        list: A list of workflows.

    Raises:
        WorkflowDefinitionError: If a workflow file contains a graph that is not
            a valid directed acyclic graph. The original
            DirectedAcyclicGraphInvalid exception is attached as the cause.
    """
    workflows = []
    for path in config.workflows:
        filenames = glob.glob(os.path.join(os.path.abspath(path), '*.py'))

        for filename in filenames:
            # the module name is the file name without directory and extension
            module_name = os.path.splitext(os.path.basename(filename))[0]
            workflow = Workflow()
            try:
                workflow.load(module_name, validate_arguments=False, strict_dag=True)
                workflows.append(workflow)
            except DirectedAcyclicGraphInvalid as e:
                # Surface a workflow-level error, keeping the graph error as the
                # explicit cause so the traceback shows where it came from.
                raise WorkflowDefinitionError(workflow_name=module_name,
                                              graph_name=e.graph_name) from e
            except WorkflowImportError:
                # files that cannot be imported as a workflow are ignored
                continue

    return workflows
def list_jobs(config, *, status=JobStatus.Active,
              filter_by_type=None, filter_by_worker=None):
    """ Return a list of Celery jobs.

    Args:
        config (Config): Reference to the configuration object from which the
            settings are retrieved.
        status (JobStatus): The status of the jobs that should be returned. Any
            value other than Active, Registered, Reserved or Scheduled yields an
            empty list.
        filter_by_type (list): Restrict the returned jobs to the types in this
            list. A single job type is accepted as well and is treated like a
            list with one entry. None disables the filter.
        filter_by_worker (list): Only return jobs that were registered, reserved
            or are running on the workers given in this list of worker names.
            A single worker name is accepted as well. Using this option will
            increase the performance.

    Returns:
        list: A list of JobStats.
    """
    celery_app = create_app(config)

    # option to filter by the worker (improves performance)
    if filter_by_worker is not None:
        inspect = celery_app.control.inspect(
            destination=filter_by_worker if isinstance(filter_by_worker, list)
            else [filter_by_worker])
    else:
        inspect = celery_app.control.inspect()

    # get active, registered, reserved or scheduled jobs
    if status == JobStatus.Active:
        job_map = inspect.active()
    elif status == JobStatus.Registered:
        job_map = inspect.registered()
    elif status == JobStatus.Reserved:
        job_map = inspect.reserved()
    elif status == JobStatus.Scheduled:
        job_map = inspect.scheduled()
    else:
        job_map = None

    # unknown status, or the inspection did not return anything
    if job_map is None:
        return []

    # Normalise the type filter to a collection. The docstring promises a list,
    # while existing callers pass a single job type; both have to work.
    if filter_by_type is None:
        wanted_types = None
    elif isinstance(filter_by_type, (list, tuple, set, frozenset)):
        wanted_types = filter_by_type
    else:
        wanted_types = (filter_by_type,)

    result = []
    for worker_name, jobs in job_map.items():
        for job in jobs:
            try:
                job_stats = JobStats.from_celery(worker_name, job, celery_app)
            except JobStatInvalid:
                # entries that cannot be converted into JobStats are skipped
                continue

            if wanted_types is None or job_stats.type in wanted_types:
                result.append(job_stats)

    return result
def events(config):
    """ Yield workflow events as they arrive from celery.

    The celery event stream is consumed with the 'task' prefix filter and each
    raw event is converted into an event model object. Raw events whose type
    is not supported by the event model are silently dropped.

    Args:
        config (Config): Reference to the configuration object from which the
            settings are retrieved.

    Returns:
        generator: A generator that returns workflow events.
    """
    app = create_app(config)
    stream = event_stream(app, filter_by_prefix='task')

    for raw_event in stream:
        try:
            yield create_event_model(raw_event)
        except JobEventTypeUnsupported:
            # unsupported event types are not reported to the consumer
            continue