Used Gemini Pro to write a Python script for Plex Media Server (PMS) on Linux.
Features:
- Interactive Menu
- Skip media that already has local assets, or refresh them (using a file-size check)
- Process a Library or individual Movie/Show
- Dry Run Mode.
- Logging option
- Plex DVR: notifies you to run the script with sudo if a permission error occurs due to plex:plex ownership
#!/usr/bin/env python3
"""
Downloads the active posters, backgrounds, and logos from Plex and
saves them as local assets in each media item's directory.
Skips downloading if the local asset already exists (unless refresh mode is enabled).
Provides workflows for full library processing or individual items.
Supports a dry-run mode to simulate downloads without writing files.
"""
import os
import sys
import requests
import stat
import shlex
import logging
from urllib.parse import urljoin
from plexapi.server import PlexServer
# --------- Configuration ---------
# Base URL of the Plex server; relative asset paths are resolved against it.
PLEX_URL = 'YOUR_PLEX_URL' # e.g. http://localhost:32400
# Plex token, sent as the X-Plex-Token header on every asset request.
PLEX_TOKEN = 'YOUR_PLEX_TOKEN'
# File that receives log output when file logging is enabled at the prompt.
LOG_FILE = 'asset_downloader.log'
# ---------------------------------
def download_asset(url, folder, filename_base, session, dry_run=False, refresh_mode=False):
    """
    Download one artwork asset into ``folder`` as ``<filename_base><ext>``.

    Args:
        url: Asset URL. A value starting with '/' is resolved against
            PLEX_URL; anything else is requested as-is. Falsy values are
            ignored.
        folder: Destination directory.
        filename_base: File name without extension, e.g. 'poster'.
        session: requests.Session-like object providing head() and get().
        dry_run: When True, log what would be downloaded and write nothing.
        refresh_mode: When False, an existing local asset is always skipped.
            When True, an existing asset is re-downloaded unless the remote
            Content-Length equals the local file size.

    The extension is chosen from the response Content-Type
    (.webp, .png, otherwise .jpg). Data is streamed to a temporary '.part'
    file and moved into place only once the download has completed, so an
    interrupted transfer never leaves a truncated asset behind and never
    destroys the asset that was already there.

    Raises:
        SystemExit: (code 1) if a PermissionError occurs while writing.
        All other errors are logged and swallowed.
    """
    if not url:
        return

    # Prepend Plex URL safely if path is relative
    request_url = urljoin(PLEX_URL, url) if url.startswith('/') else url

    # Check if the local asset already exists in the destination folder
    existing_file = None
    for ext in ('.jpg', '.png', '.jpeg', '.webp'):
        candidate = os.path.join(folder, f"{filename_base}{ext}")
        if os.path.exists(candidate):
            existing_file = candidate
            break

    if existing_file:
        if not refresh_mode:
            logging.info(f"Skipping {filename_base} in {folder} (Already exists)")
            return
        try:
            # Use HEAD request to get metadata without downloading the whole file
            head_resp = session.head(request_url, timeout=10)
            remote_size = int(head_resp.headers.get('Content-Length', 0))
            local_size = os.path.getsize(existing_file)
            # Compare sizes; ETags in Plex do not match file MD5s
            if remote_size and local_size == remote_size:
                logging.info(f"Skipping {filename_base} in {folder} (Size Matches)")
                return
        except Exception as e:
            # A failed metadata check is not fatal: fall through and re-download.
            logging.error(f"Metadata check failed for {filename_base}: {e}")

    if dry_run:
        logging.info(f"[DRY-RUN] Would download {filename_base} to {folder}")
        return

    try:
        # The context manager closes the streamed response so its connection
        # is released even when the body is not fully consumed.
        with session.get(request_url, stream=True, timeout=15) as response:
            response.raise_for_status()
            content_type = response.headers.get('content-type', '').lower()
            if 'webp' in content_type:
                new_ext = '.webp'
            elif 'png' in content_type:
                new_ext = '.png'
            else:
                new_ext = '.jpg'
            filepath = os.path.join(folder, f"{filename_base}{new_ext}")
            temp_path = f"{filepath}.part"
            try:
                with open(temp_path, 'wb') as f:
                    for chunk in response.iter_content(8192):
                        if chunk:  # Check for empty keep-alive chunks
                            f.write(chunk)
                # Atomic swap: the final name only ever holds a complete file.
                os.replace(temp_path, filepath)
            finally:
                # Left over only when the download or the swap failed.
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)
                    except OSError:
                        pass

        # Only drop the old-extension file once the new asset is safely in place.
        if refresh_mode and existing_file and existing_file != filepath:
            logging.info(f"Extension change detected. Removing old asset: {existing_file}")
            os.remove(existing_file)

        # Set permissions to 0644 (rw-r--r--)
        os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
        logging.info(f"Downloaded {filename_base}{new_ext} to {folder}")
    except PermissionError:
        logging.error(f"Permission Denied while writing to: {folder}")
        # Check if running as root
        if os.getuid() == 0:
            print("Script is already running as root. This error may indicate a Read-Only filesystem or mount restriction.")
        else:
            print(f"Permission Denied.\nTo resolve, run the script using: sudo {sys.argv[0]}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Error downloading {filename_base}: {e}")
