1
0
Files
emby-tv-logo-tools/update_channel_logos.py
Seth Van Niekerk 312c54c5bd
All checks were successful
Build & Push Docker Image / build-and-publish (push) Successful in 14s
containerize & enhance filtering
2026-02-08 12:21:20 -05:00

408 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Emby Live TV Channel Logo Updater
Copies 'Logo (Dark Version)' to 'Logo (Light Version)' and 'Logo (Light with Colour)'.
"""
import argparse
import base64
import json
import sys
from typing import Dict, List, Optional
import urllib.request
import urllib.error
import urllib.parse
class EmbyClient:
    """Small client for the Emby Server HTTP API, built on stdlib ``urllib``.

    Every request carries the API key in the ``X-Emby-Token`` header. GET
    requests (and image downloads) additionally pass it as an ``api_key``
    query parameter, percent-encoded so unusual characters cannot corrupt
    the URL.
    """

    # Leading magic bytes mapped to the Content-Type sent on image upload.
    _IMAGE_SIGNATURES = (
        (b'\x89PNG', 'image/png'),
        (b'\xff\xd8', 'image/jpeg'),
        (b'GIF', 'image/gif'),
    )

    def __init__(self, server_url: str, api_key: str):
        """Store connection settings.

        Args:
            server_url: Base URL of the server; trailing slashes are removed.
            api_key: API key used to authenticate every request.
        """
        self.server_url = server_url.rstrip('/')
        self.api_key = api_key

    @staticmethod
    def _error_details(error: urllib.error.HTTPError) -> str:
        """Return the body of an HTTP error response as text."""
        if not error.fp:
            return 'No error details'
        # errors='replace' keeps a non-UTF-8 error page from masking the
        # real HTTP failure with a UnicodeDecodeError.
        return error.read().decode('utf-8', errors='replace')

    @staticmethod
    def _detect_content_type(image_data: bytes) -> str:
        """Guess an image MIME type from the leading magic bytes.

        Returns 'application/octet-stream' when no known signature matches.
        """
        for signature, content_type in EmbyClient._IMAGE_SIGNATURES:
            if image_data.startswith(signature):
                return content_type
        return 'application/octet-stream'

    def _quoted_api_key(self) -> str:
        """Return the API key percent-encoded for use in a query string."""
        return urllib.parse.quote(self.api_key, safe='')

    def _image_url(self, item_id: str, image_type: str) -> str:
        """Build the URL of an item's image endpoint."""
        quoted_id = urllib.parse.quote(str(item_id), safe='')
        # '/' stays unescaped in the image type so a value with a path
        # segment (if a caller ever passes one) is forwarded unchanged.
        quoted_type = urllib.parse.quote(str(image_type))
        return f"{self.server_url}/Items/{quoted_id}/Images/{quoted_type}"

    def _make_request(self, endpoint: str, method: str = 'GET', data: Optional[Dict] = None) -> Dict:
        """Send a JSON request and return the decoded JSON response.

        Args:
            endpoint: Path (optionally with a query string) appended to the
                server URL.
            method: HTTP method.
            data: Optional payload, sent as a JSON body.

        Returns:
            The parsed JSON response, or ``{}`` for a 204 or empty response.

        Raises:
            Exception: On an HTTP error status; the message holds the status
                code and response body, and the original ``HTTPError`` is
                chained as ``__cause__``.
        """
        url = f"{self.server_url}{endpoint}"
        if method == 'GET':
            separator = '&' if '?' in url else '?'
            url = f"{url}{separator}api_key={self._quoted_api_key()}"
        request = urllib.request.Request(url, method=method)
        request.add_header('X-Emby-Token', self.api_key)
        if data is not None:
            request.add_header('Content-Type', 'application/json')
            request.data = json.dumps(data).encode('utf-8')
        try:
            with urllib.request.urlopen(request) as response:
                status = response.status
                body = response.read()
        except urllib.error.HTTPError as e:
            raise Exception(f"HTTP {e.code}: {self._error_details(e)}") from e
        # A successful response without a body would make json.loads fail,
        # so treat it like 204 No Content.
        if status == 204 or not body.strip():
            return {}
        return json.loads(body.decode('utf-8'))

    def get_live_tv_channels(self) -> List[Dict]:
        """Return the Live TV channels ('Items' of the response, or [])."""
        # Request channels with Fields parameter to ensure tags are included
        response = self._make_request('/LiveTv/Channels?Type=TV&EnableImages=true&Fields=Tags,TagItems')
        return response.get('Items', [])

    def get_channel_by_id(self, channel_id: str) -> Dict:
        """Return the full record of a single Live TV channel."""
        quoted_id = urllib.parse.quote(str(channel_id), safe='')
        return self._make_request(f'/LiveTv/Channels/{quoted_id}')

    def download_image(self, item_id: str, image_type: str) -> bytes:
        """Download one image of an item and return its raw bytes.

        HTTP errors propagate as ``urllib.error.HTTPError``.
        """
        url = f"{self._image_url(item_id, image_type)}?api_key={self._quoted_api_key()}"
        request = urllib.request.Request(url)
        # Sent in addition to the query parameter, matching the other methods.
        request.add_header('X-Emby-Token', self.api_key)
        with urllib.request.urlopen(request) as response:
            return response.read()

    def upload_image(self, item_id: str, image_type: str, image_data: bytes) -> None:
        """Upload ``image_data`` as the given image type of an item.

        The request body is the base64 encoding of the image, while the
        Content-Type header names the detected image format.

        Raises:
            Exception: On an HTTP error status (original error chained).
        """
        request = urllib.request.Request(self._image_url(item_id, image_type), method='POST')
        request.add_header('X-Emby-Token', self.api_key)
        request.add_header('Content-Type', self._detect_content_type(image_data))
        request.data = base64.b64encode(image_data)
        try:
            with urllib.request.urlopen(request):
                pass
        except urllib.error.HTTPError as e:
            raise Exception(
                f"Failed to upload {image_type}: HTTP {e.code} - {self._error_details(e)}"
            ) from e

    def delete_image(self, item_id: str, image_type: str) -> None:
        """Delete the given image type from an item.

        Raises:
            Exception: On an HTTP error status (original error chained).
        """
        request = urllib.request.Request(self._image_url(item_id, image_type), method='DELETE')
        request.add_header('X-Emby-Token', self.api_key)
        try:
            with urllib.request.urlopen(request):
                pass
        except urllib.error.HTTPError as e:
            raise Exception(
                f"Failed to delete {image_type}: HTTP {e.code} - {self._error_details(e)}"
            ) from e
