362 lines
13 KiB
Python
362 lines
13 KiB
Python
import time
|
||
import random
|
||
import asyncio
|
||
import traceback
|
||
|
||
from typing import Dict, List, Tuple, Any, Optional, Union, Literal
|
||
|
||
import qtoml
|
||
import socketio
|
||
from colorama import Fore, Style
|
||
from nacl.signing import SigningKey
|
||
from lib_not_dr.types import Options
|
||
from mcstatus import JavaServer
|
||
|
||
# Feature-module imports start here
|
||
from module import hitokoto
|
||
|
||
|
||
# Feature-module imports end here
|
||
|
||
# Load the configuration
def get_config() -> Tuple[str, str, int]:
    """Read ``config.toml`` from the working directory.

    Returns:
        The ``host``, ``private_key`` and ``self_id`` entries of the parsed
        TOML document, in that order.
    """
    with open('config.toml', 'r', encoding='utf-8') as config_file:
        settings = qtoml.load(config_file)
    host = settings['host']
    private_key = settings['private_key']
    self_id = settings['self_id']
    return host, private_key, self_id
|
||
|
||
|
||
# Connection settings, loaded once at import time from config.toml:
# the host to connect to, the private key (hex) used for auth, and the bot's own id.
HOST, KEY, SELF_ID = get_config()
|
||
|
||
|
||
class AtElement(Options):
    """Entry of the ``at`` list carried by an outgoing :class:`Message`.

    NOTE(review): presumably one @-mention; confirm against the server API.
    """

    # Text associated with the entry.
    text: str
    # Target id, or the literal 'all' (the default).
    id: Union[int, Literal['all']] = 'all'
|
||
|
||
|
||
class ReplyMessage(Options):
    """Reference to the message that an outgoing message replies to."""

    # Id of the quoted message; serialized under the '_id' key.
    id: str
    username: str = ''
    content: str = ''
    files: list = []

    def to_json(self) -> dict:
        """Return the dict form of this reply reference used in the send payload."""
        return dict(
            _id=self.id,
            username=self.username,
            content=self.content,
            files=self.files,
        )
|
||
|
||
|
||
class Message(Options):
    """Outgoing chat message, serialized with :meth:`to_json` for 'sendMessage'."""

    content: str
    room_id: Optional[int] = None
    # Set either room_id or room (in practice room_id alone is enough).
    room: Optional[int] = None
    # TODO: file upload
    file: None = None
    # The upstream source types this as `any`. TODO: reply support
    reply_to: Optional[ReplyMessage] = None
    # TODO: sending images
    b64_img: Optional[str] = None
    # TODO: @-mentions
    # NOTE(review): mutable default; whether Options copies it per instance
    # is not visible here -- confirm before mutating `at` in place.
    at: Optional[List[AtElement]] = []
    # TODO: sending stickers
    sticker: Optional[None] = None
    # TODO: message type
    message_type: Optional[str] = None

    def to_json(self) -> dict:
        """Return the camelCase payload dict for the 'sendMessage' event."""
        if self.reply_to:
            quoted = self.reply_to.to_json()
        else:
            quoted = None
        payload = {
            'content': self.content,
            'roomId': self.room_id,
            'room': self.room,
            'file': self.file,
            'replyMessage': quoted,
            'b64img': self.b64_img,
            'at': self.at,
            'sticker': self.sticker,
            'messageType': self.message_type
        }
        return payload
|
||
|
||
|
||
# Shared asynchronous Socket.IO client; the handlers below register on it via @sio.on.
sio: socketio.AsyncClient = socketio.AsyncClient()
|
||
|
||
|
||
@sio.on('connect')
def connect():
    """Print a green notice when the 'connect' event fires."""
    banner = f'{Fore.GREEN}icalingua 已连接{Style.RESET_ALL}'
    print(banner)
|
||
|
||
|
||
@sio.on('requireAuth')
async def require_auth(salt: str, versions: Dict[str, str]):
    """Answer the 'requireAuth' challenge by signing the salt.

    Args:
        salt: Hex-encoded challenge sent with the event.
        versions: Version information sent with the event (only logged).
    """
    print(f"{Fore.BLUE}versions: {versions}{Style.RESET_ALL}\n{type(salt)}|{salt=}")

    # Prepare the payload: sign the decoded salt with the configured private key.
    signing_key = SigningKey(bytes.fromhex(KEY))
    challenge = bytes.fromhex(salt)
    signature = signing_key.sign(challenge)

    # Send only the detached signature bytes back in the 'auth' event.
    print(f"{len(signature.signature)=} {type(signature.signature)=}")
    await sio.emit('auth', signature.signature)
    print(f"{Fore.BLUE}send auth emit{Style.RESET_ALL}")
