
Showing posts with the label EMR

collect() Or toPandas() On A Large DataFrame In PySpark/EMR

Answer : TL;DR I believe you're seriously underestimating the memory requirements. Even assuming the data is fully cached, the storage info will show only a fraction of the peak memory required to bring the data back to the driver.

First of all, Spark SQL uses compressed columnar storage for caching. Depending on the data distribution and the compression algorithm, the in-memory size can be much smaller than the uncompressed Pandas output, not to mention a plain List[Row]. The latter also stores the column names with every row, further increasing memory usage.

Data collection is indirect, with data stored on both the JVM side and the Python side. While JVM memory can be released once the data has gone through the socket, peak memory usage should account for both.

The plain toPandas implementation collects Rows first, then creates the Pandas DataFrame locally. This further increases (and possibly doubles) memory usage. Luckily this part is already addressed on master (Spark 2.3), with a more direct approach using Arrow serialization...
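To make the row-overhead point concrete, here is a hedged back-of-the-envelope sketch in plain Python (no Spark required): a per-row mapping that carries its field names costs noticeably more than a bare tuple of the same values. The numbers are illustrative of the general effect, not measurements of Spark's actual Row class.

```python
import sys

# Three numeric fields stored two ways. A dict-per-row layout keeps the
# column names alongside every row; a tuple keeps only the values.
# (Illustrative only -- Spark's Row is its own class, not a plain dict.)
row_with_names = {"id": 1, "value": 2.0, "score": 3.0}
row_values_only = (1, 2.0, 3.0)

per_row_overhead = sys.getsizeof(row_with_names) - sys.getsizeof(row_values_only)
print("extra bytes per row:", per_row_overhead)
print("extra bytes across 10M rows:", per_row_overhead * 10_000_000)
```

In CPython the named layout is strictly larger per row, and that difference is multiplied by the row count when everything is collected to the driver, which is why the driver-side footprint can dwarf what the storage tab reports for the cached, compressed columnar form.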

AWS EMR Spark Python Logging

Answer : I've found that EMR's logging for particular steps almost never winds up in the controller or stderr logs that get pulled alongside the step in the AWS console. Usually I find what I want in the job's container logs (and usually it's in stdout). These are typically at a path like s3://mybucket/logs/emr/spark/j-XXXXXX/containers/application_XXXXXXXXX/container_XXXXXXX/... . You might need to poke around within the various application_... and container_... directories within containers . That last container directory should have a stdout.log and stderr.log .
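As a hedged illustration (this is not an official EMR API, just the directory layout described above expressed as a helper), the prefix to browse can be assembled like this; the bucket, cluster, application, and container IDs below are placeholders carried over from the answer:

```python
# Hypothetical helper: assemble the S3 prefix under which EMR drops YARN
# container logs, following the layout described above. All IDs are
# placeholders -- substitute your cluster's actual values.
def container_log_prefix(log_uri: str, cluster_id: str,
                         application_id: str, container_id: str) -> str:
    return (f"{log_uri.rstrip('/')}/{cluster_id}/containers/"
            f"{application_id}/{container_id}/")

prefix = container_log_prefix(
    "s3://mybucket/logs/emr/spark",
    "j-XXXXXX",
    "application_XXXXXXXXX",
    "container_XXXXXXX",
)
print(prefix)  # list this prefix (e.g. with `aws s3 ls`) to find stdout.log / stderr.log
```

Listing that prefix with the AWS CLI or console should surface the stdout.log and stderr.log files mentioned above; if the IDs are unknown, list the containers/ level first and drill down.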