2025-08-23 22:45:49 +08:00

448 lines
15 KiB
Python

import tkinter as tk
import webbrowser
from tkinter import messagebox, ttk, filedialog
import re
import threading
import requests
import json
from pynput import keyboard
# Values pre-filled in the "add hotkey" dialog (time is in milliseconds).
DEFAULT_STRENGTH = 20
DEFAULT_TIME = 5000
# Upper bounds used when validating the dialog's time / strength input.
MAX_TIME = 30000
MAX_STRENGTH = 40
# Main application window; created before the Tk variable below.
root = tk.Tk()
root.title("按键监听一键开火工具")
# Text of the connection-code entry ("<client id>@<api host>").
connection_code_var = tk.StringVar()
# One dict per configured hotkey (key_combo, hotkey_keys, strength, time,
# pulseid, override), in the same order as the rows of the listbox.
key_settings = []
# Set by start_monitor()/stop_monitor(); not read anywhere in this file.
monitoring = False
# The running MultiHotkeyMonitor thread, or None when not monitoring.
monitor_thread = None
# Per-hotkey request parameters, rebuilt by start_monitor().
hotkey_infos = []
# Last pulse list successfully fetched by get_pulse_list().
pulse_list_cache = []
def center_window(window, width, height):
    """Give *window* the size ``width`` x ``height`` and centre it on screen.

    Args:
        window: A Tk window (root or Toplevel).
        width: Desired window width in pixels.
        height: Desired window height in pixels.
    """
    # Let Tk process its pending idle tasks before querying the screen.
    window.update_idletasks()
    left = (window.winfo_screenwidth() - width) // 2
    top = (window.winfo_screenheight() - height) // 2
    window.geometry("{}x{}+{}+{}".format(width, height, left, top))
def parse_connection_code(code):
    """Split a connection code of the form ``<client id>@<api host>``.

    The client id may contain hexadecimal digits and dashes only.  The host
    part is returned without surrounding whitespace and without trailing
    slashes, because callers build URLs as ``f"{api_host}/api/..."`` and a
    trailing slash would otherwise produce a ``//api/...`` path.

    Args:
        code: The raw connection code as typed by the user.

    Returns:
        ``(client_id, api_host)`` on success, ``(None, None)`` when the code
        does not match the expected format or the host part is empty.
    """
    m = re.match(r"^([a-fA-F0-9\-]+)@(.+)$", code.strip())
    if not m:
        return None, None
    client_id = m.group(1)
    api_host = m.group(2).strip().rstrip("/")
    if not api_host:
        # e.g. "abcd@/" - nothing usable is left after normalisation.
        return None, None
    return client_id, api_host
def send_fire_request(client_id, api_host, strength, time_ms, override, pulseid):
    """POST a fire action for *client_id* to the API at *api_host*.

    Args:
        client_id: Client id taken from the connection code.
        api_host: Base URL of the API server.
        strength: Value sent as ``strength``.
        time_ms: Value sent as ``time``.
        override: Value sent as ``override``.
        pulseid: Sent as ``pulseId`` when truthy, omitted otherwise.

    Returns:
        ``(True, response_text)`` for an HTTP 200 reply, otherwise
        ``(False, message)`` where the message is either the HTTP status
        plus body or the text of the exception that was raised.
    """
    endpoint = "{}/api/v2/game/{}/action/fire".format(api_host, client_id)
    payload = dict(strength=strength, time=time_ms, override=override)
    if pulseid:
        payload["pulseId"] = pulseid
    try:
        reply = requests.post(endpoint, json=payload, timeout=5)
        if reply.status_code != 200:
            return False, f"HTTP {reply.status_code}: {reply.text}"
        return True, reply.text
    except Exception as err:
        # Best effort: every failure is reported to the caller as text.
        return False, str(err)
def get_pulse_list(client_id, api_host):
    """Fetch the pulse list of *client_id* from the API at *api_host*.

    On success the module-level ``pulse_list_cache`` is replaced with the
    fetched list and that list is returned.  Any failure (non-200 status,
    unexpected payload, or an exception) yields an empty list and leaves
    the cache untouched.
    """
    global pulse_list_cache
    endpoint = "{}/api/v2/game/{}/pulse_list".format(api_host, client_id)
    try:
        reply = requests.get(endpoint, timeout=5)
        if reply.status_code != 200:
            return []
        payload = reply.json()
        if payload.get("status") != 1 or "pulseList" not in payload:
            return []
        pulse_list_cache = payload["pulseList"]
        return pulse_list_cache
    except Exception:
        return []
