Tag Archives: apache hama

[repost ]YARN/MRv2 中基本术语介绍

original:http://dongxicheng.org/mapreduce-nextgen/basic-concepts/

YARN/MRv2是下一代MapReduce框架(见Hadoop-0.23.0),该框架完全不同于当前的MapReduce框架,它在扩展性,容错性和通用性等方面更出色,据统计,Yarn有超过150000行代码,完全是重写编写的。本文介绍了YARN/MRv2中基本术语的含义,帮助有兴趣的程序员们对YARN有一个初步的理解。

(1) YARN

下一代MapReduce框架的名称,为了容易记忆,一般称为MRv2(MapReduce version 2)。该框架已经不再是一个传统的MapReduce框架,甚至与MapReduce无关,她是一个通用的运行时框架,用户可以编写自己的计算框架,在该运行环境中运行。用于自己编写的框架作为客户端的一个lib,在运用提交作业时打包即可。该框架为提供了以下几个组件:

<1> 资源管理:包括应用程序管理和机器资源管理

<2> 资源双层调度

<3> 容错性:各个组件均有考虑容错性

<4> 扩展性:可扩展到上万个节点

当前比较有名的计算框架有:

MapReduce:google提出的计算框架,在互联网大规模数据处理方面使用广泛,但它存在缺点,如:不支持DAG作业,迭代式作业等。

Apache Giraph:图算法处理框架,采用BSP模型(bulk-synchronous parallel model),可用于计算pagerank,shared connections, personalization-based popularity等迭代类算法。

Apache HAMA:基于BSP模型的分布式计算框架,可用于大规模科学计算,如矩阵,图算法,网络算法等,受Google’s Pregel启发,但又有所不同,HAMA是个更加通用的框架,不仅仅支持图算法。

Open MPI:这是一个高性能计算函数库,通常在HPC(High Performance Computing)中采用,与MapReduce相比,其性能更高,用户可控性更强,但编程复杂,容错性差,可以说,各有所长,在实际应用中,针对不同该应用会采用MPI或者MapReduce。

HBase :Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,仿照Google Bigtable实现的,近几年逐步的流行,慢慢取代Cassandra了(在Hadoop In China2011上,FaceBook工程师说他们早就放弃Cassandra而改用HBase)。

以上这几个框架各有所长,在一些互联网公司中均会采用,如果分别部署安装这些计算框架过于繁琐,有了YARN后,这些计算框架可统一在YARN环境中部署。目前仅有MapReduce可使用,其他几个在陆续开发中,具体可参考:

(2) ResourceManager

简称“RM”。

MRv2最基本的设计思想是将JobTracker的两个主要功能,即资源管理和作业调度/监控分成两个独立的进程。在该解决方案中包含两个组件:全局的ResourceManager(RM)和与每个应用相关的ApplicationMaster(AM)。这里的“应用”指一个单独的MapReduce作业或者DAG作业。RM和与NodeManager(NM,每个节点一个)共同组成整个数据计算框架。RM是系统中将资源分配给各个应用的最终决策者。AM实际上是一个具体的框架库,它的任务是【与RM协商获取应用所需资源】和【与NM合作,以完成执行和监控task的任务】。

RM有两个组件组成:

调度器(Scheduler)

应用管理器(ApplicationsManager,ASM)

调度器根据容量,队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用。这里的调度器是一个“纯调度器”,因为它不再负责监控或者跟踪应用的执行状态等,此外,他也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务。调度器仅根据各个应用的资源需求进行调度,这是通过抽象概念“资源容器”完成的,资源容器(Resource Container)将内存,CPU,磁盘,网络等资源封装在一起,从而限定每个任务使用的资源量。

调度器内嵌有策略可插拔的插件,主要负责将集群中得资源分配给多个队列和应用。当前MapReduce的调度器,如Capacity Scheduler和Fair Scheduler,均可作为该插件。

(3)NodeManager

简称“NM”。

NM是每个节点上的框架代理,主要负责启动应用所需的容器,监控资源(内存,CPU,磁盘,网络等)的使用情况并将之汇报给调度器。

