通过 JNI 实现 Java 与 Rust 的 Channel 消息传递

发布于:2024-11-29 ⋅ 阅读:(22) ⋅ 点赞:(0)

做纯粹的自己。“你要搞清楚自己人生的剧本——不是父母的续集,不是子女的前传,更不是朋友的外篇。对待生命你不妨再大胆一点,因为你好歹要失去它。如果这世上真有奇迹,那只是努力的另一个名字”。

一、crossbeam_channel

参考 crossbeam_channel - Rust

crossbeam_channel 是一个多生产者多消费者通道,用于消息传递,它是std::sync::mpsc的替代品,具有更多的功能和更好的性能。

二、Channel 类型

通道可以使用两个函数创建:

  1. bounded 函数创建一个容量有限的信道,即一个信道一次可以容纳的消息数量是有限制的。
  2. unbounded 函数创建一个容量无界的信道,即它一次可以容纳任意数量的消息。

这两个函数都返回一个发送方 Sender 和一个接收方 Receiver,它们代表通道的相反两端。

创建一个有界 Channel:

use crossbeam_channel::bounded;

// Create a channel that can hold at most 5 messages at a time.
let (s, r) = bounded(5);

// Can send only 5 messages without blocking.
for i in 0..5 {
    s.send(i).unwrap();
}

// Another call to `send` would block because the channel is full.
// s.send(5).unwrap();

创建一个无界 Channel:

use crossbeam_channel::unbounded;

// Create an unbounded channel.
let (s, r) = unbounded();

// Can send any number of messages into the channel without blocking.
for i in 0..1000 {
    s.send(i).unwrap();
}

三、通过 JNI 使用 Channel

Java 端可以通过 JNI 调用 getSender 获取发送端指针,调用 sendMessage 发送消息到 Rust 中的处理线程,由 Rust 负责处理核心逻辑。

1、新建一个 Rust 库项目

cargo new rust_jni_channel_test --lib

添加依赖包, 

# Cargo.toml

[dependencies]
jni = "0.21.1"
lazy_static = "1.5.0"
crossbeam-channel = "0.5.13"
#log = "0.4"
#env_logger = "0.11"

[lib]
crate_type = ["cdylib"]

实现 JNI 模块函数, 

// lib.rs

#[macro_use]
extern crate lazy_static;

use jni::objects::{JClass, JObject};
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::thread;

lazy_static! {
    static ref SENDER: Sender<String> = {
        let (sender, receiver) = unbounded();
        // Spawn a thread to handle the receiver
        thread::spawn(move || {
            for message in receiver.iter() {
                println!("Received message: {}", message);
            }
        });
        sender
    };
}

#[no_mangle]
pub extern "system" fn Java_com_yushanma_MyResultHandler_getSender(
    _env: JNIEnv,
    _class: JClass,
) -> jlong {
    let sender_ptr = Box::into_raw(Box::new(SENDER.clone())) as jlong;
    sender_ptr
}

#[no_mangle]
pub extern "system" fn Java_com_yushanma_MyResultHandler_sendMessage(
    mut env: JNIEnv,
    _class: JClass,
    sender_ptr: jlong,
    message: JObject,
) {
    let sender = unsafe { &*(sender_ptr as *mut Sender<String>) };
    let message: String = env.get_string(&message.into()).expect("Couldn't get java string!").into();
    match sender.send(message) {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => eprintln!("Failed to send message: {:?}", e),
    }
}

以上代码主要做了三件事情: 

1、定义静态变量

  • 使用lazy_static!宏定义一个静态变量SENDER,它是一个Sender<String>类型的通道发送端。
  • 创建一个无界通道(unbounded channel),返回一个发送端和接收端。
  • 启动一个新线程,在该线程中不断从接收端读取消息并打印出来。
  • 返回发送端sender

