批量将压缩包内的图片压缩成webp格式
本文最后更新于 2024-05-24,
若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益, 请联系我 删除。
本站只有Telegram群组为唯一交流群组, 点击加入
文章内容有误?申请成为本站文章修订者或作者? 向站长提出申请
要实现将压缩包内的图片(jpg/png)压缩成webp格式,并覆盖原有的图片文件,可以使用Python脚本来完成这一任务。以下是一个详细的步骤:
将此脚本保存为一个Python文件(例如 convert_images_in_zip.py
),并将 zip_path
变量修改为你要处理的压缩包路径。运行脚本后,它将遍历文件夹下的压缩包,并且处理压缩包内的图像,然后生成一个新的压缩包,覆盖原有的图像文件。
注意事项
确保你已经安装了Pillow库,可以使用以下命令安装:
pip install Pillow
为了节约存储空间。该脚本会覆盖原始的压缩包,建议在测试过程中备份原始文件。
运行脚本,它将遍历指定目录及其子目录中的所有压缩包,处理每个压缩包内的图像,将其转换为 webp
格式,并更新压缩包。
这样,你就可以处理文件夹下所有的压缩包,确保它们都进行图像格式转换,并处理过程中遇到的错误。
另外,要将图片转换为 webp
格式并指定质量,可以在调用 save
方法时传入 quality
参数。在下面的脚本中,包含了指定质量的部分,你可以自行修改比值。
在压缩时遇到了一些压缩包路径超过系统限制长度的情况,导致报错异常退出。另外为了提升运行效率,设置了多线程工作。于是脚本做出了以下设置:
限制文件名长度:
- 使用
shorten_filename
函数来确保新生成的文件名不会超过255字节的限制。
覆盖旧文件:
- 在
process_zip
函数中,创建新的压缩文件后,使用shutil.move
将新文件覆盖旧文件。
并行处理:
- 使用
ThreadPoolExecutor
实现并行处理,每个线程处理一个ZIP文件。 max_workers
参数控制并行线程数,根据你的系统资源进行调整(默认为4)。
任务提交与结果收集:
- 提交所有ZIP文件处理任务,并在所有任务完成后收集结果。
- 使用
as_completed
方法跟踪和收集每个任务的完成状态。
运行脚本:
确保你在适当的路径下运行该脚本,并且脚本中的 directory
变量指向你需要处理的目录(例如 /root/zip
)。这样,脚本会并行处理该目录中的所有ZIP文件,从而加快处理速度。
完整代码
import os
import zipfile
from PIL import Image, UnidentifiedImageError
import tempfile
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
MAX_FILENAME_LENGTH = 255
def shorten_filename(filename, max_length=MAX_FILENAME_LENGTH):
if len(filename) <= max_length:
return filename
name, ext = os.path.splitext(filename)
return name[:max_length - len(ext)] + ext
def convert_image_to_webp(image_path, quality=60):
try:
with Image.open(image_path) as img:
webp_path = os.path.splitext(image_path)[0] + '.webp'
img.save(webp_path, 'webp', quality=quality)
return webp_path
except UnidentifiedImageError:
print(f"Error: Unable to identify image file {image_path}. Skipping.")
except Exception as e:
print(f"Error: An error occurred while converting {image_path} to webp. Skipping. Details: {e}")
return None
def process_zip(zip_path, quality=60):
new_zip_path = zip_path + '_new.zip'
new_zip_path = os.path.join(os.path.dirname(new_zip_path), shorten_filename(os.path.basename(new_zip_path)))
try:
print(f"Processing: {zip_path.encode('utf-8', 'replace')}")
print(f"New zip path: {new_zip_path.encode('utf-8', 'replace')}")
print(f"Length of new zip path: {len(new_zip_path)}")
if len(os.path.basename(new_zip_path)) > MAX_FILENAME_LENGTH:
raise OSError(f"Filename length exceeds the maximum allowed length: {new_zip_path}")
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
webp_found = False
for root, _, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith('.webp'):
webp_found = True
break
if webp_found:
break
if webp_found:
print(f"Skipping {zip_path} as it already contains webp images.")
return
for root, _, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')):
image_path = os.path.join(root, file)
webp_path = convert_image_to_webp(image_path, quality)
if webp_path:
os.remove(image_path)
shutil.move(webp_path, image_path)
with zipfile.ZipFile(new_zip_path, 'w') as zip_ref:
for root, _, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
zip_ref.write(file_path, os.path.relpath(file_path, temp_dir))
shutil.move(new_zip_path, zip_path)
except Exception as e:
print(f"Error processing {zip_path.encode('utf-8', 'replace')}: {e}")
def process_all_zips(directory, quality=60, max_workers=4):
zip_files = [os.path.join(root, file)
for root, _, files in os.walk(directory)
for file in files if file.lower().endswith('.zip')]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_zip, zip_file, quality): zip_file for zip_file in zip_files}
for future in as_completed(futures):
zip_file = futures[future]
try:
future.result()
print(f"Finished processing {zip_file.encode('utf-8', 'replace')}")
except Exception as e:
print(f"Error processing {zip_file.encode('utf-8', 'replace')}: {e}")
if __name__ == '__main__':
directory = 'zip_path'
process_all_zips(directory, quality=60, max_workers=4)