# -*- coding: utf-8 -*-
"""Flask-SocketIO control panel for the "dongri" game automation script.

Three execution contexts cooperate through module-level state:

* Socket.IO handlers (web clients) start/stop automation and poll a preview.
* A single-worker thread pool runs a *producer* loop (``add_auto_task`` or
  ``auto_participate``) that pushes task objects onto ``task_queue``.
* A daemon thread (``thread_runTask``) is the *consumer*: it pops tasks from
  the right end of ``task_queue`` and runs them one at a time.

``event`` is the shared stop flag: when set, producers leave their loops and
the consumer discards whatever is still queued.

NOTE(review): this file was reconstructed from a copy whose line breaks and
indentation had been lost.  Block boundaries flagged with ``NOTE(review)``
below were inferred and should be checked against the original.
"""
# The original import order is kept on purpose: the two star imports may
# rebind names imported before them, and names imported after them win.
import datetime
# os / random / sys / time are used below; previously they were only in scope
# if one of the star imports happened to leak them.
import os
import random
import sys
import time
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
from scriptBase.comon import *
import pyautogui
import base64
import threading
from dongri_task import *
from collections import deque
import json
from concurrent.futures import ThreadPoolExecutor

# Global thread pool limited to one worker, so at most one producer loop
# (add_auto_task / auto_participate) runs at a time.
executor = ThreadPoolExecutor(max_workers=1)
app = Flask(__name__)
socketio = SocketIO(app)
event = threading.Event()   # stop flag shared by producers and the consumer
g_status = ''               # last non-empty status passed to send_status
last_time = 0.0             # time.time() of the last accepted monitor_begin
task_queue = deque()        # producers appendleft(), the consumer pop()s
last_process = ''           # name of the task most recently taken by the consumer
isGameBegin = True
autoTask = None
isReset = False             # set in __main__ when started with --reset


def thread_runTask():
    """Consumer loop: run queued tasks forever (target of the daemon thread).

    Each iteration drops all pending tasks if the stop flag is set, then runs
    the right-most task if there is one, otherwise idles.  When the process
    was started with ``--reset`` the game is restarted once.
    """
    global last_process, isReset
    while True:
        if event.is_set():
            task_queue.clear()
        # Pop in one step instead of "check length, read [-1], pop": another
        # thread may clear the deque between those steps.
        try:
            task = task_queue.pop()
        except IndexError:
            myTimeSleep_big()
        else:
            last_process = task.name
            task.run()
            myTimeSleep_small()
        if isReset:
            isReset = False
            restart_game()


@app.route('/')
def index():
    """Serve the control page."""
    return render_template('index_dongri.html')


@socketio.on('connect')
def handle_connect():
    """Log a client connection."""
    print('Client connected')


@socketio.on('disconnect')
def handle_disconnect():
    """Log a client disconnection."""
    print('Client disconnected')


def send_hint(msg):
    """Emit *msg* (a JSON-encoded array of task names) as 'processing_hint'."""
    emit('processing_hint', msg)


def send_status(msg):
    """Record and emit the script's execution status.

    An empty *msg* re-emits the last stored status without changing it.
    Any other *msg* becomes the stored status; the literal "结束" additionally
    clears the stop flag.  Emitting is best-effort: failures are swallowed so
    callers are never interrupted by a failed emit.
    """
    global g_status
    try:
        if msg == '':
            emit('processing_status', g_status)
            return
        g_status = msg
        if msg == "结束":
            event.clear()
        # NOTE(review): assumed to run for every non-empty msg, not only for
        # "结束" -- the original indentation was lost; confirm.
        emit('processing_status', msg)
    except Exception:
        # Deliberately ignored (the original used a bare ``except``); narrowed
        # so SystemExit / KeyboardInterrupt are no longer swallowed.
        return


