然而在机器学习领域,RDD 的弱点很快也暴露了。机器学习的核心是迭代和参数更新。RDD 凭借着逻辑上不落地的内存计算特性,可以很好的解决迭代的问题,然而 RDD 的不可变性,却不适合参数反复多次更新的需求。这个根本的不匹配性,导致了 Spark 的 MLLib 库,发展一直非常缓慢,从 15 年开始就没有实质性的创新,性能也不好,从而给了很多其它产品机会。而 Spark 社区,一直也不愿意正视和解决这个问题。 Spark-On-Angel 的系统架构 现在,由于 Angel 良好的设计和平台性,提供 PS-Service,Spark 可以充分利用 Angel 的参数更新能力,用最小化的修改代价,让 Spark 也具备高速训练大模型的能力,并写出更加优雅的机器学习代码,而不必绕来绕去。 更多详情,请参阅:https://github.com/Tencent/angel/blob/master/docs/overview/spark_on_angel.md Angel 快速入门指南 准备知识 这篇文档帮助你快速开始编写运行在 Angel-PS 架构上的程序,开始之前,你最好掌握以下能力: 会编写简单的 Scala 或者 Java 代码 掌握向量、矩阵和张量的基础知识,了解其定义和基础计算。 最好对机器学习算法有一定了解 如果没有学习过机器学习算法,也没有关系,你可以从这篇文档开始。在开始编程前,我们先来了解一些基础知识。 大多数的机器学习算法都可以抽象成向量(Vector)、矩阵 (Martix),张量(Tensor)间的运算,用向量、矩阵、张量来表示学习数据和算法模型。 Angel-PS 实现了基于参数服务器的矩阵计算,将分布在多台 PS Server 上的参数矩阵抽象为 PSModel,你只需要完成 PSModel 的定义、实现其计算过程,就可以实现一个运行在参数服务器上的简单算法。 Angel-PS 架构 简单的 Angel-PS 架构如下图所示 PS 是存储矩阵参数的多台机器,向计算节点提供矩阵参数的拉取、更新服务 每个 worker 是一个逻辑计算节点,一个 worker 可以运行一或多个 task 机器学习的算法,一般以迭代的方式训练,每次迭代 worker 从 PS 拉取最新的参数,计算一个更新值,推送给 PS。 开始你的第一个 Angel 算法: LR 本示例将以最简单的 Logistic Regression 算法为例,指导你完成第一个 Angel 算法。代码可以在 example.quickStart 里找到。 逻辑回归算法是机器学习中最简单的一个算法,它可以抽象为如下步骤: 1. 一个维度为 1×N 的矩阵,即一个 N 维向量,记为 w 2. 用梯度下降法训练 LR 模型,每次迭代 task 从 PS 拉取最新的模型 w, 计算得到变化梯度△w 将△w 推送给 PS 为了实现该算法,我们需要如下 3 个步骤: 1. 定义一个模型 (LRModel) 实现 LRModel 类继承 MLModel,通过 addPSModel 添加一个 N 维的 PSModel 给 LRModel,在 setSavePath 方法中,设置运算结束后 LR 模型的保存路径。 N 的值、保存路径都可以通过 conf 配置。
2. 定义一个 Task(TrainTask) Angel 的模型的训练是在 task 中完成,所以我们需要定义一个 LRTrainTask 来完成 LR 的模型的训练过程。 LRTrainTask 需要继承 TrainTask 类并实现如下 2 个方法: 解析数据 在模型开始训练前,输入的每一行文本被解析为一条训练数据,解析方法在 parse 方法里实现,此处我们使用 DataParser 解析 dummy 格式的数据。 override def parse(key: LongWritable, value: Text): LabeledData = { DataParser.parseVector(key, value, feaNum, "dummy", negY = true) } 可以通过 task 的 dataBlock 访问预处理后的数据。 训练 Angel 会自动执行 TrainTask 子类的 train 方法,我们在 LRTrainTask 的 train 方法中完成模型训练过程。 在这个简易的 LR 算法例子中,我们 先实例化 myLRModel 模型对象 model,然后开始迭代计算。 每次迭代 task 从 PS 拉取模型的参数 weight 训练数据计算得到梯度 grad,把 grad 推送给 PS,PS 上 weight 的更新会自动完成。 推送 grad 后,需要 clock()、incIteration()。 overridedeftrain( ctx: TaskContext) :Unit={ //A simple logistic regression modelvalmodel=newLRModel(ctx, conf) //Apply batch gradient descent LR iterativelywhile(ctx.getIteration <epochNum) { //Pull model from PS Servervalweight=model.weight.getRow( 0) //Calculate gradient vectorvalgrad=bathGradientDescent(weight) //Push gradient vector to PS Servermodel.weight.increment(grad.timesBy( -1.0*lr)) //LR model matrix clockmodel.weight.clock.get //Increase iteration numberctx.incIteration() } } 3. 定义一个 Runner(MLRunner) 前面,我们定义了 LR 模型,实现了它的训练过程。现在,还需要实现 Runner 类将训练这个模型的任务提交到集群。 定义 myLRRunner 类继承 MLRunner,在 train 方法中提交我们的 myLRModel 的模型类、和 myLRTrainTak 训练类就可以了。 classLRRunnerextendsMLRunner{ …… overridedeftrain( conf: Configuration) :Unit={ train(conf, myLRModel(conf), classOf[myLRTrainTask]) } } 运行任务 可以通过以下命令向 Yarn 集群提交刚刚完成的算法任务 (责任编辑:本港台直播) |