RDD基本操作(Python)

61 篇文章 5 订阅
订阅专栏

RDD基本转换运算

创建RDD最简单的方式是使用SparkContext的parallelize方法

intRDD=sc.parallelize([3,1,2,5,5])
intRDD.collect()

由于spark的惰性,转化操作并不会马上执行,而collect()是一个“动作”,spark立刻执行,RDD转换为list

关于collect,在shell环境可以直接显示结果,在eclipse中需要用print 将其打印

 [3, 1, 2, 5, 5]

stringRDD=sc.parallelize({"Apple","Orange","Banana","Grape"})
stringRDD.collect()

['Orange', 'Grape', 'Banana', 'Apple']

map

map运算使用具体函数

intRDD=sc.parallelize([3,1,2,5,5])
def add1(x):
    return (x+1)
intRDD.map(add1).collect()

map 是一个转化运算,命令将每一个元素加上1,生成新的RDD

map运算使用匿名函数

intRDD.map(lambda x:x+1).collect()

两种方法结果相同

[4, 2, 3, 6, 6]

对stringRDD所有字符串元素前面加上“fruit”,产生新的RDD

stringRDD.map(lambda x:"fruit:"+x).collect()

['fruit:Orange', 'fruit:Grape', 'fruit:Apple', 'fruit:Banana']

filter

intRDD.filter(lambda x:x<3).collect()
intRDD.filter(lambda x:x>3).collect()
intRDD.filter(lambda x:x<3 or x>=5).collect()

其功能如其名,筛选过滤

stringRDD.filter(lambda x:"ra"in x).collect()

['Orange', 'Grape']

 

distinct

删除重复的元素

intRDD.distinct().collect()

[1, 2, 3, 5]

randomSplit

sRDD=intRDD.randomSplit([0.4,0.6])
sRDD[0].collect()
sRDD[1].collect()

将整个集合元素以随机数的方式按比例分为多个RDD

[1, 2, 5]

[3, 5]

groupBy

gRDD=intRDD.groupBy(lambda x:"even"if(x%2==0)else "odd").collect()
print(gRDD[0][0],sorted(gRDD[0][1]))

('even', [2])

产生了奇数与偶数两个list

 

对一个RDD的操作总结如下:

多个RDD转换运算

intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])
intRDD1.union(intRDD2).union(intRDD3).collect()

使用union函数进行并集运算

[3, 1, 2, 5, 5, 5, 6, 2, 7]

intRDD1.intersection(intRDD2).collect()

intersection交集运算

[5]

intRDD1.subtract(intRDD2).collect()

subtract差集运算

[2, 1, 3]

intRDD1.cartesian(intRDD2).collect()

cartesian笛卡儿积运算

[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]

 

对多个RDD的操作总结如下:

 

RDD基本动作运算

intRDD.first()
intRDD.take(2)
intRDD.takeOrdered(3)
intRDD.takeOrdered(3,key=lambda x:-x)

1.取出第一项元素

2.去除第二项数据

3.从小到大排序,取出前三项

4.从大到小排序,取出前三项

intRDD.stats()

得到统计结果

(count: 5, mean: 3.2, stdev: 1.6, max: 5.0, min: 1.0)

对一个RDD的行动总结如下:

 

RDD Key-Value 基本转换运算

kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD1.keys().collect()
kvRDD1.values().collect()

列出全部的key值

[3, 3, 5, 1]

列出全部的value值

[4, 6, 6, 2]

kvRDD1.filter(lambda keyValue:keyValue[0]<5).collect()
kvRDD1.filter(lambda keyValue:keyValue[1]<5).collect()

筛选key小于5

[(3, 4), (3, 6), (1, 2)]

筛选value小于5

[(3, 4), (1, 2)]

kvRDD1.mapValues(lambda x:x*x).collect()

对RDD内每一组keyvalue进行运算,并产生另外一个RDD

[(3, 16), (3, 36), (5, 36), (1, 4)]

kvRDD1.sortByKey(ascending=True).collect()

按照key排序,传入参数默认为true,从小到大排序

[(1, 2), (3, 4), (3, 6), (5, 6)]

kvRDD1.sortByKey(ascending=False).collect()

从大到小排序

