# Source code for lightflow.queue.models

from celery.result import AsyncResult

from lightflow.models.exceptions import JobStatInvalid

class BrokerStats:
    """ Represents the broker information returned from celery.

    Args:
        hostname (str): The broker hostname.
        port (int): The broker port.
        transport (str): The transport protocol of the broker.
        virtual_host (str): The virtual host, e.g. the database number in redis.
    """
    def __init__(self, hostname, port, transport, virtual_host):
        self.hostname = hostname
        self.port = port
        self.transport = transport
        self.virtual_host = virtual_host

    @classmethod
    def from_celery(cls, broker_dict):
        """ Create a BrokerStats object from the dictionary returned by celery.

        Only the four keys listed below are read; any additional keys in the
        dictionary are ignored.

        Args:
            broker_dict (dict): The dictionary as returned by celery. It has to
                contain the keys 'hostname', 'port', 'transport' and
                'virtual_host'.

        Returns:
            BrokerStats: A fully initialized BrokerStats object.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        # build via cls so that subclasses get an instance of their own type
        return cls(
            hostname=broker_dict['hostname'],
            port=broker_dict['port'],
            transport=broker_dict['transport'],
            virtual_host=broker_dict['virtual_host']
        )

    def to_dict(self):
        """ Return a dictionary of the broker stats.

        Returns:
            dict: Dictionary of the stats.
        """
        return {
            'hostname': self.hostname,
            'port': self.port,
            'transport': self.transport,
            'virtual_host': self.virtual_host
        }

class QueueStats:
    """ Represents the queue information returned from celery.

    Args:
        name (str): The name of the queue.
        routing_key (str): The routing key of the queue.
    """
    def __init__(self, name, routing_key):
        self.name = name
        self.routing_key = routing_key

    @classmethod
    def from_celery(cls, queue_dict):
        """ Create a QueueStats object from the dictionary returned by celery.

        Args:
            queue_dict (dict): The dictionary as returned by celery. It has to
                contain the keys 'name' and 'routing_key'.

        Returns:
            QueueStats: A fully initialized QueueStats object.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        # build via cls so that subclasses get an instance of their own type
        return cls(
            name=queue_dict['name'],
            routing_key=queue_dict['routing_key']
        )

    def to_dict(self):
        """ Return a dictionary of the queue stats.

        Returns:
            dict: Dictionary of the stats.
        """
        return {
            'name': self.name,
            'routing_key': self.routing_key
        }
