Created
April 4, 2026 10:55
-
-
Save jericjan/be02f82e3ff644905d3e2af4ef3d0246 to your computer and use it in GitHub Desktop.
GIF compressor using gifsicle and binary searching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # flake8: noqa: E501 | |
| import concurrent.futures | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
# Upper limit of the search range: main() uses this as the highest lossy value
# it will consider when the target size has not been reached yet.
MAX_LOSSY = 500  # Don't use lossy values higher than this
def test_lossy(input_path: str, lossy_val: int, temp_dir: str):
    """Compress *input_path* once at one lossy level and measure the output.

    gifsicle is run with ``-O3 --colors 256 --lossy=<lossy_val>`` and writes
    its result to ``temp_<lossy_val>.gif`` inside *temp_dir*. Its stdout and
    stderr are discarded.

    Returns:
        A ``(lossy_val, size, path)`` tuple. On success ``size`` is the output
        file size in bytes and ``path`` is the output file. When gifsicle
        exits with a non-zero status, ``size`` is ``float('inf')`` and
        ``path`` is ``None``.
    """
    result_file = os.path.join(temp_dir, f"temp_{lossy_val}.gif")
    gifsicle_args = ["gifsicle", "-O3", input_path]
    gifsicle_args += ["--colors", "256"]
    gifsicle_args.append(f"--lossy={lossy_val}")
    gifsicle_args += ["-o", result_file]

    try:
        subprocess.run(
            gifsicle_args,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        # A failed run is reported as "infinitely large" with no file.
        return lossy_val, float('inf'), None

    return lossy_val, os.path.getsize(result_file), result_file
def get_next_points(low: int, high: int, num_points: int = 3) -> list[int]:
    """Pick up to *num_points* probe values inside the inclusive range [low, high].

    An empty range (``low > high``) yields no points. A range holding at most
    *num_points* integers is returned in full. Otherwise the range is cut into
    ``num_points + 1`` equal segments and the interior cut positions,
    truncated to integers, are returned in ascending order.
    """
    span = high - low
    if span < 0:
        return []

    if span + 1 <= num_points:
        # Few enough candidates left: test every one of them.
        return list(range(low, high + 1))

    stride = span / (num_points + 1)
    probes = []
    for k in range(1, num_points + 1):
        probes.append(int(low + stride * k))
    return probes
def _parse_target_size(text: str) -> tuple[float, int, str]:
    """Parse a target size such as ``"8MB"``, ``"1.5MiB"`` or ``"10 MB"``.

    Returns:
        ``(target_bytes, bytes_per_unit, unit_label)`` where ``unit_label`` is
        ``"MB"`` (1000 * 1000 bytes) or ``"MiB"`` (1024 * 1024 bytes).

    Raises:
        ValueError: if *text* is not a non-negative decimal number followed by
            ``MB`` or ``MiB``.
    """
    # fullmatch rejects stray characters and keeps the fractional part, so
    # "1.5MB" is 1.5 megabytes rather than being truncated to 1.
    match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(MB|MiB)", text.strip())
    if match is None:
        raise ValueError(f"unrecognised target size: {text!r}")
    number, unit = match.groups()
    multiplier = 1000 * 1000 if unit == "MB" else 1024 * 1024
    return float(number) * multiplier, multiplier, unit


def main():
    """Interactively compress a GIF so that it fits under a target file size.

    Prompts for the input path and a target size ("MB" or "MiB"), then
    searches for the lowest gifsicle lossy value whose output is no larger
    than the target. Each round tests up to three lossy values concurrently
    and narrows the window between the largest value known to be too big and
    the smallest value known to fit. The result is written next to the input
    as ``<name>_compressed<ext>``. If no tested value reaches the target, the
    smallest output produced is saved instead.

    Exits with status 1 when gifsicle is missing, the input file does not
    exist, the target size cannot be parsed, or gifsicle fails for every
    lossy value tried. Exits with status 0 on Ctrl+C during the search.
    """
    if not shutil.which("gifsicle"):
        print("Error: 'gifsicle' command not found. Please install it first.")
        sys.exit(1)

    print("=== Concurrent GIF Compressor ===")
    input_file = input("Enter the input GIF file path: ").strip("'\"")
    if not os.path.isfile(input_file):
        print(f"Error: File '{input_file}' not found.")
        sys.exit(1)

    try:
        target_bytes, multiplier, unit = _parse_target_size(
            input("Enter the target filesize and add \"MB\" or \"MiB\" at the end: ")
        )
    except ValueError:
        print("Error: Target size must be a number followed by \"MB\" or \"MiB\".")
        sys.exit(1)

    base_name, ext = os.path.splitext(input_file)
    output_file = f"{base_name}_compressed{ext}"

    # lossy value -> (size in bytes, temp file path); successful runs only.
    evaluated_cache: dict[int, tuple[int, str]] = {}
    # Every lossy value handed to gifsicle, including failed runs. Failed runs
    # never enter evaluated_cache, so without this set the same points would
    # be regenerated and retried forever.
    attempted: set[int] = set()
    points_to_test: list[int] = [0, 100, 200]  # Initial batch of lossy values

    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            while points_to_test:
                print(f"\n[*] Evaluating lossy values: {points_to_test}")
                attempted.update(points_to_test)

                with concurrent.futures.ThreadPoolExecutor(max_workers=len(points_to_test)) as executor:
                    futures = [
                        executor.submit(test_lossy, input_file, p, temp_dir)
                        for p in points_to_test
                    ]
                    for future in concurrent.futures.as_completed(futures):
                        lossy, size, path = future.result()
                        if path:
                            evaluated_cache[lossy] = (size, path)
                            print(f" -> Lossy {lossy:3}: {size / multiplier:.2f} {unit}")
                        else:
                            print(f" -> Lossy {lossy:3}: gifsicle failed, skipping")

                # Analyze all results collected so far.
                fitting = [v for v, (s, _p) in evaluated_cache.items() if s <= target_bytes]
                too_big = [v for v, (s, _p) in evaluated_cache.items() if s > target_bytes]
                low = (max(too_big) if too_big else -1) + 1

                if fitting:
                    # We have a candidate; look for an even lower lossy value.
                    high = min(fitting) - 1
                else:
                    # Target not met yet; push the search window higher.
                    if low > MAX_LOSSY:
                        print(f"\n[-] Reached maximum tested lossy value ({MAX_LOSSY}). Target size may be unreachable.")
                        break
                    high = MAX_LOSSY

                # Next batch of up to 3 points, minus anything already tried.
                points_to_test = [
                    p for p in get_next_points(low, high, num_points=3)
                    if p not in attempted
                ]

            # Write the final result (inside the `with` so temp files still exist).
            if not evaluated_cache:
                print("\n[-] gifsicle failed for every lossy value tried; no output was written.")
                sys.exit(1)

            fitting = [v for v, (s, _p) in evaluated_cache.items() if s <= target_bytes]
            if fitting:
                final_best_l = min(fitting)
                final_size, final_path = evaluated_cache[final_best_l]
                shutil.copy2(final_path, output_file)
                print(f"\n[+] Success! Optimal lossy value: {final_best_l}")
                print(f"[+] Output saved to: {output_file} ({final_size / multiplier:.2f} {unit})")
            else:
                # Save the smallest file we managed to produce.
                best_effort_l = min(evaluated_cache, key=lambda v: evaluated_cache[v][0])
                final_size, final_path = evaluated_cache[best_effort_l]
                shutil.copy2(final_path, output_file)
                print(f"\n[-] Could not reach the target size even at lossy={best_effort_l}.")
                print(f"[!] Saved best effort to: {output_file} ({final_size / multiplier:.2f} {unit})")

        except KeyboardInterrupt:
            print("\n[!] Process cancelled by user. Cleaning up...")
            sys.exit(0)
# Start the interactive compressor only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment