icalingua-python-bot/connect.py

359 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import random
import asyncio
import traceback
from typing import Dict, List, Tuple, Any, Optional, Union, Literal
import qtoml
import socketio
from colorama import Fore, Style
from nacl.signing import SigningKey
from lib_not_dr.types import Options
from mcstatus import JavaServer
# 功能包引用处
from module import hitokoto
# 功能包结束引用
def get_config() -> Tuple[str, str, int]:
with open('config.toml', 'r', encoding='utf-8') as f:
config = qtoml.load(f)
return config['host'], config['private_key'], config['self_id']
HOST, KEY, SELF_ID = get_config()
class AtElement(Options):
text: str
id: Union[int, Literal['all']] = 'all'
class ReplyMessage(Options):
id: str
username: str = ''
content: str = ''
files: list = []
def to_json(self) -> dict:
return {
'_id': self.id,
'username': self.username,
'content': self.content,
'files': self.files
}
class Message(Options):
content: str
room_id: Optional[int] = None
room: Optional[int] = None # room id 和 room 二选一 ( 实际上直接填 room id 就行了 )
file: None = None # TODO: 上传文件
reply_to: Optional[ReplyMessage] = None # 源码 给了一个 any TODO: 回复消息
b64_img: Optional[str] = None # TODO: 发送图片
at: Optional[List[AtElement]] = [] # TODO: @某人
sticker: Optional[None] = None # TODO: 发送表情
message_type: Optional[str] = None # TODO: 消息类型
def to_json(self) -> dict:
return {
'content': self.content,
'roomId': self.room_id,
'room': self.room,
'file': self.file,
'replyMessage': self.reply_to.to_json() if self.reply_to else None,
'b64img': self.b64_img,
'at': self.at,
'sticker': self.sticker,
'messageType': self.message_type
}
sio: socketio.AsyncClient = socketio.AsyncClient()
@sio.on('connect')
def connect():
print(f'{Fore.GREEN}icalingua 已连接{Style.RESET_ALL}')
@sio.on('requireAuth')
async def require_auth(salt: str, versions: Dict[str, str]):
print(f"{Fore.BLUE}versions: {versions}{Style.RESET_ALL}\n{type(salt)}|{salt=}")
# 准备数据
sign = SigningKey(bytes.fromhex(KEY))
signature = sign.sign(bytes.fromhex(salt))
# 发送数据
print(f"{len(signature.signature)=} {type(signature.signature)=}")
await sio.emit('auth', signature.signature)
print(f"{Fore.BLUE}send auth emit{Style.RESET_ALL}")
# @sio.on('requireAuth')
# def require_auth(*data: Dict[str, Any]):
# print(f"{Fore.BLUE}requireAuth: {data}{Style.RESET_ALL}")
@sio.on('auth')
def auth(data: Dict[str, Any]):
print(f"auth: {data}")
@sio.on('authFailed')
async def auth_failed():
print(f"{Fore.RED}authFailed{Style.RESET_ALL}")
await sio.disconnect()
@sio.on('authSucceed')
def auth_succeed():
print(f"{Fore.GREEN}authSucceed{Style.RESET_ALL}")
@sio.on('connect_error')
def connect_error(*args, **kwargs):
print(f"连接错误 {args}, {kwargs}")
@sio.on('updateRoom')
def update_room(data: Dict[str, Any]):
print(f"{Fore.CYAN}update_room: {data}{Style.RESET_ALL}")
def safe_eval(code: str) -> str:
try:
# code = code.replace('help', '坏东西!\n')
# code = code.replace('bytes', '坏东西!\n')
# code = code.replace('encode', '坏东西!\n')
# code = code.replace('decode', '坏东西!\n')
# code = code.replace('compile', '屑的!\n')
# code = code.replace('globals', '拿不到!\n')
code = code.replace('os', '坏东西!\n')
code = code.replace('sys', '坏东西!\n')
# code = code.replace('input', '坏东西!\n')
# code = code.replace('__', '啊哈!\n')
# code = code.replace('import', '很坏!\n')
code = code.replace(' kill', '别跑!\n')
code = code.replace(' rm ', '别跑!\n')
code = code.replace('exit', '好坏!\n')
code = code.replace('eval', '啊哈!\n')
code = code.replace('exec', '抓住!\n')
start_time = time.time()
try:
import os
import math
import decimal
global_val = {'time': time,
'math': math,
'decimal': decimal,
'random': random,
'__import__': '<built-in function __import__>',
'globals': '也别惦记你那个 globals 了',
'compile': '想得美',
'help': '虽然但是 help 也不行',
'exit': '不许 exit',
'input': '你想干嘛',
'return': '别惦记你那个 return 了',
'getattr': '<built-in function getattr>',
'setattr': '<built-in function setattr>'}
os.system = '不许'
result = str(eval(code, global_val, {}))
limit = 500
if len(result) > limit:
result = result[:limit]
except:
result = traceback.format_exc()
end_time = time.time()
result = result.replace(KEY, '***')
result = result.replace(HOST, '***')
print(f"{Fore.MAGENTA}safe_eval: {result}{Style.RESET_ALL}")
if result == '6' or result == 6:
result = '他确实等于 6'
result = f'{code}\neval result:\n{result}\n耗时: {end_time - start_time} s'
return result
except:
error = traceback.format_exc()
result = f'error:\n{error}'
return result
@sio.on('addMessage')
async def add_message(data: Dict[str, Any]):
print(f"{Fore.MAGENTA}add_message: {data}{Style.RESET_ALL}")
is_self = data['message']['senderId'] == SELF_ID
sender_name = data['message']['username']
sender_id = data['message']['senderId']
content = data['message']['content']
room_id = data['roomId']
reply = ReplyMessage(id=data['message']['_id'])
if not is_self:
if data.get('message').get('content') == '/bot':
message = Message(content='icalingua bot test',
room_id=data['roomId'])
await sio.emit('sendMessage', message.to_json())
elif content == '!!status':
server = JavaServer.lookup("192.168.1.6:25565")
status=server.status()
# query = server.query()
# if status.players.online != 0:
# message = Message(content=f"此服务器有 {status.players.online} 个玩家在线\n当前在线玩家有 {.join(query.players.names)}",room_id=data['roomId'])
# else:
# message = Message(content=f"此服务器空无一人",room_id=data['roomId'])
message = Message(content=f"此服务器有 {status.players.online} 个玩家在线",reply_to=reply,room_id=data['roomId'])
await sio.emit('sendMessage', message.to_json())
elif content == '!!players':
server = JavaServer.lookup("192.168.1.6:25565")
query = server.query()
players = query.players.names
message = Message(content=f"此服务器当前在线玩家有 {players}",reply_to=reply,room_id=data['roomId'])
await sio.emit('sendMessage', message.to_json())
elif content.startswith("!!hitokoto"):
ctypet = content[len("!!hitokoto "):]
if content == "!!hitokoto":
result_code, result_data, result_type= hitokoto.hitokoto(None)
else:
result_code, result_data, result_type= hitokoto.hitokoto(ctypet)
if result_code == 3:
message = Message(content=f"参数不对哦!你可以输入!!hitokoto help查看帮助",reply_to=reply,room_id=data['roomId'])
elif result_code == 2:
message = Message(content=f"发生了不可描述的错误X_X但可以肯定的是模块炸了",reply_to=reply,room_id=data['roomId'])
elif result_code == 1:
message = Message(content=f"(参数)句子类型如下\na 动画\nb 漫画\nc 游戏\nd 文学\ne 原创\nf 来自网络\ng 其他\nh 影视\ni 诗词\nj 网易云\nk 哲学\nl 抖机灵",reply_to=reply,room_id=data['roomId'])
elif result_code == 0:
message = Message(content=f"{result_data['hitokoto']}\n来源:{result_data['from']}\n类型:{result_type}",reply_to=reply,room_id=data['roomId'])
else:
message = Message(content=f"你看到这条消息就代表有bug出炉但肯定不是模块炸了",reply_to=reply,room_id=data['roomId'])
await sio.emit('sendMessage', message.to_json())
elif data.get('message').get('content').startswith('=='):
evals: str = data.get('message').get('content')[2:]
# quene = multiprocessing.Queue()
# def run(quene, evals):
# go = safe_eval(evals)
# quene.put(go)
# process = multiprocessing.Process(target=run, args=(quene, evals))
# process.start()
# process.join(1)
# if quene.empty():
# result = '超时'
# else:
# result = quene.get()
whitelist = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ', '.', '+', '-', '*', '/', '(', ')', '<',
'>', '=']
evals = evals.replace('**', '')
express = ''
for text in evals:
if text in whitelist:
express += text
if express == '':
result = '你在干嘛'
else:
result = str(eval(express))
reply = ReplyMessage(id=data['message']['_id'])
message = Message(content=result,
reply_to=reply,
room_id=room_id)
await asyncio.sleep(random.random() * 2)
await sio.emit('sendMessage', message.to_json())
elif data['message']['content'] == '!!jrrp':
examSet = False
if examSet == True:
print('examSet参数被设定替换人品值')
message = Message(content=f'{sender_name} 今日人品值为 100\n为了让你安心备考加油考试今天你的人品值是100',
reply_to=reply,
room_id=room_id)
await asyncio.sleep(0.5)
await sio.emit('sendMessage', message.to_json())
else:
randomer = random.Random(f'{sender_id}-{data["message"]["date"]}-jrrp-v2')
result = randomer.randint(0, 50) + randomer.randint(0, 50)
print(f'{sender_name} 今日人品值为 {result}')
message = Message(content=f'{sender_name} 今日人品值为 {result}',
reply_to=reply,
room_id=room_id)
await asyncio.sleep(0.5)
await sio.emit('sendMessage', message.to_json())
# 如果只包括一个或多个 6
# elif data['message']['content'].replace(' ', '') in ('6', '666', '六', '3+3', '5+1', '4+2', '2+4', '1+5'):
# reply = ReplyMessage(id=data['message']['_id'])
# message = Message(content='你 6 nm 呢',
# reply_to=reply,
# room_id=room_id)
# await asyncio.sleep(0.5)
# await sio.emit('sendMessage', message.to_json())
@sio.on('deleteMessage')
def delete_message(message_id: str):
print(f"{Fore.MAGENTA}delete_message: {message_id}{Style.RESET_ALL}")
@sio.on('setMessages')
def set_messages(data: Dict[str, Any]):
print(f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {len(data['messages'])}{Style.RESET_ALL}")
@sio.on('setAllRooms')
def set_all_rooms(rooms: List[Dict[str, Any]]):
print(f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {len(rooms)}\n{Style.RESET_ALL}")
@sio.on('setAllChatGroups')
def set_all_chat_groups(groups: List[Dict[str, Any]]):
print(f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {len(groups)}\n{Style.RESET_ALL}")
@sio.on('notify')
def notify(data: List[Tuple[str, Any]]):
print(f"notify: {data}")
@sio.on('closeLoading')
def close_loading(_):
print(f"{Fore.GREEN}close_loading{Style.RESET_ALL}")
@sio.on('onlineData')
def online_data(data: Dict[str, Any]):
print(f"{Fore.GREEN}online_data: {data}{Style.RESET_ALL}")
@sio.on('*')
def catch_all(event, data):
print(f"{Fore.RED}catch_all: {event}|{data}{Style.RESET_ALL}")
async def main():
await sio.connect(HOST)
await sio.wait()
# await sio.emit('requireAuth', ('', {'version': '', 'protocolVersion': ''}))
# await asyncio.sleep(2)
# await asyncio.gather(sio.wait(), sio.wait(), sio.wait())
if __name__ == '__main__':
asyncio.run(main())