WWW.DISSERS.RU

БЕСПЛАТНАЯ ЭЛЕКТРОННАЯ БИБЛИОТЕКА

   Добро пожаловать!


Pages:     | 1 |   ...   | 2 | 3 || 5 |

Реализация MapReduce в Google ориентирована на вычислительную инфраструктуру, состоящую из большого числа недорогих серверов из массовых комплектующих. Как правило, это двухпроцессорные x86-машины с 4-8 Гб оперативной памяти, работающие под управлением Linux. Для соединения машин в кластере используется коммутируемый Gigabit Ethernet. Кластеры состоят из тысяч машин, поэтому постоянно возникают отказы отдельных узлов. Для хранения данных используются недорогие IDE-диски, подключенные к каждой из машин. Данные хранятся под управлением распределенной файловой системы GFS (см. раздел 1.1).

Напомним, что GFS использует репликацию для надежного хранения данных.

Запуском MapReduce-заданий на кластере управляет планировщик, который отслеживает состояние машин и подбирает группу машин для выполнения задания.

Вызовы функции map распределяются между несколькими машинами путем автоматического разбиения входных данных на M частей, размер каждой из которых составляет обычно 16-64 Мб. Полученные порции данных могут обрабатываться параллельно различными машинами. Вызовы reduce распределяются путем разбиения пространства промежуточных ключей на R частей, определяемых с помощью функции разбиения (partitioning function). По умолчанию используется функция hash(k2) mod R.

Число частей R и функция разбиения указываются пользователем в спецификации задания.

Рассмотрим детально процесс выполнения задания в MapReduce.

Запуск задания инициируется блокирующим вызовом внутри кода пользователя библиотеки MapReduce. После чего система разбивает входные файлы на M частей, размер которых контролируется с помощью конфигурационного параметра. Далее MapReduce осуществляет запуск копий программы на машинах кластера. Одна из копий программы играет роль управляющего (master), а остальные являются рабочими (worker). Управляющий процесс осуществляет распределение задач (M map-задач и R reduce-задач) между рабочими. Каждому свободному рабочему процессу назначается map- или reduce-задача. Для каждой из задач управляющий процесс хранит ее состояние (ожидает выполнения, выполняется, завершена) и идентификатор рабочего процесса (для выполняющихся и выполненных заданий).

Рабочий процесс, выполняющий map-задачу, считывает содержимое соответствующего фрагмента входных данных из GFS, выделяет из данных пары ключзначение и передает каждую из пар заданной пользователем функции map. Полученные промежуточные пары буферизуются в памяти и периодически записываются на локальный диск. При записи данные разбиваются на R частей, в соответствии с функцией разбиения. По завершении задачи, местоположения файлов с промежуточными данными передаются управляющему процессу. Для каждой из mapзадач управляющий процесс хранит местоположения и размеры R фрагментов с промежуточными данными, полученными задачей. Данная информация периодически обновляется по мере выполнения map-заданий и передается рабочим процессам, выполняющим reduce-задачи.

При получении от управляющего процесса информации о готовых промежуточных данных, reduce-процесс считывает данные с локального диска mapпроцесса при помощи RPC-вызовов. Когда reduce-процесс получил все промежуточные данные для его порции выходных ключей, он производит сортировку значений по ключам. В случае если объем промежуточных данных слишком велик для размещения в памяти, используется внешняя сортировка. После сортировки, reduce-процесс последовательно сканирует промежуточные данные и для каждого встреченного уникального ключа вызывает заданную пользователем функцию reduce, передавая ей ключ и список найденных значений. Результаты вызова reduce записываются в выходной файл в GFS. Данные внутри выходного файла отсортированы по значению ключа. Для каждой reduce-задачи создается отдельный выходной файл.

Когда все map- и reduce-задачи выполнены, управляющий процесс завершает выполнение вызова MapReduce в программе пользователя и передает управление следующему за ней коду. Результаты выполнения задания доступны в виде R файлов.

