Automatically And Elegantly Flatten DataFrame In Spark SQL


Answer:

The short answer is that there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it calls itself and appends the returned Array[Column] to its own Array[Column].

Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    // Build the fully qualified column name, e.g. "parent.child"
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      // Recurse into nested structs, accumulating their leaf columns
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

You would then use it like this:

df.select(flattenSchema(df.schema):_*) 
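For instance, here is a minimal sketch of what the flattening does (the SparkSession named spark and the nested JSON schema are assumptions for illustration):

import spark.implicits._

// Hypothetical nested input, just for illustration
val df = spark.read.json(Seq(
  """{"name": "a", "address": {"city": "x", "geo": {"lat": 1.0, "lon": 2.0}}}"""
).toDS)

df.select(flattenSchema(df.schema):_*).printSchema()
// root
//  |-- city: string
//  |-- lat: double
//  |-- lon: double
//  |-- name: string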

I am improving my previous answer and offering a solution to my own problem stated in the comments of the accepted answer.

The accepted solution creates an array of Column objects and uses it to select these columns. In Spark, if you have a nested DataFrame, you can select a child column like this: df.select("Parent.Child"), and this returns a DataFrame with the values of the child column, named just Child. But if attributes of different parent structures have identical names, you lose the information about the parent and may end up with identical column names that you can no longer access by name, as they are ambiguous.

This was my problem.
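To make the collision concrete, here is a hedged sketch (the parent1/parent2 schema is made up, and a SparkSession named spark is assumed):

val df = spark.read.json(Seq(
  """{"parent1": {"id": 1}, "parent2": {"id": 2}}"""
).toDS)

df.select(flattenSchema(df.schema):_*).printSchema()
// root
//  |-- id: long
//  |-- id: long
// Two columns named "id" -- selecting "id" by name now fails as ambiguous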

I found a solution to my problem; maybe it can help someone else as well. I called flattenSchema separately:

val flattenedSchema = flattenSchema(df.schema) 

and this returned an Array of Column objects. Instead of passing this to select(), which would return a DataFrame with columns named after the last level of nesting, I mapped each Column to an alias built from its full path, so that after selecting the Parent.Child column it is renamed Parent.Child instead of just Child (I also replaced the dots with underscores for my convenience):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_"))) 

And then you can use the select function as shown in the original answer:

val newDf = df.select(renamedCols:_*)
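Continuing the hypothetical parent1/parent2 sketch from above, the renamed columns now carry the full path and can be accessed by name again:

newDf.printSchema()
// root
//  |-- parent1_id: long
//  |-- parent2_id: long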

Just wanted to share my solution for PySpark - it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.

from pyspark.sql.types import StructType, ArrayType


def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        # Look through arrays so arrays of structs are flattened too
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()
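If you want the same ArrayType handling back in the Scala version, a rough, untested sketch of the port could look like this (note that selecting a field through an array column yields an array of that field's values, not exploded rows):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

// Sketch: same recursion as flattenSchema above, but descends into the
// element type of arrays so arrays of structs are flattened too
def flattenNames(schema: StructType, prefix: String = null): Array[String] = {
  schema.fields.flatMap { f =>
    val colName = if (prefix == null) f.name else prefix + "." + f.name
    val dtype = f.dataType match {
      case at: ArrayType => at.elementType  // look through arrays
      case other => other
    }
    dtype match {
      case st: StructType => flattenNames(st, colName)
      case _ => Array(colName)
    }
  }
}

df.select(flattenNames(df.schema).map(col):_*)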
