摘要:TDW是腾讯最大的离线数据处理平台。本文主要从需求、挑战、方案和未来计划等方面,介绍了TDW在建设单个大规模集群中采取的JobTracker分散化和NameNode高可用两个优化方案。

TDW(Tencent distributed Data Warehouse,腾讯分布式数据仓库)基于开源软件Hadoop和Hive进行构建,打破了传统数据仓库不能线性扩展、可控性差的局限,并且根据腾讯数据量大、计算复杂等特定情况进行了大量优化和改造。

TDW服务覆盖了腾讯绝大部分业务产品,单集群规模达到4400台,CPU总核数达到10万左右,存储容量达到100PB;每日作业数100多万,每日计算量4PB,作业并发数2000左右;实际存储数据量80PB,文件数和块数达到6亿多;存储利用率83%左右,CPU利用率85%左右。经过四年多的持续投入和建设,TDW已经成为腾讯最大的离线数据处理平台。

TDW的功能模块主要包括:Hive、MapReduce、HDFS、TDBank、Lhotse等,如图1所示。TDW Core主要包括存储引擎HDFS、计算引擎MapReduce、查询引擎Hive,分别提供底层的存储、计算、查询服务,并且根据公司业务产品的应用情况进行了很多深度订制。TDBank负责数据采集,旨在统一数据接入入口,提供多样的数据接入方式。Lhotse任务调度系统是整个数据仓库的总管,提供一站式任务调度与管理。

图1  TDW的功能模块

图1 TDW的功能模块

建设单个大规模集群的原因

随着业务的快速增长,TDW的节点数也在增加,对单个大规模Hadoop集群的需求也越来越强烈。TDW需要做单个大规模集群,主要是从数据共享、计算资源共享、减轻运营负担和成本等三个方面考虑。

1. 数据共享。TDW之前在多个IDC部署数十个集群,主要是根据业务分别部署,这样当一个业务需要其他业务的数据,或者需要公共数据时,就需要跨集群或者跨IDC访问数据,这样会占用IDC之间的网络带宽。为了减少跨IDC的数据传输,有时会将公共数据冗余分布到多个IDC的集群,这样又会带来存储空间浪费。

2. 计算资源共享。当一个集群的计算资源由于某些原因变得紧张时,例如需要数据补录时,这个集群的计算资源就捉襟见肘,而同时,另一个集群的计算资源可能空闲,但这两者之间没有做到互通有无。

3. 减轻运营负担和成本。十几个集群同时需要稳定运营,而且当一个集群的问题解决时,也需要解决其他集群已经出现的或者潜在的问题。一个Hadoop版本要在十几个集群逐一变更,监控系统也要在十几个集群上部署。这些都给运营带来了很大负担。此外,分散的多个小集群,资源利用率不高,机器成本较大。

建设单个大规模集群的方案及优化

面临的挑战

TDW从单集群400台规模建设成单集群4000台规模,面临的最大挑战是Hadoop架构的单点问题:计算引擎单点JobTracker负载重,使得调度效率低、集群扩展性不好;存储引擎单点NameNode没有容灾,使得重启耗时长、不支持灰度变更、具有丢失数据的风险。TDW单点瓶颈导致平台的高可用性、高效性、高扩展性三方面都有所欠缺,将无法支撑4000台规模。为了解决单点瓶颈,TDW主要进行了JobTracker分散化和NameNode高可用两方面的实施。

JobTracker分散化

1.单点JobTracker的瓶颈

TDW以前的计算引擎是传统的两层架构,单点JobTracker负责整个集群的资源管理、任务调度和任务管理,TaskTracker负责任务执行。JobTracker的三个功能模块耦合在一起,而且全部由一个Master节点负责执行,当集群并发任务数较少时,这种架构可以正常运行,但当集群并发任务数达到2000、节点数达到4000时,任务调度就会出现瓶颈,节点心跳处理迟缓,集群扩展也会遇到瓶颈。

2.JobTracker分散化方案

TDW借鉴YARN和Facebook版corona设计方案,进行了计算引擎的三层架构优化(如图2所示):将资源管理、任务调度和任务管理三个功能模块解耦;JobTracker只负责任务管理功能,而且一个JobTracker只管理一个Job;将比较轻量的资源管理功能模块剥离出来交给新的称为ClusterManager的Master负责执行;任务调度也剥离出来,交给具有资源信息的ClusterManager负责执行;对性能要求较高的任务调度模块采用更加精细的调度方式。