@socketio.on('monitor_begin')
def monitor_begin():
    """Send a preview image plus the current task list and status.

    Requests arriving less than 0.5 s after the previously accepted one are
    ignored.
    """
    global last_time, last_process
    now = time.time()
    if now - last_time < 0.5:
        return
    last_time = now

    _region_ok, region_pos = game_region()
    screenshot = pyautogui.screenshot(region=region_pos)
    compressed = compress_image(binarize_image(screenshot))
    socketio.emit('image_data', base64.b64encode(compressed).decode('utf-8'))

    task_names = []
    if not event.is_set():
        task_names.append(last_process)
        # Iterate a snapshot: other threads mutate task_queue, and iterating
        # a deque that changes underneath raises RuntimeError.
        # NOTE(review): this loop is assumed to sit inside the ``if``; the
        # original indentation was lost -- confirm.
        for item in reversed(tuple(task_queue)):
            task_names.append(item.name)
    send_hint(json.dumps(task_names, ensure_ascii=False))
    send_status('')


@socketio.on('end_script')
def handle_end_script():
    """Set the stop flag."""
    event.set()


@socketio.on('end_game')
def handle_end_game():
    """Set the stop flag, close the game, report "结束2", clear the flag."""
    event.set()
    task_close_game()
    send_status("结束2")
    event.clear()


@socketio.on('get_title')
def handle_get_title():
    """Emit the computer name followed by ' machine' as 'processing_title'."""
    computer_name = task_getComputerName()  # was named ``str`` (shadowed the builtin)
    emit('processing_title', computer_name + ' machine')


@socketio.on('reset_script')
def handle_reset_script():
    """Re-exec this script with every ``--reset`` argument removed."""
    python = sys.executable
    args = [arg for arg in sys.argv if arg != '--reset']
    os.execl(python, python, *args)


@socketio.on('restart_game')
def handle_restart_game():
    """Re-exec this script with a single trailing ``--reset`` argument.

    Existing ``--reset`` arguments are dropped first so that repeated
    restarts do not keep lengthening the command line.
    """
    python = sys.executable
    args = [arg for arg in sys.argv if arg != '--reset']
    os.execl(python, python, *args, '--reset')


@socketio.on('read_cfg')
def handle_read_cfg():
    """Emit the stored configuration (or None) as 'processing_cfg'."""
    cfg = read_cfg()
    emit('processing_cfg', cfg)


def restart_game():
    """Close and start the game until starting succeeds, then resume automation
    with the stored configuration."""
    global isGameBegin
    isGameBegin = False
    while True:
        task_close_game()
        # Equality with True is kept as written in the original; a merely
        # truthy return value is not treated as success.
        if task_start_game() == True:  # noqa: E712
            break
        send_status("启动失败")
    isGameBegin = True
    send_status("结束")
    auto_task(read_cfg())


def auto_participate():
    """Producer loop for the rally-participation mode.

    Keeps the queue topped up until the stop flag is set, and restarts the
    script after 40 minutes.

    NOTE(review): unlike add_auto_task, this loop does not clear the stop flag
    when it exits -- confirm that is intended.
    """
    task_queue.appendleft(task_returnAllLine())
    timeout = 40 * 60
    start_time = time.time()  # record the start time
    while not event.is_set():
        # NOTE(review): both appends are assumed to be guarded by the length
        # check and the sleep to run every iteration; indentation was lost.
        if len(task_queue) < 4:
            task_queue.appendleft(task_paticipateInTeam())
            task_queue.appendleft(task_checkHelp(True))
        myTimeSleep_big()
        # check the elapsed time on every pass
        if time.time() - start_time >= timeout:
            handle_restart_game()
            break


