// Register a SQL-callable function that turns an epoch timestamp given in seconds into
// its four-digit year, returned as a String (e.g. "2014").
// NOTE(review): SimpleDateFormat formats in the JVM's default time zone, so trips close to a
// year boundary may be bucketed into different years depending on the zone the job runs in —
// confirm which zone pkup_datetime was encoded in.
sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => {
  val millis = seconds * 1000
  new SimpleDateFormat("yyyy").format(millis)
})
// Read the "nyc-taxi-data" set through the Aerospike Spark connector and expose it to SQL as "taxi".
val taxi = sqlContext.read.format("com.aerospike.spark.sql").options(Map("aerospike.set" -> "nyc-taxi-data")).load()
taxi.createOrReplaceTempView("taxi")
最后，运行我们的查询语句：
// Number of journeys per cab type
val result = sqlContext.sql(
  "SELECT cab_type,count(*) count " +
  "FROM taxi " +
  "GROUP BY cab_type")
result.show()
+--------+--------+
|cab_type| count|
+--------+--------+
| green|20000000|
+--------+--------+
// Average total amount per trip (rounded to 2 decimal places) for each passenger count,
// listed in ascending passenger-count order
val result = sqlContext.sql(
  "SELECT passenger_cnt, round(avg(total_amount),2) avg_amount " +
  "FROM taxi " +
  "GROUP BY passenger_cnt " +
  "ORDER BY passenger_cnt")
result.show()
+-------------+----------+
|passenger_cnt|avg_amount|
+-------------+----------+
| 0| 10.86|
| 1| 14.63|
| 2| 15.75|
| 3| 15.87|
| 4| 15.85|
| 5| 14.76|
| 6| 15.42|
| 7| 23.74|
| 8| 19.52|
| 9| 34.9|
+-------------+----------+
// Number of journeys for each passenger count, broken down by pickup year
// (trip_year is derived from pkup_datetime via the getYearFromSeconds UDF).
// trip_year is used as a secondary sort key so the row order is deterministic when a
// passenger count spans more than one year; ordering by passenger_cnt alone leaves ties unordered.
val result = sqlContext.sql(
  "SELECT passenger_cnt, getYearFromSeconds(pkup_datetime) trip_year, count(*) count " +
  "FROM taxi " +
  "GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) " +
  "ORDER BY passenger_cnt, trip_year")
result.show()
+-------------+---------+--------+
|passenger_cnt|trip_year| count|
+-------------+---------+--------+
| 0| 2014| 4106|
| 1| 2014|16557518|
| 2| 2014| 1473578|
| 3| 2014| 507862|
| 4| 2014| 160714|
| 5| 2014| 939276|
| 6| 2014| 355846|
| 7| 2014| 492|
| 8| 2014| 494|
| 9| 2014| 114|
+-------------+---------+--------+
// Number of trips for every (passenger count, pickup year, distance rounded to the nearest unit)
// combination, ordered by year and then by trip count, highest first
val result = sqlContext.sql(
  "SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips " +
  "FROM taxi " +
  "GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) " +
  "ORDER BY trip_year,trips desc")