Saturday, May 14, 2016

Reading JSON Nested Array in Spark DataFrames

In a previous post on JSON data, I showed how to read nested JSON arrays with Spark DataFrames. Now that I am more familiar with the API, I can describe an easier way to access such data, using the explode() function. All of the example code is in Scala, on Spark 1.6.

Loading JSON data

Suppose you have a file with JSON data, with one JSON object per line:

{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}

You can read it into a DataFrame with the SQLContext read.json() method:

>> val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame

>> people.show()
+-------+--------------------+
|   name|             schools|
+-------+--------------------+
|Michael|[[stanford,2010],...|
|   Andy|       [[ucsb,2011]]|
+-------+--------------------+

Notice that the second column, "schools", is an Array type, and each element of the array is a Struct:

>> people.printSchema()
root
 |-- name: string (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sname: string (nullable = true)
 |    |    |-- year: long (nullable = true)
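If you want to try this without creating a file first, the same DataFrame can be built from in-memory JSON strings. This is a minimal sketch that assumes you are in the Spark 1.6 spark-shell, where `sc` and `sqlContext` are already defined; the names `jsonLines` and `peopleInline` are made up for illustration.

```scala
// Assumption: running inside spark-shell (Spark 1.6), so `sc` and
// `sqlContext` are provided by the shell.
val jsonLines = Seq(
  """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""",
  """{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}"""
)

// read.json also accepts an RDD[String] holding one JSON object per element.
val peopleInline = sqlContext.read.json(sc.parallelize(jsonLines))

// The inferred schema should match the one printed from the file.
peopleInline.printSchema()
```

This is handy for quick experiments, since the schema is inferred from the strings in the same way as from a file.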


Nested Array of Struct

Flatten / Explode an Array

If your JSON object contains nested arrays of structs, how will you access the elements of an array? One way is by flattening it. For instance, in the example above, each JSON object contains a "schools" array. We can simply flatten "schools" with the explode() function.

>> import org.apache.spark.sql.functions._
>> val flattened = people.select($"name", explode($"schools").as("schools_flat"))
flattened: org.apache.spark.sql.DataFrame

>> flattened.show()
+-------+---------------+
|   name|   schools_flat|
+-------+---------------+
|Michael|[stanford,2010]|
|Michael|[berkeley,2012]|
|   Andy|    [ucsb,2011]|
+-------+---------------+

Now each school is on a separate row. The new column "schools_flat" is of type Struct.
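One way to sanity-check the result is to compare the number of exploded rows with the total number of array elements. The sketch below assumes the Spark 1.6 spark-shell (`sc` and `sqlContext` predefined) and rebuilds the sample data inline; `demoPeople`, `demoFlat`, and `n_schools` are illustrative names, not part of the original example.

```scala
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// Assumption: spark-shell on Spark 1.6; sample data rebuilt inline.
val demoPeople = sqlContext.read.json(sc.parallelize(Seq(
  """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""",
  """{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}"""
)))

// size() gives the number of elements in each array.
val perPerson = demoPeople.select($"name", size($"schools").as("n_schools"))
val totalElements = perPerson.agg(sum($"n_schools")).first().getLong(0)

// explode() emits one output row per array element.
val demoFlat = demoPeople.select($"name", explode($"schools").as("schools_flat"))
val explodedRows = demoFlat.count()

println(s"array elements: $totalElements, exploded rows: $explodedRows")
```

The two counts should agree whenever every row has a non-null array.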

Select into Struct

Now you can select, for instance, all the school names within each struct, by using the DataFrame select() method. The struct has two fields: "sname" and "year". We will select only the school name, "sname":

>> val schools = flattened.select("name", "schools_flat.sname")
schools: org.apache.spark.sql.DataFrame = [name: string, sname: string]

>> schools.show()
+-------+--------+
|   name|   sname|
+-------+--------+
|Michael|stanford|
|Michael|berkeley|
|   Andy|    ucsb|
+-------+--------+
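If you want both struct fields as ordinary columns, the Column syntax lets you select and rename them in one step. This is a sketch under the same assumptions as the rest of the post (Spark 1.6 spark-shell, with `sc` and `sqlContext` predefined); `demoPeople`, `demoFlat`, `schoolYears`, and the alias `school` are illustrative names.

```scala
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// Assumption: spark-shell on Spark 1.6; sample data rebuilt inline.
val demoPeople = sqlContext.read.json(sc.parallelize(Seq(
  """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""",
  """{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}"""
)))

val demoFlat = demoPeople.select($"name", explode($"schools").as("schools_flat"))

// Pull each struct field up to a top-level column, renaming as we go.
val schoolYears = demoFlat.select(
  $"name",
  $"schools_flat.sname".as("school"),
  $"schools_flat.year".as("year")
)

schoolYears.show()
```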

There you have it! We have taken data that was nested as structs inside an array column and bubbled it up to a first-level column in a DataFrame. You can now manipulate that column with the standard DataFrame methods.
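As an illustration of those standard methods, here is a small sketch that filters and aggregates the flattened data. It assumes the Spark 1.6 spark-shell (`sc` and `sqlContext` predefined); `demoPeople`, `demoFlat`, `recent`, and `schoolsPerPerson` are names invented for this example.

```scala
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// Assumption: spark-shell on Spark 1.6; sample data rebuilt inline.
val demoPeople = sqlContext.read.json(sc.parallelize(Seq(
  """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""",
  """{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}"""
)))

val demoFlat = demoPeople.select($"name", explode($"schools").as("school"))

// Filter on a nested field, then keep only the columns of interest.
val recent = demoFlat
  .filter($"school.year" >= 2011)
  .select($"name", $"school.sname".as("sname"))

// Count how many schools each person attended.
val schoolsPerPerson = demoFlat.groupBy("name").count()

recent.show()
schoolsPerPerson.show()
```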

References


  1. The DataFrame API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
  2. The explode() function: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$


7 comments:

  1. This comment has been removed by a blog administrator.

  2. How can this be achieved using the Java API?

  3. Thanks for the post, this is awesome. Have you tried flattening when the JSON is nested deeper than the first level? For example, if the JSON looked like this:
    {"name":"Michael", "schools":[{"sname":"stanford", "year":2010, "courses": [{"name": "Data Structures", "department": "Computer Science"}]}, {"sname":"berkeley", "year":2012}]}

    Replies
    1. It's possible. You would have to call explode() twice.

  4. This comment has been removed by the author.