中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Spark的Python編程示例

2018-07-20    來源:open-open

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬Linux鏡像隨意使用

鏈接:http://spark.apache.org/docs/latest/programming-guide.html

安裝好Spark 后,自帶了一些demo, 路徑在Spark根目錄/examples/src/main/python/

里面有些例子,例如統(tǒng)計(jì)字?jǐn)?shù)的 wordcount.py

import sys
from operator import add
 
from pyspark import SparkContext
 
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
 
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)
 
    sc.stop()

另外參考Spary 的 python api: http://spark.apache.org/docs/latest/api/python/pyspark.html

寫了一個(gè)小demo,就是練習(xí)一下api的使用,做業(yè)務(wù)很方便。針對(duì)于大數(shù)據(jù)文件做統(tǒng)計(jì)分析的。比如幾十兆上百兆的我們單機(jī)處理,上G的就放在hadoop 的 hdfs上。

下面是一個(gè)學(xué)生成績(jī)單。四列字段:學(xué)生,以及三科成績(jī)。其中學(xué)生有重復(fù)的(比如額外加分的情況,需要合并分析)。

yang    85  90  30
wang    20  60  50
zhang   90  90  90
li  100 54  0
yanf    0   0   0   
yang 12 0 0

當(dāng)然實(shí)際中數(shù)據(jù)要很多,比如很多列,而且?guī)资f行甚至幾百萬行。這里是一個(gè)demo ,相當(dāng)于在部署前測(cè)試。

在 Spark根目錄/example/src/main/python/ 下新建一個(gè) students.py :

#coding=utf-8
 
import sys
from operator import add
from pyspark import SparkContext
 
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
 
def map_func(x):
    s = x.split()
    return (s[0],[int(s[1]),int(s[2]),int(s[3])])
 
def f(x):
    return x
    rank = sc.parallelize(range(0,sorted.count()))
 
def add(a,b):
    return [a[r]+ b[r] for r in range(len(a))]
 
def _merge(a,b):
    print '****'
    return [a[r]+ b[r] for r in range(len(a))]
 
#the students who has one score is 100
def has100(x):
    for y in x:
        if(y==100):
            return True
    return False
 
def allIs0(x):
    if(type(x) == list and sum(x) == 0):
        return True
    return False
 
def subMax(x,y):
    m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i] for i in range(3)]
    return('',m)
 
def sumAll(x,y):
    return ('',[x[1][i]+y[1][i] for i in range(3)])
 
 
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: students <file>"
        exit(-1)
    sc = SparkContext(appName="Students")
    #加載學(xué)生文件,調(diào)用map將學(xué)生映射成keyValues.其中,key是學(xué)生,Value是學(xué)生成績(jī)。map后的結(jié)果如('yang',(85,90,30))
    # 之后調(diào)用 CombineByKey,將相同學(xué)生的成績(jī)相加(合并)。然后調(diào)用cache, 將整個(gè)數(shù)據(jù)緩存,以便多次進(jìn)行reduce而無需每次都重新生成。
    lines = sc.textFile(sys.argv[1], 1).map(map_func).combineByKey(f,add,_merge).cache()
    #print lines
    count = lines.count()
     
    # 獲取學(xué)生中三科成績(jī)有滿分的,調(diào)用filter來實(shí)現(xiàn)
    whohas100 = lines.filter(lambda x: filter(has100,x)).collect()
    # 獲取三科中所有成績(jī)都是0的同學(xué)(缺考)
    whoIs0 = lines.filter(lambda x: filter(allIs0,x)).collect()
    # 獲取每個(gè)學(xué)生的成績(jī)總和
    sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
    # 獲取三科中,單科最高分
    subM = lines.reduce(subMax)
    # 獲取學(xué)生單科成績(jī)的總和,求單科平均分用
    sumA = lines.reduce(sumAll)
    # 總分最高的學(xué)生
    maxScore = max(sumScore,key = lambda x: x[1])
    # 總分最低的學(xué)生
    minScore = min(sumScore,key = lambda x: x[1])
    # 所有學(xué)生三科成績(jī)平均分
    avgA = [x/count for x in sumA[1]]
    # 根據(jù)總分進(jìn)行排序(默認(rèn)由小而大)
    sorted = lines.sortBy(lambda x: sum(x[1]))
    # 排序并附帶序號(hào)
    sortedWithRank = sorted.zipWithIndex().collect()
    # 取出成績(jī)最高的前三名同學(xué),發(fā)獎(jiǎng)!
    first3 = sorted.takeOrdered(3,key = lambda x: -sum(x[1]))
    
    #print '*'*50
    print whohas100
    print maxScore
    print whoIs0
    print subM
    print avgA
    print sorted.collect()
    print sortedWithRank
    print first3
     
    #將結(jié)果匯總輸出到文件
    file = open('/home/yanggaofei/downloads/result.txt','w')
    file.write('students num:'+`count`+ '\n')
    file.write('who has a 100 scores:' + str(whohas100) + '\n')
    file.write('who all is 0:' + str(whoIs0) + '\n')
    file.write('the max score of each subject:' + str(subM) + '\n')
    file.write('the avg score of each subject:' + str(avgA) + '\n')
    file.write('sorted the students:' + str(sorted.collect()) + '\n')
    file.write('sorted the students with the rank:' + str(sortedWithRank) + '\n')
    file.write('the first 3 who will get the award:' + str(first3) + '\n')
    file.close()

