PYME.IO.cluster_streaming module
- class PYME.IO.cluster_streaming.Stream(server_address, server_port, dir_manager=None, filter=None)
Bases: object
Class to handle spooling files to a single server in an asynchronous, non-blocking manner.
TODO: use this in the spoolers as well? It is a little cleaner than the existing code.
- close()
- finalize(filename=None)
Stop the stream's threads and tidy up.
- put(filename, data)
Add a file to the queue of files to be sent to the server.
Calling put with data == None is a sentinel indicating that no more data will follow and that keep-alive should be switched off. This should normally be accompanied by filename = '__part_pyramid_finish/<pyramid_dir>'.
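To illustrate the non-blocking put() and the None sentinel described above, here is a minimal sketch of the same pattern using only the standard library. It is not PYME's implementation: SketchStream, the send callback that stands in for the connection to the server, and the choice to have finalize() enqueue the sentinel are all hypothetical.

```python
import queue
import threading


class SketchStream:
    """Hypothetical single-server stream: put() only enqueues and returns
    immediately; a background thread performs the (simulated) sending."""

    def __init__(self, send):
        self._send = send  # callable(filename, data) standing in for real I/O
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def put(self, filename, data):
        # Non-blocking from the caller's point of view: just enqueue.
        self._queue.put((filename, data))

    def _worker(self):
        while True:
            filename, data = self._queue.get()
            if data is None:
                # Sentinel: no more data will follow. The real class would also
                # switch keep-alive off; here we just forward the marker filename.
                if filename is not None:
                    self._send(filename, None)
                return
            self._send(filename, data)

    def finalize(self, filename=None):
        # Enqueue the sentinel behind any pending files, then wait for the
        # worker thread to drain the queue and exit.
        self._queue.put((filename, None))
        self._thread.join()


# Usage: record what would have been sent instead of talking to a server.
sent = []
s = SketchStream(lambda fn, d: sent.append((fn, d)))
s.put("frame_0.dat", b"abc")
s.put("frame_1.dat", b"def")
s.finalize("__part_pyramid_finish/example_dir")  # hypothetical directory name
print(sent)
```

Because the sentinel travels through the same queue as the data, every file put before it is sent first, which is what makes it safe to use as an end-of-stream marker.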
- class PYME.IO.cluster_streaming.Streamer(serverfilter='fv-az569-89', servers=None, distribution_fcn=<function distribution_function_round_robin>, filter=None)
Bases: object
Create a spooler instance which keeps one persistent connection to each node in the cluster and allows non-blocking streaming of data to those nodes.
- Parameters
- serverfilter: string
The cluster identifier (used when there are multiple clusters on one network).
- servers: list
A manually specified list of servers. Use is not recommended.
- distribution_fcn: callable
A function which assigns files to servers. It takes (at least) n_servers as an argument (provided by the spooler), plus, optionally, any additional keyword arguments you pass to put(). In simple cases, this will be a frame number, e.g. i (see distribution_function_round_robin). distribution_fcn must provide defaults for all parameters other than n_servers so that sensible behaviour is achieved when no value is provided.
- filter: callable
A function which performs some operation on the data before it is saved, typically format conversion and/or compression.
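The contract for distribution_fcn (n_servers is always supplied, every other parameter has a default) can be met by a function like the one below. distribution_function_chunked and its chunk_size parameter are hypothetical, as is the random fallback used when no frame number is given.

```python
import random


def distribution_function_chunked(n_servers, i=None, chunk_size=10):
    """Hypothetical distribution function: send runs of `chunk_size`
    consecutive frames to the same server, cycling through the servers.

    Every parameter except n_servers has a default, as required above."""
    if i is None:
        # No frame number supplied: fall back to a random server
        # (an assumption; any sensible default would do).
        return random.randrange(n_servers)
    return (i // chunk_size) % n_servers


# With 3 servers: frames 0-9 -> server 0, 10-19 -> server 1, 20-29 -> server 2, ...
print([distribution_function_chunked(3, i=i) for i in (0, 9, 10, 25, 30)])
# → [0, 0, 1, 2, 0]
```

Going by the parameter description above, such a function would be passed as Streamer(distribution_fcn=distribution_function_chunked), and put(filename, data, i=frame_number) would then forward i (and, if given, chunk_size) to it.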
- close()
- put(filename, data, **kwargs)
Put a file, choosing a stream using the distribution function.
Keyword arguments (kwargs) are passed on to the server distribution function.
- put_stream(idx, filename, data)
Put a file to a specific stream, selected by its index idx.
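The relationship between put() and put_stream() can be sketched as follows: put() asks the distribution function for a stream index, supplying n_servers itself and forwarding any keyword arguments, then delegates to put_stream(). SketchStreamer and modulo_distribution are hypothetical stand-ins, and each "stream" here is a plain Python list rather than a persistent connection to a cluster node.

```python
def modulo_distribution(n_servers, i=0):
    # Hypothetical distribution function: frame i goes to server i mod n_servers.
    return i % n_servers


class SketchStreamer:
    """Hypothetical stand-in for Streamer: one list per stream instead of
    one persistent connection per cluster node."""

    def __init__(self, n_servers, distribution_fcn=modulo_distribution):
        self.streams = [[] for _ in range(n_servers)]
        self.distribution_fcn = distribution_fcn

    def put_stream(self, idx, filename, data):
        # Put to a specific stream, chosen explicitly by index.
        self.streams[idx].append((filename, data))

    def put(self, filename, data, **kwargs):
        # n_servers is supplied here; any extra kwargs are forwarded unchanged.
        idx = self.distribution_fcn(len(self.streams), **kwargs)
        self.put_stream(idx, filename, data)


s = SketchStreamer(n_servers=2)
for frame in range(4):
    s.put(f"frame_{frame}.dat", b"...", i=frame)
print([[fn for fn, _ in stream] for stream in s.streams])
# → [['frame_0.dat', 'frame_2.dat'], ['frame_1.dat', 'frame_3.dat']]
```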
- PYME.IO.cluster_streaming.distribution_function_round_robin(n_servers, i=None)
- PYME.IO.cluster_streaming.n_cluster_servers(serverfilter)
Convenience function for hard-coding distribution functions, e.g. in the clusterh5 case.
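One reading of "hard-coding distribution functions" is fixing the server count up front, so that the frame-to-server mapping is reproducible. The sketch below binds n_servers with functools.partial. n_cluster_servers_stub is a hypothetical stand-in for n_cluster_servers (which would query the real cluster), and round_robin_standin assumes i modulo n_servers, with server 0 when i is None; neither is taken from PYME.

```python
import functools


def n_cluster_servers_stub(serverfilter):
    # Hypothetical stand-in for n_cluster_servers; pretends the cluster has 4 servers.
    return 4


def round_robin_standin(n_servers, i=None):
    # Assumed round-robin behaviour: i modulo n_servers, server 0 if i is None.
    return 0 if i is None else i % n_servers


# Query the server count once, then bind it so the same frame -> server
# mapping can be recomputed later without consulting the cluster again.
n_servers = n_cluster_servers_stub("my_cluster")  # "my_cluster" is hypothetical
server_for_frame = functools.partial(round_robin_standin, n_servers)

print([server_for_frame(i=i) for i in range(6)])
# → [0, 1, 2, 3, 0, 1]
```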