본문 바로가기
목차
Python

[Ex] Input_Recoder and Replayer

by ds31x 2025. 7. 14.
728x90
반응형

record_user_input_full.py

"""
마우스와 키보드 입력을 실시간으로 기록하여 JSON 파일로 저장하는 프로그램

핵심 특징:
- 이벤트 후처리 방식으로 100% 정확한 드래그 감지
- mouse_down과 mouse_up 사이의 모든 move 이벤트를 drag로 변경
- 적절한 필터링으로 용량과 정확도의 균형 유지
- 정확한 시작 키 검증 (소문자 's'만 인식)

작동 원리:
1. 녹화 중에는 모든 마우스 이동을 'move' 이벤트로 기록
2. 저장 전에 이벤트 목록을 분석하여 드래그 구간을 찾음
3. mouse_down과 mouse_up 사이의 모든 move를 drag로 변경
4. 최종 처리된 이벤트를 JSON 파일로 저장

사용법:
1. 프로그램 실행
2. 's' 키를 눌러 녹화 시작
3. 3초 후 자동으로 녹화 시작
4. ESC 키를 눌러 녹화 종료 및 파일 저장
"""

import time
import json
import sys
import math
from pynput import mouse, keyboard

# =============================================================================
# 상수 정의
# =============================================================================
START_KEY_CHAR = 's'                    # 녹화 시작 키 (소문자 s만 인식)
EXIT_KEY = keyboard.Key.esc             # 녹화 종료 키 (ESC)
OUTPUT_FILE = "recorded_events.json"    # 출력 파일명 (같은 디렉토리에 저장)
COUNTDOWN_SECONDS = 3                   # 녹화 시작 전 대기 시간 (사용자 준비 시간)

# =============================================================================
# 필터링 설정 클래스
# =============================================================================
class FilterConfig:
    """
    이벤트 필터링 설정을 관리하는 클래스
    
    이 클래스는 어떤 이벤트를 기록할지, 어떤 기준으로 필터링할지를 정의합니다.
    설정값을 조정하여 기록의 상세도와 파일 크기 간의 균형을 맞출 수 있습니다.
    """
    
    # =========================================================================
    # 기본 이벤트 기록 설정 - 각 이벤트 타입별 기록 여부
    # =========================================================================
    RECORD_MOUSE_MOVE = True            # 마우스 이동 이벤트 기록 여부
                                       # False로 설정하면 드래그도 기록되지 않음
    RECORD_MOUSE_DRAG = True            # 마우스 드래그 이벤트 기록 여부
                                       # move가 True여야 drag도 감지 가능
    RECORD_MOUSE_CLICK = True           # 마우스 클릭 이벤트 기록 여부
                                       # down/up 이벤트 모두 포함
    RECORD_MOUSE_SCROLL = True          # 마우스 스크롤 이벤트 기록 여부
                                       # 수직/수평 스크롤 모두 포함
    RECORD_KEY_PRESS = True             # 키보드 눌림 이벤트 기록 여부
    RECORD_KEY_RELEASE = False          # 키보드 뗌 이벤트 기록 여부
                                       # 대부분의 경우 불필요하여 기본값 False
    
    # =========================================================================
    # 필터링 임계값 - 노이즈 제거와 데이터 압축을 위한 설정
    # =========================================================================
    MOUSE_MOVE_INTERVAL = 0.01          # 마우스 이벤트 최소 시간 간격 (10ms)
                                       # 이보다 빠른 연속 이벤트는 무시됨
                                       # 값이 작을수록 더 많은 이벤트 기록
    MOUSE_MOVE_DISTANCE = 2             # 마우스 이벤트 최소 거리 (2픽셀)
                                       # 이보다 작은 움직임은 무시됨
                                       # 미세한 손떨림 등을 제거하는 효과
    
    # =========================================================================
    # 디버깅 설정
    # =========================================================================
    ENABLE_DEBUG_INFO = False           # 디버깅 정보 출력 여부
                                       # True로 설정하면 드래그 변환 과정 출력

# =============================================================================
# 전역 변수
# =============================================================================
# 이벤트 기록 관련 변수들
events = []                     # 모든 이벤트를 저장할 리스트
                               # 각 이벤트는 딕셔너리 형태로 저장됨
pressed_keys = set()            # 현재 눌린 키들을 추적하는 집합
                               # 키 반복 입력 방지를 위해 사용
mouse_button_down = False       # 마우스 버튼 눌림 상태 (단순 추적용)
                               # 실제 드래그 판단은 후처리에서 수행
start_time = None              # 녹화 시작 시간 (time.time() 값)
                               # 모든 이벤트의 시간은 이 시점 기준 상대값

# 필터링용 변수들 - 중복 이벤트 제거를 위한 상태 추적
last_mouse_time = 0            # 마지막 마우스 이벤트 시간
                               # 시간 간격 필터링에 사용
last_mouse_pos = (0, 0)        # 마지막 마우스 위치 (x, y)
                               # 거리 필터링에 사용
filtered_events_count = 0      # 필터링으로 제외된 이벤트 개수
                               # 통계 정보 제공용

# =============================================================================
# 유틸리티 함수들
# =============================================================================
def get_elapsed_time():
    """
    녹화 시작 시점부터 현재까지의 경과 시간을 반환
    
    모든 이벤트의 timestamp는 절대 시간이 아닌 녹화 시작 기준 상대 시간으로 저장됩니다.
    이렇게 하면 이벤트 재생 시 타이밍을 정확히 재현할 수 있습니다.
    
    Returns:
        float: 경과 시간 (초 단위, 소수점 포함)
               녹화 시작 전이면 0 반환
    """
    if start_time is None:
        return 0
    return time.time() - start_time

def get_key_name(key):
    """
    키 객체로부터 문자열 이름을 추출
    
    pynput의 키 객체는 일반 문자키와 특수키가 다른 형태를 가집니다.
    이 함수는 두 가지 경우를 모두 처리하여 일관된 문자열 형태로 변환합니다.
    
    Args:
        key: pynput.keyboard.Key 객체 또는 키 문자
             - 일반 문자: key.char 속성 존재 (예: 'a', '1', ' ')
             - 특수 키: Key.shift, Key.ctrl 등의 형태
        
    Returns:
        str: 키의 문자열 표현
             - 일반 문자: 해당 문자 그대로 (예: 'a', '1')
             - 특수 키: 'Key.shift', 'Key.ctrl' 형태
    """
    if hasattr(key, 'char') and key.char is not None:
        return key.char
    return str(key)