一句话:“NM主要用于管理某个节点上的task和资源”。

(4)ApplicationsManager

简称“ASM”。

ASM主要负责接收作业,协商获取第一个容器用于执行AM和提供重启失败AM container的服务。

一句话:“ASM主要用于管理AM”。

(5)ApplicationMaster

简称“AM”。

AM主要负责同调度器协商以获取合适的容器,并跟踪这些容器的状态和监控其进度。

一句话:“AM主要用于管理其对应的应用程序,如MapReduce作业,DAG作业等”。

(6) Container

容器中封装了机器资源,如内存,CPU, 磁盘,网络等,每个任务会被分配一个容器,该任务只能在该容器中执行,并使用该容器封装的资源。

怎样将某个计算框架(MapReduce,HAMA,Giraph)部署到YARN中?

答:需要编写一个ApplicaionMaster。

【参考资料】

(1)雅虎声称对Apache Hadoop的贡献巨大:http://oss.org.cn/?action-viewnews-itemid-62734

(2)The Next Generation of Apache Hadoop MapReduce:http://developer.yahoo.com/blogs/hadoop/posts/2011/02/mapreduce-nextgen/

(3)Next Generation of Apache Hadoop MapReduce – The Scheduler:http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/

(4)Apache Hadoop NextGen MapReduce (YARN):http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/YARN.html

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce-nextgen/basic-concepts/

[repost ]Hadoop Hama项目–BSP模型的实现

original:http://www.javabloger.com/article/apache-hadoop-hama-bsp.html

1、Hama概论
·建立在Hadoop上的分布式并行计算模型。
·基于 Map/Reduce 和 Bulk Synchronous 的实现框架。
·运行环境需要关联 Zookeeper、HBase、HDFS 组件。
·集群环境中的系统架构由 BSPMaster/GroomServer(Computation Engine)、Zookeeper(Distributed Locking)、HDFS/HBase(Storage Systems) 这3大块组成。
如图所示:
HAMA Architecture

·Hama中有2个主要的模型:
– 矩阵计算(Matrix package)
– 面向图计算(Graph package)

·Hama项目起源于在2008年5月19日
·Hama主要成员 Edward J. Yoon (高丽棒子)
·Hama项目的最大支持者 韩国NHN互联网搜索引擎以及网络游戏公司,貌似中国的百度,详见这里

2、Hama介绍  
2008年5月Hama被视为Apache众多项目中一个被孵化的项目,目前(2010年12月)在Hama的项目网站上还没有正式的release版本,作为Hadoop项目中的一个子项目,BSP模型是Hama计算的核心,并且实现了分布式的计算框架,采用这个框架可以用于矩阵计算(matrix)和面向图计算(grah)、网络计算(network)。

我的废话:
1、如果要深入了解到 Hama中采用到的技术体系,需要去阅读一些BSP、MPI、Pregel等相关资料,可以有助于对Hama项目的了解。
2、看来Apache基金会对Google未开源的核心技术彻底的做了一个山寨版本,比如我之前提到过关于Yahoo山寨了Google的那些技术
3、Hama中依然存在SPFO的单点问题,如果主节点BSPMaster挂了,依然全挂,当然有其他的解决办法,不过这里主要想指出的是Hama暂时还没有设计到这点。
4、Hama在MapReduce的基础上实现了2种算法,Iterative 和 Block ,其中Iterative比较简单,而Block相对复杂些。

3、关于BSP模型
Hama中最关键的就是BSP(Bulk Synchronous Parallel-“大型”同步模型)模型, BSP的概念由Valiant(1990)提出的,“块”同步模型,是一种异步MIMD-DM模型,支持消息传递系统,块内异步并行,块间显式同步,该模型基于一个master协调,所有的worker同步(lock-step)执行, 数据从输入的队列中读取, 该模型的架构如图所示:

另外,BSP并行计算模型可以用 p/s/g/i 4个参数进行描述:
·    P为处理器的数目(带有存储器)
·    s为处理器的计算速度
·    g为每秒本地计算操作的数目/通信网络每秒传送的字节数,称之为选路器吞吐率,视为带宽因子 (time steps/packet)=1/bandwidth
·    i为全局的同步时间开销,称之为全局同步之间的时间间隔 (Barrier synchronization time)
那么假设有p台处理器同时传送h个字节信息,则g•h就是通信的开销。同步和通信的开销都规格化为处理器的指定条数。

BSP计算模型不仅是一种体系结构模型,也是设计并行程序的一种方法。BSP程序设计准则是 bulk同步 (bulk synchrony),其独特之处在于超步(superstep)概念的引入。一 个BSP程序同时具有水平和垂直两个方面的结构。从垂直上看,一个BSP程序由一系列串行的超步(superstep)组成,如图所示:

hama bsp

这种结构类似于一个串行程序结构。从水平上看, 在一个超步中, 所有的进程并行执行局部计算。一个超步可分为三个阶段 ,如图所示:
bsp
1 )本地计算阶段, 每个处理器只对存储本地内存中的数据进行本地计算。
2 )全局通信阶段, 对任何非本地数据进行操作。
3 )栅栏同步阶段, 等待所有通信行为的结束。

BSP模型相对于其他两种模型而言, 具有如下两个方面的优点:
•MPI 和 PVM两种并行计算模型,依赖于接收和发送 的操作对。这样通信方式容易导致上层应用程序产生死锁,而BSP并行计算库是一个程序划分为超步(superstep),使得死锁不再发生。
•BSP模型由于其本身的特点, 使得对于程序的正确性和时间的复杂性预测成为可能。

4、Apache Hama与Google Pregel
Hama类似Google发明的Pregel,如果你听过Google Pregel这个利器的话,那么就对BSP计算模型不会陌生,Google的Pregel也是基于BSP模型,在Google的整个计算体系中有20%的 计算是依赖于Pregel的计算模型,Google利用Pregel实现了图遍历(BFS)、最短路径(SSSP)、PageRank计算,我猜想 Google的Google Me 产品很有可能会大量采用Pregel的计算方式,用Pregel来绘制Google Me产品中SNS的关系图。

Google的Pregel是采用GFS或BigTable进行持久存储,Google的Pregel是一个Master-slave主从结构,有一个节点扮演master角色,其它节点通过name service定位该顶点并在第一次时进行注册,master负责对计算任务进行切分到各节点(也可以自己指定,考虑load balance等因素),根据顶ID哈希分配顶点到机器(一个机器可以有多个节点,通过name service进行逻辑区分),每个节点间异步传输消息,通过checkpoint机制实行容错(更高级的容错通过confined recovery实现),并且每个节点向master汇报心跳(ping)维持状态。

Hama是Apache中Hadoop的子项,所以Hama可以与Apache的HDSF进行完美的整合,利用HDFS对需要运行的任务和数据进行持久化存储,也可以在任何文件系统和数据库中。当然我们可以相信BSP模型的处理计算能力是相对没有极限的特别对于图计算来说,换句话说BSP模型就像MapReduce一样可以广泛的使用在任何一个分布式系统中,我们可以尝试的对实现使用Hama框架在分布式计算中得到更多的实践,比如:矩阵计算、排序计算、pagerank、BFS 等等。

