开机自动录像,自动打包录像文件

为了记录自己工作的内容,方便后续出现问题对账,我打算将自己的每日工作都使用OBS录像,录制下来,最开始几天手动开启的录像,但是偶尔会忘,所以会有时间差。
写一个脚本,通过windows的计划任务启动该脚本,打开OBS,然后调用OBS的Websocket连接,然后开始录像,录像开始后获取到昨日的日期,然后打包昨日的所有录像文件。
在关机时强制关闭OBS,OBS会自动保存文件

import os
import time
import threading
import subprocess
from datetime import datetime, timedelta
from obswebsocket import obsws, requests
import zipfile

配置信息

CONFIG = {
'obs_host': "----",# 你的IP地址
'obs_port': ----,# 你的端口
'obs_password': "----",# 你的密码
'obs_path': r"----", # OBS程序路径
'obs_working_dir': r"----", # OBS工作目录
'recordings_dir': r"----",# 你的文件保存路径
'archive_dir': r"----",# 你的文件打包后的路径
'log_file': r"----"# 你的log保存路径
}

def log_message(message):
"""记录日志"""
log_dir = os.path.dirname(CONFIG['log_file'])
os.makedirs(log_dir, exist_ok=True)

timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}"
print(log_entry)
with open(CONFIG['log_file'], 'a', encoding='utf-8') as f:
    f.write(log_entry + "\n")

def is_obs_running():
"""检查OBS是否已经在运行"""
try:
import psutil
for process in psutil.process_iter(['name']):
if process.info['name'] and 'obs64.exe' in process.info['name'].lower():
return True
return False
except ImportError:
# 如果没有psutil,使用简单的方法检查
try:
result = subprocess.run(['tasklist', '/fi', 'imagename eq obs64.exe'],
capture_output=True, text=True, creationflags=subprocess.CREATE_NO_WINDOW)
return 'obs64.exe' in result.stdout
except:
# 最后的手段,尝试连接WebSocket
return check_obs_via_websocket()

def check_obs_via_websocket():
"""通过WebSocket检查OBS是否运行"""
try:
client = obsws(CONFIG['obs_host'], CONFIG['obs_port'], CONFIG['obs_password'])
client.connect()
client.disconnect()
return True
except:
return False

def start_obs_program():
"""启动OBS程序(修复工作目录问题)"""
try:
if is_obs_running():
log_message("OBS已经在运行中")
return True

    log_message(f"正在启动OBS程序: {CONFIG['obs_path']}")
    log_message(f"工作目录: {CONFIG['obs_working_dir']}")
    
    if not os.path.exists(CONFIG['obs_path']):
        log_message(f"错误:OBS路径不存在: {CONFIG['obs_path']}")
        return False
    
    if not os.path.exists(CONFIG['obs_working_dir']):
        log_message(f"错误:OBS工作目录不存在: {CONFIG['obs_working_dir']}")
        return False
    
    # 启动OBS程序,设置正确的工作目录
    subprocess.Popen([CONFIG['obs_path']], 
                    cwd=CONFIG['obs_working_dir'],  # 关键修复:设置工作目录
                    stdout=subprocess.DEVNULL, 
                    stderr=subprocess.DEVNULL,
                    creationflags=subprocess.CREATE_NO_WINDOW)
    
    log_message("OBS启动命令已执行,等待程序启动...")
    
    # 等待OBS完全启动
    for i in range(15):  # 最多等待30秒
        time.sleep(2)
        if check_obs_via_websocket():
            log_message("OBS程序启动成功")
            return True
        log_message(f"等待OBS启动... ({i+1}/15)")
    
    log_message("警告:OBS启动可能较慢或失败,但继续尝试连接")
    return False
    
except Exception as e:
    log_message(f"启动OBS程序时出错: {e}")
    return False

def get_obs_status(client):
"""获取OBS状态信息"""
try:
stream_status = client.call(requests.GetStreamStatus())
record_status = client.call(requests.GetRecordStatus())
stats = client.call(requests.GetStats())

    status_info = {
        'streaming': stream_status.datain['outputActive'],
        'recording': record_status.datain['outputActive'],
        'cpu_usage': stats.datain['cpuUsage'],
        'fps': stats.datain['activeFps']
    }
    
    return status_info
    
except Exception as e:
    log_message(f"获取OBS状态时出错: {e}")
    return None

