socket-bot/main.py

353 lines
12 KiB
Python

import time
import json
import random
import asyncio
import argparse
import traceback
from typing import Dict, List, Tuple, Any
import aiohttp
import socketio
from colorama import Fore
from nacl.signing import SigningKey
# from lib_not_dr.types import Options
from lib_not_dr.loggers import config
from data_struct import Message, ReplyMessage, get_config, BotConfig, BotStatus
_version_ = "0.2.3"
logger = config.get_logger("icalingua")
BOTCONFIG: BotConfig = get_config()
if __name__ == "__main__":
# --debug
# --config=config.toml
# -n --no-notice
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument("-n", "--no-notice", action="store_true")
parser.add_argument("-c", "--config", type=str)
args = parser.parse_args()
if args.debug:
logger.global_level = 0
if args.config:
# global BOTCONFIG
BOTCONFIG: BotConfig = get_config(args.config)
if args.no_notice:
BOTCONFIG.notice_start = False
BotStatus = BotStatus()
sio: socketio.AsyncClient = socketio.AsyncClient()
@sio.on("connect") # type: ignore
def connect():
logger.info(f"{Fore.GREEN}icalingua 已连接")
@sio.on("requireAuth") # type: ignore
async def require_auth(salt: str, versions: Dict[str, str]):
logger.info(f"{Fore.BLUE}versions: {versions}\n{type(salt)}|{salt=}")
# 准备数据
sign = SigningKey(bytes.fromhex(BOTCONFIG.private_key))
signature = sign.sign(bytes.fromhex(salt))
# 发送数据
await sio.emit("auth", signature.signature)
logger.info(f"{Fore.BLUE}send auth emit")
# @sio.on('requireAuth')
# def require_auth(*data: Dict[str, Any]):
# logger.info(f"{Fore.BLUE}requireAuth: {data}")
@sio.on("auth") # type: ignore
def auth(data: Dict[str, Any]):
logger.info(f"auth: {data}")
@sio.on("authFailed") # type: ignore
async def auth_failed():
logger.info(f"{Fore.RED}authFailed")
await sio.disconnect()
@sio.on("authSucceed") # type: ignore
def auth_succeed():
logger.info(f"{Fore.GREEN}authSucceed")
@sio.on("connect_error") # type: ignore
def connect_error(*args, **kwargs):
logger.info(f"连接错误 {args}, {kwargs}")
@sio.on("updateRoom") # type: ignore
def update_room(data: Dict[str, Any]):
logger.info(f"{Fore.CYAN}update_room: {data}")
def safe_eval(code: str) -> str:
try:
# code = code.replace('help', '坏东西!\n')
# code = code.replace('bytes', '坏东西!\n')
# code = code.replace('encode', '坏东西!\n')
# code = code.replace('decode', '坏东西!\n')
# code = code.replace('compile', '屑的!\n')
# code = code.replace('globals', '拿不到!\n')
code = code.replace("os", "坏东西!\n")
code = code.replace("sys", "坏东西!\n")
# code = code.replace('input', '坏东西!\n')
# code = code.replace('__', '啊哈!\n')
# code = code.replace('import', '很坏!\n')
code = code.replace(" kill", "别跑!\n")
code = code.replace(" rm ", "别跑!\n")
code = code.replace("exit", "好坏!\n")
code = code.replace("eval", "啊哈!\n")
code = code.replace("exec", "抓住!\n")
start_time = time.time()
try:
import os
import math
import decimal
global_val = {
"time": time,
"math": math,
"decimal": decimal,
"random": random,
"__import__": "<built-in function __import__>",
"globals": "也别惦记你那个 globals 了",
"compile": "想得美",
"help": "虽然但是 help 也不行",
"exit": "不许 exit",
"input": "你想干嘛",
"return": "别惦记你那个 return 了",
"getattr": "<built-in function getattr>",
"setattr": "<built-in function setattr>",
}
os.system = "不许"
result = str(eval(code, global_val, {}))
limit = 500
if len(result) > limit:
result = result[:limit]
except:
result = traceback.format_exc()
end_time = time.time()
result = result.replace(BOTCONFIG.private_key, "***")
result = result.replace(BOTCONFIG.host, "***")
logger.info(f"{Fore.MAGENTA}safe_eval: {result}")
if result == "6" or result == 6:
result = "他确实等于 6"
result = f"{code}\neval result:\n{result}\n耗时: {end_time - start_time} s"
return result
except:
error = traceback.format_exc()
result = f"error:\n{error}"
return result
@sio.on("addMessage") # type: ignore
async def add_message(data: Dict[str, Any]):
logger.info(f"{Fore.MAGENTA}add_message: {data}")
is_self = data["message"]["senderId"] == BOTCONFIG.self_id
sender_name = data["message"]["username"]
sender_id = data["message"]["senderId"]
content = data["message"]["content"]
room_id = data["roomId"]
msg_id = data["message"]["_id"]
reply_msg = Message(content="", room_id=room_id, reply_to=ReplyMessage(id=msg_id))
if not is_self:
if content == "/bot":
message = reply_msg.to_content(f"icalingua bot pong v{_version_}")
await sio.emit("sendMessage", message.to_json())
elif content.startswith("=="):
evals: str = content[2:]
result = safe_eval(evals)
# whitelist = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ', '.', '+', '-', '*', '/', '(', ')', '<',
# '>', '=']
# evals = evals.replace('**', '')
# express = ''
# for text in evals:
# if text in whitelist:
# express += text
# if express == '':
# result = '你在干嘛'
# else:
# result = str(eval(express))
message = reply_msg.to_content(result)
await asyncio.sleep(random.random() * 2)
await sio.emit("sendMessage", message.to_json())
elif content == "!!jrrp":
randomer = random.Random(
f'{sender_id}-{data["message"]["date"]}-jrrp-{_version_}'
)
result = randomer.randint(0, 50) + randomer.randint(0, 50)
logger.info(f"{sender_name} 今日人品值为 {result}")
message = reply_msg.to_content(f"{sender_name} 今日人品为 {result}")
await asyncio.sleep(0.5)
await sio.emit("sendMessage", message.to_json())
elif content == "/bmcl":
await asyncio.sleep(0.1)
await sio.emit(
"sendMessage", reply_msg.to_content("请求数据中……").to_json()
)
async with aiohttp.ClientSession() as session:
async with session.get(
"https://bd.bangbang93.com/openbmclapi/metric/dashboard"
) as response:
if not response.status == 200 or response.reason != "OK":
await sio.emit(
"sendMessage",
reply_msg.to_content(
f"请求数据失败\n{response.status}"
).to_json(),
)
logger.warn(
f"数据请求失败, 请检查网络\n{response.status}",
tag="bmclapi_dashboard",
)
return
raw_data = await response.text()
try:
data = json.loads(raw_data)
data_bytes: int = data["bytes"]
data_hits: int = data["hits"]
data_bandwidth: float = data["currentBandwidth"]
load_str: float = data["load"] * 100
online_node: int = data["currentNodes"]
online_bandwidth: int = data["bandwidth"]
data_lens = ["B", "KB", "MB", "GB", "TB"]
for i in range(5):
if data_bandwidth < 1024:
data_bandwidth = round(data_bandwidth, 5)
data_bandwidth = f"{data_bandwidth}{data_lens[i]}"
break
data_bandwidth /= 1024
report_msg = (
"OpenBMCLAPI 状态:\n"
f"在线节点: {online_node} 带宽: {online_bandwidth}Mbps\n"
f"实时负载带宽: {data_bandwidth:.5f}Mbps 负载: {load_str:.3f}%\n"
f"当日 总请求: {data_hits} 总数据量: {data_bytes}"
)
await sio.emit(
"sendMessage",
reply_msg.to_content(report_msg).to_json()
)
except (json.JSONDecodeError, AttributeError, ValueError) as e:
await sio.emit(
"sendMessage",
reply_msg.to_content(f"返回数据解析错误\n{e}").to_json(),
)
logger.warn(f"返回数据解析错误\n{e}", tag="bmclapi_dashboard")
"""
async with aiohttp.ClientSession() as session:
async with session.get('http://httpbin.org/get') as resp:
print(resp.status)
print(await resp.text())"""
@sio.on("deleteMessage") # type: ignore
def delete_message(message_id: str):
logger.info(f"{Fore.MAGENTA}delete_message: {message_id}")
@sio.on("setMessages") # type: ignore
def set_messages(data: Dict[str, Any]):
logger.info(
f"{Fore.YELLOW}set_messages: {data}\nmessage_len: {len(data['messages'])}"
)
async def notice_startup(room_list: List[int]):
for notice_room in BOTCONFIG.notice_room:
if notice_room in room_list:
notice_message = Message(
content=f"ica bot v{_version_}", room_id=notice_room
)
await sio.emit("sendMessage", notice_message.to_json())
BotStatus.inited = True
logger.info("inited", tag="notice room")
else:
logger.warn(f"未找到通知房间: {notice_room}", tag="notice room")
await asyncio.sleep(random.randint(2, 5))
@sio.on("setAllRooms") # type: ignore
async def set_all_rooms(rooms: List[Dict[str, Any]]):
BotStatus.running = True
room_list: List[int] = [room.get("roomId") for room in rooms] # type: ignore
if not BotStatus.inited:
logger.info("initing...", tag="setAllRooms")
logger.debug(f"room_list: {room_list}", tag="setAllRooms")
if BOTCONFIG.notice_start:
await notice_startup(room_list)
if room_list != BotStatus.rooms:
logger.info(f"{Fore.YELLOW}set_all_rooms: {rooms}\nlen: {len(rooms)}\n")
BotStatus.rooms = room_list
logger.info(f"更新房间: {room_list}", tag="setAllRooms")
@sio.on("setAllChatGroups") # type: ignore
def set_all_chat_groups(groups: List[Dict[str, Any]]):
logger.info(f"{Fore.YELLOW}set_all_chat_groups: {groups}\nlen: {len(groups)}\n")
@sio.on("notify") # type: ignore
def notify(data: List[Tuple[str, Any]]):
logger.info(f"notify: {data}")
@sio.on("closeLoading") # type: ignore
def close_loading(_):
logger.info(f"{Fore.GREEN}close_loading")
@sio.on("onlineData") # type: ignore
def online_data(data: Dict[str, Any]):
logger.info(f"{Fore.GREEN}online_data: {data}")
@sio.on("*") # type: ignore
def catch_all(event, data):
logger.info(f"{Fore.RED}catch_all: {event}|{data}")
async def main():
"""
while True:
await self.eio.wait()
await self.sleep(1) # give the reconnect task time to start up
if not self._reconnect_task:
break
await self._reconnect_task
if self.eio.state != 'connected':
break
"""
await sio.connect(BOTCONFIG.host)
await sio.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("KeyboardInterrupt")
except Exception:
logger.error(traceback.format_exc())