2、实现JNI函数:获取发送端:Java_com_yushanma_MyResultHandler_getSender

  • 这个函数名遵循JNI命名规则,表示这是一个供Java调用的本地方法。
  • 函数参数包括JNI环境指针_env和Java类_class
  • 将静态变量SENDER克隆后转换为原始指针,再将其转换为jlong类型返回给Java。

 3、实现JNI函数:发送消息Java_com_yushanma_MyResultHandler_sendMessage

  • 这个函数同样遵循JNI命名规则。
  • 参数包括JNI环境指针env、Java类_class、发送端指针sender_ptr和Java字符串对象message
  • sender_ptr转换回Sender<String>类型的引用。
  • 从Java字符串对象中提取出Rust字符串。
  • 尝试通过发送端发送消息,如果成功则打印成功信息,否则打印错误信息。

2、新建一个 Maven 项目

package com.yushanma;

import java.io.IOException;

/**
 * @version: V1.0
 * @author: 余衫马
 * @description: 主函数
 * @data: 2024-11-22 19:24
 **/

public class MyResultHandler {

    private static native long getSender();

    private static native void sendMessage(long senderPtr, String message);

    // 用于加载包含本地方法实现的本地库。
    static {
        System.loadLibrary("rust_jni_channel_test");
    }

    public static void main(String[] args) throws IOException {

        long senderPtr = getSender();

        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                sendMessage(senderPtr, String.format("Hello from Java! COUNT * %d", i + 1));
            }
        }).start();

        new Thread(() -> {
            for (int i = 100; i < 200; i++) {
                sendMessage(senderPtr, String.format("Hello from Java! COUNT * %d", i + 1));
            }
        }).start();

        System.in.read();
    }
}

 我们声明了两个本地方法getSendersendMessage,它们分别用于获取发送端指针和发送消息,对应 Rust 库中封装的两个 Native 方法。然后在 main 函数中,启动两个新线程,每个线程发送100条消息到Rust端,并且使用System.in.read()阻塞主线程,以防止程序过早退出。

3、添加外部 libs 与 VM 启动参数

-Djava.library.path=D:\JWorkSpace\ChannelDeomo\libs

 通过 VM 参数指定外部库路径,否则会直接使用默认路径,会报错找不到 dll 文件。

4、运行效果

四、封装

1、新建 Callback 类

package com.yushanma.callback;

import java.time.LocalDateTime;

/**
 * @version: V1.0
 * @author: 余衫马
 * @description: 回调类
 * @data: 2024-11-22 19:30
 **/
public class MyCallback {

    // 用于加载包含本地方法实现的本地库。
    static {
        System.loadLibrary("rust_jni_channel_test");
    }

    /**
     * Sender 指针
     */
    private final long senderPtr;

    /**
     * 获取 Sender 指针
     *
     * @return
     */
    private static native long getSender();

    /**
     * 发送信息
     *
     * @param senderPtr Sender 指针
     * @param message   信息内容
     */
    private static native void sendMessage(long senderPtr, String message);


    /**
     * 默认构造方法
     */
    public MyCallback() {
        senderPtr = getSender();
    }

    /**
     * 回调方法
     */
    public void callback(String out) {
        sendMessage(senderPtr, String.format("%s callback data: %s, senderPtr %d", LocalDateTime.now(), out, senderPtr));
    }


}
package com.yushanma;

import com.yushanma.callback.MyCallback;

import java.io.IOException;
import java.util.UUID;

/**
 * @version: V1.0
 * @author: 余衫马
 * @description: 主函数
 * @data: 2024-11-22 19:24
 **/

public class MyResultHandler {


    public static void main(String[] args) throws IOException {

        new Thread(() -> {

            MyCallback myCallback = new MyCallback();
            for (int i = 0; i < 10; i++) {
                myCallback.callback(UUID.randomUUID().toString());
            }
        }).start();

        new Thread(() -> {

            MyCallback myCallback = new MyCallback();
            for (int i = 0; i < 10; i++) {
                myCallback.callback(UUID.randomUUID().toString());
            }
        }).start();

        System.in.read();
    }
}

2、Rust 修改命名路径

#[macro_use]
extern crate lazy_static;

use jni::objects::{JClass, JObject};
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::thread;

lazy_static! {
    static ref SENDER: Sender<String> = {
        let (sender, receiver) = unbounded();
        // Spawn a thread to handle the receiver
        thread::spawn(move || {
            for message in receiver.iter() {
                println!("Received message: {}", message);
            }
        });
        sender
    };
}