图2  JobTracker分散化架构

图2 JobTracker分散化架构

新架构下三个角色分别是:ClusterManager负责整个集群的资源管理和任务调度,JobTracker负责单个Job的管理,TaskTracker负责任务的执行。

(1)两路心跳。之前的架构下,TaskTracker向JobTracker上报心跳,JobTracker串行地处理这些心跳,心跳处理中进行节点管理、任务管理、任务调度等,心跳繁重,影响任务调度和集群扩展性。新架构下,心跳被拆分成两路心跳,分别上报任务和资源信息。

JobTracker获知任务信息通过任务上报心跳的方式。任务上报心跳是通过任务所在的TaskTracker启动一个新的独立线程向对应的JobTracker上报心跳这条途径,在同一个TaskTracker上,不同Job的任务使用不同的线程向不同的JobTracker上报心跳,途径分散,提升了心跳上报效率。

TaskTracker通过上报心跳的方式将资源信息汇报给ClusterManager。ClusterManager从TaskTracker的心跳中获取节点的资源信息:CPU数量、内存空间大小、磁盘空间大小等的总值和剩余值,根据这些信息判断节点是否还能执行更多的任务。同时,ClusterManager通过TaskTracker与其之间维系的心跳来管理节点的生死存亡。

以前繁重的一路心跳被拆分成了两路轻量的心跳,心跳间隔由40s优化成1s,集群的可扩展性得到了提升。

(2)资源概念。之前架构只有slot概念,一般根据核数来设置slot数量,对内存、磁盘空间等没有控制。新架构弱化了slot概念,加强了资源的概念。

每个资源请求包括具体的物理资源需求描述,包括内存、磁盘和CPU等。向ClusterManager进行资源申请的有三种来源类型:Map、Reduce、JobTracker,每种来源需要的具体资源量不同。在CPU资源上,调度器仍然保留slot概念,并且针对三种来源保证各自固定的资源帽。

例如,对于24核的节点,配置13个核给Map用、6个核给Reduce用、1个核给JobTracker用,则认为该节点上有1个JobTracker slot、13个Map slot、6个Reduce slot。某个Map请求的资源需要2个核,则认为需要两个Map slot,当一个节点的Map slot用完之后,即使有剩余的CPU,也不会继续分配Map予其执行了。内存空间、磁盘空间等资源没有slot概念,剩余空间大小满足需求即认为可以分配。在查找满足资源请求的节点时,会比较节点的这些剩余资源是否满足请求,而且还会优先选择负载低于集群平均值的节点。

(3)独立并发式的下推调度。之前架构下,调度器采用的是基于心跳模型的拉取调度:任务调度依赖于心跳,Map、Reduce的调度耦合在一起,而且对请求优先级采取全排序方式,时间复杂度为nlog(n),任务调度效率低下。

新架构采用独立并发式的下推调度。Map、Reduce、JobTracker三种资源请求使用三个线程进行独立调度,对请求优先级采取堆排序的方式,时间复杂度为log(n)。当有资源满足请求时,ClusterManager直接将资源下推到请求者,而不再被动地等待TaskTracker通过心跳的方式获取分配的资源。

例如,一个Job有10个Map,每个Map需要1个核、2GB内存空间、10GB磁盘空间,如果有足够的资源,Map调度线程查找到了满足这10个Map的节点列表,ClusterManager会把节点列表下推到JobTracker;如果Map调度线程第一次只查找到了满足5个Map的节点列表,ClusterManager会把这个列表下推到JobTracker,随后Map调度线程查找到了剩下5个Map的节点列表,ClusterManager再把这个列表下推到JobTracker。

以前基于心跳模型的拉取调度被优化成独立并发式的下推调度之后,平均调度处理时间由80ms优化至1ms,集群的调度效率得到了提升。

3. Job提交过程

新架构下,一次Job提交过程,需要Client和ClusterManager、TaskTracker均进行交互(如图3所示):JobClient先向ClusterManager申请启动JobTracker所需要的资源;申请到之后,JobClient在指定的TaskTracker上启动JobTracker进程,将Job提交给JobTracker;JobTracker再向ClusterManager申请Map和Reduce资源;申请到之后,JobTracker将任务启动命令提交给指定的TaskTracker。