def parse_hotkey(hotkey_str):
    """Convert a ``+``-separated hotkey description into a set of pynput keys.

    Each part is stripped and lower-cased, then resolved in this order:

    1. a name from the alias table below (``ctrl``, ``win``, ``esc`` ...);
    2. a single character, turned into ``keyboard.KeyCode.from_char``;
    3. any other member name of ``keyboard.Key`` (``f1``, ``home``,
       ``page_up`` ...), so keys missing from the alias table still work.

    Args:
        hotkey_str: For example ``"ctrl+alt+h"`` or ``"f5"``.

    Returns:
        A set containing one pynput key object per distinct part.

    Raises:
        ValueError: If a part cannot be resolved to a key.
    """
    key_map = {
        "ctrl": keyboard.Key.ctrl,
        "alt": keyboard.Key.alt,
        "shift": keyboard.Key.shift,
        "cmd": keyboard.Key.cmd,
        "win": keyboard.Key.cmd,
        "super": keyboard.Key.cmd,
        "enter": keyboard.Key.enter,
        "tab": keyboard.Key.tab,
        "esc": keyboard.Key.esc,
        "space": keyboard.Key.space,
        "up": keyboard.Key.up,
        "down": keyboard.Key.down,
        "left": keyboard.Key.left,
        "right": keyboard.Key.right,
        "delete": keyboard.Key.delete,
        "backspace": keyboard.Key.backspace,
    }
    keys = set()
    for part in hotkey_str.split('+'):
        name = part.strip().lower()
        if name in key_map:
            keys.add(key_map[name])
        elif len(name) == 1:
            keys.add(keyboard.KeyCode.from_char(name))
        elif name in keyboard.Key.__members__:
            # Fallback for special keys the alias table does not list.
            keys.add(keyboard.Key[name])
        else:
            raise ValueError(f"无法识别的按键: {name}")
    return keys
