
The One Billion Row Challenge in Julia

By Vikas Negi, June 2024


A recent release of Julia such as 1.10 is recommended. For those wanting to use a notebook, the repository shared above also contains a Pluto file, for which Pluto.jl needs to be installed. The input data file for the challenge is unique for everyone and needs to be generated using this Python script. Keep in mind that the file is about 15 GB in size.

python3 create_measurements.py 1000000000

Additionally, we will be running benchmarks using the BenchmarkTools.jl package. Note that this does not impact the challenge; it's only meant to collect proper statistics to measure and quantify the performance of the Julia code.

The structure of the input data file measurements.txt is as follows (only the first five lines are shown):

attipūdi;-49.2
Bas Limbé;-43.8
Oas;5.6
Nesebar;35.9
Saint George’s;-6.6

The file contains a billion lines (also known as rows or records). Each line has a station name followed by the ; separator and then the recorded temperature. The number of unique stations can be up to 10,000. This implies that the same station appears on multiple lines. We therefore need to collect all the temperatures for all distinct stations in the file, and then calculate the required statistics. Easy, right?

Let’s start slow but simple

My first attempt was to simply parse the file one line at a time, and then collect the results in a dictionary where every station name is a key and the temperatures are added to a vector of Float64 to be used as the value mapped to the key. I expected this to be slow, but our aim here is to get a number for the baseline performance.
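A minimal sketch of such a baseline could look like the following (the function name and details are illustrative, not the exact code from the repository):

# Baseline sketch: read the file line by line and collect every temperature
# per station in a Dict{String, Vector{Float64}}
function get_stations_dict(fname::AbstractString)
    stations = Dict{String, Vector{Float64}}()
    for line in eachline(fname)
        name, temp = split(line, ';')
        push!(get!(stations, name, Float64[]), parse(Float64, temp))
    end
    return stations
end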

Once the dictionary is ready, we can calculate the necessary statistics:
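For example, a simple reduction over the dictionary (again a sketch, with illustrative names) would be:

using Statistics

# For every station, reduce the collected temperatures to (min, mean, max)
function calculate_output(stations::Dict{String, Vector{Float64}})
    output = Dict{String, NTuple{3, Float64}}()
    for (name, temps) in stations
        output[name] = (minimum(temps), mean(temps), maximum(temps))
    end
    return output
end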

The output of all the data processing needs to be displayed in a certain format. This is achieved by the following function:
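The challenge expects the stations sorted alphabetically and printed as {name=min/mean/max, ...} with one decimal place. A sketch of such a printer (names are illustrative) could be:

using Printf

# Print the results in the format expected by the challenge:
# {station1=min/mean/max, station2=min/mean/max, ...}
function print_output(output::Dict{String, NTuple{3, Float64}})
    entries = String[]
    for name in sort!(collect(keys(output)))
        tmin, tmean, tmax = output[name]
        push!(entries, @sprintf("%s=%.1f/%.1f/%.1f", name, tmin, tmean, tmax))
    end
    println("{", join(entries, ", "), "}")
end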

Since this implementation is expected to take a long time, we can run a simple test by timing the following with @time, executed only once:

@time get_stations_dict_v2("measurements.txt") |> calculate_output_v3 |> print_output_v1

<output omitted for brevity>
526.056399 seconds (3.00 G allocations: 302.881 GiB, 3.68% gc time)

Our poor man’s implementation takes about 526 seconds, so ~ 9 minutes. It’s definitely slow, but not that bad at all!

Taking it up a notch — Enter multithreading!

Instead of reading the input file one line at a time, we can try to split it into chunks, and then process all the chunks in parallel. Julia makes it quite easy to implement a parallel for loop. However, we need to take some precautions while doing so.

Before we get to the loop, we first need to figure out how to split the file into chunks. This can be achieved using memory mapping to read the file. Then we need to determine the start and end positions of each chunk. It’s important to note that each line in the input data file ends with a new-line character, which has 0x0a as the byte representation. So each chunk should end at that character to ensure that we don’t make any errors while parsing the file.

The following function takes the number of chunks num_chunks as an input argument and returns an array in which each element is a memory-mapped chunk.
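One way to write it (a sketch under the assumptions above, not the exact repository code) is:

using Mmap

# Memory-map the file, cut it into num_chunks pieces, and push every chunk
# boundary forward to the next newline (0x0a) so that no line is split
function get_chunks(fname::AbstractString, num_chunks::Int)
    data = Mmap.mmap(fname)            # Vector{UInt8} backed by the file
    sz = length(data)
    approx = cld(sz, num_chunks)       # rough chunk size in bytes

    boundaries = Int[0]                # end position of each chunk
    while boundaries[end] < sz
        stop = min(boundaries[end] + approx, sz)
        while stop < sz && data[stop] != 0x0a
            stop += 1                  # extend until the chunk ends on a newline
        end
        push!(boundaries, stop)
    end
    return [view(data, boundaries[i]+1:boundaries[i+1]) for i in 1:length(boundaries)-1]
end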

Since we are parsing station and temperature data from different chunks, we also need to combine them in the end. Each chunk will first be processed into a dictionary as shown before. Then, we combine all chunks as follows:
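A sketch of such a merge could look like this:

# Merge the per-chunk dictionaries by concatenating the temperature vectors
# of stations that appear in more than one chunk
function combine_chunks(dicts::Vector{Dict{String, Vector{Float64}}})
    combined = Dict{String, Vector{Float64}}()
    for d in dicts
        for (name, temps) in d
            append!(get!(combined, name, Float64[]), temps)
        end
    end
    return combined
end

Base's mergewith(vcat, dicts...) would achieve the same thing more compactly.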

Now we know how to split the file into chunks, and how we can combine the parsed dictionaries from the chunks at the end. However, the desired speedup can only be obtained if we are also able to process the chunks in parallel. This can be done in a for loop. Note that Julia should be started with multiple threads (julia -t 12) for this solution to have any impact.
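A sketch of the threaded driver is shown below. Here process_chunk is an assumed helper that mirrors the earlier line-by-line parser but works on a single memory-mapped chunk, and each task writes only to its own slot of the results vector, so no locking is needed:

# Parse one memory-mapped byte chunk into its own dictionary
function process_chunk(chunk::AbstractVector{UInt8})
    stations = Dict{String, Vector{Float64}}()
    for line in eachline(IOBuffer(chunk))
        name, temp = split(line, ';')
        push!(get!(stations, name, Float64[]), parse(Float64, temp))
    end
    return stations
end

# Process all chunks in parallel, one result slot per chunk
function process_file(fname::AbstractString, num_chunks::Int)
    chunks = get_chunks(fname, num_chunks)
    dicts = Vector{Dict{String, Vector{Float64}}}(undef, length(chunks))
    Threads.@threads for i in eachindex(chunks)
        dicts[i] = process_chunk(chunks[i])
    end
    return combine_chunks(dicts)
end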

Additionally, we now want to run a proper statistical benchmark. This means that the challenge should be executed a certain number of times, and we should then be able to visualize the distribution of the results. Thankfully, all of this can be easily done with BenchmarkTools.jl. We cap the maximum number of samples at 10, limit the total run time to 20 minutes, and enable garbage collection (to free up memory) between samples. All of this can be brought together in a single script. Note that the input arguments are now the name of the file fname and the number of chunks num_chunks.
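A minimal version of such a script, assuming the process_file driver sketched above, could look like this:

using BenchmarkTools

fname = ARGS[1]                    # e.g. "measurements.txt"
num_chunks = parse(Int, ARGS[2])   # e.g. 48

# At most 10 samples, a 20-minute budget, and a GC run between samples
result = @benchmark process_file($fname, $num_chunks) samples=10 seconds=1200 gcsample=true
display(result)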

Benchmark results along with the inputs used are shown below. Note that we have used 12 threads here.

julia> Threads.nthreads()
12

julia> ARGS = ["measurements.txt", "48"]
2-element Vector{String}:
"measurements.txt"
"48"

12 threads, number of chunks = 48 (Image by author)

Multi-threading provides a big performance boost: we are now down to a little over 2 minutes. Let's see what else we can improve.

Avoiding storing all temperature data

Until now, our approach has been to store all the temperatures and then determine the required statistics (min, mean and max) at the very end. However, the same statistics can already be computed while we parse each line from the input file. We replace the stored value each time a new value is found that is larger (for the maximum) or smaller (for the minimum). For the mean, we keep a running sum of all the values along with a separate counter of how many times a temperature has been seen for a given station.

Overall, our new logic looks like the following:
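In code, the idea could be sketched as follows (a mutable accumulator per station; names are illustrative):

# Running statistics per station: no temperature vector is stored anymore
mutable struct StationStats
    tmin::Float64
    tmax::Float64
    tsum::Float64
    count::Int
end

function process_chunk_stats(chunk::AbstractVector{UInt8})
    stations = Dict{String, StationStats}()
    for line in eachline(IOBuffer(chunk))
        name, temp = split(line, ';')
        value = parse(Float64, temp)
        stats = get!(stations, name) do
            StationStats(value, value, 0.0, 0)
        end
        stats.tmin = min(stats.tmin, value)   # update running minimum
        stats.tmax = max(stats.tmax, value)   # update running maximum
        stats.tsum += value                   # running sum for the mean
        stats.count += 1                      # running count for the mean
    end
    return stations
end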

The function to combine all the results (from different chunks) also needs to be updated accordingly.
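Merging per-chunk results now means combining the running statistics themselves, for example:

function combine_chunks_stats(dicts::Vector{Dict{String, StationStats}})
    combined = Dict{String, StationStats}()
    for d in dicts
        for (name, s) in d
            if haskey(combined, name)
                c = combined[name]
                c.tmin = min(c.tmin, s.tmin)
                c.tmax = max(c.tmax, s.tmax)
                c.tsum += s.tsum
                c.count += s.count
            else
                combined[name] = StationStats(s.tmin, s.tmax, s.tsum, s.count)
            end
        end
    end
    return combined      # the mean per station is tsum / count
end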

Let’s run a new benchmark and see if this change improves the timing.

12 threads, number of chunks = 48 (Image by author)

The median time seems to have improved, but only slightly. It’s a win, nonetheless!

More performance enhancements

Our previous logic to calculate and save the min and max temperatures can be further simplified. Moreover, following the suggestion from this Julia Discourse post, we can make use of views (using @view) when parsing the station names and temperature data. This has also been discussed in the Julia performance manual. Since we are using a slice expression for parsing every line, @view helps us avoid the cost of allocation and copying.
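As a small illustration of the idea (on a single line rather than the raw chunk bytes), @view turns the slices into SubStrings instead of fresh copies:

# Parse one line into (station name, temperature) without copying the slices
function parse_line(line::AbstractString)
    sep = findfirst(';', line)
    name = @view line[1:prevind(line, sep)]        # SubString of the name, no copy
    value = parse(Float64, @view line[sep+1:end])  # parse accepts a SubString directly
    return name, value
end

For example, parse_line("Oas;5.6") returns ("Oas", 5.6).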

The rest of the logic remains the same. Running the benchmark now gives the following:

12 threads, number of chunks = 48 (Image by author)

Whoa! We managed to get down to almost a minute. It seems switching to views does make a big difference. Perhaps there are further tweaks that could improve performance even more. In case you have any suggestions, do let me know in the comments.

Restricting ourselves only to base Julia was fun. However, in the real world, we will almost always be using packages and thus making use of existing efficient implementations for performing the relevant tasks. In our case, CSV.jl (parsing the file in parallel) and DataFrames.jl (performing groupby and combine) will come in handy.

The function below performs the following tasks:

  • Use Mmap to read the large file
  • Split the file into a predefined number of chunks
  • Loop through the chunks, reading each chunk in parallel into a DataFrame using CSV.read (with 12 threads passed to ntasks)
  • Use the DataFrames groupby and combine to get the results for each station
  • Concatenate all DataFrames to combine results from all chunks
  • Once outside the loop, perform a groupby and combine again to get the final set of results for all stations
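A sketch of this function is shown below. The column names, the per-chunk aggregation (keeping sums and counts so that the mean combines exactly across chunks), and the reuse of the earlier get_chunks helper are assumptions rather than the exact repository code:

using CSV, DataFrames

function process_file_csv(fname::AbstractString, num_chunks::Int)
    chunks = get_chunks(fname, num_chunks)      # memory-mapped chunks from earlier
    partials = Vector{DataFrame}(undef, length(chunks))

    for (i, chunk) in enumerate(chunks)
        # CSV.jl parses each chunk in parallel across ntasks tasks
        df = CSV.read(IOBuffer(chunk), DataFrame;
                      header = ["station", "temp"], delim = ';', ntasks = 12)
        # per-chunk aggregation: keep sum and count so the mean combines exactly
        partials[i] = combine(groupby(df, :station),
                              :temp => minimum => :tmin,
                              :temp => maximum => :tmax,
                              :temp => sum => :tsum,
                              nrow => :n)
    end

    all_chunks = reduce(vcat, partials)         # concatenate results from all chunks
    return combine(groupby(all_chunks, :station),
                   :tmin => minimum => :tmin,
                   :tmax => maximum => :tmax,
                   [:tsum, :n] => ((s, n) -> sum(s) / sum(n)) => :tmean)
end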

We can now run the benchmark in the same manner as before.

12 threads, number of chunks = 48, using external packages (Image by author)

The performance using CSV.jl and DataFrames.jl is quite good, albeit slower than our base Julia implementation. When working on real world projects, these packages are an essential part of a data scientist’s toolkit. It would thus be interesting to explore if further optimizations are possible using this approach.


