Chip making - Pt. B¶
The second part of the chip-making exploration.
From points to (good) chips¶
import pandas
import geopandas
import dask.dataframe as ddf
import dask
from dask.distributed import LocalCluster, Client
import dask_geopandas
print(dask_geopandas.__version__)
v0.1.0a4+24.g393dcb5
tmp_dir = '/home/jovyan'
out_f_xys = f'{tmp_dir}/chip_xys_liv'
grid_dir = f'{tmp_dir}/grid'
joined_dir = f'{tmp_dir}/joined'
ram_per_worker = 5
with dask.config.set(
{"distributed.worker.resources.RAM": ram_per_worker}
):
cluster = LocalCluster(
n_workers=10, memory_limit=f'{ram_per_worker} GB'
)
client = Client(cluster)
client
client.shutdown()
Estimate memory¶
Below we run a back-of-the-envelope test of how much RAM each task requires, so we can pass it to Dask as an annotation of the resources each task needs:
%load_ext memory_profiler
%%memit
xys = pandas.read_parquet(out_f_xys+'/chunk_0.pq')
chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])
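The spacing trick above can be sketched on a toy grid (hypothetical coordinates, 100m apart):

```python
import pandas

# Hypothetical 2x2 grid of chip centroids, 100m apart (illustrative only)
xys = pandas.DataFrame({'X': [0.0, 0.0, 100.0, 100.0],
                        'Y': [0.0, 100.0, 0.0, 100.0]})
# Differencing consecutive rows recovers the grid spacing along Y
chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])
print(chip_len)  # 100.0
```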
%%memit
xy_pts = geopandas.points_from_xy(xys['X'], xys['Y'])
%%memit
buf = xy_pts.buffer(chip_len/2, cap_style=3)
buf = geopandas.GeoDataFrame({'geometry': buf}, crs='EPSG:27700')
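With `cap_style=3` (square caps), buffering a point by `chip_len/2` yields the square pixel polygon; a minimal shapely sketch with hypothetical coordinates:

```python
from shapely.geometry import Point

chip_len = 100.0
# cap_style=3 gives a square buffer: a pixel polygon of side chip_len
square = Point(50, 50).buffer(chip_len / 2, cap_style=3)
print(square.bounds)  # (0.0, 0.0, 100.0, 100.0)
```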
%%memit
jd = geopandas.sjoin(
buf,
sigs[['signature_type', 'geometry']],
how='inner',
op='within'
)
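As a standard-library alternative to `memory_profiler`, `tracemalloc` can give a similar per-task peak-memory estimate (a rough sketch: it tracks Python allocations, not process RSS, so figures will differ from `%%memit`):

```python
import tracemalloc
import pandas

# Rough per-task memory probe with the standard library
tracemalloc.start()
xys = pandas.DataFrame({'X': range(100_000), 'Y': range(100_000)})
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f'{peak / 1024 ** 2:.1f} MiB peak')
```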
Computation graph¶
And we can start building the computation graph:
xys = ddf.read_parquet(out_f_xys)
chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])
with dask.annotate(resources={'RAM': 2}):
xy_pts = dask_geopandas.points_from_xy(
xys, 'X', 'Y'
)
These can be turned into pixel polygons:
with dask.annotate(resources={'RAM': 2}):
grid = xy_pts.buffer(chip_len/2, cap_style=3)
A bit of wiring before the spatial join:
grid = grid.reset_index()
grid.columns = ['index', 'geometry']
grid = grid.set_crs('EPSG:27700')
grid = dask_geopandas.from_dask_dataframe(grid)
To perform the spatial join, we need to load the polygons of the signatures:
sigs = geopandas.read_file(
'/home/jovyan/data/spatial_signatures/signatures_combined_levels_simplified.gpkg'
)
/opt/conda/lib/python3.8/site-packages/geopandas/geodataframe.py:577: RuntimeWarning: Sequential read of iterator was interrupted. Resetting iterator. This can negatively impact the performance. for feature in features_lst:
With the full grid at hand, we can now express the spatial join:
with dask.annotate(resources={'RAM': 4}):
joined = dask_geopandas.sjoin(
grid,
sigs[['signature_type', 'geometry']],
how='inner',
op='within'
)
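The `op='within'` predicate keeps only chips that fall entirely inside a signature polygon; the behaviour can be sketched with shapely on hypothetical geometries:

```python
from shapely.geometry import Point, box

region = box(0, 0, 100, 100)                   # stand-in for a signature polygon
inner = Point(50, 50).buffer(10, cap_style=3)  # chip fully inside
edge = Point(95, 50).buffer(10, cap_style=3)   # chip crossing the boundary
print(inner.within(region), edge.within(region))  # True False
```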
Computation¶
And the computation happens out-of-core as we write it to disk:
%%time
! rm -rf $joined_dir
joined.to_parquet(joined_dir)
CPU times: user 8.73 s, sys: 1.37 s, total: 10.1 s
Wall time: 1min 32s
! rm -rf /home/jovyan/sigs
tst = dask_geopandas.from_geopandas(sigs, npartitions=8)
tst.to_parquet('/home/jovyan/sigs')
Output rechunking¶
Check in a subset¶
To avoid memory errors, explore:
https://stackoverflow.com/questions/45052535/dask-distributed-how-to-run-one-task-per-worker-making-that-task-running-on-a/45056892#45056892
Also explore this issue:
https://github.com/geopandas/dask-geopandas/issues/114
And further docs on dask-geopandas:
https://github.com/geopandas/dask-geopandas/tree/master/notebooks