{ "cells": [ { "cell_type": "markdown", "id": "hourly-cradle", "metadata": {}, "source": [ "# Chip making - Pt. B\n", "\n", "A second part of the chip making exploration.\n", "\n", "### From points to (good) chips" ] }, { "cell_type": "code", "execution_count": 2, "id": "invalid-berkeley", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "v0.1.0a4+24.g393dcb5\n" ] } ], "source": [ "import pandas\n", "import geopandas\n", "import dask.dataframe as ddf\n", "import dask\n", "from dask.distributed import LocalCluster, Client\n", "\n", "import dask_geopandas\n", "print(dask_geopandas.__version__)\n", "\n", "tmp_dir = '/home/jovyan'\n", "out_f_xys = f'{tmp_dir}/chip_xys_liv'\n", "grid_dir = f'{tmp_dir}/grid'\n", "joined_dir = f'{tmp_dir}/joined'" ] }, { "cell_type": "code", "execution_count": 3, "id": "flush-bankruptcy", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
Client / Cluster: Workers: 10, Cores: 20, Memory: 50.00 GB
" ], "text/plain": [ "" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ram_per_worker = 5\n", "with dask.config.set(\n", " {\"distributed.worker.resources.RAM\": ram_per_worker}\n", "):\n", " cluster = LocalCluster(\n", " n_workers=10, memory_limit=f'{ram_per_worker} GB'\n", " )\n", "client = Client(cluster)\n", "client" ] }, { "cell_type": "code", "execution_count": 9, "id": "champion-fellow", "metadata": {}, "outputs": [], "source": [ "client.shutdown()" ] }, { "cell_type": "markdown", "id": "boxed-picnic", "metadata": {}, "source": [ "## Estimate memory" ] }, { "cell_type": "markdown", "id": "complimentary-excellence", "metadata": {}, "source": [ "---\n", "\n", "Below we back-of-the-envelope'y test how much RAM each task requires, to then pass it on to Dask as an annotation on the resources required:" ] }, { "cell_type": "code", "execution_count": null, "id": "typical-responsibility", "metadata": {}, "outputs": [], "source": [ "%load_ext memory_profiler" ] }, { "cell_type": "code", "execution_count": null, "id": "fancy-reviewer", "metadata": {}, "outputs": [], "source": [ "%%memit\n", "xys = pandas.read_parquet(out_f_xys+'/chunk_0.pq')" ] }, { "cell_type": "code", "execution_count": null, "id": "requested-weather", "metadata": {}, "outputs": [], "source": [ "chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])" ] }, { "cell_type": "code", "execution_count": null, "id": "affecting-booth", "metadata": {}, "outputs": [], "source": [ "%%memit\n", "xy_pts = geopandas.points_from_xy(xys['X'], xys['Y'])" ] }, { "cell_type": "code", "execution_count": null, "id": "distributed-location", "metadata": {}, "outputs": [], "source": [ "%%memit\n", "buf = xy_pts.buffer(chip_len/2, cap_style=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "offshore-candidate", "metadata": {}, "outputs": [], "source": [ "buf = geopandas.GeoDataFrame({'geometry': buf}, crs='EPSG:27700')" ] }, { "cell_type": "code", 
"execution_count": null, "id": "compact-lodging", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%memit\n", "jd = geopandas.sjoin(\n", " buf,\n", " sigs[['signature_type', 'geometry']], \n", " how='inner', \n", " op='within'\n", ")" ] }, { "cell_type": "markdown", "id": "cooked-record", "metadata": {}, "source": [ "---" ] }, { "cell_type": "markdown", "id": "israeli-wealth", "metadata": {}, "source": [ "## Computation graph" ] }, { "cell_type": "markdown", "id": "simple-abraham", "metadata": {}, "source": [ "And can start the computation graph:" ] }, { "cell_type": "code", "execution_count": 61, "id": "sapphire-direction", "metadata": {}, "outputs": [], "source": [ "xys = ddf.read_parquet(out_f_xys)\n", "chip_len = abs((xys.head() - xys.head().shift()).loc[1, 'Y'])\n", "with dask.annotate(resources={'RAM': 2}):\n", " xy_pts = dask_geopandas.points_from_xy(\n", " xys, 'X', 'Y'\n", " )" ] }, { "cell_type": "markdown", "id": "mental-draft", "metadata": {}, "source": [ "These can be turned into pixel polygons:" ] }, { "cell_type": "code", "execution_count": 62, "id": "periodic-watch", "metadata": {}, "outputs": [], "source": [ "with dask.annotate(resources={'RAM': 2}):\n", " grid = xy_pts.buffer(chip_len/2, cap_style=3)" ] }, { "cell_type": "markdown", "id": "according-correspondence", "metadata": {}, "source": [ "A bit of wiring in before the spatial join:" ] }, { "cell_type": "code", "execution_count": 63, "id": "considered-circuit", "metadata": { "tags": [] }, "outputs": [], "source": [ "grid = grid.reset_index()\n", "grid.columns = ['index', 'geometry']\n", "grid = grid.set_crs('EPSG:27700')\n", "grid = dask_geopandas.from_dask_dataframe(grid)" ] }, { "cell_type": "markdown", "id": "preceding-korean", "metadata": {}, "source": [ "To perform the spatial join, we need to load the polygons of the signatures:" ] }, { "cell_type": "code", "execution_count": 4, "id": "tribal-liberty", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", 
"text": [ "/opt/conda/lib/python3.8/site-packages/geopandas/geodataframe.py:577: RuntimeWarning: Sequential read of iterator was interrupted. Resetting iterator. This can negatively impact the performance.\n", " for feature in features_lst:\n" ] } ], "source": [ "sigs = geopandas.read_file(\n", " '/home/jovyan/data/spatial_signatures/signatures_combined_levels_simplified.gpkg'\n", ")" ] }, { "cell_type": "markdown", "id": "descending-bronze", "metadata": {}, "source": [ "With the full `grid` at hand, we can now express the spatial join:" ] }, { "cell_type": "code", "execution_count": 64, "id": "quiet-pittsburgh", "metadata": {}, "outputs": [], "source": [ "with dask.annotate(resources={'RAM': 4}):\n", " joined = dask_geopandas.sjoin(\n", " grid, \n", " sigs[['signature_type', 'geometry']], \n", " how='inner', \n", " op='within'\n", " )" ] }, { "cell_type": "markdown", "id": "interesting-liability", "metadata": {}, "source": [ "## Computation" ] }, { "cell_type": "markdown", "id": "tracked-disclosure", "metadata": {}, "source": [ "And the computation happens out-of-core as we write it to disk:" ] }, { "cell_type": "code", "execution_count": 65, "id": "valued-jungle", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 8.73 s, sys: 1.37 s, total: 10.1 s\n", "Wall time: 1min 32s\n" ] } ], "source": [ "%%time\n", "! rm -rf $joined_dir\n", "joined.to_parquet(joined_dir)" ] }, { "cell_type": "code", "execution_count": 5, "id": "serial-force", "metadata": {}, "outputs": [], "source": [ "! 
rm -rf /home/jovyan/sigs\n", "tst = dask_geopandas.from_geopandas(sigs, npartitions=8)\n", "tst.to_parquet('/home/jovyan/sigs')" ] }, { "cell_type": "markdown", "id": "comparative-liberal", "metadata": {}, "source": [ "## Output rechunking" ] }, { "cell_type": "markdown", "id": "educational-madness", "metadata": {}, "source": [ "## Check in a subset" ] }, { "cell_type": "markdown", "id": "large-flash", "metadata": {}, "source": [ "---" ] }, { "cell_type": "markdown", "id": "religious-output", "metadata": {}, "source": [ "To avoid memory errors, explore:\n", "\n", "> https://stackoverflow.com/questions/45052535/dask-distributed-how-to-run-one-task-per-worker-making-that-task-running-on-a/45056892#45056892\n", "\n", "Also explore this issue:\n", "\n", "> https://github.com/geopandas/dask-geopandas/issues/114\n", "\n", "And further docs on `dask-geopandas`:\n", "\n", "> https://github.com/geopandas/dask-geopandas/tree/master/notebooks" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" } }, "nbformat": 4, "nbformat_minor": 5 }