图3  Job提交过程

图3 Job提交过程

4. 存在的问题及应对措施

JobTracker分散化方案给计算引擎带来高效性和高扩展性,但没有带来高可用性,单一故障点的问题在此方案中仍然存在,此时的单一故障点问题有别于以前,如下所述。

(1)ClusterManager如果发生故障,不会造成Job状态丢失而且在短时间内即可恢复。它只存储资源情况,不存储状态,ClusterManager在很短的时间内可以重启完成。重启之后,TaskTracker重新向ClusterManager汇报资源,ClusterManager从重启至完全获得集群的资源情况整个阶段可以在10秒内完成。

(2)JobTracker如果发生故障,只会影响单个Job,对其他Job不会造成影响。

基于以上两点,认为新方案的单一故障点问题影响不大,而且考虑方案实施的复杂度和时效性,TDW在JobTracker分散化方案中没有设计高可用方案,而是通过外围系统来降低影响:监控系统保证ClusterManager故障及时发现和恢复;Lhotse调度系统从用户任务级别保证Job重试。

NameNode高可用

1. 单点NameNode的问题

TDW以前的存储引擎是单点NameNode,在一个业务对应一个集群的情况下,NameNode压力较小,出故障的几率也较小,而且NameNode单点故障带来的影响不会波及全部业务。但当把各个小集群统一到大集群,各个业务都存储之上时,NameNode压力变大,出故障的几率也变大,NameNode单点故障造成的影响将会非常严重。即使是计划内变更,停止NameNode服务耗时将近2个小时,计划内的停止服务变更也给用户带来了较大的影响。

2. NameNode高可用方案

TDW设计了一种一主两热备的NameNode高可用方案。新架构下NameNode角色有三个:一主(ActiveNameNode)两热备(BackupNameNode)。ActiveNameNode保存namespace和block信息,对DataNode下发命令,并且对客户端提供服务。BackupNameNode包括standby和newbie两种状态:standby提供对ActiveNameNode元数据的热备,在ActiveNameNode失效后接替其对外提供服务,newbie状态是正处于学习阶段,学习完毕之后成为standby。

(1)Replicaton协议。为了实现BackupNameNode对ActiveNameNode的元数据一致,随时准备接管ActiveNameNode角色,元数据操作日志需要在主备间同步。客户端对元数据的修改不止在ActiveNameNode记录事务日志,事务日志还需要从ActiveNameNode同步到BackupNameNode,客户端的每一次写操作,只有成功写入ActiveNameNode以及至少一个BackupNameNode(或者ZooKeeper)时,才返回客户端操作成功。当没有BackupNameNode可写入时,把事务日志同步到ZooKeeper来保证至少有一份事务日志备份。

客户端写操作记录事务日志遵循以下几个原则:

①写入ActiveNameNode,如果写入失败,返回操作失败,ActiveNameNode自动退出;

②当写入至少两个节点(Active-NameNode和Standby/ZooKeeper/LOG_SYNC newbie)时返回操作成功,其他返回失败;LOG_SYNC newbie表示newbie已经从ActiveNameNode获取到全量日志后的状态;

③当只成功写入ActiveNameNode,此后的Standby和ZooKeeper均写入失败时,返回失败;

④当只存在ActiveNameNode时,进入只读状态。

(2)Learning协议。newbie学习机制确保newbie启动后通过向ActiveNameNode学习获取最新的元数据信息,学习到与ActiveNameNode同步时变成standby状态。newbie从ActiveNameNode获取最新的fsimage和edits文件列表,ActiveNameNode还会和newbie之间建立事务日志传输通道,将后续操作日志同步给newbie,newbie将这些信息载入内存,构建最新的元数据状态。

(3)事务日志序号。为了验证事务日志是否丢失或者重复,为事务日志指定递增连续的记录号txid。在事务日志文件edits中加入txid,保证txid的连续性,日志传输和加载时保证txid连续递增,保存内存中的元数据信息到fsimage文件时,将当前txid写入fsimage头部,载入fsimage文件到内存中时,设置元数据当前txid为fsimage头部的txid。安全日志序号(safe txid)保存在ZooKeeper上,ActiveNameNode周期性地将txid写入ZooKeeper作为safe txid,在BackupNameNode转换为ActiveNameNode时,需要检查BackupNameNode当前的txid是否小于safe txid,若小于则禁止这次角色转换。

