本文最后更新于 2024-05-24,

若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益, 请联系我 删除。

本站只有Telegram群组为唯一交流群组, 点击加入

文章内容有误?申请成为本站文章修订者或作者? 向站长提出申请

要实现将压缩包内的图片(jpg/png)压缩成webp格式,并覆盖原有的图片文件,可以使用Python脚本来完成这一任务。以下是一个详细的步骤:

将此脚本保存为一个Python文件(例如 convert_images_in_zip.py),并将 zip_path变量修改为你要处理的压缩包路径。运行脚本后,它将遍历文件夹下的压缩包,并且处理压缩包内的图像,然后生成一个新的压缩包,覆盖原有的图像文件。

注意事项

确保你已经安装了Pillow库,可以使用以下命令安装:

pip install Pillow

为了节约存储空间。该脚本会覆盖原始的压缩包,建议在测试过程中备份原始文件。

运行脚本,它将遍历指定目录及其子目录中的所有压缩包,处理每个压缩包内的图像,将其转换为 webp格式,并更新压缩包。

这样,你就可以处理文件夹下所有的压缩包,确保它们都进行图像格式转换,并处理过程中遇到的错误。

另外,要将图片转换为 webp格式并指定质量,可以在调用 save方法时传入 quality参数。在下面的脚本中,包含了指定质量的部分,你可以自行修改比值。

在压缩时遇到了一些压缩包路径超过系统限制长度的情况,导致报错异常退出。另外为了提升运行效率,设置了多线程工作。于是脚本做出了以下设置:

限制文件名长度

  • 使用 shorten_filename 函数来确保新生成的文件名不会超过255字节的限制。

覆盖旧文件

  • process_zip 函数中,创建新的压缩文件后,使用 shutil.move 将新文件覆盖旧文件。

并行处理

  • 使用 ThreadPoolExecutor实现并行处理,每个线程处理一个ZIP文件。
  • max_workers参数控制并行线程数,根据你的系统资源进行调整(默认为4)。

任务提交与结果收集

  • 提交所有ZIP文件处理任务,并在所有任务完成后收集结果。
  • 使用 as_completed方法跟踪和收集每个任务的完成状态。

运行脚本:

确保你在适当的路径下运行该脚本,并且脚本中的 directory变量指向你需要处理的目录(例如 /root/zip)。这样,脚本会并行处理该目录中的所有ZIP文件,从而加快处理速度。

完整代码

import os
import zipfile
from PIL import Image, UnidentifiedImageError
import tempfile
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_FILENAME_LENGTH = 255

def shorten_filename(filename, max_length=MAX_FILENAME_LENGTH):
    if len(filename) <= max_length:
        return filename
    name, ext = os.path.splitext(filename)
    return name[:max_length - len(ext)] + ext

def convert_image_to_webp(image_path, quality=60):
    try:
        with Image.open(image_path) as img:
            webp_path = os.path.splitext(image_path)[0] + '.webp'
            img.save(webp_path, 'webp', quality=quality)
        return webp_path
    except UnidentifiedImageError:
        print(f"Error: Unable to identify image file {image_path}. Skipping.")
    except Exception as e:
        print(f"Error: An error occurred while converting {image_path} to webp. Skipping. Details: {e}")
    return None

def process_zip(zip_path, quality=60):
    new_zip_path = zip_path + '_new.zip'
    new_zip_path = os.path.join(os.path.dirname(new_zip_path), shorten_filename(os.path.basename(new_zip_path)))

    try:
        print(f"Processing: {zip_path.encode('utf-8', 'replace')}")
        print(f"New zip path: {new_zip_path.encode('utf-8', 'replace')}")
        print(f"Length of new zip path: {len(new_zip_path)}")

        if len(os.path.basename(new_zip_path)) > MAX_FILENAME_LENGTH:
            raise OSError(f"Filename length exceeds the maximum allowed length: {new_zip_path}")

        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            webp_found = False
            for root, _, files in os.walk(temp_dir):
                for file in files:
                    if file.lower().endswith('.webp'):
                        webp_found = True
                        break
                if webp_found:
                    break

            if webp_found:
                print(f"Skipping {zip_path} as it already contains webp images.")
                return

            for root, _, files in os.walk(temp_dir):
                for file in files:
                    if file.lower().endswith(('.jpg', '.jpeg', '.png')):
                        image_path = os.path.join(root, file)
                        webp_path = convert_image_to_webp(image_path, quality)

                        if webp_path:
                            os.remove(image_path)
                            shutil.move(webp_path, image_path)

            with zipfile.ZipFile(new_zip_path, 'w') as zip_ref:
                for root, _, files in os.walk(temp_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        zip_ref.write(file_path, os.path.relpath(file_path, temp_dir))

            shutil.move(new_zip_path, zip_path)

    except Exception as e:
        print(f"Error processing {zip_path.encode('utf-8', 'replace')}: {e}")

def process_all_zips(directory, quality=60, max_workers=4):
    zip_files = [os.path.join(root, file)
                 for root, _, files in os.walk(directory)
                 for file in files if file.lower().endswith('.zip')]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_zip, zip_file, quality): zip_file for zip_file in zip_files}
        for future in as_completed(futures):
            zip_file = futures[future]
            try:
                future.result()
                print(f"Finished processing {zip_file.encode('utf-8', 'replace')}")
            except Exception as e:
                print(f"Error processing {zip_file.encode('utf-8', 'replace')}: {e}")

if __name__ == '__main__':
    directory = 'zip_path'
    process_all_zips(directory, quality=60, max_workers=4)