(Spark Connect)使用Aerospike的 Spark 连接器
admin
2024-01-21 02:17:46
0

前言

Aerospike是一个高度可扩展的键值数据库,可提供同类产品中最佳的性能。它在实时业务环境中通常部署管理TB到PB数据量。

Aerospike通常与其他可扩展的分布式软件(例如,用于系统耦合的Kafka或用于分析的Spark)一起运行。Aerospike提供的 Aerospike Connect 附件使集成变得很容易。

本文通过使用 aerospike-ansible 讨论了 Aerospike Spark Connect 在实际中的工作方式,并提供一个全面且易于复制的端到端示例。

数据库集群设置

首先看一下 Ansible for Aerospike,它解释了如何使用 aerospike-ansible。

在此示例中,我在 vars/cluster-config.yml 中将 cluster_instance_type 设置为 c5d.18xlarge。

按照说明进行操作,直到并包括一键式设置,最后我们会运行到

 

ansible-playbook aws-setup-plus-aerospike-install.yml

ansible-playbook aerospike-java-client-setup.yml

这会产生一个3个节点的群集,以及一个安装了相关软件的客户端实例。

Spark 集群设置

这是通过

ansible-playbook spark-cluster-setup.yml

对于此示例,在运行之前,我在 vars / cluster-config.yml 中将 spark_instance_type 设置为 c5d.4xlarge

该脚本创建了一个3节点的给定实例类型的Spark集群,在其中已安装并运行了Spark,它还安装了Aerospike Spark Connect。

请注意,您需要设置 enterprise:true,并通过 vars/cluster-config.yml 中的 feature_key:/your/path/to/key 提供有效的Aerospike功能密钥的路径。因此,您必须是Aerospike的授权客户,或者必须正在Aerospike企业版试用期。

在过程即将结束时,您会看到

 

TASK [Spark master IP & master internal url] ************************************************************************************************************************************************************************

ok: [localhost] => {

"msg": "Spark master is 3.88.237.103. Spark master internal url is spark://10.0.2.122:7077."

}

记下Spark master内部网址,稍后需要。

加载数据

我们的示例利用了来自10亿纽约出租车司机库的2000万条记录,这些记录以压缩形式提供,网址为 https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz 。我们使用安装在上面设置的客户端计算机上的 Aerospike loader 加载程序把数据加载到Aerospike。

首先,我们获得Aerospike集群的节点的地址,稍后需要这些地址。

source ./scripts/cluster-ip-address-list.sh
 

Adds cluster ips to this array- AERO_CLUSTER_IPS

Use as ${ AERO_CLUSTER_IPS[index]}

There are 3 entries

##########################################################

cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58

cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234

cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95

aerospike loader 加载器需要一个配置文件才能将数据加载到Aerospike中,这会将 csv 列位置映射到命名和类型化的bin,样本条目看起来像

 

{

"name": "pkup_datetime",

"value": {

"column_position": 3,

"type": "timestamp",

"encoding": "yyyy-MM-dd hh:mm:ss",

"dst_type": "integer"

}

}

在 repos/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json 的仓库中提供了此功能,我们将此上传到客户端实例。

 

source ./scripts/client-ip-address-list.sh

scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[0]}:~

接下来,将数据放入客户端计算机。有多种方法可以执行此操作,但是您需要进行规划,因为未压缩时数据集为7.6GB。我使用了以下命令,但是具体情况取决于您的闪存和文件系统的具体情况

 

./scripts/client-quick-ssh.sh # to log in, followed by

sudo mkfs.ext4 /dev/nvme1n1

sudo mkdir /data

sudo mount -t ext4 /dev/nvme1n1 /data

sudo chmod 777 /data

wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz

gunzip /data/trips_xaa.csv.gz

最后,我们使用上传的配置文件加载数据。

 

cd ~/aerospike-loader

./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv

请注意,我们使用的是我们先前记录的群集IP地址之一。

使用Spark

登录到Spark的一个节点,通过aerospike-ansible中的一个工具脚本

/scripts/spark-quick-ssh.sh

使用我们在运行Spark集群安装手册时看到的Spark主URL启动Spark Shell。

/spark/bin/spark-shell --master spark://10.0.2.122:7077

导入相关库

 

import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode}

import org.apache.spark.SparkConf

import java.util.Date

import java.text.SimpleDateFormat

提供Aerospike配置。请注意,我们在这里使用了之前的集群IP:

 

spark.conf.set("aerospike.seedhost", "10.0.0.234")

spark.conf.set("aerospike.namespace", "test")

定义一个视图,以及我们将要使用的功能

 

val sqlContext = spark.sqlContext

sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds))

val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load

taxi.createOrReplaceTempView("taxi")

最后,运行我们的查询语句

 

// Journeys grouped by cab type

val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type")

result.show()

+--------+--------+

|cab_type| count|

+--------+--------+

| green|20000000|

+--------+--------+

// Average fare based on different passenger count

val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt")

result.show()

+-------------+----------+

|passenger_cnt|avg_amount|

+-------------+----------+

| 0| 10.86|

| 1| 14.63|

| 2| 15.75|

| 3| 15.87|

| 4| 15.85|

| 5| 14.76|

| 6| 15.42|

| 7| 23.74|

| 8| 19.52|

| 9| 34.9|

+-------------+----------+

// No of journeys for different numbers of passengers

val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) order by passenger_cnt");

result.show()

+-------------+---------+--------+

|passenger_cnt|trip_year| count|

