分类 pyspark 下的文章

一、基本流程

735245-20200412143139708-538043308.jpeg

对spark里面一些基本名词介绍一下

master和worker节点:
master节点常驻master守护进程,负责管理worker节点,我们从master节点提交应用。
worker节点常驻worker守护进程,与master节点通信,并且管理executor进程。

1,Application

application(应用)其实就是用spark-submit提交的程序。比方说spark examples中的计算pi的SparkPi。一个application通常包含三部分:从数据源(比方说HDFS)取数据形成RDD,通过RDD的transformation和action进行计算,将结果输出到console或者外部存储(比方说collect收集输出到console)。

2,Driver

 Spark中的driver感觉其实和yarn中Application Master的功能相类似。主要完成任务的调度以及和executor和cluster manager进行协调。有client和cluster联众模式。client模式driver在任务提交的机器上运行,而cluster模式会随机选择机器中的一台机器启动driver。从spark官网截图的一张图可以大致了解driver的功能。

3,Job

 Spark中的Job和MR中Job不一样不一样。MR中Job主要是Map或者Reduce Job。而Spark的Job其实很好区别,一个action算子就算一个Job,比方说count,first等。

4, Task

Task是Spark中最新的执行单元。RDD一般是带有partitions的,每个partition的在一个executor上的执行可以任务是一个Task。 

5, Stage

Stage概念是spark中独有的。一般而言一个Job会切换成一定数量的stage。各个stage之间按照顺序执行。至于stage是怎么切分的,首选得知道spark论文中提到的narrow dependency(窄依赖)和wide dependency( 宽依赖)的概念。其实很好区分,看一下父RDD中的数据是否进入不同的子RDD,如果只进入到一个子RDD则是窄依赖,否则就是宽依赖。宽依赖和窄依赖的边界就是stage的划分点

def get_sim_test(clear_stand_names_list,clear_names_list):

return [float(0),0.9]

相似度计算

udf_get_sim = F.udf(get_sim_test,ArrayType(FloatType()))
xtl_data1 = xtl_data.withColumn('sim_max',udf_get_sim(xtl_data.stand_names_cut,xtl_data.skunames_cut))

xtl_data1.select("standard_id","barndname_cn","capacity","pac_spec","stand_ids","stand_names","stand_names_cut","skuids","skunames","skunames_cut","sim_max").show()

for i in xtl_data1.select("sim_max").collect():

print("sssss:",i["sim_max"])

一、背景

Spark最初由美国加州伯克利大学(UCBerkeley)的AMP(Algorithms, Machines and People)实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。Spark在诞生之初属于研究性项目,其诸多核心理念均源自学术研究论文。2013年,Spark加入Apache孵化器项目后,开始获得迅猛的发展,如今已成为Apache软件基金会最重要的三大分布式计算系统开源项目之一(即Hadoop、Spark、Storm)。

Spark作为大数据计算平台的后起之秀,在2014年打破了Hadoop保持的基准排序(Sort Benchmark)纪录,使用206个节点在23分钟的时间里完成了100TB数据的排序,而Hadoop则是使用2000个节点在72分钟的时间里完成同样数据的排序。也就是说,Spark仅使用了十分之一的计算资源,获得了比Hadoop快3倍的速度。新纪录的诞生,使得Spark获得多方追捧,也表明了Spark可以作为一个更加快速、高效的大数据计算平台。
Spark具有如下几个主要特点:
 运行速度快:Spark使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比Hadoop MapReduce快上百倍,基于磁盘的执行速度也能快十倍;
 容易使用:Spark支持使用Scala、Java、Python和R语言进行编程,简洁的API设计有助于用户轻松构建并行程序,并且可以通过Spark Shell进行交互式编程;
 通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算;
 运行模式多样:Spark可运行于独立的集群模式中,或者运行于Hadoop中,也可运行于Amazon EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。
Spark源码托管在Github中,截至2016年3月,共有超过800名来自200多家不同公司的开发人员贡献了15000次代码提交,可见Spark的受欢迎程度。

二、Spark相对于Hadoop的优势

Hadoop虽然已成为大数据技术的事实标准,但其本身还存在诸多缺陷,最主要的缺陷是其MapReduce计算模型延迟过高,无法胜任实时、快速计算的需求,因而只适用于离线批处理的应用场景。
回顾Hadoop的工作流程,可以发现Hadoop存在如下一些缺点:
 表达能力有限。计算都必须要转化成Map和Reduce两个操作,但这并不适合所有的情况,难以描述复杂的数据处理过程;
 磁盘IO开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写入到磁盘中,IO开销较大;
 延迟高。一次计算可能需要分解成一系列按顺序执行的MapReduce任务,任务之间的衔接由于涉及到IO开销,会产生较高延迟。而且,在前一个任务执行完成之前,其他任务无法开始,难以胜任复杂、多阶段的计算任务。
Spark在借鉴Hadoop MapReduce优点的同时,很好地解决了MapReduce所面临的问题。相比于MapReduce,Spark主要具有如下优点:
 Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活;
 Spark提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率;
 Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。

Spark最大的特点就是将计算数据、中间结果都存储在内存中,大大减少了IO开销,因而,Spark更适合于迭代运算比较多的数据挖掘与机器学习运算。使用Hadoop进行迭代计算非常耗资源,因为每次迭代都需要从磁盘中写入、读取中间数据,IO开销大。而Spark将数据载入内存后,之后的迭代计算都可以直接使用内存中的中间结果作运算,避免了从磁盘中频繁读取数据。

三、资源调度

1.可以借助于YARN实现资源调度管理

四、存储

1.借助于HDFS实现分布式存储

图-BDAS架构.jpg

参考文献:
1.http://dblab.xmu.edu.cn/blog/1709-2/

一、定义

RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。
1.RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。
2.RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些RDD应用多个操作来完成某项任务
3.