1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
import os
import threading
import time
from datetime import datetime

import cv2
import numpy as np
from PIL import ImageGrab
class ScreenRecorder:
    """Record a screen region to an MP4 file on a background thread.

    Every captured frame is stamped with the current wall-clock time on a
    semi-transparent dark box. Output goes to
    ``temp_record/<YYYY_mm_dd_HH_MM_SS>.mp4`` relative to the working
    directory.
    """

    # Directory (relative to the working directory) that receives recordings.
    OUTPUT_DIR = "temp_record"

    def __init__(self, fps=15.0):
        """Create an idle recorder.

        Args:
            fps: Target capture rate; also written as the video frame rate.
        """
        self.fps = fps
        self.recording_thread = None
        self.stop_event = threading.Event()  # tells the capture loop to exit
        self.writer = None

    def record_with_timestamp(self, region=(0, 0, 1920, 1080)):
        """Capture ``region`` until the stop event is set.

        This is the worker-thread body started by ``start_recording``.
        Any error is printed instead of raised, and the video writer is
        always released on exit.

        Args:
            region: ``(left, top, width, height)`` in screen pixels.
        """
        writer = None
        try:
            left, top, width, height = region[:4]
            if width <= 0 or height <= 0:
                raise ValueError("Invalid region dimensions")
            fps = float(self.fps)
            if fps <= 0:
                raise ValueError("fps must be positive")
            # ImageGrab expects (left, top, right, bottom).
            bbox = (left, top, left + width, top + height)

            # VideoWriter cannot create missing directories itself.
            os.makedirs(self.OUTPUT_DIR, exist_ok=True)
            stamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
            output = os.path.join(self.OUTPUT_DIR, stamp + ".mp4")

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(output, fourcc, fps, (width, height))
            self.writer = writer
            if not writer.isOpened():
                raise RuntimeError("Failed to initialize VideoWriter")

            frame_interval = 1.0 / fps
            next_frame_at = time.perf_counter()
            while not self.stop_event.is_set():
                # Force 3-channel RGB: some platforms return RGBA grabs,
                # which COLOR_RGB2BGR would reject.
                img = ImageGrab.grab(bbox=bbox).convert("RGB")
                frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

                # Timestamp watermark on a half-transparent black box.
                overlay = frame.copy()
                cv2.rectangle(overlay, (10, 10), (300, 60), (0, 0, 0), -1)
                cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
                cv2.putText(frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                            (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                            (255, 255, 255), 2)

                writer.write(frame)

                # Pace against a running deadline so capture time does not
                # stretch the frame period; waiting on the stop event (not
                # cv2.waitKey, which needs an active GUI window) also lets
                # a stop request interrupt the sleep immediately.
                next_frame_at += frame_interval
                remaining = next_frame_at - time.perf_counter()
                if remaining > 0:
                    self.stop_event.wait(remaining)
                else:
                    # Fell behind; restart the schedule instead of bursting.
                    next_frame_at = time.perf_counter()
        except Exception as e:
            # Thread boundary: report the failure rather than let it vanish.
            print(f"Recording error: {str(e)}")
        finally:
            if writer is not None:
                writer.release()
            if self.writer is writer:
                self.writer = None

    def start_recording(self, region):
        """Start the recording thread for ``region``.

        Does nothing (besides printing a notice) if a recording is already
        in progress.

        Args:
            region: ``(left, top, width, height)`` in screen pixels.
        """
        if self.recording_thread and self.recording_thread.is_alive():
            print("Recording is already running")
            return

        self.stop_event.clear()
        self.recording_thread = threading.Thread(
            target=self.record_with_timestamp,
            args=(region,),
            daemon=True,  # do not keep the interpreter alive on exit
        )
        self.recording_thread.start()

    def stop_recording(self, timeout=2):
        """Signal the recording thread to stop and wait for it to finish.

        Args:
            timeout: Seconds to wait for the worker thread to exit.
        """
        thread = self.recording_thread
        if thread is None or not thread.is_alive():
            print("No active recording to stop")
            return

        self.stop_event.set()
        thread.join(timeout=timeout)
        if thread.is_alive():
            # The worker still owns the writer; releasing it here would race
            # with an in-flight write. The worker releases it when it exits.
            print("Recording thread did not stop in time")
            return

        self.recording_thread = None
        if self.writer is not None:
            self.writer.release()
            self.writer = None
        print("Recording stopped")
# Example usage
if __name__ == "__main__":
    recorder = ScreenRecorder()
    # region is (left, top, width, height) in screen pixels
    recorder.start_recording(region=(884, 69, 608, 840))
    try:
        time.sleep(15)
    finally:
        # Always stop, even on Ctrl-C: the worker is a daemon thread, so
        # exiting without stopping would skip releasing the video writer.
        recorder.stop_recording()
|