RDD中的数据来源可以是程序中的对象集合,也可以是外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。
下面使用Spark Shell讲解常用的创建RDD的两种方式。
Spark可以通过parallelize()或makeRDD()方法将一个对象集合转化为RDD。
例如,将一个List集合转化为RDD,代码如下:
scala> val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
或
scala> val rdd=sc.makeRDD(List(1,2,3,4,5,6)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1]
从返回信息可以看出,上述创建的RDD中存储的是Int类型的数据。实际上,RDD也是一个集合,与常用的List集合不同的是,RDD集合的数据分布于多台机器上。
Spark的textFile()方法可以读取本地文件系统或外部其他系统中的数据,并创建RDD。不同的是,数据的来源路径不同。
例如,本地CentOS系统中有一个文件/home /words.txt,该文件的内容如下:
hello hadoop hello java scala
使用textFile()方法将上述文件内容转为一个RDD,并使用collect()方法(该方法是RDD的一个行动算子,3.3.1小节将会详细讲解)查看RDD中的内容,代码如下:
scala> val rdd=sc.textFile("/home/words.txt") rdd: org.apache.spark.rdd.RDD[String] = /home/words.txt MapPartitionsRDD[1] scala> rdd.collect res1: Array[String] = Array("hello hadoop ", "hello java ", "scala ")
从上述rdd.collect的输出内容可以看出,textFile()方法将源文件中的内容按行拆分成了RDD集合中的多个元素。
将本地系统文件/home/words.txt上传到HDFS系统的/input目录,然后读取文件/input/words.txt中的数据,代码如下:
scala> val rdd=sc.textFile("hdfs://centos01:9000/input/words.txt") rdd: org.apache.spark.rdd.RDD[String] = hdfs://centos01:9000/input/words.txt MapPartitionsRDD[2] scala> rdd.collect res2: Array[String] = Array("hello hadoop ", "hello java ", "scala ")