⚡️ PySpark by Example¶
Here are my worked examples from the very useful LinkedIn Learning course PySpark by Example by Jonathan Fernandes: https://www.linkedin.com/learning/apache-pyspark-by-example
Learning PySpark by Example¶
Over the past 12 months or so I have been learning and playing with Apache Spark. I went through the brilliant book by Bill Chambers and Matei Zaharia, Spark: The Definitive Guide, which covers Spark in depth and gives plenty of code snippets one can try out in the spark-shell. Whilst the book is indeed very detailed and provides great examples, the datasets included for you to get your hands on are on the order of megabytes (with the exception of the activity-data dataset used for the Streaming examples).
$ du -sh data/* | sort -rh
1.2G data/activity-data
90M data/retail-data
71M data/deep-learning-images
42M data/bike-data
208K data/flight-data
104K data/sample_libsvm_data.txt
32K data/sample_movielens_ratings.txt
32K data/regression
32K data/multiclass-classification
32K data/clustering
32K data/binary-classification
12K data/simple-ml-integers
12K data/flight-data-hive
8.0K data/simple-ml
4.0K data/simple-ml-scaling
4.0K data/README.md
For this reason, I wanted to try out PySpark by Example, which works through the City of Chicago's reported-crimes.csv dataset, weighing in at around 1.6 GB.
Another reason the course and its dataset appealed to me was that I could use them as an excuse to explore the plotting capabilities of Plotly, an interactive plotting library. The dataset pairs location data (latitude/longitude) with plenty of distributions worth visualising.
The Data¶
As mentioned, the course uses the City of Chicago reported-crimes dataset.
$ wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
$ mv rows.csv\?accessType\=DOWNLOAD data/reported-crimes.csv
>>> from pyspark.sql.functions import to_timestamp,col,lit
>>> rc = spark.read.csv('data/reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))
>>> rc.show(5)
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
| ID|Case Number| Date| Block|IUCR| Primary Type| Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year| Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701| JA366925|2001-01-01 11:00:00| 016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...| RESIDENCE| false| false|0412| 004| 8| 45| 11| null| null|2001|08/05/2017 03:50:...| null| null| null|
|11227287| JB147188|2017-10-08 03:00:00| 092XX S RACINE AVE|0281|CRIM SEXUAL ASSAULT| NON-AGGRAVATED| RESIDENCE| false| false|2222| 022| 21| 73| 02| null| null|2017|02/11/2018 03:57:...| null| null| null|
|11227583| JB147595|2017-03-28 14:00:00| 026XX W 79TH ST|0620| BURGLARY| UNLAWFUL ENTRY| OTHER| false| false|0835| 008| 18| 70| 05| null| null|2017|02/11/2018 03:57:...| null| null| null|
|11227293| JB147230|2017-09-09 20:17:00|060XX S EBERHART AVE|0810| THEFT| OVER $500| RESIDENCE| false| false|0313| 003| 20| 42| 06| null| null|2017|02/11/2018 03:57:...| null| null| null|
|11227634| JB147599|2017-08-26 10:00:00| 001XX W RANDOLPH ST|0281|CRIM SEXUAL ASSAULT| NON-AGGRAVATED| HOTEL/MOTEL| false| false|0122| 001| 42| 32| 02| null| null|2017|02/11/2018 03:57:...| null| null| null|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
only showing top 5 rows
>>> rc.columns
['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location']
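For reference, the REPL one-liner above can be laid out as a short script, which reads a little easier. This is just a sketch with the same logic; the appName is my own choice:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, lit

spark = SparkSession.builder.appName('reported-crimes').getOrCreate()

# Parse the Date column into a timestamp and keep records up to 2018-11-11.
rc = (spark.read.csv('data/reported-crimes.csv', header=True)
      .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
      .filter(col('Date') <= lit('2018-11-11')))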
A second dataset, the Chicago police stations data, is also used:
$ wget -O data/police-stations.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
>>> ps = spark.read.csv("data/police-stations.csv", header=True)
>>> ps.show(5, truncate=False)
+------------+--------------+--------------------+-------+-----+-----+-------------------------------------------------------------------------------+------------+------------+------------+------------+------------+-----------+------------+-------------------------------+
|DISTRICT |DISTRICT NAME |ADDRESS |CITY |STATE|ZIP |WEBSITE |PHONE |FAX |TTY |X COORDINATE|Y COORDINATE|LATITUDE |LONGITUDE |LOCATION |
+------------+--------------+--------------------+-------+-----+-----+-------------------------------------------------------------------------------+------------+------------+------------+------------+------------+-----------+------------+-------------------------------+
|Headquarters|Headquarters |3510 S Michigan Ave |Chicago|IL |60653|http://home.chicagopolice.org |null |null |null |1177731.401 |1881697.404 |41.83070169|-87.62339535|(41.8307016873, -87.6233953459)|
|1 |Central |1718 S State St |Chicago|IL |60616|http://home.chicagopolice.org/community/districts/1st-district-central/ |312-745-4290|312-745-3694|312-745-3693|1176569.052 |1891771.704 |41.85837259|-87.62735617|(41.8583725929, -87.627356171) |
|6 |Gresham |7808 S Halsted St |Chicago|IL |60620|http://home.chicagopolice.org/community/districts/6th-district-gresham/ |312-745-3617|312-745-3649|312-745-3639|1172283.013 |1853022.646 |41.75213684|-87.64422891|(41.7521368378, -87.6442289066)|
|11 |Harrison |3151 W Harrison St |Chicago|IL |60612|http://home.chicagopolice.org/community/districts/11th-district-harrison/ |312-746-8386|312-746-4281|312-746-5151|1155244.069 |1897148.755 |41.87358229|-87.70548813|(41.8735822883, -87.705488126) |
|16 |Jefferson Park|5151 N Milwaukee Ave|Chicago|IL |60630|http://home.chicagopolice.org/community/districts/16th-district-jefferson-park/|312-742-4480|312-742-4421|312-742-4423|1138480.758 |1933660.473 |41.97409445|-87.76614884|(41.9740944511, -87.7661488432)|
+------------+--------------+--------------------+-------+-----+-----+-------------------------------------------------------------------------------+------------+------------+------------+------------+------------+-----------+------------+-------------------------------+
only showing top 5 rows
>>> ps.columns
['DISTRICT', 'DISTRICT NAME', 'ADDRESS', 'CITY', 'STATE', 'ZIP', 'WEBSITE', 'PHONE', 'FAX', 'TTY', 'X COORDINATE', 'Y COORDINATE', 'LATITUDE', 'LONGITUDE', 'LOCATION']
Exploratory Data Analysis and Challenge Questions¶
Before we do this, let's cache the dataset in memory for faster querying; this saves us from reading from disk each time we want to run a query.
>>> rc.cache()
DataFrame[ID: string, Case Number: string, Date: timestamp, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: string, Domestic: string, Beat: string, District: string, Ward: string, Community Area: string, FBI Code: string, X Coordinate: string, Y Coordinate: string, Year: string, Updated On: string, Latitude: string, Longitude: string, Location: string]
>>> rc.count()
6752020
Note that the cache() command is evaluated lazily, so an action is required to execute it. Here we simply run a count() action to ensure cache() takes effect.
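As a quick sanity check (a sketch; is_cached is a standard DataFrame property), you can confirm the caching flag has been set:

>>> rc.is_cached
True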
Display only the first 4 rows of the columns Case Number, Date and Arrest¶
>>> rc.select('Case Number', 'Date', 'Arrest').show(4)
+-----------+-------------------+------+
|Case Number| Date|Arrest|
+-----------+-------------------+------+
| JA366925|2001-01-01 11:00:00| false|
| JB147188|2017-10-08 03:00:00| false|
| JB147595|2017-03-28 14:00:00| false|
| JB147230|2017-09-09 20:17:00| false|
+-----------+-------------------+------+
only showing top 4 rows
What are the top 10 reported crimes by Primary Type, in descending order of occurrence?¶
>>> rc.groupBy('Primary Type').count().show(5)
+--------------------+-----+
| Primary Type|count|
+--------------------+-----+
|OFFENSE INVOLVING...|45709|
|CRIMINAL SEXUAL A...| 333|
| STALKING| 3384|
|PUBLIC PEACE VIOL...|47780|
| OBSCENITY| 582|
+--------------------+-----+
only showing top 5 rows
>>> rc.groupBy('Primary Type').count().orderBy('count', ascending=False).show(10)
+-------------------+-------+
| Primary Type| count|
+-------------------+-------+
| THEFT|1418293|
| BATTERY|1232001|
| CRIMINAL DAMAGE| 771399|
| NARCOTICS| 711609|
| OTHER OFFENSE| 418802|
| ASSAULT| 418479|
| BURGLARY| 388009|
|MOTOR VEHICLE THEFT| 314101|
| DECEPTIVE PRACTICE| 265567|
| ROBBERY| 255566|
+-------------------+-------+
only showing top 10 rows
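As an aside, the same descending sort can be written with the desc() helper, which some find more readable; a sketch equivalent to the orderBy call above:

>>> from pyspark.sql.functions import desc
>>> rc.groupBy('Primary Type').count().orderBy(desc('count')).show(10)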
What percentage of reported crimes resulted in an arrest?¶
>>> rc.select('Arrest').distinct().show()
+------+
|Arrest|
+------+
| false|
| true|
+------+
>>> rc.where(col('Arrest') == 'true').count() / rc.select('Arrest').count()
0.2775467193521346
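The same figure can be computed in a single pass by averaging a 0/1 arrest flag rather than running two counts. A sketch, assuming Arrest only takes the string values 'true' and 'false' as shown above:

# Single-pass alternative (sketch): average a 0/1 flag instead of two counts.
from pyspark.sql.functions import avg, when

arrest_rate = rc.select(
    avg(when(col('Arrest') == 'true', 1).otherwise(0)).alias('arrest_rate')
)
arrest_rate.show()  # should agree with the ratio computed above (~0.28)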
What are the top 3 locations for reported crimes?¶
>>> rc.groupBy('Location Description').count().orderBy('count', ascending=False).show(3)
+--------------------+-------+
|Location Description| count|
+--------------------+-------+
| STREET|1770359|
| RESIDENCE|1144528|
| APARTMENT| 698091|
+--------------------+-------+
only showing top 3 rows
What is the most frequently reported non-criminal activity?¶
>>> rc.select('Primary Type').distinct().count()
36
>>> rc.select('Primary Type').distinct().orderBy(col('Primary Type')).show(36, truncate=False)
+---------------------------------+
|Primary Type |
+---------------------------------+
|ARSON |
|ASSAULT |
|BATTERY |
|BURGLARY |
|CONCEALED CARRY LICENSE VIOLATION|
|CRIM SEXUAL ASSAULT |
|CRIMINAL DAMAGE |
|CRIMINAL SEXUAL ASSAULT |
|CRIMINAL TRESPASS |
|DECEPTIVE PRACTICE |
|DOMESTIC VIOLENCE |
|GAMBLING |
|HOMICIDE |
|HUMAN TRAFFICKING |
|INTERFERENCE WITH PUBLIC OFFICER |
|INTIMIDATION |
|KIDNAPPING |
|LIQUOR LAW VIOLATION |
|MOTOR VEHICLE THEFT |
|NARCOTICS |
|NON - CRIMINAL |
|NON-CRIMINAL |
|NON-CRIMINAL (SUBJECT SPECIFIED) |
|OBSCENITY |
|OFFENSE INVOLVING CHILDREN |
|OTHER NARCOTIC VIOLATION |
|OTHER OFFENSE |
|PROSTITUTION |
|PUBLIC INDECENCY |
|PUBLIC PEACE VIOLATION |
|RITUALISM |
|ROBBERY |
|SEX OFFENSE |
|STALKING |
|THEFT |
|WEAPONS VIOLATION |
+---------------------------------+
>>> nc = rc.filter( (col('Primary Type') == 'NON - CRIMINAL') | (col('Primary Type') == 'NON-CRIMINAL') | (col('Primary Type') == 'NON-CRIMINAL (SUBJECT SPECIFIED)') )
>>> nc.show(5, truncate=False)
+--------+-----------+-------------------+---------------------+----+--------------+-----------------+-------------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID |Case Number|Date |Block |IUCR|Primary Type |Description |Location Description |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On |Latitude |Longitude |Location |
+--------+-----------+-------------------+---------------------+----+--------------+-----------------+-------------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|10062441|HY250685 |2015-05-07 13:20:00|012XX S HARDING AVE |5114|NON - CRIMINAL|FOID - REVOCATION|RESIDENCE |false |false |1011|010 |24 |29 |26 |1150243 |1894129 |2015|02/10/2018 03:50:01 PM|41.865394646|-87.723928428|(41.865394646, -87.723928428)|
|10064717|HY253344 |2015-05-08 13:15:00|051XX S WENTWORTH AVE|5114|NON - CRIMINAL|FOID - REVOCATION|POLICE FACILITY/VEH PARKING LOT|false |false |0225|002 |3 |37 |26 |1175826 |1871120 |2015|02/10/2018 03:50:01 PM|41.80171934 |-87.630703621|(41.80171934, -87.630703621) |
|10072565|HY261001 |2015-05-14 10:30:00|006XX N WELLS ST |5114|NON - CRIMINAL|FOID - REVOCATION|STREET |false |false |1832|018 |42 |8 |26 |1174623 |1904537 |2015|02/10/2018 03:50:01 PM|41.89344506 |-87.634117632|(41.89344506, -87.634117632) |
|10109156|HY297801 |2015-06-12 09:00:00|053XX S NEVA AVE |5114|NON - CRIMINAL|FOID - REVOCATION|RESIDENCE |false |false |0811|008 |23 |56 |26 |1129584 |1868411 |2015|02/10/2018 03:50:01 PM|41.795197456|-87.800355525|(41.795197456, -87.800355525)|
|10115077|HY304017 |2015-06-16 19:00:00|081XX S WHIPPLE ST |5114|NON - CRIMINAL|FOID - REVOCATION|RESIDENCE |false |false |0835|008 |18 |70 |26 |1157460 |1850515 |2015|02/10/2018 03:50:01 PM|41.745568408|-87.698616805|(41.745568408, -87.698616805)|
+--------+-----------+-------------------+---------------------+----+--------------+-----------------+-------------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
only showing top 5 rows
>>> nc.groupBy(col('Description')).count().orderBy('count', ascending=False).show(truncate=False)
+--------------------------------------+-----+
|Description |count|
+--------------------------------------+-----+
|FOID - REVOCATION |38 |
|NOTIFICATION OF CIVIL NO CONTACT ORDER|9 |
+--------------------------------------+-----+
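As an aside, the three non-criminal categories can be matched more compactly with isin() instead of chained equality checks; a sketch equivalent to the filter above:

>>> non_criminal = ['NON - CRIMINAL', 'NON-CRIMINAL', 'NON-CRIMINAL (SUBJECT SPECIFIED)']
>>> nc = rc.filter(col('Primary Type').isin(non_criminal))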
Using a bar chart, plot which day of the week has the most reported crimes.¶
>>> from pyspark.sql.functions import count, dayofweek, date_format
>>> rc.groupBy(dayofweek(col('Date')), date_format(col('Date'), 'E')).agg(count("*")).show()
+---------------+--------------------+--------+
|dayofweek(Date)|date_format(Date, E)|count(1)|
+---------------+--------------------+--------+
| 2| Mon| 952646|
| 6| Fri| 1016882|
| 1| Sun| 911174|
| 5| Thu| 964457|
| 4| Wed| 973801|
| 3| Tue| 967965|
| 7| Sat| 965095|
+---------------+--------------------+--------+
>>> ss = rc.groupBy(dayofweek(col('Date')), date_format(col('Date'), 'E')).count().orderBy('dayofweek(Date)')
>>> ss.show()
+---------------+--------------------+-------+
|dayofweek(Date)|date_format(Date, E)| count|
+---------------+--------------------+-------+
| 1| Sun| 911174|
| 2| Mon| 952646|
| 3| Tue| 967965|
| 4| Wed| 973801|
| 5| Thu| 964457|
| 6| Fri|1016882|
| 7| Sat| 965095|
+---------------+--------------------+-------+
>>> type(ss)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> sbar = ss.withColumn("Day of Week", col("date_format(Date, E)"))
>>> import plotly.express as px
>>> fig = px.bar(sbar.toPandas(), x="Day of Week", y="count")
>>> fig.show()
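A tidier variant is to alias the grouping columns up front, so downstream code doesn't depend on auto-generated names like date_format(Date, E). A sketch with the same logic as above:

# Alias grouping columns so we can refer to them by stable names.
ss = (rc.groupBy(dayofweek('Date').alias('dow'),
                 date_format('Date', 'E').alias('Day of Week'))
        .count()
        .orderBy('dow'))
fig = px.bar(ss.toPandas(), x='Day of Week', y='count')
fig.show()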
Maps¶
Now let's explore plotly's functionality for plotting data with latitude/longitude co-ordinates.
Below is a figure made from a 100-row sub-sample of the data.
Let's scale this up! Now we'll try 50,000 points:
Awesome! I would have loved to show all points (~800,000), but my laptop crashes each time I try. I assume this is simply a memory issue, and it may well be fine on other machines.
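For reference, here is a minimal sketch of the kind of call that can produce maps like these with plotly express. The exact code lives in the linked notebook, so treat the sampling fraction, selected columns and map style here as my assumptions:

# Sample valid co-ordinates and cast them to floats for plotting (sketch).
sample = (rc.select('Latitude', 'Longitude', 'Primary Type')
            .dropna()
            .sample(fraction=0.01)
            .toPandas())
sample['Latitude'] = sample['Latitude'].astype(float)
sample['Longitude'] = sample['Longitude'].astype(float)

fig = px.scatter_mapbox(sample, lat='Latitude', lon='Longitude',
                        hover_name='Primary Type', zoom=9)
fig.update_layout(mapbox_style='open-street-map')  # no Mapbox token required
fig.show()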
Conclusions¶
Playing with this dataset has been fun, and it has been interesting to follow the course whilst exploring plotly at the same time. I've noted a few things I learned whilst writing this post below. Overall, I have learned a lot and look forward to exploring more "big-data"-sets with pyspark and plotly.
Things I Learned Along the Way¶
- A key thing I learned during this post was how to embed interactive plotly figures into markdown such that they can be rendered into the blog with ease. This can simply be done using the to_html(..) function:

      import plotly
      plotly.io.to_html(fig, include_plotlyjs=False, full_html=False)

  This spits out a <div> element one can then place into their desired markdown, which will then be rendered as HTML. To ensure just the <div> element is returned, full_html=False is required. Another thing to remember is that this returns the element as a string, so the leading and trailing apostrophes that make it a string need to be removed. In the process of discovering this, a potential "bug" was found in this function, resulting in excessive \n characters being generated. So the actual function that has been used for this post is:

      import importlib.util
      spec = importlib.util.spec_from_file_location("plotly", "/Users/tallamjr/github/forks/plotly.py/packages/python/plotly/plotly/io/_html.py")
      foo = importlib.util.module_from_spec(spec)
      spec.loader.exec_module(foo)
      foo.to_html(fig, include_plotlyjs=False, full_html=False)

  This points to a forked version of the plotly codebase, for which I have an outstanding PR waiting to be reviewed. A final thing to mention is that, in order for the plots above to show up at all, even with the <div> elements, one needs to make sure to include the necessary Javascript tags. Therefore, in the head.html file for this blog, there exists:

      $ sed -n 14,17p layouts/partials/head.html
      <!-- Plotly embeddings
      REF: http://www.kellieottoboni.com/posts/2017/08/plotly-markup/
      ================================================== -->
      <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
- Another thing I discovered was how to get better formatting of the .show() command on Spark DataFrames. My approach is explained in my answers to the StackOverflow posts "pyspark show dataframe as table with horizontal scroll in ipython notebook" and "Improve PySpark DataFrame.show output to fit Jupyter notebook":

      Adding to the answers given above by @karan-singla and @vijay-jangir, a handy
      one-liner to comment out the `white-space: pre-wrap` styling can be done like so:

          $ awk -i inplace '/pre-wrap/ {$0="/*"$0"*/"}1' $(dirname `python -c "import notebook as nb;print(nb.__file__)"`)/static/style/style.min.css

      This translates as: use `awk` to update _inplace_ lines that contain `pre-wrap`
      so they are surrounded by `/* -- */`, i.e. commented out, in the `style.min.css`
      file found in your working Python environment. This, in theory, can then be used
      as an alias if one uses multiple environments, say with Anaconda.

      - https://stackoverflow.com/a/24884616/4521950
      - https://stackoverflow.com/questions/16529716/save-modifications-in-place-with-awk
- Finally, this is not necessarily something I learned during this post, but it opened my eyes to the possibilities available when using nbconvert. The notebook for this post has been rendered here using the following command:
$ jupyter nbconvert --ExecutePreprocessor.kernel_name=python --ExecutePreprocessor.timeout=600 --to html --execute PySpark-by-Example.ipynb --output-dir /Users/tallamjr/www/blog/static/notebooks
  After looking for ways to link this to point 1 above, and how one can add custom CSS, I discovered the numerous customisations one can do. Some examples can be found at https://github.com/jupyter/nbconvert-examples
References and Resources¶
- The notebooks for this post can be found here
- SO: Calling Java/Scala function from a task
- SO: Spark performance for Scala vs Python
- PySpark Internals