def filter_channels_by_tags(channels: List[Dict], tags: Optional[List[str]]) -> List[Dict]:
    """Keep only the channels that carry at least one of the requested tags.

    A channel's tags come from its 'Tags' entry, falling back to 'TagItems'
    when 'Tags' is missing or empty. The first element decides how the list
    is read: dicts are tag items whose 'Name' is the tag, anything else is
    taken as the tag values themselves. When ``tags`` is None or empty the
    input list is returned unchanged.
    """
    if not tags:
        return channels

    def names_of(entry: Dict) -> List[str]:
        # Collect the tag names of one channel in whichever format it uses.
        raw = entry.get('Tags', []) or entry.get('TagItems', [])
        if not (isinstance(raw, list) and raw):
            return []
        if isinstance(raw[0], dict):
            # TagItems format: dicts with a 'Name' field
            return [item.get('Name', '') for item in raw if item.get('Name')]
        # Tags format: plain list of strings
        return raw

    def is_wanted(entry: Dict) -> bool:
        names = names_of(entry)
        return any(requested in names for requested in tags)

    return [entry for entry in channels if is_wanted(entry)]
def get_all_unique_tags(channels: List[Dict]) -> List[str]:
    """Return the sorted, de-duplicated tags found across all channels.

    Each channel contributes its 'Tags' entry, or 'TagItems' when 'Tags' is
    missing or empty. Values that are not lists are ignored.
    """
    seen = set()
    for entry in channels:
        raw = entry.get('Tags', []) or entry.get('TagItems', [])
        if not isinstance(raw, list):
            continue
        if raw and isinstance(raw[0], dict):
            # TagItems format: take the non-empty 'Name' of every item
            for item in raw:
                if item.get('Name'):
                    seen.add(item.get('Name', ''))
        else:
            # Tags format: the list already holds the tag strings
            seen |= set(raw)
    return sorted(seen)
