Load tiles in parallel on workers and add options to TissueDetectionHE #336

Open
wants to merge 10 commits into dev
Conversation

@tddough98 (Collaborator) commented Oct 14, 2022

This PR contains two separate improvements:

  • add drop_empty_tiles and keep_mask options to the TissueDetectionHE transform to skip saving tiles with no detected H&E tissue and to skip saving masks
  • parallelize tile image loading by using dask.delayed to avoid loading images on the main thread

The first part is both for convenience and performance. It's possible to generate all tiles and then filter out the empty tiles and remove masks before writing the h5path to disk, but that requires adding every tile to the Tiles object, which takes IO time. If these tiles and masks are never saved, even to in-memory objects, processing finishes faster.
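For illustration, a minimal sketch of how the new flags might be passed. The names drop_empty_tiles and keep_mask come from this PR; mask_name, the slide path, and the run arguments are illustrative and may not match the exact signature:

from pathml.core import HESlide
from pathml.preprocessing import Pipeline, TissueDetectionHE

# hypothetical usage of the new flags added in this PR
pipeline = Pipeline([
    TissueDetectionHE(
        mask_name="tissue",
        drop_empty_tiles=True,  # don't add tiles with no detected H&E tissue
        keep_mask=False,        # don't save the tissue mask with each tile
    )
])

wsi = HESlide("example_slide.svs")
wsi.run(pipeline, tile_size=256)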

The second part addresses a core performance issue with distributed processing. I believe it's relevant to #211 and #299. When processing tiles, I've found that loading time >> processing time, and currently the main thread loads tile image data and scatters the loaded tile to workers. This prevents any parallelism, as all but one worker are always waiting for the main thread to load data and send them a tile.

Additionally, as all tiles have to be loaded on the main thread, the block that generates the futures

for tile in self.generate_tiles(
    level=level,
    shape=tile_size,
    stride=tile_stride,
    pad=tile_pad,
    **kwargs,
):
    if not tile.slide_type:
        tile.slide_type = self.slide_type
    # explicitly scatter data, i.e. send the tile data out to the cluster before applying the pipeline
    # according to dask, this can reduce scheduler burden and keep data on workers
    big_future = client.scatter(tile)
    f = client.submit(pipeline.apply, big_future)
    processed_tile_futures.append(f)

has to load all tiles and send them all to workers before ANY tile can be added to the Tiles and the memory can be freed in the next block

# as tiles are processed, add them to h5
for future, tile in dask.distributed.as_completed(
    processed_tile_futures, with_results=True
):
    self.tiles.add(tile)

causing the dramatic memory leaks seen in #211.

I've used dask.delayed to prevent reading from the input file until the image is accessed on the worker. The code that accesses the file and loads the image can now be run by each worker in parallel. To preserve the parallelism, we have to take care not to access and load tile.image on the main thread before loading it on the worker, or to at least wrap accesses in dask.delayed as in SlideData.generate_tiles.
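As a simplified sketch of the idea (not the PR's exact code; load_region is a hypothetical stand-in for the backend's region-reading method):

import dask
import numpy as np

def load_region(path, location, size, level):
    # hypothetical stand-in for the backend call that opens the slide file and
    # reads pixels; this is the expensive work we want to run on a worker
    return np.zeros((size[0], size[1], 3), dtype=np.uint8)

# on the main thread we only build a lazy reference; nothing is read yet
lazy_image = dask.delayed(load_region)("example_slide.svs", (0, 0), (256, 256), 0)

# the file is only opened when compute() runs, which under dask.distributed
# happens on whichever worker receives the task
image = lazy_image.compute()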

I had some issues with the backends not being picklable. The Backend has to be sent to each worker so that it has access to the code that interfaces with the filesystem. I changed the Backend file-like attributes to be lazily evaluated with the @property decorator.
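The pattern looks roughly like this (a sketch assuming an OpenSlide-backed reader; the real Backend classes differ):

import openslide

class LazyOpenSlideBackend:
    # sketch of lazily evaluated file-like attributes: the object pickles
    # cleanly because it never carries an open file handle across processes
    def __init__(self, filename):
        self.filename = filename
        self._slide = None

    @property
    def slide(self):
        # the file is opened on first access, i.e. on the worker that needs it
        if self._slide is None:
            self._slide = openslide.OpenSlide(self.filename)
        return self._slide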

@jacob-rosenthal (Collaborator) commented

This looks awesome - glad that you've been able to make headway on figuring out the performance issues with dask!!

My only Q is that right now we have the distributed flag which lets users toggle whether they want to use dask for running pipelines, because getting the dask cluster working well is often challenging and for smaller datasets we want to be able to skip it if not necessary. Looks like with this change, dask will always be used for tile generation. Is it worth keeping an option to avoid using dask at all, or is it fine to always use dask delayed in this case? Does dask delayed require configuring the cluster, etc.?

@tddough98 (Collaborator, Author) commented

There's no issue with using dask.delayed without a client; Dask falls back to a default local scheduler when compute is called. For example, this is totally valid

>>> import dask
>>> delayed_value = dask.delayed(1)
>>> delayed_value.compute()
1

Use of dask.delayed is kept internal: the delayed object is held in self._image, and accessing tile.image calls compute and returns the numpy array for the image without the user ever seeing the delayed object.
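Roughly, the accessor works like this (a sketch, not the PR's exact code):

from dask.delayed import Delayed

class TileSketch:
    def __init__(self, image):
        # image may be an already-loaded numpy array or a dask.delayed object
        self._image = image

    @property
    def image(self):
        # compute on first access so callers always get a numpy array back,
        # without needing to know whether loading was deferred
        if isinstance(self._image, Delayed):
            self._image = self._image.compute()
        return self._image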

I've dug a bit more into the memory leaks and there are still issues with this PR. The only reliable way I've found to clear out memory on workers is to restart the client, as suggested in Aggressively Clearing Data. This at least prevents memory leaks from one slide from affecting the processing of the next slide, but a large slide may still run into memory issues while it is being processed.
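In practice that looks something like the loop below (a sketch; slide_paths, pipeline, and the run arguments are assumptions):

from dask.distributed import Client
from pathml.core import HESlide

client = Client()
for slide_path in slide_paths:  # slide_paths and pipeline defined elsewhere
    wsi = HESlide(slide_path)
    wsi.run(pipeline, distributed=True, client=client)
    # restart the workers between slides to aggressively clear their memory,
    # so leaks from one slide cannot affect the next
    client.restart()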

I left a TODO where I think changes should happen to free memory as tiles are processed, along with some of the attempts that did not work.

# TODO: Free memory used for tile
# all of these still leave unmanaged memory on each worker
# Each in-memory future holding a Tile shows a size of 48 bytes on the Dask dashboard
# which clearly does not include image data.
# Could it be that loaded image data is somehow not being garbage collected with Tiles?
# future.release()
# future.cancel()
# del result
# del future

This PR still provides improvements from loading tiles in parallel, but these changes are not yet enough to address the memory leaks identified in #211 and #299.

@jacob-rosenthal (Collaborator) commented

Now that we merged "merge-master" into dev (#334), should we change this PR to merge into dev?

Base automatically changed from merge-master to dev October 26, 2022 19:41
@tddough98 (Collaborator, Author) commented

Just deleted the merged branch, which automatically switches these PRs to merge into dev.
