This commit is contained in:
shenjack 2025-02-11 23:00:35 +08:00
parent 56a6c39df7
commit a114a92cba
Signed by: shenjack
GPG Key ID: 7B1134A979775551
5 changed files with 113 additions and 17 deletions

View File

@ -94,6 +94,7 @@ pub async fn start_ica(config: &IcaConfig, stop_reciver: StopGetter) -> ClientRe
} }
} }
// 等待停止信号 // 等待停止信号
event!(Level::INFO, "ica client waiting for stop signal");
stop_reciver.await.ok(); stop_reciver.await.ok();
event!(Level::INFO, "socketio client stopping"); event!(Level::INFO, "socketio client stopping");
match socket.disconnect().await { match socket.disconnect().await {

View File

@ -18,6 +18,8 @@ mod tailchat;
use config::BotConfig; use config::BotConfig;
use tracing::{event, span, Level}; use tracing::{event, span, Level};
use crate::py::call;
pub static mut MAIN_STATUS: status::BotStatus = status::BotStatus { pub static mut MAIN_STATUS: status::BotStatus = status::BotStatus {
config: None, config: None,
ica_status: None, ica_status: None,
@ -52,7 +54,9 @@ pub fn help_msg() -> String {
static STARTUP_TIME: OnceLock<SystemTime> = OnceLock::new(); static STARTUP_TIME: OnceLock<SystemTime> = OnceLock::new();
pub fn start_up_time() -> SystemTime { *STARTUP_TIME.get().expect("WTF, why did you panic?") } pub fn start_up_time() -> SystemTime {
*STARTUP_TIME.get().expect("WTF, why did you panic?")
}
/// 获得当前客户端的 id /// 获得当前客户端的 id
/// 防止串号 /// 防止串号
@ -138,13 +142,16 @@ fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().with_max_level(level).init(); tracing_subscriber::fmt().with_max_level(level).init();
tokio::runtime::Builder::new_multi_thread() let _ = tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.thread_name("shenbot-rs") .thread_name("shenbot-rs")
.worker_threads(4) .worker_threads(10)
.build() .build()
.unwrap() .unwrap()
.block_on(inner_main()) .block_on(inner_main());
event!(Level::INFO, "shenbot-rs v{} exiting", VERSION);
Ok(())
} }
async fn inner_main() -> anyhow::Result<()> { async fn inner_main() -> anyhow::Result<()> {
@ -169,7 +176,6 @@ async fn inner_main() -> anyhow::Result<()> {
let (ica_send, ica_recv) = tokio::sync::oneshot::channel::<()>(); let (ica_send, ica_recv) = tokio::sync::oneshot::channel::<()>();
if bot_config.check_ica() { if bot_config.check_ica() {
event!(Level::INFO, "启动 ica");
let config = bot_config.ica(); let config = bot_config.ica();
tokio::spawn(async move { tokio::spawn(async move {
ica::start_ica(&config, ica_recv).await.unwrap(); ica::start_ica(&config, ica_recv).await.unwrap();
@ -190,18 +196,19 @@ async fn inner_main() -> anyhow::Result<()> {
event!(Level::INFO, "未启用 Tailchat"); event!(Level::INFO, "未启用 Tailchat");
} }
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(1)).await;
// 等待一个输入 // 等待一个输入
event!(Level::INFO, "Press any key to exit"); event!(Level::INFO, "Press ctrl+c to exit, second ctrl+c to force exit");
let mut input = String::new(); tokio::signal::ctrl_c().await.ok();
std::io::stdin().read_line(&mut input).unwrap();
ica_send.send(()).ok(); ica_send.send(()).ok();
tailchat_send.send(()).ok(); tailchat_send.send(()).ok();
event!(Level::INFO, "Disconnected"); event!(Level::INFO, "Disconnected");
py::post_py()?; py::post_py().await?;
event!(Level::INFO, "Shenbot-rs exiting");
Ok(()) Ok(())
} }

View File

@ -1,7 +1,9 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::LazyLock;
use pyo3::prelude::*; use pyo3::prelude::*;
use rust_socketio::asynchronous::Client; use rust_socketio::asynchronous::Client;
use tokio::sync::Mutex;
use tracing::{event, info, warn, Level}; use tracing::{event, info, warn, Level};
use crate::data_struct::{ica, tailchat}; use crate::data_struct::{ica, tailchat};
@ -9,6 +11,70 @@ use crate::error::PyPluginError;
use crate::py::{class, PyPlugin, PyStatus}; use crate::py::{class, PyPlugin, PyStatus};
use crate::MainStatus; use crate::MainStatus;
pub struct PyTasks {
pub ica_new_message: Vec<tokio::task::JoinHandle<()>>,
pub ica_delete_message: Vec<tokio::task::JoinHandle<()>>,
pub tailchat_new_message: Vec<tokio::task::JoinHandle<()>>,
}
impl PyTasks {
pub fn clear(&mut self) {
self.ica_new_message.clear();
self.ica_delete_message.clear();
self.tailchat_new_message.clear();
}
pub fn push_ica_new_message(&mut self, handle: tokio::task::JoinHandle<()>) {
self.ica_new_message.push(handle);
}
pub fn push_ica_delete_message(&mut self, handle: tokio::task::JoinHandle<()>) {
self.ica_delete_message.push(handle);
}
pub fn push_tailchat_new_message(&mut self, handle: tokio::task::JoinHandle<()>) {
self.tailchat_new_message.push(handle);
}
pub async fn join_all(&mut self) {
for handle in self.ica_new_message.drain(..) {
let _ = handle.await;
}
for handle in self.ica_delete_message.drain(..) {
let _ = handle.await;
}
for handle in self.tailchat_new_message.drain(..) {
let _ = handle.await;
}
}
pub fn len(&self) -> usize {
self.ica_new_message.len() + self.ica_delete_message.len() + self.tailchat_new_message.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn cancel_all(&mut self) {
for handle in self.ica_new_message.drain(..) {
handle.abort();
}
for handle in self.ica_delete_message.drain(..) {
handle.abort();
}
for handle in self.tailchat_new_message.drain(..) {
handle.abort();
}
}
}
pub static PY_TASKS: LazyLock<Mutex<PyTasks>> = LazyLock::new(|| Mutex::new(PyTasks {
ica_new_message: Vec::new(),
ica_delete_message: Vec::new(),
tailchat_new_message: Vec::new(),
}));
pub fn get_func<'py>( pub fn get_func<'py>(
py_module: &Bound<'py, PyAny>, py_module: &Bound<'py, PyAny>,
name: &'py str, name: &'py str,
@ -154,8 +220,8 @@ pub async fn ica_new_message_py(message: &ica::messages::NewMessage, client: &Cl
let msg = class::ica::NewMessagePy::new(message); let msg = class::ica::NewMessagePy::new(message);
let client = class::ica::IcaClientPy::new(client); let client = class::ica::IcaClientPy::new(client);
let args = (msg, client); let args = (msg, client);
// 甚至实际上压根不需要await这个spawn, 直接让他自己跑就好了(离谱) let task = call_py_func!(args, plugin, path, ICA_NEW_MESSAGE_FUNC, client);
call_py_func!(args, plugin, path, ICA_NEW_MESSAGE_FUNC, client); PY_TASKS.lock().await.push_ica_new_message(task);
} }
} }
@ -167,7 +233,8 @@ pub async fn ica_delete_message_py(msg_id: ica::MessageId, client: &Client) {
let msg_id = msg_id.clone(); let msg_id = msg_id.clone();
let client = class::ica::IcaClientPy::new(client); let client = class::ica::IcaClientPy::new(client);
let args = (msg_id.clone(), client); let args = (msg_id.clone(), client);
call_py_func!(args, plugin, path, ICA_DELETE_MESSAGE_FUNC, client); let task = call_py_func!(args, plugin, path, ICA_DELETE_MESSAGE_FUNC, client);
PY_TASKS.lock().await.push_ica_delete_message(task);
} }
} }
@ -182,6 +249,7 @@ pub async fn tailchat_new_message_py(
let msg = class::tailchat::TailchatReceiveMessagePy::from_recive_message(message); let msg = class::tailchat::TailchatReceiveMessagePy::from_recive_message(message);
let client = class::tailchat::TailchatClientPy::new(client); let client = class::tailchat::TailchatClientPy::new(client);
let args = (msg, client); let args = (msg, client);
call_py_func!(args, plugin, path, TAILCHAT_NEW_MESSAGE_FUNC, client); let task = call_py_func!(args, plugin, path, TAILCHAT_NEW_MESSAGE_FUNC, client);
PY_TASKS.lock().await.push_tailchat_new_message(task);
} }
} }

View File

@ -14,7 +14,7 @@ use pyo3::prelude::*;
use pyo3::types::PyTuple; use pyo3::types::PyTuple;
use tracing::{event, span, warn, Level}; use tracing::{event, span, warn, Level};
use crate::MainStatus; use crate::{MainStatus, StopGetter};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PyStatus { pub struct PyStatus {
@ -463,7 +463,7 @@ fn init_py_with_env_path(path: &str) {
/// Python 侧初始化 /// Python 侧初始化
pub fn init_py() { pub fn init_py() {
// 从 全局配置中获取 python 插件路径 // 从 全局配置中获取 python 插件路径
let span = span!(Level::INFO, "初始化 python 及其插件.ing"); let span = span!(Level::INFO, "py init");
let _enter = span.enter(); let _enter = span.enter();
let plugin_path = MainStatus::global_config().py().plugin_path; let plugin_path = MainStatus::global_config().py().plugin_path;
@ -500,9 +500,26 @@ pub fn init_py() {
event!(Level::INFO, "python 初始化完成") event!(Level::INFO, "python 初始化完成")
} }
pub fn post_py() -> anyhow::Result<()> { pub async fn post_py() -> anyhow::Result<()> {
let status = PyStatus::get_mut(); let status = PyStatus::get_mut();
status.config.sync_status_to_config(); status.config.sync_status_to_config();
status.config.write_to_default()?; status.config.write_to_default()?;
stop_tasks().await;
Ok(()) Ok(())
} }
async fn stop_tasks() {
let waiter = tokio::spawn(async {
call::PY_TASKS.lock().await.cancel_all();
});
tokio::select! {
_ = waiter => {
event!(Level::INFO, "Python 任务完成");
}
_ = tokio::signal::ctrl_c() => {
call::PY_TASKS.lock().await.cancel_all();
event!(Level::INFO, "Python 任务被中断");
}
}
}

View File

@ -10,6 +10,9 @@
- 用于指定 python 插件的虚拟环境路径 - 用于指定 python 插件的虚拟环境路径
- 会覆盖环境变量中的 `VIRTUAL_ENV` - 会覆盖环境变量中的 `VIRTUAL_ENV`
- 现在加入了默认的配置文件路径 `./config.toml` - 现在加入了默认的配置文件路径 `./config.toml`
- 现在会记录所有的 python 运行中 task 了
- 也会在退出的时候等待所有的 task 结束
- 二次 ctrl-c 会立即退出
## 0.8.1 ## 0.8.1