成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Python如何把Spark數(shù)據(jù)寫(xiě)入ElasticSearch

瀏覽:6日期:2022-07-29 14:37:53

這里以將Apache的日志寫(xiě)入到ElasticSearch為例,來(lái)演示一下如何使用Python將Spark數(shù)據(jù)導(dǎo)入到ES中。

實(shí)際工作中,由于數(shù)據(jù)與使用框架或技術(shù)的復(fù)雜性,數(shù)據(jù)的寫(xiě)入變得比較復(fù)雜,在這里我們簡(jiǎn)單演示一下。

如果使用Scala或Java的話,Spark提供自帶了支持寫(xiě)入ES的支持庫(kù),但Python不支持。所以首先你需要去這里下載依賴的ES官方開(kāi)發(fā)的依賴包包。

下載完成后,放在本地目錄,以下面命令方式啟動(dòng)pyspark:

pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,請(qǐng)?jiān)O(shè)置環(huán)境變量:

export PYSPARK_PYTHON=/usr/bin/python3理解如何寫(xiě)入ES的關(guān)鍵是要明白,ES是一個(gè)JSON格式的數(shù)據(jù)庫(kù),它有一個(gè)必須的要求。數(shù)據(jù)格式必須采用以下格式

{ 'id: { the rest of your json}}

往下會(huì)展示如何轉(zhuǎn)換成這種格式。

解析Apache日志文件我們將Apache的日志文件讀入,構(gòu)建Spark RDD。然后我們寫(xiě)一個(gè)parse()函數(shù)用正則表達(dá)式處理每條日志,提取我們需要的字

rdd = sc.textFile('/home/ubuntu/walker/apache_logs')regex=’^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] '(S+)s?(S+)?s?(S+)?' (d{3}|-) (d+|-)s?'?([^']*)'?s?'?([^']*)?'?$’

p=re.compile(regex)def parse(str): s=p.match(str) d = {} d[’ip’]=s.group(1) d[’date’]=s.group(4) d[’operation’]=s.group(5) d[’uri’]=s.group(6) return d

換句話說(shuō),我們剛開(kāi)始從日志文件讀入RDD的數(shù)據(jù)類似如下:

[’83.149.9.216 - - [17/May/2015:10:05:03 +0000] 'GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1' 200 203023 'http://semicomplete.com/presentations/logstash-monitorama-2013/' 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36'’]

然后我們使用map函數(shù)轉(zhuǎn)換每條記錄:

rdd2 = rdd.map(parse)

rdd2.take(1)

[{’date’: ’17/May/2015:10:05:03 +0000’, ’ip’: ’83.149.9.216’, ’operation’: ’GET’, ’uri’: ’/presentations/logstash-monitorama-2013/images/kibana-search.png’}]

現(xiàn)在看起來(lái)像JSON,但并不是JSON字符串,我們需要使用json.dumps將dict對(duì)象轉(zhuǎn)換。

我們同時(shí)增加一個(gè)doc_id字段作為整個(gè)JSON的ID。在配置ES中我們?cè)黾尤缦屡渲谩癳s.mapping.id”: “doc_id”告訴ES我們將這個(gè)字段作為ID。

這里我們使用SHA算法,將這個(gè)JSON字符串作為參數(shù),得到一個(gè)唯一ID。計(jì)算結(jié)果類似如下,可以看到ID是一個(gè)很長(zhǎng)的SHA數(shù)值。

rdd3.take(1)

[(’a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c’, ’{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'doc_id': 'a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}’)]

現(xiàn)在我們需要制定ES配置,比較重要的兩項(xiàng)是:

“es.resource” : ‘walker/apache’: 'walker'是索引,apache是類型,兩者一般合稱索引 “es.mapping.id”: “doc_id”: 告訴ES那個(gè)字段作為整個(gè)文檔的ID,也就是查詢結(jié)果中的_id

其他的配置自己去探索。

然后我們使用saveAsNewAPIHadoopFile()將RDD寫(xiě)入到ES。這部分代碼對(duì)于所有的ES都是一樣的,比較固定,不需要理解每一個(gè)細(xì)節(jié)

es_write_conf = { 'es.nodes' : 'localhost', 'es.port' : '9200', 'es.resource' : ’walker/apache’, 'es.input.json': 'yes', 'es.mapping.id': 'doc_id' } rdd3.saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf)rdd3 = rdd2.map(addID)def addId(data): j=json.dumps(data).encode(’ascii’, ’ignore’) data[’doc_id’] = hashlib.sha224(j).hexdigest() return (data[’doc_id’], json.dumps(data))

最后我們可以使用curl進(jìn)行查詢

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*{ '_index' : 'walker', '_type' : 'apache', '_id' : '227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2', '_score' : 1.0, '_source' : { 'date' : '17/May/2015:10:05:32 +0000', 'ip' : '91.177.205.119', 'operation' : 'GET', 'doc_id' : '227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2', 'uri' : '/favicon.ico' }

如下是所有代碼:

import jsonimport hashlibimport redef addId(data): j=json.dumps(data).encode(’ascii’, ’ignore’) data[’doc_id’] = hashlib.sha224(j).hexdigest() return (data[’doc_id’], json.dumps(data))def parse(str): s=p.match(str) d = {} d[’ip’]=s.group(1) d[’date’]=s.group(4) d[’operation’]=s.group(5) d[’uri’]=s.group(6) return d regex=’^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] '(S+)s?(S+)?s?(S+)?' (d{3}|-) (d+|-)s?'?([^']*)'?s?'?([^']*)?'?$’p=re.compile(regex)rdd = sc.textFile('/home/ubuntu/walker/apache_logs')rdd2 = rdd.map(parse)rdd3 = rdd2.map(addID)es_write_conf = { 'es.nodes' : 'localhost', 'es.port' : '9200', 'es.resource' : ’walker/apache’, 'es.input.json': 'yes', 'es.mapping.id': 'doc_id' } rdd3.saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf)

也可以這么封裝,其實(shí)原理是一樣的

import hashlibimport jsonfrom pyspark import Sparkcontextdef make_md5(line): md5_obj=hashlib.md5() md5_obj.encode(line) return md5_obj.hexdigest()def parse(line): dic={} l = line.split(’t’) doc_id=make_md5(line) dic[’name’]=l[1] dic[’age’] =l[2] dic[’doc_id’]=doc_id return dic #記得這邊返回的是字典類型的,在寫(xiě)入es之前要記得dumpsdef saveData2es(pdd, es_host, port,index, index_type, key): ''' 把saprk的運(yùn)行結(jié)果寫(xiě)入es :param pdd: 一個(gè)rdd類型的數(shù)據(jù) :param es_host: 要寫(xiě)es的ip :param index: 要寫(xiě)入數(shù)據(jù)的索引 :param index_type: 索引的類型 :param key: 指定文檔的id,就是要以文檔的那個(gè)字段作為_(kāi)id :return: ''' #實(shí)例es客戶端記得單例模式 if es.exist.index(index): es.index.create(index, ’spo’) es_write_conf = { 'es.nodes': es_host, 'es.port': port, 'es.resource': index/index_type, 'es.input.json': 'yes', 'es.mapping.id': key } (pdd.map(lambda _dic: (’’, json.dumps(_dic)))) #這百年是為把這個(gè)數(shù)據(jù)構(gòu)造成元組格式,如果傳進(jìn)來(lái)的_dic是字典則需要jdumps,如果傳進(jìn)來(lái)之前就已經(jīng)dumps,這便就不需要dumps了 .saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf) )if __name__ == ’__main__’: #實(shí)例化sp對(duì)象 sc=Sparkcontext() #文件中的呢內(nèi)容一行一行用sc的讀取出來(lái) json_text=sc.textFile(’./1.txt’) #進(jìn)行轉(zhuǎn)換 json_data=json_text.map(lambda line:parse(line)) saveData2es(json_data,’127.0.01’,’9200’,’index_test’,’index_type’,’doc_id’) sc.stop()

看到了把,面那個(gè)例子在寫(xiě)入es之前加了一個(gè)id,返回一個(gè)元組格式的,現(xiàn)在這個(gè)封裝指定_id就會(huì)比較靈活了

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Python 編程
相關(guān)文章:
成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久
久久综合福利| 韩国女主播成人在线观看| 琪琪一区二区三区| 国产精品一卡| 日韩一区在线看| 99国产精品久久久久久久久久| 欧美日韩中文字幕精品| 美女视频第一区二区三区免费观看网站| 久久国产精品久久久久久电车| 尤物av一区二区| 99精品国产在热久久下载| 中文字幕一区二区三区在线不卡 | 五月天一区二区| 日韩午夜免费视频| 亚洲免费毛片网站| 亚洲高清不卡一区| 亚洲色图19p| 在线看无码的免费网站| 亚洲国产精品高清| 欧美精品二区| 国产精品无人区| 尤物在线精品| 亚洲欧美日韩人成在线播放| 亚洲精品极品| 亚洲综合图片区| 久久精品官网| 久久99国产乱子伦精品免费| 欧美撒尿777hd撒尿| 激情久久五月天| 8x福利精品第一导航| 久久国产福利国产秒拍| 欧美肥胖老妇做爰| 国产盗摄一区二区| 欧美极品美女视频| 精品1区2区| 一区二区三国产精华液| 久久香蕉精品| 日日噜噜夜夜狠狠视频欧美人| 91官网在线观看| 国产一区二区三区不卡在线观看 | 蜜臀av一区二区在线观看| 久热这里只精品99re8久| 欧美a级理论片| 欧美一区二区啪啪| 色综合欧美在线视频区| 中文字幕一区二区在线观看| 亚洲影音先锋| 蜜桃av噜噜一区| 91精品国产综合久久国产大片| 不卡电影一区二区三区| 日本一区二区三区四区| 国产视频不卡| 国内精品免费在线观看| 精品对白一区国产伦| 亚洲小说欧美另类社区| 亚洲午夜激情av| 欧美在线视频不卡| 成人av免费网站| 综合网在线视频| 色老头久久综合| va亚洲va日韩不卡在线观看| 国产精品欧美精品| 麻豆成人av| 国产久卡久卡久卡久卡视频精品| 久久久精品日韩欧美| 亚洲欧洲久久| 日本中文在线一区| 精品久久久久久久人人人人传媒| 好看的亚洲午夜视频在线| 午夜精品福利在线| 欧美一区二区高清| 欧美日韩亚洲一区二区三区在线观看 | 91网站最新地址| 一区二区三区加勒比av| 欧美色视频在线观看| 波多野结衣一区二区三区 | 国产日韩欧美不卡在线| 国产日产精品一区二区三区四区的观看方式 | 亚洲国产免费| 日本sm残虐另类| 久久免费精品国产久精品久久久久| 亚洲国产免费看| 久久国产精品一区二区| 国产三级一区二区| 久久九九国产| 91影视在线播放| 亚洲福利视频一区二区| 日韩三级伦理片妻子的秘密按摩| 91久久精品国产91久久性色tv| 免费人成精品欧美精品| 26uuu国产日韩综合| 亚洲一区二区三区涩| 国产一区二区三区视频在线播放| 中文欧美字幕免费| 日本电影亚洲天堂一区| 欧美一区二区三区另类 | 久草热8精品视频在线观看| 国产欧美中文在线| 日本精品视频一区二区三区| 99re热这里只有精品视频| 天堂在线亚洲视频| 久久精品欧美一区二区三区不卡 | 538在线一区二区精品国产| 欧美fxxxxxx另类| 蜜臀久久久久久久| 国产精品久久福利| 欧美日本一区二区三区四区| 亚洲欧洲日韩综合二区| 国产成人一区在线| 一区2区3区在线看| 久久久久久免费网| 欧美私模裸体表演在线观看| 国产精品观看| 激情丁香综合五月| 亚洲欧美另类久久久精品 | 日本一区二区免费在线| 欧美日韩中文字幕一区二区| 在线观看一区欧美| 丁香婷婷综合五月| 日韩和欧美的一区| 国产精品国产三级国产| 91精品国产综合久久蜜臀| 亚洲一区二区三区午夜| 91亚洲国产成人精品一区二区三 | 香蕉av777xxx色综合一区| 99re成人在线| 精品一区二区三区在线播放 | 91亚洲精品久久久蜜桃网站| 精品中文字幕一区二区小辣椒| 亚洲精品免费播放| 亚洲精品一线二线三线| 在线免费亚洲电影| 亚洲国产欧美日韩| 午夜久久美女| 99久久精品国产导航| 国内一区二区在线| 亚洲第一福利一区| 国产精品久久久99| 久久精品视频一区| 日韩欧美中文一区二区| 欧美日韩中文另类| 久久久久久一区二区| 在线观看日韩av电影| 91网上在线视频| 成人午夜看片网址| 国产精品原创巨作av| 奇米在线7777在线精品| 亚洲精品成a人| 中文字幕免费不卡| 精品精品国产高清a毛片牛牛 | 国产精品全国免费观看高清| 精品国产自在久精品国产| 欧美日韩国产综合久久| 色婷婷亚洲精品| 亚洲一卡久久| 国产一区二区三区黄| 在线不卡欧美| 国产精品mv在线观看| 9l国产精品久久久久麻豆| 国产一区二区三区免费看| 久久精品国产999大香线蕉| 人人精品人人爱| 日韩国产成人精品| 爽爽淫人综合网网站 | 欧美~级网站不卡| 99久久婷婷国产| 丁香婷婷综合激情五月色| 国产又粗又猛又爽又黄91精品| 免费视频一区二区| 免费在线观看不卡| 麻豆成人久久精品二区三区红 | 日韩一级免费观看| 欧美一级高清片在线观看| 在线播放91灌醉迷j高跟美女 | 欧美成人a∨高清免费观看| 日韩视频国产视频| 日韩一区二区在线观看视频| 日韩欧美色综合| 欧美不卡激情三级在线观看| 日韩欧美一二三四区| 欧美成人vr18sexvr| 欧美电影免费观看高清完整版| 欧美一区二区三区四区五区 | 天使萌一区二区三区免费观看| 天堂久久一区二区三区| 三级不卡在线观看| 蜜臀av性久久久久蜜臀aⅴ流畅 | 99国产精品久久久久久久成人热 | 色诱亚洲精品久久久久久| 一本到不卡精品视频在线观看| 老司机亚洲精品| 久热这里只精品99re8久| 日本精品视频一区二区| 欧美日韩国产天堂| 欧美一级搡bbbb搡bbbb| 欧美成人video| 国产精品美女久久久久久久| 最新热久久免费视频| 亚洲已满18点击进入久久| 视频一区二区三区入口|