Multiprocessing

This tutorial is about using multiprocessing with specific resqpy functions to speed up multiple function calls.

You should edit the file paths in the examples to point to your own files.

Installing Dask

To use the multi_processing module, Dask needs to be installed in the Python environment because it is not a dependency of the project. Dask is a flexible open-source Python library for parallel computing. It scales Python code from multi-core local machines to large distributed clusters on-prem or in the cloud.

Dask contains multiple modules but only the distributed module is needed here. Dask Distributed can be installed using pip, conda, or from source.

Pip

python -m pip install dask distributed

Conda

conda install dask distributed -c conda-forge

Source

git clone https://github.com/dask/distributed.git
cd distributed
python -m pip install .

If using a Job Queue Cluster, Dask Jobqueue must also be installed. This can be installed in the same ways.

Pip

python -m pip install dask-jobqueue

Conda

conda install dask-jobqueue -c conda-forge

Source

git clone https://github.com/dask/dask-jobqueue.git
cd dask-jobqueue
python -m pip install .

Cluster & Client Setup

If using a local machine, a LocalCluster must be set up. If using a job queuing system, a JobQueueCluster such as an SGECluster, SLURMCluster, PBSCluster or LSFCluster can be used. Full details can be found at https://docs.dask.org/en/latest/deploying.html

A client can also be set up to provide a live feedback dashboard or to capture diagnostics, as explained in the next section.

Local Cluster

Documentation on creating a LocalCluster can be found at https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
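
By default, LocalCluster chooses the number of workers and threads based on the local machine. As a minimal sketch (the worker and thread counts here are arbitrary examples), the resources can also be specified explicitly:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=4,            # number of worker processes (arbitrary example value)
    threads_per_worker=1,   # threads per worker process
)
client = Client(cluster)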

Job Queue Cluster Example

As an example, an SGE cluster can be set up using Dask Jobqueue. Documentation on creating a JobQueueCluster can be found at https://jobqueue.dask.org/en/latest/api.html

from dask.distributed import Client
from dask_jobqueue import SGECluster

cluster = SGECluster(
    processes=1,        # Number of worker processes per job.
    cores=96,           # Total number of cores per job.
    memory="360 GiB",   # Total amount of memory per job.
    scheduler_options={"dashboard_address": ":0"}   # Additional scheduler options; ":0" picks a random dashboard port.
)
client = Client(cluster)

Viewing the Client

If using a Local Cluster, the client dashboard is typically served at http://localhost:8787/status, but may be served elsewhere if that port is already taken. The address of the dashboard will be displayed if you are in a Jupyter Notebook, or can be queried from client.dashboard_link.
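
For example, the dashboard address can be printed from a script or notebook:

print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status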

Some clusters restrict the ports that are visible to the outside world. These ports may include the default port for the web interface, 8787. There are a few ways to handle this:

  • Open port 8787 to the outside world. Often this involves asking your cluster administrator.

  • Use a different port that is publicly accessible using the scheduler_options argument, like above.

  • Use fancier techniques, like Port Forwarding.

You can capture some of the same information that the dashboard presents for offline processing using the Client.get_task_stream and Client.profile methods. These capture the start and stop time of every task and transfer, as well as the results of a statistical profiler. More info on this can be found at https://docs.dask.org/en/stable/diagnostics-distributed.html#capture-diagnostics
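
As a brief sketch (the file name here is arbitrary), the diagnostics can be retrieved from the client after some work has been run:

# list of records of the start and stop time of every task and transfer
task_history = client.get_task_stream()

# save the statistical profiler results to an html file
client.profile(filename="dask-profile.html")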

Uploading Packages/Files to the Workers

If using a Job Queue Cluster, the resqpy package may need to be uploaded for the workers to use. A dependency file, containing either the path of the installed resqpy package or the location of a local git clone of the repository, can be uploaded to the client.

import os
import tempfile

dependencies = """
import sys
sys.path.insert(0, "path/to/local/resqpy/clone")
"""

with tempfile.TemporaryDirectory() as tempdir:
    filename = os.path.join(tempdir, "dependencies.py")
    with open(filename, "w") as f:
        f.write(dependencies)

    client.wait_for_workers()
    client.upload_file(filename)

Environment variables may also need to be set on the workers, such as the Numba thread limit; this can be done by running a defined function on each worker.

import os

def set_numba_threads():
    os.environ["NUMBA_NUM_THREADS"] = "1"

client.run(set_numba_threads)

Adding a Logger

A custom logger and file handler can be set up in a similar way to the environment variables. The log levels of other loggers, such as Numba in the following example, can also be specified.

import logging

def setup_logging():
    logging.basicConfig(
        filename="path/to/log/file",
        filemode='a',
        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
        datefmt='%H:%M:%S',
        level=logging.DEBUG,
    )
    logging.getLogger("numba").setLevel(logging.WARNING)

client.run(setup_logging)

Resqpy Wrapper Functions

To run the multiprocessing function, a wrapper function for the corresponding resqpy function is required. These can be found within the multi_processing.wrappers module. Currently there is only a wrapper function for the find_faces_to_represent_surface_regular function; however, any wrapper function can be created, provided that it returns the following (a minimal sketch is shown after this list):

  • index (int): the index passed to the function.

  • success (bool): whether the function call was successful, however that is defined for the function in question.

  • epc_file (str): the epc file path where the objects are stored.

  • uuid_list (List[str]): list of UUIDs of relevant objects.

The multiprocessing function combines all of the objects whose UUIDs are returned into a single epc file.
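
As a minimal sketch of the required return values (the function name, signature and body here are hypothetical, not part of resqpy), a custom wrapper might look like this:

from typing import Any, List, Tuple

def example_wrapper(index: int, epc_file: str, **kwargs: Any) -> Tuple[int, bool, str, List[str]]:
    """Hypothetical wrapper returning the items expected by the multiprocessing function."""
    uuid_list: List[str] = []   # uuids of any objects created or modified
    success = False
    try:
        # ... call the underlying resqpy function here, appending uuids to uuid_list ...
        success = True
    except Exception:
        success = False
    return index, success, epc_file, uuid_list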

Calling the Multiprocessing Function

The multiprocessing function must receive the following arguments:

  • function (Callable): the wrapper function to be called, which must return the items described above.

  • kwargs_list (List[Dict[Any]]): a list of keyword argument dictionaries used when calling the function, one dictionary per call (see the sketch after this list).

  • recombined_epc (Path/str): A pathlib Path or path string of where the combined epc will be saved.

  • cluster (LocalCluster/JobQueueCluster): the relevant cluster, as explained above.

  • consolidate (bool): if True and an equivalent part already exists in a model, it is not duplicated and the uuids are noted as equivalent.
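
As an illustrative sketch (the keyword names here are hypothetical and depend on the wrapper being used), a kwargs_list for two calls might be built like this, before being passed to the call shown below:

kwargs_list = [
    {"epc_file": "path/to/model.epc", "surface_file": "path/to/surface_0.ts"},   # hypothetical kwargs for call 0
    {"epc_file": "path/to/model.epc", "surface_file": "path/to/surface_1.ts"},   # hypothetical kwargs for call 1
]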

from resqpy.multi_processing import function_multiprocessing

success_list = function_multiprocessing(func, kwargs_list, recombined_epc, cluster=cluster)

A list of the success values returned by the wrapper function, in the order the calls were made, is returned.

Note: the resqpy.multi_processing sub-package was previously named resqpy.multiprocessing. The name was changed with major release v4.0.0 in order to avoid potential namespace clashes with the standard Python multiprocessing package.