def print_info(message):
    """
    정보 메시지를 일관된 형태로 출력
    
    모든 프로그램 메시지를 [Info] 태그와 함께 출력하여
    사용자 입력이나 다른 출력과 구분되도록 합니다.
    
    Args:
        message (str): 출력할 메시지
    """
    print(f"[Info] {message}")

def calculate_distance(pos1, pos2):
    """
    두 점 사이의 유클리드 거리를 계산
    
    마우스 이동 거리를 계산하여 미세한 움직임을 필터링하는 데 사용됩니다.
    픽셀 단위의 정확한 거리를 계산합니다.
    
    Args:
        pos1 (tuple): 첫 번째 점의 좌표 (x, y)
        pos2 (tuple): 두 번째 점의 좌표 (x, y)
        
    Returns:
        float: 두 점 사이의 거리 (픽셀 단위)
               예: (0,0)과 (3,4) 사이의 거리는 5.0
    """
    return math.sqrt((pos1[0] - pos2[0])**2 + (pos1[1] - pos2[1])**2)

def add_event(event):
    """
    이벤트를 전역 이벤트 리스트에 추가하고 진행 상황을 출력
    
    단순히 리스트에 추가하는 것 외에도 일정 간격으로 진행 상황을 보고하여
    사용자가 녹화가 정상적으로 진행되고 있음을 알 수 있도록 합니다.
    
    Args:
        event (dict): 추가할 이벤트 딕셔너리
                     필수 키: type, time
                     선택 키: x, y, key, button 등
    """
    events.append(event)
    
    # 500개마다 진행 상황 출력 (사용자에게 피드백 제공)
    # 너무 자주 출력하면 성능에 영향을 줄 수 있어 적절한 간격 선택
    if len(events) % 500 == 0:
        print_info(f"기록된 이벤트: {len(events)}개")

# =============================================================================
# 마우스 이벤트 처리 함수들
# =============================================================================
def on_mouse_click(x, y, button, pressed):
    """
    마우스 클릭 이벤트 처리 함수
    
    pynput에서 마우스 버튼이 눌리거나 떼질 때마다 호출됩니다.
    pressed 파라미터로 눌림/뗌을 구분하여 각각 mouse_down/mouse_up 이벤트로 기록합니다.
    
    Args:
        x (int): 마우스 X 좌표 (화면 기준 절대 좌표)
        y (int): 마우스 Y 좌표 (화면 기준 절대 좌표)
        button: 클릭된 마우스 버튼
               - Button.left: 왼쪽 버튼
               - Button.right: 오른쪽 버튼
               - Button.middle: 가운데 버튼 (휠 클릭)
        pressed (bool): True이면 버튼 눌림, False이면 버튼 뗌
    """
    global mouse_button_down
    
    # 마우스 클릭 이벤트 기록이 비활성화된 경우 무시
    if not FilterConfig.RECORD_MOUSE_CLICK:
        return
    
    # 마우스 버튼 상태 업데이트
    # 이 변수는 실시간 상태 추적용이며, 실제 드래그 판단은 후처리에서 수행
    mouse_button_down = pressed
    
    # 이벤트 타입 결정
    # pressed가 True면 "mouse_down", False면 "mouse_up"
    event_type = "mouse_down" if pressed else "mouse_up"
    
    # 마우스 클릭 이벤트 추가
    # button은 객체이므로 문자열로 변환하여 저장
    add_event({
        "type": event_type,
        "time": get_elapsed_time(),
        "x": x,
        "y": y,
        "button": str(button)  # 예: "Button.left", "Button.right"
    })

def on_mouse_move(x, y):
    """
    마우스 이동 이벤트 처리 함수
    
    *** 핵심 포인트 ***
    이 함수에서는 드래그 여부를 판단하지 않고 모든 이동을 'move'로 기록합니다.
    드래그 판단은 모든 이벤트를 수집한 후 후처리 단계에서 수행합니다.
    이렇게 하면 100% 정확한 드래그 감지가 가능합니다.
    
    필터링 로직:
    1. 시간 간격 체크: 너무 빠른 연속 이벤트 제거 (CPU 부하 감소)
    2. 거리 체크: 미세한 움직임 제거 (손떨림, 센서 노이즈 제거)
    
    Args:
        x (int): 마우스 X 좌표 (화면 기준 절대 좌표)
        y (int): 마우스 Y 좌표 (화면 기준 절대 좌표)
    """
    global last_mouse_time, last_mouse_pos, filtered_events_count
    
    current_time = time.time()
    current_pos = (x, y)
    
    # 마우스 이동 이벤트 기록이 비활성화된 경우 무시
    # 이 경우 드래그도 감지할 수 없게 됨
    if not FilterConfig.RECORD_MOUSE_MOVE:
        filtered_events_count += 1
        return
    
    # =========================================================================
    # 필터링 1: 시간 간격 체크
    # =========================================================================
    # 마지막 이벤트로부터 충분한 시간이 지나지 않았으면 무시
    # 예: 10ms 간격이면 초당 최대 100개의 이벤트만 기록
    if current_time - last_mouse_time < FilterConfig.MOUSE_MOVE_INTERVAL:
        filtered_events_count += 1
        return
    
    # =========================================================================
    # 필터링 2: 거리 체크
    # =========================================================================
    # 마지막 위치로부터 충분히 움직이지 않았으면 무시
    # 미세한 손떨림이나 센서 노이즈를 제거하는 효과
    if calculate_distance(current_pos, last_mouse_pos) < FilterConfig.MOUSE_MOVE_DISTANCE:
        filtered_events_count += 1
        return
    
    # =========================================================================
    # 이벤트 기록
    # =========================================================================
    # 중요: 여기서는 모든 이동을 'move'로 기록
    # 드래그 여부는 후처리(fix_drag_events)에서 판단
    add_event({
        "type": "move",  # 후처리에서 필요시 'drag'로 변경됨
        "time": get_elapsed_time(),
        "x": x,
        "y": y
    })
    
    # 다음 필터링을 위한 정보 업데이트
    last_mouse_time = current_time
    last_mouse_pos = current_pos