[(5, 6), (3, 4), (3, 6), (1, 2)]

kvRDD1.reduceByKey(lambda x,y:x+y).collect()

按照key值进行reduce运算,将相同key的数据value合并,合并后产生新的RDD

[(1, 2), (3, 10), (5, 6)]

 

多个RDD Key-Value 基本转换运算

创建RDD

kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2=sc.parallelize([(3,8)])

keyvalueRDD jion

kvRDD1.join(kvRDD2).collect()

[(3, (4, 8)), (3, (6, 8))]

leftOuterJoin

kvRDD1.leftOuterJoin(kvRDD2).collect()

从左边的集合对应到右边的集合,显示所有左边集合中的元素

[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]

subtractByKey

kvRDD1.subtractByKey(kvRDD2).collect()

删除key值相同的数据

[(1, 2), (5, 6)]

 

Key-Value 动作运算

kvRDD1.first()
kvRDD1.take(2)
kvFirst=kvRDD1.first()
kvFirst[0]

获取第一个数据的第一个元素即Key值

kvFirst[1]

获取第一个数据的第二个元素即Value值

kvRDD1.countByKey()

计算每一个Key值得项数

defaultdict(int, {1: 1, 3: 2, 5: 1})

KV=kvRDD1.collectAsMap()

创建kv字典

kvRDD1.lookup(3)

输入key值来查找value

[4, 6]

Pair RDD的行动操作

 

Broadcast广播变量

广播变量使用规则:

1.可以使用SparkContext.broadcast创建

2.使用.value方法来读取广播变量的值

3.Broadcast广播变量被创建后不能修改

 

Broadcast广播变量的范例

kvFruit=sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")])
fruitMap=kvFruit.collectAsMap()
print"对照表:"+str(fruitMap)

创建fruitMap字典

fruitMap=kvFruit.collectAsMap()
bcFruitMap=sc.broadcast(fruitMap)
print"字典:"+str(fruitMap)

将fruitMap字典转化为转化为bcFriotMap广播变量

fruitIds=sc.parallelize([2,4,1,3])

创建fruitIds

fruitNames=fruitIds.map(lambda x:bcFruitMap.value[x]).collect()
print str(fruitNames)

使用bcFruitMap.value字典进行转换

['orange', 'grape', 'apple', 'banana']

 

accumulator累加器

在task中,例如foreach循环中,不能读取累加器的值

只有驱动程序,也就是循环外,才可以使用.value来读取累加器的值

total=sc.accumulator(0.0)
num=sc.accumulator(0)
intRDD.foreach(lambda i:[total.add(i),num.add(1)])
avg=total.value/num.value
print str(total.value)+" "+str(num.value),str(avg)



16.0 5 3.2

 

RDD持久化

 

 

 

 

