- 2017 Jun 01
- Data Analysis
- #python, #Dask
Note: This post has interactive Bokeh graphics which may not render well on mobile devices. Try viewing the Jupyter notebook which underlies this post on NBViewer.
Part 1 : A Gentle Introduction to the Spatial Join¶
One problem I came across when analyzing the New York City Taxi Dataset is that from 2009 to June 2016, both the starting and stopping locations of taxi trips were given as longitude and latitude points. After July 2016, to provide a degree of anonymity when releasing data to the public, the Taxi and Limousine Commission (TLC) only provides the starting and ending "taxi zones" of a trip, along with a shapefile that specifies the zone boundaries, available here. Let's load this up in GeoPandas, and set the coordinate system to 'epsg:4326', which corresponds to longitude and latitude coordinates.
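In the notebook this step is just a couple of lines; a minimal sketch (the shapefile path is simply wherever the TLC file was unzipped) looks like:
import geopandas as gpd

# Read the TLC taxi zone shapefile (path is illustrative) and convert its
# projected coordinates to longitude/latitude.
df = gpd.read_file('../shapefiles/taxi_zones.shp')
df = df.to_crs({'init': 'epsg:4326'})
df.head()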
We see that the geometry column consists of polygons (from Shapely) whose vertices are defined by longitude and latitude points. Let's plot using Bokeh, in order of ascending LocationID.
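The interactive Bokeh version takes some data munging to turn each polygon into lists of vertices, so I won't reproduce it here; a quick static stand-in, using GeoPandas' built-in matplotlib plotting instead of Bokeh, is simply:
# Static stand-in for the interactive Bokeh map: color each zone by its id.
ax = df.plot(column='LocationID', figsize=(8, 8))
ax.set_xlabel('Longitude')
ax.set_ylabel('Latitude')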
This is a familiar map of New York, with 262 taxi districts shown, colored by the id of the taxi district. I have added a random point (-73.966˚E, 40.78˚N) in magenta, which happens to fall in the middle of Central Park. Assigning a point to a taxi zone is something humans can do easily, but on a computer it requires solving the point-in-polygon problem. Luckily the Shapely library provides an easy interface to such geometric operations in Python. But point in polygon is computationally expensive, and using Shapely to assign taxi zones to the 2.4 billion (latitude, longitude) pairs in the NYC Taxi Dataset would take a modern single-core CPU about four years. To speed this up, we calculate the bounding boxes for each taxi zone, which look like:
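For concreteness, here is a brute-force sketch of both ideas, reusing the zones GeoDataFrame df from above: an exact point-in-polygon test against every zone, and the bounding box of every zone.
from shapely.geometry import Point

# Brute force: exact point-in-polygon test against every taxi zone polygon.
point = Point(-73.966, 40.78)
matches = df[df.geometry.contains(point)]
matches[['LocationID', 'zone']]  # should come back as Central Park

# Bounding box of every zone as (minx, miny, maxx, maxy); cheap to test against.
df.geometry.bounds.head()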
Now, given a (longitude, latitude) coordinate pair, the bounding boxes that contain that pair can be found efficiently with an R-tree. You can find an excellent introduction to R-trees here. Only the polygons (taxi zones) whose bounding boxes contain the coordinate pair need to be examined, and then point in polygon is solved for those (hopefully) few taxi zones. This reduces computation by a factor of about 100-1000. This process, assigning coordinate pairs to taxi zones, is one example of a spatial join. GeoPandas provides a nice interface to efficient spatial joins in Python, and it takes care of calculating bounding boxes and R-trees for you, as this snippet shows.
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'}, geometry=[Point(-73.966, 40.78)]),
          df,
          how='left', op='within')
This does the merge for a single point (drawn in magenta) on the maps above, and correctly places it in Central Park.
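Under the hood, sjoin is doing roughly the following two-step query, which you can also write by hand against the R-tree that GeoPandas builds (the sindex attribute, which requires the optional rtree library):
point = Point(-73.966, 40.78)

# Step 1: ask the R-tree which bounding boxes contain the point (cheap).
candidate_idx = list(df.sindex.intersection(point.bounds))

# Step 2: exact point-in-polygon test, but only on those few candidates.
candidates = df.iloc[candidate_idx]
candidates[candidates.geometry.contains(point)][['LocationID', 'zone']]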
Part 2 : Spatial Joins at scale using Dask¶
In my NYC transit project, I download and process the Taxi dataset. Here I load a single file from the taxi dataset (May 2016) into Dask, and show the first few rows and a few columns. The file is a bit large at 1.8GB, and Dask chooses to divide the dataframe into 30 partitions for efficient calculations. Each partition is a Pandas DataFrame, and Dask takes care of all the logic to view the combination as a single DataFrame. Here are a few columns.
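The load itself is a one-liner; here is a minimal sketch, with an illustrative filename and the TLC's standard column names rather than exactly what my project uses:
import dask.dataframe as dd
import numpy as np  # np.float64 is used later for the map_partitions metadata

# Read one month of yellow taxi data; Dask splits the ~1.8 GB CSV into partitions.
trips = dd.read_csv('yellow_tripdata_2016-05.csv',
                    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
trips.npartitions  # how many partitions Dask chose (30 in the notebook)
# A few of the columns we care about for the spatial join.
trips[['pickup_longitude', 'pickup_latitude',
       'dropoff_longitude', 'dropoff_latitude']].head()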
So each trip has pickup and dropoff (longitude, latitude) coordinate pairs. Just to give you a feel for the data, I plot the start and end locations of the first trip, ending in the East Village. Driving directions come with a great deal of additional complexity, so here I just plot an arrow, as the crow flies. A spatial join identifies the taxi zones as Clinton East and East Village.
# x, y and x2, y2 are the pickup and dropoff coordinates of the first trip
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'},
                           geometry=[Point(x, y), Point(x2, y2)]),
          df,
          how='left', op='within')
So, Dask DataFrames are just collections of Pandas DataFrames, and I know how to perform a spatial join on a Pandas DataFrame. Let's take advantage of Dask's map_partitions function to do a spatial join with the taxi zones on every partition. Here is the function to do the spatial join, given a Pandas DataFrame and the names of the longitude, latitude, and taxi zone id columns.
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """Joins DataFrame with Taxi Zones shapefile.

    This function takes longitude values provided by `lon_var`, and latitude
    values provided by `lat_var` in DataFrame `df`, and performs a spatial join
    with the NYC taxi_zones shapefile.

    The shapefile is hard coded in, as this function makes a hard assumption of
    latitude and longitude coordinates. It also assumes latitude=0 and
    longitude=0 is not a datapoint that can exist in your dataset, which is
    reasonable for a dataset of New York, but bad for a global dataset.

    Only rows where `df.lon_var`, `df.lat_var` are reasonably near New York,
    and `df.locid_var` is set to np.nan are updated.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        DataFrame containing latitudes, longitudes, and location_id columns.
    lon_var : string
        Name of column in `df` containing longitude values. Invalid values
        should be np.nan.
    lat_var : string
        Name of column in `df` containing latitude values. Invalid values
        should be np.nan.
    locid_var : string
        Name of series to return.
    """
    # imports are local so the function is self-contained when Dask
    # serializes it out to workers
    import geopandas
    import numpy as np
    import pandas as pd
    from shapely.geometry import Point

    # make a copy since we will modify lats and lons
    localdf = df[[lon_var, lat_var]].copy()
    # missing lat/lon info is indicated by nan. Fill with zero,
    # which is outside the New York shapefile.
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)

    shape_df = geopandas.read_file('../shapefiles/taxi_zones.shp')
    shape_df.drop(['OBJECTID', 'Shape_Area', 'Shape_Leng', 'borough', 'zone'],
                  axis=1, inplace=True)
    shape_df = shape_df.to_crs({'init': 'epsg:4326'})

    try:
        local_gdf = geopandas.GeoDataFrame(
            localdf, crs={'init': 'epsg:4326'},
            geometry=[Point(xy) for xy in
                      zip(localdf[lon_var], localdf[lat_var])])
        local_gdf = geopandas.sjoin(
            local_gdf, shape_df, how='left', op='within')
        # one matched LocationID per input row, renamed as requested
        return local_gdf.LocationID.rename(locid_var)
    except ValueError as ve:
        import traceback
        print(ve)
        traceback.print_exc()
        # if the join fails, return a column of NaNs with the same index
        # so that map_partitions still receives a valid result
        return pd.Series(np.nan, index=localdf.index, name=locid_var)
At Scale¶
Using the map_partitions function, I apply the spatial join to each of the Pandas DataFrames that make up the Dask DataFrame. For simplicity, I just call the function twice, once for pickup locations and once for dropoff locations. To assist Dask in determining the schema of the returned data, we specify it via the meta argument as a column of floats (allowing for NaN values).
trips['pickup_taxizone_id'] = trips.map_partitions(
    assign_taxi_zones, "pickup_longitude", "pickup_latitude",
    "pickup_taxizone_id", meta=('pickup_taxizone_id', np.float64))
trips['dropoff_taxizone_id'] = trips.map_partitions(
    assign_taxi_zones, "dropoff_longitude", "dropoff_latitude",
    "dropoff_taxizone_id", meta=('dropoff_taxizone_id', np.float64))
trips[['pickup_taxizone_id', 'dropoff_taxizone_id']].head()
At this point, the trips Dask DataFrame will have valid taxizone_id information. Let's save this data to Parquet, a columnar format that is well supported by Dask and Apache Spark. This prevents Dask from recalculating the spatial join (which is very expensive) every time an operation on the trips DataFrame is required.
trips.to_parquet('trips_2016-05.parquet', has_nulls=True, object_encoding='json', compression="SNAPPY")
trips = dd.read_parquet('trips_2016-05.parquet', columns=['pickup_taxizone_id', 'dropoff_taxizone_id'])
To close this post out, I'll produce a heatmap of taxi dropoff locations, aggregated by taxi zone using Dask (a sketch of the aggregation is shown below). Unsurprisingly (for New Yorkers at least), the vast majority of taxi dropoffs tend to be in Downtown and Midtown Manhattan. I will analyze this dataset further in future posts.
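The aggregation behind that heatmap is just a groupby over the column we computed above; here is a rough sketch, drawing a static matplotlib choropleth rather than the interactive Bokeh figure, and assuming the zones GeoDataFrame df from Part 1 is still in scope:
# Count dropoffs per taxi zone; nothing runs until .compute() is called.
dropoff_counts = (trips.groupby('dropoff_taxizone_id')
                       .pickup_taxizone_id.count().compute())
dropoff_counts.index = dropoff_counts.index.astype('int64')

# Attach the counts to the zone polygons and draw a quick static choropleth.
zones = df.copy()
zones['dropoff_count'] = zones['LocationID'].map(dropoff_counts).fillna(0)
zones.plot(column='dropoff_count', figsize=(8, 8))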
Summary¶
In this post I described the spatial join, and how to perform it at scale on a cluster using Dask and Pandas. I glossed over some details that are important for the full New York Taxi Dataset, but my full code is available here on Github. In future posts, I will analyze this data more thoroughly, and possibly look into releasing the processed data as a Parquet file for others to analyze.
Project on Github¶
A side note on spatial join performance¶
The spatial join as written above with GeoPandas, using the New York Taxi Dataset, can assign taxi zones to approximately 40 million taxi trips per hour on a 4 GHz 4-core i5 system. Much of the code that supports this join is an amalgamation of Python and wrapped C code.
This is approximately a factor of two slower than performing the same spatial join in highly optimized PostGIS C/C++ code. However, PostGIS does not efficiently use multiple cores (at least without multiple spatial joins running simultaneously), and more importantly, the network and serialization overhead of a round trip to the PostgreSQL database puts PostgreSQL/PostGIS at roughly the same speed as the GeoPandas implementation I describe in this post, with vastly more moving parts to break.
Basically, Python is actually pretty fast for these kinds of data structure operations.