def on_mouse_scroll(x, y, dx, dy):
    """
    마우스 스크롤 이벤트 처리 함수
    
    마우스 휠을 스크롤할 때 호출되며, 수직/수평 스크롤을 모두 지원합니다.
    대부분의 마우스는 수직 스크롤만 지원하지만, 일부 마우스나 트랙패드는
    수평 스크롤도 가능합니다.
    
    Args:
        x (int): 마우스 X 좌표 (스크롤 시점의 마우스 위치)
        y (int): 마우스 Y 좌표 (스크롤 시점의 마우스 위치)
        dx (int): 수평 스크롤 양
                 - 음수: 왼쪽으로 스크롤
                 - 양수: 오른쪽으로 스크롤
                 - 0: 수평 스크롤 없음
        dy (int): 수직 스크롤 양
                 - 음수: 아래로 스크롤 (일반적인 방향)
                 - 양수: 위로 스크롤
                 - 0: 수직 스크롤 없음
    """
    # 마우스 스크롤 이벤트 기록이 비활성화된 경우 무시
    if not FilterConfig.RECORD_MOUSE_SCROLL:
        return
    
    add_event({
        "type": "scroll",
        "time": get_elapsed_time(),
        "x": x,
        "y": y,
        "dx": dx,  # 수평 스크롤 정보
        "dy": dy   # 수직 스크롤 정보
    })

# =============================================================================
# 키보드 이벤트 처리 함수들
# =============================================================================
def on_key_press(key):
    """
    키보드 키 눌림 이벤트 처리 함수
    
    키가 눌릴 때마다 호출되며, 두 가지 주요 기능을 수행합니다:
    1. ESC 키 감지하여 녹화 종료
    2. 일반 키 입력을 이벤트로 기록
    
    키 반복 입력 처리:
    - 키를 계속 누르고 있으면 OS에서 반복 이벤트를 발생시킴
    - pressed_keys 집합을 사용해 중복 제거
    
    Args:
        key: 눌린 키 객체
            - 일반 문자키: key.char 속성 존재
            - 특수키: Key.shift, Key.ctrl, Key.esc 등
    """
    # =========================================================================
    # ESC 키 처리 - 최우선 처리
    # =========================================================================
    if key == EXIT_KEY:
        print_info("ESC 키로 녹화 종료됨.")
        save_events_and_exit()  # 즉시 저장하고 종료
    
    # 키보드 눌림 이벤트 기록이 비활성화된 경우 무시
    if not FilterConfig.RECORD_KEY_PRESS:
        return
    
    key_name = get_key_name(key)
    
    # =========================================================================
    # 키 반복 입력 방지
    # =========================================================================
    # 키를 계속 누르고 있을 때 OS가 발생시키는 반복 이벤트를 걸러냄
    # pressed_keys 집합에 이미 있는 키는 무시
    if key_name not in pressed_keys:
        pressed_keys.add(key_name)  # 눌린 키 목록에 추가
        add_event({
            "type": "key_down",
            "time": get_elapsed_time(),
            "key": key_name
        })

def on_key_release(key):
    """
    키보드 키 뗌 이벤트 처리 함수
    
    키가 떼질 때 호출됩니다. 기본적으로 비활성화되어 있으며,
    특별한 경우가 아니면 key_down 이벤트만으로도 충분합니다.
    
    Args:
        key: 뗀 키 객체 (on_key_press와 동일한 형태)
    """
    # 키보드 뗌 이벤트 기록이 비활성화된 경우 무시
    # 기본값이 False이므로 대부분의 경우 이 함수는 즉시 리턴됨
    if not FilterConfig.RECORD_KEY_RELEASE:
        return
    
    key_name = get_key_name(key)
    
    # 눌린 키 목록에서 제거
    # 키가 떼졌으므로 더 이상 "눌린" 상태가 아님
    if key_name in pressed_keys:
        pressed_keys.remove(key_name)
    
    add_event({
        "type": "key_up",
        "time": get_elapsed_time(),
        "key": key_name
    })

# =============================================================================
# 핵심 기능: 이벤트 후처리
# =============================================================================
def fix_drag_events(events):
    """
    이벤트 목록을 분석하여 드래그 상태를 정확히 구분하는 후처리 함수
    
    *** 이 프로그램의 핵심 알고리즘 ***
    
    실시간으로 드래그를 판단하는 것은 여러 문제가 있습니다:
    1. 이벤트 순서가 보장되지 않을 수 있음
    2. 버퍼링으로 인한 지연
    3. 멀티 버튼 처리의 복잡성
    
    해결책: 모든 이벤트를 먼저 수집한 후, 순차적으로 분석하여 드래그 구간을 찾습니다.
    
    알고리즘:
    1. 각 마우스 버튼별로 상태를 추적 (왼쪽, 오른쪽, 가운데 독립적으로)
    2. mouse_down 이벤트를 만나면 해당 버튼을 "눌림" 상태로 표시
    3. mouse_up 이벤트를 만나면 해당 버튼을 "뗌" 상태로 표시
    4. move 이벤트를 만났을 때 어떤 버튼이라도 눌려있으면 drag로 변경
    
    Args:
        events (list): 원본 이벤트 목록
                      각 이벤트는 type, time, x, y 등의 키를 가진 딕셔너리
        
    Returns:
        list: 드래그 상태가 수정된 이벤트 목록
              원본은 변경하지 않고 새 리스트 반환
    """
    if not events:
        return events
    
    print_info("드래그 이벤트 후처리 시작...")
    
    fixed_events = []           # 수정된 이벤트를 저장할 새 리스트
    mouse_buttons_state = {}    # 각 마우스 버튼의 눌림 상태 추적
                               # 키: "Button.left", "Button.right" 등
                               # 값: True(눌림) 또는 False(뗌)
    changes_made = 0           # 수정된 이벤트 개수 (통계용)
    
    # =========================================================================
    # 모든 이벤트를 순차적으로 처리
    # =========================================================================
    for i, event in enumerate(events):
        # 이벤트 복사본 생성 (원본 수정 방지)
        new_event = event.copy()
        
        # =====================================================================
        # 1. 마우스 버튼 상태 업데이트
        # =====================================================================
        if event["type"] == "mouse_down":
            # 마우스 버튼이 눌렸음 - 해당 버튼을 "눌림" 상태로 표시
            button = event.get("button", "Button.left")  # 버튼 정보가 없으면 왼쪽 버튼으로 가정
            mouse_buttons_state[button] = True
            
        elif event["type"] == "mouse_up":
            # 마우스 버튼이 떼졌음 - 해당 버튼을 "뗌" 상태로 표시
            button = event.get("button", "Button.left")
            mouse_buttons_state[button] = False
            
        # =====================================================================
        # 2. move 이벤트를 drag로 변경 (필요한 경우)
        # =====================================================================
        elif event["type"] == "move":
            # 현재 어떤 마우스 버튼이라도 눌려있는지 확인
            # values()로 모든 버튼의 상태를 가져와서 하나라도 True인지 체크
            any_button_down = any(mouse_buttons_state.values())
            
            if any_button_down:
                # 마우스 버튼이 눌린 상태에서의 이동은 드래그!
                new_event["type"] = "drag"
                changes_made += 1
                
                # 디버깅 정보 출력 (옵션)
                if FilterConfig.ENABLE_DEBUG_INFO:
                    # 어떤 버튼이 눌려있는지도 함께 출력
                    pressed_buttons = [btn for btn, pressed in mouse_buttons_state.items() if pressed]
                    print(f"[Fix] 이벤트 {i}: move -> drag "
                          f"(위치: {event['x']}, {event['y']}, "
                          f"눌린 버튼: {pressed_buttons})")
        
        # 수정된(또는 그대로인) 이벤트를 결과 리스트에 추가
        fixed_events.append(new_event)
    
    # 처리 완료 보고
    print_info(f"드래그 이벤트 후처리 완료: {changes_made}개 이벤트가 move에서 drag로 변경됨")
    return fixed_events

