Functional data

This notebook links a range of functional data layers to ET cells across GB. The method used depends on the nature of the input data and ranges from areal interpolation to zonal statistics. The ET cells are stored in 103 spatial chunks, each of which is processed independently.

All data are further measured within a relevant spatial context (see the Convolutions section below).

Population estimates

Population estimates are linked using area-weighted interpolation based on building geometry.
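As a quick illustration of the principle, the minimal sketch below uses toy geometries and hypothetical values (not project data): an extensive variable is allocated among target geometries in proportion to the intersected area.

# Minimal sketch of area-weighted interpolation of an extensive variable
# on toy geometries (hypothetical values, not project data).
import geopandas as gpd
import tobler
from shapely.geometry import box

source = gpd.GeoDataFrame(
    {"population": [100, 200]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs=27700,
)
target = gpd.GeoDataFrame(geometry=[box(0, 0, 1.5, 1), box(1.5, 0, 2, 1)], crs=27700)

# the first target fully covers the first source and half of the second,
# so it receives 100 + 100 = 200; the second target receives the remaining 100
interp = tobler.area_weighted.area_interpolate(source, target, extensive_variables=["population"])
print(interp.population.values)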

import warnings

import geopandas as gpd
import pandas as pd
import numpy as np
import libpysal
import tobler
from time import time
import scipy
import xarray
import rioxarray
import rasterstats
from tqdm.notebook import tqdm

from dask.distributed import Client, LocalCluster, as_completed
import dask.dataframe as dd
warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')
population_est = gpd.read_parquet("../../urbangrammar_samba/functional_data/population_estimates/gb_population_estimates.pq")
chunk = gpd.read_parquet("../../urbangrammar_samba/spatial_signatures/tessellation/tess_0.pq")
xmin, ymin, xmax, ymax = chunk.total_bounds
%%time
ests = tobler.area_weighted.area_interpolate(population_est.cx[xmin:xmax, ymin:ymax], chunk.set_geometry("buildings"), extensive_variables=['population'])
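A single chunk is interpolated first, timed with %%time, to gauge the per-chunk runtime before looping over all 103 chunks.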
for chunk_id in range(103):
    s = time()
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "buildings"]).set_geometry("buildings")
    xmin, ymin, xmax, ymax = chunk.total_bounds
    ests = tobler.area_weighted.area_interpolate(population_est.cx[xmin:xmax, ymin:ymax], chunk, extensive_variables=['population'])
    pop = pd.DataFrame({'hindex': chunk.hindex.values, "population": ests.population.values})
    pop.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk_id}")
    print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")

Night lights

Night lights are linked using zonal statistics, parallelised with dask.

workers = 8
client = Client(LocalCluster(n_workers=workers, threads_per_worker=1))
client
def _night_lights(chunk_id):
    import rioxarray
    
    s = time()
    
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "tessellation"])
    nl = xarray.open_rasterio("../../urbangrammar_samba/functional_data/employment/night_lights_osgb.tif")
    nl_clip = nl.rio.clip_box(*chunk.total_bounds)
    arr = nl_clip.values
    affine = nl_clip.rio.transform()
    stats_nl = rasterstats.zonal_stats(
        chunk.tessellation, 
        raster=arr[0],
        affine=affine,
        stats=['mean'],
        all_touched=True,
        nodata = np.nan,
    )
    chunk["night_lights"] = [x['mean'] for x in stats_nl]
    chunk[["hindex", "night_lights"]].to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk_id}")
    
    return f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds."
inputs = iter(range(103))
futures = [client.submit(_night_lights, next(inputs)) for i in range(workers)]
ac = as_completed(futures)
for finished_future in ac:
    # submit new future 
    try:
        new_future = client.submit(_night_lights, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    print(finished_future.result())

Workplace population by industry

Workplace population is linked using area-weighted interpolation based on building geometry.

wpz = gpd.read_parquet('../../urbangrammar_samba/functional_data/employment/workplace/workplace_by_industry_gb.pq')
for chunk_id in range(103):
    s = time()
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "buildings"]).set_geometry("buildings")
    xmin, ymin, xmax, ymax = chunk.total_bounds
    ests = tobler.area_weighted.area_interpolate(wpz.cx[xmin:xmax, ymin:ymax], chunk, extensive_variables=wpz.columns[1:-1].to_list())
    ests['hindex'] = chunk.hindex.values
    ests.drop(columns="geometry").to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk_id}")
    print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")

CORINE Land cover

CORINE land cover is linked using area-weighted interpolation based on tessellation geometry. The intersection areas are computed in parallel with dask-geopandas, mirroring tobler's categorical interpolation.

corine = gpd.read_parquet("../../urbangrammar_samba/functional_data/land_use/corine/corine_gb.pq")
def _dask_binning(corine, cells, n_chunks=512):
    """Build a sparse (CORINE polygons x cells) matrix of intersection areas."""
    import dask_geopandas as dgpd
    from scipy.sparse import coo_matrix
    
    # pair every CORINE polygon with each cell it intersects
    ids_src, ids_tgt = cells.sindex.query_bulk(corine.geometry, predicate="intersects")
    df = gpd.GeoDataFrame({'clc': corine.geometry.values[ids_src], 'tess': cells.geometry.values[ids_tgt]})
    # compute the intersection areas in parallel with dask-geopandas
    ddf = dgpd.from_geopandas(df, npartitions=n_chunks)
    areas = ddf.clc.intersection(ddf.tess).area.compute()
    table = coo_matrix(
        (areas, (ids_src, ids_tgt),),
        shape=(corine.shape[0], cells.shape[0]),
        dtype=np.float32,
    )

    table = table.todok()

    return table


def _dask_area_interpolate(corine, cells, n_chunks=512, categorical_variables=None):
    """Interpolate categorical variables as the share of each cell covered by each class."""
    table = _dask_binning(corine, cells, n_chunks)
    
    categorical = None
    if categorical_variables:
        categorical = {}
        for variable in categorical_variables:
            unique = corine[variable].unique()
            for value in unique:
                # total intersection area of this class within each cell
                mask = corine[variable] == value
                categorical[f"{variable}_{value}"] = np.asarray(
                    table[mask].sum(axis=0)
                )[0]

        categorical = pd.DataFrame(categorical)
        # convert areas to proportions of the cell area
        categorical = categorical.div(cells.area, axis="rows")
    
    return categorical
for chunk_id in range(103):
    s = time()
    chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "tessellation"])
    xmin, ymin, xmax, ymax = chunk.total_bounds
    ests = _dask_area_interpolate(corine.cx[xmin:xmax, ymin:ymax], chunk, categorical_variables=["Code_18"])
    ests['hindex'] = chunk.hindex.values
    ests.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk_id}.pq")
    print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")

Retail centres

CDRC retail centres are linked as the distance from each cell to the nearest centre.
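To avoid measuring distances to every centre, each cell is buffered by 500 m and the buffer is grown in 500 m steps until at least one centre intersects it; the distance is then computed only to those candidate centres.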

retail = gpd.read_file("../../urbangrammar_samba/functional_data/retail_centres/Pre Release.zip!Retail_Centres_UK.gpkg")
workers = 16
client = Client(LocalCluster(n_workers=workers, threads_per_worker=1))
client
def measure_nearest(chunk):
    s = time()
    gdf = gpd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk}.pq')
    
    initial_buffer = 500
    buffered = gdf.tessellation.buffer(initial_buffer)
    distance = []
    for orig, geom in zip(gdf.tessellation, buffered.geometry):
        query = retail.sindex.query(geom, predicate='intersects')
        b = initial_buffer
        while query.size == 0:
            query = retail.sindex.query(geom.buffer(b), predicate='intersects')
            b += initial_buffer

        distance.append(retail.iloc[query].distance(orig).min())
    gdf['nearest_retail_centre'] = distance
    gdf[['hindex', 'nearest_retail_centre']].to_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/retail_centre/retail_{chunk}.pq')
    
    return f"Chunk {chunk} processed sucessfully in {time() - s} seconds."
inputs = iter(range(103))
futures = [client.submit(measure_nearest, next(inputs)) for i in range(workers)]
ac = as_completed(futures)
for finished_future in ac:
    # submit new future 
    try:
        new_future = client.submit(measure_nearest, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    print(finished_future.result())

Water

Water is measured as the distance from each cell to the nearest water body. The target layer combines OpenMap surface water polygons with the sea, derived by polygonising the GB coastline.

import os

from sqlalchemy import create_engine
from shapely.geometry import box
from shapely.ops import polygonize

user = os.environ.get('DB_USER')
pwd = os.environ.get('DB_PWD')
host = os.environ.get('DB_HOST')
port = os.environ.get('DB_PORT')

db_connection_url = f"postgres+psycopg2://{user}:{pwd}@{host}:{port}/built_env"
def measure_nearest(chunk):
    s = time()
    gdf = gpd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk}.pq')
    b = gdf.total_bounds
    engine = create_engine(db_connection_url)
    # surface water polygons within the chunk bounds
    sql = f'SELECT * FROM openmap_surfacewater_area_200824 WHERE ST_Intersects(geometry, ST_MakeEnvelope({b[0]}, {b[1]}, {b[2]}, {b[3]}, 27700))'
    water = gpd.read_postgis(sql, engine, geom_col='geometry')
    
    # the full coastline is needed to polygonise land and derive the sea
    sql = 'SELECT * FROM gb_coastline_2016'
    coastline = gpd.read_postgis(sql, engine, geom_col='geometry')

    polys = polygonize(coastline.geometry)
    land = gpd.GeoSeries(polys, crs=27700)
    sea = box(*land.total_bounds).difference(land.geometry.unary_union)
    
    # combine surface water and the sea, clipped to the chunk bounds
    target = water.geometry
    target.loc[len(water)] = sea
    target = gpd.clip(target, box(*b))
    
    initial_buffer = 500
    buffered = gdf.tessellation.buffer(initial_buffer)
    distance = []
    for orig, geom in zip(gdf.tessellation, buffered.geometry):
        query = target.sindex.query(geom, predicate='intersects')
        b = initial_buffer
        while query.size == 0:
            query = target.sindex.query(geom.buffer(b), predicate='intersects')
            b += initial_buffer

        distance.append(target.iloc[query].distance(orig).min())
    gdf['nearest_water'] = distance
    gdf[['hindex', 'nearest_water']].to_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/water/water_{chunk}.pq')
    
    return f"Chunk {chunk} processed successfully in {time() - s} seconds."

Convolutions

Functional characters which do not, on their own, express the tendency within a spatial context are contextualised using the same method applied to morphometric data: the 1st, 2nd and 3rd quartiles weighted by inverse distance, based on cells within the 10th order of contiguity (see the short sketch after this list). The method is applied to:

  • population

  • night lights

  • workplace population

  • CORINE

  • NDVI
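
A minimal sketch of the inverse-distance weighted quartiles on toy numbers (hypothetical values and distances, not project data), mirroring the computation inside convolute below:

# Toy illustration of inverse-distance weighted quartiles (hypothetical values).
import numpy as np

values = np.array([3.0, 1.0, 4.0, 1.5, 5.0])
distances = np.array([1.0, 2.0, 2.0, 5.0, 10.0])  # distances to the focal cell

distance_decay = 1 / distances
sorter = np.argsort(values)
values_sorted = values[sorter]
sample_weight = distance_decay[sorter]

# weighted empirical CDF positions, then interpolate the three quartiles
weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
weighted_quantiles /= np.sum(sample_weight)
q1, q2, q3 = np.interp([.25, .5, .75], weighted_quantiles, values_sorted)
print(q1, q2, q3)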

cross_chunk = pd.read_parquet('../../urbangrammar_samba/spatial_signatures/cross-chunk_indices_10.pq')


def convolute(chunk_id):
    
    s = time()
    
    pop = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk_id}")
    nl = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk_id}")
    workplace = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk_id}")
    corine = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk_id}.pq")
    ndvi = pd.read_parquet(f"../../urbangrammar_samba/functional_data/ndvi/ndvi_tess_{chunk_id}.pq")
    combined = pop.merge(nl, on='hindex').merge(workplace, on='hindex').merge(corine, on='hindex').merge(ndvi.rename(columns={'mean': 'ndvi'}), on='hindex')
    combined['keep'] = True
    # add neighbouring cells from other chunks
    cross_chunk_cells = []

    for chunk, inds in cross_chunk.loc[chunk_id].indices.iteritems():
        add_pop = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk}").iloc[inds]
        add_nl = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk}").iloc[inds]
        add_workplace = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk}").iloc[inds]
        add_corine = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk}.pq").iloc[inds]
        add_ndvi = pd.read_parquet(f"../../urbangrammar_samba/functional_data/ndvi/ndvi_tess_{chunk}.pq").iloc[inds]
        add_combined = add_pop.merge(add_nl, on='hindex').merge(add_workplace, on='hindex').merge(add_corine, on='hindex').merge(add_ndvi.rename(columns={'mean': 'ndvi'}), on='hindex')
        add_combined['keep'] = False
        cross_chunk_cells.append(add_combined)
    
    df = combined.append(pd.concat(cross_chunk_cells, ignore_index=True), ignore_index=True).set_index('hindex')

    # read W
    W = libpysal.weights.WSP(scipy.sparse.load_npz(f"../../urbangrammar_samba/spatial_signatures/weights/w10_10_distance_circles_{chunk_id}.npz")).to_W()
 
    characters = df.columns
    # prepare dictionary to store results
    convolutions = {}
    for c in characters:
        convolutions[c] = []
        
    # measure convolutions
    for i in range(len(df)):
        neighbours = W.neighbors[i]
        vicinity = df.iloc[neighbours]
        distance = W.weights[i]
        distance_decay = 1 / np.array(distance)
        
        for c in characters:
            values = vicinity[c].values
            sorter = np.argsort(values)
            values = values[sorter]
            nan_mask = np.isnan(values)
            if nan_mask.all():
                convolutions[c].append(np.array([np.nan] * 3))
            else:
                sample_weight = distance_decay[sorter][~nan_mask]
                weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
                weighted_quantiles /= np.sum(sample_weight)
                interpolate = np.interp([.25, .5, .75], weighted_quantiles, values[~nan_mask])
                convolutions[c].append(interpolate)
    
    # save convolutions to parquet file
    conv = pd.DataFrame(convolutions, index=df.index)
    exploded = pd.concat([pd.DataFrame(conv[c].to_list(), columns=[c + '_q1', c + '_q2',c + '_q3']) for c in characters], axis=1)
    convoluted = exploded[df.keep.values]
    convoluted['hindex'] = combined['hindex'].values
    
    pois = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/accessibility/access_{chunk_id}.pq")
    water = pd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/water/water_{chunk_id}.pq')
    retail_centres = pd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/retail_centre/retail_{chunk_id}.pq')
    
    functional = convoluted.merge(pois, on='hindex').merge(water, on='hindex').merge(retail_centres, on='hindex').set_index('hindex')
    functional.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/functional/func_{chunk_id}.pq")
    
    return f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds."
# I am afraid that we would run out of memory if we did this in parallel
for i in tqdm(range(103), total=103):
    print(convolute(i))