PYME.cluster.ruleserver module

This module defines a RESTful HTTP server, the PYME Rule Server, which manages task distribution across a cluster through the use of rules. Rules are essentially a json template which defines how tasks may be generated (by substitution into the template on the client side). Generation of individual tasks on the client has the dual benefits of a) reducing the CPU load and memory requirements on the server and b) dramatically decreasing the network bandwidth used for task distribution.

The exposed REST API is as follows:



Brief Description



Add a new rule



Release tasks IDs associated with a rule



Retrieve a list of advertisements



Submit bids on advertised tasks



Advise rule server of task completion



Get status information

Implementation details follow. The json format and parameters for the REST calls are defined in the docstrings of the python functions defining that method (linked from the table above). The server is launched by the PYMERuleServer command (see PYME.cluster.PYMERuleServer) which loads this in a separate thread and takes care of logging and zeroconf registration, rather than by directly running this file.

class PYME.cluster.ruleserver.Bid(bidder_id, cost)

Bases: tuple

Create new instance of Bid(bidder_id, cost)

property bidder_id

Alias for field number 0

property cost

Alias for field number 1

class PYME.cluster.ruleserver.IntegerIDRule(ruleID, task_template, inputs_by_task=None, max_task_ID=100000, task_timeout=600, rule_timeout=3600, on_completion=None)

Bases: Rule

A rule which generates tasks based on a template.


A unique ID for this rule


A json template for generating tasks (see templates section below)

inputs_by_tasklist of dict

A list of dictionaries mapping recipe variable names to URIs to treat as inputs (recipes only)


The maximum number of tasks that can be generated by this rule. If the final number of tasks is not known at creation, set this to a value which is garuanteed to be higher and then call mark_release_complete() after all tasks have been released / made available.

task_timeout: float