def hotkey_setting_dialog(edit_idx=None):
    """Show a modal dialog that adds a hotkey setting or edits an existing one.

    Args:
        edit_idx: Index into ``key_settings`` of the entry to edit, or
            ``None`` to create a new entry that is appended on confirmation.

    The call blocks until the dialog is closed.  The confirm button
    validates the input; on success the setting is stored in
    ``key_settings``, the main listbox is refreshed and the dialog closes.
    On invalid input an error box is shown and the dialog stays open.
    """
    if edit_idx is not None:
        setting = key_settings[edit_idx]
        initial_key_combo = setting["key_combo"]
        initial_strength = str(setting["strength"])
        initial_time = str(setting["time"])
        initial_pulseid = setting["pulseid"]
        initial_override = setting["override"]
    else:
        initial_key_combo = ""
        initial_strength = str(DEFAULT_STRENGTH)
        initial_time = str(DEFAULT_TIME)
        initial_pulseid = ""
        initial_override = False

    dialog = tk.Toplevel(root)
    dialog.title("编辑按键配置" if edit_idx is not None else "添加按键配置")
    dialog.transient(root)
    dialog.grab_set()
    frm_d = tk.Frame(dialog, padx=10, pady=10)
    frm_d.pack()

    tk.Label(frm_d, text="按键组合:").grid(row=0, column=0, sticky="e")
    key_combo_var = tk.StringVar(value=initial_key_combo)
    key_combo_entry = tk.Entry(frm_d, textvariable=key_combo_var, width=20)
    key_combo_entry.grid(row=0, column=1, sticky="w")

    tk.Label(frm_d, text="开火强度:").grid(row=1, column=0, sticky="e")
    strength_var = tk.StringVar(value=initial_strength)
    tk.Entry(frm_d, textvariable=strength_var, width=20).grid(row=1, column=1, sticky="w")

    tk.Label(frm_d, text="开火时长:").grid(row=2, column=0, sticky="e")
    time_var = tk.StringVar(value=initial_time)
    tk.Entry(frm_d, textvariable=time_var, width=20).grid(row=2, column=1, sticky="w")

    tk.Label(frm_d, text="开火波形:").grid(row=3, column=0, sticky="e")
    pulseid_cmb = ttk.Combobox(frm_d, width=18, state="readonly")
    pulseid_cmb.grid(row=3, column=1, sticky="w")

    # Pulse ids parallel to the combobox rows; row 0 always means "no pulse".
    # Resolving the selection through this list (instead of the global
    # cache) keeps the row -> id mapping correct whatever the fetch returned.
    pulse_ids = [""]

    def fetch_and_set_pulse_list():
        """(Re)load the pulse list and fill the combobox, keeping the choice."""
        row = pulseid_cmb.current()
        # Before the first fill nothing is selected (-1): use the stored id.
        # On a later refresh keep whatever the user has chosen meanwhile.
        wanted = pulse_ids[row] if 0 <= row < len(pulse_ids) else initial_pulseid

        code = connection_code_var.get().strip()
        client_id, api_host = parse_connection_code(code) if code else (None, None)
        pulse_list = get_pulse_list(client_id, api_host) if client_id and api_host else []

        labels = ["(不指定)"]
        ids = [""]
        for p in pulse_list:
            labels.append(f"{p['name']} ({p['id']})")
            ids.append(p["id"])
        if wanted and wanted not in ids:
            # The list could not be fetched or no longer contains the id:
            # offer the raw id so confirming does not silently drop it.
            labels.append(f"{wanted}")
            ids.append(wanted)

        pulse_ids[:] = ids
        pulseid_cmb["values"] = labels
        pulseid_cmb.current(ids.index(wanted) if wanted else 0)

    fetch_and_set_pulse_list()

    override_var = tk.BooleanVar(value=initial_override)
    tk.Checkbutton(frm_d, text="重置开火时间", variable=override_var).grid(row=4, column=1, sticky="w")

    def on_ok():
        """Validate the form and store the setting; keep the dialog on error."""
        key_combo = key_combo_var.get().strip()
        override = override_var.get()
        row = pulseid_cmb.current()
        pulseid = pulse_ids[row] if 0 <= row < len(pulse_ids) else ""

        if not key_combo:
            messagebox.showerror("错误", "必须填写按键组合", parent=dialog)
            return
        try:
            hotkey_keys = parse_hotkey(key_combo)
        except ValueError as e:
            messagebox.showerror("错误", str(e), parent=dialog)
            return

        try:
            strength = int(strength_var.get().strip())
            if not (1 <= strength <= MAX_STRENGTH):
                raise ValueError()
        except ValueError:
            messagebox.showerror("错误", f"强度必须为 1~{MAX_STRENGTH} 的整数", parent=dialog)
            return

        try:
            time_ms = int(time_var.get().strip())
            if not (1 <= time_ms <= MAX_TIME):
                raise ValueError()
        except ValueError:
            messagebox.showerror("错误", f"时间必须为 1~{MAX_TIME} 的整数", parent=dialog)
            return

        obj = {
            "key_combo": key_combo,
            "hotkey_keys": hotkey_keys,
            "strength": strength,
            "time": time_ms,
            "pulseid": pulseid,
            "override": override,
        }
        if edit_idx is None:
            key_settings.append(obj)
        else:
            key_settings[edit_idx] = obj
        update_hotkey_list()
        dialog.destroy()

    tk.Button(frm_d, text="刷新", command=fetch_and_set_pulse_list, width=6).grid(row=3, column=2, sticky="w", padx=8)
    tk.Button(frm_d, text="确定", command=on_ok, width=10).grid(row=5, column=1, sticky="w", pady=8)
    center_window(dialog, 300, 180)
    dialog.wait_window(dialog)
def add_hotkey_setting():
    """Open the hotkey dialog in "add" mode (no existing entry selected)."""
    hotkey_setting_dialog()
def edit_hotkey_setting():
    """Open the hotkey dialog for the listbox row that is currently selected.

    Shows an error box instead when no row is selected.
    """
    selection = hotkey_list.curselection()
    if selection:
        hotkey_setting_dialog(edit_idx=selection[0])
        return
    messagebox.showerror("错误", "请先选择一个配置项")