(4)checkpoint协议。新架构仍然具有checkpoint功能,以减少日志的大小,缩短重启时构建元数据状态的耗时。由ActiveNameNode维护一个checkpoint线程,周期性地通知所有standby做checkpoint,指定其中的一个将产生的fsimage文件上传给ActiveNameNode。

(5)DataNode双报。Block副本所在的节点列表是NameNode元数据信息的一部分,为了保证这部分信息在主备间一致性,DataNode采用双报机制。DataNode对块的改动会同时广播到主备,对主备下发的命令,DataNode区别对待,只执行主机下发的命令而忽略掉备机下发的命令。

(6)引入ZooKeeper。主要用来做主节点选举和记录相关日志:NameNode节点状态、安全日志序号、必要时记录edit log。

3. 主备切换过程

当主退出时主备状态切换的过程(如图4所示):当ActiveNameNode节点IP1由于某些原因退出时,两个备节点IP2和IP3通过向ZooKeeper抢锁竞争主节点角色;IP2抢到锁成为ActiveNameNode,客户端从ZooKeeper上重新获取主节点信息,和IP2进行交互,这时即使IP1服务恢复,也是newbie状态;事务日志在主备间同步,newbie IP1通过向主节点IP2学习成为standby状态。

图4  主退出时主备状态切换

图4 主退出时主备状态切换

4. 存在的问题

NameNode高可用方案给存储引擎带来了高可用性,但在高效性方面做出了一些牺牲,由于事务日志需要同步,写性能有20%左右的下降。

其他优化

TDW在实施大集群过程中,除了主要实施JobTracker分散化和NameNode高可用两个方案,还进行了一些其他优化。

1. NameNode分散化

随着存储量和业务的不断增长,一个HDFS元数据空间的访问压力与日俱增。通过NameNode分散化来减少一个元数据空间的访问压力。NameNode分散化主要对元数据信息进行分拆,对用户透明,用户访问认为处于同一个存储引擎,底层可以拆分成多个集群。TDW在Hive层增加用户到HDFS集群的路由表,用户表的数据将写入对应的HDFS集群,对外透明,用户只需使用标准的建表语句即可。TDW根据公司业务的实际应用场景,根据业务线和共享数据等把数据分散到两个HDFS集群,有利于数据共享同时也尽量规避集群间的数据拷贝。采用简单、改动最少的方案解决了实际的问题。

2. HDFS兼容

TDW内部有三个HDFS版本:0.20.1、CDH3u3、2.0,线上主流版本是CDH3u3,主流HDFS版本使用的RPC框架尚未优化成Thrift或者Protocol Buffers等,三个版本互不兼容,增加了互相访问的困难。通过RPC层兼容方式实现了CDH3u3和0.20.1之间的互通,通过完全实现两套接口方式实现了CDH3u3和2.0之间的互通。

3. 防止数据误删除

重要数据的误删除会给TDW带来不可估量的影响,TDW为了进一步增加数据存储可靠性,不仅开启NameNode回收站特性,还增加两个特性: 删除黑白名单,删除接口修改成重命名接口,白名单中的目录可以被删除,白名单中的IP可以进行删除操作,其他则不可;DataNode回收站,块删除操作不会立即进行磁盘文件的删除,而是维护在待删除队列里,过期之后才进行实际的删除操作,这样可以保证在一定时间内如果发现重要的数据被误删除时可以进行数据恢复,还可以防止NameNode启动之后元数据意外缺失而造成数据直接被删除的风险。

结语

TDW从实际情况出发,采取了一系列的优化措施,成功实施了单个大规模集群的建设。为了满足用户日益增长的计算需求,TDW正在进行更大规模集群的建设,并向实时化、集约化方向发展。TDW准备引入YARN作为统一的资源管理平台,在此基础上构建离线计算模型和Storm、Spark、Impala等各种实时计算模型,为用户提供更加丰富的服务。

作者翟艳堂,在腾讯参与了TDW、TDBank等项目的研发和运营,负责计算和存储平台的建设和优化工作,在分布式计算和存储、流式计算等领域积累了丰富的实践经验。

原文: CSDN