+-------------+---------+--------+

| 0| 2014| 4106|

| 1| 2014|16557518|

| 2| 2014| 1473578|

| 3| 2014| 507862|

| 4| 2014| 160714|

| 5| 2014| 939276|

| 6| 2014| 355846|

| 7| 2014| 492|

| 8| 2014| 494|

| 9| 2014| 114|

+-------------+---------+--------+

// Number of trips for each passenger count/distance combination

// Ordered by trip count, descending

val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc")

result.show()

+-------------+---------+--------+-------+

|passenger_cnt|trip_year|distance| trips|

+-------------+---------+--------+-------+

| 1| 2014| 1.0|5321230|

| 1| 2014| 2.0|3500458|

| 1| 2014| 3.0|2166462|

| 1| 2014| 4.0|1418494|

| 1| 2014| 5.0| 918460|

| 1| 2014| 0.0| 868210|

| 1| 2014| 6.0| 653646|

| 1| 2014| 7.0| 488416|

| 2| 2014| 1.0| 433746|

| 1| 2014| 8.0| 345728|

| 2| 2014| 2.0| 305578|

| 5| 2014| 1.0| 302120|

| 1| 2014| 9.0| 226278|

| 5| 2014| 2.0| 199968|

| 2| 2014| 3.0| 199522|

| 1| 2014| 10.0| 163928|

| 3| 2014| 1.0| 145580|

| 2| 2014| 4.0| 137152|

| 5| 2014| 3.0| 122714|

| 1| 2014| 11.0| 117570|

+-------------+---------+--------+-------+

only showing top 20 rows

结论

这篇文章向您展示了可以很快的启动并运行一个大型数据集。该示例处理了二千万行数据,并很容易扩展到整个数据集。我们还可以看到您可以快速启动并运行 Aerospike-ansible 工具。

相关内容

热门资讯

拜年的来历? 拜年的来历?那你是卖。... 那你是卖。 展开 传说远古时代有一种叫“年”的怪物,每逢腊月三十...
工作压力有利于成长的事例 工作压力有利于成长的事例 把李嘉诚的传记,推荐给您,我以前看过一多半,很有感触,希望可以帮助到您
苏武牧羊北海边的北海指的是哪里... 苏武牧羊北海边的北海指的是哪里?贝加尔湖,中国古代称为北海,位于俄罗斯西伯利亚南部简介贝加尔湖是亚欧...
购物狂电影的最后一幕那个弃婴 ... 购物狂电影的最后一幕那个弃婴 是怎么回事?是主角的小时候吧,讲她为什么是购物狂,因为她小时候被抛弃在...
方法决定成败 方法决定成败没错,方法决定成败,态度决定一切,心态决定效率。送你这几句名言,以资鼓励。
霞秋的意思?想表达晚年幸福? 霞秋的意思?想表达晚年幸福?霞秋的意思?想表达晚年幸福?... 霞秋的意思?想表达晚年幸福? 展...
关于“欣赏”的名言和事例 关于“欣赏”的名言和事例19世纪末,美国西部的密苏里有一个坏孩子,他偷偷地向邻居家的窗户扔石头,还把...
自己发的语音总是不敢去听,怎么... 自己发的语音总是不敢去听,怎么让自己勇敢去听呢?怎么说呢,其实你录下来的声音就是别人听到的,只是因为...
格物致知的真正意义是什么? 格物致知的真正意义是什么?提示:可以从文章内容中直接找到答案。? 参考答案:格物致知的真正...
醒悟中心卢卫斌老师谈强迫症的治... 醒悟中心卢卫斌老师谈强迫症的治疗为什么需要禅修练习  强迫症是一种复合型心理障碍,有其独特的障碍模型...
宣化那有好玩的地方 宣化那有好玩的地方有吗? 「莫等闲,白了少年头。空b切」
水深水浅东西涧 云去云来远近山... 水深水浅东西涧 云去云来远近山是什么意思  涧水或东或西时深时浅,山峦亦近云雾盘桓.  作品原文  ...
我的妹妹不可能那么可爱 讲的是... 我的妹妹不可能那么可爱 讲的是什么?就是有一个长的很可爱明明可以很受欢迎【也确实很受欢迎】的妹纸爱好...
童话诗有什么特点 童话诗有什么特点 童话诗(Fairy tale poem),故事诗的一种,现代诗的一种体裁。是以童话...
一块姜在挡车的是什么歌 一块姜在挡车的是什么歌一块姜在挡车的是什么歌感应:泳儿的歌我听的不多,但这首歌尤其的熟悉是一个人生活...
MP3出问题 MP3出问题我选其他歌,它都会跳回去原来那首歌,有时候还更厉害,显示这首歌的文件名字,唱另外一首歌出...
在金庸的设定里,一个武林高手可... 在金庸的设定里,一个武林高手可以打赢100个人已经是极限,但500个武林高手一定打不过1000名普第...
唐诗人张九龄"海上生明月天涯天... 唐诗人张九龄"海上生明月天涯天涯共此时"一诗全文?望月汪渗怀远 唐· 张九龄海上生明月,天涯共此时。...
暗里着迷歌曲深意? 暗里着迷歌曲深意?歌曲《暗里着迷》是刘德华在1993年发行的一首歌,他一直称这一首歌是自己半生中最重...
好听的名字 好听的名字 好听的名字有:圣杰,俊楠,皓轩,雨泽,智宸,明杰,峻熙,泽林,沛荣,嘉浩。1、圣杰:“圣...