Chip making - Pt. B

A second part of the chip making exploration.

From points to (good) chips

import pandas
import geopandas
import dask.dataframe as ddf
import dask
from dask.distributed import LocalCluster, Client

import dask_geopandas
print(dask_geopandas.__version__)
v0.1.0a4+24.g393dcb5

tmp_dir = '/home/jovyan'
out_f_xys = f'{tmp_dir}/chip_xys_liv'
grid_dir = f'{tmp_dir}/grid'
joined_dir = f'{tmp_dir}/joined'
ram_per_worker = 5
with dask.config.set(
    {"distributed.worker.resources.RAM": ram_per_worker}
):
    cluster = LocalCluster(
        n_workers=10, memory_limit=f'{ram_per_worker} GB'
    )
client = Client(cluster)
client

Client

Cluster

  • Workers: 10
  • Cores: 20
  • Memory: 50.00 GB
client.shutdown()

Estimate memory


Below we run a back-of-the-envelope test of how much RAM each task requires, so we can then pass it on to Dask as an annotation of the resources needed (note that `sigs`, used in the last cell, is the signatures table loaded further below):

%load_ext memory_profiler
%%memit
xys = pandas.read_parquet(out_f_xys+'/chunk_0.pq')
chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])
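The chip length is recovered from the spacing of the Y coordinates: subtracting a shifted copy of the first few rows gives the step between consecutive points. A minimal sketch with made-up coordinates (the real values come from the parquet file):

```python
import pandas

# Hypothetical grid points spaced 100m apart on the Y axis
xys = pandas.DataFrame({
    'X': [0, 0, 0, 0, 0],
    'Y': [500, 400, 300, 200, 100],
})

# Difference between consecutive rows; row 1 holds Y_1 - Y_0
chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])
print(chip_len)  # 100.0
```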
%%memit
xy_pts = geopandas.points_from_xy(xys['X'], xys['Y'])
%%memit
buf = xy_pts.buffer(chip_len/2, cap_style=3)
buf = geopandas.GeoDataFrame({'geometry': buf}, crs='EPSG:27700')
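Buffering a point with `cap_style=3` (square caps) turns it into a square of side `chip_len` centred on the point, i.e. the pixel polygon. A quick check of that geometry with bare Shapely:

```python
from shapely.geometry import Point

chip_len = 100
# Square buffer: half the chip length on each side of the point
square = Point(0, 0).buffer(chip_len / 2, cap_style=3)

print(square.bounds)  # (-50.0, -50.0, 50.0, 50.0)
print(square.area)    # 10000.0
```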
%%memit
jd = geopandas.sjoin(
    buf,
    sigs[['signature_type', 'geometry']], 
    how='inner', 
    op='within'
)
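`%%memit` only works inside IPython; outside a notebook, a rough stand-in (our assumption, not part of the original workflow) is the standard library's `tracemalloc`, which reports peak Python-level allocations for a block of code:

```python
import tracemalloc

tracemalloc.start()
# Stand-in for the real workload (e.g. building the point array)
data = [float(i) for i in range(1_000_000)]
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f'peak: {peak / 1024**2:.1f} MiB')
```

Note `tracemalloc` only sees allocations made through Python's allocator, so it will undercount memory held by NumPy/GEOS buffers; `%%memit` measures the whole process.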

Computation graph

And we can start building the computation graph:

xys = ddf.read_parquet(out_f_xys)
chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])
with dask.annotate(resources={'RAM': 2}):
    xy_pts = dask_geopandas.points_from_xy(
        xys, 'X', 'Y'
    )

These can be turned into pixel polygons:

with dask.annotate(resources={'RAM': 2}):
    grid = xy_pts.buffer(chip_len/2, cap_style=3)

A bit of wiring before the spatial join:

grid = grid.reset_index()
grid.columns = ['index', 'geometry']
grid = grid.set_crs('EPSG:27700')
grid = dask_geopandas.from_dask_dataframe(grid)

To perform the spatial join, we need to load the polygons of the signatures:

sigs = geopandas.read_file(
    '/home/jovyan/data/spatial_signatures/signatures_combined_levels_simplified.gpkg'
)
/opt/conda/lib/python3.8/site-packages/geopandas/geodataframe.py:577: RuntimeWarning: Sequential read of iterator was interrupted. Resetting iterator. This can negatively impact the performance.
  for feature in features_lst:

With the full grid at hand, we can now express the spatial join:

with dask.annotate(resources={'RAM': 4}):
    joined = dask_geopandas.sjoin(
        grid, 
        sigs[['signature_type', 'geometry']], 
        how='inner', 
        op='within'
    )
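The `op='within'` predicate keeps a chip only when its square polygon falls entirely inside a signature polygon; chips straddling a signature boundary are dropped. The predicate itself, illustrated on hypothetical geometries in plain Shapely:

```python
from shapely.geometry import Polygon, box

# Hypothetical signature polygon and two candidate chips
signature = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
inside_chip = box(2, 2, 4, 4)        # fully inside the signature
straddling_chip = box(8, 8, 12, 12)  # crosses the boundary

print(inside_chip.within(signature))      # True
print(straddling_chip.within(signature))  # False
```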

Computation

And the computation happens out-of-core as we write it to disk:

%%time
! rm -rf $joined_dir
joined.to_parquet(joined_dir)
CPU times: user 8.73 s, sys: 1.37 s, total: 10.1 s
Wall time: 1min 32s
! rm -rf /home/jovyan/sigs
tst = dask_geopandas.from_geopandas(sigs, npartitions=8)
tst.to_parquet('/home/jovyan/sigs')
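`from_geopandas(..., npartitions=8)` splits the signatures table into eight roughly equal row chunks, each written out as its own parquet file. The splitting is essentially what `numpy.array_split` does over the rows; a pandas-only sketch:

```python
import numpy
import pandas

# Toy stand-in for the signatures table
df = pandas.DataFrame({'signature_type': range(20)})

# Split into 8 roughly equal partitions, as dask does by row ranges
partitions = numpy.array_split(df, 8)
print([len(p) for p in partitions])  # [3, 3, 3, 3, 2, 2, 2, 2]
```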

Output rechunking

Check in a subset


To avoid memory errors, explore:

https://stackoverflow.com/questions/45052535/dask-distributed-how-to-run-one-task-per-worker-making-that-task-running-on-a/45056892#45056892

Also explore this issue:

https://github.com/geopandas/dask-geopandas/issues/114

And further docs on dask-geopandas:

https://github.com/geopandas/dask-geopandas/tree/master/notebooks