Обычно пользователь не объединяет полученные данные в один файл, а передает на вход следующему MappReduce-заданию или другому приложению, которое поддерживает работу с несколькими входными файлами.

Рассмотрим обеспечение отказоустойчивости при выполнении задач MapReduce.

Управляющий процесс периодически опрашивает рабочие процессы. В случае если в течение определенного времени ответ от рабочего процесса не поступил, управляющий процесс помечает рабочий процесс как отказавший. Все map-задачи, выполненные данным процессом переводятся в состояние “ожидает выполнения” и запускаются повторно на других машинах. Аналогично, все выполняемые отказавшим процессом map- и reduce-задачи переводятся в состояние “ожидает выполнения” и запускаются повторно. Перезапуск map-задач необходим, потому что их результаты хранятся на локальном диске отказавшей машины и, следовательно, становятся недоступны. Перезапуск reduce-задач не требуется, поскольку их результаты сохраняются в GFS. В случае повторного запуска уже выполненного map-задания, все reduce-процессы уведомляются об этом. Те процессы, которые не успели считать результаты предыдущего запуска задачи, будут считывать их у нового map-процесса.

В случае если заданные пользователем функции map и reduce являются детерминированными функциями своих входных значений, распределенная реализация MapReduce гарантирует получение такого же результата, что и при последовательном выполнении программы без ошибок. Для этого используется атомарная фиксация результатов map- и reduce-задач. По выполнении map-задачи, рабочий процесс отправляет управляющему процессу сообщение, в котором указывает список R промежуточных файлов. В случае, если управляющий процесс получает сообщение о завершении уже выполненной задачи, он игнорирует данное сообщение. По завершении reduce-задачи, рабочий процесс атомарно переименовывает средствами GFS временный выходной файл в окончательный выходной файл. В случае если reduceзадача выполняется на нескольких машинах, то только результат одной из них будет в итоге записан в окончательный выходной файл.



В случае если функции map и/или reduce являются недетерминированными, система предоставляет более слабые гарантии. Утверждается, что результат определенной reduce-задачи R1 будет эквивалентен результату R1, полученному при некотором запуске последовательной недетерминированной программы. При этом результат другой reduce-задачи R2 может соответствовать результату R2, полученному при другом запуске последовательной недетерминированной программы. Подобная ситуация может возникать из-за того, что задачи R1 и R2 использовали результаты двух различных запусков некоторой map-задачи.

Количество map- и reduce-задач обычно подбирается таким образом, что оно было гораздо больше числа рабочих машин. В этом случае достигается лучшая балансировка нагрузки между машинами. Кроме того, это позволяет эффективнее обрабатывать отказ map-процесса, поскольку большое число выполненных им задач может быть распределено по всем рабочим машинам. На количество map-заданий также влияет размер получаемых фрагментов входных данных. Идеальным является размер 16-64 Мб, не превышающий размер блока GFS и позволяющий локализовать вычисления рядом с данными. Количество reduce-заданий обычно ограничивается пользователями допустимым числом выходных файлов.

Рассмотрим некоторые оптимизации, примененные при реализации MapReduce.

При распределении map-задач по машинам в кластере управляющий процесс учитывает то, каким образом входные данные размещены в GFS (см. раздел 1.1).

Поскольку данные хранятся в виде нескольких реплик на тех же машинах кластера, то управляющий сервер пытается отправить map-задачу на машину, хранящую соответствующий фрагмент входных данных, или же на машину, наиболее близкую к данным в смысле сетевой топологии. Подобная стратегия позволяет существенно снизить объем данных, передаваемых по сети во время запуска задания, и, тем самым, уменьшить время выполнения задания.

Для уменьшения промежуточных данных передаваемых по сети от mapпроцессов к reduce-процессам, внутри map-задачи может производиться локальная редукция промежуточных данных с одним значением ключа. Для этого, пользователь должен указать в спецификации задания так называемую combiner-функцию, которая имеет такую же семантику, что функция reduce. Часто в качестве combiner-функции указывается reduce.

