
Showing posts with the label Apache Spark

Automatically And Elegantly Flatten DataFrame In Spark SQL

Answer: The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema. The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column]. Something like:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.functions.col

    def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
      schema.fields.flatMap(f => {
        val colName = if (prefix == null) f.name else (prefix + "." + f.name)
        f.dataType match {
          case st: StructType => flattenSchema(st, colName)
          case _              => Array(col(colName))
        }
      })
    }

You would then use it like this:

    df.select(flattenSchema(df.schema):_*)

I am improving my previous a...
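For readers on the Python API, here is a rough PySpark sketch of the same recursive schema walk (not from the original answer; the function name and usage line are illustrative):

    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType

    def flatten_schema(schema, prefix=None):
        # Recursively collect one Column per leaf field, using dotted names
        columns = []
        for field in schema.fields:
            name = field.name if prefix is None else prefix + "." + field.name
            if isinstance(field.dataType, StructType):
                columns += flatten_schema(field.dataType, name)
            else:
                columns.append(col(name))
        return columns

    # flat_df = df.select(flatten_schema(df.schema))

Note that selecting col("parent.child") keeps only the leaf name ("child") in the output, so if different structs share leaf field names you may want to alias each column.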

Collect() Or ToPandas() On A Large DataFrame In Pyspark/EMR

Answer: TL;DR I believe you're seriously underestimating memory requirements. Even assuming that data is fully cached, storage info will show only a fraction of the peak memory required for bringing data back to the driver.

First of all, Spark SQL uses compressed columnar storage for caching. Depending on the data distribution and compression algorithm, the in-memory size can be much smaller than the uncompressed Pandas output, not to mention a plain List[Row]. The latter also stores column names, further increasing memory usage.

Data collection is indirect, with data being stored both on the JVM side and the Python side. While JVM memory can be released once data goes through the socket, peak memory usage should account for both.

The plain toPandas implementation collects Rows first, then creates the Pandas DataFrame locally. This further increases (possibly doubles) memory usage. Luckily this part is already addressed on master (Spark 2.3), with a more direct approach using Arrow serialization...
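The Arrow path mentioned at the end is switched on with a single configuration flag; a minimal sketch, assuming a Spark >= 2.3 session named spark (in later releases the key was renamed to spark.sql.execution.arrow.pyspark.enabled):

    # Enable Arrow-based conversion before calling toPandas()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    pdf = df.toPandas()  # rows transferred as Arrow record batches instead of pickled Rows

This avoids the intermediate List[Row], but the full Pandas DataFrame still has to fit in driver memory.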

Apache Flink Vs Apache Spark As Platforms For Large-scale Machine Learning?

Answer: Disclaimer: I'm a PMC member of Apache Flink. My answer focuses on the differences of executing iterations in Flink and Spark.

Apache Spark executes iterations by loop unrolling. This means that for each iteration a new set of tasks/operators is scheduled and executed. Spark does that very efficiently because it is very good at low-latency task scheduling (the same mechanism is used for Spark Streaming, btw.) and caches data in memory across iterations. Therefore, each iteration operates on the result of the previous iteration which is held in memory. In Spark, iterations are implemented as regular for-loops (see the Logistic Regression example).

Flink executes programs with iterations as cyclic data flows. This means that a data flow program (and all its operators) is scheduled just once and the data is fed back from the tail of an iteration to its head. Basically, data is flowing in cycles around the operators within an iteration. Since operators are just scheduled once, t...
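To make "iterations as regular for-loops" concrete, here is a toy PySpark sketch (not the Logistic Regression example referenced above; the data, the update rule, and the existing SparkContext sc are assumptions):

    # Each pass through the loop schedules a fresh Spark job over the cached RDD.
    points = sc.parallelize([(1.0, 2.0), (2.0, 3.9), (3.0, 6.1)]).cache()

    w = 0.0
    for _ in range(10):                                   # ordinary driver-side loop
        gradient = points.map(lambda p: (p[1] - w * p[0]) * p[0]).sum()
        w += 0.01 * gradient                              # next pass reuses the cached data

    print(w)

Flink's cyclic data flows, by contrast, express the whole loop inside a single scheduled program, so there is no per-iteration scheduling cost.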

Apply OneHotEncoder For Several Categorical Columns In Spark MLlib

Answer:

Spark >= 3.0

In Spark 3.0 OneHotEncoderEstimator has been renamed to OneHotEncoder, so replace

    from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel

    encoder = OneHotEncoderEstimator(...)

with

    from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel

    encoder = OneHotEncoder(...)

Spark >= 2.3

You can use the newly added OneHotEncoderEstimator:

    from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel

    encoder = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol() for indexer in indexers],
        outputCols=[
            "{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers]
    )

    assembler = VectorAssembler(
        inputCols=encoder.getOutputCols(),
        outputCol="features"
    )

    pipeline = Pipeline(stages=indexers + [encoder, assembler])
    pipeline.fit(df).transform(df)

Spark < 2.3

It is not possible. The StringIndexer transformer operates only on a single column at a time, so you'll ne...
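The Spark >= 2.3 snippet assumes an indexers list built earlier in the original answer; a minimal sketch of how such a list is typically constructed (the column names here are made up):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

    categorical_cols = ["gender", "country", "browser"]   # hypothetical columns

    # One StringIndexer per categorical column, feeding the single encoder above
    indexers = [
        StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
        for c in categorical_cols
    ]

On Spark < 2.3 the usual workaround follows the same list-comprehension idea: build one OneHotEncoder per indexed column and put all of them into the Pipeline stages.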

AWS EMR Spark Python Logging

Answer: I've found that EMR's logging for particular steps almost never winds up in the controller or stderr logs that get pulled alongside the step in the AWS console. Usually I find what I want in the job's container logs (and usually it's in stdout). These are typically at a path like:

    s3://mybucket/logs/emr/spark/j-XXXXXX/containers/application_XXXXXXXXX/container_XXXXXXX/...

You might need to poke around within the various application_... and container_... directories within containers. That last container directory should have a stdout.log and stderr.log.
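If you'd rather script the digging than click through S3, a small boto3 sketch (the bucket name, log prefix, and cluster/application ids are placeholders):

    import boto3

    s3 = boto3.client("s3")
    bucket = "mybucket"
    prefix = "logs/emr/spark/j-XXXXXX/containers/"

    # Walk everything under containers/ and print the stdout/stderr log keys
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(("stdout.log", "stderr.log")):
                print(obj["Key"])

Depending on the EMR release, the files may land as stdout.gz / stderr.gz instead, so loosen the filter if nothing shows up.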

Apache Spark: Map Vs MapPartitions?

Answer: Imp. TIP: Whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per RDD element, and if this initialization, such as creation of objects from a third-party library, cannot be serialized (so that Spark can transmit it across the cluster to the worker nodes), use mapPartitions() instead of map(). mapPartitions() provides for the initialization to be done once per worker task/thread/partition instead of once per RDD data element. For example, see below:

    val newRd = myRdd.mapPartitions(partition => {
      val connection = new DbConnection /* creates a db connection per partition */

      val newPartition = partition.map(record => {
        readMatchingFromDB(record, connection)
      }).toList // consumes the iterator, thus calls readMatchingFromDB

      connection.close() // close dbconnection here
      newPartition.iterator // create a new iterator
    })

Q2. does flatMap behave like map or like mapPart...
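The same pattern in PySpark, with a stand-in class playing the role of the non-serializable connection (DummyConnection and rdd are placeholders, not part of the original answer):

    # Stand-in for a heavyweight, non-serializable client such as a DB driver
    class DummyConnection:
        def lookup(self, record):
            return record * 10
        def close(self):
            pass

    def process_partition(records):
        conn = DummyConnection()        # expensive init, once per partition
        for record in records:
            yield conn.lookup(record)   # connection reused for every record
        conn.close()                    # runs once the partition iterator is drained

    result = rdd.mapPartitions(process_partition)

With plain map() the initialization would either run once per record or the connection object would have to be serialized to the workers, which is exactly what the tip above is steering you away from.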

Calculate The Standard Deviation Of Grouped Data In A Spark DataFrame

Answer:

Spark 1.6+

You can use stddev_pop to compute the population standard deviation and stddev / stddev_samp to compute the unbiased sample standard deviation:

    import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}

    selectedData.groupBy($"user").agg(stddev_pop($"duration"))

Spark 1.5 and below (the original answer)

Not so pretty and biased (same as the value returned from describe), but using the formula sd = sqrt(E[X^2] - E[X]^2) you can do something like this:

    import org.apache.spark.sql.functions.{avg, sqrt}

    selectedData
      .groupBy($"user")
      .agg(sqrt(
        avg($"duration" * $"duration") - avg($"duration") * avg($"duration")
      ).alias("duration_sd"))

You can of course create a function to reduce the clutter:

    import org.apache.spark.sql.Column

    def mySd(col: Column): Column = {
      sqrt(avg(col * col) - avg(col) * avg(col))
    }

    df.groupBy($"user").agg(mySd($"duration").a...
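For completeness, the same Spark 1.6+ route from PySpark on made-up data (assumes an existing SparkSession named spark):

    from pyspark.sql import functions as F

    df = spark.createDataFrame(
        [("a", 10.0), ("a", 12.0), ("b", 7.0), ("b", 9.0)],
        ["user", "duration"],
    )

    df.groupBy("user").agg(
        F.stddev_pop("duration").alias("duration_sd_pop"),     # population sd
        F.stddev_samp("duration").alias("duration_sd_samp"),   # unbiased sample sd
    ).show()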

Apache Spark: Setting Executor Instances Does Not Change The Executors

Answer: Increase yarn.nodemanager.resource.memory-mb in yarn-site.xml.

With 12g per node you can only launch the driver (3g) and 2 executors (11g):

    Node1 - driver    3g  (+7% overhead)
    Node2 - executor1 11g (+7% overhead)
    Node3 - executor2 11g (+7% overhead)

Now you are requesting executor3 of 11g, and no node has 11g of memory available. For the 7% overhead, refer to spark.yarn.executor.memoryOverhead and spark.yarn.driver.memoryOverhead in https://spark.apache.org/docs/1.2.0/running-on-yarn.html

Note that yarn.nodemanager.resource.memory-mb is the total memory that a single NodeManager can allocate across all containers on one node. In your case, since yarn.nodemanager.resource.memory-mb = 12G, if you add up the memory allocated to all YARN containers on any single node, it cannot exceed 12G. You have requested 11G (--executor-memory 11G) for each Spark executor container. Though 11G is less than 12G, this still won't work. Why? Because you have to account for spark.yar...
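A quick back-of-the-envelope check of the node layout above (the flat 7% overhead is the answer's own simplification; real values come from the memoryOverhead settings linked there):

    node_gb = 12.0

    driver_container   = 3.0  * 1.07    # ~3.2 GB used on Node1
    executor_container = 11.0 * 1.07    # ~11.8 GB used on Node2 and Node3

    # Memory left over on each node -- nowhere near the ~11.8 GB a third executor needs
    print(node_gb - driver_container)     # ~8.8 GB
    print(node_gb - executor_container)   # ~0.2 GB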