当前位置: 首页 > 编程日记 > 正文

学习笔记TF065:TensorFlowOnSpark

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

Hadoop生态大数据系统分为Yam、 HDFS、MapReduce计算框架。TensorFlow分布式相当于MapReduce计算框架,Kubernetes相当于Yam调度系统。TensorFlowOnSpark,利用远程直接内存访问(Remote Direct Memory Access,RDMA)解决存储功能和调度,实现深度学习和大数据融合。TensorFlowOnSpark(TFoS),雅虎开源项目。https://github.com/yahoo/TensorFlowOnSpark 。支持ApacheSpark集群分布式TensorFlow训练、预测。TensorFlowOnSpark提供桥接程序,每个Spark Executor启动一个对应TensorFlow进程,通过远程进程通信(RPC)交互。

TensorFlowOnSpark架构。TensorFlow训练程序用Spark集群运行,管理Spark集群步骤:预留,在Executor执行每个TensorFlow进程保留一个端口,启动数据消息监听器。启动,在Executor启动TensorFlow主函数。数据获取,TensorFlow Readers和QueueRunners机制直接读取HDFS数据文件,Spark不访问数据;Feeding,SparkRDD 数据发送TensorFlow节点,数据通过feed_dict机制传入TensorFlow计算图。关闭,关闭Executor TensorFlow计算节点、参数服务节点。Spark Driver->Spark Executor->参数服务器->TensorFlow Core->gRPC、RDMA->HDFS数据集。http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep 。

TensorFlowOnSpark MNIST。https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone 。Standalone模式Spark集群,一台计算机。安装 Spark、Hadoop。部署Java 1.8.0 JDK。下载Spark2.1.0版 http://spark.apache.org/downloads.html 。下载Hadoop2.7.3版 http://hadoop.apache.org/#Download+Hadoop 。0.12.1版本支持较好。 修改配置文件,设置环境变量,启动Hadoop:$HADOOP_HOME/sbin/start-all.sh。检出TensorFlowOnSpark源代码:

git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx

源代码打包,提交任务使用:

cd TensorflowOnSpark/src
zip -r ../tfspark.zip *

设置TensorFlowOnSpark根目录环境变量:

cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)

启动Spark主节点(master):

$(SPARK_HOME)/sbin/start-master.sh

配置两个工作节点(worker)实例,master-spark-URL连接主节点:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$(($(CORES_PER_WORKER)*$(SPARK_WORKER_INSTANCES)))
$(SPARK_HOME)/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G $(MASTER)

提交任务,MNIST zip文件转换为HDFS RDD 数据集:

$(SPARK_HOME)/bin/spark-submit \
--master $(MASTER) --conf spark.ui.port=4048 --verbose \
$(TFoS_HOME)/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv

查看处理过的数据集:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv

查看保存图片、标记向量:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels

把训练集、测试集分别保存RDD数据。 https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py 。

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist
def toTFExample(image, label):"""Serializes an image/label as a TFExample byte string"""example = tf.train.Example(features = tf.train.Features(feature = {'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))}))return example.SerializeToString()
def fromTFExample(bytestr):"""Deserializes a TFExample from a byte string"""example = tf.train.Example()example.ParseFromString(bytestr)return example
def toCSV(vec):"""Converts a vector/array into a CSV string"""return ','.join([str(i) for i in vec])
def fromCSV(s):"""Converts a CSV string to a vector/array"""return [float(x) for x in s.split(',') if len(s) > 0]
def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):"""Writes MNIST image/label vectors into parallelized files on HDFS"""# load MNIST gzip into memory# MNIST图像、标记向量写入HDFSwith open(input_images, 'rb') as f:images = numpy.array(mnist.extract_images(f))with open(input_labels, 'rb') as f:if format == "csv2":labels = numpy.array(mnist.extract_labels(f, one_hot=False))else:labels = numpy.array(mnist.extract_labels(f, one_hot=True))shape = images.shapeprint("images.shape: {0}".format(shape))          # 60000 x 28 x 28print("labels.shape: {0}".format(labels.shape))   # 60000 x 10# create RDDs of vectorsimageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)labelRDD = sc.parallelize(labels, num_partitions)output_images = output + "/images"output_labels = output + "/labels"# save RDDs as specific format# RDDs保存特定格式if format == "pickle":imageRDD.saveAsPickleFile(output_images)labelRDD.saveAsPickleFile(output_labels)elif format == "csv":imageRDD.map(toCSV).saveAsTextFile(output_images)labelRDD.map(toCSV).saveAsTextFile(output_labels)elif format == "csv2":imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)else: # format == "tfr":tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))# requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jartfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",keyClass="org.apache.hadoop.io.BytesWritable",valueClass="org.apache.hadoop.io.NullWritable")
#  Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()
def readMNIST(sc, output, format):"""Reads/verifies previously created output"""output_images = output + "/images"output_labels = output + "/labels"imageRDD = NonelabelRDD = Noneif format == "pickle":imageRDD = sc.pickleFile(output_images)labelRDD = sc.pickleFile(output_labels)elif format == "csv":imageRDD = sc.textFile(output_images).map(fromCSV)labelRDD = sc.textFile(output_labels).map(fromCSV)else: # format.startswith("tf"):# requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jartfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",keyClass="org.apache.hadoop.io.BytesWritable",valueClass="org.apache.hadoop.io.NullWritable")imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))num_images = imageRDD.count()num_labels = labelRDD.count() if labelRDD is not None else num_imagessamples = imageRDD.take(10)print("num_images: ", num_images)print("num_labels: ", num_labels)print("samples: ", samples)
if __name__ == "__main__":import argparsefrom pyspark.context import SparkContextfrom pyspark.conf import SparkConfparser = argparse.ArgumentParser()parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")

args = parser.parse_args() print("args:",args) sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize")) if not args.read: # Note: these files are inside the mnist.zip file writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz", args.output + "/train", args.format, args.num_partitions) writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz", args.output + "/test", args.format, args.num_partitions) if args.read or args.verify: readMNIST(sc, args.output + "/train", args.format)

提交训练任务,开始训练,在HDFS生成mnist_model,命令:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

mnist_dist.py 构建TensorFlow 分布式任务,定义分布式任务主函数,启动TensorFlow主函数map_fun,数据获取方式Feeding。获取TensorFlow集群和服务器实例:

cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

TFNode调用tfspark.zip TFNode.py文件。

mnist_spark.py文件是训练主程序,TensorFlowOnSpark部署步骤:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",keyClass="org.apache.hadoop.io.BytesWritable",valueClass="org.apache.hadoop.io.NullWritable")def toNumpy(bytestr):example = tf.train.Example()example.ParseFromString(bytestr)features = example.features.featureimage = numpy.array(features['image'].int64_list.value)label = numpy.array(features['label'].int64_list.value)return (image, label)dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:if args.format == "csv":images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])else: # args.format == "pickle":images = sc.pickleFile(args.images)labels = sc.pickleFile(args.labels)print("zipping images and labels")dataRDD = images.zip(labels)
#1.为在Executor执行每个TensorFlow进程保留一个端口
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
#2.启动Tensorflow主函数
cluster.start(mnist_dist.map_fun, args)
if args.mode == "train":#3.训练cluster.train(dataRDD, args.epochs)
else:#3.预测labelRDD = cluster.inference(dataRDD)labelRDD.saveAsTextFile(args.output)
#4.关闭Executor TensorFlow计算节点、参数服务节点
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))

预测命令:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions

还可以Amazon EC2运行及在Hadoop集群采用YARN模式运行。

参考资料: 《TensorFlow技术解析与实战》

欢迎推荐上海机器学习工作机会,我的微信:qingxingfengzi

转载于:https://my.oschina.net/u/3482787/blog/1572538

相关文章:

HTML5培训好不好

HTML5培训好不好?这个问题,要看你选择的培训机构,想要学习HTML5技术,靠谱的培训机构非常重要,下面我们就来看看详细的介绍吧。 HTML5培训好不好?从前端开发的基础出发,学习使用HTML,CSS,JavaS…

技术图文:03 结构型设计模式(下)

结构型设计模式(下) 本教程主要介绍一系列用于如何将现有类或对象组合在一起形成更加强大结构的经验总结。 知识结构: 组合模式 – 树形结构的处理 Sunny 软件公司欲开发一个杀毒(AntiVirus)软件,该软件…

程序员必知8大排序3大查找(三)