Одной из частых причин увеличения времени выполнения MapReduce-задания является появление “отстающего” процесса, который слишком долго выполняет одну из последних map- или reduce-задач. Подобное поведение может быть обусловлено многими причинами, например, неисправностью жесткого диска или запуском на машине других вычислений. Для устранения подобной проблемы в MapReduce используется следующая стратегия. В конце вычислений, управляющий процесс запускает выполняющиеся задачи на дополнительных машинах. При выполнении задачи на одной из машин, ее выполнение на другой машине приостанавливается.

Подобная стратегия значительно уменьшает время выполнения больших заданий.

Для отладки MapReduce-программ предусмотрен последовательный режим выполнения задания на локальной машине. Во время выполнения задания на кластере, управляющий процесс предоставляет Web-интерфейс с подробной информацией о статусе выполнения задания, доступом к log-файлам и т.д.

В заключение, рассмотрим отличия MapReduce от существующих моделей и систем параллельных вычислений. Модели параллельных вычислений с элементами функционального программирования, позволяющие пользователю формировать программу из примитивов типа map, reduce, scan, sort и т.д., предлагались в академической среде задолго до появления MapReduce (см., например [9]). С этой точки зрения MapReduce можно рассматривать как упрощенную квинтэссенцию данных моделей, ориентированную на решение определенного круга задач по обработке больших массивов данных. Пожалуй, главная заслуга создателей MapReduce заключается в отказоустойчивой реализации вычислений на большом количестве ненадежных машин. В отличие от MapReduce, большинство систем параллельной обработки данных были реализованы на кластерах меньшего масштаба и часто требуют от программиста ручной обработки возникающих отказов. Модель MapReduce накладывает ряд ограничений на программу для того, чтобы автоматизировать распараллеливание, запуск и управление вычислениями на кластере. С одной стороны, это значительно упрощает задачу программиста и практически не требует от него специальной квалификации. С другой стороны, накладываемые системой ограничения не позволяют реализовать в ней решение произвольных задач. Например, в рамках описанной модели нельзя простым образом реализовать операции типа JOIN и SPLIT или организовать взаимодействие между параллельными процессами так, как это делается в технологии MPI.

2.2. Платформа Apache Hadoop Платформа распределенных вычислений Hadoop [10] разрабатывается на принципах open source в рамках организации The Apache Software Foundation.

Платформа ориентирована на поддержку обработки больших объемов данных и заимствует многие идеи у закрытых технологий Google, таких как MapReduce, GFS и BigTable (см. разделы 2.1, 1.1, 1.3). Hadoop состоит из двух частей: Hadoop Core и HBase. В состав Hadoop Core входят распределенная файловая система HDFS (см.





раздел 1.2) и реализация модели MapReduce. HBase содержит реализацию распределенной системы хранения структурированный данных (см. раздел 1.4).

Данный раздел посвящен особенностям реализации модели MapReduce в Hadoop.

Приведенное описание соответствует текущей на момент написания статьи версии Hadoop 0.17.0.

Для реализации вычислений в Hadoop используется архитектура “masterworker”. В отличие от Google MapReduce, в системе есть один выделенный управляющий процесс (т.н. JobTracker) и множество рабочих процессов (т.н.

TaskTracker), которые осуществляют выполнение всех заданий пользователей.

JobTracker принимает задания от приложений, разбивает их на map- и reduce-задачи, распределяет задачи по рабочим процессам, отслеживает выполнение и осуществляет перезапуск задач. TaskTracker запрашивает задачи у управляющего процесса, загружает код и выполняет запуск задачи, уведомляет управляющий процесс о состоянии выполнения задачи и предоставляет доступ к промежуточным данным map-задач.

Процессы взаимодействуют с помощью RPC-вызовов, причем все вызовы идут в направлении от рабочего к управляющему процессу, что позволяет уменьшить его зависимость от состояния рабочих процессов.