def add_auto_task(isMaxCollect, isSimple=False):
    """Producer loop for the automatic mode.

    Each round queues a batch of tasks and then waits.  After the 10th round
    the game is closed and, after a longer wait, the script re-executes itself.
    When the stop flag ends the loop, the queue is emptied, the end of the
    automatic mode is reported and the flag is cleared.

    Args:
        isMaxCollect: when true, collect tasks are created with type 5
            instead of 0.
        isSimple: when true, the tasks guarded by ``not isSimple`` below are
            skipped.

    NOTE(review): the original indentation was lost.  Each ``if`` below is
    assumed to guard only the statement(s) shown indented under it -- confirm
    against the original, in particular for task_checkConfilits /
    task_checkDonata.
    """
    collectType = 5 if isMaxCollect else 0
    times = 0
    while not event.is_set():
        if not isSimple:
            task_queue.appendleft(task_information())
        task_queue.appendleft(task_checkHelp(False))
        if not isSimple:
            task_queue.appendleft(task_paticipateInTeam())
        task_queue.appendleft(check_buildOrResearch())
        task_queue.appendleft(task_cure())
        if not isSimple:
            task_queue.appendleft(task_fightMonster(False, True, isSimple))
        task_queue.appendleft(task_collect(collectType, isSimple))
        task_queue.appendleft(task_train(False))
        task_queue.appendleft(task_collect(collectType, isSimple))
        task_queue.appendleft(task_train(False))
        if isSimple:
            task_queue.appendleft(check_buildOrResearch())
        task_queue.appendleft(task_checkStoreRoom())
        if not isSimple:
            task_queue.appendleft(task_checkConfilits())
        task_queue.appendleft(task_checkDonata())
        task_queue.appendleft(task_cure())
        task_queue.appendleft(task_useAnnimalSkill())
        times += 1
        if times == 5:
            task_queue.appendleft(task_checkAdventure())
        if times == 10:
            handle_end_game()
            myTimeSleep(random.randint(1000, 2000), send_status)
            handle_restart_game()
        else:
            myTimeSleep(random.randint(600, 1000), send_status)
    task_queue.clear()
    send_status('自动模式结束')
    event.clear()


def write_cfg(config):
    """Write *config* to ``config.json`` as indented JSON."""
    with open('config.json', 'w') as config_file:
        json.dump(config, config_file, indent=4)


def read_cfg():
    """Load ``config.json``.

    Returns:
        The decoded JSON value, or None (after printing a message) when the
        file is missing, unreadable, or not valid JSON.
    """
    try:
        with open('config.json', 'r') as config_file:
            return json.load(config_file)
    except FileNotFoundError:
        print("配置文件不存在,请检查文件路径。")
        return None
    except PermissionError:
        print("没有权限读取配置文件。")
        return None
    except json.JSONDecodeError:
        print("配置文件格式错误,请检查文件内容是否为有效的 JSON。")
        return None


@socketio.on('begin_auto')
def handle_auto(data):
    """Persist the client's configuration and start the automatic mode."""
    write_cfg(data)
    auto_task(data)


def auto_task(data):
    """Start the automatic mode on the worker pool.

    Args:
        data: mapping with the keys 'maxCollect' and 'simple', or None.
            A missing mapping or a missing key counts as False, so an
            incomplete stored config cannot raise in the caller's thread.
    """
    if data is None:
        isMaxCollect = False
        isSimple = False
    else:
        isMaxCollect = data.get('maxCollect', False)
        isSimple = data.get('simple', False)
    send_status('开始自动模式')
    executor.submit(add_auto_task, isMaxCollect, isSimple)


@socketio.on('begin_auto_participate')
def handle_auto_participate():
    """Start the rally-participation mode on the worker pool."""
    send_status('开始自动集结模式')
    executor.submit(auto_participate)


if __name__ == '__main__':
    init()
    if '--reset' in sys.argv:
        isReset = True
        print("需要重启游戏")
    # Start the consumer thread that executes queued tasks.
    runTask = threading.Thread(target=thread_runTask)
    runTask.daemon = True
    runTask.start()
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the debug
    # server on every interface, and debug mode presumably enables the
    # reloader (which would run this block in a second process) -- confirm
    # that both are intended outside development.
    socketio.run(app, host='0.0.0.0', debug=True)