Spark功能的主要入口点(相当于main()函数)。SparkContext代表着与一个Spark集群的连接。目前在一个JAVA虚拟机进程中可以创建多个SparkContext,但是只能有一个active的。如果你需要创建一个新的SparkContext实例,必须先调用stop方法停掉当前active的SparkContext实例。
可以看到SparkContext处于DriverProgram核心位置,所有与Cluster、Worker Node交互的操作都需要SparkContext来完成。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,RDD里面的元素可进行并行计算。RDD需要在SparkContext下进行创建。
import findspark
findspark.init()
findspark.find()
'D:\\spark-3.1.3-bin-hadoop3.2'
from pyspark import SparkContext
import numpy as np
SparkContext
¶注意参数里面的数字4. 它的意思是在当前创建的SparkContext里使用4个worker node.
sc=SparkContext(master="local[4]")
print(sc)
<SparkContext master=local[4] appName=pyspark-shell>
lst=np.random.randint(0,10,20)
print(lst)
[5 9 3 7 4 3 3 6 4 9 3 5 8 3 6 4 0 3 4 8]
A=sc.parallelize(lst) #把这个集合中的元素“并行化”,这样就构建了一个RDD
A
是一个pyspark RDD对象,它是分布在多个节点里的,所以我们没办法直接访问其元素。¶type(A)
pyspark.rdd.RDD
A #打印A的话会出现如下的结果,而不是A当中的内容
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
注意 - 这个操作很耗时,不要经常调用
A.collect()
[5, 9, 3, 7, 4, 3, 3, 6, 4, 9, 3, 5, 8, 3, 6, 4, 0, 3, 4, 8]
glom
方法¶A.glom().collect() # glom方法:
[[5, 9, 3, 7, 4], [3, 3, 6, 4, 9], [3, 5, 8, 3, 6], [4, 0, 3, 4, 8]]
sc.stop()
sc=SparkContext(master="local[2]")
A = sc.parallelize(lst)
A.glom().collect()
[[5, 9, 3, 7, 4, 3, 3, 6, 4, 9], [3, 5, 8, 3, 6, 4, 0, 3, 4, 8]]
RDD现在是分布在两个工作节点上,而不是四个
我们再次把这几个元素分配到4个工作节点
sc.stop()
sc = SparkContext(master="local[4]")
A = sc.parallelize(lst)
A.count()
20
first
) 以及最开始的几个元素 (take
)¶A.first()
5
A.take(4)
[5, 9, 3, 7]
不同
的元素创建一个RDD对象¶方法RDD.distinct()
返回一个新的RDD数据集,包含源数据集的不同元素。
A_distinct=A.distinct()
A_distinct.collect()
[4, 8, 0, 5, 9, 6, 3, 7]
reduce
方法¶A.reduce(lambda x,y:x+y) # lambda 匿名函数,替代通常的def,表达式和参数用冒号分开
97
sum
方法¶A.sum()
97
reduce
找到最大的那个数¶A.reduce(lambda x,y: x if x > y else y)
9
reduce
函数找到最长的那个单词¶words = 'These are some of the best Macintosh computers ever'.split(' ')
wordRDD = sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)
'computers'
# Return RDD with elements divisible by 3
A.filter(lambda x:x%3==0).collect()
[9, 3, 3, 3, 6, 9, 3, 3, 6, 0, 3]
reduce
结合¶def largerThan(x,y):
"""
Returns the last word among the longest words in a list
"""
if len(x)> len(y):
return x
elif len(y) > len(x):
return y
else:
if x < y: return x
else: return y
wordRDD.reduce(largerThan)
'Macintosh'
RDD.sample(withReplacement,p)
会从RDD中抽取样本。
withReplacement
是一个布尔值(True/False),用来表示RDD中的一个元素是否可以被多次抽样。p
是每个样本被采样的概率。# get a sample whose expected size is m
# Note that the size of the sample is different in different runs
m=5
n=20 # A数字的个数
print('sample1=',A.sample(False,m/n).collect()) #无放回的抽样,每个元素被抽中的概率是0.25
print('sample2=',A.sample(False,m/n).collect())
sample1= [4, 3, 8] sample2= [5, 9, 3, 7, 3, 3, 8, 0]
请自行搜集资料并思考:为什么每次采样后的结果会得到不同元素个数的集合,RDD采样的原理是什么?
print("Maximum: ",A.max())
print("Minimum: ",A.min())
print("Mean (average): ",A.mean())
print("Standard deviation: ",A.stdev())
Maximum: 9 Minimum: 1 Mean (average): 3.95 Standard deviation: 2.71062723368596
A.stats()
(count: 20, mean: 3.95, stdev: 2.71062723368596, max: 9.0, min: 1.0)
B=A.map(lambda x:x*x)
B.collect()
[81, 1, 16, 49, 4, 4, 36, 9, 25, 81, 1, 16, 1, 1, 81, 4, 16, 4, 4, 25]
map
操作与传统python函数定义方法的结合¶def square_if_odd(x):
if x%2==1:
return x*x
else:
return x
A.map(square_if_odd).collect()
[81, 1, 4, 49, 2, 2, 6, 9, 25, 81, 1, 4, 1, 1, 81, 2, 4, 2, 2, 25]
lst1=np.random.randint(0,10,3)
C=sc.parallelize(lst1)
lst2=np.random.randint(10,20,3) #10到20间随机3个整数
D=sc.parallelize(lst2)
print("C:",C.collect())
print("D:",D.collect())
C: [8, 6, 5] D: [14, 11, 10]
C+D
给出的是两个集合的并集¶(C+D).collect()
[8, 6, 5, 14, 11, 10]
cartesian
给出了两个集合的笛卡尔积¶C.cartesian(D).collect()
[(8, 14), (8, 11), (8, 10), (6, 14), (6, 11), (6, 10), (5, 14), (5, 11), (5, 10)]
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()
[1, 2, 3]
rdd1.subtract(rdd2).collect()
[10, 4, 5]
SparkContext
¶sc.stop()