(на примере Kotlin)
Список точек в фазовом пространстве $(x, y, z, v_x, v_y, v_z)$.
com.twosigma.beakerx.kernel.Kernel.showNullExecutionResult = false
data class Point(val x: Double, val y: Double, val z: Double, val vx: Double, val vy: Double, val vz: Double)
val points = ArrayList<Point>()
Пусть есть задача отфильтровать набор точек по какому-то критерию и сгруппировать в гистограмму.
Фильтрация: создаем промежуточный массив
import kotlin.math.pow
val filteredPoints = ArrayList<Point>()
for(point in points){
val absoluteVelocity = (point.vx.pow(2.0) + point.vy.pow(2.0) + point.vz.pow(2.0)).pow(0.5)
if(absoluteVelocity < 0.5){
filteredPoints.add(point)
}
}
Группировка
val groups = HashMap<Double, MutableList<Point>>()
val step = 0.05
for(point in filteredPoints){
val key = floor((point.x.pow(2.0) + point.y.pow(2.0) + point.z.pow(2.0)).pow(0.5) / step )
if(!groups.containsKey(key)){
groups.put(key, ArrayList<Point>())
}
groups[key]!!.add(point)
}
Ну и наконец подсчет
val counts = HashMap<Double, Int>()
for((key,value) in groups){
counts[key] = value.size
}
Создаются две промежуточных коллекции, по размерам больше, чем исходная.
Любое изменение критериев фильтрации или группировки требует переписывания кода.
Много некрасивого кода.
Фильтрация
interface Filter{
fun accept(point: Point): Boolean
}
fun filter(points: List<Point>, filter: Filter): List<Point>{
val filteredPoints = ArrayList<Point>()
for(point in points){
if(filter.accept(point)){
filteredPoints.add(point)
}
}
return filteredPoints
}
val filter = object: Filter{
override fun accept(point: Point): Boolean{
val absoluteVelocity = (point.vx.pow(2.0) + point.vy.pow(2.0) + point.vz.pow(2.0)).pow(0.5)
return absoluteVelocity < 0.5
}
}
val filtered = filter(points, filter)
Теперь мы можем подставлять разные фильтры в зависимости от задачи и даже держать готовый набор фильтров.
Кода стало еще больше
val result = points
.filter{(it.vx.pow(2.0) + it.vy.pow(2.0) + it.vz.pow(2.0)).pow(0.5) < 0.5}
.groupBy{floor((it.x.pow(2.0) + it.y.pow(2.0) + it.z.pow(2.0)).pow(0.5) / step )}
.mapValues{ (_,value) -> value.size}
Можно передать функции как параметры:
fun histogram(points: List<Point>, filter: (Point) -> Boolean, groupBuilder: (Point) -> Double){
return points
.filter(filter)
.groupBy(groupBuilder)
.mapValues{ (_,value) -> value.size}
}
val result = histogram(
points,
filter = {(it.vx.pow(2.0) + it.vy.pow(2.0) + it.vz.pow(2.0)).pow(0.5) < 0.5},
groupBuilder = {floor((it.x.pow(2.0) + it.y.pow(2.0) + it.z.pow(2.0)).pow(0.5) / step )}
)
val Point.r: Double
get = it.vx.pow(2.0) + it.vy.pow(2.0) + it.vz.pow(2.0)).pow(0.5)
val Point.v: Double
get = (it.x.pow(2.0) + it.y.pow(2.0) + it.z.pow(2.0)).pow(0.5)
val result = points
.filter{it.r < 0.5}
.groupBy{floor(it.v / step )}
.mapValues{ (_,value) -> value.size}
Хочу быть владычецей морскою экономить память и не загружать туда все данные сразу, а подгружать их из файла по мере того, как идет обработка.
Q: Что такое большие данные?
A: Никто не знает, но самое простое определение сводится к тому, что это данные, которые не влезают в память.
Q: В чем специфика работы с большими данными
А: Вариантов много, но один из них - это параллельная обработка больших объемов данных из одного источника на разных вычислительных узлах.
Q: Что для этого нужно
A: Ленивая загрузка, обработка и складирование результатов.
Операции выполняются параллельно и не известно, когда какая из них завершится.
// Загрузка одной точки из файла
suspend fun read(): String
suspend fun parse(str: String): Point
// Проверяет остались ли точки в файле
fun hasNextPoint(): Boolean
// Создаем канал, который будет считывать данные
val dataChannel = produce(capacity = 10){
while(hasNextPoint()){
send(read())
}
}
val pointChannel = dataChannel.map{parse(it)}
val filteredChannel = pointChannel.filter{...}
Простой вариант:
val grouping: (Point) -> Int
val result: Map<Int,Int> = filteredChannel
.groupBy(grouping)
.mapValues{ (_,value) -> value.size}
А теперь с визуализацией прогресса:
fun notifyResult(histogram: Map<Int,AtomicLong>)
val job = launch{
val collector = HashMap<Int,AtomicLong>()
while(filteredChannel.isOpen){
val point = filteredChannel.recieve()
val bin = floor(point.v / step )
collector.getOrPut(bin){AtomicLong(0)}
.incrementAndGet()
launch{
notifyResult(collector)
}
}
}