Python大数据处理扩展库pySpark用法精要

2017-01-24 董付国 Python小屋 Python小屋

Spark是一个开源的、通用的并行计算与分布式计算框架,其活跃度在Apache基金会所有开源项目中排第三位,最大特点是基于内存计算,适合迭代计算,兼容多种应用场景,同时还兼容Hadoop生态系统中的组件,并且具有非常强的容错性。Spark的设计目的是全栈式解决批处理、结构化数据查询、流计算、图计算和机器学习等业务和应用,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,效率提升越大。

Spark集成了Spark SQL(分布式SQL查询引擎,提供了一个DataFrame编程抽象)、Spark Streaming(把流式计算分解成一系列短小的批处理计算,并且提供高可靠和吞吐量服务)、MLlib(提供机器学习服务)、GraphX(提供图计算服务)、SparkR(R on Spark)等子框架,为不同应用领域的从业者提供了全新的大数据处理方式,越来越便捷、轻松。

为了适应迭代计算,Spark把经常被重用的数据缓存到内存中以提高数据读取和操作速度,比Hadoop快近百倍,并且支持Java、Scala、Python、R等多种语言。除map和reduce之外,Spark还支持filter、foreach、reduceByKey、aggregate以及SQL查询、流式查询等等。

扩展库pyspark提供了SparkContext(Spark功能的主要入口,一个SparkContext表示与一个Spark集群的连接,可用来创建RDD或在该集群上广播变量)、RDD(Spark中的基本抽象,弹性分布式数据集Resilient Distributed Dataset)、Broadcast(可以跨任务重用的广播变量)、Accumulator(共享变量,任务只能为其增加值)、SparkConf(用来配置Spark)、SparkFiles(访问任务的文件)、StorageLevel(更细粒度的缓冲永久级别)等可以公开访问的类,并且提供了pyspark.sql、pyspark.streaming与pyspark.mllib等模块与包。

>>> from pyspark import SparkFiles
>>> path = 'test.txt'
>>> with open(path, 'w') as fp:  #创建文件
       fp.write('100')
>>> sc.addFile(path)   #提交文件
>>> def func(iterator):
       with open(SparkFiles.get('test.txt')) as fp:  #打开文件
           Val = int(fp.readline()) #读取文件内容
           return [x * Val for x in iterator]
>>> sc.parallelize([1, 2, 3, 4, 5]).mapPartitions(func).collect() #并行处理,collect()返回包含RDD上所有元素的列表
[100, 200, 300, 400, 500]
>>> sc.parallelize([2, 3, 4]).count()   #count()用来返回RDD中元素个数,parallelize()用来分布本地的Python集合,并创建RDD
3
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())  #collect()返回包含RDD中元素的列表,cartesian()计算两个RDD的笛卡尔积

[(1, 1), (1, 2), (2, 1), (2, 2)] 
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()  #只保留符合条件的元素
[2, 4]
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) #返回唯一元素
[1, 2, 3]
>>> rdd = sc.parallelize(range(10))
>>> rdd.map(lambda x: str(x)).collect()   #映射
>>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
>>> rdd.max() #最大值
43.0
>>> rdd.max(key=str)
5.0
>>> rdd.min()   #最小值
1.0
>>> rdd.sum() #所有元素求和
59.0
>>> from random import randint
>>> lst = [randint(1,100) for _ in range(20)]
>>> lst
[18, 55, 48, 13, 86, 23, 85, 62, 66, 58, 73, 96, 90, 16, 49, 98, 49, 69, 3, 53]
>>> sc.parallelize(lst).top(3) #最大的3个元素
[98, 96, 90]
>>> sorted(lst, reverse=True)[:3]
[98, 96, 90]
>>> sc.parallelize(range(100)).filter(lambda x:x>90).take(3) #使用take()返回前3个元素
[91, 92, 93]
>>> sc.parallelize(range(20), 3).glom().collect()  #查看分片情况
[[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18, 19]]
>>> sc.parallelize(range(20), 6).glom().collect()  #查看分片情况
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9], [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]]
>>> myRDD = sc.parallelize(range(20), 6)  #6表示分片数
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part]) #执行任务
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1]) #只查看第2个分片的结果
[9, 16, 25]
>>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1,5]) #查看第2和第6个分片上的结果
[9, 16, 25, 256, 289, 324, 361]
>>> sc.parallelize([1,2,3,3,3,2]).distinct().collect()  #distinct()返回包含唯一元素的RDD
[1, 2, 3]
>>> from operator import add, mul
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) #把所有分片上的数据累加
15
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(1, mul)  #把所有分片上的数据连乘
120
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)  #reduce()函数的并行版本
15
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(mul)
120
>>> result = sc.parallelize(range(1, 6)).groupBy(lambda x: x%3).collect()  #对所有数据进行分组
>>> for k, v in result:
    print(k, sorted(v))

0 [3]
1 [1, 4]
2 [2, 5]

>>> rdd1 = sc.parallelize(range(10))
>>> rdd2 = sc.parallelize(range(5, 20))
>>> rdd1.intersection(rdd2).collect()   #交集
[8, 9, 5, 6, 7]
>>> rdd1.subtract(rdd2).collect()  #差集
[0, 1, 2, 3, 4]
>>> rdd1.union(rdd2).collect()  #合并两个RDD上的元素
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
>>> rdd1 = sc.parallelize('abcd')
>>> rdd2 = sc.parallelize(range(4))
>>> rdd1.zip(rdd2).collect()   #两个RDD必须等长
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
>>> rdd = sc.parallelize('abcd')
>>> rdd.map(lambda x: (x, 1)).collect()  #内置函数map()的并行版本
[('a', 1), ('b', 1), ('c', 1), ('d', 1)]
>>> sc.parallelize([1, 2, 3, 4, 5]).stdev()  #计算标准差
1.4142135623730951
>>> sc.parallelize([1, 1, 1, 1, 1]).stdev()
0.0


明天回老家过年了,一周后回来,偏远的农村上网不太方便,这几天可能暂时不再发文了,春节回来以后再继续发。提前祝所有朋友春节快乐!