def del_hotkey_setting():
    """Remove the selected hotkey setting and refresh the listbox.

    Does nothing when no row is selected.
    """
    selection = hotkey_list.curselection()
    if selection:
        del key_settings[selection[0]]
        update_hotkey_list()
def update_hotkey_list():
    """Rebuild the listbox so it shows one line per entry of ``key_settings``.

    Each line lists the key combination, strength, time, pulse id (or a
    placeholder when none is set) and whether the override flag is on.
    """
    hotkey_list.delete(0, tk.END)
    for st in key_settings:
        pulselabel = st['pulseid'] or "(无)"
        # Both branches used to be empty strings, so the flag never showed.
        override_label = "是" if st['override'] else "否"
        desc = (
            f"<{st['key_combo']}> 强度:{st['strength']} 时间:{st['time']}ms "
            f"波形:{pulselabel} 重置时间:{override_label}"
        )
        hotkey_list.insert(tk.END, desc)
class MultiHotkeyMonitor(threading.Thread):
    """Daemon thread that listens to the keyboard and fires configured hotkeys.

    Args:
        hotkey_infos: List of dicts, each with ``hotkey_keys`` (set of pynput
            keys), ``key_combo`` (display text) and the request parameters
            ``client_id``, ``api_host``, ``strength``, ``time``, ``override``
            and ``pulseid``.

    Matching rules:

    * Keys reported by the listener and the configured keys are both passed
      through ``Listener.canonical`` so that left/right modifier variants
      and shifted characters compare equal to the configured keys.
    * A hotkey fires once when the key press that completes it arrives;
      auto-repeat events of a key that is already held are ignored.
    * An entry with an empty key set never fires.
    * The HTTP request runs on a short-lived worker thread so the listener
      callback returns immediately (pynput advises against blocking work in
      callbacks).
    """

    def __init__(self, hotkey_infos):
        super().__init__(daemon=True)
        self.hotkey_infos = hotkey_infos
        self.pressed_keys = set()
        self.lock = threading.Lock()
        self.running = True
        # Set by run() once the listener has started; used by stop().
        self._listener = None

    @staticmethod
    def _fire(info):
        """Send the fire request described by *info* and print the outcome."""
        ok, msg = send_fire_request(
            info['client_id'], info['api_host'],
            info['strength'], info['time'],
            info['override'], info['pulseid'])
        if ok:
            print(f"[{info['key_combo']}] 已发送一键开火请求")
        else:
            print(f"[{info['key_combo']}] 发送失败: {msg}")

    def run(self):
        """Run the keyboard listener until ``stop()`` is called."""
        # (info, canonical key set) pairs; filled before the listener starts.
        combos = []

        def on_press(key):
            key = listener.canonical(key)
            with self.lock:
                if key in self.pressed_keys:
                    # Auto-repeat of a key that is already held down.
                    return
                self.pressed_keys.add(key)
                triggered = [
                    info for info, required in combos
                    if key in required and required <= self.pressed_keys
                ]
            # Network I/O happens outside the lock and off the hook thread.
            for info in triggered:
                threading.Thread(
                    target=self._fire, args=(info,), daemon=True
                ).start()

        def on_release(key):
            key = listener.canonical(key)
            with self.lock:
                self.pressed_keys.discard(key)

        listener = keyboard.Listener(on_press=on_press, on_release=on_release)
        combos.extend(
            (info, frozenset(listener.canonical(k) for k in info['hotkey_keys']))
            for info in self.hotkey_infos
        )
        with listener:
            # Publish the listener first, then re-check the flag, so a
            # concurrent stop() either sees the listener or is seen here.
            self._listener = listener
            if self.running:
                listener.join()

    def stop(self):
        """Ask the thread to finish and stop the keyboard listener."""
        self.running = False
        listener = self._listener
        if listener is not None:
            listener.stop()
