291 lines
8.9 KiB
Python
Executable File
291 lines
8.9 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import io
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import tempfile
|
|
from typing import Iterator, Optional, Tuple
|
|
|
|
|
|
def _iter_jpegs(data: bytes) -> Iterator[Tuple[int, int]]:
    """Yield ``(start, end)`` spans of JPEG candidates embedded in *data*.

    A candidate begins at the byte sequence ``FF D8 FF`` (SOI followed by the
    first byte of another marker) and ends just past the first ``FF D9``
    (EOI) found after it.  ``end`` is exclusive, spans are produced in
    ascending order and never overlap.

    The stream is not parsed here: the first ``FF D9`` after the start is
    taken as the end, so every span is only a candidate that the caller is
    expected to validate.
    """
    soi = b"\xff\xd8\xff"
    eoi = b"\xff\xd9"
    cursor = 0
    while True:
        start = data.find(soi, cursor)
        if start == -1:
            return
        end = data.find(eoi, start + len(soi))
        if end == -1:
            # No EOI after this SOI means there is none after any later SOI
            # either, so scanning on could never yield another span.  Stopping
            # here avoids re-searching the tail once per remaining SOI.
            return
        end += len(eoi)
        yield start, end
        cursor = end
|
|
|
|
|
|
def _is_valid_jpeg(blob: bytes) -> bool:
    """Cheap structural check that *blob* looks like a JPEG stream.

    After confirming the SOI marker, the marker segments are walked one by
    one.  The result is True only when a start-of-frame (SOF) segment was
    seen before the walk stopped at SOS, at EOI, or at a byte that is not a
    marker prefix.  Running off the end of *blob*, or meeting a segment
    length below 2, yields False.  No image data is decoded.
    """
    sof_markers = frozenset(
        (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF)
    )
    size = len(blob)
    if size < 4 or blob[0] != 0xFF or blob[1] != 0xD8:
        return False

    has_frame = False
    pos = 2
    while pos + 1 < size:
        if blob[pos] != 0xFF:
            # Not a marker prefix: stop walking and report what was seen so far.
            return has_frame

        # A marker may be preceded by any number of 0xFF fill bytes.
        while pos < size and blob[pos] == 0xFF:
            pos += 1
        if pos >= size:
            return False

        code = blob[pos]
        pos += 1

        if code in (0xD9, 0xDA):
            # EOI or SOS: the header segments are over.
            return has_frame
        if code == 0x01 or 0xD0 <= code <= 0xD7:
            # TEM and RSTn markers carry no length field.
            continue

        if pos + 1 >= size:
            return False
        length = int.from_bytes(blob[pos:pos + 2], "big")
        if length < 2:
            # The length field counts its own two bytes.
            return False

        if code in sof_markers:
            has_frame = True
        pos += length

    return False
|
|
|
|
|
|
def _find_png(data: bytes) -> Optional[Tuple[int, int]]:
    """Locate the first structurally complete PNG stream inside *data*.

    For each occurrence of the 8-byte PNG signature, the chunk list that
    follows is walked (4-byte big-endian length, 4-byte type, payload,
    4-byte CRC) until an ``IEND`` chunk is reached.  CRCs are not verified.

    A signature whose chunks run past the end of *data* is treated as a
    false positive and the search continues with the next signature, so a
    stray or truncated signature does not hide a later, complete PNG.

    Returns:
        ``(start, end)`` with ``end`` exclusive and never greater than
        ``len(data)``, or ``None`` when no complete PNG is found.
    """
    sig = b"\x89PNG\r\n\x1a\n"
    total = len(data)
    search_from = 0
    while True:
        start = data.find(sig, search_from)
        if start == -1:
            return None
        pos = start + len(sig)
        # 12 = length field + chunk type + CRC, i.e. the smallest possible chunk.
        while pos + 12 <= total:
            length = int.from_bytes(data[pos:pos + 4], "big")
            ctype = data[pos + 4:pos + 8]
            pos += 12 + length
            if pos > total:
                # Declared payload overruns the buffer: not a usable PNG.
                break
            if ctype == b"IEND":
                return start, pos
        search_from = start + len(sig)
