SparkContext 以及 RDD 基础¶

SparkContext¶

Spark功能的主要入口点(相当于main()函数)。SparkContext代表着与一个Spark集群的连接。目前在一个JAVA虚拟机进程中可以创建多个SparkContext,但是只能有一个active的。如果你需要创建一个新的SparkContext实例,必须先调用stop方法停掉当前active的SparkContext实例。

可以看到SparkContext处于DriverProgram核心位置,所有与Cluster、Worker Node交互的操作都需要SparkContext来完成。

RDD¶

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,RDD里面的元素可进行并行计算。RDD需要在SparkContext下进行创建。

导入我们要用的包¶

In [1]:
import findspark
findspark.init()
findspark.find()
Out[1]:
'D:\\spark-3.1.3-bin-hadoop3.2'
In [2]:
from pyspark import SparkContext
import numpy as np

初始化一个 SparkContext¶

注意参数里面的数字4. 它的意思是在当前创建的SparkContext里使用4个worker node.

In [3]:
sc=SparkContext(master="local[4]")
In [4]:
print(sc)
<SparkContext master=local[4] appName=pyspark-shell>

生成一串随机的整数¶

In [5]:
lst=np.random.randint(0,10,20)
In [6]:
print(lst)
[5 9 3 7 4 3 3 6 4 9 3 5 8 3 6 4 0 3 4 8]

把这一串数字分配在不同的node- 这是分布式运算最主要的一步¶

In [7]:
A=sc.parallelize(lst) #把这个集合中的元素“并行化”,这样就构建了一个RDD

如何创建 RDD?¶

有两种方法去创造 RDDs:

  • 在程序中并行化一个现有的元素集合, 或者
  • 引用外部存储系统中的数据集,如共享文件系统、HDFS的数据源等。

这里采用前一种方法构建RDD

A是一个pyspark RDD对象,它是分布在多个节点里的,所以我们没办法直接访问其元素。¶

In [8]:
type(A)
Out[8]:
pyspark.rdd.RDD
In [9]:
A #打印A的话会出现如下的结果,而不是A当中的内容
Out[9]:
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

与刚才并行化相反的操作是--收集所有的分布在各个节点的元素,并将它们合并。¶


注意 - 这个操作很耗时,不要经常调用

In [10]:
A.collect()
Out[10]:
[5, 9, 3, 7, 4, 3, 3, 6, 4, 9, 3, 5, 8, 3, 6, 4, 0, 3, 4, 8]

那么这些元素是如何在各个节点中分布的呢? 使用 glom 方法¶

In [11]:
A.glom().collect() # glom方法:
Out[11]:
[[5, 9, 3, 7, 4], [3, 3, 6, 4, 9], [3, 5, 8, 3, 6], [4, 0, 3, 4, 8]]

现在停止当前的SparkContext (sc),尝试用2个工作节点重新初始化¶

In [12]:
sc.stop()
In [13]:
sc=SparkContext(master="local[2]")
In [14]:
A = sc.parallelize(lst)
In [15]:
A.glom().collect()
Out[15]:
[[5, 9, 3, 7, 4, 3, 3, 6, 4, 9], [3, 5, 8, 3, 6, 4, 0, 3, 4, 8]]

RDD现在是分布在两个工作节点上,而不是四个

我们再次把这几个元素分配到4个工作节点

In [16]:
sc.stop()
In [17]:
sc = SparkContext(master="local[4]")
In [18]:
A = sc.parallelize(lst)

基本操作¶

Count 元素的个数¶

In [19]:
A.count()
Out[19]:
20

提取第一个元素 (first) 以及最开始的几个元素 (take)¶

In [20]:
A.first()
Out[20]:
5
In [21]:
A.take(4)
Out[21]:
[5, 9, 3, 7]

去掉重复的元素: 仅使用不同 的元素创建一个RDD对象¶

方法RDD.distinct()返回一个新的RDD数据集,包含源数据集的不同元素。

In [22]:
A_distinct=A.distinct()
In [23]:
A_distinct.collect()
Out[23]:
[4, 8, 0, 5, 9, 6, 3, 7]

对所有元素求和可以用 reduce 方法¶

In [24]:
A.reduce(lambda x,y:x+y) # lambda 匿名函数,替代通常的def,表达式和参数用冒号分开
Out[24]:
97

或者 直接用 sum 方法¶

In [25]:
A.sum()
Out[25]:
97

利用reduce找到最大的那个数¶

In [26]:
A.reduce(lambda x,y: x if x > y else y)
Out[26]:
9

