Save Posters, Backgrounds and Logos as Local Assets

I used Gemini Pro to write a Python script for Plex Media Server (PMS) on Linux.

Features:

  1. Interactive Menu
  2. Skip media with local assets or refresh (with a file size check)
  3. Process a Library or individual Movie/Show
  4. Dry Run Mode.
  5. Logging option
  6. Plex DVR: notifies you to re-run with sudo if a permission error occurs due to plex:plex ownership
#!/usr/bin/env python3

"""
Downloads active posters, backgrounds and logos and
Saves them as local assets in the media's directory.
Skips downloading if the local asset already exists, unless refresh mode is selected.
Provides workflows for full library processing or individual items.
Supports a dry-run mode to simulate downloads without writing files.
"""

import os
import sys
import requests
import stat
import shlex
import logging
from urllib.parse import urljoin
from plexapi.server import PlexServer

# --------- Configuration ---------
# Edit these values before running the script.
# PLEX_URL: base URL of the Plex server; it is passed to PlexServer() and
# server-relative asset paths are joined onto it before downloading.
PLEX_URL = 'YOUR_PLEX_URL'	# e.g. http://localhost:32400
# PLEX_TOKEN: authentication token; passed to PlexServer() and sent as the
# X-Plex-Token header on every asset request.
PLEX_TOKEN = 'YOUR_PLEX_TOKEN'
# LOG_FILE: file the log is written to when file logging is enabled at the
# prompt (a relative path is resolved against the current working directory).
LOG_FILE = 'asset_downloader.log'
# ---------------------------------

def download_asset(url, folder, filename_base, session, dry_run=False, refresh_mode=False):
	"""
	Download one artwork asset into ``folder`` as ``<filename_base><ext>``.

	Args:
		url: Absolute asset URL, or a server-relative path (leading '/') that
			is joined onto PLEX_URL. A falsy value makes the call a no-op.
		folder: Directory the asset is written to.
		filename_base: File name without extension (e.g. 'poster').
		session: requests-style session providing ``head()`` and ``get()``.
		dry_run: When True, only log what would be downloaded.
		refresh_mode: When False, an existing local asset is always kept.
			When True, the existing asset is kept only if its size equals the
			remote Content-Length; otherwise it is downloaded again.

	The extension is chosen from the response Content-Type (.webp, .png,
	otherwise .jpg). Data is streamed to a temporary ``.part`` file and renamed
	into place only after the download completed, so an interrupted transfer
	never leaves a truncated asset behind (which a later run would skip as
	"already exists") and never destroys the asset that was there before.

	Network and other errors are logged and swallowed. A PermissionError
	prints a hint and terminates the script with exit status 1.
	"""
	if not url:
		return

	# Prepend Plex URL safely if path is relative
	request_url = urljoin(PLEX_URL, url) if url.startswith('/') else url

	# Check if the local asset already exists in the destination folder
	existing_file = None
	for ext in ['.jpg', '.png', '.jpeg', '.webp']:
		candidate = os.path.join(folder, f"{filename_base}{ext}")
		if os.path.exists(candidate):
			existing_file = candidate
			break

	if existing_file:
		if not refresh_mode:
			logging.info(f"Skipping {filename_base} in {folder} (Already exists)")
			return
		try:
			# Use HEAD request to get metadata without downloading the whole file
			head_resp = session.head(request_url, timeout=10)
			remote_size = int(head_resp.headers.get('Content-Length', 0))
			local_size = os.path.getsize(existing_file)

			# Compare sizes; ETags in Plex do not match file MD5s
			if remote_size and local_size == remote_size:
				logging.info(f"Skipping {filename_base} in {folder} (Size Matches)")
				return
		except Exception as e:
			# A failed check is not fatal: fall through and download again.
			logging.error(f"Metadata check failed for {filename_base}: {e}")

	if dry_run:
		logging.info(f"[DRY-RUN] Would download {filename_base} to {folder}")
		return

	tmp_path = None  # set once a temporary file may exist on disk
	try:
		# The context manager releases the streamed connection back to the
		# pool even when the transfer is aborted part-way through.
		with session.get(request_url, stream=True, timeout=15) as response:
			response.raise_for_status()

			content_type = response.headers.get('content-type', '').lower()
			if 'webp' in content_type:
				new_ext = '.webp'
			elif 'png' in content_type:
				new_ext = '.png'
			else:
				new_ext = '.jpg'

			filepath = os.path.join(folder, f"{filename_base}{new_ext}")
			tmp_path = f"{filepath}.part"

			with open(tmp_path, 'wb') as f:
				for chunk in response.iter_content(8192):
					if chunk: # Check for empty keep-alive chunks
						f.write(chunk)

		# Set permissions to 0644 (rw-r--r--)
		os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
		# Same-directory rename: the final name only ever holds a complete file.
		os.replace(tmp_path, filepath)
		tmp_path = None

		# Only now that the new asset is safely in place, drop a superseded
		# asset that carries a different extension.
		if refresh_mode and existing_file and existing_file != filepath:
			logging.info(f"Extension change detected. Removing old asset: {existing_file}")
			os.remove(existing_file)

		logging.info(f"Downloaded {filename_base}{new_ext} to {folder}")

	except PermissionError:
		logging.error(f"Permission Denied while writing to: {folder}")

		# Check if running as root
		if os.getuid() == 0:
			print("Script is already running as root. This error may indicate a Read-Only filesystem or mount restriction.")
		else:
			print(f"Permission Denied.\nTo resolve, run the script using: sudo {sys.argv[0]}")
		sys.exit(1)
	except Exception as e:
		logging.error(f"Error downloading {filename_base}: {e}")
	finally:
		# Best-effort cleanup of a partial download (also runs on sys.exit).
		if tmp_path:
			try:
				os.remove(tmp_path)
			except OSError:
				pass