|
|
|
|
|
|
def _read_largest_canvas_preview(db_bytes: bytes) -> Optional[bytes]:
    """Return the largest ``CanvasPreview.ImageData`` blob in a SQLite image.

    *db_bytes* is written to a temporary file because ``sqlite3.connect`` is
    given a filesystem path.  The blob is fetched in 1 MiB ``substr`` slices
    rather than in a single read.

    Returns ``None`` when the bytes are not a usable database, when the
    table or column is missing, or when the stored value is empty or not a
    blob.  The temporary file is always removed (best effort).
    """
    query_length = (
        "SELECT length(ImageData) FROM CanvasPreview ORDER BY length(ImageData) DESC LIMIT 1"
    )
    query_slice = (
        "SELECT substr(ImageData, ?, ?) FROM CanvasPreview ORDER BY length(ImageData) DESC LIMIT 1"
    )
    step = 1024 * 1024
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            # Record the name before writing so the file is cleaned up even
            # if the write itself fails.
            tmp_path = tmp.name
            tmp.write(db_bytes)
        conn = sqlite3.connect(tmp_path)
        try:
            cur = conn.cursor()
            cur.execute(query_length)
            row = cur.fetchone()
            if not row or not row[0]:
                return None
            total = int(row[0])
            parts = []
            # SQL substr() offsets are 1-based.
            for offset in range(1, total + 1, step):
                cur.execute(query_slice, (offset, min(step, total - offset + 1)))
                part = cur.fetchone()[0]
                if not part or not isinstance(part, bytes):
                    return None
                parts.append(part)
            return b"".join(parts)
        finally:
            # Close before the temp file is removed; an open handle would
            # otherwise be leaked on every error path.
            conn.close()
    except sqlite3.Error:
        # Not a database, corrupt, or no CanvasPreview table: the caller
        # should simply move on to the next candidate chunk.
        return None
    finally:
        if tmp_path:
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # best-effort cleanup; nothing useful to do on failure


def _extract_canvas_preview(data: bytes) -> Optional[bytes]:
    """Extract the preview image stored in an embedded SQLite chunk of *data*.

    Scans for the 8-byte ``CHNKSQLi`` marker.  As read by this code, the
    8 bytes after the marker hold a big-endian size and the payload is taken
    to start 24 bytes after the marker (the meaning of bytes 16-24 is not
    interpreted here).  Within that chunk, the SQLite file header is located
    and ``size`` bytes starting at it are treated as the database image.

    Candidate chunks that are truncated, contain no SQLite header, cannot be
    opened as a database, or hold no preview are skipped and the scan
    resumes after the marker.

    Returns:
        The largest ``CanvasPreview.ImageData`` blob of the first chunk that
        yields one, or ``None`` if no chunk does.

    Raises:
        OSError: if the temporary database file cannot be created or written.
    """
    marker = b"CHNKSQLi"
    sqlite_header = b"SQLite format 3\x00"
    header_len = 24
    total = len(data)
    pos = 0
    while True:
        off = data.find(marker, pos)
        if off == -1 or off + header_len > total:
            return None
        # Whatever happens with this candidate, the next search resumes
        # right after its marker.
        pos = off + len(marker)

        size = int.from_bytes(data[off + 8:off + 16], "big")
        chunk_end = off + header_len + size
        if chunk_end > total:
            continue

        # Search in place (no slice copy); the match must lie inside the chunk.
        db_start = data.find(sqlite_header, off, chunk_end)
        if db_start == -1:
            continue
        db_end = db_start + size
        if db_end > total:
            continue

        blob = _read_largest_canvas_preview(data[db_start:db_end])
        if blob:
            return blob
|
|
|
|
|
|
def _best_image(data: bytes) -> Optional[Tuple[int, int]]:
    """Pick the byte span of the most plausible embedded image in *data*.

    Among the JPEG candidates that pass the structural check, the longest
    one wins (the earliest on ties).  If there is no valid JPEG, the result
    of the PNG search is returned instead, which may be ``None``.
    """
    valid_spans = (
        span
        for span in _iter_jpegs(data)
        if _is_valid_jpeg(data[span[0]:span[1]])
    )
    largest = max(valid_spans, key=lambda span: span[1] - span[0], default=None)
    if largest is not None:
        return largest
    return _find_png(data)
|
|
|
|
|
|
def _parse_args(argv):
    """Split *argv* into ``(size, input_path, output_path)``.

    Two accepted shapes, counting the program name in ``argv[0]``:
    ``[prog, input, output]`` gives a size of ``None``, and
    ``[prog, size, input, output]`` passes the size through as a string.
    Any other length yields ``(None, None, None)``.
    """
    count = len(argv)
    if count == 4:
        size, source, target = argv[1:]
        return size, source, target
    if count == 3:
        source, target = argv[1:]
        return None, source, target
    return None, None, None