#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_getSender(
    _env: JNIEnv,
    _class: JClass,
) -> jlong {
    let sender_ptr = Box::into_raw(Box::new(SENDER.clone())) as jlong;
    sender_ptr
}

#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_sendMessage(
    mut env: JNIEnv,
    _class: JClass,
    sender_ptr: jlong,
    message: JObject,
) {
    let sender = unsafe { &*(sender_ptr as *mut Sender<String>) };
    let message: String = env.get_string(&message.into()).expect("Couldn't get java string!").into();
    match sender.send(message) {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => eprintln!("Failed to send message: {:?}", e),
    }
}

路径要保持一致,否则调用本地方法时会报错找不到函数。

3、运行效果

五、内存释放

在 Java 中,JVM(Java Virtual Machine)管理着内存的分配和释放。对于纯 Java 对象,JVM 的垃圾回收机制会自动处理对象的生命周期。然而,当涉及到与本地代码(如 Rust 或 C/C++)交互时,需要特别注意资源的管理。

在这个示例中,MyCallback 是一个通过 JNI(Java Native Interface)或类似技术与 Rust 代码交互的类。senderPtr 是一个指向本地(Rust)资源的指针。当 MyCallback 实例 lib 被垃圾回收时,JVM 并不会自动知道如何释放 senderPtr 所指向的本地资源。这意味着我们需要手动管理这些资源,以避免内存泄漏。

1、显式释放资源

在 MyCallback 类中提供一个方法,用于显式释放本地资源。

package com.yushanma.callback;

import java.time.LocalDateTime;

/**
 * @version: V1.0
 * @author: 余衫马
 * @description: 回调类
 * @data: 2024-11-22 19:30
 **/
public class MyCallback {

    // 用于加载包含本地方法实现的本地库。
    static {
        System.loadLibrary("rust_jni_channel_test");
    }

    /**
     * Sender 指针
     */
    private long senderPtr;

    /**
     * 获取 Sender 指针
     *
     * @return
     */
    private static native long getSender();

    /**
     * 发送信息
     *
     * @param senderPtr Sender 指针
     * @param message   信息内容
     */
    private static native void sendMessage(long senderPtr, String message);

    /**
     * 释放资源
     * @param senderPtr Sender 指针
     */
    private static native void releaseSender(long senderPtr);

    /**
     * 默认构造方法
     */
    public MyCallback() {
        senderPtr = getSender();
    }

    /**
     * 回调方法
     */
    public void callback(String out) {
        sendMessage(senderPtr, String.format("%s callback data: %s, senderPtr %d", LocalDateTime.now(), out, senderPtr));
    }

    /**
     * 释放资源
     */
    public void release(){
        releaseSender(senderPtr);
        senderPtr = 0;
    }

    /**
     * 重载 Object 类的 finalize 方法
     * 不推荐依赖 finalize 方法,因为它的执行时间是不确定的,但作为最后的防线,可以在 finalize 方法中释放资源。
     * @throws Throwable
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (senderPtr != 0) {
                releaseSender(senderPtr);
            }
        } finally {
            super.finalize();
        }
    }
}

2、使用 AutoCloseable 接口

package com.yushanma.callback;

import java.time.LocalDateTime;

/**
 * @version: V1.0
 * @author: 余衫马
 * @description: 回调类
 * @data: 2024-11-22 19:30
 **/
public class MyCallback implements AutoCloseable  {

    // 用于加载包含本地方法实现的本地库。
    static {
        System.loadLibrary("rust_jni_channel_test");
    }

    /**
     * Sender 指针
     */
    private long senderPtr;

    /**
     * 获取 Sender 指针
     *
     * @return
     */
    private static native long getSender();

    /**
     * 发送信息
     *
     * @param senderPtr Sender 指针
     * @param message   信息内容
     */
    private static native void sendMessage(long senderPtr, String message);

    /**
     * 释放资源
     * @param senderPtr Sender 指针
     */
    private static native void releaseSender(long senderPtr);

    /**
     * 默认构造方法
     */
    public MyCallback() {
        senderPtr = getSender();
    }