def copy_primary_to_light_logos(client: EmbyClient, channel_id: str, channel_name: str, dry_run: bool = True, force: bool = False) -> bool:
    """Copy a channel's Primary image into its LogoLight / LogoLightColor slots.

    Args:
        client: Emby client used for the lookups and transfers.
        channel_id: Id of the channel to process.
        channel_name: Accepted for the caller's convenience; not used here.
        dry_run: When true, only report what would be uploaded.
        force: When true, re-upload slots that already hold an image.

    Returns:
        False when the channel has no Primary image or any step raised;
        True otherwise (including "nothing to do" and dry runs).
    """
    try:
        details = client.get_channel_by_id(channel_id)
        present = details.get('ImageTags', {})

        if not present.get('Primary'):
            print(" ⚠️ No Primary image, skipping")
            return False

        # Insertion order doubles as the upload order.
        slot_filled = {
            'LogoLight': 'LogoLight' in present,
            'LogoLightColor': 'LogoLightColor' in present,
        }
        if all(slot_filled.values()) and not force:
            print(" ✓ Already has both logos")
            return True

        targets = [slot for slot, filled in slot_filled.items() if force or not filled]

        if dry_run:
            verb = "overwrite" if force and any(slot_filled.values()) else "upload"
            print(f" → Would {verb}: {', '.join(targets)}")
            return True

        print(" 📥 Downloading Primary...")
        payload = client.download_image(channel_id, 'Primary')
        for slot in targets:
            verb = "Overwriting" if slot_filled[slot] else "Uploading"
            print(f" 📤 {verb} {slot}...")
            client.upload_image(channel_id, slot, payload)
        print(" ✓ Successfully uploaded")
        return True
    except Exception as exc:
        # Report the failure with a traceback; the caller counts it as skipped.
        print(f" ✗ Error: {exc}")
        import traceback
        traceback.print_exc()
        return False
def clear_channel_logos(client: EmbyClient, channel_id: str, channel_name: str, dry_run: bool = True) -> bool:
    """Delete a channel's Primary, LogoLight and LogoLightColor images.

    Args:
        client: Emby client used for the lookup and deletions.
        channel_id: Id of the channel to process.
        channel_name: Accepted for the caller's convenience; not used here.
        dry_run: When true, only report what would be deleted.

    Returns:
        True when there was nothing to delete, on a dry run, or after all
        deletions succeeded; False when any step raised.
    """
    try:
        present = client.get_channel_by_id(channel_id).get('ImageTags', {})
        doomed = []
        for kind in ('Primary', 'LogoLight', 'LogoLightColor'):
            if kind in present:
                doomed.append(kind)

        if len(doomed) == 0:
            print(" No logos to clear")
            return True

        if dry_run:
            print(f" → Would delete: {', '.join(doomed)}")
            return True

        for kind in doomed:
            print(f" 🗑️ Deleting {kind}...")
            client.delete_image(channel_id, kind)
        print(f" ✓ Successfully cleared {len(doomed)} logo(s)")
        return True
    except Exception as exc:
        # Report the failure with a traceback; the caller counts it as skipped.
        print(f" ✗ Error: {exc}")
        import traceback
        traceback.print_exc()
        return False
