Spark 的安装
# Install Spark: unpack the tarball into a versioned directory and add a
# stable "spark" symlink so paths survive future upgrades.
# (The original had all five commands fused onto one line.)
tar zxf spark-2.2.1-bin-hadoop2.7.tgz
mkdir spark-2.2.1
mv spark-2.2.1-bin-hadoop2.7 spark-2.2.1
cd spark-2.2.1
ln -s spark-2.2.1-bin-hadoop2.7 spark
# Put Spark on PATH (add these lines to ~/.bashrc to make them persistent).
# (The original had both exports fused onto one line.)
export SPARK_HOME=/home/yueyao/spark-2.2.1/spark
export PATH=$SPARK_HOME/bin:$PATH
- 由于系统自带的 Java 版本比较低，因此下载 Java JDK 重新进行安装
# The system Java is too old for Spark 2.x; unpack JDK 8 and put it first
# on PATH so it shadows the system Java.
# (The original had the three commands fused onto one line.)
tar zxf jdk-8u131-linux-x64.tar.gz
export JAVA_HOME=/home/yueyao/java-1.8/jdk1.8.0_131/
export PATH=$JAVA_HOME/bin:$PATH
# Initialize a SparkContext: import the Spark classes and build the context.
# (The original had all statements fused onto one line, which is not valid
# Python; restored one statement per line.)
from pyspark import SparkConf, SparkContext

# Create a SparkConf object with the master URL and the application name.
conf = SparkConf().setMaster("local").setAppName("My App")
# Create the SparkContext from that configuration.
sc = SparkContext(conf=conf)
// 创建一个Java版本的Spark ContextSparkConf conf = new SparkConf().setAppName("wordCount");JavaSparkContext sc = new JavaSparkContext(conf)// 读取我们输入的数据JavaRDD input = sc.textFile(inputFile);//切分为单词JavaRDD words = input.flatMap( new FlatMapFunction (){ public Iterable call(String x){ return Arrays.asList(x.split(" ")); } });// 抓换位键值对并计数JavaPairRDD counts = word.mapToPair( new PairFunction (){ public Tuple2 call(String x){ return new Tuple2(x,1); } }).reduceByKey( new Function2 (){ public Integer call(Integer x, Integer y){ return x + y; } });// 将统计出来的单词总数存入一个文本文件,引发求值counts.saveAsTextFile(outputFile);
// Create a Scala Spark context.
// (The original had all statements fused onto one line; restored.)
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
// Read the input data.
val input = sc.textFile(inputFile)
// Split each line into words.
val words = input.flatMap(line => line.split(" "))
// Map each word to a (word, 1) pair and sum the counts per word.
val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
// Save the word counts to a text file; this action triggers evaluation.
counts.saveAsTextFile(outputFile)
- 创建一个python版本的wordcount,有个疑问,自己建立的hadoop小集群不能读取本地文件,只能上传到hdfs再读写文件
# Python word-count example.
# NOTE(review): per the author's note, this small Hadoop cluster could not
# read local files, so the input is staged on HDFS first.
# (The original had the whole script fused onto one line; restored.)
from pyspark import SparkConf, SparkContext
import os

# Initialize a SparkConf object and a SparkContext object.
conf = SparkConf().setMaster("local").setAppName("Yue Yao app")
sc = SparkContext(conf=conf)

# Create HDFS directories and upload the input file via the hadoop CLI;
# this step can be skipped if the file is already on HDFS.
os.system('hadoop fs -mkdir /user/yueyao/Spark/input/')
os.system('hadoop fs -put Test.md /user/yueyao/Spark/input')
os.system('hadoop fs -mkdir /user/yueyao/Spark/output/')

# Input and output paths on HDFS.
input_file = "/user/yueyao/Spark/input/Test.md"
output_file = "/user/yueyao/Spark/output/out2"

# Read the input file.
lines = sc.textFile(input_file)
# Split each line into words.
split_file = lines.flatMap(lambda line: line.split(" "))
# Map each word to a (word, 1) pair.
word_count = split_file.map(lambda word: (word, 1))
# Sum the counts per word with reduceByKey.
total_count = word_count.reduceByKey(lambda a, b: a + b)
# Write the result back to HDFS; this action triggers evaluation.
total_count.saveAsTextFile(output_file)

# The transformations above can be written as a single chain
# (fixed typo: the original commented code used "couts"):
# counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# counts.saveAsTextFile(output_file)