10 Python Automation Scripts for Operating System Tasks
Python is a powerful tool for automating operating system tasks. Below are ten practical automation scripts covering file management, system monitoring, scheduled tasks, and more.
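Most of the scripts use only the Python standard library; sections 4, 6, 7, 8, and 10 additionally rely on the third-party packages schedule, psutil, requests, pandas, and matplotlib, which can be installed with pip.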
1. Batch File Renaming Tool
import os

def batch_rename(path, prefix, start_num=1, extension=None):
    """
    Batch-rename the files in a given directory
    :param path: directory path
    :param prefix: prefix for the new file names
    :param start_num: starting number
    :param extension: only rename files with this extension (optional)
    """
    files = os.listdir(path)
    if extension:
        files = [f for f in files if f.endswith(extension)]
    for i, filename in enumerate(files, start=start_num):
        file_ext = os.path.splitext(filename)[1]
        new_name = f"{prefix}_{i}{file_ext}"
        old_path = os.path.join(path, filename)
        new_path = os.path.join(path, new_name)
        os.rename(old_path, new_path)
        print(f"Renamed: {filename} -> {new_name}")

# Usage example
# batch_rename('./photos', 'vacation', extension='.jpg')
2. Disk Space Monitoring Script
import shutil
import smtplib
from email.mime.text import MIMEText

def check_disk_usage(threshold=80, send_email=False):
    """
    Check disk usage and send an alert when it exceeds the threshold
    :param threshold: alert threshold (percent)
    :param send_email: whether to send an email alert
    """
    disk = shutil.disk_usage("/")
    percent_used = (disk.used / disk.total) * 100
    if percent_used > threshold:
        message = f"Warning: disk usage has reached {percent_used:.2f}%"
        print(message)
        if send_email:
            # Email settings
            sender = "your_email@example.com"
            receiver = "admin@example.com"
            password = "your_password"
            msg = MIMEText(message)
            msg['Subject'] = "Disk space alert"
            msg['From'] = sender
            msg['To'] = receiver
            try:
                with smtplib.SMTP_SSL('smtp.example.com', 465) as server:
                    server.login(sender, password)
                    server.sendmail(sender, receiver, msg.as_string())
                print("Alert email sent")
            except Exception as e:
                print(f"Failed to send email: {e}")
    else:
        print(f"Disk usage is normal: {percent_used:.2f}%")

# Usage example
# check_disk_usage(threshold=90, send_email=True)
3. Automatic Backup Script
import zipfile
import os
from datetime import datetime

def create_backup(source_dir, backup_dir, max_backups=5):
    """
    Create a compressed backup of a directory and keep the number of backups in check
    :param source_dir: directory to back up
    :param backup_dir: directory where backups are stored
    :param max_backups: maximum number of backups to keep
    """
    # Make sure the backup directory exists
    os.makedirs(backup_dir, exist_ok=True)
    # Build a timestamped backup file name
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_name = f"backup_{timestamp}.zip"
    backup_path = os.path.join(backup_dir, backup_name)
    # Create the compressed backup
    with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(source_dir):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, start=source_dir)
                zipf.write(file_path, arcname)
    print(f"Backup created: {backup_path}")
    # Keep only the most recent backups
    backups = sorted([f for f in os.listdir(backup_dir) if f.startswith('backup_')])
    if len(backups) > max_backups:
        for old_backup in backups[:-max_backups]:
            os.remove(os.path.join(backup_dir, old_backup))
            print(f"Removed old backup: {old_backup}")

# Usage example
# create_backup('/path/to/important/files', '/backups', max_backups=3)
4. Task Scheduler
import schedule
import time
from datetime import datetime

def job(task_name):
    """Sample task function"""
    print(f"[{datetime.now()}] Running task: {task_name}")

def task_scheduler():
    """Scheduled task runner"""
    # Register the jobs
    schedule.every(10).seconds.do(job, "every 10 seconds")
    schedule.every().minute.at(":30").do(job, "at second 30 of every minute")
    schedule.every().hour.do(job, "every hour")
    schedule.every().day.at("10:30").do(job, "every day at 10:30")
    schedule.every().monday.do(job, "every Monday")
    schedule.every().wednesday.at("13:15").do(job, "every Wednesday at 13:15")
    print("Scheduler started, press Ctrl+C to exit...")
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            print("\nScheduler stopped")
            break

# Usage example
# task_scheduler()
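The scheduler can also drive the other scripts in this article. Below is a minimal sketch, assuming check_disk_usage (section 2) and create_backup (section 3) are defined in, or imported into, the same module; the paths are placeholders:

import schedule
import time

# Run the disk check every hour and a nightly backup at 02:00
schedule.every().hour.do(check_disk_usage, threshold=90)
schedule.every().day.at("02:00").do(create_backup, '/path/to/important/files', '/backups')

while True:
    schedule.run_pending()
    time.sleep(1)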
5. Log File Analyzer
import re
from collections import defaultdict

def analyze_logs(log_file, output_file=None):
    """
    Analyze a log file, counting errors and client accesses
    :param log_file: path to the log file
    :param output_file: path of the file to write the results to (optional)
    """
    error_pattern = re.compile(r'ERROR')
    ip_pattern = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
    stats = {
        'total_lines': 0,
        'error_count': 0,
        'ip_counts': defaultdict(int),
        'error_lines': []
    }
    with open(log_file, 'r') as f:
        for line in f:
            stats['total_lines'] += 1
            # Check for errors
            if error_pattern.search(line):
                stats['error_count'] += 1
                stats['error_lines'].append(line.strip())
            # Count accesses per IP
            ip_match = ip_pattern.search(line)
            if ip_match:
                ip = ip_match.group()
                stats['ip_counts'][ip] += 1
    # Build the output
    output = [
        f"Log analysis results: {log_file}",
        f"Total lines: {stats['total_lines']}",
        f"Error count: {stats['error_count']}",
        "\nTop 10 IPs by access count:",
    ]
    # Sort IPs by access count
    sorted_ips = sorted(stats['ip_counts'].items(), key=lambda x: x[1], reverse=True)
    for ip, count in sorted_ips[:10]:
        output.append(f"  {ip}: {count}")
    # Write to a file if requested
    if output_file:
        with open(output_file, 'w') as f:
            f.write('\n'.join(output))
        print(f"Analysis results saved to: {output_file}")
    else:
        print('\n'.join(output))
    # Print a few of the error lines
    if stats['error_lines']:
        print("\nSample error lines:")
        for line in stats['error_lines'][:5]:
            print(f"  {line}")

# Usage example
# analyze_logs('app.log', 'log_analysis.txt')
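The analyzer only assumes that error lines contain the literal string ERROR and that client addresses appear somewhere in the line as dotted IPv4 addresses, so it works on typical application or access logs. Two hypothetical lines that would be counted:

192.168.1.10 - - [12/Mar/2025:10:15:32 +0000] "GET /index.html HTTP/1.1" 200 512
2025-03-12 10:15:40 ERROR Database connection timed out (client 192.168.1.23)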
6. System Information Collector
import platform
import psutil
import socket
from datetime import datetime

def get_system_info():
    """Collect and return system information"""
    info = {}
    # Basic information
    info['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    info['hostname'] = socket.gethostname()
    info['os'] = f"{platform.system()} {platform.release()}"
    info['os_version'] = platform.version()
    info['machine'] = platform.machine()
    info['processor'] = platform.processor()
    # CPU information
    info['cpu_count'] = psutil.cpu_count(logical=True)
    info['cpu_usage'] = psutil.cpu_percent(interval=1)
    # Memory information
    mem = psutil.virtual_memory()
    info['memory_total'] = f"{mem.total / (1024**3):.2f} GB"
    info['memory_available'] = f"{mem.available / (1024**3):.2f} GB"
    info['memory_usage'] = mem.percent
    # Disk information
    disk = psutil.disk_usage('/')
    info['disk_total'] = f"{disk.total / (1024**3):.2f} GB"
    info['disk_used'] = f"{disk.used / (1024**3):.2f} GB"
    info['disk_usage'] = disk.percent
    # Network information
    net_info = psutil.net_if_addrs()
    info['network_interfaces'] = list(net_info.keys())
    return info

def print_system_info():
    """Print a formatted system information report"""
    info = get_system_info()
    print("="*50)
    print(f"System information report - {info['timestamp']}")
    print("="*50)
    print(f"Hostname: {info['hostname']}")
    print(f"Operating system: {info['os']}")
    print(f"Version: {info['os_version']}")
    print(f"Architecture: {info['machine']}")
    print(f"Processor: {info['processor']}")
    print("-"*50)
    print(f"CPU: {info['cpu_count']} logical cores, current usage: {info['cpu_usage']}%")
    print("-"*50)
    print(f"Memory: total {info['memory_total']}, available {info['memory_available']}, usage {info['memory_usage']}%")
    print("-"*50)
    print(f"Disk (/): total {info['disk_total']}, used {info['disk_used']}, usage {info['disk_usage']}%")
    print("-"*50)
    print(f"Network interfaces: {', '.join(info['network_interfaces'])}")
    print("="*50)

# Usage example
# print_system_info()
7. Automatic Download Manager
import os
import requests
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor

def download_file(url, download_dir="downloads"):
    """
    Download a single file
    :param url: file URL
    :param download_dir: download directory
    """
    try:
        # Create the download directory
        os.makedirs(download_dir, exist_ok=True)
        # Derive the file name from the URL
        parsed_url = urlparse(url)
        filename = os.path.basename(parsed_url.path)
        if not filename:
            filename = "downloaded_file"
        # Download the file
        print(f"Starting download: {url}")
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        # Save the file
        filepath = os.path.join(download_dir, filename)
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Download finished: {filepath}")
        return filepath
    except Exception as e:
        print(f"Download failed {url}: {e}")
        return None

def batch_download(urls, max_workers=5):
    """
    Download multiple files in parallel
    :param urls: list of URLs
    :param max_workers: maximum number of concurrent downloads
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(download_file, urls))
    successful = [r for r in results if r]
    print(f"\nDone! Successfully downloaded {len(successful)}/{len(urls)} files")

# Usage example
# urls = [
#     "https://example.com/file1.zip",
#     "https://example.com/file2.pdf",
#     "https://example.com/image.jpg"
# ]
# batch_download(urls)
8. Process Monitoring and Auto-Restart
import psutil
import time
import subprocess

def monitor_process(process_name, restart_command, check_interval=60):
    """
    Monitor a process and restart it automatically if it is not running
    :param process_name: name of the process to monitor
    :param restart_command: command used to restart the process (as a list)
    :param check_interval: check interval in seconds
    """
    print(f"Monitoring process: {process_name}")
    print(f"Restart command: {' '.join(restart_command)}")
    while True:
        # Check whether the process exists
        process_exists = any(
            proc.info['name'] == process_name
            for proc in psutil.process_iter(['name'])
        )
        if not process_exists:
            print(f"[{time.ctime()}] Process {process_name} is not running, attempting restart...")
            try:
                subprocess.Popen(restart_command)
                print("Restart command issued")
            except Exception as e:
                print(f"Restart failed: {e}")
        else:
            print(f"[{time.ctime()}] Process {process_name} is running normally")
        time.sleep(check_interval)

# Usage example
# monitor_process('nginx', ['sudo', 'systemctl', 'start', 'nginx'])
9. Automatic Temporary File Cleanup
import time
from pathlib import Path

def clean_temp_files(directories, days_old=7, extensions=None):
    """
    Remove old temporary files from the given directories
    :param directories: list of directories to clean
    :param days_old: delete files that have not been modified for this many days
    :param extensions: only delete files with these extensions (optional)
    """
    if extensions is None:
        extensions = ['.tmp', '.temp', '.bak', '.log', '.old']
    cutoff_time = time.time() - (days_old * 86400)
    deleted_count = 0
    saved_space = 0
    for directory in directories:
        dir_path = Path(directory)
        if not dir_path.exists() or not dir_path.is_dir():
            print(f"Warning: directory does not exist or is not a directory: {directory}")
            continue
        print(f"Cleaning directory: {directory}")
        for item in dir_path.iterdir():
            try:
                # Only handle regular files
                if item.is_file():
                    # Check the extension
                    if extensions and item.suffix.lower() not in extensions:
                        continue
                    # Check the modification time
                    if item.stat().st_mtime < cutoff_time:
                        file_size = item.stat().st_size
                        item.unlink()
                        deleted_count += 1
                        saved_space += file_size
                        print(f"Deleted: {item} (size: {file_size/1024:.2f} KB)")
            except Exception as e:
                print(f"Error deleting {item}: {e}")
    print("\nCleanup finished!")
    print(f"Files deleted: {deleted_count}")
    print(f"Space freed: {saved_space/1024/1024:.2f} MB")

# Usage example
# clean_temp_files(['/tmp', './temp_files'], days_old=3)
10. Automatic Report Generation and Email Delivery
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from datetime import datetime
import io
import pandas as pd
import matplotlib.pyplot as plt

def generate_and_send_report(data, report_title, recipients, smtp_config):
    """
    Generate a report and send it by email
    :param data: data for the report (DataFrame or dict)
    :param report_title: report title
    :param recipients: list of recipients
    :param smtp_config: SMTP configuration dictionary
    """
    # 1. Build the report metadata
    report_date = datetime.now().strftime("%Y-%m-%d")
    subject = f"{report_title} - {report_date}"
    # Convert a dict to a DataFrame
    if isinstance(data, dict):
        data = pd.DataFrame(data)
    # 2. Build the plain-text report
    text_report = f"""
{report_title}
Report date: {report_date}
{data.describe().to_string() if isinstance(data, pd.DataFrame) else str(data)}
This email was sent automatically; please do not reply.
"""
    # 3. Generate a chart (if the data is a DataFrame)
    attachments = []
    if isinstance(data, pd.DataFrame):
        try:
            # Simple chart example
            ax = data.plot(kind='bar' if len(data) < 10 else 'line', figsize=(10, 6))
            ax.set_title(f"{report_title} data trend")
            # Save the chart to an in-memory buffer
            img_buffer = io.BytesIO()
            ax.get_figure().savefig(img_buffer, format='png')
            img_buffer.seek(0)
            plt.close(ax.get_figure())
            # Attach the chart
            chart_attach = MIMEApplication(img_buffer.read(), Name='report_chart.png')
            chart_attach['Content-Disposition'] = 'attachment; filename="report_chart.png"'
            attachments.append(chart_attach)
        except Exception as e:
            print(f"Failed to generate chart: {e}")
    # 4. Build the email
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = smtp_config['sender']
    msg['To'] = ', '.join(recipients)
    # Add the text body
    msg.attach(MIMEText(text_report, 'plain'))
    # Add the attachments
    for attach in attachments:
        msg.attach(attach)
    # 5. Send the email
    try:
        with smtplib.SMTP_SSL(smtp_config['server'], smtp_config['port']) as server:
            server.login(smtp_config['username'], smtp_config['password'])
            server.sendmail(smtp_config['sender'], recipients, msg.as_string())
        print("Report email sent")
    except Exception as e:
        print(f"Failed to send email: {e}")

# Usage example
# data = {'sales': [100, 150, 200, 180, 210, 240, 300]}
# smtp_config = {
#     'server': 'smtp.example.com',
#     'port': 465,
#     'username': 'your_email@example.com',
#     'password': 'your_password',
#     'sender': 'reports@example.com'
# }
# generate_and_send_report(data, "Weekly sales report", ['manager@example.com'], smtp_config)
Summary
These ten automation scripts cover several areas of operating system automation:
- File management: batch renaming, automatic backups, temporary file cleanup
- System monitoring: disk space monitoring, system information collection, process monitoring
- Task scheduling: periodic task execution
- Network operations: automatic download management
- Reporting: log analysis, automatic report generation and email delivery
The scripts can be modified and extended to fit your own needs, for example (a minimal configuration-file sketch follows this list):
- More detailed logging
- Configuration file support
- A graphical user interface
- A web service interface
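As an illustration of configuration-file support, here is a minimal sketch that reads settings for the disk monitoring script from section 2 out of a hypothetical settings.ini file. The file name, section, and option names are assumptions made for this example, and check_disk_usage is the function defined earlier in this article:

import configparser

# Hypothetical settings.ini:
# [disk]
# threshold = 90
# send_email = true

config = configparser.ConfigParser()
config.read('settings.ini')

# Fall back to the defaults from section 2 when the file or options are missing
threshold = config.getint('disk', 'threshold', fallback=80)
send_email = config.getboolean('disk', 'send_email', fallback=False)

check_disk_usage(threshold=threshold, send_email=send_email)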
Python's strength lies in its rich standard library and third-party ecosystem, which make automation tasks like these simple and efficient.