基于深度学习的故障诊断平台,使用Java、Python实现,在Spark上部署,另外涉及Kafka、Springboot、Tensorflow、Go等内容
1.项目介绍
设计一个故障诊断平台,可以实时分析数据采集设备的北向数据,实现故障诊断功能。
2.项目难点
2.1 降低模型加载的IO瓶颈
通过Java来加载模型进而运行在实际部署过程中需要较多工作量来实现数据的分发工作,而且模型在网络中传输是否会影响性能?后期拟采用Google提供的Serving方案。
2.1.1 问题描述
- 接入车辆多,无法每个服务器都加载所有的类型的车辆模型,也没有必要
- IO速度慢,模型加载时间长(模型加载时间达到1.5秒左右,计算一次的时间也只有300ms左右)
2.1.2 解决方案
- 服务器动态加载模型,创建一个模型管理Hash表,来存储ModelManager
- 每个模型都有一个存活时间(每次使用都要清零),到达最大存活时间则清除
- 使用可调度线程池来进行周期性运行存活时间更新和Check是否超时
- Spark在每次Hash表更改后,由Spark集群的master将Hash表broadcast到所有机器上
- Kafka在接受到设备的数据请求后,根据车辆ID均匀分配到不同的partition,而在Kafka中每个partition都会固定分给一个消费者处理,所以不会出现不同的消费者频繁加载模型的情况
2.2 消息中间件调研
-
Kafka吞吐量更大,RocketMQ居中,RabbitMQ最差。
-
Kafka社区活跃度高,针对Spark、Flink等有案例
2.3 JVM如何配置
2.3.1 GC配置
本项目要求较高的吞吐量和响应速度,这两者是有一定的矛盾的。从两种垃圾收集器种类来看(暂且不考虑串行垃圾收集器),并发(Concurrent)垃圾收集器是指垃圾收集线程和应用同时工作的收集器,适用于对响应速度有较高要求的场景,典型的如CMS(Concurrent Mark Sweep)垃圾收集器;并行垃圾收集器是指垃圾收集线程会暂停应用执行,并通过多线程合作实现并行垃圾回收的收集器,适用于对吞吐量有较高要求的场景,典型的如Parallel Scavenges(JVM默认,用于新生代)、Parallel Old(JVM默认,用于老年代)。
2.4 提高数据处理和TF模型计算速度
2.4.1 问题描述
- 数据量大,需要提高计算效率
2.4.2 解决方案
-
Spark并行处理
sc.parallelize(value).reduce(func)
2.5 TF模型部署
通过TF的Serving来进行部署。
# Download the TensorFlow Serving Docker image and repo
docker pull tensorflow/serving
git clone https://github.com/tensorflow/serving
# Location of demo models
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"
# Start TensorFlow Serving container and open the REST API port
docker run -t --rm -p 8501:8501 \
-v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" \
-e MODEL_NAME=half_plus_two \
tensorflow/serving &
# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}' \
-X POST http://localhost:8501/v1/models/half_plus_two:predict
# Returns => { "predictions": [2.5, 3.0, 4.5] }
2.X 性能评估
2.X.1 机器学习模型的评估
2.X.2 接口性能
使用JMeter进行性能测试
-
测试吞吐量
发送数据时,可以选择是否获取最近的诊断结果。
-
测试响应时间
2.X.3 数据生成与测试
使用Golang实现简单的数据读取,并转化为json格式,通过HTTP发送到服务器进行处理。项目地址为netest。
3.项目小问题记录
3.1 配置问题
-
如何把Springboot应用打包成Spark可识别
Spark不可识别
spring-boot-maven-plugin
打包的springboot项目结构,需要使用maven-shade-plugin
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> <resource>META-INF/spring.factories</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>cn.neyzoter.aiot.fddp.FddpApplication</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
-
Spark的Scala版本2.11和2.12不兼容
如果下载的Spark安装包使用的是Scala2.11,而应用的maven包中配置了Spark的Scala是2.12,则会找不到对应的Scala包。
3.2 数据预处理的可配置性
3.2.1 问题描述
数据预处理的时候需要知道每种数据的最大最小值,固定死程序,则会造成经常需要修改。
3.2.2 解决方案
通过properties文件写入参数(最大最小值),ModelManager在每次创建时都会读入最大最小值,保存形式为Map<参数名, 数值>
。而需要使用该参数时,利用Java的反射机制,可以知道每个参数的名称,通过该名称来读取对应的最大最小值。
改进
反射效率较低,如果每次给数据规划化都要先获取Field的String 名称,再使用该String名称找到最大最小值过于麻烦和效率底下。所以,后面为了提高效率,其实使用了数据对象保存。也就是在业务场景下不使用上述Map对象,而是从保存最大最小值的数据对象中获取。只在模型读取时将Map作为中间过度。
欢迎关注我的微信公众号
互联网矿工