diff --git a/config-temp.toml b/config-temp.toml index 2cc9d7d..dfda7d5 100644 --- a/config-temp.toml +++ b/config-temp.toml @@ -2,5 +2,7 @@ private_key = "" # 与 icalingua 客户端使用的 private_key 一致 host = "" # docker 版 icalingua 服务的地址 self_id = 0 # 机器人的 qq 号 -notice_room = -0 # 启动 bot 后通知的群号/人 +notice_room = [-0] # 启动 bot 后通知的群号/人 # 群号请使用群号的负数 + +notice_start = true # 是否在启动 bot 后通知 diff --git a/connect.py b/connect.py deleted file mode 100644 index 4af88a7..0000000 --- a/connect.py +++ /dev/null @@ -1,300 +0,0 @@ -import time -import random -import asyncio -import traceback - -from typing import Dict, List, Tuple, Any, Optional, Union, Literal - -import qtoml -import socketio -from colorama import Fore, Style -from nacl.signing import SigningKey -from lib_not_dr.types import Options - - -def get_config() -> Tuple[str, str, int]: - with open('config.toml', 'r', encoding='utf-8') as f: - config = qtoml.load(f) - return config['host'], config['private_key'], config['self_id'] - - -HOST, KEY, SELF_ID = get_config() - - -class AtElement(Options): - text: str - id: Union[int, Literal['all']] = 'all' - - -class ReplyMessage(Options): - id: str - username: str = '' - content: str = '' - files: list = [] - - def to_json(self) -> dict: - return { - '_id': self.id, - 'username': self.username, - 'content': self.content, - 'files': self.files - } - - -class Message(Options): - content: str - room_id: Optional[int] = None - room: Optional[int] = None # room id 和 room 二选一 ( 实际上直接填 room id 就行了 ) - file: None = None # TODO: 上传文件 - reply_to: Optional[ReplyMessage] = None # 源码 给了一个 any TODO: 回复消息 - b64_img: Optional[str] = None # TODO: 发送图片 - at: Optional[List[AtElement]] = [] # TODO: @某人 - sticker: Optional[None] = None # TODO: 发送表情 - message_type: Optional[str] = None # TODO: 消息类型 - - def to_json(self) -> dict: - return { - 'content': self.content, - 'roomId': self.room_id, - 'room': self.room, - 'file': self.file, - 'replyMessage': self.reply_to.to_json() if self.reply_to else None, - 'b64img': self.b64_img, - 'at': self.at, - 'sticker': self.sticker, - 'messageType': self.message_type - } - - -sio: socketio.AsyncClient = socketio.AsyncClient() - - -@sio.on('connect') -def connect(): - print(f'{Fore.GREEN}icalingua 已连接{Style.RESET_ALL}') - - -@sio.on('requireAuth') -async def require_auth(salt: str, versions: Dict[str, str]): - print(f"{Fore.BLUE}versions: {versions}{Style.RESET_ALL}\n{type(salt)}|{salt=}") - # 准备数据 - sign = SigningKey(bytes.fromhex(KEY)) - signature = sign.sign(bytes.fromhex(salt)) - - # 发送数据 - print(f"{len(signature.signature)=} {type(signature.signature)=}") - await sio.emit('auth', signature.signature) - print(f"{Fore.BLUE}send auth emit{Style.RESET_ALL}") - -# @sio.on('requireAuth') -# def require_auth(*data: Dict[str, Any]): -# print(f"{Fore.BLUE}requireAuth: {data}{Style.RESET_ALL}") - - -@sio.on('auth') -def auth(data: Dict[str, Any]): - print(f"auth: {data}") - - -@sio.on('authFailed') -async def auth_failed(): - print(f"{Fore.RED}authFailed{Style.RESET_ALL}") - await sio.disconnect() - - -@sio.on('authSucceed') -def auth_succeed(): - print(f"{Fore.GREEN}authSucceed{Style.RESET_ALL}") - - -@sio.on('connect_error') -def connect_error(*args, **kwargs): - print(f"连接错误 {args}, {kwargs}") - - -@sio.on('updateRoom') -def update_room(data: Dict[str, Any]): - print(f"{Fore.CYAN}update_room: {data}{Style.RESET_ALL}") - - -def safe_eval(code: str) -> str: - try: - # code = code.replace('help', '坏东西!\n') - # code = code.replace('bytes', '坏东西!\n') - # code = code.replace('encode', '坏东西!\n') - # code = code.replace('decode', '坏东西!\n') - # code = code.replace('compile', '屑的!\n') - # code = code.replace('globals', '拿不到!\n') - code = code.replace('os', '坏东西!\n') - code = code.replace('sys', '坏东西!\n') - # code = code.replace('input', '坏东西!\n') - # code = code.replace('__', '啊哈!\n') - # code = code.replace('import', '很坏!\n') - code = code.replace(' kill', '别跑!\n') - code = code.replace(' rm ', '别跑!\n') - code = code.replace('exit', '好坏!\n') - code = code.replace('eval', '啊哈!\n') - code = code.replace('exec', '抓住!\n') - start_time = time.time() - try: - import os - import math - import decimal - global_val = {'time': time, - 'math': math, - 'decimal': decimal, - 'random': random, - '__import__': '', - 'globals': '也别惦记你那个 globals 了', - 'compile': '想得美', - 'help': '虽然但是 help 也不行', - 'exit': '不许 exit', - 'input': '你想干嘛', - 'return': '别惦记你那个 return 了', - 'getattr': '', - 'setattr': ''} - os.system = '不许' - result = str(eval(code, global_val, {})) - limit = 500 - if len(result) > limit: - result = result[:limit] - except: - result = traceback.format_exc() - end_time = time.time() - result = result.replace(KEY, '***') - result = result.replace(HOST, '***') - - print(f"{Fore.MAGENTA}safe_eval: {result}{Style.RESET_ALL}") - - if result == '6' or result == 6: - result = '他确实等于 6' - - result = f'{code}\neval result:\n{result}\n耗时: {end_time - start_time} s' - return result - except: - error = traceback.format_exc() - result = f'error:\n{error}' - return result - - -@sio.on('addMessage') -async def add_message(data: Dict[str, Any]): - print(f"{Fore.MAGENTA}add_message: {data}{Style.RESET_ALL}") - - is_self = data['message']['senderId'] == SELF_ID - sender_name = data['message']['username'] - sender_id = data['message']['senderId'] - content = data['message']['content'] - room_id = data['roomId'] - - if not is_self: - if content == '/bot': - message = Message(content='icalingua bot test', - room_id=data['roomId']) - await sio.emit('sendMessage', message.to_json()) - elif content.startswith('=='): - - evals: str = content[2:] - - # quene = multiprocessing.Queue() - # def run(quene, evals): - # go = safe_eval(evals) - # quene.put(go) - # process = multiprocessing.Process(target=run, args=(quene, evals)) - # process.start() - # process.join(1) - # if quene.empty(): - # result = '超时' - # else: - # result = quene.get() - whitelist = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ', '.', '+', '-', '*', '/', '(', ')', '<', - '>', '='] - evals = evals.replace('**', '') - express = '' - for text in evals: - if text in whitelist: - express += text - if express == '': - result = '你在干嘛' - else: - result = str(eval(express)) - - reply = ReplyMessage(id=data['message']['_id']) - message = Message(content=result, - reply_to=reply, - room_id=room_id) - - await asyncio.sleep(random.random() * 2) - await sio.emit('sendMessage', message.to_json()) - elif content == '!!jrrp': - randomer = random.Random(f'{sender_id}-{data["message"]["date"]}-jrrp-v2') - result = randomer.randint(0, 50) + randomer.randint(0, 50) - print(f'{sender_name} 今日人品值为 {result}') - reply = ReplyMessage(id=data['message']['_id']) - message = Message(content=f'{sender_name} 今日人品值为 {result}', - reply_to=reply, - room_id=room_id) - await asyncio.sleep(0.5) - await sio.emit('sendMessage', message.to_json()) - # 如果只包括一个或多个 6 - # elif data['message']['content'].replace(' ', '') in ('6', '666', '六', '3+3', '5+1', '4+2', '2+4', '1+5'): - # reply = ReplyMessage(id=data['message']['_id']) - # message = Message(content='你 6 nm 呢', - # reply_to=reply, - # room_id=room_id) - # await asyncio.sleep(0.5) - # await sio.emit('sendMessage', message.to_json()) - - -@sio.on('deleteMessage') -def delete_message(message_id: str): - print(f"{Fore.MAGENTA}delete_message: {message_id}{Style.RESET_ALL}") - - -@sio.on('setMessages') -def set_messages(data: Dict[str, Any]): - print(f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {len(data['messages'])}{Style.RESET_ALL}") - - -@sio.on('setAllRooms') -def set_all_rooms(rooms: List[Dict[str, Any]]): - print(f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {len(rooms)}\n{Style.RESET_ALL}") - - -@sio.on('setAllChatGroups') -def set_all_chat_groups(groups: List[Dict[str, Any]]): - print(f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {len(groups)}\n{Style.RESET_ALL}") - - -@sio.on('notify') -def notify(data: List[Tuple[str, Any]]): - print(f"notify: {data}") - - -@sio.on('closeLoading') -def close_loading(_): - print(f"{Fore.GREEN}close_loading{Style.RESET_ALL}") - - -@sio.on('onlineData') -def online_data(data: Dict[str, Any]): - print(f"{Fore.GREEN}online_data: {data}{Style.RESET_ALL}") - - -@sio.on('*') -def catch_all(event, data): - print(f"{Fore.RED}catch_all: {event}|{data}{Style.RESET_ALL}") - - -async def main(): - await sio.connect(HOST) - await sio.wait() - - # await sio.emit('requireAuth', ('', {'version': '', 'protocolVersion': ''})) - # await asyncio.sleep(2) - - # await asyncio.gather(sio.wait(), sio.wait(), sio.wait()) - - -if __name__ == '__main__': - asyncio.run(main()) diff --git a/data_struct.py b/data_struct.py new file mode 100644 index 0000000..82ded30 --- /dev/null +++ b/data_struct.py @@ -0,0 +1,75 @@ +from typing import List, Optional, Union, Literal, Tuple + +import qtoml +from lib_not_dr.types import Options + + +class AtElement(Options): + text: str + id: Union[int, Literal['all']] = 'all' + + +class ReplyMessage(Options): + id: str + username: str = '' + content: str = '' + files: list = [] + + def to_json(self) -> dict: + return { + '_id': self.id, + 'username': self.username, + 'content': self.content, + 'files': self.files + } + + +class Message(Options): + content: str + room_id: Optional[int] = None + room: Optional[int] = None # room id 和 room 二选一 ( 实际上直接填 room id 就行了 ) + file: None = None # TODO: 上传文件 + reply_to: Optional[ReplyMessage] = None # 源码 给了一个 any TODO: 回复消息 + b64_img: Optional[str] = None # TODO: 发送图片 + at: Optional[List[AtElement]] = [] # TODO: @某人 + sticker: Optional[None] = None # TODO: 发送表情 + message_type: Optional[str] = None # TODO: 消息类型 + + def to_json(self) -> dict: + return { + 'content': self.content, + 'roomId': self.room_id, + 'room': self.room, + 'file': self.file, + 'replyMessage': self.reply_to.to_json() if self.reply_to else None, + 'b64img': self.b64_img, + 'at': self.at, + 'sticker': self.sticker, + 'messageType': self.message_type + } + + +class BotConfig(Options): + name = 'icalingua bot config' + # _check_filled = True + host: str + private_key: str + self_id: int + notice_room: List[int] + notice_start: bool = False + + def init(self, **kwargs) -> None: + if self.notice_room is None: + self.notice_start = False + + +def get_config(config_path: str = 'config.toml') -> BotConfig: + with open(config_path, 'r', encoding='utf-8') as f: + config = qtoml.decoder.load(f) + return BotConfig(**config) + + +class BotStatus(Options): + inited: bool = False + running: bool = False + rooms: List[int] = [] diff --git a/main.py b/main.py index 8b13789..053f07d 100644 --- a/main.py +++ b/main.py @@ -1 +1,307 @@ +import time +import random +import asyncio +import argparse +import traceback + +from typing import Dict, List, Tuple, Any + +# import qtoml +import socketio +from colorama import Fore +from nacl.signing import SigningKey +# from lib_not_dr.types import Options +from lib_not_dr.loggers import config + +from data_struct import AtElement, Message, ReplyMessage, get_config, BotConfig, BotStatus + +_version_ = "0.2.1" + +logger = config.get_logger('icalingua') + +BOTCONFIG: BotConfig = get_config() + +if __name__ == '__main__': + # --debug + # --config=config.toml + # -n --no-notice + parser = argparse.ArgumentParser() + parser.add_argument('-d', '--debug', action='store_true') + parser.add_argument('-n', '--no-notice', action='store_true') + parser.add_argument('-c', '--config', type=str) + args = parser.parse_args() + if args.debug: + logger.global_level = 0 + if args.config: + # global BOTCONFIG + BOTCONFIG: BotConfig = get_config(args.config) + if args.no_notice: + BOTCONFIG.notice_start = False + +BotStatus = BotStatus() + +sio: socketio.AsyncClient = socketio.AsyncClient() + + +@sio.on('connect') # type: ignore +def connect(): + logger.info(f'{Fore.GREEN}icalingua 已连接') + + +@sio.on('requireAuth') # type: ignore +async def require_auth(salt: str, versions: Dict[str, str]): + logger.info(f"{Fore.BLUE}versions: {versions}\n{type(salt)}|{salt=}") + # 准备数据 + sign = SigningKey(bytes.fromhex(BOTCONFIG.private_key)) + signature = sign.sign(bytes.fromhex(salt)) + + # 发送数据 + await sio.emit('auth', signature.signature) + logger.info(f"{Fore.BLUE}send auth emit") + +# @sio.on('requireAuth') +# def require_auth(*data: Dict[str, Any]): +# logger.info(f"{Fore.BLUE}requireAuth: {data}") + + +@sio.on('auth') # type: ignore +def auth(data: Dict[str, Any]): + logger.info(f"auth: {data}") + + +@sio.on('authFailed') # type: ignore +async def auth_failed(): + logger.info(f"{Fore.RED}authFailed") + await sio.disconnect() + + +@sio.on('authSucceed') # type: ignore +def auth_succeed(): + logger.info(f"{Fore.GREEN}authSucceed") + + +@sio.on('connect_error') # type: ignore +def connect_error(*args, **kwargs): + logger.info(f"连接错误 {args}, {kwargs}") + + +@sio.on('updateRoom') # type: ignore +def update_room(data: Dict[str, Any]): + logger.info(f"{Fore.CYAN}update_room: {data}") + + +def safe_eval(code: str) -> str: + try: + # code = code.replace('help', '坏东西!\n') + # code = code.replace('bytes', '坏东西!\n') + # code = code.replace('encode', '坏东西!\n') + # code = code.replace('decode', '坏东西!\n') + # code = code.replace('compile', '屑的!\n') + # code = code.replace('globals', '拿不到!\n') + code = code.replace('os', '坏东西!\n') + code = code.replace('sys', '坏东西!\n') + # code = code.replace('input', '坏东西!\n') + # code = code.replace('__', '啊哈!\n') + # code = code.replace('import', '很坏!\n') + code = code.replace(' kill', '别跑!\n') + code = code.replace(' rm ', '别跑!\n') + code = code.replace('exit', '好坏!\n') + code = code.replace('eval', '啊哈!\n') + code = code.replace('exec', '抓住!\n') + start_time = time.time() + try: + import os + import math + import decimal + global_val = {'time': time, + 'math': math, + 'decimal': decimal, + 'random': random, + '__import__': '', + 'globals': '也别惦记你那个 globals 了', + 'compile': '想得美', + 'help': '虽然但是 help 也不行', + 'exit': '不许 exit', + 'input': '你想干嘛', + 'return': '别惦记你那个 return 了', + 'getattr': '', + 'setattr': ''} + os.system = '不许' + result = str(eval(code, global_val, {})) + limit = 500 + if len(result) > limit: + result = result[:limit] + except: + result = traceback.format_exc() + end_time = time.time() + result = result.replace(BOTCONFIG.private_key, '***') + result = result.replace(BOTCONFIG.host, '***') + + logger.info(f"{Fore.MAGENTA}safe_eval: {result}") + + if result == '6' or result == 6: + result = '他确实等于 6' + + result = f'{code}\neval result:\n{result}\n耗时: {end_time - start_time} s' + return result + except: + error = traceback.format_exc() + result = f'error:\n{error}' + return result + + +@sio.on('addMessage') # type: ignore +async def add_message(data: Dict[str, Any]): + logger.info(f"{Fore.MAGENTA}add_message: {data}") + + is_self = data['message']['senderId'] == BOTCONFIG.self_id + sender_name = data['message']['username'] + sender_id = data['message']['senderId'] + content = data['message']['content'] + room_id = data['roomId'] + + if not is_self: + if content == '/bot': + message = Message(content='icalingua bot test', + room_id=data['roomId']) + await sio.emit('sendMessage', message.to_json()) + elif content.startswith('=='): + + evals: str = content[2:] + + # quene = multiprocessing.Queue() + # def run(quene, evals): + # go = safe_eval(evals) + # quene.put(go) + # process = multiprocessing.Process(target=run, args=(quene, evals)) + # process.start() + # process.join(1) + # if quene.empty(): + # result = '超时' + # else: + # result = quene.get() + whitelist = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ', '.', '+', '-', '*', '/', '(', ')', '<', + '>', '='] + evals = evals.replace('**', '') + express = '' + for text in evals: + if text in whitelist: + express += text + if express == '': + result = '你在干嘛' + else: + result = str(eval(express)) + + reply = ReplyMessage(id=data['message']['_id']) + message = Message(content=result, + reply_to=reply, + room_id=room_id) + + await asyncio.sleep(random.random() * 2) + await sio.emit('sendMessage', message.to_json()) + elif content == '!!jrrp': + randomer = random.Random(f'{sender_id}-{data["message"]["date"]}-jrrp-{_version_}') + result = randomer.randint(0, 50) + randomer.randint(0, 50) + logger.info(f'{sender_name} 今日人品值为 {result}') + reply = ReplyMessage(id=data['message']['_id']) + message = Message(content=f'{sender_name} 今日人品值为 {result}', + reply_to=reply, + room_id=room_id) + await asyncio.sleep(0.5) + await sio.emit('sendMessage', message.to_json()) + # 如果只包括一个或多个 6 + # elif data['message']['content'].replace(' ', '') in ('6', '666', '六', '3+3', '5+1', '4+2', '2+4', '1+5'): + # reply = ReplyMessage(id=data['message']['_id']) + # message = Message(content='你 6 nm 呢', + # reply_to=reply, + # room_id=room_id) + # await asyncio.sleep(0.5) + # await sio.emit('sendMessage', message.to_json()) + + +@sio.on('deleteMessage') # type: ignore +def delete_message(message_id: str): + logger.info(f"{Fore.MAGENTA}delete_message: {message_id}") + + +@sio.on('setMessages') # type: ignore +def set_messages(data: Dict[str, Any]): + logger.info(f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {len(data['messages'])}") + + +async def notice_startup(room_list: List[int]): + for notice_room in BOTCONFIG.notice_room: + if notice_room in room_list: + notice_message = Message(content=f'ica bot v{_version_}', room_id=notice_room) + await sio.emit('sendMessage', notice_message.to_json()) + BotStatus.inited = True + logger.info("inited", tag='notice room') + else: + logger.warn(f"未找到通知房间: {notice_room}", tag='notice room') + await asyncio.sleep(random.randint(2, 5)) + + +@sio.on('setAllRooms') # type: ignore +async def set_all_rooms(rooms: List[Dict[str, Any]]): + BotStatus.running = True + room_list: List[int] = [room.get('roomId') for room in rooms] # type: ignore + if not BotStatus.inited: + logger.info("initing...", tag='setAllRooms') + logger.debug(f"room_list: {room_list}", tag='setAllRooms') + if BOTCONFIG.notice_start: + await notice_startup(room_list) + if room_list != BotStatus.rooms: + logger.info(f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {len(rooms)}\n") + BotStatus.rooms = room_list + logger.info(f"更新房间: {room_list}", tag='setAllRooms') + + +@sio.on('setAllChatGroups') # type: ignore +def set_all_chat_groups(groups: List[Dict[str, Any]]): + logger.info(f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {len(groups)}\n") + + +@sio.on('notify') # type: ignore +def notify(data: List[Tuple[str, Any]]): + logger.info(f"notify: {data}") + + +@sio.on('closeLoading') # type: ignore +def close_loading(_): + logger.info(f"{Fore.GREEN}close_loading") + + +@sio.on('onlineData') # type: ignore +def online_data(data: Dict[str, Any]): + logger.info(f"{Fore.GREEN}online_data: {data}") + + +@sio.on('*') # type: ignore +def catch_all(event, data): + logger.info(f"{Fore.RED}catch_all: {event}|{data}") + + + +async def main(): + """ + while True: + await self.eio.wait() + await self.sleep(1) # give the reconnect task time to start up + if not self._reconnect_task: + break + await self._reconnect_task + if self.eio.state != 'connected': + break + """ + await sio.connect(BOTCONFIG.host) + await sio.wait() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("KeyboardInterrupt") + except Exception: + logger.error(traceback.format_exc()) +