# =============================================================================
# 녹화 제어 함수들
# =============================================================================
def reset_recording_state():
    """
    모든 녹화 관련 전역 변수를 초기 상태로 리셋
    
    새로운 녹화를 시작하기 전에 이전 녹화의 잔여 데이터를 모두 제거합니다.
    전역 변수를 사용하므로 명시적인 초기화가 중요합니다.
    """
    global events, pressed_keys, mouse_button_down, start_time
    global last_mouse_time, last_mouse_pos, filtered_events_count
    
    # 이벤트 관련 초기화
    events.clear()              # 이벤트 리스트 비우기
    pressed_keys.clear()        # 눌린 키 추적 집합 비우기
    mouse_button_down = False   # 마우스 버튼 상태 초기화
    start_time = None          # 녹화 시작 시간 초기화
    
    # 필터링 관련 초기화
    last_mouse_time = 0        # 마지막 마우스 이벤트 시간 초기화
    last_mouse_pos = (0, 0)    # 마지막 마우스 위치 초기화
    filtered_events_count = 0   # 필터링 카운터 초기화
    
    print_info("녹화 상태 초기화 완료")

def wait_for_start_key():
    """
    사용자가 정확히 소문자 's' 키를 누를 때까지 대기
    
    특징:
    1. 대소문자 구분: 정확히 소문자 's'만 인식
    2. 잘못된 입력 피드백: 다른 키를 누르면 안내 메시지 출력
    3. ESC로 즉시 종료 가능
    
    내부 동작:
    - 임시 키보드 리스너를 생성하여 's' 키 입력만 대기
    - 's' 키가 입력되면 리스너를 종료하고 함수 리턴
    - 다른 키가 입력되면 안내 메시지 출력 후 계속 대기
    """
    print_info(f"'{START_KEY_CHAR}' 키를 누르면 녹화를 시작합니다... (ESC로 종료)")
    print_info("주의: 정확히 소문자 's' 키만 인식됩니다")
    
    def on_start_key_press(key):
        """
        시작 키 입력을 처리하는 내부 함수
        
        Returns:
            bool: True면 계속 대기, False면 리스너 종료
        """
        
        # =====================================================================
        # ESC 키 처리 - 프로그램 완전 종료
        # =====================================================================
        if key == EXIT_KEY:
            print_info("ESC 키로 프로그램 종료됨.")
            sys.exit(0)
        
        # =====================================================================
        # 문자 키 처리
        # =====================================================================
        if hasattr(key, 'char') and key.char is not None:
            if key.char == START_KEY_CHAR:
                # 정확한 시작 키 ('s') 입력됨
                print_info(f"'{START_KEY_CHAR}' 키 인식됨! 녹화를 시작합니다.")
                return False  # 리스너 종료 → 녹화 시작
            else:
                # 다른 문자 키가 입력됨
                # 사용자가 실수했을 가능성이 높으므로 친절한 안내
                print_info(f"'{key.char}' 키가 눌렸습니다. '{START_KEY_CHAR}' 키를 눌러주세요.")
                return True  # 계속 대기
        
        # =====================================================================
        # 특수 키 처리 (Shift, Ctrl, Alt, Space 등)
        # =====================================================================
        else:
            # Key.shift → 'shift'로 변환하여 읽기 쉽게 표시
            key_name = str(key).replace('Key.', '')
            print_info(f"'{key_name}' 키가 눌렸습니다. '{START_KEY_CHAR}' 키를 눌러주세요.")
            return True  # 계속 대기
    
    # 키보드 리스너 시작하여 시작 키 대기
    # with 문을 사용하여 자동으로 리소스 정리
    with keyboard.Listener(on_press=on_start_key_press) as listener:
        listener.join()  # 리스너가 종료될 때까지 블로킹

