
R One Billion Row Challenge: Is R a Viable Option for Analyzing Huge Datasets?

This article was first published on Appsilon | Enterprise R Shiny Dashboards, and kindly contributed to python-bloggers.

Note: Thank you Kamil Żyła for providing guidance and expertise in writing R code for this article.

R, being single-threaded in nature, isn’t the fastest programming language out there. You have options when it comes to parallelism, but these often don’t reduce the runtime as much as you’d want.

So when the official 1 Billion Row Challenge popped up, our interest went through the roof! The challenge doesn't allow third-party packages. We'll start the article by obeying the rules, but later we'll see what sort of performance benefits data.table and DuckDB offer. For reference, the fastest Java implementation takes 1.535 seconds, and pure Python is around 10-15 times slower than that.

Without further ado, let's see how R stacks up!

Table of contents:

- 1 Billion Row Challenge Introduction
- Base R Implementation – No Third-Party Libraries
- R 1 Billion Row Challenge – Using Third-Party Packages
- Benchmark Results – One Billion Row Challenge in R
- Summing up R One Billion Row Challenge

1 Billion Row Challenge Introduction

The idea of the challenge is simple: go through a 1-billion-row (13.8 GB) text file that contains arbitrary temperature measurements and calculate summary statistics for each station (min, mean, and max). The data is generated by a script and is not downloadable from the web.
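
Each line of the generated file holds a station name and a temperature reading, separated by a semicolon. A few illustrative lines (the station names and values here are made up):

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
Hamburg;34.2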

Once processed, you should end up with a text output similar to the following:

Image 1 – Sample 1BRC output

The official challenge author runs the submissions on a Hetzner Cloud CCX33 instance (8 dedicated vCPUs, 32 GB RAM). We don't have access to one, so we'll use a powerful 16" M3 Pro MacBook Pro with 12 CPU cores and 36 GB of RAM instead. Your results will differ if you run the code on different hardware.

You now know what the challenge is. Up next, we’ll continue with the base R solution that uses no external packages.

Base R Implementation – No Third-Party Libraries

As you know by now, 1BRC doesn’t allow third-party packages. That’s a problem because you’ll have to reinvent the wheel. It’s not a realistic scenario by any means, since the whole industry relies on these packages when writing data pipelines.

Still, let’s see what R has to offer. We’ll write a couple of completely manual solutions first, and then opt for a couple of convenient built-in R functions.

Single-Threaded Implementation

This one takes some time to write, but it's simple to read and understand. There are no fancy R-specific functions in it, and if you're familiar with any programming language, you should feel right at home.

We start by creating a new environment and then iterating over the text file one line at a time. Summary statistics are computed and updated on each iteration. For the mean, we’re keeping track of the sum and count, which are then divided to get the average value:

# Starting values per station: min candidate, max candidate, running sum, count
# (±100 safely bounds the temperature readings)
neutral_values <- c(100, -100, 0, 0)

process_file <- function(file_name) {
  result <- new.env() # environment used as a hash map keyed by station name
  con <- file(file_name, "r")

  while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    parts <- strsplit(line, split = ";", fixed = TRUE)[[1]]
    station_name <- parts[1]
    measurement <- as.numeric(parts[2])

    values <- get0(station_name, result, ifnotfound = neutral_values)
    result[[station_name]] <- c(
      min(values[[1]], measurement),
      max(values[[2]], measurement),
      values[[3]] + measurement,
      values[[4]] + 1
    )
  }
  close(con)

  lines <- lapply(sort(names(result)), function(station_name) {
    values <- result[[station_name]]
    min_value <- values[[1]]
    max_value <- values[[2]]
    mean_value <- values[[3]] / values[[4]]
    sprintf("%s=%.1f/%.1f/%.1f", station_name, min_value, mean_value, max_value)
  })
  output <- paste0("{", paste(lines, collapse = ", "), "}")
  print(output)
}


process_file("measurements.txt")

Multi-Threaded Implementation

The doParallel package in R allows you to parallelize an R function across multiple CPU cores. The challenge states no third-party packages are allowed, but this one has nothing to do with data processing, so we’ll consider it fair.

The core idea is the same as in the single-threaded implementation, but you now need to split the text file into equally sized chunks and process the chunks in parallel. You also need to calculate summary statistics for each chunk, and then combine the statistics for the final output:

library(foreach)
library(doParallel)

# Leave one core free for the main R session
n_workers <- detectCores() - 1
cluster <- makeCluster(n_workers)
registerDoParallel(cluster)


get_file_chunks <- function(file_name) {
  file_size <- file.info(file_name)$size
  chunk_size <- ceiling(file_size / n_workers)

  con <- file(file_name, "r")
  boundaries <- lapply(seq_len(n_workers - 1), function(i) {
    # Jump to the approximate boundary, then read to the end of the current
    # line so that every chunk starts at the beginning of a line
    boundary <- i * chunk_size
    seek(con, boundary)
    line <- readLines(con, n = 1)
    boundary + nchar(line) + 1
  })
  close(con)

  boundaries <- c(0, boundaries, file_size)
  lapply(seq_len(n_workers), function(i) {
    list(
      start_pos = boundaries[[i]],
      end_pos = boundaries[[i + 1]]
    )
  })
}

process_file <- function(file_name) {
  process_chunk <- function(file_name, start_pos, end_pos) {
    neutral_values <- c(100, -100, 0, 0)
    result <- new.env()
    con <- file(file_name, "r")
    seek(con, start_pos)

    # seek() without a position argument returns the current offset in the file
    while (seek(con) < end_pos) {
      line <- readLines(con, n = 1, warn = FALSE)
      parts <- strsplit(line, split = ";", fixed = TRUE)[[1]]
      station_name <- parts[1]
      measurement <- as.numeric(parts[2])

      values <- get0(station_name, result, ifnotfound = neutral_values)
      result[[station_name]] <- c(
        min(values[[1]], measurement),
        max(values[[2]], measurement),
        values[[3]] + measurement,
        values[[4]] + 1
      )
    }
    close(con)
    return(result)
  }

  chunks <- get_file_chunks(file_name)
  result <- foreach(i = chunks) %dopar% {
    process_chunk(
      file_name = file_name,
      start_pos = i$start_pos,
      end_pos = i$end_pos
    )
  }

  # Merge per-chunk statistics: min of mins, max of maxes, sums of sums and counts
  result_fmt <- list()
  for (chunk_res in result) {
    for (station_name in names(chunk_res)) {
      if (!(station_name %in% names(result_fmt))) {
        result_fmt[[station_name]] <- chunk_res[[station_name]]
      } else {
        result_fmt[[station_name]][1] <- min(result_fmt[[station_name]][1], chunk_res[[station_name]][1])
        result_fmt[[station_name]][3] <- result_fmt[[station_name]][3] + chunk_res[[station_name]][3]
        result_fmt[[station_name]][2] <- max(result_fmt[[station_name]][2], chunk_res[[station_name]][2])
        result_fmt[[station_name]][4] <- result_fmt[[station_name]][4] + chunk_res[[station_name]][4]
      }
    }
  }

  for (station_name in names(result_fmt)) {
    result_fmt[[station_name]][5] <- result_fmt[[station_name]][3] / result_fmt[[station_name]][4]
  }

  result_fmt <- result_fmt[order(names(result_fmt))]

  output <- "{"
  for (station_name in names(result_fmt)) {
    min_value <- result_fmt[[station_name]][1]
    mean_value <- result_fmt[[station_name]][5]
    max_value <- result_fmt[[station_name]][2]
    output <- paste0(output, station_name, "=", sprintf("%.1f/%.1f/%.1f, ", min_value, mean_value, max_value))
  }

  output <- substr(output, 1, nchar(output) - 2)
  output <- paste0(output, "}")
  print(output)

  stopCluster(cluster)
}


process_file("measurements.txt")

More Elegant Base R Implementations

Both of the previous implementations are long and don't leverage any of the built-in functions R is famous for. In this section, we'll explore two R-specific implementations: one using `aggregate()` and the other using `tapply()`. Both will be called on a data frame that has been read into memory with `read.delim()`.

Let's take a look at `aggregate()` first:

process_file <- function(file_name) {
  df <- read.delim(
    file = file_name,
    header = FALSE,
    sep = ";",
    col.names = c("station_name", "measurement")
  )

  summarize <- function(x) {
    c(
      min = min(x),
      mean = mean(x),
      max = max(x)
    )
  }

  res <- aggregate(measurement ~ station_name, data = df, FUN = summarize)
  res <- do.call(data.frame, res)
  
  output <- "{"
  for (i in 1:nrow(res)) {
    row <- res[i, ]
    output <- paste0(output, row$station_name, "=", sprintf("%.1f/%.1f/%.1f, ", row$measurement.min, row$measurement.mean, row$measurement.max))
  }
  output <- substr(output, 1, nchar(output) - 2)
  output <- paste0(output, "}")
  print(output)
}


process_file("measurements.txt")

The `aggregate()` function returns a data frame with two columns: the station name and a matrix of measurements. The matrix column needs to be expanded into separate columns before printing, which is what the `do.call(data.frame, res)` call takes care of.
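
To make that concrete, here's a minimal sketch on a tiny made-up data frame showing why the `do.call()` step is needed (station names and values are arbitrary):

toy <- data.frame(
  station_name = c("Oslo", "Oslo", "Lima"),
  measurement = c(-3.2, 1.4, 18.9)
)

res <- aggregate(
  measurement ~ station_name,
  data = toy,
  FUN = function(x) c(min = min(x), mean = mean(x), max = max(x))
)
# res has two columns: station_name and a matrix column named "measurement"
# with one sub-column per statistic (min, mean, max)

res <- do.call(data.frame, res)
# now res has flat columns: station_name, measurement.min,
# measurement.mean, measurement.max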

Up next, let’s take a look at the `tapply()` example:

process_file <- function(file_name) {
  df <- read.delim(
    file = file_name,
    header = FALSE,
    sep = ";",
    col.names = c("station_name", "measurement")
  )
  
  summarize <- function(x) {
    c(
      min = min(x),
      mean = mean(x),
      max = max(x)
    )
  }
  
  res <- do.call("rbind", tapply(df$measurement, df$station_name, summarize))
  res <- data.frame(res)
  res$station_name <- row.names(res)
  row.names(res) <- NULL
  
  output <- "{"
  for (i in 1:nrow(res)) {
    row <- res[i, ]
    output <- paste0(output, row$station_name, "=", sprintf("%.1f/%.1f/%.1f, ", row$min, row$mean, row$max))
  }
  output <- substr(output, 1, nchar(output) - 2)
  output <- paste0(output, "}")
  print(output)
}


process_file("measurements.txt")

The `tapply()` function calculates summary statistics by group, based on the levels of one or more factors, making it an ideal candidate here.

R 1 Billion Row Challenge – Using Third-Party Packages

On to the third-party R packages now! These are typically used in R pipelines to increase code readability, reduce runtime, and minimize the amount of code you need to write.

dplyr

The dplyr package is used and loved by many in the R community. It has an intuitive and beginner-friendly syntax, which means any newcomer will feel comfortable with it in a matter of days. Still, it's a tool built for convenience, not for performance.

That being said, it does allow you to read and aggregate the data in a single pipeline, thanks to the pipe operator:

library(dplyr)

process_file <- function(file_name) {
  df <- read.csv(
    file = file_name,
    header = FALSE,
    sep = ";",
    col.names = c("station_name", "measurement")
  ) %>%
    group_by(station_name) %>%
    summarise(
      min_measurement = min(measurement),
      mean_measurement = mean(measurement),
      max_measurement = max(measurement)
    ) %>%
    arrange(station_name)


  output <- "{"
  for (i in 1:nrow(df)) {
    row <- df[i, ]
    output <- paste0(output, row$station_name, "=", sprintf("%.1f/%.1f/%.1f, ", row$min_measurement, row$mean_measurement, row$max_measurement))
  }
  output <- substr(output, 1, nchar(output) - 2)
  output <- paste0(output, "}")
  print(output)
}


process_file("measurements.txt")

data.table

In R, most developers turn to the data.table package when dplyr becomes too slow. There's even a dtplyr package that combines the best of both worlds, but it's out of scope for today's benchmark.

In plain English, `data.table` is an enhanced version of `data.frame` that allows for faster data lookups, analysis, and aggregation. It does have a steeper learning curve than dplyr, which is why some developers opt for dtplyr instead.
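
For the curious, here's a minimal sketch of what that dtplyr route could look like. It isn't benchmarked here, and the column names simply mirror the ones used below:

library(data.table)
library(dtplyr)
library(dplyr)

dt <- fread("measurements.txt", sep = ";", col.names = c("station_name", "measurement"))

res <- lazy_dt(dt) %>% # wrap the data.table for lazy evaluation
  group_by(station_name) %>%
  summarise(
    min_measurement = min(measurement),
    mean_measurement = mean(measurement),
    max_measurement = max(measurement)
  ) %>%
  arrange(station_name) %>%
  as_tibble() # forces the translated data.table call to run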

Back to plain data.table: for a simple aggregation like this one, the syntax isn't all that different from dplyr:

library(data.table)

process_file <- function(file_name) {
  dt <- fread(file_name, sep = ";", col.names = c("station_name", "measurement"))
  dt_summary <- dt[, .(
    min_measurement = min(measurement),
    mean_measurement = mean(measurement),
    max_measurement = max(measurement)
  ),
  by = station_name
  ]

  setkey(dt_summary, station_name) # also sorts the table by station_name

  output <- "{"
  for (i in 1:nrow(dt_summary)) {
    row <- dt_summary[i, ]
    output <- paste0(
      output, row$station_name, "=",
      sprintf("%.1f/%.1f/%.1f, ", row$min_measurement, row$mean_measurement, row$max_measurement)
    )
  }
  output <- substr(output, 1, nchar(output) - 2)
  output <- paste0(output, "}")
  print(output)
}


process_file("measurements.txt")

DuckDB

And finally, let's discuss DuckDB. It's an in-process analytical (OLAP) database management system designed to go through huge amounts of data blazingly fast. Our recent comparison of DuckDB and dplyr concluded that the former is almost 20 times faster while requiring minimal code changes.

DuckDB allows you to use dplyr-like syntax or SQL. We’ll opt for the latter today, just to mix things up:

library(duckdb)
library(stringr)

process_file <- function(file_name) {
  df <- dbGetQuery(
    conn = dbConnect(duckdb()),
    statement = str_interp("
        select
            station_name,
            min(measurement) as min_measurement,
            cast(avg(measurement) as decimal(8, 1)) as mean_measurement,
            max(measurement) as max_measurement
        from read_csv(
            '${file_name}',
            header=false,
            columns={'station_name': 'varchar', 'measurement': 'decimal(8, 1)'},
            delim=';',
            parallel=true
        )
        group by station_name
        order by station_name
    ")
  )

  output <- "{"
  for (i in 1:nrow(df)) {
    row <- df[i, ]
    output <- paste0(output, row$station_name, "=", sprintf("%.1f/%.1f/%.1f, ", row$min_measurement, row$mean_measurement, row$max_measurement))
  }
  output <- substr(output, 1, nchar(output) - 2)
  output <- paste0(output, "}")
  print(output)
}

process_file("measurements.txt")
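
For reference, here's a rough sketch of the dplyr-flavoured route mentioned above, going through DBI and dbplyr against a DuckDB view. Treat it as an untested illustration; the view name and column types are our assumptions:

library(duckdb)
library(dplyr)
# dbplyr (pulled in by dplyr's tbl() on a DBI connection) translates the verbs to SQL

con <- dbConnect(duckdb())
dbExecute(con, "
  CREATE VIEW measurements AS
  SELECT * FROM read_csv(
    'measurements.txt',
    header=false,
    columns={'station_name': 'varchar', 'measurement': 'double'},
    delim=';'
  )
")

res <- tbl(con, "measurements") %>%
  group_by(station_name) %>%
  summarise(
    min_measurement = min(measurement, na.rm = TRUE),
    mean_measurement = mean(measurement, na.rm = TRUE),
    max_measurement = max(measurement, na.rm = TRUE)
  ) %>%
  arrange(station_name) %>%
  collect()

dbDisconnect(con, shutdown = TRUE)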

Congratulations – you’ve made it to the end of the package comparison and coding part of the article! Up next, we’ll compare the results.

Benchmark Results – One Billion Row Challenge in R

After navigating through countless lines of code, it’s time to take a look at the average runtime results. For reference, the results you’re about to see are an average of 3 runs for each data processing method. The hardware of choice was a 16” M3 Pro MacBook Pro with 12 CPU cores and 36 GB of RAM:

Image 2 – Average runtime results for R 1BRC

There's a clear reason why everyone and their mother uses third-party packages: they require less code and reduce runtime by orders of magnitude.

The vanilla (single-threaded) R implementation was by far the slowest, averaging around 50 minutes. Distributing the workload over 11 CPU cores reduced the average runtime to about 17 minutes.

No matter what we tried, the `aggregate()` approach always ran out of memory.

The real winner here is DuckDB, providing on average a ~335x faster runtime than the single-threaded R implementation, and a ~115x faster runtime than the multi-threaded one.
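
As a quick sanity check on those numbers: ~50 minutes is roughly 3,000 seconds, so a ~335x speedup puts DuckDB at around 9 seconds; dividing the ~17-minute multi-threaded time (~1,000 seconds) by ~115 lands in the same ballpark.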

Summing up R One Billion Row Challenge

To conclude, R isn't the fastest programming language out there, nor was it ever designed to be one. However, the sheer number of built-in functions (the `apply` family) and third-party packages makes it a viable option for processing huge amounts of data. Today you've seen this claim proven, as we managed to process 1 billion rows of data in under 10 seconds!

The biggest bottleneck here is the data itself. No one would ever (we hope) store such a huge number of records in an uncompressed plain text file. Alternative file formats, such as Apache Parquet, would reduce the file size approximately 5 times, which would further reduce the processing time.
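
As an illustration (not part of the benchmark), DuckDB can handle both the conversion and the subsequent aggregation straight from the Parquet file; the file names below simply follow the ones used earlier:

library(duckdb)

con <- dbConnect(duckdb())

# One-off conversion: semicolon-delimited text -> compressed columnar Parquet
dbExecute(con, "
  COPY (
    SELECT * FROM read_csv(
      'measurements.txt',
      header=false,
      columns={'station_name': 'varchar', 'measurement': 'double'},
      delim=';'
    )
  ) TO 'measurements.parquet' (FORMAT parquet)
")

# The same aggregation, now scanning the much smaller Parquet file
df <- dbGetQuery(con, "
  SELECT
    station_name,
    min(measurement) AS min_measurement,
    avg(measurement) AS mean_measurement,
    max(measurement) AS max_measurement
  FROM read_parquet('measurements.parquet')
  GROUP BY station_name
  ORDER BY station_name
")

dbDisconnect(con, shutdown = TRUE)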

What are your thoughts on the 1 Billion Row Challenge in R? Have you managed to reduce the runtime even further? Make sure to let us know in our Shiny community on Slack.