def _asset_url(obj, kind):
    """Return obj's '<kind>Url' attribute, falling back to '<kind>', then None."""
    return getattr(obj, f'{kind}Url', getattr(obj, kind, None))


def process_media(plex, library_name, session, target_title=None, is_tv=False, dry_run=False, refresh_mode=False, collected_paths=None):
    """
    Download local artwork for one library section, or one title within it.

    With target_title unset every item of the section is handled; otherwise
    only items whose title equals target_title (case-insensitively).
    For each item location, poster/background/logo are fetched via
    download_asset; when is_tv is set, season poster/background follow.
    If collected_paths is given it is updated with the folders touched
    (single title) or with the section's locations (whole library).

    Returns False when the section or the requested title cannot be
    retrieved, True otherwise.
    """
    try:
        section = plex.library.section(library_name)
    except Exception as e:
        logging.error(f"Error accessing library '{library_name}': {e}")
        return False

    # Either the whole library or the exact-title matches
    try:
        if not target_title:
            logging.info(f"Retrieving all items from library: {library_name}...")
            items = section.all()
        else:
            candidates = section.search(title=target_title)
            wanted = target_title.lower()
            items = [entry for entry in candidates if entry.title.lower() == wanted]
            if not items:
                logging.warning(f"No exact match found for title: '{target_title}'")
                return False
    except Exception as e:
        logging.error(f"Error retrieving items from '{library_name}': {e}")
        return False

    touched = set()
    for item in items:
        locations = getattr(item, 'locations', [])
        if not locations:
            continue

        for location in locations:
            # A file location stores assets beside it, prefixed with the file stem
            if os.path.isfile(location):
                stem = os.path.splitext(os.path.basename(location))[0]
                media_folder = os.path.dirname(location)
                asset_prefix = f"{stem}-"
            else:
                media_folder = location
                asset_prefix = ""
            if not os.path.isdir(media_folder):
                continue

            touched.add(media_folder)
            logging.info(f"Processing: {item.title} at {media_folder}")

            thumb_url = _asset_url(item, 'thumb')
            art_url = _asset_url(item, 'art')
            logo_path = _asset_url(item, 'logo')
            download_asset(thumb_url, media_folder, f'{asset_prefix}poster', session, dry_run, refresh_mode)
            download_asset(art_url, media_folder, f'{asset_prefix}background', session, dry_run, refresh_mode)
            if logo_path:
                download_asset(logo_path, media_folder, f'{asset_prefix}logo', session, dry_run, refresh_mode)

        # Seasons are handled once per show, not once per show location
        if not is_tv:
            continue
        for season in item.seasons():
            season_index = getattr(season, 'index', None)
            if season_index is None:
                continue
            season_num = str(season_index).zfill(2)
            season_dir_name = f"Season {season_num}" if season_index != 0 else "Specials"
            season_thumb = _asset_url(season, 'thumb')
            season_art = _asset_url(season, 'art')

            # Prefer locations reported on the season; otherwise derive the
            # conventional season folder under the show's first location
            season_locations = getattr(season, 'locations', [])
            if not season_locations:
                season_locations = [os.path.join(locations[0], season_dir_name)]

            for season_folder in season_locations:
                if os.path.isdir(season_folder):
                    target_dir = season_folder
                    poster_name = 'poster'
                    art_name = 'background'
                else:
                    # No season folder: store season-tagged assets in its parent
                    target_dir = os.path.dirname(season_folder)
                    if not os.path.isdir(target_dir):
                        continue
                    poster_name = f'Season{season_num}-poster'
                    art_name = f'Season{season_num}-background'
                download_asset(season_thumb, target_dir, poster_name, session, dry_run, refresh_mode)
                download_asset(season_art, target_dir, art_name, session, dry_run, refresh_mode)

    # Remember locations for the final ownership reminder
    if collected_paths is not None:
        collected_paths.update(touched if target_title else section.locations)
    return True