前两篇 《程序员必知8大排序3大查找(一)》 《程序员必知8大排序3大查找(二)》 三种查找算法:顺序查找,二分法查找(折半查找),分块查找,散列表(以后谈&#xf…

MongoDB给数据库创建用户

转自http://www.imooc.com/article/18439 一.先以非授权的模式启动MongoDB非授权: linux/Mac : mongod -f /mongodb/etc/mongo.confwindows : mongod --config c:\mongodb\etc\mongo.conf 或者 net start mongodb (前提是mongo安装到了服务里面&#xff…

如何挑选一家好的软件测试培训机构

随着智能时代的发展,我们的手机APP等各种软件都变得越来越复杂化、规模化,软件测试这一步骤是必不可少的,这也造就了这个行业的兴起,越来越多的人想要学习软件测试技术,想要知道如何挑选一家好的软件测试培训机构?来看…

POJ 3177 判决素数个数

时间限制: 1000ms内存限制:65536kB描述输入两个整数X和Y,输出两者之间的素数个数(包括X和Y)。输入两个整数X和Y,X和Y的大小任意。输出输出一个整数,结果可以是0,或大于0的整数。样例输入1 100样例输出25&am…

数据结构与算法:22 精选练习50

精选练习50 马上就要期末考试或者考研了。为了大家复习的方便,我精选了有关数据结构与算法的50道选择题,大家可以抽空练习一下。公众号后台回复“答案”可以获取该50道题目的答案。 01、数据在计算机中的表示称为数据的______。 (A&#x…

极速理解设计模式系列:11.单例模式(Singleton Pattern)

单例模式:确保某一个类只有一个实例,而且自行实例化并向整个系统提供这个实例。这个类称为单例类。 三要点: 一、单例类只能有一个实例 二、单例类必须自行创建自身实例 三、单例类自行向整个系统提供实例 类图: 应用场景&#xf…

参加web前端培训要学哪些知识

IT行业,web前端技术是比较吃香的,也是工资待遇非常高的行业之一,如果想要做一名合格的web前端工程师,系统学习是非常重要的,那么参加web前端培训要学哪些知识呢?来看看下面的详细介绍。 参加web前端培训要学哪些知识?…

数据结构与算法:19 排序

19 排序 知识结构: 1. 排序的基本概念与术语 假设含有nnn个记录的序列为{r1,r2,⋯,rn}\lbrace r_1,r_2,\cdots,r_n \rbrace{r1​,r2​,⋯,rn​},其相应的关键字分别为{k1,k2,⋯,kn}\lbrace k_1,k_2,\cdots,k_n \rbrace{k1​,k2​,⋯,kn​},…

Objective-C 什么是类

Objective-C 什么是类 转自http://www.189works.com/article-31219-1.html 之前一直做C开发,最近2个多月转 Objective-C, 入门的时候,遇到了很多的困惑。现在过节,正是解决他们的好时机。 主要参考来自http://www.sealiesoftware.…

APP之红点提醒三个阶段

下面这个页面就是我们进入APP后的主界面。客户选项的红点上数字就是显示我们没有查看的客户总数量。 当我们切换到客户这个fragment时,会显示贷款客户数量与保险客户数量。 当我们随便点击入一个选项,假如进入到保险客户的这个activity里面,L…

零基础参加java培训的系统学习路线

​ 零基础想要学习java技术,那么最好的选择就是参加java培训,进行系统的学习,以下就是小编为大家整理的零基础参加java培训的系统学习路线,希望能够帮助到正在学习java技术的零基础同学。 零基础参加java培训的系统学习路线&#…

在ASP.NET中跟踪和恢复大文件下载

在Web应用程序中处理大文件下载的问题一直出了名的困难,因此对于大多数站点来说,如果用户的下载被中断了,它们只能说悲哀降临到用户的身上了。但是我们现在不必这样了,因为你可以使自己的ASP.NET应用程序有能力支持可恢复&#xf…

ZeroMQ实例-使用ZeroMQ进行windows与linux之间的通信

1、本文包括 1)在windows下使用ZMQ 2)在windows环境下与Linux环境下进行网络通信 2、在Linux下使用ZMQ 之前写过一篇如何在Linux环境下使用ZMQ的文章 《ZeroMQ实例-使用ZMQ(ZeroMQ)进行局域网内网络通信》,这里就不再赘述。 3、在Windows环境…

线性代数:03 向量空间 -- 基本概念

本讲义是自己上课所用幻灯片,里面没有详细的推导过程(笔者板书推导)只以大纲的方式来展示课上的内容,以方便大家下来复习。 本章主要介绍向量空间的知识,与前两章一样本章也可以通过研究解线性方程组的解把所有知识点…

如何获得PMP认证证书

​ pmp证书是一项由美国项目管理协会发起的项目管理专业人士认证证书,它属于国际认证类证书,含金量是非常高的,那么如何获得PMP认证证书呢?来看看下面的详细介绍。 如何获得PMP证书? PMP证书的获取是需要参加PMP考试的。我国自1999年引进PM…

