Функциональных подход к обработке коллекций

(на примере Kotlin)

Пример

Список точек в фазовом пространстве $(x, y, z, v_x, v_y, v_z)$.

In [ ]:
com.twosigma.beakerx.kernel.Kernel.showNullExecutionResult = false
In [ ]:
data class Point(val x: Double, val y: Double, val z: Double, val vx: Double, val vy: Double, val vz: Double)

val points = ArrayList<Point>()

Пусть есть задача отфильтровать набор точек по какому-то критерию и сгруппировать в гистограмму.

Процедурный подход (Fortran)

Фильтрация: создаем промежуточный массив

In [ ]:
import kotlin.math.pow

val filteredPoints = ArrayList<Point>()

for(point in points){
    val absoluteVelocity = (point.vx.pow(2.0) + point.vy.pow(2.0) + point.vz.pow(2.0)).pow(0.5)
    if(absoluteVelocity < 0.5){
        filteredPoints.add(point)
    }
}

Группировка

In [ ]:
val groups = HashMap<Double, MutableList<Point>>()

val step = 0.05

for(point in filteredPoints){
    val key = floor((point.x.pow(2.0) + point.y.pow(2.0) + point.z.pow(2.0)).pow(0.5) / step )
    if(!groups.containsKey(key)){
        groups.put(key, ArrayList<Point>())
    }
    groups[key]!!.add(point)
}

Ну и наконец подсчет

In [ ]:
val counts = HashMap<Double, Int>()

for((key,value) in groups){
    counts[key] = value.size
}

Что тут плохого?

  • Создаются две промежуточных коллекции, по размерам больше, чем исходная.

  • Любое изменение критериев фильтрации или группировки требует переписывания кода.

  • Много некрасивого кода.

Объектный подход (classic Java)

Фильтрация

In [ ]:
interface Filter{
    fun accept(point: Point): Boolean
}

fun filter(points: List<Point>, filter: Filter): List<Point>{
    val filteredPoints = ArrayList<Point>()

    for(point in points){
        if(filter.accept(point)){
            filteredPoints.add(point)
        }
    }
    return filteredPoints
}
In [ ]:
val filter = object: Filter{
    override fun accept(point: Point): Boolean{
        val absoluteVelocity = (point.vx.pow(2.0) + point.vy.pow(2.0) + point.vz.pow(2.0)).pow(0.5)
        return absoluteVelocity < 0.5
    }
}
val filtered = filter(points, filter)

Что тут хорошего?

Теперь мы можем подставлять разные фильтры в зависимости от задачи и даже держать готовый набор фильтров.

Что тут плохого?

Кода стало еще больше

Функциональный подход (Java 8, Scala)

In [ ]:
val result = points
    .filter{(it.vx.pow(2.0) + it.vy.pow(2.0) + it.vz.pow(2.0)).pow(0.5) < 0.5}
    .groupBy{floor((it.x.pow(2.0) + it.y.pow(2.0) + it.z.pow(2.0)).pow(0.5) / step )}
    .mapValues{ (_,value) -> value.size}

Можно передать функции как параметры:

In [ ]:
fun histogram(points: List<Point>, filter: (Point) -> Boolean, groupBuilder: (Point) -> Double){
    return points
        .filter(filter)
        .groupBy(groupBuilder)
        .mapValues{ (_,value) -> value.size}
}

val result = histogram(
    points, 
    filter = {(it.vx.pow(2.0) + it.vy.pow(2.0) + it.vz.pow(2.0)).pow(0.5) < 0.5},
    groupBuilder = {floor((it.x.pow(2.0) + it.y.pow(2.0) + it.z.pow(2.0)).pow(0.5) / step )}
)

Что тут хорошего?

  • Код сократился почти до нуля
  • Промежуточные коллекции не создаются
  • Гибкое конфигурирование всего процесса

Что плохого?

  • Отладка усложняется по сравнению с процедурным и объектным подходом. Для того, чтобы не делать ошибок, нужна строгая система типов.

Функциональный подход с печеньками (Kotlin)

In [ ]:
val Point.r: Double
    get = it.vx.pow(2.0) + it.vy.pow(2.0) + it.vz.pow(2.0)).pow(0.5)

val Point.v: Double
    get = (it.x.pow(2.0) + it.y.pow(2.0) + it.z.pow(2.0)).pow(0.5)

val result = points
    .filter{it.r < 0.5}
    .groupBy{floor(it.v / step )}
    .mapValues{ (_,value) -> value.size}

Чего еще не хватает для счастья?

Хочу быть владычецей морскою экономить память и не загружать туда все данные сразу, а подгружать их из файла по мере того, как идет обработка.

Подход больших данных

Q: Что такое большие данные?

A: Никто не знает, но самое простое определение сводится к тому, что это данные, которые не влезают в память.

Q: В чем специфика работы с большими данными

А: Вариантов много, но один из них - это параллельная обработка больших объемов данных из одного источника на разных вычислительных узлах.

Q: Что для этого нужно

A: Ленивая загрузка, обработка и складирование результатов.

Операции выполняются параллельно и не известно, когда какая из них завершится.

Map: Параллельные операции (отображения)

In [ ]:
// Загрузка одной точки из файла
suspend fun read(): String
suspend fun parse(str: String): Point

// Проверяет остались ли точки в файле
fun hasNextPoint(): Boolean

// Создаем канал, который будет считывать данные
val dataChannel = produce(capacity = 10){
    while(hasNextPoint()){
        send(read())
    }
}

val pointChannel = dataChannel.map{parse(it)}

val filteredChannel = pointChannel.filter{...}

Reduce: Терминальные операции (редукция)

Простой вариант:

In [ ]:
val grouping: (Point) -> Int

val result: Map<Int,Int> = filteredChannel
    .groupBy(grouping)
    .mapValues{ (_,value) -> value.size}

А теперь с визуализацией прогресса:

In [ ]:
fun notifyResult(histogram: Map<Int,AtomicLong>)

val job = launch{
    val collector = HashMap<Int,AtomicLong>()
    while(filteredChannel.isOpen){
        val point = filteredChannel.recieve()
        val bin = floor(point.v / step )
        collector.getOrPut(bin){AtomicLong(0)}
            .incrementAndGet()
        launch{
            notifyResult(collector)
        }
    }
}