好了,運(yùn)行:

[root@cyouemt spark-1.1.1]# ./bin/spark-submit examples/src/main/python/students.py temp/student.txt

運(yùn)行結(jié)果result.txt如下:

students num:5
who has a 100 scores:[(u'li', [100, 54, 0])]
who all is 0:[(u'yanf', [0, 0, 0])]
the max score of each subject:('', [100, 90, 90])
the avg score of each subject:[61, 58, 34]
sorted the students:[(u'yanf', [0, 0, 0]), (u'wang', [20, 60, 50]), (u'li', [100, 54, 0]), (u'yang', [97, 90, 30]), (u'zhang', [90, 90, 90])]
sorted the students with the rank:[((u'yanf', [0, 0, 0]), 0), ((u'wang', [20, 60, 50]), 1), ((u'li', [100, 54, 0]), 2), ((u'yang', [97, 90, 30]), 3), ((u'zhang', [90, 90, 90]), 4)]
the first 3 who will get the award:[(u'zhang', [90, 90, 90]), (u'yang', [97, 90, 30]), (u'li', [100, 54, 0])]

Spark的運(yùn)行過程會(huì)打印出任務(wù)執(zhí)行的開始過程以及結(jié)束。表示沒研究透,不做陳述。。。相比hadoop,Spark 是一個(gè)內(nèi)存計(jì)算的MapReduce, 通過緩存機(jī)制,在性能上要好很多。它自身不帶數(shù)據(jù)系統(tǒng)。但是支持 hdfs,mesos,hbase。文本文件等。從架構(gòu)和應(yīng)用角度上看,spark是 一個(gè)僅包含計(jì)算邏輯的開發(fā)庫(盡管它提供個(gè)獨(dú)立運(yùn)行的master/slave服務(wù),但考慮到穩(wěn)定后以及與其他類型作業(yè)的繼承性,通常不會(huì)被采用),而不 包含任何資源管理和調(diào)度相關(guān)的實(shí)現(xiàn),這使得spark可以靈活運(yùn)行在目前比較主流的資源管理系統(tǒng)上,典型的代表是mesos和yarn,我們稱之為 “spark on mesos”和“spark on yarn”。將spark運(yùn)行在資源管理系統(tǒng)上將帶來非常多的收益,包括:與其他計(jì)算框架共享集群資源;資源按需分配,進(jìn)而提高集群資源利用率等

標(biāo)簽: whois 大數(shù)據(jù)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:判斷輸入的參數(shù)是否是個(gè)合格標(biāo)準(zhǔn)的郵箱

下一篇:JavaScript設(shè)置主頁功能的代碼