Spark

Spark

  • JavaRDD
  • JavaPairRDD
  • Dataset
  • BloomFilter

Simple operation

RDD

Operation | SQL example | Formatted
selectselect * from airportsairports
selectselect * from airports limit 3airports.take(3) / airports.takeSample(false, 3)
selectselect id from airports where ident=‘KLAX’airports
    .filter(node -> “KLAX”.equals(node.get(“ident”).asText()))
    .map(node -> node.get(“id”).asText())
selectselect distinct type from airportsairports
    .map(node -> node.get(“type”).asText())
    .distinct()
whereselect * from airports where iso_region=‘US-CA’ and type=‘seaplane_base’airports
    .filter(node -> “US-CA”.equals(node.get(“iso_region”).asText())
        && “seaplane_base”.equals(node.get(“type”).asText()))
whereselect ident, name, municipality from airports where iso_region=‘US-CA’ and type=‘seaplane_base’airports
    .filter(node -> “US-CA”.equals(node.get(“iso_region”).asText())
        && “seaplane_base”.equals(node.get(“type”).asText()))
    .map(node -> Json.copyFields(node, “ident”, “name”, “municipality”))
order byselect * from airport_freq where airport_ident=‘KLAX’ order by typeairport_freq
    .filter(node -> “KLAX”.equals(node.get(“airport_ident”).asText()))
    .sortBy(node -> node.get(“type”).asText(), true)
order byselect * from airport_freq where airport_ident = ‘KLAX’ order by type descairport_freq
    .filter(node -> “KLAX”.equals(node.get(“airport_ident”).asText()))
    .sortBy(node -> node.get(“type”).asText(), false)
inselect * from airports where type in (‘heliport’, ‘balloonport’)airports
    .filter(node -> 
        ImmutableSet.of(“heliport”, “balloonport”).contains(node.get(“type”).asText()))
not inselect * from airports where type not in (‘heliport’, ‘balloonport’)airports
    .filter(node -> 
        !ImmutableSet.of(“heliport”, “balloonport”).contains(node.get(“type”).asText()))
group+countselect iso_country, type, count(*) from airports group by iso_country, type order by iso_country, typeairports
    .mapToPair(node -> new Tuple2<>(Json.copyFields(node, “iso_country”, “type”), 1))
    .reduceByKey((x, y) -> x + y);
group+count+orderselect iso_country, type, count(*) from airports group by iso_country, type order by iso_country, count(*) descairports
    .mapToPair(node -> new Tuple2<>(Json.copyFields(node, “iso_country”, “type”), 1))
    .reduceByKey((x, y) -> x + y)
    .map(pair -> Json.objectMapper().put(“iso_country”, pair._1.get(“iso_country”).asText())
        .put(“type”, pair._1.get(“type”).asText())
        .put(“count”, pair._2))
    .sortBy(node -> new Tuple2<>(node.get(“iso_country”).asText(), -node.get(“count”).asInt()), true)
havingselect type, count(*) from airports where iso_country = ‘US’ group by type having count(*) > 1000 order by count(*) descairports
    .filter(node -> “US”.equals(node.get(“iso_country”).asText()))
    .mapToPair(node -> new Tuple2<>(Json.copyFields(node, “iso_country”, “type”), 1))
    .reduceByKey((x, y) -> x + y)
    .map(pair -> Json.objectMapper().put(“iso_country”, pair._1.get(“iso_country”).asText())
        .put(“type”, pair._1.get(“type”).asText())
        .put(“count”, pair._2))
    .filter(node -> node.get(“count”).asInt() > 1000)
    .sortBy(node -> node.get(“count”).asInt(), false)
topNselect iso_country from by_country order by size desc limit 10by_country
    .sortBy(node -> node.get(“size”).asInt(), false)
    .map(node -> node.get(“iso_country”).asText())
    .take(10)
topN+offsetselect iso_country from by_country order by size desc limit 10 offset 10by_country
    .sortBy(node -> node.get(“size”).asInt(), false)
    .zipWithIndex()
    .filter(pair -> pair._2 >= 10 && pair._2 < 10 + 10)
    .map(pair -> pair._1.get(“iso_country”).asText())
aggregateselect max(length_ft), min(length_ft), avg(length_ft), median(length_ft) from runwaysval columnRDD = runways.map(node -> node.get(“length_ft”).asDouble());

double max = columnRDD.max();
double min = columnRDD.min();
double avg = columnRDD.mean();
long count = columnRDD.count();
JavaPairRDD<Long, Double> indexed = columnRDD
    .sortBy(x -> x, true, 1)
    .zipWithIndex()
    .mapToPair(pair -> new Tuple2<>(pair._2, pair._1));
double median;
if (count % 2 == 0) {
    median = (indexed.lookup(count / 2 - 1).get(0)
            + indexed.lookup(count / 2).get(0)) / 2;
} else {
    median = indexed.lookup(count / 2).get(0);
}
joinselect airport_ident, type, description, frequency_mhz from airport_freq join airports on airport_freq.airport_ref = airports.id where airports.ident = ‘KLAX’val airport_freq_key = airport_freq.keyBy(node -> node.get(“airport_ref”).asText());
JavaPairRDD<String, JsonNode> airports_key = airports.keyBy(node -> node.get(“id”).asText());
airport_freq_key.join(airports_key)
    .filter(tuple -> “KLAX”.equals(tuple._2._2.get(“ident”).asText()))
    .map(tuple -> Json.objectMapper()
        .put(“airport_ident”, tuple._2._2.get(“ident”).asText())
        .put(“type”, tuple._2._1.get(“type”).asText())
        .put(“description”, tuple._2._1.get(“description”).asText())
        .put(“frequency_mhz”, tuple._2._1.get(“frequency_mhz”).asText()))
unionselect name, municipality from airports where ident = ‘KLAX’ union all select name, municipality from airports where ident = ‘KLGB’val rdd1 = airports.filter(node -> “KLAX”.equals(node.get(“ident”).asText()))
    .map(node -> Json.copyFields(node, “name”, “municipality”))
val rdd2 = airports.filter(node -> “KLGB”.equals(node.get(“ident”).asText()))
    .map(node -> Json.copyFields(node, “name”, “municipality”))
rdd1.union(rdd2)

Dataset

TBC

MLlib

TBC

Streaming

TBC

GraphX

TBC