diff --git a/main.py b/main.py new file mode 100644 index 0000000..d9a6d0d --- /dev/null +++ b/main.py @@ -0,0 +1,447 @@ +import tkinter as tk +import webbrowser +from tkinter import messagebox, ttk, filedialog +import re +import threading +import requests +import json +from pynput import keyboard + +DEFAULT_STRENGTH = 20 +DEFAULT_TIME = 5000 +MAX_TIME = 30000 +MAX_STRENGTH = 40 + +root = tk.Tk() +root.title("按键监听一键开火工具") + +connection_code_var = tk.StringVar() +key_settings = [] + +monitoring = False +monitor_thread = None +hotkey_infos = [] +pulse_list_cache = [] + + +def center_window(window, width, height): + window.update_idletasks() + x = (window.winfo_screenwidth() - width) // 2 + y = (window.winfo_screenheight() - height) // 2 + window.geometry(f"{width}x{height}+{x}+{y}") + + +def parse_connection_code(code): + m = re.match(r"^([a-fA-F0-9\-]+)@(.+)$", code.strip()) + if not m: + return None, None + client_id, api_host = m.group(1), m.group(2) + return client_id, api_host + + +def send_fire_request(client_id, api_host, strength, time_ms, override, pulseid): + url = f"{api_host}/api/v2/game/{client_id}/action/fire" + data = { + "strength": strength, + "time": time_ms, + "override": override, + } + if pulseid: + data["pulseId"] = pulseid + try: + resp = requests.post(url, json=data, timeout=5) + if resp.status_code == 200: + return True, resp.text + return False, f"HTTP {resp.status_code}: {resp.text}" + except Exception as e: + return False, str(e) + + +def get_pulse_list(client_id, api_host): + global pulse_list_cache + url = f"{api_host}/api/v2/game/{client_id}/pulse_list" + try: + resp = requests.get(url, timeout=5) + if resp.status_code == 200: + d = resp.json() + if d.get("status") == 1 and "pulseList" in d: + pulse_list_cache = d["pulseList"] + return pulse_list_cache + return [] + except Exception: + return [] + + +def parse_hotkey(hotkey_str): + key_map = { + "ctrl": keyboard.Key.ctrl, + "alt": keyboard.Key.alt, + "shift": keyboard.Key.shift, + "cmd": keyboard.Key.cmd, + "win": keyboard.Key.cmd, + "super": keyboard.Key.cmd, + "enter": keyboard.Key.enter, + "tab": keyboard.Key.tab, + "esc": keyboard.Key.esc, + "space": keyboard.Key.space, + "up": keyboard.Key.up, + "down": keyboard.Key.down, + "left": keyboard.Key.left, + "right": keyboard.Key.right, + "delete": keyboard.Key.delete, + "backspace": keyboard.Key.backspace, + } + parts = [p.strip().lower() for p in hotkey_str.split('+')] + keys = set() + for p in parts: + if p in key_map: + keys.add(key_map[p]) + elif len(p) == 1: + keys.add(keyboard.KeyCode.from_char(p)) + else: + raise ValueError(f"无法识别的按键: {p}") + return keys + + +def hotkey_setting_dialog(edit_idx=None): + if edit_idx is not None: + setting = key_settings[edit_idx] + initial_key_combo = setting["key_combo"] + initial_strength = str(setting["strength"]) + initial_time = str(setting["time"]) + initial_pulseid = setting["pulseid"] + initial_override = setting["override"] + else: + initial_key_combo = "" + initial_strength = str(DEFAULT_STRENGTH) + initial_time = str(DEFAULT_TIME) + initial_pulseid = "" + initial_override = False + + dialog = tk.Toplevel(root) + dialog.title("编辑按键配置" if edit_idx is not None else "添加按键配置") + dialog.transient(root) + dialog.grab_set() + + frm_d = tk.Frame(dialog, padx=10, pady=10) + frm_d.pack() + + tk.Label(frm_d, text="按键组合:").grid(row=0, column=0, sticky="e") + key_combo_var = tk.StringVar(value=initial_key_combo) + key_combo_entry = tk.Entry(frm_d, textvariable=key_combo_var, width=20) + key_combo_entry.grid(row=0, column=1, sticky="w") + + tk.Label(frm_d, text="开火强度:").grid(row=1, column=0, sticky="e") + strength_var = tk.StringVar(value=initial_strength) + tk.Entry(frm_d, textvariable=strength_var, width=20).grid(row=1, column=1, sticky="w") + + tk.Label(frm_d, text="开火时长:").grid(row=2, column=0, sticky="e") + time_var = tk.StringVar(value=initial_time) + tk.Entry(frm_d, textvariable=time_var, width=20).grid(row=2, column=1, sticky="w") + + tk.Label(frm_d, text="开火波形:").grid(row=3, column=0, sticky="e") + pulseid_cmb = ttk.Combobox(frm_d, width=18, state="readonly") + pulseid_cmb.grid(row=3, column=1, sticky="w") + + def fetch_and_set_pulse_list(): + code = connection_code_var.get().strip() + if not code: + pulseid_cmb["values"] = ["(不指定)"] + pulseid_cmb.current(0) + return + client_id, api_host = parse_connection_code(code) + if not client_id or not api_host: + pulseid_cmb["values"] = ["(不指定)"] + pulseid_cmb.current(0) + return + pulse_list = get_pulse_list(client_id, api_host) + if not pulse_list: + pulseid_cmb["values"] = ["(不指定)"] + pulseid_cmb.current(0) + else: + vals = ["(不指定)"] + [ + f"{p['name']} ({p['id']})" for p in pulse_list + ] + pulseid_cmb["values"] = vals + # pulseid_cmb.current(0) + if edit_idx is not None and initial_pulseid: + select_idx = 0 + for i, p in enumerate(pulse_list): + if p["id"] == initial_pulseid: + select_idx = i + 1 + break + pulseid_cmb.current(select_idx) + else: + pulseid_cmb.current(0) + + fetch_and_set_pulse_list() + + override_var = tk.BooleanVar(value=initial_override) + tk.Checkbutton(frm_d, text="重置开火时间", variable=override_var).grid(row=4, column=1, sticky="w") + + def on_ok(): + key_combo = key_combo_var.get().strip() + strength = strength_var.get().strip() + time_ms = time_var.get().strip() + override = override_var.get() + pulseid_sel = pulseid_cmb.current() + pulseid = "" + if pulseid_sel > 0 and pulse_list_cache: + pulseid = pulse_list_cache[pulseid_sel - 1]["id"] + if not key_combo: + messagebox.showerror("错误", "必须填写按键组合", parent=dialog) + return + try: + hotkey_keys = parse_hotkey(key_combo) + except ValueError as e: + messagebox.showerror("错误", str(e), parent=dialog) + return + try: + strength = int(strength) + if not (1 <= strength <= MAX_STRENGTH): + raise ValueError() + except: + messagebox.showerror("错误", f"强度必须为 1~{MAX_STRENGTH} 的整数", parent=dialog) + return + try: + time_ms = int(time_ms) + if not (1 <= time_ms <= MAX_TIME): + raise ValueError() + except: + messagebox.showerror("错误", f"时间必须为 1~{MAX_TIME} 的整数", parent=dialog) + return + obj = { + "key_combo": key_combo, + "hotkey_keys": hotkey_keys, + "strength": strength, + "time": time_ms, + "pulseid": pulseid, + "override": override, + } + if edit_idx is None: + key_settings.append(obj) + else: + key_settings[edit_idx] = obj + update_hotkey_list() + dialog.destroy() + + tk.Button(frm_d, text="刷新", command=fetch_and_set_pulse_list, width=6).grid(row=3, column=2, sticky="w", padx=8) + tk.Button(frm_d, text="确定", command=on_ok, width=10).grid(row=5, column=1, sticky="w", pady=8) + center_window(dialog, 300, 180) + dialog.wait_window(dialog) + + +def add_hotkey_setting(): + hotkey_setting_dialog(edit_idx=None) + + +def edit_hotkey_setting(): + sel = hotkey_list.curselection() + if not sel: + messagebox.showerror("错误", "请先选择一个配置项") + return + idx = sel[0] + hotkey_setting_dialog(edit_idx=idx) + + +def del_hotkey_setting(): + sel = hotkey_list.curselection() + if not sel: + return + idx = sel[0] + del key_settings[idx] + update_hotkey_list() + + +def update_hotkey_list(): + hotkey_list.delete(0, tk.END) + for i, st in enumerate(key_settings): + pulselabel = st['pulseid'] if st['pulseid'] else "(无)" + desc = f"<{st['key_combo']}> 强度:{st['strength']} 时间:{st['time']}ms 波形:{pulselabel} 重置时间:{'是' if st['override'] else '否'}" + hotkey_list.insert(tk.END, desc) + + +class MultiHotkeyMonitor(threading.Thread): + def __init__(self, hotkey_infos): + super().__init__(daemon=True) + self.hotkey_infos = hotkey_infos + self.pressed_keys = set() + self.lock = threading.Lock() + self.running = True + + def run(self): + def on_press(key): + with self.lock: + self.pressed_keys.add(key) + for info in self.hotkey_infos: + if all(k in self.pressed_keys for k in info['hotkey_keys']): + ok, msg = send_fire_request( + info['client_id'], info['api_host'], + info['strength'], info['time'], + info['override'], info['pulseid']) + if ok: + print(f"[{info['key_combo']}] 已发送一键开火请求") + else: + print(f"[{info['key_combo']}] 发送失败: {msg}") + + def on_release(key): + with self.lock: + if key in self.pressed_keys: + self.pressed_keys.remove(key) + + with keyboard.Listener(on_press=on_press, on_release=on_release) as listener: + while self.running: + listener.join(0.1) + + def stop(self): + self.running = False + + +def start_monitor(): + global monitor_thread, monitoring, hotkey_infos + code = connection_code_var.get().strip() + if not code: + messagebox.showerror("错误", "请填写 CGH 连接码") + return + client_id, api_host = parse_connection_code(code) + if not client_id or not api_host: + messagebox.showerror("错误", "CGH 连接码格式错误") + return + if not key_settings: + messagebox.showerror("错误", "请至少添加一个按键监听配置") + return + + hotkey_infos.clear() + for st in key_settings: + hotkey_infos.append({ + "hotkey_keys": st["hotkey_keys"], + "key_combo": st["key_combo"], + "client_id": client_id, + "api_host": api_host, + "strength": st["strength"], + "time": st["time"], + "pulseid": st["pulseid"], + "override": st["override"] + }) + + monitoring = True + monitor_thread = MultiHotkeyMonitor(hotkey_infos) + monitor_thread.start() + btn_start["state"] = "disabled" + btn_stop["state"] = "normal" + lbl_status["text"] = "监听按键事件中..." + + +def stop_monitor(): + global monitoring, monitor_thread + monitoring = False + if monitor_thread: + monitor_thread.stop() + monitor_thread = None + btn_start["state"] = "disabled" + btn_stop["state"] = "disabled" + lbl_status["text"] = "等待关闭程序,关闭程序后才可彻底取消监听..." + messagebox.showinfo("提示", "必须手动关闭本程序才可取消监听!") + + +def save_config(): + config = { + "connection_code": connection_code_var.get(), + "key_settings": [ + { + "key_combo": st["key_combo"], + "strength": st["strength"], + "time": st["time"], + "pulseid": st["pulseid"], + "override": st["override"], + } + for st in key_settings + ] + } + filename = filedialog.asksaveasfilename( + defaultextension=".json", + filetypes=[("JSON 配置文件", "*.json")], + title="保存配置文件" + ) + if filename: + try: + with open(filename, "w", encoding="utf-8") as f: + json.dump(config, f, ensure_ascii=False, indent=2) + messagebox.showinfo("提示", f"配置已保存到 {filename}") + except Exception as e: + messagebox.showerror("保存失败", str(e)) + + +def load_config(): + filename = filedialog.askopenfilename( + defaultextension=".json", + filetypes=[("JSON 配置文件", "*.json")], + title="打开配置文件" + ) + if filename: + try: + with open(filename, "r", encoding="utf-8") as f: + config = json.load(f) + connection_code_var.set(config.get("connection_code", "")) + key_settings.clear() + for st in config.get("key_settings", []): + try: + hotkey_keys = parse_hotkey(st["key_combo"]) + except Exception: + hotkey_keys = set() + key_settings.append({ + "key_combo": st["key_combo"], + "hotkey_keys": hotkey_keys, + "strength": st.get("strength", DEFAULT_STRENGTH), + "time": st.get("time", DEFAULT_TIME), + "pulseid": st.get("pulseid", ""), + "override": st.get("override", False), + }) + update_hotkey_list() + messagebox.showinfo("提示", f"配置已读取: {filename}") + except Exception as e: + messagebox.showerror("读取失败", str(e)) + + +frm = tk.Frame(root, padx=15, pady=15) +frm.pack() + +menubar = tk.Menu(root) +filemenu = tk.Menu(menubar, tearoff=0) +filemenu.add_command(label="保存配置", command=save_config) +filemenu.add_command(label="读取配置", command=load_config) +menubar.add_cascade(label="文件", menu=filemenu) +aboutmenu = tk.Menu(menubar, tearoff=0) +aboutmenu.add_command(label="开源", + command=lambda: webbrowser.open("https://gitea.miri.site/Mr_Fang/cgh_keyboard_fire_tool")) +menubar.add_cascade(label="帮助", menu=aboutmenu) +root.config(menu=menubar) + +tk.Label(frm, text="连接码:").grid(row=0, column=0, sticky="e") +tk.Entry(frm, textvariable=connection_code_var, width=60).grid(row=0, column=1, columnspan=3, sticky="w") + +tk.Label(frm, text="按键配置:").grid(row=1, column=0, sticky="ne", pady=4) +hotkey_list = tk.Listbox(frm, height=6, width=60) +hotkey_list.grid(row=1, column=1, columnspan=2, sticky="w") +btn_container = tk.Frame(frm) +btn_container.grid(row=1, column=3, sticky="nw", padx=8) +btn_add = tk.Button(btn_container, text="添加", width=8, command=add_hotkey_setting) +btn_add.pack(pady=4) +btn_edit = tk.Button(btn_container, text="编辑", width=8, command=edit_hotkey_setting) +btn_edit.pack(pady=4) +btn_del = tk.Button(btn_container, text="删除", width=8, command=del_hotkey_setting) +btn_del.pack(pady=4) + +btn_start = tk.Button(frm, text="开始监听", command=start_monitor, width=16, fg="green") +btn_start.grid(row=2, column=1, sticky="w", pady=10) +btn_stop = tk.Button(frm, text="停止监听", command=stop_monitor, width=16, fg="darkred", state="disabled") +btn_stop.grid(row=2, column=2, sticky="w", pady=10) + +lbl_status = tk.Label(frm, text="等待开始监听...", fg="blue") +lbl_status.grid(row=3, column=1, sticky="w") + +tk.Label(frm, text="Tips: 支持添加多个不同配置的按键组合,可用“文件”菜单保存/读取配置").grid(row=4, column=0, + columnspan=4, sticky="w") +center_window(root, 600, 250) + +root.mainloop()