    /**
     * 回调方法
     */
    public void callback(String out) {
        sendMessage(senderPtr, String.format("%s callback data: %s, senderPtr %d", LocalDateTime.now(), out, senderPtr));
    }

    
//    /**
//     * 释放资源
//     */
//    public void release(){
//        releaseSender(senderPtr);
//        senderPtr = 0;
//    }
//
//    /**
//     * 重载 Object 类的 finalize 方法
//     * 不推荐依赖 finalize 方法,因为它的执行时间是不确定的,但作为最后的防线,可以在 finalize 方法中释放资源。
//     * @throws Throwable
//     */
//    @Override
//    protected void finalize() throws Throwable {
//        try {
//            if (senderPtr != 0) {
//                releaseSender(senderPtr);
//            }
//        } finally {
//            super.finalize();
//        }
//    }

    /**
     * 释放资源
     */
    @Override
    public void close() {
        if (senderPtr != 0) {
            releaseSender(senderPtr);
            senderPtr = 0;
        }
    }
    
}
try (MyCallback myCallback = new MyCallback()) {
    for (int i = 0; i < 10; i++) {
        myCallback.callback(UUID.randomUUID().toString());
    }
} // 自动调用 myCallback.close() 方法

3、Rust 释放内存逻辑

添加一个新的 JNI 函数来释放 Sender,并确保在释放时不会发生内存泄漏或其他问题。

#[macro_use]
extern crate lazy_static;

use jni::objects::{JClass, JObject};
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use crossbeam_channel::{unbounded, Sender, Receiver};
use std::thread;

lazy_static! {
    static ref SENDER: Sender<String> = {
        let (sender, receiver) = unbounded();
        // Spawn a thread to handle the receiver
        thread::spawn(move || {
            for message in receiver.iter() {
                println!("Received message: {}", message);
            }
        });
        sender
    };
}

#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_getSender(
    _env: JNIEnv,
    _class: JClass,
) -> jlong {
    let sender_ptr = Box::into_raw(Box::new(SENDER.clone())) as jlong;
    sender_ptr
}

#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_sendMessage(
    mut env: JNIEnv,
    _class: JClass,
    sender_ptr: jlong,
    message: JObject,
) {
    let sender = unsafe { &*(sender_ptr as *mut Sender<String>) };
    let message: String = env.get_string(&message.into()).expect("Couldn't get java string!").into();
    match sender.send(message) {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => eprintln!("Failed to send message: {:?}", e),
    }
}


#[no_mangle]
pub extern "system" fn Java_com_yushanma_callback_MyCallback_releaseSender(
    _env: JNIEnv,
    _class: JClass,
    sender_ptr: jlong,
) {
    if sender_ptr != 0 {
        unsafe {
            // Convert the raw pointer back to a Box and drop it
            let _ = Box::from_raw(sender_ptr as *mut Sender<String>);
        }
        println!("Sender released");
    }
}

释放内存的关键在这里:

let _ = Box::from_raw(sender_ptr as *mut Sender<String>);

这行代码的作用是将一个原始指针(raw pointer)转换为一个 Box,从而恢复对堆上分配的内存的所有权。在 Rust 中,Box 是一个智能指针类型,用于管理堆上的内存。具体来说,这行代码执行了以下操作:

  1. 类型转换sender_ptr as *mut Sender<String> 将 sender_ptr 转换为一个可变的原始指针,指向 Sender<String> 类型的数据。
  2. 从原始指针创建 BoxBox::from_raw 接受一个原始指针,并返回一个 Box,从而接管该指针所指向的内存的所有权。

完整的解释如下:

  • sender_ptr 是一个原始指针,通常是通过某种方式获得的,例如通过 Box::into_raw 方法将一个 Box 转换为原始指针。
  • as *mut Sender<String> 是一个类型转换,将 sender_ptr 转换为一个指向 Sender<String> 的可变原始指针。
  • Box::from_raw(sender_ptr as *mut Sender<String>) 使用这个原始指针创建一个 Box<Sender<String>>,这样 Rust 的所有权系统就可以正确地管理这块内存。

需要注意的是,使用 Box::from_raw 时要确保:

  • 原始指针确实指向有效的堆内存。
  • 该内存之前是通过 Box 分配的。
  • 在调用 Box::from_raw 后,不再使用原始指针,因为 Box 会负责释放这块内存。

4、运行效果