|
||
|
||
# @sio.on('requireAuth')
|
||
# def require_auth(*data: Dict[str, Any]):
|
||
# print(f"{Fore.BLUE}requireAuth: {data}{Style.RESET_ALL}")
|
||
|
||
|
||
@sio.on('auth')
def auth(data: Dict[str, Any]):
    """Print the payload of an incoming 'auth' event."""
    line = f"auth: {data}"
    print(line)
|
||
|
||
|
||
@sio.on('authFailed')
async def auth_failed():
    """Report the failed authentication in red, then disconnect the client."""
    notice = f"{Fore.RED}authFailed{Style.RESET_ALL}"
    print(notice)
    await sio.disconnect()
|
||
|
||
|
||
@sio.on('authSucceed')
def auth_succeed():
    """Print a green notice when the 'authSucceed' event fires."""
    notice = f"{Fore.GREEN}authSucceed{Style.RESET_ALL}"
    print(notice)
|
||
|
||
|
||
@sio.on('connect_error')
def connect_error(*args, **kwargs):
    """Print whatever arguments accompany a 'connect_error' event."""
    report = f"连接错误 {args}, {kwargs}"
    print(report)
|
||
|
||
|
||
@sio.on('updateRoom')
def update_room(data: Dict[str, Any]):
    """Print the payload of an 'updateRoom' event in cyan."""
    line = f"{Fore.CYAN}update_room: {data}{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
def safe_eval(code: str) -> str:
    """Evaluate an expression string and return a formatted report.

    A fixed list of substrings is first replaced inside ``code``; the
    rewritten text is then passed to ``eval`` with a globals mapping that
    exposes ``time``, ``math``, ``decimal`` and ``random`` and shadows a few
    builtin names with placeholder strings. The report contains the rewritten
    code, the result (or the traceback if evaluation raised) and the elapsed
    wall-clock time.

    SECURITY NOTE(review): this is NOT a real sandbox. ``eval`` adds the real
    builtins to the globals mapping automatically, and substring replacement
    is easy to sidestep, so this function must not be exposed to untrusted
    input as-is.

    Args:
        code: Expression text to evaluate.

    Returns:
        The report string. If anything outside the evaluation itself fails,
        a string starting with ``error:`` followed by the traceback.
    """
    # Applied in this order; each pair is (substring, replacement).
    replacements = (
        ('os', '坏东西!\n'),
        ('sys', '坏东西!\n'),
        (' kill', '别跑!\n'),
        (' rm ', '别跑!\n'),
        ('exit', '好坏!\n'),
        ('eval', '啊哈!\n'),
        ('exec', '抓住!\n'),
    )
    limit = 500  # maximum number of characters kept from a successful result
    try:
        for needle, substitute in replacements:
            code = code.replace(needle, substitute)

        start_time = time.time()
        succeeded = False
        try:
            import os
            import math
            import decimal
            global_val = {'time': time,
                          'math': math,
                          'decimal': decimal,
                          'random': random,
                          '__import__': '<built-in function __import__>',
                          'globals': '也别惦记你那个 globals 了',
                          'compile': '想得美',
                          'help': '虽然但是 help 也不行',
                          'exit': '不许 exit',
                          'input': '你想干嘛',
                          'return': '别惦记你那个 return 了',
                          'getattr': '<built-in function getattr>',
                          'setattr': '<built-in function setattr>'}
            # Disable os.system only while the expression runs, then restore
            # it, so the rest of the process is not left with a broken os module.
            real_system = os.system
            os.system = '不许'
            try:
                result = str(eval(code, global_val, {}))
            finally:
                os.system = real_system
            succeeded = True
        except (Exception, SystemExit):
            # SystemExit is caught on purpose: evaluated code such as `quit()`
            # must not be able to stop the bot.
            result = traceback.format_exc()
        end_time = time.time()

        # Redact secrets BEFORE truncating; otherwise a secret that straddles
        # the cut-off point would survive as an unredacted fragment.
        result = result.replace(KEY, '***')
        result = result.replace(HOST, '***')
        if succeeded:
            result = result[:limit]

        print(f"{Fore.MAGENTA}safe_eval: {result}{Style.RESET_ALL}")

        if result == '6':
            result = '他确实等于 6'

        return f'{code}\neval result:\n{result}\n耗时: {end_time - start_time} s'
    except Exception:
        error = traceback.format_exc()
        return f'error:\n{error}'