RDD编程(python版)总结
lhy_1234的博客
06-25 1751
一、RDD创建方式包括:parallelize、textFile 1.parallelize:将一个已存在的集合生成RDD。 2.textFile:通过读取外部文件生成RDD 二、将RDD显示的方法包括:foreach()、collect() 三、RDD的操作:包括两类,转换操作和行动操作。 1.转换操作中的常用操作有:filter()、map()、flatMap()、groupBy()、reduceByKey()。 (1)filter(func):用于筛选。 例1:将data中含有hadoop的元素筛选出
RDD编程初级实践(Python版)
WangmZec的博客
06-05 703
RDD编程初级实践(Python版)一、实践所需资料及平台要求1.所需数据获取2.实践所需平台二、实践内容1.pyspark交互式编程(1)该系总共有多少学生;(2)该系共开设了多少门课程;(3)Tom同学的总成绩平均分是多少;(4)求每名同学的选修的课程门数;(5)该系DataBase课程共有多少人选修;(6)各门课程的平均分是多少;(7)使用累加器计算共有多少人选了DataBase这门课。2.编写独立应用程序实现数据去重编写内容如下:1)在usr/local/spark/sparksqldata路径下创
spark RDD操作详解
09-25
RDD即弹性分布式数据集,有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。RDD只是数据集的抽象,分区内部并不会存储具体的数据。
DummyRDD:pyspark的RDD的纯Python模拟
05-11
虚拟RDD 贡献者 概述 像RDD一样走路的测试类,像RDD一样说话,但只是一个列表。 包含3个主要类别: 星火汇 SparkContext RDD 所有这些都实现了与实际spark方法完全相同的API,但是使用一个简单的python列表作为实际数据存储区。 Hadoop API,分区,复杂操作等许多功能未实现。 有关实现的功能及其注意事项的详细列表,请参见下文。 请注意,目前这只是实验性的,以后可能对测试或开发有用,但是使用此工具开发的任何东西都应始终在真实的火花上进行检查,以确保在那里确实可以正常工作。 由于实际上没有任何代码在此环境中分发,因此某些事情的行为会有所不同。 打算将该库用作真正的Spark上下文的替代品,而不会出错,但实际上可能什么也没做(例如,在不相关的配置选项的情况下)。 当前,不支持数据框api,也不支持任何事物的大多数功能,但仍在进行中。 例子 一个
头歌:RDD的创建 - Python
最新发布
weixin_62399938的博客
04-30 966
只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。此方法需要一个 URI的文件(本地路径的机器上,或一个hdfs://,s3a:// 等 URI),并读取其作为行的集合。说明:"local" 是指让Spark程序本地运行,"Simple App" 是指Spark程序的名称,这个名称可以任意(为了直观明了的查看,最好设置有意义的名称)。简单的来说RDD就是一个集合,一个将集合中数据存储在不同机器上的集合。
Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distinct 方法 - 对 RDD 中的元素去重 )
让 学习 成为一种 习惯 ( 韩曙亮 の 技术博客 )
08-02 1462
一、RDD#filter 方法 1、RDD#filter 方法简介 2、RDD#filter 函数语法 3、代码示例 - RDD#filter 方法示例 二、RDD#distinct 方法 1、RDD#distinct 方法简介 2、代码示例 - RDD#distinct 方法示例
Spark中RDD的常用操作(Python
weixin_30709809的博客
07-08 116
弹性分布式数据集(RDD) Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集。RDD的一大特性是分布式存储,分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算。弹性指其在节点存储时,既可以使用内存,也可已使用外存,为...
RDDpython
KwokWyman的博客
12-15 2184
RDD创建 从文件系统加载 .textFile() 支持本地文件系统 分布式文件系统HDFS 云端文件 >>lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") >>lines.foreach(print) Hadoop is good Spark is fast Spark is better sc : spark context textFile效果 分布式文件系统 通过并行集合(数组)创建
Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )
让 学习 成为一种 习惯 ( 韩曙亮 の 技术博客 )
07-30 4332
一、RDD 简介 1、RDD 概念 2、RDD 中的数据存储与计算 二、Python 容器数据转 RDD 对象 1、RDD 转换 2、转换 RDD 对象相关 API 3、代码示例 - Python 容器转 RDD 对象 ( 列表 ) 4、代码示例 - Python 容器转 RDD 对象 ( 列表 / 元组 / 集合 / 字典 / 字符串 ) 三、文件文件转 RDD 对象
Python大数据之PySpark(五)RDD详解
Maynor的博客
10-04 1603
📢本文由 Maynor 原创,首发于 CSDN博客🙉。分区个数getNumberPartitions。📢感觉这辈子,最深情绵长的注视,都给了手机⭐。分区内元素glom().collect()PySpark中RDD的创建两种方式。扩展阅读:RDD分区数如何确定。WordCount中RDDRDD特点—不需要记忆。通过外部数据创建RDD。并行化方式创建RDD
Spark中RDD常见的算子:Value 类型、双 Value 类型、Key - Value 类型
weixin_44870066的博客
12-03 1223
Spark常见的算子
Spark中RDD的sortBy排序的5种实现方法
muyingmiao的专栏
09-27 9698
RDD,ortBy可以指定对键还是value进行排序,sortBy可以通过下面5中方式实现排序 假如数据的格式如下,list中元素中分别为名称、单价、数量,字符之间用空格连接,要实现按照单价和数量降序 val products = sc.parallelize(List("A 100 10","B 200 20","C 200 30","D 400 30")) 1.通过Tuple方式,按照...
RDD编程初级实践(基于python
weixin_51125289的博客
06-07 365
RDD编程初级实践(基于python) 1.实验目的 2.实验环境 3.实验数据 3.1pyspark交互式编程(实验描述) 3.2编写独立应用程序实现数据去重(实验描述) 3.3编写独立应用程序实现求平均值问题(实验描述) 3.4实验数据下载 4.实验步骤 4.1pyspark交互式编程 4.2编写独立应用程序实现数据去重 4.3编写独立应用程序实现求平均值问题 1.实验目的 (1)熟悉Spark的RDD基本操作及键值对操作; (2)熟悉使用RDD编程解决实际具体问题的方法。 2.实验环境 (1)操作系统
spark rdd 实战 ,基本语法
01-24
Spark RDD 实战、基本语法 本文将对 Spark RDD 进行深入浅出的讲解,涵盖 Spark 的基本特性、生态体系、优势、支持的 API、运行模式、RDD 的概念和类型、容错 Lineage、缓存策略等知识点。 Spark 的基本特性 ...
大数据spark学习之rdd概述
05-03
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...
Spark学习--RDD编码
05-09
对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作 函数名 目的 示例 结果 union() 生成一个包含两个RDD 中所有元素的RDD rdd.union(other) {1, 2, 3, 3, 4, 5} intersection() 求两个RDD 共同的元素的RDD rdd....
Python学习笔记——大数据之Spark简介与环境搭建
02-24
尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和SparkCore之上的。SparkSQL:提供通过ApacheHive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,...
[spark]排序
胖胖的博客
10-27 1805
排序的执行顺序:按照集合中元素的比较规则进行排序,如果集合中元素没有比较规则,则可以通过extends Ordered[] 并重写相应方法实现。和sortByKey的区别是:sortBy按照f:(T)=>K这个函数的返回值进行排序这使得sortBy使用起来比sortByKey更加灵活。numPartition:可以指定排序结果的分区数,默认排序结果的分区数是这个rdd的分区数。ascending:为true则按照升序排序,为false则按照降序排序,默认为true。函数的作用:按照key进行排序。
Pyspark基础操作( rdd dataframe 创建 读取 利用)
BiuFEIMIR的博客
12-06 2380
Part1 Pyspark 1.读取数据 #enableHiveSupprot() 支持hive操作 #getOrCreate() 如果没有就创建,有就不用了 spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate() spark.sparkContext.pythonExec = spark.conf.get('spark.yarn.appMasterEnv.PYSPARK_PYTHON') pa
python 如何操作 spark
04-29
Python 可以使用 PySpark 操作 Spark,PySpark 是 Spark 的 Python API,它提供了 Spark 的所有功能和特性。使用 PySpark 可以通过编写 Python 代码来操作 Spark 的分布式计算集群。以下是一些基本的操作: 1. 安装 PySpark:可以通过 pip 安装 PySpark:`pip install pyspark` 2. 创建 SparkContext:在使用 PySpark 之前,需要先创建一个 SparkContext 对象。SparkContext 对象是连接 Spark 集群的入口。可以使用以下代码创建 SparkContext: ```python from pyspark import SparkContext sc = SparkContext("local", "PySpark Example") ``` 这里,“local”表示在本地运行,也可以指定连接到远程的 Spark 集群。 3. 创建 RDD:使用 PySpark 可以创建 RDD(Resilient Distributed Dataset),RDD 是 Spark 中的核心概念。可以使用以下代码创建 RDD: ```python data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data) ``` 这里将 Python 列表转换为 RDD。 4. 转换操作:使用 PySpark 可以对 RDD 进行各种转换操作,例如 map、filter、reduce 等等。以下是一个 map 操作的示例: ```python data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data) result = rdd.map(lambda x: x * 2).collect() print(result) ``` 这里使用 map 将 RDD 中的每个元素乘以 2。 5. 动作操作:在 PySpark 中,动作操作用于触发计算并返回结果。例如,collect、count、reduce 等等。以下是一个 count 操作的示例: ```python data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data) result = rdd.count() print(result) ``` 这里使用 count 返回 RDD 中的元素个数。 以上是 PySpark 的一些基本操作,还有很多高级操作可以使用,例如 Spark SQL、DataFrame、机器学习等等。可以参考官方文档和教程来学习更多内容。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
写文章

分类专栏

  • python网络爬虫实战 7篇
  • Python数据分析 15篇
  • 区块链 2篇
  • Linux 8篇
  • C++ 10篇
  • Python 61篇
  • POJ 41篇
  • 数据库 8篇
  • MachineLearning 15篇
  • Leetcode 1篇

最新评论

  • 招商银行2022FinTech精英训练营 - 数据赛道方案分享

    m0_46299226: 请问找毒特是什么意思

  • Python爬取英雄联盟职业比赛数据

    codesss333: 20年之后的爬不到了

  • 招商银行2022FinTech精英训练营 - 数据赛道方案分享

    weixin_46470005: 用魔法打开

  • 科大讯飞人岗匹配Top1方案总结

    nqct1: 这是相当于多分类的任务嘛?而且好像不太知道具体的类别数和类别

  • 科大讯飞人岗匹配Top1方案总结

    p_joke: 大佬,求分享数据集484039234@qq.com,十分感谢!

大家在看

  • Golang | Leetcode Golang题解之第151题反转字符串中的单词 75
  • 数据结构预备知识(Java):包装类&泛型 1659

最新文章

  • 科大讯飞人岗匹配Top1方案总结
  • 科大讯飞CTR预估挑战赛Top3方案总结
  • 快速入门数据科学竞赛
2022年10篇
2021年20篇
2020年70篇
2019年156篇
2018年6篇

目录

目录

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

哆哆女性网欧洲老妇人70修正液黎字起名子lol起名字大全塞上风云记剧情介绍新生儿男孩起小名一群人一起创业取个群名烘焙坊起名字大全给房子起名字fast无线路由器设置女孩叠字取名起名大全羊年李姓宝宝起名久热爱精品视频线路一店铺 起名君臣斗挂面厂名字咋起可以用来起名字的中药名战地之王名字成语大全大话西游2表情包origin是什么何兆武建筑 公司 起名好看的盗墓小说起名女诗经男楚辞取名国学起名大全男宝女人起名字姓袁米乐星ktv隐匿的角落深圳起名字哪里比较好淀粉肠小王子日销售额涨超10倍罗斯否认插足凯特王妃婚姻不负春光新的一天从800个哈欠开始有个姐真把千机伞做出来了国产伟哥去年销售近13亿充个话费竟沦为间接洗钱工具重庆警方辟谣“男子杀人焚尸”男子给前妻转账 现任妻子起诉要回春分繁花正当时呼北高速交通事故已致14人死亡杨洋拄拐现身医院月嫂回应掌掴婴儿是在赶虫子男孩疑遭霸凌 家长讨说法被踢出群因自嘲式简历走红的教授更新简介网友建议重庆地铁不准乘客携带菜筐清明节放假3天调休1天郑州一火锅店爆改成麻辣烫店19岁小伙救下5人后溺亡 多方发声两大学生合买彩票中奖一人不认账张家界的山上“长”满了韩国人?单亲妈妈陷入热恋 14岁儿子报警#春分立蛋大挑战#青海通报栏杆断裂小学生跌落住进ICU代拍被何赛飞拿着魔杖追着打315晚会后胖东来又人满为患了当地回应沈阳致3死车祸车主疑毒驾武汉大学樱花即将进入盛花期张立群任西安交通大学校长为江西彩礼“减负”的“试婚人”网友洛杉矶偶遇贾玲倪萍分享减重40斤方法男孩8年未见母亲被告知被遗忘小米汽车超级工厂正式揭幕周杰伦一审败诉网易特朗普谈“凯特王妃P图照”考生莫言也上北大硕士复试名单了妈妈回应孩子在校撞护栏坠楼恒大被罚41.75亿到底怎么缴男子持台球杆殴打2名女店员被抓校方回应护栏损坏小学生课间坠楼外国人感慨凌晨的中国很安全火箭最近9战8胜1负王树国3次鞠躬告别西交大师生房客欠租失踪 房东直发愁萧美琴窜访捷克 外交部回应山西省委原副书记商黎光被逮捕阿根廷将发行1万与2万面值的纸币英国王室又一合照被质疑P图男子被猫抓伤后确诊“猫抓病”

哆哆女性网 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化