From 84ced3b30a447e929bd562a9e4e7e5b84a8ba193 Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 30 Mar 2024 18:30:43 +0800 Subject: [PATCH] tailchat p6 --- ica-rs/src/config.rs | 4 +- ica-rs/src/data_struct/tailchat.rs | 1 + ica-rs/src/data_struct/tailchat/status.rs | 13 +++++ ica-rs/src/status.rs | 1 - ica-rs/src/tailchat.rs | 69 +++++++++++++++++++---- ica-rs/src/tailchat/client.rs | 1 + ica-rs/src/tailchat/events.rs | 58 +++++++++++++++++++ 7 files changed, 135 insertions(+), 12 deletions(-) create mode 100644 ica-rs/src/data_struct/tailchat/status.rs create mode 100644 ica-rs/src/tailchat/client.rs diff --git a/ica-rs/src/config.rs b/ica-rs/src/config.rs index 4c0f4de..a9fd4ab 100644 --- a/ica-rs/src/config.rs +++ b/ica-rs/src/config.rs @@ -154,6 +154,8 @@ impl BotConfig { } pub fn ica(&self) -> IcaConfig { self.ica.clone().expect("No ica config found") } - pub fn tailchat(&self) -> TailchatConfig { self.tailchat.clone().expect("No tailchat config found") } + pub fn tailchat(&self) -> TailchatConfig { + self.tailchat.clone().expect("No tailchat config found") + } pub fn py(&self) -> PyConfig { self.py.clone().expect("No py config found") } } diff --git a/ica-rs/src/data_struct/tailchat.rs b/ica-rs/src/data_struct/tailchat.rs index 5df09c8..057c328 100644 --- a/ica-rs/src/data_struct/tailchat.rs +++ b/ica-rs/src/data_struct/tailchat.rs @@ -1,4 +1,5 @@ pub mod messages; +pub mod status; pub type GroupId = String; pub type ConverseId = String; diff --git a/ica-rs/src/data_struct/tailchat/status.rs b/ica-rs/src/data_struct/tailchat/status.rs new file mode 100644 index 0000000..224e7cd --- /dev/null +++ b/ica-rs/src/data_struct/tailchat/status.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +use crate::data_struct::tailchat::UserId; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct LoginData { + pub jwt: String, + #[serde(rename = "userId")] + pub user_id: UserId, + pub email: String, + pub nickname: String, + pub avatar: String, +} diff --git a/ica-rs/src/status.rs b/ica-rs/src/status.rs index eecc728..7893c25 100644 --- a/ica-rs/src/status.rs +++ b/ica-rs/src/status.rs @@ -79,7 +79,6 @@ pub mod ica { } } - pub mod tailchat { use crate::data_struct::tailchat::UserId; diff --git a/ica-rs/src/tailchat.rs b/ica-rs/src/tailchat.rs index 1ccfdb3..537a2e6 100644 --- a/ica-rs/src/tailchat.rs +++ b/ica-rs/src/tailchat.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod events; use futures_util::FutureExt; @@ -5,16 +6,17 @@ use md5::{Digest, Md5}; use reqwest::{Body, ClientBuilder as reqwest_ClientBuilder}; use rust_socketio::asynchronous::{Client, ClientBuilder}; use rust_socketio::{Event, Payload, TransportType}; -use serde_json::json; +use serde_json::{from_str, from_value, json, Value}; use tracing::{event, span, Level}; use crate::config::TailchatConfig; +use crate::data_struct::tailchat::status::LoginData; use crate::error::{ClientResult, TailchatError}; -use crate::StopGetter; +use crate::{wrap_any_callback, wrap_callback, StopGetter}; pub async fn start_tailchat( config: TailchatConfig, - stop_receiver: StopGetter, + stop_reciver: StopGetter, ) -> ClientResult<(), TailchatError> { let span = span!(Level::INFO, "Tailchat Client"); let _enter = span.enter(); @@ -30,26 +32,73 @@ pub async fn start_tailchat( let mut header_map = reqwest::header::HeaderMap::new(); header_map.append("Content-Type", "application/json".parse().unwrap()); - let client = reqwest_ClientBuilder::new().default_headers(header_map).build()?; + let client = reqwest_ClientBuilder::new().default_headers(header_map.clone()).build()?; let status = match client .post(&format!("{}/api/openapi/bot/login", config.host)) - .body(json!{{"appId": config.app_id, "token": token}}.to_string()) + .body(json! {{"appId": config.app_id, "token": token}}.to_string()) .send() .await { Ok(resp) => { if resp.status().is_success() { - let body = resp.text().await?; - event!(Level::INFO, "login success: {}", body); - body + let raw_data = resp.text().await?; + let json_data = serde_json::from_str::(&raw_data).unwrap(); + let login_data = serde_json::from_value::(json_data["data"].clone()); + match login_data { + Ok(data) => data, + Err(e) => { + event!(Level::ERROR, "login failed: {}|{}", e, raw_data); + return Err(TailchatError::LoginFailed(e.to_string())); + } + } } else { - Err(TailchatError::LoginFailed(resp.text().await?)) + return Err(TailchatError::LoginFailed(resp.text().await?)); } } Err(e) => return Err(TailchatError::LoginFailed(e.to_string())), }; + + header_map.append("X-Token", status.jwt.clone().parse().unwrap()); + + let client = reqwest_ClientBuilder::new().default_headers(header_map).build()?; + + let socket = ClientBuilder::new(config.host) + .auth(json!({"token": status.jwt.clone()})) + .transport_type(TransportType::Websocket) + .on_any(wrap_any_callback!(events::any_event)) + .on("chat.message.sendMessage", wrap_callback!(events::on_message)) + .connect() + .await?; + // notify:chat.message.delete // notify:chat.message.add - Ok(()) + stop_reciver.await.ok(); + event!(Level::INFO, "socketio client stopping"); + match socket.disconnect().await { + Ok(_) => { + event!(Level::INFO, "socketio client stopped"); + Ok(()) + } + Err(e) => { + // 单独处理 SocketIoError(IncompleteResponseFromEngineIo(WebsocketError(AlreadyClosed))) + match e { + rust_socketio::Error::IncompleteResponseFromEngineIo(inner_e) => { + if inner_e.to_string().contains("AlreadyClosed") { + event!(Level::INFO, "socketio client stopped"); + return Ok(()); + } else { + event!(Level::ERROR, "socketio client stopped with error: {:?}", inner_e); + Err(TailchatError::SocketIoError( + rust_socketio::Error::IncompleteResponseFromEngineIo(inner_e), + )) + } + } + e => { + event!(Level::ERROR, "socketio client stopped with error: {}", e); + Err(TailchatError::SocketIoError(e)) + } + } + } + } } diff --git a/ica-rs/src/tailchat/client.rs b/ica-rs/src/tailchat/client.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ica-rs/src/tailchat/client.rs @@ -0,0 +1 @@ + diff --git a/ica-rs/src/tailchat/events.rs b/ica-rs/src/tailchat/events.rs index 8b13789..5755640 100644 --- a/ica-rs/src/tailchat/events.rs +++ b/ica-rs/src/tailchat/events.rs @@ -1 +1,59 @@ +use colored::Colorize; +use rust_socketio::asynchronous::Client; +use rust_socketio::{Event, Payload}; +use tracing::info; +/// 所有 +pub async fn any_event(event: Event, payload: Payload, _client: Client) { + let handled = vec![ + // 真正处理过的 + "chat.message.sendMessage", // 也许以后会用到 + + // 忽略的 + ]; + match &event { + Event::Custom(event_name) => { + if handled.contains(&event_name.as_str()) { + return; + } + } + Event::Message => { + match payload { + Payload::Text(values) => { + if let Some(value) = values.first() { + if handled.contains(&value.as_str().unwrap()) { + return; + } + info!("收到消息 {}", value.to_string().yellow()); + } + } + _ => (), + } + return; + } + _ => (), + } + match payload { + Payload::Binary(ref data) => { + println!("event: {} |{:?}", event, data) + } + Payload::Text(ref data) => { + print!("event: {}", event.as_str().purple()); + for value in data { + println!("|{}", value); + } + } + _ => (), + } +} + +pub async fn on_message(payload: Payload, client: Client) { + match payload { + Payload::Text(values) => { + if let Some(value) = values.first() { + info!("收到消息 {}", value.to_string().yellow()); + } + } + _ => (), + } +}