|
|
|
|
|
|
def _write_preview(preview: bytes, out_path: str, size: Optional[int]) -> bool:
    """Write *preview* to *out_path* as a PNG, optionally shrunk to *size*.

    With Pillow available, the image is decoded, reduced with ``thumbnail``
    to fit a ``size`` x ``size`` box when *size* is truthy, and saved as PNG.
    Without Pillow, the bytes are written unchanged, but only when they
    already start with the PNG signature and no resize was requested.

    Returns:
        True when a file was written; False when Pillow is missing and the
        data could not be passed through as-is.

    Raises:
        OSError: from opening/writing *out_path*, or from Pillow when the
            data cannot be decoded or saved.
    """
    try:
        from PIL import Image
    except Exception:
        # Any failure to import is treated the same as Pillow being absent.
        Image = None

    if Image is None:
        if size is None and preview.startswith(b"\x89PNG\r\n\x1a\n"):
            with open(out_path, "wb") as f:
                f.write(preview)
            return True
        return False

    with Image.open(io.BytesIO(preview)) as im:
        im.load()
        if size:
            # The Image.Resampling enum only exists in Pillow >= 9.1; older
            # releases expose LANCZOS directly on the Image module.
            resampling = getattr(Image, "Resampling", Image)
            im.thumbnail((size, size), resampling.LANCZOS)
        im.save(out_path, format="PNG")
    return True
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point; returns the process exit status.

    Invocation forms (taken from ``sys.argv``):

    * ``[size] INPUT OUTPUT`` - extract one preview, optionally thumbnailed.
      The canvas preview from the embedded database is preferred; otherwise
      the best embedded JPEG/PNG is used.
    * ``--batch -o OUTPUT_DIR INPUT...`` - extract the canvas preview of each
      input into ``OUTPUT_DIR/<input stem>.png`` and keep going on failures.

    Exit status: 0 on success, 1 when reading, extracting or writing failed
    (for any input in batch mode), 2 for usage errors.
    """
    argv = sys.argv
    warn = sys.stderr.write

    def load(path):
        """Return the bytes of *path*, or None after reporting a read error."""
        try:
            with open(path, "rb") as handle:
                return handle.read()
        except OSError as exc:
            warn(f"failed to read {path}: {exc}\n")
            return None

    def store(blob, dest, size, unsupported_msg):
        """Write *blob* to *dest*; report any failure and return success."""
        try:
            if _write_preview(blob, dest, size):
                return True
            warn(unsupported_msg)
        except OSError as exc:
            warn(f"failed to write {dest}: {exc}\n")
        return False

    if len(argv) >= 2 and argv[1] == "--batch":
        out_dir = None
        inputs = []
        remaining = iter(argv[2:])
        for arg in remaining:
            if arg not in ("-o", "--output"):
                inputs.append(arg)
                continue
            # The option consumes the following argument as its value.
            out_dir = next(remaining, None)
            if out_dir is None:
                warn("missing value for -o/--output\n")
                return 2

        if not out_dir:
            warn("batch mode requires -o/--output directory\n")
            return 2
        if not inputs:
            warn("batch mode requires at least one input file\n")
            return 2
        if not os.path.isdir(out_dir):
            warn(f"output path is not a directory: {out_dir}\n")
            return 2

        status = 0
        for source in inputs:
            stem = os.path.splitext(os.path.basename(source))[0]
            target = os.path.join(out_dir, stem + ".png")

            data = load(source)
            if data is None:
                status = 1
                continue

            preview = _extract_canvas_preview(data)
            if preview is None:
                warn(f"no embedded preview found: {source}\n")
                status = 1
                continue

            saved = store(
                preview,
                target,
                None,
                f"failed to write preview for {source} (missing Pillow or unsupported format)\n",
            )
            if not saved:
                status = 1
        return status

    size_str, in_path, out_path = _parse_args(argv)
    if not in_path or not out_path:
        warn(
            "usage: clip-thumbnailer [size] INPUT OUTPUT | clip-extract-preview INPUT OUTPUT | "
            "clip-extract-preview --batch -o OUTPUT_DIR INPUT...\n"
        )
        return 2

    # A size that is not an integer is ignored rather than treated as an error.
    size = None
    if size_str:
        try:
            size = max(1, int(size_str))
        except ValueError:
            size = None

    data = load(in_path)
    if data is None:
        return 1

    preview = _extract_canvas_preview(data)
    if preview is None:
        # No canvas preview in the database chunk: fall back to raw image scan.
        span = _best_image(data)
        if span is None:
            warn("no embedded preview found\n")
            return 1
        begin, finish = span
        preview = data[begin:finish]

    saved = store(
        preview,
        out_path,
        size,
        "failed to write preview (missing Pillow or unsupported format)\n",
    )
    return 0 if saved else 1
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|