Posts

Showing posts with the label Scala

Automatically And Elegantly Flatten DataFrame In Spark SQL

Answer: The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema. The recursive function should return an Array[Column]. Every time the function hits a StructType, it calls itself and appends the returned Array[Column] to its own Array[Column]. Something like:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}
```

You would then use it like this:

```scala
df.select(flattenSchema(df.schema):_*)
```

I am improving my previous a...
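
For a concrete picture of what the flattened select produces, here is a minimal, hypothetical sketch that reuses the flattenSchema function above; the column names and data are invented purely for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[*]").appName("flatten-demo").getOrCreate()
import spark.implicits._

// Build a DataFrame with a nested "address" struct (illustrative data only).
val df = Seq(("alice", "Main St", "Springfield"), ("bob", "Oak Ave", "Shelbyville"))
  .toDF("name", "street", "city")
  .select($"name", struct($"street", $"city").alias("address"))

// flattenSchema(df.schema) yields col("name"), col("address.street"), col("address.city"),
// so the selected DataFrame ends up with the leaf column names (name, street, city).
val flat = df.select(flattenSchema(df.schema): _*)
flat.printSchema()
```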

Appending An Element To The End Of A List In Scala

Answer:

```scala
List(1,2,3) :+ 4
```

Results in:

```scala
List[Int] = List(1, 2, 3, 4)
```

Note that this operation has a complexity of O(n). If you need this operation frequently, or for long lists, consider using another data type (e.g. a ListBuffer).

That's because you shouldn't do it (at least with an immutable list). If you really, really need to append an element to the end of a data structure, and this data structure really, really needs to be a list, and this list really, really has to be immutable, then do either this:

```scala
(4 :: List(1,2,3).reverse).reverse
```

or that:

```scala
List(1,2,3) ::: List(4)
```

Lists in Scala are not designed to be modified. In fact, you can't add elements to a Scala List; it's an immutable data structure, like a Java String. What you actually do when you "add an element to a list" in Scala is create a new List from an existing List. (Source) Instead of using lists for such use cases, I suggest using either an ArrayBuffer or a ListBuffer. Those data...
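
As a small sketch of the ListBuffer suggestion, assuming you ultimately want an immutable List back at the end:

```scala
import scala.collection.mutable.ListBuffer

// Appending to an immutable List copies the whole list each time: O(n) per append.
val appended: List[Int] = List(1, 2, 3) :+ 4   // List(1, 2, 3, 4)

// A ListBuffer appends in constant time and can be converted back when you're done.
val buffer = ListBuffer(1, 2, 3)
buffer += 4                                    // mutates the buffer in place
buffer ++= Seq(5, 6)
val result: List[Int] = buffer.toList          // List(1, 2, 3, 4, 5, 6)
```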

Add An Item In A Seq In Scala

Answer: Two things. When you use :+, the operation is left associative, meaning the collection you're calling the method on should be on the left-hand side. Now, Seq (as used in your example) refers to immutable.Seq. When you append or prepend an element, it returns a new sequence containing the extra element; it doesn't add it to the existing sequence.

```scala
val newSeq = customerList :+ CustomerDetail("1", "Active", "Shougat")
```

But appending an element means traversing the entire list in order to add an item, so consider prepending:

```scala
val newSeq = CustomerDetail("1", "Active", "Shougat") +: customerList
```

A simplified example:

```scala
scala> val original = Seq(1,2,3,4)
original: Seq[Int] = List(1, 2, 3, 4)

scala> val newSeq = 0 +: original
newSeq: Seq[Int] = List(0, 1, 2, 3, 4)
```

It might be worth pointing out that while the Seq append item operator, :+, is left associative, the prepend operator, +:, is right as...
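
Here is a brief sketch of the associativity point, with generic names chosen just for illustration. Operators whose names end in a colon are methods on their right-hand operand, so `x +: xs` desugars to `xs.+:(x)`, while `xs :+ x` is `xs.:+(x)`:

```scala
val xs = Seq(2, 3, 4)

val prepended = 1 +: xs      // Seq(1, 2, 3, 4) -- same as xs.+:(1)
val appended  = xs :+ 5      // Seq(2, 3, 4, 5) -- same as xs.:+(5)

// Chained prepends group to the right: 0 +: (1 +: xs)
val chained = 0 +: 1 +: xs   // Seq(0, 1, 2, 3, 4)
```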

Automatically Update Sbt Dependencies To Latest Version

Answer : I've used the sbt-updates plugin for this purpose locally for many years—simply add it to your local sbt user configuration and then run sbt dependencyUpdates in your project directory, and you'll get a list of dependencies that have updates in Maven Central (or whatever other repositories you have configured for that project). The scala-steward bot builds on sbt-updates to provide GitHub pull requests for dependency updates. You can either add your projects to the main instance's configuration if they're open source (I do this personally for 15-20 projects, and while it's not perfect, it's hugely useful, and getting better all the time), or run your own instance (which I've not tried personally). Note that neither of these choices verifies that the dependency updates are compatible with each other—just that they're the latest. You'll still need to watch carefully for evictions, etc.
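
As a minimal sketch of the local setup described above, assuming sbt 1.x: the plugin is added to the global sbt user configuration, and the version number below is illustrative only (check the plugin's README for the current release).

```scala
// ~/.sbt/1.0/plugins/plugins.sbt  (global sbt user configuration)
// Version is an assumption; use whatever the sbt-updates project currently publishes.
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.4")
```

With that in place, running `sbt dependencyUpdates` in any project directory prints the dependencies that have newer versions available in your configured repositories.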

Best Way To Represent A Readline Loop In Scala?

Answer: How about this?

```scala
val lines = Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString
```

Well, in Scala you can actually say:

```scala
val lines = scala.io.Source.fromFile("file.txt").mkString
```

But this is just library sugar. See "Read entire file in Scala?" for other possibilities. What you are actually asking is how to apply the functional paradigm to this problem. Here is a hint:

```scala
Source.fromFile("file.txt").getLines().foreach { println }
```

Do you get the idea behind this? For each line in the file, execute the println function. BTW, don't worry, getLines() returns an iterator, not the whole file. Now something more serious:

```scala
lines filter { _.startsWith("ab") } map { _.toUpperCase } foreach { println }
```

See the idea? Take lines (it can be an array, a list, a set, an iterator, whatever can be filtered and whose items have a startsWith method) and filter it, taking only the items starting with "ab". Now take every item and map ...
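
A self-contained sketch of the Iterator.continually pattern, with the reader closed automatically via scala.util.Using (available in Scala 2.13+); the file name is illustrative:

```scala
import java.io.{BufferedReader, FileReader}
import scala.util.Using

val lines: List[String] =
  Using.resource(new BufferedReader(new FileReader("file.txt"))) { reader =>
    Iterator
      .continually(reader.readLine())  // keep calling readLine()
      .takeWhile(_ != null)            // stop at end of stream
      .toList                          // materialize before the reader is closed
  }
```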

Apache Spark: Map Vs MapPartitions?

Answer: Important tip: whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per RDD element, and this initialization, such as creation of objects from a third-party library, cannot be serialized (so that Spark can transmit it across the cluster to the worker nodes), use mapPartitions() instead of map(). mapPartitions() provides for the initialization to be done once per worker task/thread/partition instead of once per RDD data element. For example, see below:

```scala
val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection  /* creates a db connection per partition */
  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList  // consumes the iterator, thus calls readMatchingFromDB
  connection.close()  // close dbconnection here
  newPartition.iterator  // create a new iterator
})
```

Q2. Does flatMap behave like map or like mapPart...
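
To make the contrast concrete, here is a hypothetical sketch; ExpensiveParser stands in for any heavyweight, non-serializable resource and is not from the original post:

```scala
import org.apache.spark.rdd.RDD

// Stand-in for a costly, non-serializable object (e.g. a third-party parser or client).
class ExpensiveParser { def parse(line: String): String = line.trim }

def withMap(lines: RDD[String]): RDD[String] =
  lines.map { line =>
    val parser = new ExpensiveParser   // constructed once per element
    parser.parse(line)
  }

def withMapPartitions(lines: RDD[String]): RDD[String] =
  lines.mapPartitions { iter =>
    val parser = new ExpensiveParser   // constructed once per partition
    iter.map(parser.parse)             // lazily applied to each element in the partition
  }
```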

Calculate The Standard Deviation Of Grouped Data In A Spark DataFrame

Answer:

Spark 1.6+

You can use stddev_pop to compute the population standard deviation and stddev / stddev_samp to compute the unbiased sample standard deviation:

```scala
import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}

selectedData.groupBy($"user").agg(stddev_pop($"duration"))
```

Spark 1.5 and below (the original answer):

Not so pretty and biased (same as the value returned from describe), but using the formula sqrt(E[X²] - (E[X])²) you can do something like this:

```scala
import org.apache.spark.sql.functions.{avg, sqrt}

selectedData
  .groupBy($"user")
  .agg(sqrt(avg($"duration" * $"duration") - avg($"duration") * avg($"duration")).alias("duration_sd"))
```

You can of course create a function to reduce the clutter:

```scala
import org.apache.spark.sql.Column

def mySd(col: Column): Column = {
  sqrt(avg(col * col) - avg(col) * avg(col))
}

df.groupBy($"user").agg(mySd($"duration").a...
```
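
For completeness, a self-contained sketch of the Spark 1.6+ approach; the "user"/"duration" column names follow the answer above, while the data is invented for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}

val spark = SparkSession.builder().master("local[*]").appName("stddev-demo").getOrCreate()
import spark.implicits._

val selectedData = Seq(
  ("alice", 10.0), ("alice", 12.0), ("alice", 14.0),
  ("bob", 5.0), ("bob", 9.0)
).toDF("user", "duration")

selectedData
  .groupBy($"user")
  .agg(
    stddev_samp($"duration").alias("duration_sd_sample"),  // unbiased (n - 1 denominator)
    stddev_pop($"duration").alias("duration_sd_pop")       // population (n denominator)
  )
  .show()
```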