This post should be helpful for anyone who has run into a problem similar to the one described below:
The Problem: PySpark Matrix Multiplication Fails Due to Out of Memory Error
The goal: calculate an interaction score between a user and a webpage document based on the topics they have in common.
- document-topic data is small
- user-topic data is not available, but user-visited-document data (80 GB, 2 billion rows) is
While processing this 80 GB, 2-billion-row dataset in PySpark, I kept encountering out-of-memory errors.
Reasons for failure:
- PySpark has no method that supports sparse matrix multiplication.
pyspark.mllib.linalg.distributed.BlockMatrix.multiply is implemented with dense matrices, which incurs a large memory overhead.
PySpark source code lines 978, 1254-1262:
class BlockMatrix(DistributedMatrix):
    def multiply(self, other):
        """
        Left multiplies this BlockMatrix by `other`, another
        BlockMatrix. The `colsPerBlock` of this matrix must equal the
        `rowsPerBlock` of `other`. If `other` contains any SparseMatrix
        blocks, they will have to be converted to DenseMatrix blocks.
        The output BlockMatrix will only consist of DenseMatrix blocks.
        This may cause some performance issues until support for
        multiplying two sparse matrices is added.
        """
- A custom sparse matrix multiplication method using NumPy also failed. (blog link)
After a rigorous trial-and-error process, I finally found and adapted a solution. Below is my answer to matrix multiplication with large-scale data.
The Solution: Custom Sparse Matrix Multiplication in C++
- Create sparse matrices for user-topic and document-topic
- Create the document-topic matrix from the document-topic data
- (Multithreaded) Create the user-topic matrix by joining the user-document data with the document-topic matrix
- Loop over the user-topic entries and find the matching document-topic entries to calculate the interaction score between user and document
I made the user-topic matrix creation faster with multithreading. The details are below.
1 Use less memory & faster access: a sparse matrix implemented with C++'s unordered_map
To ensure fast access to items, the sparse matrix is implemented with unordered_map, a key-value map whose keys are hashed, giving O(1) average search and access time. A vector would consume less memory but would take longer to find an item.
// headers used by the snippets in this post
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hash functor so a std::pair can be used as an unordered_map key.
struct pairhash {
public:
    template <typename T, typename U>
    std::size_t operator()(const std::pair<T, U> &x) const
    {
        // combine the two element hashes (XOR of the per-element hashes)
        return std::hash<T>()(x.first) ^ std::hash<U>()(x.second);
    }
};

// <document_id, <topic_id, confidence_level>>
typedef std::unordered_map<int, std::vector<std::pair<int, float>>> document_topic_map;
// <<uuid, topic_id>, sum_confidence_level>
typedef std::unordered_map<std::pair<int, int>, float, pairhash> user_topic_map;
Here, the key is a (user_id, topic_id) pair, like (x, y) coordinates, and the value is the accumulated confidence level for that topic.
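For context, here is a minimal sketch of how the document-topic map could be filled from the (small) document-topic data. The file name documents_topics.csv and the column order document_id, topic_id, confidence_level are assumptions for illustration, not the loader from the original project:
#include <fstream>
#include <sstream>

// Hypothetical loader: one CSV row per (document_id, topic_id, confidence_level).
document_topic_map load_doc_topic_map(const std::string &filename)
{
    document_topic_map doc_topic_map;
    std::ifstream in(filename);
    std::string line;
    std::getline(in, line);                            // skip the header row
    while (std::getline(in, line)) {
        std::stringstream ss(line);
        std::string doc_id, topic_id, confidence;
        std::getline(ss, doc_id, ',');
        std::getline(ss, topic_id, ',');
        std::getline(ss, confidence, ',');
        // append this topic to the document's sparse row
        doc_topic_map[std::stoi(doc_id)].push_back(
            {std::stoi(topic_id), std::stof(confidence)});
    }
    return doc_topic_map;
}
It would be called with something like load_doc_topic_map("../input/documents_topics.csv"), where the path is again only an assumed example.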
2 Make it faster: multithreading while ensuring thread safety
- race condition: a situation where the result depends on the order in which threads access the same shared state, so different interleavings produce different output. Imagine a chat application that does not handle race conditions: A types 'apple' and B types 'bus', and the output comes out interleaved as 'abpupsle'.
When multiple threads update the same object, each update can interfere with the others (a race condition).
“Nondeterminism = parallel processing + shared state”
Martin Odersky (Scala designer)
To prevent this problem, I created a separate object for each thread: user_topic_map_set is a vector of sparse user-topic matrices, one per thread. Inevitably, there can be overlapping user ids across the matrices. Rows for the same user id should really be merged together, but they stay split across the per-thread maps. Therefore, looping over all of the matrices is necessary when calculating the interaction score between user-topic and document-topic, as in the sketch after this paragraph.
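To make that concrete, here is a rough sketch of a scoring function that checks every per-thread map for the user. The exact scoring formula is not spelled out in this post; summing user confidence × document confidence over shared topics is my assumption for illustration:
// Sketch: interaction score for a candidate (uuid, document_id) pair.
// Every map in user_topic_map_set must be checked, because the same user's
// rows may be spread across several per-thread maps.
float interaction_score(int uuid, int document_id,
    const std::vector<std::unordered_map<std::pair<int, int>, float, pairhash>> &user_topic_map_set,
    const document_topic_map &doc_topic_map)
{
    float score = 0.0f;
    auto doc_it = doc_topic_map.find(document_id);
    if (doc_it == doc_topic_map.end())
        return score;
    for (const auto &topic_conf : doc_it->second) {                 // <topic_id, confidence>
        for (const auto &user_topic_map : user_topic_map_set) {
            auto user_it = user_topic_map.find({uuid, topic_conf.first});
            if (user_it != user_topic_map.end())
                score += user_it->second * topic_conf.second;       // user conf * doc conf
        }
    }
    return score;
}
The function below builds user_topic_map_set itself, launching one std::thread per slice of the page-views file: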
std::vector<std::unordered_map<std::pair<int, int>, float, pairhash>> gen_user_topic_map_set(
    std::unordered_map<int, std::vector<std::pair<int, float>>> *doc_topic_map)
{
    std::vector<std::unordered_map<std::pair<int, int>, float, pairhash>> user_topic_map_set;
    std::string filename = "../input/page_views.csv.gz";
    const int num_thread = 5;
    const int num_row = 2034275448 / num_thread + 1; // 406,855,090 rows per thread

    // init one user_topic_map per thread so no two threads write to the same object
    for (int i = 0; i < num_thread; ++i) {
        std::unordered_map<std::pair<int, int>, float, pairhash> user_topic_map;
        user_topic_map_set.push_back(user_topic_map);
    }

    // start threads: each one handles its own contiguous slice of rows
    std::vector<std::thread> thread_list;
    for (int i = 0; i < num_thread; ++i) {
        thread_list.push_back(std::thread(gen_user_topic_map,
                                          i,
                                          &user_topic_map_set[i],
                                          filename,
                                          (i * num_row + 1),
                                          ((1 + i) * num_row),
                                          doc_topic_map));
    }

    // finish threads
    for (auto &t : thread_list) {
        t.join();
    }
    return user_topic_map_set;
}
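The per-thread worker gen_user_topic_map is not shown in this post. The sketch below captures what it has to do, under a few assumptions of mine: the first two CSV columns are integer-encoded uuid and document_id, decompression of the .gz file is handled elsewhere (an ifstream cannot read gzip directly), and in the real file this worker would be declared before gen_user_topic_map_set:
// Sketch of the per-thread worker (assumed row layout: uuid,document_id,...).
void gen_user_topic_map(int thread_id,
    std::unordered_map<std::pair<int, int>, float, pairhash> *user_topic_map,
    std::string filename, int start_row, int end_row,
    std::unordered_map<int, std::vector<std::pair<int, float>>> *doc_topic_map)
{
    (void)thread_id;                     // kept to match the call site; unused here
    std::ifstream in(filename);          // assumes a decompressed copy of the file
    std::string line;
    int row = 0;
    while (std::getline(in, line)) {
        ++row;
        if (row < start_row) continue;   // rows belonging to earlier threads
        if (row > end_row) break;        // past this thread's slice
        std::stringstream ss(line);
        std::string uuid, document_id;
        std::getline(ss, uuid, ',');
        std::getline(ss, document_id, ',');
        auto it = doc_topic_map->find(std::stoi(document_id));
        if (it == doc_topic_map->end())
            continue;
        // join: accumulate each topic of the visited document into the user's row
        for (const auto &topic_conf : it->second)
            (*user_topic_map)[{std::stoi(uuid), topic_conf.first}] += topic_conf.second;
    }
}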
Further developments:
Instead of reading through all the lines before the target line, modifying the code to start reading from a specific line should be possible. I tested this idea with seekg, but without success.
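For reference, a minimal sketch of that idea under the assumption of a plain, uncompressed text file: one sequential pass records the byte offset where each slice begins, and each worker then seekgs straight to its offset. The helper name is mine, and, as noted above, this did not work out in practice for my input:
// Sketch only: record the byte offset where every slice of num_row lines begins,
// so a worker could seekg() straight to the start of its slice instead of
// reading past all earlier lines. Assumes an uncompressed text file.
std::vector<std::streampos> build_slice_offsets(const std::string &filename, int num_row)
{
    std::vector<std::streampos> offsets;
    std::ifstream in(filename);
    std::string line;
    int row = 0;
    offsets.push_back(in.tellg());            // offset of row 1
    while (std::getline(in, line)) {
        ++row;
        if (row % num_row == 0)
            offsets.push_back(in.tellg());    // offset of the next slice's first row
    }
    return offsets;
}
// a worker would then open its own ifstream, call in.seekg(offsets[thread_id]),
// and read only its own num_row lines from there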