def start_recording():
"""启动OBS并开始录像"""
try:
# 1. 首先启动OBS程序
if not start_obs_program():
log_message("OBS程序启动失败,尝试直接连接WebSocket")

    # 2. 等待一下让OBS完全初始化
    time.sleep(5)
    
    log_message("正在连接到OBS WebSocket...")
    client = obsws(CONFIG['obs_host'], CONFIG['obs_port'], CONFIG['obs_password'])
    client.connect()
    log_message("成功连接到OBS")
    
    # 获取当前状态
    status = get_obs_status(client)
    if status:
        log_message(f"OBS状态 - 录像: {status['recording']}, 流媒体: {status['streaming']}, CPU: {status['cpu_usage']:.1f}%")
    
    # 如果不在录像,则开始录像
    if status and not status['recording']:
        client.call(requests.StartRecord())
        log_message("开始录像")
        
        # 等待并验证录像是否真正开始
        for i in range(5):
            time.sleep(2)
            new_status = get_obs_status(client)
            if new_status and new_status['recording']:
                log_message("录像确认已成功启动")
                break
            elif i == 4:
                log_message("警告:录像启动可能较慢或失败")
    else:
        log_message("OBS已经在录像中")
    
    client.disconnect()
    log_message("OBS WebSocket连接已断开")
    return True
    
except Exception as e:
    log_message(f"启动录像时出错: {e}")
    return False

def archive_previous_day_recordings():
"""归档前一天的录像文件"""
archive_complete = threading.Event()
archive_success = False

def archive_task():
    nonlocal archive_success
    try:
        # 计算昨天日期
        yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
        archive_name = f"{yesterday}_工作录像.zip"
        archive_path = os.path.join(CONFIG['archive_dir'], archive_name)
        
        # 确保归档目录存在
        os.makedirs(CONFIG['archive_dir'], exist_ok=True)
        
        log_message(f"开始归档前一天的录像文件: {yesterday}")
        
        # 查找昨天的文件
        files_to_archive = []
        if os.path.exists(CONFIG['recordings_dir']):
            for filename in os.listdir(CONFIG['recordings_dir']):
                file_path = os.path.join(CONFIG['recordings_dir'], filename)
                if os.path.isfile(file_path) and yesterday in filename:
                    files_to_archive.append(filename)
        
        if not files_to_archive:
            log_message("未找到需要归档的昨天文件。")
            archive_success = True
            archive_complete.set()
            return
        
        log_message(f"找到 {len(files_to_archive)} 个文件需要归档")
        
        # 创建压缩包
        try:
            total_size = 0
            with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for filename in files_to_archive:
                    file_path = os.path.join(CONFIG['recordings_dir'], filename)
                    file_size = os.path.getsize(file_path)
                    total_size += file_size
                    log_message(f"正在归档: {filename} ({file_size / (1024 * 1024):.2f} MB)")
                    zipf.write(file_path, filename)
            
            archive_size = os.path.getsize(archive_path)
            log_message(f"成功创建压缩包: {archive_path}")
            log_message(f"原文件总大小: {total_size / (1024 * 1024):.2f} MB")
            log_message(f"压缩包大小: {archive_size / (1024 * 1024):.2f} MB")
            if total_size > 0:
                log_message(f"压缩比: {(1 - archive_size/total_size) * 100:.1f}%")
            
            # 压缩成功后删除原文件(谨慎操作!先注释掉进行测试)
            try:
                with zipfile.ZipFile(archive_path, 'r') as zipf:
                    if zipf.testzip() is None:
                        log_message("压缩包验证通过,开始删除原文件...")
                        for filename in files_to_archive:
                            file_path = os.path.join(CONFIG['recordings_dir'], filename)
                            os.remove(file_path)
                            log_message(f"已删除: {filename}")
                        log_message("原文件已安全删除。")
                    else:
                        log_message("警告:压缩包验证失败,保留原文件")
            except Exception as e:
                log_message(f"压缩包验证失败: {e},保留原文件")
            
            archive_success = True
            
        except Exception as e:
            log_message(f"归档过程中出错: {e}")
            
    except Exception as e:
        log_message(f"归档任务出错: {e}")
    finally:
        archive_complete.set()

# 启动归档线程
archive_thread = threading.Thread(target=archive_task, daemon=True)
archive_thread.start()
log_message("归档线程已启动")

# 等待归档完成,但设置超时时间(例如30分钟)
log_message("等待归档任务完成...")
archive_complete.wait(timeout=1800)  # 30分钟超时

if archive_complete.is_set():
    if archive_success:
        log_message("归档任务成功完成")
    else:
        log_message("归档任务完成但可能有错误")
else:
    log_message("警告:归档任务超时,但程序继续运行")


return archive_success

def main():
"""主函数"""
log_message("=== OBS自动化脚本开始执行 ===")

# 1. 启动OBS程序并开始录像
recording_started = start_recording()

# 2. 录像启动后,开始归档前一天的文件(等待完成)
if recording_started:
    log_message("录像已启动,开始归档任务...")
    archive_success = archive_previous_day_recordings()
    
    if archive_success:
        log_message("=== OBS自动化脚本执行完毕 ===")
    else:
        log_message("=== OBS自动化脚本执行完毕(归档有警告)===")
else:
    log_message("=== OBS自动化脚本执行完毕(录像启动失败)===")

if name == "main":
main()