王辉,2017年加入去哪儿网。
目前负责反爬虫相关风控业务,技术领域涉猎广泛,在风控智能化实践方向的道路上持续探索中。
一 前言
Qunar 智能风控场景中,风控研发团队经常会应用一些算法模型,来解决复杂场景问题。典型的如神经网络模型,决策树模型等等。而要完成模型从训练到部署预测的全过程,除了模型算法之外,离不开技术框架的支撑。本篇文章将和大家分享一下,在预测服务部署阶段,基于 Tensorflow for Java 和 Spark-Scala 构建分布式机器学习计算框架的实践经验。主要围绕以下几点展开:
二 框架选型
2.1 项目场景
而在机器学习框架领域, TensorFlow 、 PyTorch 目前分别成为了工业界和学术界使用最广泛的两大框架。TensorFlow 是谷歌的开发者创造的一款开源的深度学习框架,于 2015 年发布。PyTorch 是最新的深度学习框架之一,由 Facebook 的团队开发,并于 2017 年在 GitHub 上开源。
PySpark 的运行时结构如下。为了不破坏 Spark 已有的运行时架构,Spark 在外围包装一层 Python API ,借助 Py4j 实现 Python 和 Java 的交互,进而实现通过 Python 编写 Spark 应用程序。
综上得出结论,方案二在性能上优于方案一。
当然,方案一在其他方面也有着其优势,比如开发效率高、集成难度低(无需跨平台)、 API 支持度高等等。
本节内容总结:
基于 Spark 大数据框架和 Tensorflow 机器学习框架结合,实现分布式机器学习预测,是一种相对可行、有效的方案。
基于 Tensorflow for Java 和 Spark-Scala 实现 Spark和Tensorflow 集成,能带来更高的性能。
三 应用实践
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense,Dropout,Convolution2D,MaxPooling2D,Flatten
from tensorflow.keras.optimizers import Adam
def train_model():
# 载入训练集和测试集数据,进行独热编码
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
y_train = tf.keras.utils.to_categorical(y_train,num_classes=10)
y_test = tf.keras.utils.to_categorical(y_test,num_classes=10)
model = Sequential()
model.add(Convolution2D(input_shape=(28, 28, 1), filters=32, kernel_size=5, strides=1, padding='same', activation='relu'))
model.add(MaxPooling2D(pool_size=2, strides=2, padding = 'same'))
model.add(Convolution2D(64, 5, strides=1, padding='same', activation='relu'))
model.add(MaxPooling2D(2,2,'same'))
model.add(Flatten())
model.add(Dense(1024,activation = 'relu'))
model.add(Dropout(0.5))
model.add(Dense(10,activation='softmax'))
adam = Adam(lr=1e-4)
model.compile(optimizer=adam,loss='categorical_crossentropy',metrics=['accuracy'])
model.fit(x_train,y_train,batch_size=64,epochs=10,validation_data=(x_test, y_test))
model.save('./model/model_v1', save_format="tf")
3.1.2 查看模型文件
saved_model_cli show --dir ./model_v1/ --all
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- spark hadoop -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- tensorflow -->
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow</artifactId>
<version>1.15.0</version>
</dependency>
调用 Tensorflow API 加载预训练好的 protobuff 格式模型文件,得到 SavedModelBundle 类型模型对象。模型文件我们可以保存在工程 resource 目录下,再从 resource 目录加载( Tensorflow 不支持直接从 HDFS 记载模型,后文会介绍如何实现)。
package com.tfspark
import org.apache.spark.sql.SparkSession
import org.tensorflow.SavedModelBundle
import org.{tensorflow => tf}
object ModelLoader {
//modelPath是模型在resource下路径,modelTag从模型文件信息中获取
def loadModelFromLocal(spark: SparkSession, modelPath: String, modelTag: String): SavedModelBundle = {
val bundle = tf.SavedModelBundle.load(modelPath, modelTag)
}
}
package com.tfspark.tensorflow
import com.qunar.rdc.util.TfUtil
import org.tensorflow.SavedModelBundle
import scala.collection.mutable.WrappedArray
import org.{tensorflow => tf}
object TensorFlowCnnProcessor {
def predict(broads: SavedModelBundle, features: WrappedArray[WrappedArray[WrappedArray[Float]]]): Int = {
val sess = bundle.session()
// 特征数据格式化
val x = tf.Tensor.create(Array(features.map(a => a.map(b => b.toArray).toArray).toArray))
// 执行预测 需要传入模型信息里的输入张量名和输出张量名,以及格式化后的特征数据
val y = sess.runner().feed("serving_default_hmc_input:0", x).fetch("StatefulPartitionedCall:0").run().get(0)
// 结果是1x2的二维数组
val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
y.copyTo(result)
// 返回最大值坐标,即为分类结果,对应的是one-hot编码
TfUtil.argMaxOneDim(result(0))
}
}
// 将封装Tensorflow API的预测方法注册为udf函数
val sensorPredict = udf((features: WrappedArray[WrappedArray[WrappedArray[Float]]]) => {predict(bundle, features)})
// Dataframe调udf函数
val resultDf = featureDf.withColumn("predict_result", sensorPredict(col("feature"))
将 Spark-Scala 和 Tensorflow for Java 集成后的工程,通过 maven 打出依赖包:tfspark-1.0.0-jar-with-dependencies.jar 。
在部署了 spark 运行环境的 hadoop 集群上运行 jar 包。依赖的集群环境需提前安装 spark、hadoop、hive 等大数据组件。
sudo -u root /usr/local/Cellar/apache-spark/2.4.3/bin/spark-submit --class com.tfspark.PredictMain --master yarn --deploy-mode client --driver-memory 6g --executor-memory 6g --num-executors 5 --executor-cores 4 /tmp/tfspark-1.0.0-jar-with-dependencies.jar model_v1
3.4 实践成果
四 优化&踩坑经验
问题点:每一条数据都会调用一次模型预测方法,会导致一些可复用的对象被多次创建,相同的方法流程也被多次调用。
优化思路:数据批量调用预测方法。减少重复的对象创建和方法流程执行。
解决方案:使用 RDD 模式下 mapPartition 算子替代 map 算子,获取特征数组,批量调用。
两者都是操作 partition 的迭代器, map 算子通过迭代器获取每个元素,调用操作函数,函数入参是元素类型。mapPartition 直接将迭代器传给操作函数,函数入参是元素集合的迭代器类型。所以使用区别在于, mapPartition 在一个方法中,操作所有 partition 元素,调用一次操作函数;map 一次只能操作一个元素,调用多次操作函数。
float[][] matrix = new float[m][n];
Tensor<Float> ft = Tensor.create(matrix, Float.class);
val y = sess.runner().feed("serving_default_hmc_input:0", ft).fetch("StatefulPartitionedCall:0").run().get(0)
问题点:上面我们提到模型文件存放在工程resource目录,在模型结构不变的情况下,这种方式不便于对模型文件进行更新,需要重新部署服务。
解决方案:对存储方式进行改进,将模型文件存放在HDFS中。每次从HDFS获取模型数据。Tensorflow本身没有提供直接从HDFS加载模型的API,但可以通过Spark先从HDFS读到本地,再从本地加载来实现。这样在模型结构不变的情况下,每次只需要上传新的模型文件,对HDFS中原文件进行覆盖或升级版本号,就可以热更新。
//modelPath是模型在HDFS下路径,modelTag从模型文件信息中获取
spark.sparkContext.addFile(modelPath, true)
val localPath = SparkFiles.get(modelPath)
tf.SavedModelBundle.load(localPath, modelTag)
Exception in thread "main" java.lang.UnsatisfiedLinkError: /tmp/tensorflow_native_libraries-1613705012956-0/libtensorflow_jni.so: libtensorflow_framework.so.1: cannot open shared object file: No such file or directory
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
原因分析:通过查看报错日志,分析得出结论,Tensorflow 依赖 C 环境,对 C 基础库的版本有要求。集群 C 环境基础库版本存在不一致,部分安装了过低或过高版本,导致 Tensorflow 不兼容,部分机器任务报错。
解决方案:提供两种思路。
第一种方案是修复集群环境,但如果是公共集群更改基础库影响较大。
第二种方案是不用 Hadoop 集群,使用单台实体机,采用 Spark Local 模式,启动多个 Executor 执行任务,从而保证环境一致性。
实践中我们采用了第二种。
五 总结
本文写作的目的,是希望将自己在分布式机器学习计算框架实际应用中的思考和经验分享出来,供大家参考交流。
通过不同框架的优缺点对比,以及底层实现对性能影响的剖析,阐述了选型的思考过程。明确了 Tensorflow for Java & Spark-Scala 为何适用于大数据下高性能分布式机器学习模型预测场景。结合实践经验,演示了项目中框架应用的整体流程。并总结了在性能和部署流程优化过程中的思考。
由于水平有限,文章多有纰漏不足,也恳请大家指正。
END