Как определяется количество разделов RDD в Apache Spark?

Вопрос

  1. Как определяется количество разделов Spark?
  2. Нужно ли где-то явно указывать количество доступных ядер ЦП, чтобы число разделов было одинаковым (например, numPartition arg метода параллелизации, но затем нужно обновлять программу всякий раз, когда количество ядер изменилось)?

Фон

Установлен кластер Spark, как в Среде, без изменений в файлах spark-env.sh, spark-defaults.conf и SparkConf в программах.

Для программы N Queen количество разделов было 2, и только одному узлу были назначены задачи. Для программы подсчета слов количество разделов было 22, и задачи были распределены по всем узлам. Используется spark-submit для обеих программ.

программы

Королева

val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }

val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))

val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()

Количество слов

val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))

val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)

Среда

Spark 2.0.1 (3 узла с 4 процессорами в каждом) в Ubuntu 14.04.
Автономное развертывание (не YARN и не Mesos)

1 ответ

Решение

Информацию можно найти в разделе " Как: настроить работу Apache Spark" (часть 2).

Как определяется это число? Как Spark группирует СДР по этапам, описано в предыдущем посте. (В качестве быстрого напоминания такие преобразования, как перераспределение и reduByKey, вызывают границы этапа.) Количество задач на этапе соответствует количеству разделов в последнем СДР на этапе. Количество разделов в RDD такое же, как и количество разделов в RDD, от которых оно зависит, за исключением нескольких: преобразование coalesce позволяет создать RDD с меньшим количеством разделов, чем его родительский RDD, преобразование объединения создает RDD с сумма количества разделов его родителей, и декартово создает RDD с их продуктом.

А как насчет СДР без родителей? Разделы RDD, создаваемые textFile или hadoopFile, определяются в соответствии с используемым базовым MapReduce InputFormat. Обычно для каждого читаемого блока HDFS создается раздел. Разделы для RDD, создаваемых распараллеливанием, берутся из параметра, заданного пользователем, или из spark.default.parallelism, если он не задан.

Параметр spark.default.parallelism исправил симптом.

--conf spark.default.parallelism=24

Установка на 12 (то же самое с количеством ядер) приводит к неравномерному использованию узлов.

Другие вопросы по тегам