Rust query modules with parallelisation #328

Open
risufaj opened this issue Aug 25, 2023 · 4 comments · May be fixed by #336
Labels
community, Effort - High, feature, Frequency - Monthly, lang: rust (Issue on Rust codebase), Priority - Later, Reach - VeryFew, Severity - S3

Comments

@risufaj

risufaj commented Aug 25, 2023

Hello,

I've been trying to develop a query module in Rust that parallelizes some loops using the rayon library. The parallelized loops access nodes and edges. However, I get this error: "G cannot be shared between threads safely", where G is a MemgraphGraph object. I've seen that some of the C++ query modules use parallelisation, but I haven't been able to find anything similar for Rust.
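For context on this error: rayon runs the closures passed to `par_iter` on its worker threads, and those closures capture `graph` by reference, so `&G` must be `Send`, which holds exactly when `G: Sync`. The minimal sketch below reproduces the same constraint with only the standard library, using `std::thread::scope` in place of rayon. `RawHandle`, `PlainGraph` and `total_degree` are hypothetical illustration names. `RawHandle` stands in for a handle that wraps raw C pointers, which is presumably what `Memgraph` (and therefore `MemgraphGraph`) holds internally; raw pointers are neither `Send` nor `Sync`, so the compiler does not derive `Sync` for any type that contains them.

```rust
use std::thread;

// Compile-time check: this only compiles when T is Sync.
pub fn assert_sync<T: Sync>() {}

// Hypothetical stand-in for a handle wrapping a raw pointer from a C API.
// `*mut u8` is !Sync, so RawHandle is !Sync too.
#[allow(dead_code)]
pub struct RawHandle {
    ptr: *mut u8,
}

// Hypothetical graph made of plain owned data: Sync is derived automatically.
pub struct PlainGraph {
    pub neighbors: Vec<Vec<usize>>,
}

// Sharing `&G` with a spawned thread requires `G: Sync`, the same requirement
// rayon's par_iter imposes on everything its closures capture by reference.
pub fn total_degree<G, F>(graph: &G, nodes: &[usize], degree: F) -> usize
where
    G: Sync,
    F: Fn(&G, usize) -> usize + Sync,
{
    let (left, right) = nodes.split_at(nodes.len() / 2);
    thread::scope(|s| {
        // The first half runs on a spawned thread, the second half on this one.
        let handle = s.spawn(|| left.iter().map(|&n| degree(graph, n)).sum::<usize>());
        let right_sum: usize = right.iter().map(|&n| degree(graph, n)).sum();
        handle.join().unwrap() + right_sum
    })
}
```

`assert_sync::<PlainGraph>()` compiles, while `assert_sync::<RawHandle>()` is rejected with the same kind of "cannot be shared between threads safely" message, this time pointing at the `*mut u8` field.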

Here is some code to reproduce this.
Cargo.toml

[package]
name = "parallel-example"
version = "0.1.0"
edition = "2018"

[dependencies]
c_str_macro = "1.0.2"
rayon = "1.5"


rsmgp-sys = { git = "https://github.com/memgraph/mage.git", tag="v1.7.0"}

[lib]
name = "parallel_example"
crate-type = ["cdylib"]

The src folder contains these files:
lib.rs

mod example;

use crate::example::MemgraphGraph;
use crate::example::example as example_algorithm;
use c_str_macro::c_str;
use rsmgp_sys::memgraph::*;
use rsmgp_sys::mgp::*;
use rsmgp_sys::result::*;
use rsmgp_sys::rsmgp::*;
use rsmgp_sys::value::*;
use rsmgp_sys::{close_module, define_optional_type, define_procedure, define_type, init_module};
use std::collections::{HashMap};
use std::ffi::{CString};
use std::os::raw::c_int;
use std::panic;

init_module!(|memgraph: &Memgraph| -> Result<()> {
    memgraph.add_read_procedure(
        example,
        c_str!("example"),
        &[define_type!("node_list", Type::List, Type::Int),],
        &[],
        &[
            define_type!("node_id", Type::Int),
        ],
    )?;
    Ok(())
});

fn write_nodes_to_records(memgraph: &Memgraph, nodes: Vec<i64>) -> Result<()> {
    for node_id in nodes {
        let record = memgraph.result_record()?;
        record.insert_int(c_str!("node_id"), node_id)?;
    }
    Ok(())
}

define_procedure!(example, |memgraph: &Memgraph| -> Result<()> {
    let args = memgraph.args()?;
    let Value::List(node_list) = args.value_at(0)? else {
        panic!("Failed to read node_list")
    };



    let node_list: Vec<i64> = node_list
        .iter()?
        .map(|value| match value {
            Value::Int(i) => i as i64,
            _ => panic!("Failed converting node_list to vector"),
        })
        .collect();

    let graph = MemgraphGraph::from_graph(memgraph);

    let result = example_algorithm(
        graph,
        &node_list
    );
    write_nodes_to_records(memgraph, result)?;
    Ok(())
});

close_module!(|| -> Result<()> { Ok(()) });

example.rs

use rsmgp_sys::memgraph::*;
use rsmgp_sys::result::Error as MgpError;
use rsmgp_sys::value::*;
use std::io;
use c_str_macro::c_str;
use rayon::prelude::*;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Vertex {
    pub id: i64,
}

#[derive(Debug)]
pub enum GraphError {
    IoError(io::Error),
    MgpError(MgpError),
}

impl From<io::Error> for GraphError {
    fn from(error: io::Error) -> Self {
        Self::IoError(error)
    }
}

impl From<MgpError> for GraphError {
    fn from(error: MgpError) -> Self {
        Self::MgpError(error)
    }
}

pub trait Graph {
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError>;
    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError>;
    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
    fn add_vertex(&mut self, vertex: Vertex) -> Result<(), GraphError>;
    fn add_edge(&mut self, source: Vertex, target: Vertex, weight: f32) -> Result<(), GraphError>;
    fn num_vertices(&self) -> usize;
    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex>;
    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
}

pub struct MemgraphGraph<'a> {
    graph: &'a Memgraph,
}

impl<'a> MemgraphGraph<'a> {
    pub fn from_graph(graph: &'a Memgraph) -> Self {
        Self { graph }
    }
}

impl<'a> Graph for MemgraphGraph<'a> {
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError> {
        let vertices_iter = self.graph.vertices_iter()?;
        let vertices: Vec<_> = vertices_iter.map(|v| Vertex { id: v.id() }).collect();
        Ok(vertices)
    }

    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let iter = vertex_mgp.in_edges()?.map(|e| {
            // for incoming edges, the neighbor is the edge's source vertex
            let source_vertex = e.from_vertex().unwrap();
            // if the edge doesn't have a weight, we assume it's 1.0
            let weight = e
                .property(&c_str!("weight"))
                .ok()
                .and_then(|p| {
                    if let Value::Float(f) = p.value {
                        Some(f)
                    } else {
                        None
                    }
                })
                .unwrap_or(1.0);

            (
                Vertex {
                    id: source_vertex.id(),
                },
                weight,
            )
        });
        let incoming_edges: Vec<_> = iter.collect();
        Ok(incoming_edges)
    }

    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let outgoing_edges_iter = vertex_mgp.out_edges()?.map(|e| {
            let target_vertex = e.to_vertex().unwrap();
            // if the edge doesn't have a weight, we assume it's 1.0
            let weight = e
                .property(&c_str!("weight"))
                .ok()
                .and_then(|p| {
                    if let Value::Float(f) = p.value {
                        Some(f)
                    } else {
                        None
                    }
                })
                .unwrap_or(1.0);

            (
                Vertex {
                    id: target_vertex.id(),
                },
                weight,
            )
        });
        let outgoing_edges: Vec<_> = outgoing_edges_iter.collect();
        Ok(outgoing_edges)
    }

    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        // propagate errors with `?` instead of panicking, since the function returns a Result
        let mut outgoing_edges = self.outgoing_edges(vertex)?;
        let incoming_edges = self.incoming_edges(vertex)?;

        outgoing_edges.extend(incoming_edges);

        Ok(outgoing_edges)
    }

    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError> {
        let mut neighbors = vec![];
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let neighbors_iter = vertex_mgp.out_edges()?.map(|e| e.to_vertex());
        for neighbor_mgp in neighbors_iter {
            neighbors.push(Vertex {
                id: neighbor_mgp?.id(),
            });
        }
        let neighbors_in = vertex_mgp.in_edges()?.map(|e| e.from_vertex());
        for neighbor_mgp in neighbors_in {
            neighbors.push(Vertex {
                id: neighbor_mgp?.id(),
            });
        }
        Ok(neighbors)
    }

    fn add_vertex(&mut self, _vertex: Vertex) -> Result<(), GraphError> {
        unimplemented!()
    }

    fn add_edge(&mut self, _source: Vertex, _target: Vertex, _weight: f32) -> Result<(), GraphError> {
        // let source_mgp = self.graph.vertex_by_id(source.id)?;
        // let target_mgp = self.graph.vertex_by_id(target.id)?;
        // self.graph.create_edge(source_mgp, target_mgp, weight)?;
        // Ok(())
        unimplemented!()
    }

    fn num_vertices(&self) -> usize {
        self.graph.vertices_iter().unwrap().count()
    }

    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex> {
        match self.graph.vertex_by_id(id) {
            Ok(_) => Some(Vertex { id }),
            Err(_) => None,
        }
    }
}

pub fn example<G: Graph>(
    graph: G,
    node_list: &[i64]
) -> Vec<i64> {

    node_list.par_iter()
        .filter_map(|&node_id| {
            graph.get_vertex_by_id(node_id)
        })
        .flat_map(|node| {
            graph.neighbors(node).unwrap_or_else(|_| Vec::new())
        })
        .map(|vertex| vertex.id)
        .collect()
}
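One way to surface the requirement earlier is to state it in the bound, `G: Graph + Sync`, so the compiler reports the problem at the call site instead of deep inside rayon. With `MemgraphGraph` itself this only moves the error to the call in lib.rs, since (presumably) the wrapped `Memgraph` is not `Sync`. The sketch below pairs that bound with a hypothetical owned snapshot type, `AdjacencyGraph`, which is `Sync` because it holds only plain data; `NeighborGraph` is a hypothetical, trimmed-down version of the `Graph` trait above. It uses `std::thread::scope` instead of rayon so that it runs without extra crates.

```rust
use std::collections::HashMap;
use std::thread;

// Hypothetical, trimmed-down version of the Graph trait from example.rs.
pub trait NeighborGraph {
    fn get_vertex_by_id(&self, id: i64) -> Option<i64>;
    fn neighbors(&self, id: i64) -> Vec<i64>;
}

// Hypothetical owned snapshot (node id -> in- and out-neighbor ids).
// It holds only plain data, so it is Send + Sync automatically.
pub struct AdjacencyGraph {
    pub adjacency: HashMap<i64, Vec<i64>>,
}

impl NeighborGraph for AdjacencyGraph {
    fn get_vertex_by_id(&self, id: i64) -> Option<i64> {
        self.adjacency.contains_key(&id).then_some(id)
    }

    fn neighbors(&self, id: i64) -> Vec<i64> {
        self.adjacency.get(&id).cloned().unwrap_or_default()
    }
}

// Same logic as `example`, with the thread-safety requirement stated in the bound.
// `threads` is a hypothetical parameter; rayon would choose the split itself.
pub fn example<G: NeighborGraph + Sync>(graph: &G, node_list: &[i64], threads: usize) -> Vec<i64> {
    let threads = threads.max(1);
    let chunk_size = ((node_list.len() + threads - 1) / threads).max(1);
    thread::scope(|s| {
        // Spawn one scoped thread per chunk; collecting first lets them run concurrently.
        let handles: Vec<_> = node_list
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    chunk
                        .iter()
                        .filter_map(|&id| graph.get_vertex_by_id(id))
                        .flat_map(|v| graph.neighbors(v))
                        .collect::<Vec<i64>>()
                })
            })
            .collect();
        // Join in spawn order so the output order follows node_list.
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    })
}
```

With rayon, the body could stay the original `par_iter` pipeline; only the `Sync` bound and the owned snapshot type are the point here.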

Any help on how to move this forward would be great.

@gitbuda
Member

gitbuda commented Aug 25, 2023

@risufaj very nice module / issue 😃 Thanks for pinging!

Yes, the Rust API is not as polished as the C/C++ one, but there is a plan to improve it, and this will help 💪

I'll try to deep-dive in the following days. In the meantime, if you have suggestions on the API side, feel free to post them here or open a new PR with the improvements 👀

@risufaj
Author

risufaj commented Aug 25, 2023

There are definitely workarounds. For example, in my use case I'm essentially doing random walks as an approximation of personalized PageRank. For that, it is enough to compute the neighbors of each node once, store them in a HashMap, for instance, and use rayon to parallelize the loops over that map. This is very likely not optimized, but I tried it today to show that it works. Also, from what I understand, something similar is done in the C++ implementation of PageRank.

However, it makes sense to have a better and more general solution, one that doesn't require going over the whole graph first, every time the query module is called, before doing the actual work. I know I'm not proposing anything concrete yet, but we'll get there.
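The workaround described above can be sketched roughly as follows: read the neighbors once into an owned `HashMap` (for example with the `vertices_iter` and `neighbors` methods of `MemgraphGraph` from example.rs, on a single thread), then run the random walks in parallel over that snapshot, which is `Sync` because it is plain data. The estimator is the common Monte Carlo approach to personalized PageRank: each walk from the source stops with probability `alpha` at every step, and a node's score is the fraction of walks that end there. The restart probability, walk counts, the xorshift generator, the dangling-node convention and all names are illustrative assumptions, not taken from the module or from MAGE; `std::thread::scope` stands in for rayon so the sketch has no dependencies.

```rust
use std::collections::HashMap;
use std::thread;

// Tiny xorshift64 generator so the sketch needs no external crates.
// Illustrative only: not suitable for anything security-related.
struct XorShift(u64);

impl XorShift {
    // Returns a float uniformly distributed in [0, 1).
    fn next_f64(&mut self) -> f64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

// Monte Carlo estimate of personalized PageRank for `source` on an owned adjacency
// snapshot: each walk stops with probability `alpha` per step (or at a node without
// neighbors), and a node's score is the fraction of walks that end there.
pub fn personalized_pagerank(
    adjacency: &HashMap<i64, Vec<i64>>,
    source: i64,
    alpha: f64,
    walks_per_thread: usize,
    threads: usize,
) -> HashMap<i64, f64> {
    assert!(threads > 0 && walks_per_thread > 0, "need at least one walk");
    // The snapshot is plain data (Sync), so every thread can read it without locks.
    let per_thread: Vec<HashMap<i64, usize>> = thread::scope(|s| {
        let handles: Vec<_> = (0..threads)
            .map(|t| {
                s.spawn(move || {
                    // Distinct non-zero seed per thread.
                    let mut rng = XorShift(0x9E37_79B9_7F4A_7C15 ^ (t as u64 + 1));
                    let mut ends: HashMap<i64, usize> = HashMap::new();
                    for _ in 0..walks_per_thread {
                        let mut current = source;
                        while rng.next_f64() >= alpha {
                            match adjacency.get(&current) {
                                Some(next) if !next.is_empty() => {
                                    let i = (rng.next_f64() * next.len() as f64) as usize;
                                    current = next[i.min(next.len() - 1)];
                                }
                                _ => break, // no neighbors: the walk ends here
                            }
                        }
                        *ends.entry(current).or_insert(0) += 1;
                    }
                    ends
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Merge the per-thread counts and normalize them so the scores sum to 1.
    let total = (walks_per_thread * threads) as f64;
    let mut scores = HashMap::new();
    for counts in per_thread {
        for (node, count) in counts {
            *scores.entry(node).or_insert(0.0) += count as f64 / total;
        }
    }
    scores
}
```

With rayon, the per-thread loop would more typically be expressed with its `fold` and `reduce` adaptors over the walk indices; the overall structure (one single-threaded snapshot, then read-only parallel work) stays the same.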

@gitbuda added the lang: rust and type: enhancement labels Sep 2, 2023
@gitbuda linked a pull request Sep 2, 2023 that will close this issue
@gitbuda
Member

gitbuda commented Sep 2, 2023

@risufaj I've put your code into the #336 PR and made it compile by adding Send and Sync trait implementations (just "hints"). However, the underlying object still has to be implemented so that it is actually thread-safe.
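For readers unfamiliar with the pattern, such "hints" are `unsafe impl Send` and `unsafe impl Sync` declarations. They make the compile error go away but add no synchronization: they are a promise by the programmer that sharing the type across threads is sound. The sketch below uses a hypothetical `SharedSlice` wrapper around a raw pointer to read-only data, a case where that promise does hold; it is not the code from #336. For the Memgraph handle, the same promise is presumably only valid if the underlying C API tolerates concurrent calls, which is the part that still has to be made safe.

```rust
use std::thread;

// Hypothetical wrapper around a raw pointer, similar in spirit to a C-API handle.
// Because of the raw pointer, Send and Sync are not derived automatically.
pub struct SharedSlice {
    ptr: *const i64,
    len: usize,
}

// SAFETY: SharedSlice is only created from a borrowed slice that outlives every thread
// using it, and the data is only ever read. Without these guarantees the impls would be
// unsound, even though they would still compile.
unsafe impl Send for SharedSlice {}
unsafe impl Sync for SharedSlice {}

impl SharedSlice {
    pub fn get(&self, i: usize) -> i64 {
        assert!(i < self.len, "index out of bounds");
        // SAFETY: `i` is in bounds and the pointee is alive and never mutated (see above).
        unsafe { *self.ptr.add(i) }
    }
}

// Sums the slice with two threads that share one SharedSlice by reference.
pub fn sum_in_parallel(data: &[i64]) -> i64 {
    let shared = SharedSlice { ptr: data.as_ptr(), len: data.len() };
    let shared = &shared;
    let mid = data.len() / 2;
    thread::scope(|s| {
        let left = s.spawn(move || (0..mid).map(|i| shared.get(i)).sum::<i64>());
        let right: i64 = (mid..data.len()).map(|i| shared.get(i)).sum();
        left.join().unwrap() + right
    })
}
```

Deleting the `unsafe impl Sync` line brings back the familiar "cannot be shared between threads safely" error, this time for `*const i64`.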

It's not a huge priority for me now, but I'll try to figure out / implement thread safety in the following days/weeks.

If you have any suggestions, just put them here or contribute directly to the PR 😃

@gitbuda self-assigned this Sep 2, 2023
@risufaj
Author

risufaj commented Sep 6, 2023

Thank you @gitbuda
This now allows parallel access to the graph. However, there is no speedup compared to the workaround I mentioned before. That's probably due to the larger use case, though.

@hal-eisen-MG added the feature label and removed the enhancement label Nov 11, 2023
@katarinasupe added the community, Effort - Unknown, Importance - I3, and Severity - S3 labels Dec 27, 2023
@katarinasupe added the Frequency - Monthly and Reach - VeryFew labels and removed the Importance - I3 label Feb 27, 2024
@gitbuda removed their assignment Mar 14, 2024
@gvolfing added the Effort - High label and removed the Effort - Unknown label May 14, 2024
@hal-eisen-MG added the Priority - Later label May 19, 2024
Projects
Status: In Progress

5 participants