def start_recording():
    """
    실제 녹화를 시작하고 마우스/키보드 이벤트 리스너를 등록
    
    녹화 프로세스:
    1. 이전 녹화 데이터 초기화
    2. 카운트다운 (사용자 준비 시간)
    3. 시작 시간 기록
    4. 마우스/키보드 리스너 동시 실행
    5. ESC 키 입력 시까지 대기
    6. 예외 발생 시 안전하게 저장 후 종료
    """
    global start_time
    
    # 녹화 상태 초기화 (이전 데이터 제거)
    reset_recording_state()
    
    # =========================================================================
    # 카운트다운 - 사용자에게 준비 시간 제공
    # =========================================================================
    print_info(f"{COUNTDOWN_SECONDS}초 후 녹화 시작...")
    time.sleep(COUNTDOWN_SECONDS)
    
    # 녹화 시작 시간 기록 (모든 이벤트의 시간 기준점)
    start_time = time.time()
    print_info("녹화 시작! (ESC로 중단)")
    print_info("이벤트 후처리 방식: 저장 전에 드래그 상태를 정리합니다")
    
    try:
        # =====================================================================
        # 마우스와 키보드 이벤트 리스너 동시 시작
        # =====================================================================
        # with 문으로 두 리스너를 동시에 관리
        # 하나라도 종료되면 나머지도 자동으로 정리됨
        with mouse.Listener(
            on_click=on_mouse_click,    # 마우스 클릭 처리 (down/up)
            on_move=on_mouse_move,      # 마우스 이동 처리 (move/drag)
            on_scroll=on_mouse_scroll   # 마우스 스크롤 처리
        ) as mouse_listener, \
        keyboard.Listener(
            on_press=on_key_press,      # 키보드 눌림 처리
            on_release=on_key_release   # 키보드 뗌 처리
        ) as keyboard_listener:
            
            # 키보드 리스너가 종료될 때까지 대기
            # ESC 키가 눌리면 on_key_press에서 save_events_and_exit() 호출
            keyboard_listener.join()
            
    except Exception as e:
        # 예상치 못한 오류 발생 시에도 지금까지 기록된 이벤트는 저장
        print_info(f"녹화 중 오류 발생: {e}")
        save_events_and_exit()

def save_events_and_exit():
    """
    기록된 이벤트를 후처리하여 JSON 파일로 저장하고 프로그램 종료
    
    저장 프로세스:
    1. 드래그 이벤트 후처리 (fix_drag_events 호출)
    2. 이벤트 타입별 통계 계산
    3. 메타데이터 생성 (녹화 정보, 설정, 통계 등)
    4. JSON 파일로 저장 (읽기 쉬운 형태로 포맷팅)
    5. 결과 요약 출력
    6. 프로그램 종료
    
    JSON 구조:
    {
        "metadata": {
            "total_events": 이벤트 총 개수,
            "recording_duration": 녹화 시간,
            "event_types": 타입별 개수,
            ...
        },
        "events": [
            {"type": "mouse_down", "time": 0.0, "x": 100, "y": 200, ...},
            {"type": "drag", "time": 0.1, "x": 110, "y": 210},
            ...
        ]
    }
    """
    global events
    
    try:
        print_info("이벤트 저장 시작...")
        
        # =====================================================================
        # 핵심: 드래그 이벤트 후처리 수행
        # =====================================================================
        original_count = len(events)
        events = fix_drag_events(events)  # move → drag 변환
        
        # =====================================================================
        # 이벤트 타입별 개수 계산 (통계 정보)
        # =====================================================================
        event_types = {}
        for event in events:
            event_type = event.get("type", "unknown")
            event_types[event_type] = event_types.get(event_type, 0) + 1
        
        # =====================================================================
        # JSON 파일로 저장
        # =====================================================================
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump({
                # 메타데이터 섹션 - 녹화 정보와 통계
                "metadata": {
                    "total_events": len(events),                    # 총 이벤트 수
                    "filtered_events": filtered_events_count,       # 필터링된 이벤트 수
                    "recording_duration": get_elapsed_time(),       # 녹화 시간 (초)
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),  # 녹화 일시
                    "platform": sys.platform,                       # OS 정보 (win32, darwin, linux)
                    "algorithm_version": "post_processing_drag_fix_v1.0",  # 알고리즘 버전
                    "event_types": event_types,                     # 타입별 이벤트 개수
                    "filter_settings": {                            # 필터링 설정값
                        "mouse_move_interval": FilterConfig.MOUSE_MOVE_INTERVAL,
                        "mouse_move_distance": FilterConfig.MOUSE_MOVE_DISTANCE
                    }
                },
                # 실제 이벤트 데이터
                "events": events
            }, f, indent=2, ensure_ascii=False)  # indent=2로 읽기 쉽게, 한글 지원
        
        # =====================================================================
        # 결과 출력
        # =====================================================================
        print_info(f"입력 기록 완료: {OUTPUT_FILE}")
        print_info(f"기록된 이벤트: {len(events)}개")
        print_info(f"필터링된 이벤트: {filtered_events_count}개")
        
        # 이벤트 타입별 통계 출력
        if event_types:
            print_info("이벤트 타입별 개수:")
            for event_type, count in sorted(event_types.items()):
                print(f"  - {event_type}: {count}개")
        
        print_info("후처리 완료: mouse_down/up 사이의 move가 모두 drag로 수정됨")
        
    except Exception as e:
        print_info(f"파일 저장 중 오류 발생: {e}")
    
    finally:
        # 오류 발생 여부와 관계없이 프로그램 종료
        sys.exit(0)

# =============================================================================
# 메인 실행 부분
# =============================================================================
def main():
    """
    프로그램의 메인 진입점
    
    실행 흐름:
    1. 프로그램 정보 및 설정 출력
    2. 시작 키 ('s') 대기
    3. 녹화 시작
    4. 예외 처리 (Ctrl+C, 기타 오류)
    
    예외 처리:
    - KeyboardInterrupt: Ctrl+C로 강제 종료 시
    - 기타 예외: 예상치 못한 오류 발생 시
    - 모든 경우에 가능한 한 데이터를 저장하려고 시도
    """
    try:
        # =====================================================================
        # 프로그램 시작 안내
        # =====================================================================
        print_info("마우스/키보드 입력 기록기 시작")
        print_info("작동 방식: 일단 기록 후 저장 전 드래그 상태 후처리")
        print_info("드래그 감지: mouse_down과 mouse_up 사이의 모든 move를 drag로 변경")
        print_info("-" * 60)
        
        # =====================================================================
        # 시스템 및 설정 정보 출력
        # =====================================================================
        print_info(f"시스템: {sys.platform}")
        print_info(f"필터링 시간 간격: {FilterConfig.MOUSE_MOVE_INTERVAL}초")
        print_info(f"필터링 거리: {FilterConfig.MOUSE_MOVE_DISTANCE}픽셀")
        print_info("-" * 60)
        
        # =====================================================================
        # 1단계: 시작 키 대기
        # =====================================================================
        wait_for_start_key()
        
        # =====================================================================
        # 2단계: 실제 녹화 시작
        # =====================================================================
        start_recording()
        
    except KeyboardInterrupt:
        # Ctrl+C로 강제 종료한 경우
        print_info("사용자에 의해 프로그램이 중단되었습니다.")
        save_events_and_exit()  # 지금까지 기록된 이벤트 저장
        
    except Exception as e:
        # 기타 예상치 못한 오류
        print_info(f"예상치 못한 오류가 발생했습니다: {e}")
        sys.exit(1)