def process_media(plex, library_name, session, target_title=None, is_tv=False, dry_run=False, refresh_mode=False, collected_paths=None):
	"""
	Save artwork for the items of one Plex library section beside their media.

	The section named ``library_name`` is looked up on the given PlexServer
	instance. With ``target_title`` set, only items whose title matches it
	exactly (ignoring case) are handled; otherwise every item of the section is.
	For each item location the poster, background and (when present) logo are
	handed to download_asset(); with ``is_tv`` set, the poster and background of
	every season are handled as well.

	When ``collected_paths`` is supplied it receives the folders relevant for the
	final ownership reminder: the processed folders for a single title, or the
	section's own locations for a full-library run.

	Returns False if the section or its items cannot be retrieved or no exact
	title match exists, True otherwise.
	"""
	def save(asset_url, destination, stem):
		# Every download of this run shares the same session and mode flags.
		download_asset(asset_url, destination, stem, session, dry_run, refresh_mode)

	try:
		section = plex.library.section(library_name)
	except Exception as e:
		logging.error(f"Error accessing library '{library_name}': {e}")
		return False

	# Either narrow down to one exactly-matching title or take the whole section
	try:
		if not target_title:
			logging.info(f"Retrieving all items from library: {library_name}...")
			items = section.all()
		else:
			candidates = section.search(title=target_title)
			items = [entry for entry in candidates if entry.title.lower() == target_title.lower()]
			if not items:
				logging.warning(f"No exact match found for title: '{target_title}'")
				return False
	except Exception as e:
		logging.error(f"Error retrieving items from '{library_name}': {e}")
		return False

	touched_dirs = set()

	for item in items:
		locations = getattr(item, 'locations', [])
		if not locations:
			continue

		for location in locations:
			if os.path.isfile(location):
				# A file location: assets go next to it, prefixed with its base name
				stem = os.path.splitext(os.path.basename(location))[0]
				target_dir = os.path.dirname(location)
				prefix = f"{stem}-"
			else:
				target_dir = location
				prefix = ""

			if not os.path.isdir(target_dir):
				continue

			touched_dirs.add(target_dir)
			logging.info(f"Processing: {item.title} at {target_dir}")

			poster_url = getattr(item, 'thumbUrl', getattr(item, 'thumb', None))
			backdrop_url = getattr(item, 'artUrl', getattr(item, 'art', None))
			logo_url = getattr(item, 'logoUrl', getattr(item, 'logo', None))

			save(poster_url, target_dir, f'{prefix}poster')
			save(backdrop_url, target_dir, f'{prefix}background')
			if logo_url:
				save(logo_url, target_dir, f'{prefix}logo')

		# Seasons are handled once per item, outside the location loop, so a show
		# spread over several paths does not get its seasons downloaded repeatedly
		if not is_tv:
			continue

		for season in item.seasons():
			number = getattr(season, 'index', None)
			if number is None:
				continue

			padded = str(number).zfill(2)
			folder_name = f"Season {padded}" if number != 0 else "Specials"

			season_poster = getattr(season, 'thumbUrl', getattr(season, 'thumb', None))
			season_backdrop = getattr(season, 'artUrl', getattr(season, 'art', None))

			# Prefer the season's own locations; otherwise derive the folder
			# from the item's first location
			season_dirs = getattr(season, 'locations', [])
			if not season_dirs and locations:
				season_dirs = [os.path.join(locations[0], folder_name)]

			for season_dir in season_dirs:
				if os.path.isdir(season_dir):
					save(season_poster, season_dir, 'poster')
					save(season_backdrop, season_dir, 'background')
					continue

				# No season folder on disk: store season-numbered assets in its parent
				show_dir = os.path.dirname(season_dir)
				if os.path.isdir(show_dir):
					save(season_poster, show_dir, f'Season{padded}-poster')
					save(season_backdrop, show_dir, f'Season{padded}-background')

	# Remember locations for the final ownership reminder
	if collected_paths is not None:
		collected_paths.update(touched_dirs if target_title else section.locations)

	return True