Система реализована на языке Java. Для создания приложений используется прикладной интерфейс программирования на Java. Функции map и reduce описываются в виде классов, реализующих стандартные интерфейсы Mapper и Reducer.

Спецификация задания оформляется в виде объекта типа JobConf, содержащего методы для указания классов с map и reduce, форматов входных и выходных данных, путей к входных и выходным данным в HDFS и других параметров задания.

Отметим, что, в отличие от Google MapReduce, реализация функции reduce может возвращать пары с произвольными ключами, не совпадающими с переданным на вход функции промежуточным ключом. Аналогично Google MapReduce, пользователь может указать функции разбиения входных данных (Partitioner) и комбинирования промежуточных данных (Combiner), оформленные в виде Java-классов. В состав Hadoop входят часто используемые на практике реализации функций map и reduce, а также функции разбиения.

Запуск задания осуществляется с помощью вызова функции runJob или submitJob, каждой из которых передается объект JobConf. В первом случае приложение блокируется до завершения заданий, а во втором случае вызов сразу возвращает управление коду приложения. При запуске задания его спецификация и jar-файл с кодом автоматически размещаются в HDFS, после чего задание направляется JobTracker-процессу. В описании задания пользователь может указать набор дополнительных файлов, копируемых на рабочие узлы перед запуском вычислений.

Количество map-задач определяется системой автоматически, исходя из указанного пользователем желаемого числа задач, а также максимального (размер HDFS-блока) и минимального размеров фрагмента входных данных. В общем случае, количество map-задач может не совпадать с указанным пользователем. Количество reduce-задач управляется с помощью параметра задания. По умолчанию используется значение 1. Пользователь может установить число reduce-задач равным 0. В этом случае фаза reduce не проводится, и промежуточные результаты map-задач записываются в выходные файлы в HDFS. Данная возможность полезна в тех случаях, когда не требуется агрегация или сортировка результатов фазы map.

Hadoop поддерживает различные форматы входных и выходных данных, включая текстовый файл, двоичный формат со сжатием и таблицы HBase.

Пользователь может использовать другие форматы данных путем создания специальных Java-классов для чтения и записи данных.

Для отладки и отслеживания статуса выполнения map- и reduce-задач Hadoop позволяет обновлять внутри функций map и reduce строку статуса задачи и значения счетчиков, определенных пользователем. Статус задачи и значения счетчиков отображаются в Web-интерфейсе Hadoop.

В заключение отметим, что помимо запуска Java-программ, Hadoop позволяет указать в качестве реализаций map и reduce произвольные программы. Данные программы должны считывать входные данные из потока ввода и записывать результаты в поток вывода.

2.3. Технология Microsoft Dryad Универсальная технология распределенной обработки данных Dryad [11] разрабатывается компанией Microsoft. В настоящее время эта технология является закрытой и применяется только внутри компании, например, в поисковой системе MSN Live Search. Описание технологии было опубликовано в работе [12].

Модель программирования Dryad основана на представлении приложения в виде ориентированного ациклического графа. Вершинами графа являются процессы. Ребра графа определяют потоки данных между процессами в виде односторонних каналов. У процесса может быть несколько входных и выходных каналов. Вершины графа могут быть сгруппированы в стадии (stage). Важно отметить, что модель программирования Dryad содержит в себе в качестве частных случаев реляционную алгебру и MapReduce.

Система организует выполнение приложения на имеющихся вычислительных ресурсах, будь то многоядерная машина или кластер из большого числа машин. При этом система автоматически осуществляет планирование вычислений, распределение вершин между машинами, обработку отказов и динамическую оптимизацию структуры графа.

Pages:     | 1 |   ...   | 2 | 3 || 5 |










© 2011 www.dissers.ru - «Бесплатная электронная библиотека»

Материалы этого сайта размещены для ознакомления, все права принадлежат их авторам.
Если Вы не согласны с тем, что Ваш материал размещён на этом сайте, пожалуйста, напишите нам, мы в течении 1-2 рабочих дней удалим его.