The Hadoop big-data ecosystem consists of YARN, HDFS, and the MapReduce computing framework. Distributed TensorFlow plays the role of the MapReduce computing framework, while Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark (TFoS), an open-source project from Yahoo, brings deep learning and big data together: it supports distributed TensorFlow training and inference on Apache Spark clusters, relying on the Spark/Hadoop stack for storage and scheduling and supporting Remote Direct Memory Access (RDMA) for fast data transfer. TensorFlowOnSpark provides a bridge: each Spark executor starts a corresponding TensorFlow process, and the processes interact through remote procedure calls (RPC).
TensorFlowOnSpark architecture. The TensorFlow training program runs on a Spark cluster, which is managed in the following steps. Reservation: reserve a port for the TensorFlow process on each executor and start a listener for data and control messages. Startup: launch the TensorFlow main function on the executors. Data access: either TensorFlow Readers and QueueRunners read data files directly from HDFS, with Spark never touching the data, or Feeding, where Spark RDD data is sent to the TensorFlow nodes and passed into the TensorFlow graph through the feed_dict mechanism. Shutdown: shut down the TensorFlow worker nodes and parameter-server nodes on the executors. The data path is Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC/RDMA -> HDFS dataset.
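Since the example later in this article uses the Feeding mode, here is a minimal, self-contained sketch of the last hop of that path, i.e. how a batch taken from a Spark RDD partition ends up in the graph through feed_dict. The placeholder names and the feed_dict helper are illustrative, not TensorFlowOnSpark API:

import numpy as np
import tensorflow as tf

# Placeholders standing in for the MNIST model inputs (TensorFlow 1.x-era API)
x = tf.placeholder(tf.float32, [None, 784])
y_ = tf.placeholder(tf.float32, [None, 10])

def feed_dict(batch):
  # batch: a list of (image_vector, label_vector) pairs from a Spark RDD partition
  images = np.array([item[0] for item in batch], dtype=np.float32)
  labels = np.array([item[1] for item in batch], dtype=np.float32)
  return {x: images, y_: labels}

# Inside the worker's training loop the batch is consumed as:
#   sess.run(train_op, feed_dict=feed_dict(batch))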
TensorFlowOnSpark MNIST example. Use a Standalone-mode Spark cluster on a single machine. Install Spark and Hadoop and deploy the Java 1.8.0 JDK. Download Spark 2.1.0 and Hadoop 2.7.3; TensorFlow 0.12.1 is the best-supported version for this setup.
Modify the configuration files, set the environment variables, and start Hadoop with $HADOOP_HOME/sbin/start-all.sh. Then check out the TensorFlowOnSpark source code:

git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx
Package the source code; the archive is used when submitting jobs:
cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *
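One common way to use this archive (a hedged example; your_job.py is a placeholder, and the commands later in this walkthrough assume the tensorflowonspark package is otherwise importable on the executors) is to attach it to a submission with --py-files:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/tfspark.zip \
your_job.py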
Set an environment variable pointing to the TensorFlowOnSpark root directory:
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)
Start the Spark master node:
${SPARK_HOME}/sbin/start-master.sh
Configure two worker instances and connect them to the master through its spark:// master URL:
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER} * ${SPARK_WORKER_INSTANCES}))
${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
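As a quick sanity check (this assumes the default web UI port), the Spark master UI at http://localhost:8080 should now list the two workers as ALIVE:

curl -s http://localhost:8080 | grep ALIVE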
Submit a job to convert the MNIST zip file into RDD datasets on HDFS:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv
Inspect the processed dataset:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv
Inspect the saved image and label vectors:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels
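To peek at an actual record, cat one of the part files (the part file name below is an assumption; use whichever files the -ls listing shows):

hadoop fs -cat hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels/part-00000 | head -n 1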
The training set and the test set are each saved as RDD data; the mnist_data_setup.py script below performs the conversion.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist

def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features = tf.train.Features(
      feature = {
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()

def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example

def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])

def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]

def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load MNIST gzip into memory
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))

  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))

  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10

  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # save RDDs in the requested format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else:  # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
# Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()

def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else:  # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
    imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))

  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)

if __name__ == "__main__":
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
  args = parser.parse_args()
  print("args:", args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz",
               args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz",
               args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
Submit the training job to start training; it writes mnist_model to HDFS. The command:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model
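When the job completes, the trained model should appear on HDFS; following the user directory used in the earlier listings:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/mnist_model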
mnist_dist.py builds the distributed TensorFlow job. It defines the main function of the distributed task, map_fun, which is launched as the TensorFlow main function; data is obtained in Feeding mode. It obtains the TensorFlow cluster and server instances with:
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
Here TFNode comes from the TFNode.py module packaged into tfspark.zip.
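mnist_dist.py itself is not reproduced in this article, so as a rough orientation, here is a minimal sketch of the shape of its map_fun. The model graph and the batch-feeding helper are elided; ctx.job_name and ctx.task_index are the task-context fields used in the TensorFlowOnSpark examples:

def map_fun(args, ctx):
  # Runs once per executor, either as a parameter server ("ps") or a worker.
  import tensorflow as tf
  from tensorflowonspark import TFNode

  job_name = ctx.job_name      # "ps" or "worker"
  task_index = ctx.task_index  # index of this task within its job

  # Create the ClusterSpec and the tf.train.Server for this process
  cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

  if job_name == "ps":
    # Parameter servers only host variables and wait
    server.join()
  elif job_name == "worker":
    # Pin variables to the parameter servers and ops to this worker
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % task_index,
        cluster=cluster)):
      pass  # build the MNIST graph here (elided)
    # Training loop (elided): consume the batches that Spark pushes from the RDD,
    # feed them into the graph through feed_dict, and save mnist_model to HDFS.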
The mnist_spark.py file is the main training program; it walks through the TensorFlowOnSpark deployment steps:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from pyspark.context import SparkContext
from pyspark.conf import SparkConf

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime

from tensorflowonspark import TFCluster
import mnist_dist

sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1

parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:", args)

print("{0} ===== Start".format(datetime.now().isoformat()))

if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                               keyClass="org.apache.hadoop.io.BytesWritable",
                               valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else:  # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)

# 1. Reserve a port for each TensorFlow process executed on an executor
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps,
                        args.tensorboard, TFCluster.InputMode.SPARK)
# 2. Start the TensorFlow main function
cluster.start(mnist_dist.map_fun, args)
if args.mode == "train":
  # 3. Train
  cluster.train(dataRDD, args.epochs)
else:
  # 3. Inference
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 4. Shut down the TensorFlow worker nodes and parameter-server nodes on the executors
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))
The inference command:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions
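The inference output lands in the predictions directory given by --output. To inspect it (the part file name below is an assumption; use whichever files saveAsTextFile produced):

hadoop fs -ls hdfs://localhost:9000/user/libinggen/predictions
hadoop fs -cat hdfs://localhost:9000/user/libinggen/predictions/part-00000 | head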
TensorFlowOnSpark can also run on Amazon EC2, or on a Hadoop cluster in YARN mode.
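A rough sketch of what a YARN submission could look like; the flags below are standard spark-submit options, but the exact resource settings and any TensorFlowOnSpark-specific configuration depend on the cluster, so treat this only as a starting point:

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors ${SPARK_WORKER_INSTANCES} \
--executor-memory 3G \
--py-files ${TFoS_HOME}/tfspark.zip,${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model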
References:
《TensorFlow技术解析与实战》 (TensorFlow: Analysis and Practice). Recommendations for machine learning positions in Shanghai are welcome; my WeChat: qingxingfengzi.
Repost source: http://sdina.baihongyu.com/