package cn.hhb.spark.core;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;

/**
 * Created by dell on 2017/7/13.
 */
public class AccumulatorVariable {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AccumulatorVariable")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");

        JavaSparkContext sc = new JavaSparkContext(conf);

        /**
         * Create the Accumulator variable by calling SparkContext's accumulator method.
         * It must be final so the anonymous inner class below can capture it.
         */
        final Accumulator<Integer> sum = sc.accumulator(0);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        numbers.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                // Inside task code, values are accumulated by calling add on the accumulator
                sum.add(integer);
            }
        });

        // In the driver program, read the result with the accumulator's value() method
        System.out.println(sum.value()); // prints 15 (= 1+2+3+4+5)

        sc.close();
    }
}
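
/*
 * For reference only: the Accumulator class used above is the Spark 1.x API and is
 * deprecated in Spark 2.x (removed in 3.0) in favor of AccumulatorV2. A minimal
 * sketch of the same sum using the built-in LongAccumulator follows; it assumes a
 * Spark 2.x dependency and Java 8+ for the lambda, and the class name
 * AccumulatorV2Sketch is chosen here for illustration.
 */
class AccumulatorV2Sketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AccumulatorV2Sketch")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // longAccumulator lives on the underlying SparkContext, reached via sc.sc()
        final org.apache.spark.util.LongAccumulator sum = sc.sc().longAccumulator("sum");

        sc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                .foreach(n -> sum.add(n)); // VoidFunction is a functional interface, so a lambda works

        System.out.println(sum.value()); // prints 15, as in the example above

        sc.close();
    }
}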