NDVI

This document shows the computation of NDVI scores for each of the enclosed tessellations (ETs) in Great Britain (GB).
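
NDVI (normalised difference vegetation index) is computed from the near-infrared (NIR) and red bands as (NIR - Red) / (NIR + Red). In the Sentinel-2 composite used below, the code treats band 4 as NIR and band 1 as red. As a toy illustration only (made-up reflectance values, not part of the pipeline):

# Toy illustration of the NDVI formula (values are made up)
import numpy

nir = numpy.array([0.7, 0.6, 0.2])  # hypothetical near-infrared reflectances
red = numpy.array([0.1, 0.2, 0.2])  # hypothetical red reflectances
(nir - red) / (nir + red)           # -> array([0.75, 0.5 , 0.  ]), bounded in [-1, 1]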

! echo "Run this notebook using version $GDS_ENV_VERSION of the gds_env"
#SERVER_IP = open("../../SERVER_IP").read().strip("\n")
SERVER_IP = "0.0.0.0"
Run this notebook using version 6.0alpha of the gds_env

Set up

import sys
sys.path.insert(0, "../")
import utils
import os
import fsspec
import pandas
import geopandas
import rasterio
import rasterio.mask
from shapely.geometry import box
import rasterstats
import numpy
import rioxarray, xarray
from dask import dataframe as dd
from dask.distributed import Client, LocalCluster, as_completed
from numpy import percentile
from glob import glob
from time import time

Cluster

Since we will run some computations on a Dask cluster, let’s set it up first:

! cat ../../worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: darribas/gds_py:6.0alpha1
    imagePullPolicy: IfNotPresent
    args: [start.sh, dask-worker, --nthreads, '1', --no-dashboard, --memory-limit, 3GB, --death-timeout, '60']
    name: dask
    resources:
      limits:
        cpu: "1"
        memory: 3G
      requests:
        cpu: "1"
        memory: 3G
from dask_kubernetes import KubeCluster
from dask.distributed import Client
import dask.array as da

# Set up cluster
cluster = KubeCluster.from_yaml('../../worker-spec.yml')
# Provision with up to X pods
cluster.scale(50)
# Connect Dask to the cluster
client = Client(cluster)

Local

# Local run (alternative to cluster)
from dask.distributed import Client, LocalCluster
import dask.array as da

cluster = LocalCluster(
    threads_per_worker=1,
    memory_limit='10GB'
)
client = Client(address=cluster)

Data

We now set the paths to the two datasets we’ll use:

  • The full mosaic is stored as a folder of COGs served over HTTP. First let’s grab the URL for the mosaic:

# Cluster
mosaic_url = f"http://{SERVER_IP}:8000/ghs_composite_s2/GHS-composite-S2.vrt"
# Local
mosaic_url = "../../data/ghs_composite_s2/GHS-composite-S2.vrt"

We can connect to it lazily through rioxarray:

r = rioxarray.open_rasterio(mosaic_url,
                            chunks={"x": 1024, "y": 1024}
                           )
r
<xarray.DataArray (band: 4, y: 182437, x: 121865)>
dask.array<open_rasterio-35d60ec82ecb324990de49151f9b808d<this-array>, shape=(4, 182437, 121865), dtype=uint16, chunksize=(4, 1024, 1024), chunktype=numpy.ndarray>
Coordinates:
  * band         (band) int64 1 2 3 4
  * y            (y) float64 1.612e+06 1.612e+06 ... -2.136e+05 -2.136e+05
  * x            (x) float64 -2.228e+05 -2.228e+05 ... 9.968e+05 9.968e+05
    spatial_ref  int64 0
Attributes:
    _FillValue:    0.0
    scale_factor:  1.0
    add_offset:    0.0
    grid_mapping:  spatial_ref
  • The enclosed tessellation:

# Cluster
#tess_path = f"http://{SERVER_IP}:8000/tessellation/"
tess_path = f"http://{SERVER_IP}:8000/spatial_signatures/tessellation/"
# Local
tess_path = "../../data/spatial_signatures/tessellation/"
  • And the list of chunks:

# NOTE: glob only lists local files; with the HTTP path above, the list of
# chunk files would need to be built differently
chunks = glob(f"{tess_path}tess_*.pq")

Explore DataArray size by chunk

In sparsely populated areas, ET cells tend to be larger and thus require larger sections of imagery to cover them. Let’s explore the extent to which this is an issue.

The chunk_dimension method below completes the following tasks for a given chunk:

  1. Collects the bounding box of the chunk

  2. Connects to that section of the mosaic

  3. Extracts its dimensions and in-memory size

def format_bytes(size):
    """
    Convert bytes to human-readable units
    
    Taken from: https://stackoverflow.com/a/49361727
    ...
    
    Arguments
    ---------
    size : int
          File size
    
    Returns
    -------
    fsize : float
           Formatted file size
    unit : str
          Unit in which size is expressed
    """
    # 2**10 = 1024
    power = 2 ** 10
    n = 0
    power_labels = {0: "", 1: "kilo", 2: "mega", 3: "giga", 4: "tera"}
    while size > power:
        size /= power
        n += 1
    return size, power_labels[n] + "bytes"
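
As a quick, optional sanity check of format_bytes (illustrative only; the value corresponds to the largest chunk reported further below):

# Illustration: ~7.9 billion bytes is roughly 7.35 gigabytes
format_bytes(7896278496)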

def chunk_dimension(chunk_path):
    if type(chunk_path) is not str:
        chunk_path = chunk_path["paths"]
    # Read the chunk geometries to obtain their joint bounding box
    with fsspec.open(chunk_path) as p:
        chunk = geopandas.read_parquet(p, columns=["hindex", "tessellation"])
    # Lazily clip the mosaic to the chunk bounds
    section = r.rio.clip_box(*chunk.total_bounds)
    x_range = section.coords["x"].max() - section.coords["x"].min()
    y_range = section.coords["y"].max() - section.coords["y"].min()
    # Area in km2 (coordinates are expressed in metres)
    area = x_range * y_range / 1e6
    size, unit = format_bytes(section.nbytes)
    p = chunk_path.split("/")[-1]
    out = pandas.Series([p, float(area), section.nbytes, size, unit], 
                        index=["cID", "area_sqkm", "size", "hsize", "unit"]
                       )
    return out

We can then run this in a distributed fashion through the Dask client:

%%time

chunks_dd = dd.from_pandas(pandas.DataFrame({"paths": chunks}), 
                           npartitions=8
                          )
sizes = chunks_dd.map_partitions(lambda df: df.apply(chunk_dimension, axis=1),
                                 meta=[("cID", "str"),
                                       ("area_sqkm", "f8"),
                                       ("size", "f8"),
                                       ("hsize", "f8"),
                                       ("unit", "str")
                                      ]
                                ).compute()
CPU times: user 13.7 s, sys: 1.15 s, total: 14.8 s
Wall time: 1min 21s

From the sizes table we can see there are a few chunks that cover a large swath of land, but most are otherwise very manageable. Hence, we will run all the smaller chunks in parallel and then the few larger ones sequentially. In particular, we will leave aside the largest four:

sizes.sort_values("size", ascending=False).head(10)
cID area_sqkm size hsize unit
5 tess_5.pq 98852.771104 7896278496 7.353982 gigabytes
87 tess_87.pq 98381.295659 7858582248 7.318875 gigabytes
97 tess_97.pq 33881.959783 2706574856 2.520694 gigabytes
83 tess_83.pq 23445.222729 1872902528 1.744276 gigabytes
89 tess_89.pq 18351.235343 1466001600 1.365320 gigabytes
48 tess_48.pq 17581.976333 1404555840 1.308095 gigabytes
38 tess_38.pq 16949.749936 1354048752 1.261056 gigabytes
100 tess_100.pq 16602.280239 1326292344 1.235206 gigabytes
71 tess_71.pq 15812.966472 1263241712 1.176486 gigabytes
85 tess_85.pq 11608.782981 927409416 884.446541 megabytes
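
The next section hard-codes the IDs of these four chunks. Purely as an illustrative sketch (assuming the tess_<ID>.pq naming pattern holds), the same list could be derived programmatically from sizes:

# Sketch: extract the IDs of the four largest chunks from the sizes table
largest_ids = (
    sizes.sort_values("size", ascending=False)
         .head(4)["cID"]
         .str.extract(r"tess_(\d+)\.pq")[0]
         .astype(int)
         .tolist()
)
largest_ids  # expected to match the hard-coded list below: [5, 87, 97, 83]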

Distribute computation by ET cell

# Largest chunks, to be run sequentially at the end
to_run_seq = [5, 87, 97, 83]
out_folder = "/home/jovyan/work/ndvi/"
# Chunks already completed in previous runs (skip them)
done_already = [
    int(i.strip(".pq").split("_")[-1]) for i in glob(f"{out_folder}*.pq")
]
# Remaining (smaller) chunks to distribute across the cluster
chunks_for_bag = [
    i for i in chunks if \
    (int(i.strip(".pq").split("_")[-1]) not in to_run_seq) and \
    (int(i.strip(".pq").split("_")[-1]) not in done_already)
]

To calculate NDVI for each ET cell, we write a method that loads the relevant segment of the mosaic and uses rasterstats to compute the zonal mean NDVI of every cell in a chunk:

def ndvi_for_chunk(pars):
    t0 = time()
    chunk_path, r_path, out_path = pars
    # Read vectors
    with fsspec.open(chunk_path) as p:
        chunk = geopandas.read_parquet(p,
                                       columns=["hindex", "tessellation"]
                                      )
    # Calculate NDVI
    with rasterio.open(r_path) as src:
        img, transform = rasterio.mask.mask(src, 
                                            [box(*chunk.total_bounds)],
                                            crop=True
                                           )
        meta = src.meta
    # NDVI = (NIR - Red) / (NIR + Red): band 4 (index 3) is NIR, band 1 (index 0) is red
    ndvi = (img[3] + -1 * img[0]) / (img[3] + img[0])
    # Mask out pixels flagged as nodata in either band
    ndvi[numpy.where(img[0] == meta["nodata"])] = numpy.nan
    ndvi[numpy.where(img[3] == meta["nodata"])] = numpy.nan
    # Transfer NDVI to vector
    stats = rasterstats.zonal_stats(chunk,
                                    ndvi,
                                    affine=transform,
                                    stats=["mean"],
                                    all_touched=True,
                                    nodata=numpy.nan
                                   )
    # Process output
    out = pandas.DataFrame(stats, index=chunk["hindex"])
    t1 = time()
    if out_path is None:
        return out
    else:
        out.to_parquet(out_path)
        return f"{chunk_path.split('/')[-1]} | {t1-t0} secs"

We can use this function with Dask to run each (small) chunk in parallel. With the following bit of bespoke code, we make sure that only as many jobs as workers are running at any given time:

%%time

# Keep one job per worker in flight at all times
n_workers = len(cluster.workers)

pars_to_submit = [
    (
        i, 
        mosaic_url, 
        f"{out_folder}ndvi_{i.split('/')[-1]}"
    ) for i in chunks_for_bag
]
inputs = iter(pars_to_submit)
futures = [
    client.submit(ndvi_for_chunk, next(inputs)) for i in range(n_workers)
]
ac = as_completed(futures)
for finished_future in ac:
    # submit new future 
    try:
        new_future = client.submit(ndvi_for_chunk, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    print(finished_future.result())
tess_96.pq | 176.1357979774475 secs
tess_34.pq | 189.24869465827942 secs
tess_51.pq | 189.69177389144897 secs
tess_21.pq | 190.82427144050598 secs
tess_29.pq | 192.0962357521057 secs
tess_35.pq | 209.44454169273376 secs
tess_31.pq | 216.980553150177 secs
tess_80.pq | 224.91864657402039 secs
tess_86.pq | 228.31078553199768 secs
tess_8.pq | 241.759094953537 secs
tess_19.pq | 241.96550750732422 secs
tess_69.pq | 243.02748799324036 secs
tess_65.pq | 253.60629415512085 secs
tess_66.pq | 280.157434463501 secs
tess_90.pq | 164.63939261436462 secs
tess_101.pq | 368.9089524745941 secs
tess_64.pq | 381.10221910476685 secs
tess_82.pq | 193.65632891654968 secs
tess_2.pq | 173.8371868133545 secs
tess_15.pq | 154.46832656860352 secs
tess_28.pq | 154.55630898475647 secs
tess_93.pq | 167.06554055213928 secs
tess_60.pq | 181.24081301689148 secs
tess_62.pq | 219.98308324813843 secs
tess_58.pq | 210.41175413131714 secs
tess_79.pq | 160.60391902923584 secs
tess_99.pq | 224.48635458946228 secs
tess_42.pq | 260.6704902648926 secs
tess_26.pq | 233.42561745643616 secs
tess_3.pq | 155.10514616966248 secs
tess_91.pq | 182.85995650291443 secs
tess_70.pq | 201.1124472618103 secs
tess_47.pq | 182.7477216720581 secs
tess_89.pq | 407.668381690979 secs
tess_74.pq | 187.44021558761597 secs
tess_16.pq | 176.50726699829102 secs
tess_22.pq | 166.78810381889343 secs
tess_63.pq | 177.00910091400146 secs
tess_84.pq | 220.33403754234314 secs
tess_92.pq | 255.7303695678711 secs
tess_88.pq | 239.6361527442932 secs
tess_53.pq | 210.7572808265686 secs
tess_14.pq | 260.92917490005493 secs
tess_77.pq | 224.05887055397034 secs
tess_61.pq | 359.33106875419617 secs
tess_18.pq | 146.86708426475525 secs
tess_27.pq | 220.95680165290833 secs
tess_20.pq | 181.27921295166016 secs
tess_4.pq | 253.60609555244446 secs
tess_50.pq | 208.73760104179382 secs
tess_10.pq | 177.56747770309448 secs
tess_48.pq | 281.5159513950348 secs
tess_49.pq | 201.79051685333252 secs
tess_59.pq | 203.40360569953918 secs
tess_71.pq | 281.03534603118896 secs
tess_76.pq | 231.91485619544983 secs
tess_33.pq | 194.08718633651733 secs
tess_32.pq | 325.78052020072937 secs
tess_57.pq | 191.2747642993927 secs
tess_67.pq | 317.9279489517212 secs
tess_23.pq | 174.35500979423523 secs
tess_44.pq | 168.06291508674622 secs
tess_45.pq | 196.44418263435364 secs
tess_55.pq | 152.75208282470703 secs
tess_54.pq | 221.55555033683777 secs
tess_7.pq | 217.29637932777405 secs
tess_52.pq | 154.59290027618408 secs
tess_41.pq | 185.0793354511261 secs
tess_102.pq | 185.03288888931274 secs
tess_94.pq | 158.22853803634644 secs
tess_85.pq | 310.7229986190796 secs
tess_72.pq | 181.57512545585632 secs
tess_81.pq | 166.95266604423523 secs
tess_25.pq | 141.98819208145142 secs
tess_38.pq | 419.97113037109375 secs
tess_78.pq | 181.82678651809692 secs
tess_39.pq | 156.8633315563202 secs
tess_0.pq | 133.6713342666626 secs
tess_56.pq | 151.15450191497803 secs
tess_68.pq | 186.38796997070312 secs
tess_98.pq | 226.2064390182495 secs
tess_12.pq | 251.47998714447021 secs
tess_6.pq | 309.0805878639221 secs
CPU times: user 2min 35s, sys: 19.3 s, total: 2min 54s
Wall time: 21min 44s

And the larger chunks can be run sequentially outside Dask to maximise memory availability:

%%time

pars_to_submit = [
    (
        f"{tess_path}tess_{i}.pq", 
        mosaic_url, 
        f"{out_folder}ndvi_tess_{i}.pq"
    ) for i in to_run_seq
]

print(f"Starting to compute sequentially the {len(pars_to_submit)} largest chunks")
for chunk_pars in pars_to_submit:
    print(f"\nWorking on chunk {chunk_pars[0]}")
    print(ndvi_for_chunk(chunk_pars))
Starting to compute sequentially the 4 largest chunks

Working on chunk ../../data/spatial_signatures/tessellation/tess_5.pq
<ipython-input-9-683a09d4bba0>:16: RuntimeWarning: invalid value encountered in true_divide
  ndvi = (img[3] + -1 * img[0]) / (img[3] + img[0])
tess_5.pq | 852.3413059711456 secs

Working on chunk ../../data/spatial_signatures/tessellation/tess_87.pq
tess_87.pq | 821.503981590271 secs

Working on chunk ../../data/spatial_signatures/tessellation/tess_97.pq
tess_97.pq | 261.8684220314026 secs

Working on chunk ../../data/spatial_signatures/tessellation/tess_83.pq
tess_83.pq | 260.13328790664673 secs
CPU times: user 35min 47s, sys: 2min 50s, total: 38min 37s
Wall time: 36min 36s
client.close()

[DEPRECATED] xarray-based computation

def geom2ndvi(row, ndvi, geom="geometry"):
    # Clip the NDVI DataArray to the cell geometry and average the remaining pixels
    try:
        val = ndvi.rio.clip([row[geom]])\
                  .mean()\
                  .values\
                  .tolist()
    except Exception:
        val = None
    return val

def ndvi_from_chunk(db_path,
                    ndvi_ddf, 
                    geom="tessellation"
                   ):
    if type(db_path) is pandas.Series:
        db_path = db_path.iloc[0]
    with fsspec.open(db_path) as file:
        db = geopandas.read_parquet(file, 
                                    columns=["hindex", "tessellation"]
                                   ).set_index("hindex")  # .head() can be appended above to test on a small subset
    ndvi_vals = ndvi_ddf.rio.clip_box(*db.total_bounds).load()
    rower = lambda row: geom2ndvi(row, ndvi_vals, geom=geom)
    return db.apply(rower, axis=1)

r = rioxarray.open_rasterio(mosaic_url,
                            chunks={"x": 1280, "y": 1280}
                           )
ndvi = (r.sel(band=4) - r.sel(band=1)) / \
       (r.sel(band=4) + r.sel(band=1))

The first, geom2ndvi, takes a single row containing the geometry of a cell and an NDVI DataArray, and returns the mean NDVI for that cell.

This method can be applied sequentially to an entire table of ET cells, using the NDVI derived from the mosaic, which is what we do in ndvi_from_chunk.

NOTE For this to work effectively, the section of the mosaic covered by db.total_bounds needs to fit comfortably in memory.
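
As a sketch of how that can be checked before loading anything (mirroring the logic of chunk_dimension above; the chunk used is just an example), nbytes on the lazily clipped array reports the would-be footprint without streaming any data:

# Sketch: estimate the in-memory size of a chunk's NDVI segment before .load()
with fsspec.open(chunks[0]) as p:  # any chunk path works as an example
    db = geopandas.read_parquet(p, columns=["hindex", "tessellation"])
segment = ndvi.rio.clip_box(*db.total_bounds)  # still lazy at this point
format_bytes(segment.nbytes)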

To run the above in a distributed fashion, we will set up a Dask DataFrame with the paths of all the chunk files:

get_chunk_no = lambda c: int(c.split("tess_")[1].strip(".pq"))
chunk_names = dd.from_pandas(
    pandas.DataFrame(
        {"file": chunks, "cID": list(map(get_chunk_no, chunks))}
    ).set_index("cID").drop(to_run_seq),
    chunksize=1
)
chunk_names
Dask DataFrame Structure:
file
npartitions=98
0 object
1 ...
... ...
101 ...
102 ...
Dask Name: from_pandas, 98 tasks

Now we can map the computation of each chunk across the cluster:

%%time
ndvis = chunk_names["file"].map_partitions(ndvi_from_chunk,
                                           ndvi_ddf=ndvi,
                                           meta=(None, 'f8')
                                          ).compute()
/opt/conda/lib/python3.8/site-packages/distributed/worker.py:3373: UserWarning: Large object of size 3.19 MB detected in task graph: 
  ([[["('truediv-b8410295353ac44e273fc0c8c4446e1f',  ... e, None), None)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
[... distributed scheduler log output trimmed: several workers were removed after connection resets and new workers were registered while the computation ran ...]
ndvis
hindex
c000e109777t0000    0.196805
c000e109777t0001    0.165362
c000e109777t0002    0.195056
c000e109777t0003    0.306654
c000e109777t0004    0.216186
                      ...   
c101e634937t0004    0.959660
c101e634937t0003    0.506815
c101e634937t0000    0.567918
c101e634937t0002    0.537546
c101e634937t0001    3.189631
Length: 495, dtype: float64

[DEPRECATED] Alternative using geocube

The alternative involves geocube’s zonal stats and make_geocube. In this approach, we first rasterize our ET cells on a grid aligned with the mosaic and then calculate the NDVI. At present, this approach is discarded because the 10m resolution of the mosaic is too coarse to obtain an NDVI value for every cell.

from geocube.api.core import make_geocube

We use the same set of ET cells:

url = f"http://{SERVER_IP}:8000/spatial_signatures/tessellation/tess_6.pq"
tst = geopandas.read_parquet(url)
tst_sub = tst.cx[315891.95:330000, 213727.69:250000]
tst_sub.info()
<class 'geopandas.geodataframe.GeoDataFrame'>
Int64Index: 2049 entries, 15006 to 258237
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype   
---  ------       --------------  -----   
 0   uID          2049 non-null   int64   
 1   geometry     2049 non-null   geometry
 2   enclosureID  2049 non-null   int64   
dtypes: geometry(1), int64(2)
memory usage: 64.0 KB

Before rasterizing, we select the segment of the mosaic that overlaps with the cells (note that nothing is streamed into memory yet; evaluation remains lazy):

ndvi_segment = ndvi.rio.clip_box(*tst_sub.total_bounds)

We need to rasterize the features:

%%time
out_grid = make_geocube(
    vector_data = tst_sub,
    measurements=["uID"],
    like=ndvi_segment
)
CPU times: user 318 ms, sys: 5.81 ms, total: 324 ms
Wall time: 323 ms

This creates a Dataset object with a rasterised version of the tessellations in tst_sub. Now we append the NDVI:

out_grid["ndvi"] = ndvi_segment

And with both aligned, we can group by each uID and calculate average NDVI:

%%time
g = out_grid.drop("spatial_ref")\
            .groupby(out_grid["uID"])
CPU times: user 598 ms, sys: 2.23 ms, total: 601 ms
Wall time: 598 ms

And we can get the average easily:

%%time
ndvi_mean = g.mean()
CPU times: user 6.21 s, sys: 2.89 ms, total: 6.22 s
Wall time: 6.21 s
mn = ndvi_mean.to_dataframe()[["ndvi"]]
/opt/conda/lib/python3.8/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in true_divide
  return func(*(_execute_task(a, cache) for a in args))
/opt/conda/lib/python3.8/site-packages/dask/array/numpy_compat.py:41: RuntimeWarning: invalid value encountered in true_divide
  x = np.divide(x1, x2, out)
geom2ndvi(tst_sub.query("uID == 6712717").iloc[0], ndvi)
/opt/conda/lib/python3.8/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in true_divide
  return func(*(_execute_task(a, cache) for a in args))
0.6968843539763994
%time out = tst_sub.head().apply(lambda r: geom2ndvi(r, ndvi), axis=1)
/opt/conda/lib/python3.8/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in true_divide
  return func(*(_execute_task(a, cache) for a in args))
[... repeated RuntimeWarning output trimmed ...]
CPU times: user 1.6 s, sys: 14.6 ms, total: 1.62 s
Wall time: 1.6 s
tst_sub.query("uID == 6712717")
uID geometry enclosureID
22988 6712717 POLYGON Z ((329700.369 234529.467 0.000, 32945... 658102
mn.head()
ndvi
uID
6712717.0 NaN
6712718.0 0.682241
6712719.0 0.675091
6712720.0 NaN
6712721.0 0.537544