def main():
    """
    Interactive entry point.

    Verifies the Plex connection, asks for dry-run, download mode and logging
    preferences, then loops over a menu that processes either a full library
    or a single title. Each processing run uses its own requests session,
    closed by a context manager. When run as root (and not in dry-run mode),
    prints chown commands for the locations that were processed.

    Exits with status 1 on missing configuration or a failed connection, and
    with status 0 when a yes/no prompt receives an unrecognised answer.
    """
    if not PLEX_URL or not PLEX_TOKEN:
        print("[ERROR] PLEX_URL and PLEX_TOKEN variables cannot be empty.")
        sys.exit(1)

    print("Verifying Plex server connection...")
    try:
        plex = PlexServer(PLEX_URL, PLEX_TOKEN)
        print("Connection successful.")
    except Exception as e:
        print(f"[ERROR] Failed to connect to Plex API. Details: {e}")
        sys.exit(1)

    dry_run_input = input("Enable dry-run mode? (yes/no): ").strip().lower()
    if dry_run_input in ("y", "yes"):
        dry_run = True
    elif dry_run_input in ("n", "no"):
        dry_run = False
    else:
        print(f"Exiting script. Received: {dry_run_input}")
        sys.exit(0)

    print("\n--- Asset Download Mode ---")
    print("1. Skip Existing Local Assets")
    print("2. Refresh Local Assets")
    refresh_input = input("\nSelect an option (1-2): ").strip()
    # Anything other than '2' selects the default skip-existing mode.
    refresh_mode = refresh_input == '2'

    print("\n--- Logging ---")
    log_to_file_input = input("Enable logging? (y/n): ").strip().lower()
    if log_to_file_input in ("y", "yes"):
        handlers = [logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)]
    elif log_to_file_input in ("n", "no"):
        handlers = [logging.StreamHandler(sys.stdout)]
    else:
        print(f"Exiting script. Received: '{log_to_file_input}'")
        sys.exit(0)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=handlers,
    )

    collected_paths = set()
    while True:
        print("\n--- Plex Local Asset Downloader ---")
        print("1. Process Full Library")
        print("2. Process Individual Item")
        print("3. Quit")
        choice = input("\nSelect an option (1-3): ").strip()

        if choice == '3':
            logging.info("User requested quit. Exiting.")
            break
        if choice not in ('1', '2'):
            print("Invalid selection.")
            continue

        # Filter out the Photos library by checking the section type
        try:
            sections = [s for s in plex.library.sections() if s.type != 'photo']
        except Exception as e:
            # A transient server error should not kill the interactive menu.
            logging.error(f"Error retrieving libraries: {e}")
            continue

        print("\nAvailable Libraries:")
        for i, lib in enumerate(sections, 1):
            print(f"{i}. {lib.title} ({lib.type})")

        # Only the number parse is guarded, so a ValueError raised while
        # processing media is never misreported as bad menu input.
        try:
            l_choice = int(input("\nSelect a library by number: ")) - 1
        except ValueError:
            print("Invalid input. Please enter a number.")
            continue
        if l_choice < 0 or l_choice >= len(sections):
            print("Invalid library selection.")
            continue

        selected_lib = sections[l_choice]
        is_tv = (selected_lib.type == 'show')

        with requests.Session() as session:
            # Ensure token is passed in header for all requests within the session
            session.headers.update({'X-Plex-Token': PLEX_TOKEN})

            if choice == '1':
                process_media(plex, selected_lib.title, session, None, is_tv=is_tv, dry_run=dry_run, refresh_mode=refresh_mode, collected_paths=collected_paths)
            else:
                # Keep asking until a title is processed or the user cancels
                success = False
                while not success:
                    title = input(f"Enter exact title from '{selected_lib.title}' or 'q' to cancel: ").strip()
                    if title.lower() == 'q':
                        break
                    if not title:
                        print("Title cannot be empty. Please try again.")
                        continue
                    success = process_media(plex, selected_lib.title, session, title, is_tv=is_tv, dry_run=dry_run, refresh_mode=refresh_mode, collected_paths=collected_paths)

    # Show ownership reminder if running as root
    if not dry_run and os.getuid() == 0 and collected_paths:
        print("\n--- Ownership Reminder ---")
        print("Since the script was run with sudo, new assets are owned by root.")
        print("Fix ownership for Plex by running:")
        for path in collected_paths:
            safe_path = shlex.quote(path)
            print(f"  sudo find {safe_path} -type f \\( -name '*poster*' -o -name '*background*' -o -name '*logo*' \\) -exec chown plex:plex {{}} \\;")
# Start the interactive menu only when executed as a script, not when imported.
if __name__ == '__main__':
    main()