def main():
	"""
	Interactive entry point: connect to Plex, ask for run options, then loop
	over a menu that processes a full library or a single title.

	Prompts, in order, for dry-run mode, skip/refresh mode and file logging;
	an unrecognised answer to the dry-run or logging prompt ends the script.
	Each processing run uses its own requests session (closed by a context
	manager) that carries the Plex token header. When run as root without
	dry-run, a reminder with chown commands for the processed paths is printed
	after the user quits.

	Exits with status 1 if the configuration is empty or the server cannot be
	reached.
	"""
	if not PLEX_URL or not PLEX_TOKEN:
		print("[ERROR] PLEX_URL and PLEX_TOKEN variables cannot be empty.")
		sys.exit(1)

	print("Verifying Plex server connection...")
	try:
		plex = PlexServer(PLEX_URL, PLEX_TOKEN)
		print("Connection successful.")
	except Exception as e:
		print(f"[ERROR] Failed to connect to Plex API. Details: {e}")
		sys.exit(1)

	dry_run_input = input("Enable dry-run mode? (yes/no): ").strip().lower()
	if dry_run_input in ("y", "yes"):
		dry_run = True
	elif dry_run_input in ("n", "no"):
		dry_run = False
	else:
		print(f"Exiting script. Received: {dry_run_input}")
		sys.exit(0)

	print("\n--- Asset Download Mode ---")
	print("1. Skip Existing Local Assets")
	print("2. Refresh Local Assets")
	refresh_input = input("\nSelect an option (1-2): ").strip()
	# Anything other than '2' falls back to the non-destructive skip mode.
	refresh_mode = refresh_input == '2'

	print("\n--- Logging ---")
	log_to_file_input = input("Enable logging? (y/n): ").strip().lower()

	if log_to_file_input in ("y", "yes"):
		logging.basicConfig(
			level=logging.INFO,
			format='%(asctime)s [%(levelname)s] %(message)s',
			handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)]
		)
	elif log_to_file_input in ("n", "no"):
		logging.basicConfig(
			level=logging.INFO,
			format='%(asctime)s [%(levelname)s] %(message)s',
			handlers=[logging.StreamHandler(sys.stdout)]
		)
	else:
		print(f"Exiting script. Received: '{log_to_file_input}'")
		sys.exit(0)

	collected_paths = set()

	while True:
		print("\n--- Plex Local Asset Downloader ---")
		print("1. Process Full Library")
		print("2. Process Individual Item")
		print("3. Quit")

		choice = input("\nSelect an option (1-3): ").strip()

		if choice == '3':
			logging.info("User requested quit. Exiting.")
			break

		if choice not in ('1', '2'):
			print("Invalid selection.")
			continue

		# Filter out the Photos library by checking the section type
		try:
			sections = [s for s in plex.library.sections() if s.type != 'photo']
		except Exception as e:
			# Keep the menu alive if the server is temporarily unreachable.
			logging.error(f"Error retrieving libraries: {e}")
			continue

		print("\nAvailable Libraries:")
		for i, lib in enumerate(sections, 1):
			print(f"{i}. {lib.title} ({lib.type})")

		# Only the number parsing is guarded, so a ValueError raised while
		# processing media is never misreported as bad menu input.
		try:
			l_choice = int(input("\nSelect a library by number: ")) - 1
		except ValueError:
			print("Invalid input. Please enter a number.")
			continue

		if l_choice < 0 or l_choice >= len(sections):
			print("Invalid library selection.")
			continue

		selected_lib = sections[l_choice]
		is_tv = (selected_lib.type == 'show')

		with requests.Session() as session:
			# Ensure token is passed in header for all requests within the session
			session.headers.update({'X-Plex-Token': PLEX_TOKEN})

			if choice == '1':
				process_media(plex, selected_lib.title, session, None, is_tv=is_tv, dry_run=dry_run, refresh_mode=refresh_mode, collected_paths=collected_paths)
			else:
				# Keep asking until a title was processed or the user cancels.
				success = False
				while not success:
					title = input(f"Enter exact title from '{selected_lib.title}' or 'q' to cancel: ").strip()
					if title.lower() == 'q':
						break
					if not title:
						print("Title cannot be empty. Please try again.")
						continue
					success = process_media(plex, selected_lib.title, session, title, is_tv=is_tv, dry_run=dry_run, refresh_mode=refresh_mode, collected_paths=collected_paths)

	# Show ownership reminder if running as root
	if not dry_run and os.getuid() == 0 and collected_paths:
		print("\n--- Ownership Reminder ---")
		print("Since the script was run with sudo, new assets are owned by root.")
		print("Fix ownership for Plex by running:")
		for path in collected_paths:
			# Quote the path so the printed command is safe to paste into a shell.
			safe_path = shlex.quote(path)
			print(f"   sudo find {safe_path} -type f \\( -name '*poster*' -o -name '*background*' -o -name '*logo*' \\) -exec chown plex:plex {{}} \\;")

# Start the interactive menu only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
	main()

This script has been around a while, and I believe it accomplishes what you’re looking to do. It looks like it’s missing the clearLogo piece, though.

I reviewed your script and it looks OK.

I updated the script to include logos and square art, as well as an optional overwrite flag.