class WorkerStats:
    """ Represents the worker information returned from celery.

    Args:
        name (str): The name of the worker.
        broker (BrokerStats): A reference to a BrokerStats Object the worker is using.
        pid (int): The PID of the worker.
        process_pids (int): The PIDs of the concurrent task processes.
        concurrency (int): The number of concurrent processes.
        job_count (int): The number of jobs this worker has processed so far.
        queues (list): A list of QueueStats objects that represent the queues this
            worker is listening on.
    """
    def __init__(self, name, broker, pid, process_pids,
                 concurrency, job_count, queues):
        self.name = name
        self.broker = broker
        self.pid = pid
        self.process_pids = process_pids
        self.concurrency = concurrency
        self.job_count = job_count
        self.queues = queues

    @classmethod
    def from_celery(cls, name, worker_dict, queues):
        """ Create a WorkerStats object from the dictionary returned by celery.

        Args:
            name (str): The name of the worker.
            worker_dict (dict): The dictionary as returned by celery. The keys
                'broker', 'pid' and 'pool' (with 'processes', 'max-concurrency'
                and 'writes' -> 'total') are read.
            queues (list): A list of QueueStats objects that represent the queues this
                worker is listening on.

        Returns:
            WorkerStats: A fully initialized WorkerStats object.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        pool = worker_dict['pool']

        # build via cls so that subclasses get an instance of their own type
        return cls(
            name=name,
            broker=BrokerStats.from_celery(worker_dict['broker']),
            pid=worker_dict['pid'],
            process_pids=pool['processes'],
            concurrency=pool['max-concurrency'],
            job_count=pool['writes']['total'],
            queues=queues
        )

    def to_dict(self):
        """ Return a dictionary of the worker stats.

        The broker and the queues are converted by calling their own to_dict()
        methods, so the result contains nested dictionaries only.

        Returns:
            dict: Dictionary of the stats.
        """
        return {
            'name': self.name,
            'broker': self.broker.to_dict(),
            'pid': self.pid,
            'process_pids': self.process_pids,
            'concurrency': self.concurrency,
            'job_count': self.job_count,
            'queues': [q.to_dict() for q in self.queues]
        }
class JobStats:
    """ Represents the job (=celery task) information returned from celery.

    Args:
        name (str): The name of the job.
        job_id (str): The internal ID of the job.
        job_type (str): The type of the job (workflow, dag, task).
        queue (str): The name of the queue the job was scheduled to.
        workflow_id (str): The id of the workflow that started this job.
        start_time (datetime): The time the job was started in UTC.
        arguments (dict): The provided arguments to a workflow.
        acknowledged (bool): True of the job was acknowledged by the message system.
        func_name (str): The name of the function that represents this job.
        hostname (str): The name of the host this job runs on.
        worker_name (str): The name of the worker this job runs on.
        worker_pid (int): The pid of the process this jobs runs on.
        routing_key (str): The routing key for this job.
    """
    def __init__(self, name, job_id, job_type, queue, workflow_id, start_time,
                 arguments, acknowledged, func_name, hostname, worker_name,
                 worker_pid, routing_key):
        self.name = name
        self.id = job_id
        self.type = job_type
        self.workflow_id = workflow_id
        self.queue = queue
        self.start_time = start_time
        self.arguments = arguments
        self.acknowledged = acknowledged
        self.func_name = func_name
        self.hostname = hostname
        self.worker_name = worker_name
        self.worker_pid = worker_pid
        self.routing_key = routing_key

    @classmethod
    def from_celery(cls, worker_name, job_dict, celery_app):
        """ Create a JobStats object from the dictionary returned by celery.

        The name, type, workflow id, queue, start time and arguments are taken
        from the info field of the job's AsyncResult. If that field is not a
        dictionary, these values fall back to empty defaults.

        Args:
            worker_name (str): The name of the worker this jobs runs on.
            job_dict (dict): The dictionary as returned by celery.
            celery_app: Reference to a celery application object.

        Returns:
            JobStats: A fully initialized JobStats object.

        Raises:
            JobStatInvalid: If job_dict is not a dictionary or has no 'id' key.
            KeyError: If one of the other required keys ('acknowledged', 'type',
                'hostname', 'worker_pid', 'delivery_info' -> 'routing_key') is
                missing from job_dict.
        """
        if not isinstance(job_dict, dict) or 'id' not in job_dict:
            raise JobStatInvalid('The job description is missing important fields.')

        # read the info field only once, so the type check and the lookups
        # below are guaranteed to operate on the same value
        info = AsyncResult(id=job_dict['id'], app=celery_app).info
        a_info = info if isinstance(info, dict) else {}

        # build via cls so that subclasses get an instance of their own type
        return cls(
            name=a_info.get('name', ''),
            job_id=job_dict['id'],
            job_type=a_info.get('type', ''),
            workflow_id=a_info.get('workflow_id', ''),
            queue=a_info.get('queue', ''),
            start_time=a_info.get('start_time', None),
            arguments=a_info.get('arguments', {}),
            acknowledged=job_dict['acknowledged'],
            func_name=job_dict['type'],
            hostname=job_dict['hostname'],
            worker_name=worker_name,
            worker_pid=job_dict['worker_pid'],
            routing_key=job_dict['delivery_info']['routing_key']
        )

    def to_dict(self):
        """ Return a dictionary of the job stats.

        Returns:
            dict: Dictionary of the stats.
        """
        return {
            'name': self.name,
            'id': self.id,
            'type': self.type,
            'workflow_id': self.workflow_id,
            'queue': self.queue,
            'start_time': self.start_time,
            'arguments': self.arguments,
            'acknowledged': self.acknowledged,
            'func_name': self.func_name,
            'hostname': self.hostname,
            'worker_name': self.worker_name,
            'worker_pid': self.worker_pid,
            'routing_key': self.routing_key
        }
class JobEvent:
    """ The base class for job events from celery.

    Args:
        uuid (str): The internal event id.
        job_type (str): The type of job that caused this event (workflow, dag, task).
        event_type (str): The internal event type name.
        queue (str): The name of the queue the job was scheduled to.
        hostname (str): The name of the host on which the job is running.
        pid (int): The pid of the process that runs the job.
        name (str): The name of the workflow, dag or task that caused this event.
        workflow_id (str): The id of the workflow that hosts this job.
        event_time (datetime): The time when the event was triggered.
        duration (float, None): The duration it took to execute the job.
    """
    def __init__(self, uuid, job_type, event_type, queue, hostname, pid,
                 name, workflow_id, event_time, duration):
        self.uuid = uuid
        self.type = job_type
        self.event = event_type
        self.queue = queue
        self.hostname = hostname
        self.pid = pid
        self.name = name
        self.workflow_id = workflow_id
        self.event_time = event_time
        self.duration = duration

    @classmethod
    def from_event(cls, event):
        """ Create a JobEvent object from the event dictionary returned by celery.

        The object is created from the class this method is called on, so the
        event subclasses return instances of their own type.

        Args:
            event (dict): The dictionary as returned by celery.

        Returns:
            JobEvent: A fully initialized JobEvent object.

        Raises:
            KeyError: If one of the required keys is missing from the event.
        """
        return cls(
            uuid=event['uuid'],
            job_type=event['job_type'],
            event_type=event['type'],
            queue=event['queue'],
            hostname=event['hostname'],
            pid=event['pid'],
            name=event['name'],
            workflow_id=event['workflow_id'],
            event_time=event['time'],
            duration=event['duration']
        )


# The subclasses below only tag the kind of event. They inherit the constructor
# of JobEvent unchanged, so they accept exactly the same arguments.

class JobStartedEvent(JobEvent):
    """ This event is triggered when a new job starts running. """


class JobSucceededEvent(JobEvent):
    """ This event is triggered when a job completed successfully. """


class JobStoppedEvent(JobEvent):
    """ This event is triggered when a job was stopped. """


class JobAbortedEvent(JobEvent):
    """ This event is triggered when a job was aborted. """