|
||
|
||
|
||
# Characters that are allowed to reach eval() for the '==' calculator command.
_CALC_WHITELIST = frozenset('0123456789 .+-*/()<>=')

# Reply texts for the non-zero result codes returned by hitokoto.hitokoto().
_HITOKOTO_REPLIES = {
    3: "参数不对哦!你可以输入!!hitokoto help查看帮助",
    2: "发生了不可描述的错误X_X,但可以肯定的是模块炸了(",
    1: "(参数)句子类型如下\na 动画\nb 漫画\nc 游戏\nd 文学\ne 原创\nf 来自网络\ng 其他\nh 影视\ni 诗词\nj 网易云\nk 哲学\nl 抖机灵",
}
_HITOKOTO_FALLBACK = "你看到这条消息就代表有bug出炉,但肯定不是模块炸了("


def _sanitize_expression(raw: str) -> str:
    """Keep only whitelisted characters of *raw*, then remove every ``**``.

    Filtering happens first so that dropping a non-whitelisted character
    between two ``*`` cannot re-create the power operator.
    """
    express = ''.join(ch for ch in raw if ch in _CALC_WHITELIST)
    return express.replace('**', '')


def _calculate(raw: str) -> str:
    """Evaluate the arithmetic in *raw* and return the reply text."""
    express = _sanitize_expression(raw)
    if express == '':
        return '你在干嘛'
    try:
        # Only digits, operators and parentheses can be left at this point;
        # the empty builtins mapping is defence in depth.
        # NOTE(review): there is still no timeout around the evaluation, and
        # `<<` remains available for building very large integers.
        return str(eval(express, {'__builtins__': {}}, {}))
    except (SyntaxError, ArithmeticError, TypeError, ValueError, MemoryError) as err:
        # Malformed input such as "1/0" or "(" is reported to the user instead
        # of raising out of the event handler.
        return f'{type(err).__name__}: {err}'


def _hitokoto_reply_text(content: str) -> str:
    """Call the hitokoto module for a ``!!hitokoto`` command and build the reply text."""
    if content == '!!hitokoto':
        sentence_type = None
    else:
        # Everything after the "!!hitokoto " prefix is passed on as the argument.
        sentence_type = content[len('!!hitokoto '):]
    result_code, result_data, result_type = hitokoto.hitokoto(sentence_type)
    if result_code == 0:
        return f"“{result_data['hitokoto']}”\n来源:{result_data['from']}\n类型:{result_type}"
    return _HITOKOTO_REPLIES.get(result_code, _HITOKOTO_FALLBACK)