def start_monitor():
    """Validate the current configuration and start the hotkey monitor thread.

    Shows an error box and returns without starting when the connection
    code is missing or malformed, when no hotkey is configured, or when a
    configured hotkey has no recognised keys.  On success ``hotkey_infos``
    is rebuilt, the monitor thread is started and the buttons / status
    label are updated.
    """
    global monitor_thread, monitoring
    code = connection_code_var.get().strip()
    if not code:
        messagebox.showerror("错误", "请填写 CGH 连接码")
        return
    client_id, api_host = parse_connection_code(code)
    if not client_id or not api_host:
        messagebox.showerror("错误", "CGH 连接码格式错误")
        return
    if not key_settings:
        messagebox.showerror("错误", "请至少添加一个按键监听配置")
        return
    # A setting whose combination failed to parse (e.g. when it was loaded
    # from a file) has an empty key set; an empty set is a subset of any
    # pressed keys and would therefore match every key press.
    invalid = [str(st["key_combo"]) for st in key_settings if not st["hotkey_keys"]]
    if invalid:
        messagebox.showerror(
            "错误", "以下按键组合无法识别,请编辑后重试: " + ", ".join(invalid))
        return

    # Mutate in place so the module-level list object stays the same.
    hotkey_infos[:] = [
        {
            "hotkey_keys": st["hotkey_keys"],
            "key_combo": st["key_combo"],
            "client_id": client_id,
            "api_host": api_host,
            "strength": st["strength"],
            "time": st["time"],
            "pulseid": st["pulseid"],
            "override": st["override"],
        }
        for st in key_settings
    ]
    monitoring = True
    monitor_thread = MultiHotkeyMonitor(hotkey_infos)
    monitor_thread.start()
    btn_start["state"] = "disabled"
    btn_stop["state"] = "normal"
    lbl_status["text"] = "监听按键事件中..."
def stop_monitor():
    """Signal the monitor thread to stop and tell the user to close the app.

    Both the start and the stop button are disabled afterwards, and an info
    box states that the program has to be closed to cancel listening.
    """
    global monitoring, monitor_thread
    monitoring = False
    if monitor_thread:
        monitor_thread.stop()
        monitor_thread = None
    for button in (btn_start, btn_stop):
        button["state"] = "disabled"
    lbl_status["text"] = "等待关闭程序,关闭程序后才可彻底取消监听..."
    messagebox.showinfo("提示", "必须手动关闭本程序才可取消监听!")
def save_config():
    """Ask for a file name and write the current configuration as JSON.

    The file contains the connection code and, for every hotkey setting,
    the fields ``key_combo``, ``strength``, ``time``, ``pulseid`` and
    ``override`` (the parsed key objects are not written).  Nothing happens
    when the file dialog is cancelled; errors are shown in an error box.
    """
    exported_fields = ("key_combo", "strength", "time", "pulseid", "override")
    config = {
        "connection_code": connection_code_var.get(),
        "key_settings": [
            {field: st[field] for field in exported_fields}
            for st in key_settings
        ],
    }
    filename = filedialog.asksaveasfilename(
        defaultextension=".json",
        filetypes=[("JSON 配置文件", "*.json")],
        title="保存配置文件"
    )
    if not filename:
        return
    try:
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(config, f, ensure_ascii=False, indent=2)
        messagebox.showinfo("提示", f"配置已保存到 {filename}")
    except Exception as e:
        messagebox.showerror("保存失败", str(e))