# =============================================================================
# 프로그램 시작점
# =============================================================================
# 이 스크립트가 직접 실행될 때만 main() 함수 호출
# 다른 모듈에서 import할 때는 실행되지 않음
if __name__ == "__main__":
    main()

replay_input_full_cli.py

"""
기록된 마우스/키보드 이벤트를 재생하는 프로그램

주요 기능:
- JSON 파일에서 이벤트 로드 및 유효성 검사
- 시간 간격 기반 정확한 재생
- 재생 속도 조절 가능 (배속 설정)
- 안전한 재생 (ESC키로 중단 가능)
- 마우스 버튼 상태 정확한 추적
- 부드러운 마우스 이동 처리
- 상세한 진행 상황 및 통계 정보 제공

지원하는 이벤트 타입:
- mouse_down/mouse_up: 마우스 클릭
- move/drag: 마우스 이동 (구분 없이 동일하게 처리)
- scroll: 마우스 스크롤
- key_down/key_up: 키보드 입력

사용법:
python event_player.py [파일명] [재생속도]
예: python event_player.py recorded_events.json 2.0
"""

import time
import json
import sys
import argparse
from pynput import mouse, keyboard
from pynput.mouse import Button
from pynput.keyboard import Key

# =============================================================================
# 상수 정의
# =============================================================================
DEFAULT_FILE = "recorded_events.json"  # 기본 입력 파일명
DEFAULT_SPEED = 1.0                     # 기본 재생 속도 (1배속)
COUNTDOWN_SECONDS = 3                   # 재생 시작 전 대기 시간
EXIT_KEY = Key.esc                      # 재생 중단 키 (ESC)

# =============================================================================
# 전역 변수
# =============================================================================
should_stop = False             # 재생 중단 플래그
mouse_controller = mouse.Controller()      # 마우스 제어 객체
keyboard_controller = keyboard.Controller()  # 키보드 제어 객체

# =============================================================================
# 유틸리티 함수들
# =============================================================================
def print_info(message):
    """
    정보 메시지를 일관된 형태로 출력
    
    Args:
        message (str): 출력할 메시지
    """
    print(f"[Info] {message}")

def print_error(message):
    """
    에러 메시지를 일관된 형태로 출력
    
    Args:
        message (str): 출력할 에러 메시지
    """
    print(f"[Error] {message}")

def on_key_press(key):
    """
    재생 중 키 입력을 감지하여 ESC로 재생 중단 처리
    
    Args:
        key: 눌린 키 객체
        
    Returns:
        bool: False이면 리스너 종료
    """
    global should_stop
    if key == EXIT_KEY:
        print_info("ESC 키로 재생이 중단되었습니다.")
        should_stop = True
        return False

def parse_button(button_str):
    """
    문자열을 마우스 버튼 객체로 변환
    
    Args:
        button_str (str): 버튼 문자열 (예: "Button.left", "left")
        
    Returns:
        Button: 마우스 버튼 객체
    """
    # 문자열과 버튼 객체 매핑 테이블
    button_map = {
        "Button.left": Button.left,
        "Button.right": Button.right,
        "Button.middle": Button.middle,
        "left": Button.left,
        "right": Button.right,
        "middle": Button.middle
    }
    
    # 매핑되지 않은 경우 기본값으로 왼쪽 버튼 반환
    return button_map.get(button_str, Button.left)

def parse_key(key_str):
    """
    문자열을 키보드 키 객체로 변환
    
    Args:
        key_str (str): 키 문자열
        
    Returns:
        Key or str: 키보드 키 객체 또는 문자
    """
    # 특수 키 매핑 테이블
    key_map = {
        "Key.space": Key.space,
        "Key.enter": Key.enter,
        "Key.tab": Key.tab,
        "Key.backspace": Key.backspace,
        "Key.delete": Key.delete,
        "Key.esc": Key.esc,
        "Key.shift": Key.shift,
        "Key.ctrl": Key.ctrl,
        "Key.alt": Key.alt,
        "Key.cmd": Key.cmd,
        "Key.up": Key.up,
        "Key.down": Key.down,
        "Key.left": Key.left,
        "Key.right": Key.right,
        "Key.home": Key.home,
        "Key.end": Key.end,
        "Key.page_up": Key.page_up,
        "Key.page_down": Key.page_down,
    }
    
    # 특수 키인 경우 매핑된 객체 반환
    if key_str in key_map:
        return key_map[key_str]
    
    # 일반 문자인 경우 문자 그대로 반환
    if len(key_str) == 1:
        return key_str
    
    # 기본값으로 문자열 그대로 반환
    return key_str

# =============================================================================
# 파일 로드 및 데이터 검증
# =============================================================================
def load_events(file_path):
    """
    JSON 파일에서 이벤트 데이터를 로드
    신규 형식과 구 형식 모두 지원
    
    Args:
        file_path (str): JSON 파일 경로
        
    Returns:
        tuple: (events_list, metadata_dict) 또는 (None, None) if 실패
    """
    try:
        # UTF-8 인코딩으로 JSON 파일 읽기
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        
        # 신규 형식 확인 (metadata와 events가 분리된 구조)
        if isinstance(data, dict) and "events" in data:
            events = data["events"]
            metadata = data.get("metadata", {})
            print_info("신규 형식의 이벤트 파일을 로드했습니다.")
            
        # 구 형식 확인 (이벤트 배열만 있는 구조)
        elif isinstance(data, list):
            events = data
            metadata = {
                "total_events": len(events), 
                "format": "legacy",
                "algorithm_version": "unknown"
            }
            print_info("구 형식의 이벤트 파일을 로드했습니다.")
            
        else:
            raise ValueError("지원하지 않는 파일 형식입니다.")
        
        return events, metadata
        
    except FileNotFoundError:
        print_error(f"파일을 찾을 수 없습니다: {file_path}")
        return None, None
    except json.JSONDecodeError:
        print_error(f"JSON 파일 형식이 올바르지 않습니다: {file_path}")
        return None, None
    except Exception as e:
        print_error(f"파일 로드 중 오류 발생: {e}")
        return None, None