利用 reduce函数找到最长的那个单词¶

In [27]:
words = 'These are some of the best Macintosh computers ever'.split(' ')
wordRDD = sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)
Out[27]:
'computers'

RDD上的过滤(filter)函数,是对RDD元素的一个筛选¶

利用 filter 返回一个新的RDD,并且这个RDD满足一个给定的条件 (lambda表达式)¶

In [28]:
# Return RDD with elements divisible by 3
A.filter(lambda x:x%3==0).collect()
Out[28]:
[9, 3, 3, 3, 6, 9, 3, 3, 6, 0, 3]

我们也可以用python的常规函数和 reduce结合¶

In [29]:
def largerThan(x,y):
    """
    Returns the last word among the longest words in a list
    """
    if len(x)> len(y):
        return x
    elif len(y) > len(x):
        return y
    else:
        if x < y: return x
        else: return y
In [30]:
wordRDD.reduce(largerThan)
Out[30]:
'Macintosh'

Sampling an RDD (对RDD元素进行采样)¶

  • RDDs 通常非常大.
  • 关于数据集的统计量,比如平均数,可以通过采样获得样本来有效地近似。 这在操作极其庞大的数据集时很方便,因为样本可以说明问题。包括数据的模式和描述性统计的问题
  • 采样是并行进行的,只需要有限的计算。

RDD.sample(withReplacement,p)会从RDD中抽取样本。

  • withReplacement是一个布尔值(True/False),用来表示RDD中的一个元素是否可以被多次抽样。
  • p是每个样本被采样的概率。
In [32]:
# get a sample whose expected size is m
# Note that the size of the sample is different in different runs
m=5
n=20 # A数字的个数
print('sample1=',A.sample(False,m/n).collect()) #无放回的抽样,每个元素被抽中的概率是0.25
print('sample2=',A.sample(False,m/n).collect())
sample1= [4, 3, 8]
sample2= [5, 9, 3, 7, 3, 3, 8, 0]

请自行搜集资料并思考:为什么每次采样后的结果会得到不同元素个数的集合,RDD采样的原理是什么?

基本的统计量¶

In [32]:
print("Maximum: ",A.max())
print("Minimum: ",A.min())
print("Mean (average): ",A.mean())
print("Standard deviation: ",A.stdev())
Maximum:  9
Minimum:  1
Mean (average):  3.95
Standard deviation:  2.71062723368596
In [33]:
A.stats()
Out[33]:
(count: 20, mean: 3.95, stdev: 2.71062723368596, max: 9.0, min: 1.0)

Mapping 把一个集合映射为另一个集合¶

map 操作与lambda函数的结合¶

In [34]:
B=A.map(lambda x:x*x) 
In [35]:
B.collect()
Out[35]:
[81, 1, 16, 49, 4, 4, 36, 9, 25, 81, 1, 16, 1, 1, 81, 4, 16, 4, 4, 25]

map 操作与传统python函数定义方法的结合¶

In [36]:
def square_if_odd(x):
    if x%2==1:
        return x*x
    else:
        return x
In [37]:
A.map(square_if_odd).collect()
Out[37]:
[81, 1, 4, 49, 2, 2, 6, 9, 25, 81, 1, 4, 1, 1, 81, 2, 4, 2, 2, 25]

集合操作¶

创建两个小的 RDDs 来展示集合的操作¶

In [38]:
lst1=np.random.randint(0,10,3)
C=sc.parallelize(lst1)
lst2=np.random.randint(10,20,3) #10到20间随机3个整数
D=sc.parallelize(lst2)
print("C:",C.collect())
print("D:",D.collect())
C: [8, 6, 5]
D: [14, 11, 10]

C+D 给出的是两个集合的并集¶

In [39]:
(C+D).collect()
Out[39]:
[8, 6, 5, 14, 11, 10]

cartesian 给出了两个集合的笛卡尔积¶

In [40]:
C.cartesian(D).collect()
Out[40]:
[(8, 14),
 (8, 11),
 (8, 10),
 (6, 14),
 (6, 11),
 (6, 10),
 (5, 14),
 (5, 11),
 (5, 10)]

intersection "和 "subtract "方法分别返回两个集合的交(两个集合的相同之处)以及两个集合的差(两个集合的不同之处)¶

In [41]:
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()
Out[41]:
[1, 2, 3]
In [42]:
rdd1.subtract(rdd2).collect()
Out[42]:
[10, 4, 5]

最后我们要停掉当前活跃的 SparkContext¶

In [43]:
sc.stop()