Source code for lightflow.workers
from uuid import uuid4
from .models.datastore import DataStore
from .queue.app import create_app
from .queue.worker import WorkerLifecycle
from .queue.models import WorkerStats, QueueStats
[docs]def start_worker(queues, config, *, name=None, celery_args=None, check_datastore=True):
""" Start a worker process.
Args:
queues (list): List of queue names this worker accepts jobs from.
config (Config): Reference to the configuration object from which the
settings for the worker are retrieved.
name (string): Unique name for the worker. The hostname template variables from
Celery can be used. If not given, a unique name is created.
celery_args (list): List of additional Celery worker command line arguments.
Please note that this depends on the version of Celery used and might change.
Use with caution.
check_datastore (bool): Set to True to check whether the data store is available
prior to starting the worker.
"""
celery_app = create_app(config)
if check_datastore:
with DataStore(**config.data_store,
auto_connect=True, handle_reconnect=False) as ds:
celery_app.user_options['datastore_info'] = ds.server_info
argv = [
'worker',
'-n={}'.format(uuid4() if name is None else name),
'--queues={}'.format(','.join(queues))
]
argv.extend(celery_args or [])
celery_app.steps['consumer'].add(WorkerLifecycle)
celery_app.user_options['config'] = config
celery_app.worker_main(argv)
[docs]def stop_worker(config, *, worker_ids=None):
""" Stop a worker process.
Args:
config (Config): Reference to the configuration object from which the
settings for the worker are retrieved.
worker_ids (list): An optional list of ids for the worker that should be stopped.
"""
if worker_ids is not None and not isinstance(worker_ids, list):
worker_ids = [worker_ids]
celery_app = create_app(config)
celery_app.control.shutdown(destination=worker_ids)
[docs]def list_workers(config, *, filter_by_queues=None):
""" Return a list of all available workers.
Args:
config (Config): Reference to the configuration object from which the
settings are retrieved.
filter_by_queues (list): Restrict the returned workers to workers that listen to
at least one of the queue names in this list.
Returns:
list: A list of WorkerStats objects.
"""
celery_app = create_app(config)
worker_stats = celery_app.control.inspect().stats()
queue_stats = celery_app.control.inspect().active_queues()
if worker_stats is None:
return []
workers = []
for name, w_stat in worker_stats.items():
queues = [QueueStats.from_celery(q_stat) for q_stat in queue_stats[name]]
add_worker = filter_by_queues is None
if not add_worker:
for queue in queues:
if queue.name in filter_by_queues:
add_worker = True
break
if add_worker:
workers.append(WorkerStats.from_celery(name, w_stat, queues))
return workers