def print_metadata(metadata):
    """
    메타데이터 정보를 사용자 친화적으로 출력
    
    Args:
        metadata (dict): 메타데이터 딕셔너리
    """
    if not metadata:
        return
    
    print_info("=== 파일 정보 ===")
    
    # 기본 정보
    if "total_events" in metadata:
        print(f"  총 이벤트 수: {metadata['total_events']}개")
    
    # 필터링 정보
    if "filtered_events" in metadata:
        print(f"  필터링된 이벤트: {metadata['filtered_events']}개")
        if "total_events" in metadata:
            total = metadata.get("total_events", 0) + metadata.get("filtered_events", 0)
            if total > 0:
                print(f"  필터링 비율: {metadata['filtered_events']/total*100:.1f}%")
    
    # 시간 정보
    if "recording_duration" in metadata:
        duration = metadata["recording_duration"]
        print(f"  녹화 시간: {duration:.2f}초")
    
    # 생성 정보
    if "timestamp" in metadata:
        print(f"  생성 일시: {metadata['timestamp']}")
    
    # 알고리즘 정보
    if "algorithm_version" in metadata:
        print(f"  알고리즘 버전: {metadata['algorithm_version']}")
    
    # 이벤트 타입별 통계
    if "event_types" in metadata:
        print("  이벤트 타입별 개수:")
        for event_type, count in sorted(metadata["event_types"].items()):
            print(f"    - {event_type}: {count}개")

def validate_events(events):
    """
    이벤트 데이터의 유효성을 검사
    
    Args:
        events (list): 이벤트 리스트
        
    Returns:
        bool: 유효성 검사 통과 여부
    """
    if not events:
        print_error("이벤트가 없습니다.")
        return False
    
    # 필수 필드 목록
    required_fields = ["type", "time"]
    
    # 처음 10개 이벤트만 검사 (성능상 이유)
    for i, event in enumerate(events[:10]):
        if not isinstance(event, dict):
            print_error(f"이벤트 {i}: 딕셔너리 형태가 아닙니다.")
            return False
        
        # 필수 필드 존재 여부 확인
        for field in required_fields:
            if field not in event:
                print_error(f"이벤트 {i}: 필수 필드 '{field}'가 없습니다.")
                return False
    
    print_info(f"이벤트 유효성 검사 통과: {len(events)}개 이벤트")
    return True

# =============================================================================
# 이벤트 재생 엔진
# =============================================================================
def replay_events(events, speed_factor=1.0):
    """
    이벤트 목록을 시간 순서대로 재생
    마우스 버튼 상태를 정확히 추적하고 부드러운 이동 제공
    
    Args:
        events (list): 재생할 이벤트 리스트
        speed_factor (float): 재생 속도 배율 (1.0 = 정상 속도)
    """
    global should_stop
    should_stop = False
    
    # ESC 키 감지를 위한 키보드 리스너 시작
    listener = keyboard.Listener(on_press=on_key_press)
    listener.start()
    
    # 마우스 버튼 상태 추적 (중복 클릭 방지)
    mouse_buttons_down = set()
    
    try:
        print_info(f"재생 시작... (ESC로 중단, 속도: {speed_factor}배)")
        start_replay_time = time.time()
        successful_events = 0
        failed_events = 0
        
        for i, event in enumerate(events):
            # 중단 요청 확인
            if should_stop:
                break
            
            # 이벤트 간 시간 간격 계산 및 대기
            if i > 0:
                delay = event["time"] - events[i - 1]["time"]
                if delay > 0:
                    # 최소 지연 시간 보장 (클릭 이벤트 확실히 처리)
                    actual_delay = max(delay / speed_factor, 0.001)  # 최소 1ms
                    time.sleep(actual_delay)
            
            # 이벤트 타입별 처리
            try:
                event_type = event["type"]
                
                # 마우스 버튼 눌림 처리
                if event_type == "mouse_down":
                    button = parse_button(event.get("button", "Button.left"))
                    mouse_controller.position = (event["x"], event["y"])
                    
                    # 중복 눌림 방지
                    button_str = str(button)
                    if button_str not in mouse_buttons_down:
                        mouse_controller.press(button)
                        mouse_buttons_down.add(button_str)
                        # 디버그용 출력 (필요시 주석 해제)
                        # print(f"[Debug] 마우스 DOWN: {button} at ({event['x']}, {event['y']})")
                    
                    successful_events += 1
                
                # 마우스 버튼 뗌 처리
                elif event_type == "mouse_up":
                    button = parse_button(event.get("button", "Button.left"))
                    mouse_controller.position = (event["x"], event["y"])
                    
                    # 중복 뗌 방지
                    button_str = str(button)
                    if button_str in mouse_buttons_down:
                        mouse_controller.release(button)
                        mouse_buttons_down.remove(button_str)
                        # 디버그용 출력 (필요시 주석 해제)
                        # print(f"[Debug] 마우스 UP: {button} at ({event['x']}, {event['y']})")
                    
                    successful_events += 1
                
                # 마우스 이동 처리 (move와 drag 구분 없이 동일하게 처리)
                elif event_type in ["move", "drag"]:
                    current_pos = mouse_controller.position
                    target_pos = (event["x"], event["y"])
                    
                    # 큰 거리 이동 시 부드러운 이동을 위한 중간 단계 처리
                    distance = ((current_pos[0] - target_pos[0])**2 + (current_pos[1] - target_pos[1])**2)**0.5
                    if distance > 100:  # 100픽셀 이상 차이나면
                        # 중간 지점으로 먼저 이동
                        mid_x = (current_pos[0] + target_pos[0]) / 2
                        mid_y = (current_pos[1] + target_pos[1]) / 2
                        mouse_controller.position = (int(mid_x), int(mid_y))
                        time.sleep(0.001)  # 1ms 대기
                    
                    # 최종 목표 위치로 이동
                    mouse_controller.position = target_pos
                    successful_events += 1
                
                # 마우스 스크롤 처리
                elif event_type == "scroll":
                    mouse_controller.position = (event["x"], event["y"])
                    mouse_controller.scroll(event.get("dx", 0), event.get("dy", 0))
                    successful_events += 1
                
                # 키보드 눌림 처리
                elif event_type == "key_down":
                    key = parse_key(event["key"])
                    keyboard_controller.press(key)
                    successful_events += 1
                
                # 키보드 뗌 처리
                elif event_type == "key_up":
                    key = parse_key(event["key"])
                    keyboard_controller.release(key)
                    successful_events += 1
                
                else:
                    print_error(f"알 수 없는 이벤트 타입: {event_type}")
                    failed_events += 1
                
            except Exception as e:
                print_error(f"이벤트 {i} 재생 중 오류: {e}")
                failed_events += 1
                continue
            
            # 진행 상황 출력 (1000개마다)
            if (i + 1) % 1000 == 0:
                print_info(f"재생 진행: {i+1}/{len(events)} ({(i+1)/len(events)*100:.1f}%)")
        
        # 재생 완료 후 정리 작업
        # 혹시 남아있는 마우스 버튼들을 모두 해제
        for button_str in list(mouse_buttons_down):
            try:
                button = parse_button(button_str)
                mouse_controller.release(button)
                # 디버그용 출력 (필요시 주석 해제)
                # print(f"[Debug] 재생 완료 후 마우스 UP: {button}")
            except:
                pass
        
        # 재생 결과 통계
        actual_duration = time.time() - start_replay_time
        expected_duration = events[-1]["time"] / speed_factor if events else 0
        
        if should_stop:
            print_info(f"재생이 중단되었습니다. (진행률: {i+1}/{len(events)})")
        else:
            print_info(f"재생 완료!")
        
        print_info(f"성공한 이벤트: {successful_events}개")
        if failed_events > 0:
            print_info(f"실패한 이벤트: {failed_events}개")
        print_info(f"예상 시간: {expected_duration:.2f}초, 실제 시간: {actual_duration:.2f}초")
    
    finally:
        listener.stop()

