开机自动录像,自动打包录像文件
为了记录自己工作的内容,方便后续出现问题对账,我打算将自己的每日工作都使用OBS录像,录制下来,最开始几天手动开启的录像,但是偶尔会忘,所以会有时间差。
写一个脚本,通过windows的计划任务启动该脚本,打开OBS,然后调用OBS的Websocket连接,然后开始录像,录像开始后获取到昨日的日期,然后打包昨日的所有录像文件。
在关机时强制关闭OBS,OBS会自动保存文件
import os
import time
import threading
import subprocess
from datetime import datetime, timedelta
from obswebsocket import obsws, requests
import zipfile
配置信息
CONFIG = {
'obs_host': "----",# 你的IP地址
'obs_port': ----,# 你的端口
'obs_password': "----",# 你的密码
'obs_path': r"----", # OBS程序路径
'obs_working_dir': r"----", # OBS工作目录
'recordings_dir': r"----",# 你的文件保存路径
'archive_dir': r"----",# 你的文件打包后的路径
'log_file': r"----"# 你的log保存路径
}
def log_message(message):
"""记录日志"""
log_dir = os.path.dirname(CONFIG['log_file'])
os.makedirs(log_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}"
print(log_entry)
with open(CONFIG['log_file'], 'a', encoding='utf-8') as f:
f.write(log_entry + "\n")
def is_obs_running():
"""检查OBS是否已经在运行"""
try:
import psutil
for process in psutil.process_iter(['name']):
if process.info['name'] and 'obs64.exe' in process.info['name'].lower():
return True
return False
except ImportError:
# 如果没有psutil,使用简单的方法检查
try:
result = subprocess.run(['tasklist', '/fi', 'imagename eq obs64.exe'],
capture_output=True, text=True, creationflags=subprocess.CREATE_NO_WINDOW)
return 'obs64.exe' in result.stdout
except:
# 最后的手段,尝试连接WebSocket
return check_obs_via_websocket()
def check_obs_via_websocket():
"""通过WebSocket检查OBS是否运行"""
try:
client = obsws(CONFIG['obs_host'], CONFIG['obs_port'], CONFIG['obs_password'])
client.connect()
client.disconnect()
return True
except:
return False
def start_obs_program():
"""启动OBS程序(修复工作目录问题)"""
try:
if is_obs_running():
log_message("OBS已经在运行中")
return True
log_message(f"正在启动OBS程序: {CONFIG['obs_path']}")
log_message(f"工作目录: {CONFIG['obs_working_dir']}")
if not os.path.exists(CONFIG['obs_path']):
log_message(f"错误:OBS路径不存在: {CONFIG['obs_path']}")
return False
if not os.path.exists(CONFIG['obs_working_dir']):
log_message(f"错误:OBS工作目录不存在: {CONFIG['obs_working_dir']}")
return False
# 启动OBS程序,设置正确的工作目录
subprocess.Popen([CONFIG['obs_path']],
cwd=CONFIG['obs_working_dir'], # 关键修复:设置工作目录
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=subprocess.CREATE_NO_WINDOW)
log_message("OBS启动命令已执行,等待程序启动...")
# 等待OBS完全启动
for i in range(15): # 最多等待30秒
time.sleep(2)
if check_obs_via_websocket():
log_message("OBS程序启动成功")
return True
log_message(f"等待OBS启动... ({i+1}/15)")
log_message("警告:OBS启动可能较慢或失败,但继续尝试连接")
return False
except Exception as e:
log_message(f"启动OBS程序时出错: {e}")
return False
def get_obs_status(client):
"""获取OBS状态信息"""
try:
stream_status = client.call(requests.GetStreamStatus())
record_status = client.call(requests.GetRecordStatus())
stats = client.call(requests.GetStats())
status_info = {
'streaming': stream_status.datain['outputActive'],
'recording': record_status.datain['outputActive'],
'cpu_usage': stats.datain['cpuUsage'],
'fps': stats.datain['activeFps']
}
return status_info
except Exception as e:
log_message(f"获取OBS状态时出错: {e}")
return None
def start_recording():
"""启动OBS并开始录像"""
try:
# 1. 首先启动OBS程序
if not start_obs_program():
log_message("OBS程序启动失败,尝试直接连接WebSocket")
# 2. 等待一下让OBS完全初始化
time.sleep(5)
log_message("正在连接到OBS WebSocket...")
client = obsws(CONFIG['obs_host'], CONFIG['obs_port'], CONFIG['obs_password'])
client.connect()
log_message("成功连接到OBS")
# 获取当前状态
status = get_obs_status(client)
if status:
log_message(f"OBS状态 - 录像: {status['recording']}, 流媒体: {status['streaming']}, CPU: {status['cpu_usage']:.1f}%")
# 如果不在录像,则开始录像
if status and not status['recording']:
client.call(requests.StartRecord())
log_message("开始录像")
# 等待并验证录像是否真正开始
for i in range(5):
time.sleep(2)
new_status = get_obs_status(client)
if new_status and new_status['recording']:
log_message("录像确认已成功启动")
break
elif i == 4:
log_message("警告:录像启动可能较慢或失败")
else:
log_message("OBS已经在录像中")
client.disconnect()
log_message("OBS WebSocket连接已断开")
return True
except Exception as e:
log_message(f"启动录像时出错: {e}")
return False
def archive_previous_day_recordings():
"""归档前一天的录像文件"""
archive_complete = threading.Event()
archive_success = False
def archive_task():
nonlocal archive_success
try:
# 计算昨天日期
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
archive_name = f"{yesterday}_工作录像.zip"
archive_path = os.path.join(CONFIG['archive_dir'], archive_name)
# 确保归档目录存在
os.makedirs(CONFIG['archive_dir'], exist_ok=True)
log_message(f"开始归档前一天的录像文件: {yesterday}")
# 查找昨天的文件
files_to_archive = []
if os.path.exists(CONFIG['recordings_dir']):
for filename in os.listdir(CONFIG['recordings_dir']):
file_path = os.path.join(CONFIG['recordings_dir'], filename)
if os.path.isfile(file_path) and yesterday in filename:
files_to_archive.append(filename)
if not files_to_archive:
log_message("未找到需要归档的昨天文件。")
archive_success = True
archive_complete.set()
return
log_message(f"找到 {len(files_to_archive)} 个文件需要归档")
# 创建压缩包
try:
total_size = 0
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for filename in files_to_archive:
file_path = os.path.join(CONFIG['recordings_dir'], filename)
file_size = os.path.getsize(file_path)
total_size += file_size
log_message(f"正在归档: {filename} ({file_size / (1024 * 1024):.2f} MB)")
zipf.write(file_path, filename)
archive_size = os.path.getsize(archive_path)
log_message(f"成功创建压缩包: {archive_path}")
log_message(f"原文件总大小: {total_size / (1024 * 1024):.2f} MB")
log_message(f"压缩包大小: {archive_size / (1024 * 1024):.2f} MB")
if total_size > 0:
log_message(f"压缩比: {(1 - archive_size/total_size) * 100:.1f}%")
# 压缩成功后删除原文件(谨慎操作!先注释掉进行测试)
try:
with zipfile.ZipFile(archive_path, 'r') as zipf:
if zipf.testzip() is None:
log_message("压缩包验证通过,开始删除原文件...")
for filename in files_to_archive:
file_path = os.path.join(CONFIG['recordings_dir'], filename)
os.remove(file_path)
log_message(f"已删除: {filename}")
log_message("原文件已安全删除。")
else:
log_message("警告:压缩包验证失败,保留原文件")
except Exception as e:
log_message(f"压缩包验证失败: {e},保留原文件")
archive_success = True
except Exception as e:
log_message(f"归档过程中出错: {e}")
except Exception as e:
log_message(f"归档任务出错: {e}")
finally:
archive_complete.set()
# 启动归档线程
archive_thread = threading.Thread(target=archive_task, daemon=True)
archive_thread.start()
log_message("归档线程已启动")
# 等待归档完成,但设置超时时间(例如30分钟)
log_message("等待归档任务完成...")
archive_complete.wait(timeout=1800) # 30分钟超时
if archive_complete.is_set():
if archive_success:
log_message("归档任务成功完成")
else:
log_message("归档任务完成但可能有错误")
else:
log_message("警告:归档任务超时,但程序继续运行")
return archive_success
def main():
"""主函数"""
log_message("=== OBS自动化脚本开始执行 ===")
# 1. 启动OBS程序并开始录像
recording_started = start_recording()
# 2. 录像启动后,开始归档前一天的文件(等待完成)
if recording_started:
log_message("录像已启动,开始归档任务...")
archive_success = archive_previous_day_recordings()
if archive_success:
log_message("=== OBS自动化脚本执行完毕 ===")
else:
log_message("=== OBS自动化脚本执行完毕(归档有警告)===")
else:
log_message("=== OBS自动化脚本执行完毕(录像启动失败)===")
if name == "main":
main()