record.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import time
  2. import cv2
  3. import numpy as np
  4. from PIL import ImageGrab
  5. import threading
  6. from datetime import datetime
  7. class ScreenRecorder:
  8. def __init__(self, fps=15.0):
  9. self.fps = fps
  10. self.recording_thread = None
  11. self.stop_event = threading.Event() # 线程停止信号
  12. self.writer = None
  13. def record_with_timestamp(self, region=(0, 0, 1920, 1080)):
  14. """多线程录屏方法"""
  15. region = (region[0], region[1], region[2] + region[0], region[3] + region[1])
  16. try:
  17. curData = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
  18. output = "temp_record/" + curData + ".mp4"
  19. # 计算实际帧尺寸并验证
  20. width, height = region[2] - region[0], region[3] - region[1]
  21. if width <= 0 or height <= 0:
  22. raise ValueError("Invalid region dimensions")
  23. fourcc = cv2.VideoWriter_fourcc(*'mp4v')
  24. self.writer = cv2.VideoWriter(output, fourcc, float(self.fps), (width, height))
  25. if not self.writer.isOpened():
  26. raise RuntimeError("Failed to initialize VideoWriter")
  27. while not self.stop_event.is_set():
  28. img = ImageGrab.grab(bbox=region)
  29. frame = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
  30. # 添加时间水印(带半透明背景)
  31. overlay = frame.copy()
  32. cv2.rectangle(overlay, (10, 10), (300, 60), (0, 0, 0), -1)
  33. cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
  34. cv2.putText(frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  35. (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
  36. self.writer.write(frame)
  37. cv2.waitKey(int(1000 / self.fps)) # 控制帧率
  38. except Exception as e:
  39. print(f"Recording error: {str(e)}")
  40. finally:
  41. if self.writer:
  42. self.writer.release()
  43. def start_recording(self, region):
  44. """启动录屏线程"""
  45. if self.recording_thread and self.recording_thread.is_alive():
  46. print("Recording is already running")
  47. return
  48. self.stop_event.clear()
  49. self.recording_thread = threading.Thread(
  50. target=self.record_with_timestamp,
  51. args=(region,),
  52. daemon=True # 设为守护线程,主程序退出时自动终止
  53. )
  54. self.recording_thread.start()
  55. def stop_recording(self):
  56. """中止录屏"""
  57. if self.recording_thread and self.recording_thread.is_alive():
  58. self.stop_event.set()
  59. self.recording_thread.join(timeout=2) # 等待线程结束(超时2秒)
  60. if self.writer:
  61. self.writer.release()
  62. print("Recording stopped")
  63. else:
  64. print("No active recording to stop")
  65. # 示例调用
  66. if __name__ == "__main__":
  67. recorder = ScreenRecorder()
  68. # 输入参数示例:left=100, top=100, width=800, height=600
  69. recorder.start_recording(region=(884, 69, 608, 840))
  70. time.sleep(15)
  71. recorder.stop_recording()