From a114a92cbaf0b3583b80d18387cac6a3f34d1183 Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Tue, 11 Feb 2025 23:00:35 +0800 Subject: [PATCH] wtf --- ica-rs/src/ica.rs | 1 + ica-rs/src/main.rs | 27 +++++++++------ ica-rs/src/py/call.rs | 76 ++++++++++++++++++++++++++++++++++++++++--- ica-rs/src/py/mod.rs | 23 +++++++++++-- news.md | 3 ++ 5 files changed, 113 insertions(+), 17 deletions(-) diff --git a/ica-rs/src/ica.rs b/ica-rs/src/ica.rs index 601d0b9..c8e3d53 100644 --- a/ica-rs/src/ica.rs +++ b/ica-rs/src/ica.rs @@ -94,6 +94,7 @@ pub async fn start_ica(config: &IcaConfig, stop_reciver: StopGetter) -> ClientRe } } // 等待停止信号 + event!(Level::INFO, "ica client waiting for stop signal"); stop_reciver.await.ok(); event!(Level::INFO, "socketio client stopping"); match socket.disconnect().await { diff --git a/ica-rs/src/main.rs b/ica-rs/src/main.rs index d6c63e2..9e4272e 100644 --- a/ica-rs/src/main.rs +++ b/ica-rs/src/main.rs @@ -18,6 +18,8 @@ mod tailchat; use config::BotConfig; use tracing::{event, span, Level}; +use crate::py::call; + pub static mut MAIN_STATUS: status::BotStatus = status::BotStatus { config: None, ica_status: None, @@ -52,7 +54,9 @@ pub fn help_msg() -> String { static STARTUP_TIME: OnceLock = OnceLock::new(); -pub fn start_up_time() -> SystemTime { *STARTUP_TIME.get().expect("WTF, why did you panic?") } +pub fn start_up_time() -> SystemTime { + *STARTUP_TIME.get().expect("WTF, why did you panic?") +} /// 获得当前客户端的 id /// 防止串号 @@ -138,13 +142,16 @@ fn main() -> anyhow::Result<()> { tracing_subscriber::fmt().with_max_level(level).init(); - tokio::runtime::Builder::new_multi_thread() + let _ = tokio::runtime::Builder::new_multi_thread() .enable_all() .thread_name("shenbot-rs") - .worker_threads(4) + .worker_threads(10) .build() .unwrap() - .block_on(inner_main()) + .block_on(inner_main()); + + event!(Level::INFO, "shenbot-rs v{} exiting", VERSION); + Ok(()) } async fn inner_main() -> anyhow::Result<()> { @@ -169,7 +176,6 @@ async fn inner_main() -> anyhow::Result<()> { let (ica_send, ica_recv) = tokio::sync::oneshot::channel::<()>(); if bot_config.check_ica() { - event!(Level::INFO, "启动 ica"); let config = bot_config.ica(); tokio::spawn(async move { ica::start_ica(&config, ica_recv).await.unwrap(); @@ -190,18 +196,19 @@ async fn inner_main() -> anyhow::Result<()> { event!(Level::INFO, "未启用 Tailchat"); } - tokio::time::sleep(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(1)).await; // 等待一个输入 - event!(Level::INFO, "Press any key to exit"); - let mut input = String::new(); - std::io::stdin().read_line(&mut input).unwrap(); + event!(Level::INFO, "Press ctrl+c to exit, second ctrl+c to force exit"); + tokio::signal::ctrl_c().await.ok(); ica_send.send(()).ok(); tailchat_send.send(()).ok(); event!(Level::INFO, "Disconnected"); - py::post_py()?; + py::post_py().await?; + + event!(Level::INFO, "Shenbot-rs exiting"); Ok(()) } diff --git a/ica-rs/src/py/call.rs b/ica-rs/src/py/call.rs index 404e960..882df40 100644 --- a/ica-rs/src/py/call.rs +++ b/ica-rs/src/py/call.rs @@ -1,7 +1,9 @@ use std::path::PathBuf; +use std::sync::LazyLock; use pyo3::prelude::*; use rust_socketio::asynchronous::Client; +use tokio::sync::Mutex; use tracing::{event, info, warn, Level}; use crate::data_struct::{ica, tailchat}; @@ -9,6 +11,70 @@ use crate::error::PyPluginError; use crate::py::{class, PyPlugin, PyStatus}; use crate::MainStatus; +pub struct PyTasks { + pub ica_new_message: Vec>, + pub ica_delete_message: Vec>, + pub tailchat_new_message: Vec>, +} + +impl PyTasks { + pub fn clear(&mut self) { + self.ica_new_message.clear(); + self.ica_delete_message.clear(); + self.tailchat_new_message.clear(); + } + + pub fn push_ica_new_message(&mut self, handle: tokio::task::JoinHandle<()>) { + self.ica_new_message.push(handle); + } + + pub fn push_ica_delete_message(&mut self, handle: tokio::task::JoinHandle<()>) { + self.ica_delete_message.push(handle); + } + + pub fn push_tailchat_new_message(&mut self, handle: tokio::task::JoinHandle<()>) { + self.tailchat_new_message.push(handle); + } + + pub async fn join_all(&mut self) { + for handle in self.ica_new_message.drain(..) { + let _ = handle.await; + } + for handle in self.ica_delete_message.drain(..) { + let _ = handle.await; + } + for handle in self.tailchat_new_message.drain(..) { + let _ = handle.await; + } + } + + pub fn len(&self) -> usize { + self.ica_new_message.len() + self.ica_delete_message.len() + self.tailchat_new_message.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn cancel_all(&mut self) { + for handle in self.ica_new_message.drain(..) { + handle.abort(); + } + for handle in self.ica_delete_message.drain(..) { + handle.abort(); + } + for handle in self.tailchat_new_message.drain(..) { + handle.abort(); + } + } +} + +pub static PY_TASKS: LazyLock> = LazyLock::new(|| Mutex::new(PyTasks { + ica_new_message: Vec::new(), + ica_delete_message: Vec::new(), + tailchat_new_message: Vec::new(), +})); + pub fn get_func<'py>( py_module: &Bound<'py, PyAny>, name: &'py str, @@ -154,8 +220,8 @@ pub async fn ica_new_message_py(message: &ica::messages::NewMessage, client: &Cl let msg = class::ica::NewMessagePy::new(message); let client = class::ica::IcaClientPy::new(client); let args = (msg, client); - // 甚至实际上压根不需要await这个spawn, 直接让他自己跑就好了(离谱) - call_py_func!(args, plugin, path, ICA_NEW_MESSAGE_FUNC, client); + let task = call_py_func!(args, plugin, path, ICA_NEW_MESSAGE_FUNC, client); + PY_TASKS.lock().await.push_ica_new_message(task); } } @@ -167,7 +233,8 @@ pub async fn ica_delete_message_py(msg_id: ica::MessageId, client: &Client) { let msg_id = msg_id.clone(); let client = class::ica::IcaClientPy::new(client); let args = (msg_id.clone(), client); - call_py_func!(args, plugin, path, ICA_DELETE_MESSAGE_FUNC, client); + let task = call_py_func!(args, plugin, path, ICA_DELETE_MESSAGE_FUNC, client); + PY_TASKS.lock().await.push_ica_delete_message(task); } } @@ -182,6 +249,7 @@ pub async fn tailchat_new_message_py( let msg = class::tailchat::TailchatReceiveMessagePy::from_recive_message(message); let client = class::tailchat::TailchatClientPy::new(client); let args = (msg, client); - call_py_func!(args, plugin, path, TAILCHAT_NEW_MESSAGE_FUNC, client); + let task = call_py_func!(args, plugin, path, TAILCHAT_NEW_MESSAGE_FUNC, client); + PY_TASKS.lock().await.push_tailchat_new_message(task); } } diff --git a/ica-rs/src/py/mod.rs b/ica-rs/src/py/mod.rs index 36bb573..fd71e32 100644 --- a/ica-rs/src/py/mod.rs +++ b/ica-rs/src/py/mod.rs @@ -14,7 +14,7 @@ use pyo3::prelude::*; use pyo3::types::PyTuple; use tracing::{event, span, warn, Level}; -use crate::MainStatus; +use crate::{MainStatus, StopGetter}; #[derive(Debug, Clone)] pub struct PyStatus { @@ -463,7 +463,7 @@ fn init_py_with_env_path(path: &str) { /// Python 侧初始化 pub fn init_py() { // 从 全局配置中获取 python 插件路径 - let span = span!(Level::INFO, "初始化 python 及其插件.ing"); + let span = span!(Level::INFO, "py init"); let _enter = span.enter(); let plugin_path = MainStatus::global_config().py().plugin_path; @@ -500,9 +500,26 @@ pub fn init_py() { event!(Level::INFO, "python 初始化完成") } -pub fn post_py() -> anyhow::Result<()> { +pub async fn post_py() -> anyhow::Result<()> { let status = PyStatus::get_mut(); status.config.sync_status_to_config(); status.config.write_to_default()?; + + stop_tasks().await; Ok(()) } + +async fn stop_tasks() { + let waiter = tokio::spawn(async { + call::PY_TASKS.lock().await.cancel_all(); + }); + tokio::select! { + _ = waiter => { + event!(Level::INFO, "Python 任务完成"); + } + _ = tokio::signal::ctrl_c() => { + call::PY_TASKS.lock().await.cancel_all(); + event!(Level::INFO, "Python 任务被中断"); + } + } +} diff --git a/news.md b/news.md index 08e3ab9..372170d 100644 --- a/news.md +++ b/news.md @@ -10,6 +10,9 @@ - 用于指定 python 插件的虚拟环境路径 - 会覆盖环境变量中的 `VIRTUAL_ENV` - 现在加入了默认的配置文件路径 `./config.toml` +- 现在会记录所有的 python 运行中 task 了 + - 也会在退出的时候等待所有的 task 结束 + - 二次 ctrl-c 会立即退出 ## 0.8.1