Functional data¶
This notebook links a range of functional data layers to enclosed tessellation (ET) cells across GB. The linkage method depends on the nature of the input data and ranges from areal interpolation to zonal statistics. All data are further measured within a relevant spatial context.
Population estimates¶
Population estimates are linked using area-weighted interpolation based on building geometry.
import warnings
import geopandas as gpd
import pandas as pd
import numpy as np
import libpysal
import tobler
from time import time
import scipy
import xarray
import rioxarray
import rasterstats
from tqdm.notebook import tqdm
from dask.distributed import Client, LocalCluster, as_completed
import dask.dataframe as dd
warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')
population_est = gpd.read_parquet("../../urbangrammar_samba/functional_data/population_estimates/gb_population_estimates.pq")
chunk = gpd.read_parquet("../../urbangrammar_samba/spatial_signatures/tessellation/tess_0.pq")
xmin, ymin, xmax, ymax = chunk.total_bounds
%%time
ests = tobler.area_weighted.area_interpolate(population_est.cx[xmin:xmax, ymin:ymax], chunk.set_geometry("buildings"), extensive_variables=['population'])
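Under the hood, area_interpolate apportions each source zone's (extensive) population to the intersecting cells in proportion to the area of overlap, i.e. each cell receives source value * (intersection area / source area). A toy illustration of the principle (hypothetical geometries, not the project data):
from shapely.geometry import box
# one hypothetical source zone with 100 people, split exactly in half by two target cells
src = gpd.GeoDataFrame({"population": [100.0]}, geometry=[box(0, 0, 2, 1)], crs=27700)
tgt = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)], crs=27700)
toy = tobler.area_weighted.area_interpolate(src, tgt, extensive_variables=["population"])
toy.population  # each cell receives 50 = 100 * (intersection area / source area)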
for chunk_id in range(103):
s = time()
chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "buildings"]).set_geometry("buildings")
xmin, ymin, xmax, ymax = chunk.total_bounds
ests = tobler.area_weighted.area_interpolate(population_est.cx[xmin:xmax, ymin:ymax], chunk, extensive_variables=['population'])
pop = pd.DataFrame({'hindex': chunk.hindex.values, "population": ests.population.values})
pop.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk_id}")
print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")
Night lights¶
Night lights are linked using zonal statistics, parallelised with Dask.
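Zonal statistics summarise the raster pixels that fall within each polygon; here it is the mean night-light intensity per tessellation cell. A toy illustration of what rasterstats.zonal_stats returns (hypothetical 4x4 raster and two square zones, not the project data):
from affine import Affine
from shapely.geometry import box
# hypothetical 4x4 raster with values 0-15 and 1x1 pixels, origin at the top-left corner (x=0, y=4)
raster = np.arange(16, dtype=float).reshape(4, 4)
transform = Affine(1, 0, 0, 0, -1, 4)
rasterstats.zonal_stats([box(0, 2, 2, 4), box(2, 0, 4, 2)], raster=raster, affine=transform, stats=["mean"])
# -> [{'mean': 2.5}, {'mean': 12.5}]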
workers = 8
client = Client(LocalCluster(n_workers=workers, threads_per_worker=1))
client
def _night_lights(chunk_id):
import rioxarray
s = time()
chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "tessellation"])
nl = xarray.open_rasterio("../../urbangrammar_samba/functional_data/employment/night_lights_osgb.tif")
nl_clip = nl.rio.clip_box(*chunk.total_bounds)
arr = nl_clip.values
affine = nl_clip.rio.transform()
stats_nl = rasterstats.zonal_stats(
chunk.tessellation,
raster=arr[0],
affine=affine,
stats=['mean'],
all_touched=True,
nodata = np.nan,
)
chunk["night_lights"] = [x['mean'] for x in stats_nl]
chunk[["hindex", "night_lights"]].to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk_id}")
return f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds."
inputs = iter(range(103))
futures = [client.submit(_night_lights, next(inputs)) for i in range(workers)]
ac = as_completed(futures)
for finished_future in ac:
# submit new future
try:
new_future = client.submit(_night_lights, next(inputs))
ac.add(new_future)
except StopIteration:
pass
print(finished_future.result())
Workplace population by industry¶
Workplace population is linked using area-weighted interpolation based on building geometry.
wpz = gpd.read_parquet('../../urbangrammar_samba/functional_data/employment/workplace/workplace_by_industry_gb.pq')
for chunk_id in range(103):
s = time()
chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "buildings"]).set_geometry("buildings")
xmin, ymin, xmax, ymax = chunk.total_bounds
ests = tobler.area_weighted.area_interpolate(wpz.cx[xmin:xmax, ymin:ymax], chunk, extensive_variables=wpz.columns[1:-1].to_list())
ests['hindex'] = chunk.hindex.values
ests.drop(columns="geometry").to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk_id}")
print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")
CORINE Land cover¶
CORINE land cover is linked using area-weighted interpolation based on tessellation geometry.
corine = gpd.read_parquet("../../urbangrammar_samba/functional_data/land_use/corine/corine_gb.pq")
def _dask_binning(corine, cells, n_chunks=512):
import dask_geopandas as dgpd
from scipy.sparse import coo_matrix
ids_src, ids_tgt = cells.sindex.query_bulk(corine.geometry, predicate="intersects")
df = gpd.GeoDataFrame({'clc': corine.geometry.values[ids_src], 'tess': cells.geometry.values[ids_tgt]})
ddf = dgpd.from_geopandas(df, npartitions=n_chunks)
areas = ddf.clc.intersection(ddf.tess).area.compute()
table = coo_matrix(
(areas, (ids_src, ids_tgt),),
shape=(corine.shape[0], cells.shape[0]),
dtype=np.float32,
)
table = table.todok()
return table
def _dask_area_interpolate(corine, cells, n_chunks=512, categorical_variables=None):
table = _dask_binning(corine, cells, n_chunks)
if categorical_variables:
categorical = {}
for variable in categorical_variables:
unique = corine[variable].unique()
for value in unique:
mask = corine[variable] == value
categorical[f"{variable}_{value}"] = np.asarray(
table[mask].sum(axis=0)
)[0]
categorical = pd.DataFrame(categorical)
categorical = categorical.div(cells.area, axis="rows")
return categorical
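The two helpers above rebuild tobler's source-to-target area table with dask-geopandas, so the expensive pairwise intersections between CORINE polygons and cells run in parallel; the result is the share of each cell covered by each Code_18 class. For a single chunk, recent versions of tobler can produce comparable class shares directly; a sketch kept for reference (not used here):
chunk0 = gpd.read_parquet("../../urbangrammar_samba/spatial_signatures/tessellation/tess_0.pq", columns=["hindex", "tessellation"])
xmin, ymin, xmax, ymax = chunk0.total_bounds
shares = tobler.area_weighted.area_interpolate(corine.cx[xmin:xmax, ymin:ymax], chunk0, categorical_variables=["Code_18"])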
for chunk_id in range(103):
s = time()
chunk = gpd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk_id}.pq", columns=["hindex", "tessellation"])
xmin, ymin, xmax, ymax = chunk.total_bounds
ests = _dask_area_interpolate(corine.cx[xmin:xmax, ymin:ymax], chunk, categorical_variables=["Code_18"])
ests['hindex'] = chunk.hindex.values
ests.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk_id}.pq")
print(f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds.")
Retail centres¶
CDRC Retail Centres are linked as the distance from each cell to the nearest centre.
retail = gpd.read_file("../../urbangrammar_samba/functional_data/retail_centres/Pre Release.zip!Retail_Centres_UK.gpkg")
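The expanding-buffer search below avoids computing distances from every cell to every retail centre: the spatial index is queried with a 500 m buffer that grows until at least one centre is found. With geopandas 0.10 or later (pygeos / shapely 2 backend), a comparable result could be obtained in a single call to sjoin_nearest; a sketch for one chunk, not used in this notebook:
gdf0 = gpd.read_parquet("../../urbangrammar_samba/spatial_signatures/tessellation/tess_0.pq")
nearest = gpd.sjoin_nearest(gdf0[["hindex", "tessellation"]].set_geometry("tessellation"), retail[["geometry"]], distance_col="nearest_retail_centre")
nearest[["hindex", "nearest_retail_centre"]].head()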
workers = 16
client = Client(LocalCluster(n_workers=workers, threads_per_worker=1))
client
def measure_nearest(chunk):
s = time()
gdf = gpd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk}.pq')
b = gdf.total_bounds
initial_buffer = 500
buffered = gdf.tessellation.buffer(initial_buffer)
distance = []
for orig, geom in zip(gdf.tessellation, buffered.geometry):
query = retail.sindex.query(geom, predicate='intersects')
b = initial_buffer
while query.size == 0:
query = retail.sindex.query(geom.buffer(b), predicate='intersects')
b += initial_buffer
distance.append(retail.iloc[query].distance(orig).min())
gdf['nearest_retail_centre'] = distance
gdf[['hindex', 'nearest_retail_centre']].to_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/retail_centre/retail_{chunk}.pq')
return f"Chunk {chunk} processed sucessfully in {time() - s} seconds."
inputs = iter(range(103))
futures = [client.submit(measure_nearest, next(inputs)) for i in range(workers)]
ac = as_completed(futures)
for finished_future in ac:
# submit new future
try:
new_future = client.submit(measure_nearest, next(inputs))
ac.add(new_future)
except StopIteration:
pass
print(finished_future.result())
Water¶
Water (surface water bodies and the sea) is linked as the distance from each cell to the nearest one.
import os
from sqlalchemy import create_engine
from shapely.geometry import box
from shapely.ops import polygonize
user = os.environ.get('DB_USER')
pwd = os.environ.get('DB_PWD')
host = os.environ.get('DB_HOST')
port = os.environ.get('DB_PORT')
db_connection_url = f"postgresql+psycopg2://{user}:{pwd}@{host}:{port}/built_env"
def measure_nearest(chunk):
s = time()
gdf = gpd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/tessellation/tess_{chunk}.pq')
b = gdf.total_bounds
engine = create_engine(db_connection_url)
sql = f'SELECT * FROM openmap_surfacewater_area_200824 WHERE ST_Intersects(geometry, ST_MakeEnvelope({b[0]}, {b[1]}, {b[2]}, {b[3]}, 27700))'
water = gpd.read_postgis(sql, engine, geom_col='geometry')
# the full coastline is needed to polygonise land and derive the sea polygon
sql = 'SELECT * FROM gb_coastline_2016'
coastline = gpd.read_postgis(sql, engine, geom_col='geometry')
polys = polygonize(coastline.geometry)
land = gpd.GeoSeries(polys, crs=27700)
sea = box(*land.total_bounds).difference(land.geometry.unary_union)
target = water.geometry
target.loc[len(water)] = sea
target = gpd.clip(target, box(*b))
initial_buffer = 500
buffered = gdf.tessellation.buffer(initial_buffer)
distance = []
for orig, geom in zip(gdf.tessellation, buffered.geometry):
query = target.sindex.query(geom, predicate='intersects')
b = initial_buffer
while query.size == 0:
query = target.sindex.query(geom.buffer(b), predicate='intersects')
b += initial_buffer
distance.append(target.iloc[query].distance(orig).min())
gdf['nearest_water'] = distance
gdf[['hindex', 'nearest_water']].to_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/water/water_{chunk}.pq')
return f"Chunk {chunk} processed sucessfully in {time() - s} seconds."
Convolutions¶
Functional characters that do not already reflect their spatial context are contextualised using the same method as the morphometric data - as the first, second and third quartiles weighted by inverse distance, based on cells within the 10th order of contiguity (a small worked sketch follows the list below). The method is applied to:
population
night lights
workplace population
CORINE
NDVI
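Each character is convoluted as inverse-distance-weighted quantiles: a cell's neighbours within the 10th order of contiguity are sorted by value and the 25th, 50th and 75th percentiles are read off the cumulative distribution of the inverse-distance weights, so closer cells influence the result more. A small self-contained sketch of that step with hypothetical values and distances:
# hypothetical neighbourhood: five neighbouring cells with their values and distances (m)
values = np.array([3.0, 1.0, 4.0, 2.0, 5.0])
distance = np.array([100.0, 200.0, 50.0, 400.0, 150.0])
distance_decay = 1 / distance  # closer neighbours get larger weights
sorter = np.argsort(values)
sample_weight = distance_decay[sorter]
# positions of the sorted values on the weighted cumulative distribution
weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
weighted_quantiles /= np.sum(sample_weight)
q1, q2, q3 = np.interp([.25, .5, .75], weighted_quantiles, values[sorter])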
cross_chunk = pd.read_parquet('../../urbangrammar_samba/spatial_signatures/cross-chunk_indices_10.pq')
def convolute(chunk_id):
s = time()
pop = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk_id}")
nl = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk_id}")
workplace = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk_id}")
corine = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk_id}.pq")
ndvi = pd.read_parquet(f"../../urbangrammar_samba/functional_data/ndvi/ndvi_tess_{chunk_id}.pq")
combined = pop.merge(nl, on='hindex').merge(workplace, on='hindex').merge(corine, on='hindex').merge(ndvi.rename(columns={'mean': 'ndvi'}), on='hindex')
combined['keep'] = True
# add neighbouring cells from other chunks
cross_chunk_cells = []
for chunk, inds in cross_chunk.loc[chunk_id].indices.iteritems():
add_pop = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/population/pop_{chunk}").iloc[inds]
add_nl = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/night_lights/nl_{chunk}").iloc[inds]
add_workplace = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/workplace/pop_{chunk}").iloc[inds]
add_corine = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/corine/corine_{chunk}.pq").iloc[inds]
add_ndvi = pd.read_parquet(f"../../urbangrammar_samba/functional_data/ndvi/ndvi_tess_{chunk}.pq").iloc[inds]
add_combined = add_pop.merge(add_nl, on='hindex').merge(add_workplace, on='hindex').merge(add_corine, on='hindex').merge(add_ndvi.rename(columns={'mean': 'ndvi'}), on='hindex')
add_combined['keep'] = False
cross_chunk_cells.append(add_combined)
df = combined.append(pd.concat(cross_chunk_cells, ignore_index=True), ignore_index=True).set_index('hindex')
# read W
W = libpysal.weights.WSP(scipy.sparse.load_npz(f"../../urbangrammar_samba/spatial_signatures/weights/w10_10_distance_circles_{chunk_id}.npz")).to_W()
characters = df.columns
# prepare dictionary to store results
convolutions = {}
for c in characters:
convolutions[c] = []
# measure convolutions
for i in range(len(df)):
neighbours = W.neighbors[i]
vicinity = df.iloc[neighbours]
distance = W.weights[i]
distance_decay = 1 / np.array(distance)
for c in characters:
values = vicinity[c].values
sorter = np.argsort(values)
values = values[sorter]
nan_mask = np.isnan(values)
if nan_mask.all():
convolutions[c].append(np.array([np.nan] * 3))
else:
sample_weight = distance_decay[sorter][~nan_mask]
weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
weighted_quantiles /= np.sum(sample_weight)
interpolate = np.interp([.25, .5, .75], weighted_quantiles, values[~nan_mask])
convolutions[c].append(interpolate)
# save convolutions to parquet file
conv = pd.DataFrame(convolutions, index=df.index)
exploded = pd.concat([pd.DataFrame(conv[c].to_list(), columns=[c + '_q1', c + '_q2',c + '_q3']) for c in characters], axis=1)
convoluted = exploded[df.keep.values]
convoluted['hindex'] = combined['hindex'].values
pois = pd.read_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/accessibility/access_{chunk_id}.pq")
water = pd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/water/water_{chunk_id}.pq')
retail_centres = pd.read_parquet(f'../../urbangrammar_samba/spatial_signatures/functional/retail_centre/retail_{chunk_id}.pq')
functional = convoluted.merge(pois, on='hindex').merge(water, on='hindex').merge(retail_centres, on='hindex').set_index('hindex')
functional.to_parquet(f"../../urbangrammar_samba/spatial_signatures/functional/functional/func_{chunk_id}.pq")
return f"Chunk {chunk_id} processed sucessfully in {time() - s} seconds."
# I am afraid that we would run out of memory if we did this in parallel
for i in tqdm(range(103), total=103):
print(convolute(i))