def update_channel_logos(client: EmbyClient, dry_run: bool = True, first_only: bool = False, force: bool = False, tags: Optional[List[str]] = None) -> None:
    """Copy the Primary logo to the light logo slots for the selected channels.

    Args:
        client: Emby client used to list and modify channels.
        dry_run: When true, only report what would be changed.
        first_only: When true, process just the first selected channel.
        force: When true, overwrite light logos that already exist.
        tags: Optional tag names; when given, only channels carrying at
            least one of them are processed.

    Prints progress and a summary. Returns early, after listing up to 20
    available tags, when a tag filter matches no channel.
    """
    all_channels = client.get_live_tv_channels()
    channels = all_channels
    # Filter by tags if specified
    if tags:
        print(f"Filtering channels by tags: {', '.join(tags)}")
        channels = filter_channels_by_tags(all_channels, tags)
        if not channels:
            print(f"\n⚠️ No channels found with the specified tags")
            # Reuse the list already fetched instead of asking the server again.
            all_tags = get_all_unique_tags(all_channels)
            if all_tags:
                print(f"\nAvailable tags in your channels ({len(all_tags)}):")
                for tag in all_tags[:20]:  # Show first 20 tags
                    print(f" - {tag}")
                if len(all_tags) > 20:
                    print(f" ... and {len(all_tags) - 20} more")
            else:
                print("No tags found in any channels")
            return

    # Slicing (rather than channels[0]) keeps an empty channel list from
    # raising IndexError when first_only is set.
    channels_to_process = channels[:1] if first_only else channels

    print("\n" + "=" * 80)
    if first_only:
        print(f"{'DRY RUN: TESTING ON' if dry_run else 'PROCESSING'} FIRST CHANNEL ONLY")
    else:
        print(f"{'DRY RUN: SIMULATING' if dry_run else 'PROCESSING'} ALL {len(channels)} CHANNELS")
    if force:
        print("FORCE MODE: Will overwrite existing logos")
    print("=" * 80)

    success_count = 0
    skip_count = 0
    for i, channel in enumerate(channels_to_process, 1):
        print(f"\n[{i}/{len(channels_to_process)}] {channel.get('Name', 'Unknown')}")
        if copy_primary_to_light_logos(client, channel['Id'], channel.get('Name'), dry_run, force):
            success_count += 1
        else:
            skip_count += 1

    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total: {len(channels_to_process)}")
    print(f"{'Would update' if dry_run else 'Updated'}: {success_count}")
    print(f"Skipped: {skip_count}")
    if dry_run:
        print("\n💡 Dry run. Use --execute to perform actual updates.")
    if first_only and not dry_run:
        print("\n💡 Processed first channel only. Remove --first-only to process all.")
def clear_logos(client: EmbyClient, dry_run: bool = True, first_only: bool = False, tags: Optional[List[str]] = None) -> None:
    """Delete the Primary and light logos of the selected channels.

    Args:
        client: Emby client used to list and modify channels.
        dry_run: When true, only report what would be deleted.
        first_only: When true, process just the first selected channel.
        tags: Optional tag names; when given, only channels carrying at
            least one of them are processed.

    Prints progress and a summary. Returns early, after listing up to 20
    available tags, when a tag filter matches no channel.
    """
    all_channels = client.get_live_tv_channels()
    channels = all_channels
    # Filter by tags if specified
    if tags:
        print(f"Filtering channels by tags: {', '.join(tags)}")
        channels = filter_channels_by_tags(all_channels, tags)
        if not channels:
            print(f"\n⚠️ No channels found with the specified tags")
            # Reuse the list already fetched instead of asking the server again.
            all_tags = get_all_unique_tags(all_channels)
            if all_tags:
                print(f"\nAvailable tags in your channels ({len(all_tags)}):")
                for tag in all_tags[:20]:  # Show first 20 tags
                    print(f" - {tag}")
                if len(all_tags) > 20:
                    print(f" ... and {len(all_tags) - 20} more")
            else:
                print("No tags found in any channels")
            return

    # Slicing (rather than channels[0]) keeps an empty channel list from
    # raising IndexError when first_only is set.
    channels_to_process = channels[:1] if first_only else channels

    print("\n" + "=" * 80)
    if first_only:
        print(f"{'DRY RUN: CLEARING' if dry_run else 'CLEARING'} FIRST CHANNEL ONLY")
    else:
        print(f"{'DRY RUN: CLEARING' if dry_run else 'CLEARING'} ALL {len(channels)} CHANNELS")
    print("=" * 80)

    success_count = 0
    skip_count = 0
    for i, channel in enumerate(channels_to_process, 1):
        print(f"\n[{i}/{len(channels_to_process)}] {channel.get('Name', 'Unknown')}")
        if clear_channel_logos(client, channel['Id'], channel.get('Name'), dry_run):
            success_count += 1
        else:
            skip_count += 1

    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total: {len(channels_to_process)}")
    print(f"{'Would clear' if dry_run else 'Cleared'}: {success_count}")
    print(f"Skipped: {skip_count}")
    if dry_run:
        print("\n💡 Dry run. Use --execute to perform actual deletions.")
    if first_only and not dry_run:
        print("\n💡 Processed first channel only. Remove --first-only to process all.")