UITextField的详细使用

UItextField通常用于外部数据输入,以实现人机交互。下面以一个简单的登陆界面来讲解UItextField的详细使用。//用来显示“用户名”的labelUILabel* label1 [[UILabelalloc] initWithFrame:CGRectMake(15, 65, 70, 30)];label1.backgroundCol…

06-hibernate注解-一对多单向外键关联

一对多单向外键 1,一方持有多方的集合,一个班级有多个学生(一对多)。 2,OneToMany(cascade{CascadeType.ALL}, fetchFetchType.LAZY )  //级联关系,抓取策略:懒加载。 JoinColumn(name"c…

线性代数:03 向量空间 -- 矩阵的零空间,列空间,线性方程组解的结构

本讲义是自己上课所用幻灯片,里面没有详细的推导过程(笔者板书推导)只以大纲的方式来展示课上的内容,以方便大家下来复习。 本章主要介绍向量空间的知识,与前两章一样本章也可以通过研究解线性方程组的解把所有知识点…

学Python培训有什么用

​ Python在近几年的发展非常迅速,在互联网行业Python的薪资也越来越高,不少人开始准备学习Python技术,那么到底学Python培训有什么用呢?来看看下面的详细介绍。 学Python培训有什么用? 学习python可以提高工作效率,使用python&…

SQL压力测试用的语句和相关计数器

将数据库中所有表的所有的内容选一遍: IF object_id(tempdb..#temp) is not null BEGIN DROP TABLE #temp END DECLARE index int DECLARE count int DECLARE schemaname varchar(50) DECLARE tablename varchar(50) set index1 set count(select count(*) from s…

线性代数:04 特征值与特征向量 -- 特征值与特征向量

本讲义是自己上课所用幻灯片,里面没有详细的推导过程(笔者板书推导)只以大纲的方式来展示课上的内容,以方便大家下来复习。 本章主要介绍特征值与特征向量的知识,前一章我们介绍了线性变换可以把一个向量映射到另一个…

使用Silverlight2的WebClient下载远程图片

在Silverlight 2之前有一个Downloader对象,开发者一般使用Downloader下载图片和文体文件,这个对象在Silverlight 2中作为了一个特性被集成到WebClient类之中,你可以直接使用WebClient的OpenReadAsync方法加载远程图片的URI,然后使…

学习Web前端需要避免哪些错误

很多初学web前端的同学,在学习web前端的时候都会遇到一些错误,虽然有些错误与某一个具体的行为相关,但有些错误却是所有Web开发人员都需要面对的挑战。下面小编就整理一下学习Web前端需要避免哪些错误,希望能够给同学们带来帮助。…

【2012百度之星/资格赛】H:用户请求中的品牌 [后缀数组]

时间限制:1000ms内存限制:65536kB描述馅饼同学是一个在百度工作,做用户请求(query)分析的同学,他在用户请求中经常会遇到一些很奇葩的词汇。在比方说“johnsonjohnson”、“duckduck”,这些词汇虽然看起来是一些词汇的…

实战:使用Telnet排除网络故障

使用Telnet排除网络故障 如果员工告诉你,他的计算机不能访问网站。你需要断定是他的计算机系统出了问题还是IE浏览器中了恶意插件,或者是网络层面的问题。 如图2-108所示,通过Telnet 服务器的某个端口,就能断定是否访问该服务器的…

线性代数:04 特征值与特征向量 -- 矩阵的相似对角化

本讲义是自己上课所用幻灯片,里面没有详细的推导过程(笔者板书推导)只以大纲的方式来展示课上的内容,以方便大家下来复习。 本章主要介绍特征值与特征向量的知识,前一章我们介绍了线性变换可以把一个向量映射到另一个…

UI设计培训完之后可以去哪些公司工作

UI设计培训完之后可以去哪些公司工作?这是目前很多学习UI设计或者准备学习UI设计的同学比较关注的一个问题,虽然都知道UI设计的发展前景不错,但是具体学完之后该去哪里工作大家却比较迷茫,来看看下面的详细介绍吧。 UI设计培训完之后可以去哪…

Tomcat详解(下)

配置监听端口 1、编辑配置文件 1234[rootplinuxos ~]# vim /usr/local/tomcat/conf/server.xml <Connector port"80" protocol"HTTP/1.1" ##改成80端口 connectionTimeout"20000" redirectPort"8443" /> 2、重启服务 123456…