diff --git a/ica-rs/ica_typing.py b/ica-rs/ica_typing.py index 659d757..6b6a26b 100644 --- a/ica-rs/ica_typing.py +++ b/ica-rs/ica_typing.py @@ -72,8 +72,11 @@ class IcaDeleteMessage: class IcaNewMessage: + """ + Icalingua 接收到新消息 + """ def reply_with(self, message: str) -> IcaSendMessage: - ... + """回复这条消息""" def as_deleted(self) -> IcaDeleteMessage: ... def __str__(self) -> str: @@ -95,16 +98,22 @@ class IcaNewMessage: ... @property def is_room_msg(self) -> bool: - ... + """是否是群聊消息""" @property def is_chat_msg(self) -> bool: - ... + """是否是私聊消息""" @property def room_id(self) -> RoomId: - ... + """ + 如果是群聊消息, 返回 (-群号) + 如果是私聊消息, 返回 对面qq + """ class IcaClient: + """ + Icalingua 的客户端 + """ # @staticmethod # async def send_message_a(client: "IcaClient", message: SendMessage) -> bool: # """ @@ -142,6 +151,12 @@ class IcaClient: """向日志中输出警告信息""" +class MatrixClient: + """ + Matrix 的客户端 + """ + + class ConfigData: def __getitem__(self, key: str): ... @@ -162,7 +177,7 @@ on_ica_delete_message = Callable[[MessageId, IcaClient], None] # ... # TODO: Matrix adapter -# on_matrix_room_message = Callable[[RoomId, NewMessage, IcaClient], None] +on_matrix_message = Callable[[], None] on_config = Callable[[None], Tuple[str, str]] diff --git a/ica-rs/src/ica/events.rs b/ica-rs/src/ica/events.rs index 83fd39a..1fe9611 100644 --- a/ica-rs/src/ica/events.rs +++ b/ica-rs/src/ica/events.rs @@ -1,7 +1,7 @@ use colored::Colorize; use rust_socketio::asynchronous::Client; use rust_socketio::{Event, Payload}; -use tracing::{info, warn}; +use tracing::{event, info, span, warn, Level}; use crate::data_struct::ica::all_rooms::Room; use crate::data_struct::ica::messages::{Message, MessageTrait, NewMessage}; @@ -122,7 +122,7 @@ pub async fn any_event(event: Event, payload: Payload, _client: Client) { "notify", "closeLoading", // 发送消息/加载新聊天 有一个 loading "updateRoom", - // "syncRead", + "syncRead", // 同步已读 ]; match &event { Event::Custom(event_name) => { @@ -161,22 +161,24 @@ pub async fn any_event(event: Event, payload: Payload, _client: Client) { } pub async fn connect_callback(payload: Payload, _client: Client) { + let span = span!(Level::INFO, "ica connect_callback"); + let _enter = span.enter(); match payload { Payload::Text(values) => { if let Some(value) = values.first() { match value.as_str() { Some("authSucceed") => { - info!("{}", "已经登录到 icalingua!".green()) + event!(Level::INFO, "{}", "已经登录到 icalingua!".green()) } Some("authFailed") => { - info!("{}", "登录到 icalingua 失败!".red()); + event!(Level::ERROR, "{}", "登录到 icalingua 失败!".red()); panic!("登录失败") } Some("authRequired") => { - warn!("{}", "需要登录到 icalingua!".yellow()) + event!(Level::INFO, "{}", "需要登录到 icalingua!".yellow()) } Some(msg) => { - warn!("未知消息 {}", msg.yellow()) + event!(Level::INFO, "{}", "未知消息".yellow()); } None => (), } diff --git a/ica-rs/src/matrix.rs b/ica-rs/src/matrix.rs index e66d26b..6c32e42 100644 --- a/ica-rs/src/matrix.rs +++ b/ica-rs/src/matrix.rs @@ -1,18 +1,15 @@ pub mod events; -use std::str::FromStr; +use std::{str::FromStr, time::Duration}; +use futures_util::StreamExt; use matrix_sdk::{ config::SyncSettings, ruma::{ - api::client::message::send_message_event, - events::room::message::{ - AddMentions, ForwardThread, MessageType, OriginalSyncRoomMessageEvent, - RoomMessageEventContent, - }, + api::client::message::send_message_event, events::room::message::RoomMessageEventContent, OwnedRoomId, TransactionId, }, - Client, Room, RoomState, + Client, }; use tracing::{event, span, Level}; use url::Url; @@ -23,7 +20,7 @@ use crate::StopGetter; pub async fn start_matrix( config: &MatrixConfig, - stop_reciver: StopGetter, + mut stop_reciver: StopGetter, ) -> ClientResult<(), MatrixError> { let span = span!(Level::INFO, "Matrix Client"); let _enter = span.enter(); @@ -93,9 +90,11 @@ pub async fn start_matrix( event!(Level::INFO, "未启用启动消息"); } - client.add_event_handler(on_room_message); + client.add_event_handler(events::on_room_message); - match client.sync_once(SyncSettings::new()).await { + let init_sync_setting = SyncSettings::new().timeout(Duration::from_secs(60)); + + match client.sync_once(init_sync_setting).await { Ok(_) => { event!(Level::INFO, "Synced"); } @@ -105,48 +104,37 @@ pub async fn start_matrix( } } - client.sync(SyncSettings::default()).await?; + let mut stream_sync = + Box::pin(client.sync_stream(SyncSettings::new().timeout(Duration::from_secs(60))).await); - // while stop_reciver.await.is_err() { - // event!(Level::INFO, "Matrix client is running"); - // tokio::time::sleep(std::time::Duration::from_secs(1)).await; + while let Some(Ok(response)) = stream_sync.next().await { + for room in response.rooms.join.values() { + for e in &room.timeline.events { + if let Ok(event) = e.event.deserialize() { + println!("Received event {:?}", event); + } + } + } + if stop_reciver.try_recv().is_ok() { + event!(Level::INFO, "Matrix client stopping"); + break; + } + } + // loop { + // match stop_reciver.try_recv() { + // Ok(_) => { + // event!(Level::INFO, "Matrix client stopping"); + // break; + // } + // Err(tokio::sync::oneshot::error::TryRecvError::Empty) => { + + // } + // Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + // event!(Level::INFO, "Matrix client stopping"); + // break; + // } + // } // } - event!(Level::INFO, "Matrix is not implemented yet"); - stop_reciver.await.ok(); - event!(Level::INFO, "Matrix client stopping"); - // some stop - event!(Level::INFO, "Matrix client stopped"); Ok(()) } - -pub async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) { - // We only want to listen to joined rooms. - if room.state() != RoomState::Joined { - return; - } - - // We only want to log text messages. - let MessageType::Text(msgtype) = &event.content.msgtype else { - return; - }; - - // 匹配消息 - - // /bot - if msgtype.body == "/bot" { - let pong = format!("shenbot v {}\nmatrix-rs v{}", crate::VERSION, crate::MATRIX_VERSION); - - let reply = RoomMessageEventContent::text_plain(pong); - let reply = reply.make_reply_to( - &event.into_full_event(room.room_id().to_owned()), - ForwardThread::No, - AddMentions::No, - ); - - room.send(reply).await.expect("Failed to send message"); - return; - } - - // 发给 Python 处理剩下的 -} diff --git a/ica-rs/src/matrix/events.rs b/ica-rs/src/matrix/events.rs index 8b13789..ef72ed8 100644 --- a/ica-rs/src/matrix/events.rs +++ b/ica-rs/src/matrix/events.rs @@ -1 +1,42 @@ +use matrix_sdk::{ + ruma::events::room::message::{ + AddMentions, ForwardThread, MessageType, OriginalSyncRoomMessageEvent, + RoomMessageEventContent, + }, + Room, RoomState, +}; +use tracing::{event, span, Level}; +use crate::py::call::matrix_new_message_py; + +pub async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) { + // We only want to listen to joined rooms. + if room.state() != RoomState::Joined { + return; + } + + // We only want to log text messages. + let MessageType::Text(msgtype) = &event.content.msgtype else { + return; + }; + + // 匹配消息 + + // /bot + if msgtype.body == "/bot" { + let pong = format!("shenbot v {}\nmatrix-rs v{}", crate::VERSION, crate::MATRIX_VERSION); + + let reply = RoomMessageEventContent::text_plain(pong); + let reply = reply.make_reply_to( + &event.into_full_event(room.room_id().to_owned()), + ForwardThread::Yes, + AddMentions::No, + ); + + room.send(reply).await.expect("Failed to send message"); + return; + } + + // 发给 Python 处理剩下的 + matrix_new_message_py().await; +} diff --git a/ica-rs/src/py/class.rs b/ica-rs/src/py/class.rs index 81b956d..ed87b09 100644 --- a/ica-rs/src/py/class.rs +++ b/ica-rs/src/py/class.rs @@ -1,4 +1,5 @@ pub mod ica; +pub mod matrix; use pyo3::prelude::*; use toml::Value as TomlValue; diff --git a/ica-rs/src/py/class/matrix.rs b/ica-rs/src/py/class/matrix.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ica-rs/src/py/class/matrix.rs @@ -0,0 +1 @@ +