From 0420cf36b210d3cc32142827e4b02b1bbc721fef Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 24 Feb 2024 23:56:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86ica=5Ftyping.py?= =?UTF-8?q?=E5=92=8Cmain.rs=E6=96=87=E4=BB=B6=E4=B8=AD=E7=9A=84=E5=87=BD?= =?UTF-8?q?=E6=95=B0=E7=AD=BE=E5=90=8D=EF=BC=8C=E6=B7=BB=E5=8A=A0=E4=BA=86?= =?UTF-8?q?on=5Fdelete=5Fmessage=E3=80=81messageSuccess=E5=92=8CmessageFai?= =?UTF-8?q?led=E5=9B=9E=E8=B0=83=E5=87=BD=E6=95=B0=E3=80=82=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E4=BA=86delete=5Fmessage=5Fpy=E5=92=8Csucces=5Fmessag?= =?UTF-8?q?e=E3=80=81failed=5Fmessage=E5=87=BD=E6=95=B0=E6=9D=A5=E5=A4=84?= =?UTF-8?q?=E7=90=86=E7=9B=B8=E5=BA=94=E7=9A=84=E4=BA=8B=E4=BB=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ica-rs/ica_typing.py | 4 +- ica-rs/src/events.rs | 27 ++++++++- ica-rs/src/main.rs | 2 + ica-rs/src/py/call.rs | 87 +++++++++++++++++++++++++++++ ica-rs/src/py/mod.rs | 125 +----------------------------------------- 5 files changed, 117 insertions(+), 128 deletions(-) create mode 100644 ica-rs/src/py/call.rs diff --git a/ica-rs/ica_typing.py b/ica-rs/ica_typing.py index f293182..0f172cd 100644 --- a/ica-rs/ica_typing.py +++ b/ica-rs/ica_typing.py @@ -110,4 +110,6 @@ on_message = Callable[[NewMessage, IcaClient], None] # def on_message(msg: NewMessage, client: IcaClient) -> None: # ... -on_delete_message = Callable[[int, IcaClient], None] +on_delete_message = Callable[[MessageId, IcaClient], None] +# def on_delete_message(msg_id: MessageId, client: IcaClient) -> None: +# ... diff --git a/ica-rs/src/events.rs b/ica-rs/src/events.rs index 3f46f81..1c1c3fc 100644 --- a/ica-rs/src/events.rs +++ b/ica-rs/src/events.rs @@ -43,18 +43,20 @@ pub async fn add_message(payload: Payload, client: Client) { send_message(&client, &reply).await; } // python 插件 - py::new_message_py(&message, &client).await; + py::call::new_message_py(&message, &client).await; } } } /// 撤回消息 -pub async fn delete_message(payload: Payload, _client: Client) { +pub async fn delete_message(payload: Payload, client: Client) { if let Payload::Text(values) = payload { // 消息 id if let Some(value) = values.first() { if let Some(msg_id) = value.as_str() { info!("delete_message {}", format!("{}", msg_id).yellow()); + + py::call::delete_message_py(msg_id.to_string(), &client).await; } } } @@ -77,6 +79,22 @@ pub async fn update_all_room(payload: Payload, _client: Client) { } } +pub async fn succes_message(payload: Payload, _client: Client) { + if let Payload::Text(values) = payload { + if let Some(value) = values.first() { + info!("messageSuccess {}", value.to_string().green()); + } + } +} + +pub async fn failed_message(payload: Payload, _client: Client) { + if let Payload::Text(values) = payload { + if let Some(value) = values.first() { + warn!("messageFailed {}", value.to_string().red()); + } + } +} + /// 所有 pub async fn any_event(event: Event, payload: Payload, _client: Client) { let handled = vec![ @@ -89,11 +107,14 @@ pub async fn any_event(event: Event, payload: Payload, _client: Client) { "addMessage", "deleteMessage", "setAllRooms", + // 也许以后会用到 + "messageSuccess", + "messageFailed", // 忽略的 "notify", "closeLoading", // 发送消息/加载新聊天 有一个 loading "updateRoom", - "syncRead", + // "syncRead", ]; match &event { Event::Custom(event_name) => { diff --git a/ica-rs/src/main.rs b/ica-rs/src/main.rs index 7dc6286..141c639 100644 --- a/ica-rs/src/main.rs +++ b/ica-rs/src/main.rs @@ -55,6 +55,8 @@ async fn main() { .on("message", wrap_callback!(events::connect_callback)) .on("authSucceed", wrap_callback!(events::connect_callback)) .on("authFailed", wrap_callback!(events::connect_callback)) + .on("messageSuccess", wrap_callback!(events::succes_message)) + .on("messageFailed", wrap_callback!(events::failed_message)) .on("onlineData", wrap_callback!(events::get_online_data)) .on("setAllRooms", wrap_callback!(events::update_all_room)) .on("addMessage", wrap_callback!(events::add_message)) diff --git a/ica-rs/src/py/call.rs b/ica-rs/src/py/call.rs new file mode 100644 index 0000000..9aa4ef0 --- /dev/null +++ b/ica-rs/src/py/call.rs @@ -0,0 +1,87 @@ + + +use std::path::PathBuf; + +use tracing::{warn, debug}; +use pyo3::prelude::*; +use rust_socketio::asynchronous::Client; + +use crate::data_struct::MessageId; +use crate::data_struct::messages::NewMessage; +use crate::py::{class, verify_plugins, PyStatus}; + +pub fn get_func<'a>(py_module: &'a PyAny, path: &PathBuf, name: &'a str) -> Option<&'a PyAny> { + // 要处理的情况: + // 1. 有这个函数 + // 2. 没有这个函数 + // 3. 函数不是 Callable + match py_module.hasattr(name) { + Ok(contain) => { + if contain { + match py_module.getattr(name) { + Ok(func) => { + if func.is_callable() { + Some(func) + } else { + warn!("function<{}>: {:#?} in {:?} is not callable", name, func, path); + None + } + }, + Err(e) => { + warn!("failed to get function<{}> from {:?}: {:?}", name, path, e); + None + } + } + } else { + debug!("no function<{}> in module {:?}", name, path); + None + } + } + Err(e) => { + warn!("failed to check function<{}> from {:?}: {:?}", name, path, e); + None + } + } +} + +/// 执行 new message 的 python 插件 +pub async fn new_message_py(message: &NewMessage, client: &Client) { + // 验证插件是否改变 + verify_plugins(); + + let plugins = PyStatus::get_files(); + for (path, (_, py_module)) in plugins.iter() { + let msg = class::NewMessagePy::new(message); + let client = class::IcaClientPy::new(client); + // 甚至实际上压根不需要await这个spawn, 直接让他自己跑就好了(离谱) + tokio::spawn(async move { + Python::with_gil(|py| { + let args = (msg, client); + if let Some(py_func) = get_func(py_module.as_ref(py), &path, "on_message") { + if let Err(e) = py_func.call1(args) { + warn!("failed to call function: {:?}", e); + } + } + }) + }); + } +} + +pub async fn delete_message_py(msg_id: MessageId, client: &Client) { + verify_plugins(); + let plugins = PyStatus::get_files(); + for (path, (_, py_module)) in plugins.iter() { + let msg_id = msg_id.clone(); + let client = class::IcaClientPy::new(client); + tokio::spawn(async move { + Python::with_gil(|py| { + let args = (msg_id.clone(), client); + if let Some(py_func) = get_func(py_module.as_ref(py), &path, "on_delete_message") { + if let Err(e) = py_func.call1(args) { + warn!("failed to call function: {:?}", e); + } + } + }) + }); + } +} diff --git a/ica-rs/src/py/mod.rs b/ica-rs/src/py/mod.rs index f955f1a..23e01b5 100644 --- a/ica-rs/src/py/mod.rs +++ b/ica-rs/src/py/mod.rs @@ -1,16 +1,14 @@ pub mod class; +pub mod call; use std::time::SystemTime; use std::{collections::HashMap, path::PathBuf}; -use futures_util::future::join_all; use pyo3::prelude::*; -use rust_socketio::asynchronous::Client; use tracing::{debug, info, warn}; use crate::client::IcalinguaStatus; use crate::config::IcaConfig; -use crate::data_struct::messages::NewMessage; #[derive(Debug, Clone)] pub struct PyStatus { @@ -194,124 +192,3 @@ pub fn init_py(config: &IcaConfig) { info!("python inited") } - -/// 执行 new message 的 python 插件 -pub async fn new_message_py(message: &NewMessage, client: &Client) { - // 验证插件是否改变 - verify_plugins(); - - let plugins = PyStatus::get_files(); - // let tasks: Vec<_> = plugins.iter().map(|(path, (_, py_module))| { - // let msg = class::NewMessagePy::new(message); - // let client = class::IcaClientPy::new(client); - // let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel(); - // let task = tokio::spawn(async move { - // tokio::select! { - // _ = tokio::spawn(async move {Python::with_gil(|py| { - // let args = (msg, client); - // let async_py_func = py_module.getattr(py, "on_message"); - // match async_py_func { - // Ok(async_py_func) => match async_py_func.as_ref(py).call1(args) { - // Err(e) => { - // warn!("get a PyErr when call on_message from {:?}: {:?}", path, e); - // } - // _ => (), - // }, - // Err(e) => { - // warn!("failed to get on_message function: {:?}", e); - // } - // } - // })}) => (), - // _ = cancel_rx => (), - // } - // }); - // (task, cancel_tx) - // }).collect(); - - // let timeout = tokio::time::sleep(std::time::Duration::from_secs(5)); - // tokio::select! { - // _ = join_all(tasks.map(|(task, _)| task)) => (), - // _ = timeout => { - // warn!("timeout when join all tasks"); - // for (_, cancel_tx) in &tasks { - // let _ = cancel_tx.send(()); - // } - // } - // } - // for (path, (_, py_module)) in plugins.iter() { - // let msg = class::NewMessagePy::new(message); - // let client = class::IcaClientPy::new(client); - // let task = tokio::spawn(async move { - // Python::with_gil(|py| { - // let args = (msg, client); - // let async_py_func = py_module.getattr(py, "on_message"); - // match async_py_func { - // Ok(async_py_func) => match async_py_func.as_ref(py).call1(args) { - // Err(e) => { - // warn!("get a PyErr when call on_message from {:?}: {:?}", path, e); - // } - // _ => (), - // }, - // Err(e) => { - // warn!("failed to get on_message function: {:?}", e); - // } - // } - // }) - // }); - // tokio::select! { - // _ = task => (), - // _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => { - // warn!("timeout when join all tasks"); - // // task.abort(); - // } - - // } - // } - let mut tasks = Vec::with_capacity(plugins.len()); - for (path, (_, py_module)) in plugins.iter() { - let msg = class::NewMessagePy::new(message); - let client = class::IcaClientPy::new(client); - let task = tokio::spawn(async move { - Python::with_gil(|py| { - let args = (msg, client); - let async_py_func = py_module.getattr(py, "on_message"); - match async_py_func { - Ok(async_py_func) => match async_py_func.as_ref(py).call1(args) { - Err(e) => { - warn!("get a PyErr when call on_message from {:?}: {:?}", path, e); - } - _ => (), - }, - Err(e) => { - warn!("failed to get on_message function: {:?}", e); - } - } - }) - }); - tasks.push(task); - } - // 等待所有的插件执行完毕 - // 超时时间为 0.1 秒 - // ~~ 超时则取消所有的任务 ~~ - // 超时就超时了……, 就让他跑着了…… - // 主要是, 这玩意是同步的 还没法取消 - let wait_time = std::time::Duration::from_millis(100); - let awaits = join_all(tasks); - let timeout = tokio::time::sleep(wait_time.clone()); - let await_task = tokio::time::timeout(wait_time.clone(), awaits); - tokio::select! { - _ = await_task => (), - _ = timeout => { - warn!("timeout when join all tasks"); - // for task in tasks { - // task.abort(); - // } - } - } - // match tokio::time::timeout(wait_time.clone(), awaits).await { - // Ok(_) => (), - // Err(e) => { - // warn!("timeout when join all tasks: {:?}", e); - // } - // } -}