def main():
    """Command-line entry point.

    Parses the arguments, verifies the connection by listing the channels,
    then either lists tags (--list-tags), prints tag debug info
    (--debug-tags), clears logos (--clear) or copies the Primary logo to
    the light slots (default). Changes are only made with --execute and,
    unless --non-interactive is given, after a 'yes' confirmation.

    NOTE: a --tags value that contains no usable tag (e.g. ",") only emits
    a warning and then behaves as if no filter was given, i.e. all channels
    are selected.
    """
    parser = argparse.ArgumentParser(description='Copy Emby Live TV channel dark logos to light versions')
    parser.add_argument('--server', required=True, help='Emby server URL')
    parser.add_argument('--api-key', required=True, help='Emby API key')
    parser.add_argument('--execute', action='store_true', help='Actually perform updates')
    parser.add_argument('--first-only', action='store_true', help='Only process first channel')
    parser.add_argument('--clear', action='store_true', help='Clear all logos (Primary, LogoLight, LogoLightColor)')
    parser.add_argument('--force', action='store_true', help='Overwrite existing logos (only applies to copy mode)')
    parser.add_argument('--non-interactive', action='store_true', help='Skip confirmation prompts (for automated execution)')
    parser.add_argument('--tags', help='Comma-separated list of tags to filter channels (e.g., "sports,news")')
    parser.add_argument('--list-tags', action='store_true', help='List all available tags and exit')
    parser.add_argument('--debug-tags', action='store_true', help='Show tag fields from first channel for debugging')
    args = parser.parse_args()

    # Parse tags if provided
    tags = None
    if args.tags:
        tags = [tag.strip() for tag in args.tags.split(',') if tag.strip()]
        if not tags:
            print("Warning: --tags specified but no valid tags found", file=sys.stderr)
            tags = None

    client = EmbyClient(args.server, args.api_key)
    try:
        print(f"Connecting to {args.server}...")
        channels = client.get_live_tv_channels()
        print(f"✓ Connected! Found {len(channels)} TV channels")
    except Exception as e:
        print(f"✗ Connection failed: {e}", file=sys.stderr)
        sys.exit(1)

    # Handle --list-tags
    if args.list_tags:
        all_tags = get_all_unique_tags(channels)
        if all_tags:
            print(f"\nFound {len(all_tags)} unique tags:")
            for tag in all_tags:
                print(f" - {tag}")
        else:
            print("\nNo tags found in any channels")
        sys.exit(0)

    # Handle --debug-tags
    if args.debug_tags:
        if channels:
            first = channels[0]
            print("\nFirst channel debug info:")
            print(f"Channel name: {first.get('Name', 'Unknown')}")
            print(f"Tags field: {first.get('Tags', 'NOT PRESENT')}")
            print(f"TagItems field: {first.get('TagItems', 'NOT PRESENT')}")
            print("\nAll available fields:")
            # Only fields whose name contains 'tag' are printed.
            for key in sorted(first.keys()):
                if 'tag' in key.lower():
                    print(f" {key}: {first[key]}")
        else:
            print("\nNo channels available for debugging")
        sys.exit(0)

    def confirmed(prompt: str) -> bool:
        """Return True when the run may proceed with real changes."""
        if args.non_interactive:
            return True
        try:
            answer = input(prompt)
        except EOFError:
            # stdin is closed (e.g. a container without a TTY): treat it as
            # "no" instead of crashing with a traceback.
            print()
            return False
        return answer.strip().lower() == 'yes'

    target = "first channel" if args.first_only else "all channels"
    tag_info = f" with tags [{', '.join(tags)}]" if tags else ""

    if args.clear:
        if not args.execute:
            clear_logos(client, dry_run=True, first_only=args.first_only, tags=tags)
        elif confirmed(f"\nCLEAR ALL LOGOS from {target}{tag_info}? (yes/no): "):
            clear_logos(client, dry_run=False, first_only=args.first_only, tags=tags)
        else:
            print("Cancelled.")
    else:
        if not args.execute:
            update_channel_logos(client, dry_run=True, first_only=args.first_only, force=args.force, tags=tags)
        else:
            action = "overwrite logos in" if args.force else "modify"
            if confirmed(f"\n{action.capitalize()} {target}{tag_info}? (yes/no): "):
                update_channel_logos(client, dry_run=False, first_only=args.first_only, force=args.force, tags=tags)
            else:
                print("Cancelled.")


if __name__ == '__main__':
    main()