# Handle a newly received message
@sio.on('addMessage')
async def add_message(data: Dict[str, Any]):
    """Dispatch chat commands found in an incoming 'addMessage' event.

    Messages sent by the bot itself (sender id equal to ``SELF_ID``) and
    messages that match no command are ignored. Recognised commands:
    ``/bot``, ``!!status``, ``!!players``, ``!!hitokoto [type]``,
    ``==<expression>`` and ``!!jrrp``.

    Args:
        data: Event payload; the code reads ``data['roomId']`` and the
            ``senderId``, ``username``, ``content``, ``_id`` and ``date``
            entries of ``data['message']``.
    """
    print(f"{Fore.MAGENTA}add_message: {data}{Style.RESET_ALL}")

    incoming = data['message']
    sender_id = incoming['senderId']
    sender_name = incoming['username']
    content = incoming['content']
    room_id = data['roomId']
    reply = ReplyMessage(id=incoming['_id'])

    if sender_id == SELF_ID:
        return

    delay = None  # optional pause (seconds) before the answer is sent

    # /bot
    if content == '/bot':
        message = Message(content='icalingua bot test', room_id=room_id)

    # !!status
    elif content == '!!status':
        server = JavaServer.lookup("192.168.1.6:25565")
        # Use the async API so the network round trip does not block the event loop.
        status = await server.async_status()
        message = Message(content=f"此服务器有 {status.players.online} 个玩家在线",
                          reply_to=reply,
                          room_id=room_id)

    # !!players
    elif content == '!!players':
        server = JavaServer.lookup("192.168.1.6:25565")
        query = await server.async_query()
        players = query.players.names
        message = Message(content=f"此服务器当前在线玩家有 {players}",
                          reply_to=reply,
                          room_id=room_id)

    # !!hitokoto
    elif content.startswith('!!hitokoto'):
        message = Message(content=_hitokoto_reply_text(content),
                          reply_to=reply,
                          room_id=room_id)

    # ==<expression>: simple calculator
    elif content.startswith('=='):
        message = Message(content=_calculate(content[2:]),
                          reply_to=reply,
                          room_id=room_id)
        delay = random.random() * 2

    # !!jrrp
    elif content == '!!jrrp':
        # Seeded with sender id and message date, so the same seed always
        # produces the same value.
        randomer = random.Random(f'{sender_id}-{incoming["date"]}-jrrp-v2')
        result = randomer.randint(0, 50) + randomer.randint(0, 50)
        print(f'{sender_name} 今日人品值为 {result}')
        message = Message(content=f'{sender_name} 今日人品值为 {result}',
                          reply_to=reply,
                          room_id=room_id)
        delay = 0.5

    else:
        return

    if delay is not None:
        await asyncio.sleep(delay)
    await sio.emit('sendMessage', message.to_json())
|
||
|
||
# 如果只包括一个或多个 6
|
||
# elif data['message']['content'].replace(' ', '') in ('6', '666', '六', '3+3', '5+1', '4+2', '2+4', '1+5'):
|
||
# reply = ReplyMessage(id=data['message']['_id'])
|
||
# message = Message(content='你 6 nm 呢',
|
||
# reply_to=reply,
|
||
# room_id=room_id)
|
||
# await asyncio.sleep(0.5)
|
||
# await sio.emit('sendMessage', message.to_json())
|
||
|
||
|
||
@sio.on('deleteMessage')
def delete_message(message_id: str):
    """Print the id carried by a 'deleteMessage' event in magenta."""
    line = f"{Fore.MAGENTA}delete_message: {message_id}{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
@sio.on('setMessages')
def set_messages(data: Dict[str, Any]):
    """Print a 'setMessages' payload together with the length of ``data['messages']``."""
    line = f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {len(data['messages'])}{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
@sio.on('setAllRooms')
def set_all_rooms(rooms: List[Dict[str, Any]]):
    """Print the room list of a 'setAllRooms' event and its length."""
    line = f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {len(rooms)}\n{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
@sio.on('setAllChatGroups')
def set_all_chat_groups(groups: List[Dict[str, Any]]):
    """Print the group list of a 'setAllChatGroups' event and its length."""
    line = f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {len(groups)}\n{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
@sio.on('notify')
def notify(data: List[Tuple[str, Any]]):
    """Print the payload of a 'notify' event."""
    line = f"notify: {data}"
    print(line)
|
||
|
||
|
||
@sio.on('closeLoading')
def close_loading(_):
    """Print a green notice for a 'closeLoading' event; the payload is ignored."""
    notice = f"{Fore.GREEN}close_loading{Style.RESET_ALL}"
    print(notice)
|
||
|
||
|
||
@sio.on('onlineData')
def online_data(data: Dict[str, Any]):
    """Print the payload of an 'onlineData' event in green."""
    line = f"{Fore.GREEN}online_data: {data}{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
@sio.on('*')
def catch_all(event, data):
    """Print the name and payload of any event registered under the '*' wildcard."""
    line = f"{Fore.RED}catch_all: {event}|{data}{Style.RESET_ALL}"
    print(line)
|
||
|
||
|
||
async def main():
    """Connect the shared client to ``HOST`` and then wait on it."""
    # Open the Socket.IO connection to the configured host.
    await sio.connect(HOST)
    # Keep the coroutine alive while the client runs.
    await sio.wait()
|
||
|
||
# await sio.emit('requireAuth', ('', {'version': '', 'protocolVersion': ''}))
|
||
# await asyncio.sleep(2)
|
||
|
||
# await asyncio.gather(sio.wait(), sio.wait(), sio.wait())
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: run the client coroutine on a fresh event loop.
    asyncio.run(main())
|