A timeout in seconds for each task. If a task takes longer than task_timeout seconds it is assumed that the node executing it has fallen over and we retry on a different node (up to a maximum number of times set by the ‘ruleserver-retries` config option).


A timeout in seconds from the last task we processed after which we assume that no more tasks are coming and we can delete the rule (to keep memory down in a long-term usage scenario).



Templates are a string which can be substituted to generate tasks. There are currently two supported formats for templates, those for localization tasks, and those for recipes. Each take the form of a json dictionary:


 "id" : "{{ruleID}}~{{taskID}}",
 "type" : "localization",
 "taskdef" : {"frameIndex" : {{taskID}}, "metadata" : "PYME-CLUSTER://path/to/series/metadata.json"},
 "inputs" : {"frames" : "PYME-CLUSTER://path/to/series.pcs"},
 "outputs" : {"fitResults" : "PYME-CLUSTER://__aggregate_h5r/path/to/analysis.h5r/FitResults",
              "driftResults" : "PYME-CLUSTER://__aggregate_h5r/path/to/analysis.h5r/DriftResults"}

Recipes The recipe can either be specified inline:

 "id" : "{{ruleID}}~{{taskID}}",
 "type" : "recipe",
 "taskdef" : {"recipe" : "<recipe as a YAML string>"},
 "inputs" : {{taskInputs}},
 "output_dir" : "PYME-CLUSTER://path/to/output/dir",

or using a cluster URI:

 "id" : "{{ruleID}}~{{taskID}}",
 "type" : "recipe",
 "taskdefRef" : "PYME-CLUSTER://path/to/recipe.yaml",
 "inputs" : {{taskInputs}},
 "output_dir" : "PYME-CLUSTER://path/to/output/dir",

The rule will substitute {{taskInputs}} with a dictionary mapping integer task IDs to recipe input files, e.g.

{0 : {"recipe_input_0" : "input_0_URI_0","recipe_input_1" : "input_1_URI_0"},
 1 : {"recipe_input_0" : "input_0_URI_1","recipe_input_1" : "input_1_URI_1"},
 2 : {"recipe_input_0" : "input_0_URI_2","recipe_input_1" : "input_1_URI_2"},

Alternatively the inputs dictionary can be supplied directly (without relying on the taskInputs substitution).

Rule Chaining

Rules may also define a chained rule, to be run on completion of the original rule. This is accomplished by supplying an “on_completion” dictionary. This is a dictionary, {'template' : <template>, 'max_tasks' : max_tasks, 'rule_timeout' : timeout, 'on_completion' : {...}}, with the template following the format above and everything but the template being optional (max_tasks defaults to 1). Follow on / chained rules are slightly more restricted than standard rules in that recipe “inputs” must be hardcoded (no {{taskInputs}} substitution).

Chained rules are created once the original rule is finished (see IntegerIDRule.finished). All tasks for the chained rule will be released immediately.

TASK_INFO_DTYPE = dtype([('status', 'u1'), ('nRetries', 'u1'), ('expiry', '<f4'), ('cost', '<f4')])
property advert

The task advertisment.

If the rule has tasks available, a dictionary of the form: {"ruleID" : str, "taskTemplate" : str, "availableTaskIDs" : [list of int], "inputsByTask" : [optional] dict mapping task IDs to inputs}

“inputsByTask” is only provided for some recipe tasks.


Bid on tasks (and return any that match). Note the current implementation is very naive and doesn’t check bid cost - i.e. the first bid gets the task. This works if (and only if) the clients are well behaved and preferentially bid on tasks which have a lowest cost for them.


A dictionary containing the ruleID, the IDs of the tasks to bid on, and their costs {"ruleID" : str ,"taskIDs" : [list of int],"taskCosts" : [list of float]}

successful_bids: dict

A dictionary containing the ruleID, the IDs of the tasks awarded, and the rule template {"ruleID" : ruleID, "taskIDs": [list of int], "template" : "<rule template>"}

property expired

Whether the rule has expired (no available tasks, no tasks assigned, and time > expiry) and can be removed

property finished

Whether the rule has finished (the maximum number of tasks which can be created have been completed).


Mark rule as inactive (generates no adverts) to facilitate aborting / pausing long-running rules.


Get information / status about this rule


A status dictionary of the form: {'tasksPosted': int, 'tasksRunning': int, 'tasksCompleted': int, 'tasksFailed' : int, 'averageExecutionCost' : float}

make_range_available(start, end)

Make a range of tasks available (to be called once the underlying data is available) [start, end) Parameters ———- start : int

first task number to release (inclusive)


last task number to release (exclusive)



if asked to release a range which is invalid for the max tasks we can create from this rule


Mark a set of tasks as completed and/or failed


A dictionary of the form: {"ruleID": str, "taskIDs" : [list of int], "status" : [list of int]}

There should be an entry in status for each entry in taskIDs, the valid values of status being STATUS_COMPLETE=3 or STATUS_FAILED=4.


Signal that all tasks which are going to be released for this rule have been, and that the rule should evaluate as finished once these tasks have been evaluated. Used when the number of tasks is not known in advance. (See also __init__ docs)

FIXME - What happens when task release doesn’t start at 0?

n_tasksint, optional

mark complete at a given number of tasks. The default is to mark complete at the highest task number which has been released prior to making this call. The use case for this optional parameter is to avoid the need for locking when this gets called from another thread than the make_range_available() and implies a promise by the calling code that it will not release any more tasks above the given number.


If n_tasks is supplied and is either smaller than the largest task already released or larger than larger than max_task_ID

class PYME.cluster.ruleserver.Rule

Bases: object

class PYME.cluster.ruleserver.RuleServer

Bases: object

add_integer_id_rule(max_tasks=1000000.0, release_start=None, release_end=None, ruleID=None, timeout=3600, body='')

HTTP endpoint (POST) for adding a new rule.

Add a rule that generates tasks based on an integer ID, such as a frame number (localization) or an index into a list of inputs (recipes). By default, tasks are not released on creation, but later with release_rule_tasks(). This allows for the creation of a rule before a series has finished spooling.


The maximum number of tasks that could match this rule. Generally the number of frames in a localization series (if known in advance) or the number of inputs on which to run a recipe. Allows us to pre-allocate a task array of the correct size (passing this rather than relying on the default reduces memory usage).

release_start, release_endint

Release a range of tasks for computation after creating the rule (avoids a call to release_rule_tasks()


A unique identifier for the rule. If not provided, one will be automatically generated


How long this rule should live for, in seconds. Defaults to an hour.


A json dictionary {'template' : "<rule template>", 'inputsByTask' : [list of URIs]}. See IntegerIDRule for the format of the rule template. The inputsByTask parameter is only used for recipes, and can be omitted for localisation analysis, or for recipe tasks using a hard coded "inputs" dictionary. An optional additional parameter, “on_completion” parameter may be given, itself consisting of a new {'template': <template>, 'on_completion': {...}} dictionary.

resultjson str

The result of adding the rule. A dict of the form: {"ok" : "True", 'ruleID' : str}


HTTP endpoint (POST) for bidding on tasks.

bodyjson list of bids

A list of bids, each of which is a dictionary of the form {"ruleID" : str ,"taskIDs" : [list of int],"taskCosts" : [list of float]} see IntegerIDRule.Bid() for details.

assignments: json str

A json formatted list of task assignments, each of which has the form: {"ruleID" : ruleID, "taskIDs": [list of int], "template" : "<rule template>"} See


a throttled version of queue info


HTTP Endpoint (GET) - visible at “distributor/queues” - for querying ruleserver status.

status: json str

A dictionary of the form {"ok" : True, "result" : {ruleID0 :, ruleID1 :}} See


HTTP Endpoint (POST) to mark tasks as having been completed

bodyjson list

A list of entries of the form {"ruleID": str, "taskIDs" : [list of int], "status" : [list of int]} where status has the same length as taskIDs with each entry being either STATUS_COMPLETE or STATUS_FAILED. See IntegerIDRule.mark_complete().

successjson str

{"ok" : "True"} if successful.

mark_release_complete(ruleID, n_tasks=None)

HTTP Endpoint (POST) to signal that no more tasks will be released for a rule and the rule can be regarded as finished once the previously released tasks are complete.


ID of the rule to update

n_tasksint, [optional, discouraged]

a fixed number of tasks to truncate the rule at. Used for avoiding locks/race conditions in a multi-threaded client process. NOTE - this parameter may disappear in a future version to enable rules with incomplete ranges to be marked as complete (see comments in IntegerIDRule.finished). Locking or otherwise structuring the client such that no release_rule_tasks() calls can occur after a call to mark_release_complete() is therefore preferred.


{"ok" : "True"} if successful.


If value is less than number of tasks already assigned.

FIXME - HTTP endpoints should not raise - they should handle the error and return an error response to the client.
release_rule_tasks(ruleID, release_start, release_end, body='')

HTTP Endpoint (POST or GET ) for releasing tasks associated with a rule.

When performing spooling data analysis we typically have one rule per series and one task per frame. This method allows for the tasks associated with a particular frame (or range of frames) to be released as they become available.


The rule ID

release_start, release_end: int

The range of tasks IDs to release

bodystr, empty

Needed for interface compatibility, ignored

successjson str

{"ok" : "True"} if successful.


HTTP endpoint (GET) for retrieving task advertisements.

Note - by default, a limited number of advertisements are posted at any given time (to limit bandwidth). This should be enough to keep all the workers busy, with new adverts being posted once tasks are assigned to workers.

advertsjson str

List of advertisements. See IntegerIDRule.advert

class PYME.cluster.ruleserver.ServerThread(port, bind_addr='', profile=False)

Bases: Thread

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

class PYME.cluster.ruleserver.WFRuleServer(port, bind_addr='')

Bases: APIHTTPServer, RuleServer

Combines the RuleServer with it’s web framework.

Largely an artifact of initial experiments using cherrypy (allowed quickly switching between cherrypy and our internal webframework).

PYME.cluster.ruleserver.on_SIGHUP(signum, frame)