def load_config():
    """Ask for a JSON configuration file and load it into the application.

    The whole file is parsed and validated before any application state is
    touched, so a malformed file leaves the current connection code and
    hotkey settings unchanged.  Strength and time must satisfy the same
    limits the edit dialog enforces.  Key combinations that cannot be
    parsed are kept (with an empty key set) so they can be edited, and a
    warning lists them.  Errors are shown in an error box.
    """
    def checked(value, upper, message):
        """Return *value* if it is an int in 1..upper, else raise ValueError."""
        if isinstance(value, bool) or not isinstance(value, int) \
                or not (1 <= value <= upper):
            raise ValueError(message)
        return value

    filename = filedialog.askopenfilename(
        defaultextension=".json",
        filetypes=[("JSON 配置文件", "*.json")],
        title="打开配置文件"
    )
    if not filename:
        return
    try:
        with open(filename, "r", encoding="utf-8") as f:
            config = json.load(f)

        loaded = []
        unparsed = []
        for st in config.get("key_settings", []):
            key_combo = st["key_combo"]
            try:
                hotkey_keys = parse_hotkey(key_combo)
            except Exception:
                hotkey_keys = set()
                unparsed.append(str(key_combo))
            loaded.append({
                "key_combo": key_combo,
                "hotkey_keys": hotkey_keys,
                "strength": checked(
                    st.get("strength", DEFAULT_STRENGTH), MAX_STRENGTH,
                    f"强度必须为 1~{MAX_STRENGTH} 的整数"),
                "time": checked(
                    st.get("time", DEFAULT_TIME), MAX_TIME,
                    f"时间必须为 1~{MAX_TIME} 的整数"),
                "pulseid": st.get("pulseid", ""),
                "override": st.get("override", False),
            })

        # Everything parsed: only now replace the application state.
        connection_code_var.set(config.get("connection_code", ""))
        key_settings[:] = loaded
        update_hotkey_list()
        if unparsed:
            messagebox.showwarning(
                "提示", "以下按键组合无法识别,请重新编辑: " + ", ".join(unparsed))
        messagebox.showinfo("提示", f"配置已读取: {filename}")
    except Exception as e:
        messagebox.showerror("读取失败", str(e))
# --- Main window layout -----------------------------------------------------
# Everything below runs at import time: it builds the widgets of the main
# window and then enters the Tk event loop.
frm = tk.Frame(root, padx=15, pady=15)
frm.pack()

# Menu bar: "file" menu (save / load configuration) and "help" menu.
menubar = tk.Menu(root)
filemenu = tk.Menu(menubar, tearoff=0)
filemenu.add_command(label="保存配置", command=save_config)
filemenu.add_command(label="读取配置", command=load_config)
menubar.add_cascade(label="文件", menu=filemenu)
aboutmenu = tk.Menu(menubar, tearoff=0)
# Opens the project's repository page in the default web browser.
aboutmenu.add_command(label="开源",
                      command=lambda: webbrowser.open("https://gitea.miri.site/Mr_Fang/cgh_keyboard_fire_tool"))
menubar.add_cascade(label="帮助", menu=aboutmenu)
root.config(menu=menubar)

# Row 0: connection code entry, bound to connection_code_var.
tk.Label(frm, text="连接码:").grid(row=0, column=0, sticky="e")
tk.Entry(frm, textvariable=connection_code_var, width=60).grid(row=0, column=1, columnspan=3, sticky="w")

# Row 1: listbox with the configured hotkeys and the add / edit / delete
# buttons stacked to its right.
tk.Label(frm, text="按键配置:").grid(row=1, column=0, sticky="ne", pady=4)
hotkey_list = tk.Listbox(frm, height=6, width=60)
hotkey_list.grid(row=1, column=1, columnspan=2, sticky="w")
btn_container = tk.Frame(frm)
btn_container.grid(row=1, column=3, sticky="nw", padx=8)
btn_add = tk.Button(btn_container, text="添加", width=8, command=add_hotkey_setting)
btn_add.pack(pady=4)
btn_edit = tk.Button(btn_container, text="编辑", width=8, command=edit_hotkey_setting)
btn_edit.pack(pady=4)
btn_del = tk.Button(btn_container, text="删除", width=8, command=del_hotkey_setting)
btn_del.pack(pady=4)

# Row 2: start / stop buttons; stop is disabled until monitoring starts.
btn_start = tk.Button(frm, text="开始监听", command=start_monitor, width=16, fg="green")
btn_start.grid(row=2, column=1, sticky="w", pady=10)
btn_stop = tk.Button(frm, text="停止监听", command=stop_monitor, width=16, fg="darkred", state="disabled")
btn_stop.grid(row=2, column=2, sticky="w", pady=10)

# Row 3: status line, updated by start_monitor() / stop_monitor().
lbl_status = tk.Label(frm, text="等待开始监听...", fg="blue")
lbl_status.grid(row=3, column=1, sticky="w")

# Row 4: usage hint.
tk.Label(frm, text="Tips: 支持添加多个不同配置的按键组合,可用“文件”菜单保存/读取配置").grid(row=4, column=0,
                                                          columnspan=4, sticky="w")

center_window(root, 600, 250)
# Blocks until the main window is closed.
root.mainloop()