# =============================================================================
# 명령행 인자 처리
# =============================================================================
def parse_arguments():
    """
    명령행 인자를 파싱하여 파일명과 재생 속도 등을 설정
    
    Returns:
        argparse.Namespace: 파싱된 인자들
    """
    parser = argparse.ArgumentParser(
        description="기록된 마우스/키보드 이벤트를 재생합니다.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
사용 예시:
  python event_player.py                                    # 기본 파일로 1배속 재생
  python event_player.py recorded_events.json              # 지정 파일로 1배속 재생  
  python event_player.py recorded_events.json 2.0          # 지정 파일로 2배속 재생
  python event_player.py --speed 0.5                       # 기본 파일로 0.5배속 재생
  python event_player.py --no-countdown                    # 카운트다운 없이 즉시 재생
        """
    )
    
    # 위치 인자: 파일명
    parser.add_argument(
        "file", 
        nargs="?",  # 선택적 인자
        default=DEFAULT_FILE,
        help=f"재생할 이벤트 파일 (기본값: {DEFAULT_FILE})"
    )
    
    # 위치 인자: 재생 속도
    parser.add_argument(
        "speed", 
        nargs="?",  # 선택적 인자
        type=float, 
        default=DEFAULT_SPEED,
        help=f"재생 속도 배율 (기본값: {DEFAULT_SPEED})"
    )
    
    # 옵션 인자: 재생 속도 (위치 인자보다 우선)
    parser.add_argument(
        "--speed", 
        type=float, 
        dest="speed_arg",
        help="재생 속도 배율 (위치 인자보다 우선)"
    )
    
    # 옵션 인자: 카운트다운 건너뛰기
    parser.add_argument(
        "--no-countdown", 
        action="store_true",
        help="카운트다운 없이 즉시 재생"
    )
    
    return parser.parse_args()

# =============================================================================
# 메인 실행 부분
# =============================================================================
def main():
    """
    프로그램의 메인 진입점
    명령행 인자 처리, 파일 로드, 재생 실행을 담당
    """
    # 명령행 인자 파싱
    args = parse_arguments()
    
    # 재생 속도 결정 (--speed 옵션이 위치 인자보다 우선)
    speed = args.speed_arg if args.speed_arg is not None else args.speed
    
    # 프로그램 정보 출력
    print_info("마우스/키보드 이벤트 재생기")
    print_info(f"파일: {args.file}")
    print_info(f"재생 속도: {speed}배")
    print_info("-" * 50)
    
    # 이벤트 파일 로드
    events, metadata = load_events(args.file)
    if events is None:
        return  # 로드 실패시 종료
    
    # 메타데이터 정보 출력
    print_metadata(metadata)
    print_info("-" * 50)
    
    # 이벤트 데이터 유효성 검사
    if not validate_events(events):
        return  # 검증 실패시 종료
    
    # 카운트다운 (옵션에 따라 생략 가능)
    if not args.no_countdown:
        print_info(f"{COUNTDOWN_SECONDS}초 후 재생을 시작합니다...")
        print_info("재생 중 ESC 키를 누르면 중단됩니다")
        for i in range(COUNTDOWN_SECONDS, 0, -1):
            print(f"{i}...")
            time.sleep(1)
    
    # 이벤트 재생 실행
    try:
        replay_events(events, speed)
        print_info("재생 완료!")
        
    except KeyboardInterrupt:
        print_info("사용자에 의해 재생이 중단되었습니다.")
    except Exception as e:
        print_error(f"재생 중 오류 발생: {e}")

# 프로그램 시작점
if __name__ == "__main__":
    main()
728x90

'Python' 카테고리의 다른 글

[Ex] PyAutoGUI - 키보드 입력  (2) 2025.07.14
[Ex] PyAutoGUI - 마우스 이동 및 클릭  (0) 2025.07.14
pyautogui 기본 사용법  (5) 2025.07.14
[Py] String Interning  (0) 2025.07.07
cv.undistort()와 cv.initUndistortRectifyMap() + cv.remap()  (0) 2025.07.04