Spark 自带特定领域语言(DSL),使得编写自定义应用程序比 SQL 查询作业更加容易。通过 DSL,你可以控制较低级别的操作(例如,当数据被洗牌时),并且可以访问中间数据。这有助于实现复杂的算法,达到更高的效率和稳定性。它还允许用户能以模块化的方式编写管道,而不是使用一个单一的 SQL 字符串,这提高了管道的可读性,可维护性和可测试性。所有这些优势吸引我们引入了 Spark。 在 Scala 或 Java 中重现 C ++的逻辑——语言模型训练算法的实现——会是巨量的工作,因此我们决定不更改该部分。和 Hive 一样,Spark 支持运行自定义用户代码,这使得调用相同的 C ++二进制文件变得容易。它允许开发者平滑过渡,因此我们不必同时维护两个版本的 C ++逻辑,而且迁移对用户是透明的。我们使用 Spark 提供的 RDD 接口,没有使用 Spark SQL,因为前者可以控制中间数据的分区并直接管理分片生成。Spark 的 pipe()运算符用于调用二进制文件。 在更高层上,管道的设计保持不变。我们继续使用 Hive 表作为应用程序的初始输入和最终输出。中间输出被写入集群节点上的本地硬盘中。整个应用程序大约有 1,000 行的 Scala 代码,并且可以在 Spark 上执行时生成 100 多个阶段(这取决于训练数据源的数量)。 可扩展性挑战 当我们使用更大的训练数据集来测试 Spark 方案时,我们遇到了可扩展性的挑战。在本节中,我们首先介绍数据分布要求(平滑和分割),然后是它带来的挑战和我们的解决方案。 平滑 N-gram 模型是根据训练数据中的 N-gram 出现计数来估算的。由于在训练数据中有可能缺少 N-gram,这种方式可能很难推广到未见的数据中。为了解决这个问题,我们使用了许多平滑方法以减少观察到的 N-gram 计数以提升未见的 N-gram 概率,并使用较低阶模型来让较高阶模型平滑。由于平滑,对于具有历史 h 的 N-gram,需要具有相同历史的所有 N-gram 计数和具有作为 h 的后缀的历史的所有较低级 N-gram 来估算其概率。例如,对于三元组「how are you,」,其中「how are」是历史,「you」是要预测的词,为了估计 P(you|how are),我们需要「how are*」,「are*」和所有 unigram(单字 N-gram)的计数,其中*是表示词汇表中任何单词的通配符。经常会出现 N-gram(例如,「how are*」)导致处理时的数据发生偏移。 分片 通过分布式计算框架,我们可以将 N-gram 计数分割成多片,以便由多个并行机器进行处理。基于 N-gram 历史的最后 k 个单词的分片方式可以保证比 k 更长的 N-gram 在所有片段之间被平衡。这需要在所有分片上共享所有长度为 k 的 N-gram 计数。我们把所有这些短 N-gram 放在一个叫做「0-shard」的特殊分片中。例如,如果 k 是 2,那么从训练数据中提取的所有单字母和双字母会被组合在同一个分片(0- shard)中,并且所有进行模型训练的服务器都可以访问。 问题:数据扭曲(Data skew) 在基于 Hive 的管道中,我们使用两个单词的历史分片(two word history sharding) 方式进行模型训练。两词历史分片意味着,共享相同集合的最高有效两词历史(最靠近正被预测的词)的所有 N-gram 计数会被分布到同一节点用于处理。与单字历史相比,两字分片通常具有更平衡的数据分布,除了所有节点必须共享存储在 0-shard 中的平滑算法所需的单字和双字统计。下图说明了具有单字和两字历史的分片分布之间的比较。
对于大型数据集而言,两字历史分割会生成巨大的 0-shard。必须向所有节点散布 0-shard 以缩短总计算时间。同时,这种情况还存在潜在的不稳定性,因为很难预测它的内存需求,一旦启动作业,它可能在运行中耗尽内存。虽然我们可以提前分配更多内存,但仍然不能保证 100%的稳定性,而且这会导致集群内存利用率降低,因为并不是所有实例都需要比历史均值更多的内存。 当我们尝试使用 Spark 后,作业可以在低负载状况下运行。但是对于更大的数据集,我们观察到了以下几个问题: 由于执行器长时间没有接收到 heartbeat,驱动程序将执行器标记为「lost」 执行器 OOM 频繁的执行器 GC 随机服务 OOM Spark 的 block 存在 2GB 的限制 (责任编辑:本港台直播) |