更新 IcaClient 类,添加 version 属性***

***更新 IcaClientPy 类,添加 get_version 方法***
***更新 base.py,修改回复消息中的版本号显示
This commit is contained in:
shenjack 2024-02-25 21:33:11 +08:00
parent e3708dc41a
commit 469db17c3b
Signed by: shenjack
GPG Key ID: 7B1134A979775551
4 changed files with 149 additions and 107 deletions

View File

@ -115,6 +115,9 @@ class IcaClient:
@property @property
def status() -> IcaStatus: def status() -> IcaStatus:
... ...
@property
def version() -> str:
...
def debug(self, message: str) -> None: def debug(self, message: str) -> None:
"""向日志中输出调试信息""" """向日志中输出调试信息"""

View File

@ -11,5 +11,5 @@ _version_ = "1.1.0"
def on_message(msg: NewMessage, client: IcaClient) -> None: def on_message(msg: NewMessage, client: IcaClient) -> None:
if not (msg.is_from_self or msg.is_reply): if not (msg.is_from_self or msg.is_reply):
if msg.content == "/bot": if msg.content == "/bot":
reply = msg.reply_with(f"ica-async-rs-sync-py {_version_}") reply = msg.reply_with(f"ica-async-rs-sync-py {_version_}({client.version})")
client.send_message(reply) client.send_message(reply)

View File

