Matrix Multiplication with Large Scale Data


This post will be helpful for those who have encountered problems similar to the one described below:

The Problem: PySpark Matrix Multiplication Fails with an Out-of-Memory Error

The task: calculate an interaction score between each user and each webpage document in terms of their common topics.

While processing 80GB of data (about 2 billion rows) in PySpark, I kept running into out-of-memory errors.

Reason for failure:

PySpark source code, lines 978 and 1254–1262 (pyspark/mllib/linalg/distributed.py):

class BlockMatrix(DistributedMatrix):
    def multiply(self, other):
        """
        Left multiplies this BlockMatrix by `other`, another
        BlockMatrix. The `colsPerBlock` of this matrix must equal the
        `rowsPerBlock` of `other`. If `other` contains any SparseMatrix
        blocks, they will have to be converted to DenseMatrix blocks.
        The output BlockMatrix will only consist of DenseMatrix blocks.
        This may cause some performance issues until support for
        multiplying two sparse matrices is added.
        """

In other words, multiply() converts every SparseMatrix block to a DenseMatrix block, so a large sparse matrix is densified during multiplication and no longer fits in memory. After going through a rigorous trial-and-error process, I finally found a solution. Below is my answer to matrix multiplication with large-scale data.
The Solution: Custom Sparse Matrix Multiplication in C++

I made this user-topic matrix creation process faster with multithreading. Below are the details.

1 Use less memory & faster access: a sparse matrix implemented with C++'s unordered_map

To ensure fast access to items, the sparse matrix was implemented with std::unordered_map, a key-value map whose keys are hashed, giving average O(1) search and insertion time. A plain vector consumes less memory but takes longer to find an item.

#include <cstddef>
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

// Hash for std::pair keys; std::unordered_map has no built-in pair hash.
// Note: XOR is simple but symmetric, so (a, b) and (b, a) collide.
struct pairhash {
public:
    template <typename T, typename U>
    std::size_t operator()(const std::pair<T, U> &x) const
    {
        return std::hash<T>()(x.first) ^ std::hash<U>()(x.second);
    }
};

// <document_id, [(topic_id, confidence_level), ...]>
typedef std::unordered_map<int, std::vector<std::pair<int, float>>> document_topic_map;
// <(uuid, topic_id), sum_confidence_level>
typedef std::unordered_map<std::pair<int, int>, float, pairhash> user_topic_map;

Here, the key is a (user_id, topic_id) pair, analogous to (x, y) coordinates, and the value is the accumulated confidence level in that topic.
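For illustration, here is a minimal sketch of how one page view could be folded into this matrix (the helper add_page_view and the accumulate-by-sum rule are my assumptions, not the original code):

// Sketch: accumulate the topics of a viewed document into a user's row.
// Assumes the typedefs and pairhash above; add_page_view is hypothetical.
void add_page_view(int uuid, int document_id,
                   const document_topic_map &docs,
                   user_topic_map &users)
{
    auto it = docs.find(document_id);
    if (it == docs.end()) return;              // document has no known topics
    for (const auto &tc : it->second) {        // tc = (topic_id, confidence)
        users[{uuid, tc.first}] += tc.second;  // average O(1) insert/update
    }
}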

2 Make it faster: multithreading while ensuring thread safety

When multiple threads update the same object, one thread's update can interfere with another's: a race condition.

“Non-determinism = parallel processing + mutable state”

Martin Odersky (Scala designer)

To prevent this problem, I created a separate object for each thread: user_topic_map_set is a vector of sparse user-topic matrices, one per thread. Inevitably, there can be overlapping user ids across the maps. Rows belonging to the same user id should logically be merged, but they stay separated across the matrices, so looping over all the matrices is necessary when calculating the interaction score between user-topic and document-topic (see the scoring sketch after the code below).

#include <string>
#include <thread>

// gen_user_topic_map (defined elsewhere) parses the given row range of the
// CSV and accumulates confidence levels into the map it is handed.
std::vector<user_topic_map> gen_user_topic_map_set(document_topic_map *doc_topic_map)
{
    std::vector<user_topic_map> user_topic_map_set;
    std::string filename = "../input/page_views.csv.gz";

    const int num_thread = 5;
    const int num_row = 2034275448 / num_thread + 1; // 406,855,090 rows each

    // Init one user_topic_map per thread: no shared mutable state, no locks.
    user_topic_map_set.resize(num_thread);

    // Start the threads; each processes a disjoint range of rows.
    std::vector<std::thread> thread_list;
    for (int i = 0; i < num_thread; ++i) {
        thread_list.push_back(std::thread(gen_user_topic_map,
                                i,
                                &user_topic_map_set[i],
                                filename,
                                (i * num_row + 1),
                                ((i + 1) * num_row),
                                doc_topic_map));
    }

    // Wait for every thread to finish before using the per-thread maps.
    for (auto &t : thread_list) {
        t.join();
    }
    return user_topic_map_set;
}
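To make that scoring loop concrete, here is a minimal sketch (score_user_document and the sum-of-products scoring rule are my assumptions, not the original code) that looks a user up in every per-thread map:

// Interaction score: sum over common topics of user confidence times
// document confidence, checking every per-thread matrix because rows
// for one user may be split across them.
float score_user_document(int uuid, int document_id,
                          const std::vector<user_topic_map> &user_maps,
                          const document_topic_map &docs)
{
    auto d = docs.find(document_id);
    if (d == docs.end()) return 0.0f;
    float score = 0.0f;
    for (const auto &users : user_maps) {
        for (const auto &tc : d->second) {       // tc = (topic_id, confidence)
            auto u = users.find({uuid, tc.first});
            if (u != users.end())
                score += u->second * tc.second;
        }
    }
    return score;
}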
Further developments:

Instead of reading through all the lines until the target line, the code could be modified to start reading from a specific line. I tested this idea with std::istream::seekg, without success: seekg jumps to a byte offset, and with variable-length CSV lines a byte offset does not correspond to a known line number.
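One possible direction is to scan the file once, record the byte offset of every k-th line, and let each thread seekg straight to its starting offset. A minimal sketch (build_line_index is hypothetical, and it assumes an uncompressed file, since a gzip stream cannot be seeked by byte offset):

#include <fstream>
#include <string>
#include <vector>

// Record the byte offset of every `stride`-th line so a later reader can
// seekg near a target line instead of scanning from the top of the file.
std::vector<std::streampos> build_line_index(const std::string &path, long stride)
{
    std::vector<std::streampos> offsets;
    std::ifstream in(path);
    std::string line;
    long line_no = 0;
    offsets.push_back(in.tellg());           // offset of line 0
    while (std::getline(in, line)) {
        ++line_no;
        if (line_no % stride == 0)
            offsets.push_back(in.tellg());   // offset of line `line_no`
    }
    return offsets;
}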