5、Hama Architecture
Apache的Hama主要由三个部分组成:BSPMaster,GroomServers和Zookeeper,下面这张图主要概述了Hama的整体系统架构,并且描述了系统模块之间的通讯与交互。Hama的集群中需要有HDFS的运行环境负责持久化存储数据(例如:job.jar),BSPMaster负责进行对Groom Server 进行任务调配,groom Server 负责进行对BSPPeers进行调用 程序进行具体的调用,Zookeeper负责对Groom Server 进行失效转发。
Apache Hama Architecture.png
BSPMaster
在Apache Hama中BSPMaster模块是系统中的一个主要角色,他主要负责的是协同各个计算节点之间的工作,每一个计算节点在其注册到master上来的时候会分配到一个唯一的ID。Master内部维护着一个计算节点列表,表明当前哪些计算节点出于alive状态,该列表中就包括每个计算节点的ID和地址信息,以及哪些计算节点上被分配到了整个计算任务的哪一部分。Master中这些信息的数据结构大小取决于整个计算任务被分成多少个partition。因此,一台普通配置的BSPMaster足够用来协调对一个大型计算。
下面我们来看看BSPMaster做了哪些工作:
•    维护着Groom服务器的状态。
•    控制在集群环境中的superstep。
•    维护在groom中job的工作状态信息。
•    分配任务、调度任务到所有的groom服务器节点。
•    广播所有的groom服务器执行。
•    管理系统节点中的失效转发。
•    提供用户对集群环境的管理界面。

一个BSPMaster或者多个grooms服务器是通过脚本启动的,在Groom服务器中还包含了BSPeer的实例,在启动GroomServer的时候就会启动了BSPPeer,BSPPeer是整合在GrommServer中的,GrommServer通过PRC代理与BSPmaster连接。当BSPmaster、GroomServer启动完毕以后,每个GroomServer的生命周期通过发送“心跳”信息给BSPmaster服务器,在这个“心跳”信息中包含了GrommServer服务器的状态,这些状态包含了能够处理任务的最大容量,和可用的系统内存状态,等等。

BSPMaster的绝大部分工作,如input ,output,computation,saving以及resuming from checkpoint,都将会在一个叫做barrier的地方终止。Master会在每一次操作都会发送相同的指令到所有的计算节点,然后等待从每个计算节点的回应(response)。每一次的BSP主机接收心跳消息以后,这个信息会带来了最新的groom服务器状态,BSPMaster服务器对给出一个回应的信息,BSPMaster服务器将会与groom 服务器进行确定活动的groom server空闲状态,也就是groom 服务器可资源并且对其进行任务调度和任务分配。 BSPMaster与Groom Server两者之间通讯使用非常简单的FIFO(先进先出)原则对计算的任务进行分配、调度。

GroomServer
一个Groom服务器对应一个处理BSPMaster分配的任务,每个groom都需要与BSPMaster进行通讯,处理任务并且想BSPMaster处理报告状态,集群状态下的Groom Server需要运行在HDFS分布式存储环境中,而且对于Groom Server来说 一个groom 服务器对应一个BSPPeer节点,需要运行在同一个物理节点上。

Zookeeper
Zookeeper这里就不多提了,可以参考我之前写的几篇文章,在Apache HaMa项目中zookeeper是用来有效的管理BSPPeer节点之间的同步间隔(barrier synchronisation),同时在系统失效转发的功能上发挥了重要的作用。

6、Hama对BSP模型的实现
在一个BSP计算模型的程序中包含了一个supersteps步骤,每一个superstep由以下3个体系:
•    本地计算
•    进程通信
•    同步间隔

public class BSPEaxmple {

public static class MyBSP extends BSP {

@Override
public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
InterruptedException {
// 1. Do something locally

// 2. Sends/receives data to/from neighbor nodes
bspPeer.send(peerName, msg);

while ((message = bspPeer.getCurrentMessage()) != null) {
byte[] data = message.getData();
}

// 3. Barrier synchronization
bspPeer.sync();
}

@Override
public Configuration getConf() {
return conf;
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

}

// BSP job configuration
public void main(String[] args) throws Exception {
BSPJob bsp = new BSPJob(new HamaConfiguration(), BSPEaxmple.class);
// Set the job name
bsp.setJobName(“My BSP Job”);
bsp.setBspClass(MyBSP.class);

// Submit job
BSPJobClient.runJob(bsp);
}
}

接下来将会介绍 Hama的具体的用例和安装配置说明,待续。

感谢您的阅读。

 

相关文章:
  Apache ZooKeeper入门2 
Apache Zookeeper入门1 
Apache ZooKeeper入门3

本文由J2ee企业顾问-黄毅创作,并已采用创作共用署名2.5中国大陆版许可证授权