@ -1,3 +1,4 @@
import io
import re import re
import time import time
import requests import requests
@ -13,7 +14,7 @@ else:
NewMessage = TypeVar("NewMessage") NewMessage = TypeVar("NewMessage")
IcaClient = TypeVar("IcaClient") IcaClient = TypeVar("IcaClient")
_version_ = "2.1.2-rs" _version_ = "2.2.0-rs"
def format_data_size(data_bytes: float) -> str: def format_data_size(data_bytes: float) -> str:
data_lens = ["B", "KB", "MB", "GB", "TB"] data_lens = ["B", "KB", "MB", "GB", "TB"]
@ -82,7 +83,6 @@ def wrap_request(url: str, msg: NewMessage, client: IcaClient) -> Optional[dict]
return response.json() return response.json()
def bmcl_dashboard(msg: NewMessage, client: IcaClient) -> None: def bmcl_dashboard(msg: NewMessage, client: IcaClient) -> None:
req_time = time.time() req_time = time.time()
# 记录请求时间 # 记录请求时间
@ -111,99 +111,129 @@ def bmcl_dashboard(msg: NewMessage, client: IcaClient) -> None:
client.send_message(reply) client.send_message(reply)
def parse_rank(data: dict) -> dict: def check_is_full_data(data: list) -> bool:
rank_data = {"hits": 0, "bytes": 0} return 'user' in data[0]
if "metric" in data:
rank_data["hits"] = data["metric"]["hits"]
rank_data["bytes"] = data["metric"]["bytes"]
return {
"name": data["name"],
"start": data["isEnabled"],
# "full": "全量" if "fullSize" in data else "分片",
# "version": data["version"] if "version" in data else "未知版本",
"owner": data["sponsor"]["name"] if "sponsor" in data else "未知",
"rank": rank_data
}
def bmcl_rank(msg: NewMessage, client: IcaClient, name: Optional[str]) -> None: def display_rank_min(ranks: list, req_time) -> str:
cache = io.StringIO()
cache.write(f"bmclapi v{_version_}-排名({len(ranks)})")
if check_is_full_data(ranks):
cache.write("完整\n")
for rank in ranks:
cache.write('' if rank['isEnabled'] else '')
if 'fullSize' in rank:
cache.write('🌕' if rank['fullSize'] else '🌘')
cache.write(f"-{rank['index']+1:3}")
cache.write(f"|{rank['name']}\n")
else:
cache.write("无cookie\n")
for rank in ranks:
cache.write('' if rank['isEnabled'] else '')
cache.write(f"-{rank['index']+1:3}")
cache.write(f"|{rank['name']}\n")
cache.write(f"请求时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(req_time))}")
return cache.getvalue()
def display_rank_full(ranks: list, req_time) -> str:
cache = io.StringIO()
cache.write(f"bmclapi v{_version_}-排名({len(ranks)})")
if check_is_full_data(ranks):
cache.write("完整\n")
for rank in ranks:
# 基本信息
cache.write('' if rank['isEnabled'] else '')
if 'fullSize' in rank:
cache.write('🌕' if rank['fullSize'] else '🌘')
cache.write(f"|{rank['index']+1:3}|")
cache.write(f"{rank['name']}")
if 'version' in rank:
cache.write(f"|{rank['version']}")
cache.write('\n')
# 用户/赞助信息
if ('user' in rank) and (rank['user'] is not None):
cache.write(f"所有者:{rank['user']['name']}")
if 'sponsor' in rank:
cache.write(f"|赞助者:{rank['sponsor']['name']}")
if 'sponsor' in rank or ('user' in rank and rank['user'] is not None):
cache.write('\n')
# 数据信息
if 'metric' in rank:
hits = format_hit_count(rank['metric']['hits'])
data = format_data_size(rank['metric']['bytes'])
cache.write(f"hit/data|{hits}|{data}")
cache.write('\n')
else:
cache.write("无cookie\n")
for rank in ranks:
cache.write('' if rank['isEnabled'] else '')
cache.write(f"-{rank['index']+1:3}")
cache.write(f"|{rank['name']}|\n")
if 'sponsor' in rank:
cache.write(f"赞助者: {rank['sponsor']['name']}|")
if 'metric' in rank:
cache.write(f"hit/data|{format_hit_count(rank['metric']['hits'])}|{format_data_size(rank['metric']['bytes'])}\n")
cache.write(f"请求时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(req_time))}")
return cache.getvalue()
def bmcl_rank_general(msg, client):
req_time = time.time() req_time = time.time()
# 记录请求时间 # 记录请求时间
rank_data = wrap_request("https://bd.bangbang93.com/openbmclapi/metric/rank", msg, client) rank_data = wrap_request("https://bd.bangbang93.com/openbmclapi/metric/rank", msg, client)
if rank_data is None: if rank_data is None:
return return
ranks = [parse_rank(data) for data in rank_data] # 预处理数据
if name is None: for i, r in enumerate(rank_data):
# 全部排名 r['index'] = i
# 显示前3名 # 显示前3名
limit = 3 ranks = rank_data[:3]
if len(ranks) < limit: # ranks = rank_data
show_ranks = ranks report_msg = display_rank_full(ranks, req_time)
client.info(report_msg)
reply = msg.reply_with(display_rank_full(ranks, req_time))
client.send_message(reply)
def bmcl_rank(msg: NewMessage, client: IcaClient, name: str) -> None:
req_time = time.time()
# 记录请求时间
rank_data = wrap_request("https://bd.bangbang93.com/openbmclapi/metric/rank", msg, client)
if rank_data is None:
return
# 预处理数据
for i, r in enumerate(rank_data):
r['index'] = i
# 搜索是否有这个名字的节点
names = [r["name"].lower() for r in rank_data]
try:
finds = [re.search(name.lower(), n) for n in names]
except re.error as e:
reply = msg.reply_with(f"正则表达式错误: {e}, 请检查输入")
client.send_message(reply)
return
if not any(finds):
reply = msg.reply_with(f"未找到名为{name}的节点")
client.send_message(reply)
return
# 如果找到 > 3 个节点, 则提示 不显示
counts = [f for f in finds if f]
ranks = [rank_data[i] for i, f in enumerate(finds) if f]
if len(counts) > 3:
if len(counts) > 10:
reply = msg.reply_with(f"搜索|{name}|到{len(counts)}个节点, 请用更精确的名字")
else: else:
show_ranks = ranks[:limit] # 4~10 个节点 只显示名称和次序
rank_msg = ( report_msg = display_rank_min(ranks, req_time)
f"{'' if r['start'] else ''}名称: {r['name']}\n" reply = msg.reply_with(report_msg)
# f"-{rank['full']} \n"
# f"版本: {r['version']}\n"
f"赞助者: {r['owner']}|"
f"h/d {format_hit_count(r['rank']['hits'])}|{format_data_size(r['rank']['bytes'])}"
for r in show_ranks
)
rank_msg = "\n".join(rank_msg)
report_msg = (
f"OpenBMCLAPI 面板v{_version_}-排名\n{rank_msg}\n"
f"请求时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(req_time))}\n"
)
reply = msg.reply_with(report_msg)
client.info(report_msg)
client.send_message(reply) client.send_message(reply)
return return
else: # 如果找到 <= 3 个节点, 则显示全部信息
# 搜索是否有这个名字的节点 report_msg = display_rank_full(ranks, req_time)
names = [r["name"].lower() for r in ranks] client.info(report_msg)
try: reply = msg.reply_with(report_msg)
finds = [re.search(name.lower(), n) for n in names] client.send_message(reply)
except re.error as e:
reply = msg.reply_with(f"正则表达式错误: {e}, 请检查输入")
client.send_message(reply)
return
if not any(finds):
reply = msg.reply_with(f"未找到名为{name}的节点")
client.send_message(reply)
return
# 如果找到 > 3 个节点, 则提示 不显示
counts = [True for find in finds if find]
if len(counts) > 3:
if len(counts) > 10:
reply = msg.reply_with(f"搜索|{name}|到{len(counts)}个节点, 请用更精确的名字")
else:
# 4~10 个节点 只显示名称和次序
find_msg = [f"{'' if r['start'] else ''}{r['name']}-No.{i+1}" for i, r in enumerate(ranks) if finds[i]]
find_msg = "\n".join(find_msg)
report_msg = f"OpenBMCLAPI 面板v{_version_}-搜索|{name}|\n{find_msg}"
reply = msg.reply_with(report_msg)
client.send_message(reply)
return
rank_msgs = []
for i, find in enumerate(finds):
if find:
rank = ranks[i]
rank_msg = (
f"{'' if rank['start'] else ''}名称: {rank['name']}-No.{i+1}\n"
# f"-{rank['full']} \n"
# f"版本: {rank['version']}\n"
f"赞助商: {rank['owner']}|"
f"h/d {format_hit_count(rank['rank']['hits'])}|{format_data_size(rank['rank']['bytes'])}"
)
rank_msgs.append(rank_msg)
rank_msgs = "\n".join(rank_msgs)
report_msg = f"OpenBMCLAPI 面板v{_version_}-排名\n{rank_msgs}"
reply = msg.reply_with(report_msg)
client.info(report_msg)
client.send_message(reply)
return
help = """/bmcl -> dashboard help = """/bmcl -> dashboard
@ -219,26 +249,33 @@ help = """/bmcl -> dashboard
def on_message(msg: NewMessage, client: IcaClient) -> None: def on_message(msg: NewMessage, client: IcaClient) -> None:
if not (msg.is_from_self or msg.is_reply): if not (msg.is_from_self or msg.is_reply):
if msg.content.startswith("/bmcl"): if '\n' in msg.content:
if msg.content == "/bmcl": return
bmcl_dashboard(msg, client) try:
elif msg.content == "/bmcl rank": if msg.content.startswith("/bmcl"):
bmcl_rank(msg, client, None) if msg.content == "/bmcl":
elif msg.content.startswith("/bmcl rank") and len(msg.content) > 11: bmcl_dashboard(msg, client)
name = msg.content[11:] elif msg.content == "/bmcl rank":
bmcl_rank(msg, client, name) bmcl_rank_general(msg, client)
else: elif msg.content.startswith("/bmcl rank") and len(msg.content) > 11:
reply = msg.reply_with(help) name = msg.content[11:]
client.send_message(reply)
elif msg.content.startswith("/brrs"):
if msg.content == "/brrs":
reply = msg.reply_with(help)
client.send_message(reply)
else:
name = msg.content.split(" ")
if len(name) > 1:
name = name[1]
bmcl_rank(msg, client, name) bmcl_rank(msg, client, name)
else:
reply = msg.reply_with(help)
client.send_message(reply)
elif msg.content.startswith("/brrs"):
if msg.content == "/brrs":
reply = msg.reply_with(help)
client.send_message(reply)
else:
name = msg.content.split(" ")
if len(name) > 1:
name = name[1]
bmcl_rank(msg, client, name)
except: # noqa
report_msg = f"bmcl插件发生错误,请呼叫shenjack\n{traceback.format_exc()}"
reply = msg.reply_with(report_msg)
client.send_and_warn(reply)
def on_config() -> Tuple[str, str]: def on_config() -> Tuple[str, str]:

View File

@ -193,6 +193,8 @@ impl IcaClientPy {
#[getter] #[getter]
pub fn get_status(&self) -> IcaStatusPy { IcaStatusPy::new() } pub fn get_status(&self) -> IcaStatusPy { IcaStatusPy::new() }
#[getter]
pub fn get_verison(&self) -> String { crate::VERSION.to_string() }
pub fn debug(&self, content: String) { pub fn debug(&self, content: String) {
debug!("{}", content); debug!("{}", content);