# Spark
- JavaRDD
- JavaPairRDD
- Dataset
- BloomFilter
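
The RDD examples below work on JavaRDD<JsonNode> collections parsed with Jackson from JSON Lines files (airports, airport_freq, runways, by_country). A minimal setup sketch, under the assumption that the `Json` helper used throughout the table looks roughly like this (it is a project-local class, not a library API):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical helper assumed by the table: a shared ObjectMapper plus a
// SQL-style projection over JsonNode fields.
final class Json {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static ObjectMapper objectMapper() { return MAPPER; }

    // Copy a subset of fields into a new ObjectNode ("select a, b, c").
    static ObjectNode copyFields(JsonNode node, String... fields) {
        ObjectNode out = MAPPER.createObjectNode();
        for (String field : fields) {
            out.set(field, node.get(field));
        }
        return out;
    }
}

public class Setup {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("sql-to-spark").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // One JSON object per line (JSON Lines); parse each line with Jackson.
            JavaRDD<JsonNode> airports = sc.textFile("airports.json")
                    .map(line -> Json.objectMapper().readTree(line));
            System.out.println(airports.count());
        }
    }
}
```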
## Simple operations
### RDD
Operation | SQL example | Spark (Java RDD) |
---|---|---|
select | select * from airports | airports |
select | select * from airports limit 3 | airports.take(3) (or airports.takeSample(false, 3) for a random sample) |
select | select id from airports where ident = 'KLAX' | airports.filter(node -> "KLAX".equals(node.get("ident").asText())).map(node -> node.get("id").asText()) |
select | select distinct type from airports | airports.map(node -> node.get("type").asText()).distinct() |
where | select * from airports where iso_region = 'US-CA' and type = 'seaplane_base' | airports.filter(node -> "US-CA".equals(node.get("iso_region").asText()) && "seaplane_base".equals(node.get("type").asText())) |
where | select ident, name, municipality from airports where iso_region = 'US-CA' and type = 'seaplane_base' | airports.filter(node -> "US-CA".equals(node.get("iso_region").asText()) && "seaplane_base".equals(node.get("type").asText())).map(node -> Json.copyFields(node, "ident", "name", "municipality")) |
order by | select * from airport_freq where airport_ident = 'KLAX' order by type | airport_freq.filter(node -> "KLAX".equals(node.get("airport_ident").asText())).sortBy(node -> node.get("type").asText(), true, 1) |
order by | select * from airport_freq where airport_ident = 'KLAX' order by type desc | airport_freq.filter(node -> "KLAX".equals(node.get("airport_ident").asText())).sortBy(node -> node.get("type").asText(), false, 1) |
in | select * from airports where type in ('heliport', 'balloonport') | airports.filter(node -> ImmutableSet.of("heliport", "balloonport").contains(node.get("type").asText())) |
not in | select * from airports where type not in ('heliport', 'balloonport') | airports.filter(node -> !ImmutableSet.of("heliport", "balloonport").contains(node.get("type").asText())) |
group+count | select iso_country, type, count(*) from airports group by iso_country, type order by iso_country, type | airports.mapToPair(node -> new Tuple2<>(new Tuple2<>(node.get("iso_country").asText(), node.get("type").asText()), 1)).reduceByKey(Integer::sum).sortByKey(byCountryThenType) // comparator defined in the sketch below the table |
group+count+order | select iso_country, type, count(*) from airports group by iso_country, type order by iso_country, count(*) desc | airports.mapToPair(node -> new Tuple2<>(new Tuple2<>(node.get("iso_country").asText(), node.get("type").asText()), 1)).reduceByKey(Integer::sum).mapToPair(p -> new Tuple2<>(new Tuple2<>(p._1._1, p._2), p._1._2)).sortByKey(byCountryThenCountDesc) // re-key as ((iso_country, count), type); comparator defined in the sketch below the table |
having | select type, count(*) from airports where iso_country = 'US' group by type having count(*) > 1000 order by count(*) desc | airports.filter(node -> "US".equals(node.get("iso_country").asText())).mapToPair(node -> new Tuple2<>(node.get("type").asText(), 1)).reduceByKey(Integer::sum).filter(pair -> pair._2 > 1000).mapToPair(Tuple2::swap).sortByKey(false) // (count, type) pairs, count desc |
topN | select iso_country from by_country order by size desc limit 10 | by_country.sortBy(node -> node.get("size").asInt(), false, 1).map(node -> node.get("iso_country").asText()).take(10) |
topN+offset | select iso_country from by_country order by size desc limit 10 offset 10 | by_country.sortBy(node -> node.get("size").asInt(), false, 1).zipWithIndex().filter(pair -> pair._2 >= 10 && pair._2 < 20).map(pair -> pair._1.get("iso_country").asText()) |
aggregate | select max(length_ft), min(length_ft), avg(length_ft), median(length_ft) from runways | JavaRDD<Double> lengths = runways.map(node -> node.get("length_ft").asDouble()); double max = lengths.max(Comparator.naturalOrder()); double min = lengths.min(Comparator.naturalOrder()); double avg = lengths.mapToDouble(x -> x).mean(); JavaPairRDD<Long, Double> byIndex = lengths.sortBy(x -> x, true, 1).zipWithIndex().mapToPair(Tuple2::swap); long n = byIndex.count(); double median = (n % 2 == 1) ? byIndex.lookup(n / 2).get(0) : (byIndex.lookup(n / 2 - 1).get(0) + byIndex.lookup(n / 2).get(0)) / 2; |
join | select airport_ident, type, description, frequency_mhz from airport_freq join airports on airport_freq.airport_ref = airports.id where airports.ident = 'KLAX' | JavaPairRDD<String, JsonNode> freqByRef = airport_freq.keyBy(node -> node.get("airport_ref").asText()); JavaPairRDD<String, JsonNode> airportsById = airports.keyBy(node -> node.get("id").asText()); freqByRef.join(airportsById).filter(t -> "KLAX".equals(t._2._2.get("ident").asText())).map(t -> Json.copyFields(t._2._1, "airport_ident", "type", "description", "frequency_mhz")) // join() is an inner join |
union | select name, municipality from airports where ident = 'KLAX' union all select name, municipality from airports where ident = 'KLGB' | JavaRDD<JsonNode> klax = airports.filter(node -> "KLAX".equals(node.get("ident").asText())).map(node -> Json.copyFields(node, "name", "municipality")); JavaRDD<JsonNode> klgb = airports.filter(node -> "KLGB".equals(node.get("ident").asText())).map(node -> Json.copyFields(node, "name", "municipality")); klax.union(klgb) |
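
The two group+count rows sort on a scala.Tuple2 key, which has no natural ordering from Java, so they pass an explicit comparator to sortByKey. A sketch of those comparators and the full group+count+order pipeline, assuming `airports` from the setup above (the names byCountryThenType and byCountryThenCountDesc are mine; the Serializable intersection cast lets Spark ship the lambdas to executors):

```java
import java.io.Serializable;
import java.util.Comparator;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// order by iso_country, type
Comparator<Tuple2<String, String>> byCountryThenType =
        (Comparator<Tuple2<String, String>> & Serializable) (a, b) ->
                a._1.equals(b._1) ? a._2.compareTo(b._2) : a._1.compareTo(b._1);

// order by iso_country asc, count desc (key re-shaped to (iso_country, count))
Comparator<Tuple2<String, Integer>> byCountryThenCountDesc =
        (Comparator<Tuple2<String, Integer>> & Serializable) (a, b) ->
                a._1.equals(b._1) ? Integer.compare(b._2, a._2) : a._1.compareTo(b._1);

// select iso_country, type, count(*) from airports
// group by iso_country, type order by iso_country, count(*) desc
JavaPairRDD<Tuple2<String, Integer>, String> ordered = airports
        .mapToPair(node -> new Tuple2<>(
                new Tuple2<>(node.get("iso_country").asText(), node.get("type").asText()), 1))
        .reduceByKey(Integer::sum)
        // re-key as ((iso_country, count), type) so both sort fields live in the key
        .mapToPair(p -> new Tuple2<>(new Tuple2<>(p._1._1, p._2), p._1._2))
        .sortByKey(byCountryThenCountDesc);
```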
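The join row, spelled out end to end under the same assumptions (airports and airport_freq loaded as in the setup sketch):

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Key each side by its join column, then inner-join on the key.
JavaPairRDD<String, JsonNode> freqByRef =
        airport_freq.keyBy(node -> node.get("airport_ref").asText());
JavaPairRDD<String, JsonNode> airportsById =
        airports.keyBy(node -> node.get("id").asText());

JavaRDD<JsonNode> result = freqByRef
        .join(airportsById)                       // (id, (freqRow, airportRow))
        .filter(t -> "KLAX".equals(t._2._2.get("ident").asText()))
        .map(t -> Json.copyFields(t._2._1,
                "airport_ident", "type", "description", "frequency_mhz"));

result.collect().forEach(System.out::println);
```

join() is an inner join; leftOuterJoin and rightOuterJoin return an org.apache.spark.api.java.Optional on the side that may be missing.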
### Dataset
TBC
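
Until this section is written up, a minimal sketch of the Dataset route for one row from the table above, assuming the same airports.json JSON Lines input:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("sql-to-spark")
        .master("local[*]")
        .getOrCreate();

// read().json() infers a schema from JSON Lines input
Dataset<Row> airports = spark.read().json("airports.json");

// select id from airports where ident = 'KLAX'
airports.filter(airports.col("ident").equalTo("KLAX")).select("id").show();

// ...or register a temp view and run the SQL unchanged
airports.createOrReplaceTempView("airports");
spark.sql("select id from airports where ident = 'KLAX'").show();
```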
## MLlib
TBC
## Streaming
TBC
## GraphX
TBC