All checks were successful
Build & Push Docker Image / build-and-publish (push) Successful in 11s
332 lines
13 KiB
Python
332 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Emby Live TV Channel Logo Updater
|
||
|
||
Copies 'Logo (Dark Version)' to 'Logo (Light Version)' and 'Logo (Light with Colour)'.
|
||
"""
|
||
|
||
import argparse
|
||
import base64
|
||
import json
|
||
import sys
|
||
from typing import Dict, List, Optional
|
||
import urllib.request
|
||
import urllib.error
|
||
import urllib.parse
|
||
|
||
|
||
class EmbyClient:
|
||
"""Client for interacting with the Emby Server API."""
|
||
|
||
def __init__(self, server_url: str, api_key: str):
|
||
self.server_url = server_url.rstrip('/')
|
||
self.api_key = api_key
|
||
|
||
def _make_request(self, endpoint: str, method: str = 'GET', data: Optional[Dict] = None) -> Dict:
|
||
url = f"{self.server_url}{endpoint}"
|
||
if method == 'GET':
|
||
separator = '&' if '?' in url else '?'
|
||
url = f"{url}{separator}api_key={self.api_key}"
|
||
|
||
request = urllib.request.Request(url, method=method)
|
||
request.add_header('X-Emby-Token', self.api_key)
|
||
|
||
if data is not None:
|
||
request.add_header('Content-Type', 'application/json')
|
||
request.data = json.dumps(data).encode('utf-8')
|
||
|
||
try:
|
||
with urllib.request.urlopen(request) as response:
|
||
if response.status == 204:
|
||
return {}
|
||
return json.loads(response.read().decode('utf-8'))
|
||
except urllib.error.HTTPError as e:
|
||
error_body = e.read().decode('utf-8') if e.fp else 'No error details'
|
||
raise Exception(f"HTTP {e.code}: {error_body}")
|
||
|
||
def get_live_tv_channels(self) -> List[Dict]:
|
||
response = self._make_request('/LiveTv/Channels?Type=TV&EnableImages=true')
|
||
return response.get('Items', [])
|
||
|
||
def get_channel_by_id(self, channel_id: str) -> Dict:
|
||
return self._make_request(f'/LiveTv/Channels/{channel_id}')
|
||
|
||
def download_image(self, item_id: str, image_type: str) -> bytes:
|
||
url = f"{self.server_url}/Items/{item_id}/Images/{image_type}?api_key={self.api_key}"
|
||
with urllib.request.urlopen(urllib.request.Request(url)) as response:
|
||
return response.read()
|
||
|
||
def upload_image(self, item_id: str, image_type: str, image_data: bytes) -> None:
|
||
if image_data.startswith(b'\x89PNG'):
|
||
content_type = 'image/png'
|
||
elif image_data.startswith(b'\xff\xd8'):
|
||
content_type = 'image/jpeg'
|
||
elif image_data.startswith(b'GIF'):
|
||
content_type = 'image/gif'
|
||
else:
|
||
content_type = 'application/octet-stream'
|
||
|
||
url = f"{self.server_url}/Items/{item_id}/Images/{image_type}"
|
||
request = urllib.request.Request(url, method='POST')
|
||
request.add_header('X-Emby-Token', self.api_key)
|
||
request.add_header('Content-Type', content_type)
|
||
request.data = base64.b64encode(image_data).decode('utf-8').encode('utf-8')
|
||
|
||
try:
|
||
with urllib.request.urlopen(request):
|
||
pass
|
||
except urllib.error.HTTPError as e:
|
||
error_body = e.read().decode('utf-8') if e.fp else 'No error details'
|
||
raise Exception(f"Failed to upload {image_type}: HTTP {e.code} - {error_body}")
|
||
|
||
def delete_image(self, item_id: str, image_type: str) -> None:
|
||
url = f"{self.server_url}/Items/{item_id}/Images/{image_type}"
|
||
request = urllib.request.Request(url, method='DELETE')
|
||
request.add_header('X-Emby-Token', self.api_key)
|
||
|
||
try:
|
||
with urllib.request.urlopen(request):
|
||
pass
|
||
except urllib.error.HTTPError as e:
|
||
error_body = e.read().decode('utf-8') if e.fp else 'No error details'
|
||
raise Exception(f"Failed to delete {image_type}: HTTP {e.code} - {error_body}")
|
||
|
||
|
||
def filter_channels_by_tags(channels: List[Dict], tags: Optional[List[str]]) -> List[Dict]:
|
||
"""Filter channels by tags. Returns all channels if tags is None or empty."""
|
||
if not tags:
|
||
return channels
|
||
|
||
filtered = []
|
||
for channel in channels:
|
||
channel_tags = channel.get('Tags', [])
|
||
if any(tag in channel_tags for tag in tags):
|
||
filtered.append(channel)
|
||
|
||
return filtered
|
||
|
||
|
||
def copy_primary_to_light_logos(client: EmbyClient, channel_id: str, channel_name: str, dry_run: bool = True, force: bool = False) -> bool:
|
||
try:
|
||
channel_data = client.get_channel_by_id(channel_id)
|
||
image_tags = channel_data.get('ImageTags', {})
|
||
|
||
if not image_tags.get('Primary'):
|
||
print(f" ⚠️ No Primary image, skipping")
|
||
return False
|
||
|
||
has_logo_light = 'LogoLight' in image_tags
|
||
has_logo_light_color = 'LogoLightColor' in image_tags
|
||
|
||
if has_logo_light and has_logo_light_color and not force:
|
||
print(f" ✓ Already has both logos")
|
||
return True
|
||
|
||
to_upload = []
|
||
if not has_logo_light or force:
|
||
to_upload.append('LogoLight')
|
||
if not has_logo_light_color or force:
|
||
to_upload.append('LogoLightColor')
|
||
|
||
if dry_run:
|
||
action = "overwrite" if (has_logo_light or has_logo_light_color) and force else "upload"
|
||
print(f" → Would {action}: {', '.join(to_upload)}")
|
||
return True
|
||
|
||
print(f" 📥 Downloading Primary...")
|
||
primary_image_data = client.download_image(channel_id, 'Primary')
|
||
|
||
if 'LogoLight' in to_upload:
|
||
action = "Overwriting" if has_logo_light else "Uploading"
|
||
print(f" 📤 {action} LogoLight...")
|
||
client.upload_image(channel_id, 'LogoLight', primary_image_data)
|
||
|
||
if 'LogoLightColor' in to_upload:
|
||
action = "Overwriting" if has_logo_light_color else "Uploading"
|
||
print(f" 📤 {action} LogoLightColor...")
|
||
client.upload_image(channel_id, 'LogoLightColor', primary_image_data)
|
||
|
||
print(f" ✓ Successfully uploaded")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f" ✗ Error: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
def clear_channel_logos(client: EmbyClient, channel_id: str, channel_name: str, dry_run: bool = True) -> bool:
|
||
try:
|
||
channel_data = client.get_channel_by_id(channel_id)
|
||
image_tags = channel_data.get('ImageTags', {})
|
||
|
||
image_types = ['Primary', 'LogoLight', 'LogoLightColor']
|
||
existing = [img_type for img_type in image_types if img_type in image_tags]
|
||
|
||
if not existing:
|
||
print(f" ℹ️ No logos to clear")
|
||
return True
|
||
|
||
if dry_run:
|
||
print(f" → Would delete: {', '.join(existing)}")
|
||
return True
|
||
|
||
for img_type in existing:
|
||
print(f" 🗑️ Deleting {img_type}...")
|
||
client.delete_image(channel_id, img_type)
|
||
|
||
print(f" ✓ Successfully cleared {len(existing)} logo(s)")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f" ✗ Error: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
def update_channel_logos(client: EmbyClient, dry_run: bool = True, first_only: bool = False, force: bool = False, tags: Optional[List[str]] = None) -> None:
|
||
channels = client.get_live_tv_channels()
|
||
|
||
# Filter by tags if specified
|
||
if tags:
|
||
print(f"Filtering channels by tags: {', '.join(tags)}")
|
||
channels = filter_channels_by_tags(channels, tags)
|
||
if not channels:
|
||
print(f"\n⚠️ No channels found with the specified tags")
|
||
return
|
||
|
||
channels_to_process = [channels[0]] if first_only else channels
|
||
|
||
print("\n" + "=" * 80)
|
||
if first_only:
|
||
print(f"{'DRY RUN: TESTING ON' if dry_run else 'PROCESSING'} FIRST CHANNEL ONLY")
|
||
else:
|
||
print(f"{'DRY RUN: SIMULATING' if dry_run else 'PROCESSING'} ALL {len(channels)} CHANNELS")
|
||
if force:
|
||
print("FORCE MODE: Will overwrite existing logos")
|
||
print("=" * 80)
|
||
|
||
success_count = 0
|
||
skip_count = 0
|
||
|
||
for i, channel in enumerate(channels_to_process, 1):
|
||
print(f"\n[{i}/{len(channels_to_process)}] {channel.get('Name', 'Unknown')}")
|
||
if copy_primary_to_light_logos(client, channel['Id'], channel.get('Name'), dry_run, force):
|
||
success_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
print("\n" + "=" * 80)
|
||
print("SUMMARY")
|
||
print("=" * 80)
|
||
print(f"Total: {len(channels_to_process)}")
|
||
print(f"{'Would update' if dry_run else 'Updated'}: {success_count}")
|
||
print(f"Skipped: {skip_count}")
|
||
|
||
if dry_run:
|
||
print("\n💡 Dry run. Use --execute to perform actual updates.")
|
||
if first_only and not dry_run:
|
||
print("\n💡 Processed first channel only. Remove --first-only to process all.")
|
||
|
||
|
||
def clear_logos(client: EmbyClient, dry_run: bool = True, first_only: bool = False, tags: Optional[List[str]] = None) -> None:
|
||
channels = client.get_live_tv_channels()
|
||
|
||
# Filter by tags if specified
|
||
if tags:
|
||
print(f"Filtering channels by tags: {', '.join(tags)}")
|
||
channels = filter_channels_by_tags(channels, tags)
|
||
if not channels:
|
||
print(f"\n⚠️ No channels found with the specified tags")
|
||
return
|
||
|
||
channels_to_process = [channels[0]] if first_only else channels
|
||
|
||
print("\n" + "=" * 80)
|
||
if first_only:
|
||
print(f"{'DRY RUN: CLEARING' if dry_run else 'CLEARING'} FIRST CHANNEL ONLY")
|
||
else:
|
||
print(f"{'DRY RUN: CLEARING' if dry_run else 'CLEARING'} ALL {len(channels)} CHANNELS")
|
||
print("=" * 80)
|
||
|
||
success_count = 0
|
||
skip_count = 0
|
||
|
||
for i, channel in enumerate(channels_to_process, 1):
|
||
print(f"\n[{i}/{len(channels_to_process)}] {channel.get('Name', 'Unknown')}")
|
||
if clear_channel_logos(client, channel['Id'], channel.get('Name'), dry_run):
|
||
success_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
print("\n" + "=" * 80)
|
||
print("SUMMARY")
|
||
print("=" * 80)
|
||
print(f"Total: {len(channels_to_process)}")
|
||
print(f"{'Would clear' if dry_run else 'Cleared'}: {success_count}")
|
||
print(f"Skipped: {skip_count}")
|
||
|
||
if dry_run:
|
||
print("\n💡 Dry run. Use --execute to perform actual deletions.")
|
||
if first_only and not dry_run:
|
||
print("\n💡 Processed first channel only. Remove --first-only to process all.")
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='Copy Emby Live TV channel dark logos to light versions')
|
||
parser.add_argument('--server', required=True, help='Emby server URL')
|
||
parser.add_argument('--api-key', required=True, help='Emby API key')
|
||
parser.add_argument('--execute', action='store_true', help='Actually perform updates')
|
||
parser.add_argument('--first-only', action='store_true', help='Only process first channel')
|
||
parser.add_argument('--clear', action='store_true', help='Clear all logos (Primary, LogoLight, LogoLightColor)')
|
||
parser.add_argument('--force', action='store_true', help='Overwrite existing logos (only applies to copy mode)')
|
||
parser.add_argument('--non-interactive', action='store_true', help='Skip confirmation prompts (for automated execution)')
|
||
parser.add_argument('--tags', help='Comma-separated list of tags to filter channels (e.g., "sports,news")')
|
||
args = parser.parse_args()
|
||
|
||
# Parse tags if provided
|
||
tags = None
|
||
if args.tags:
|
||
tags = [tag.strip() for tag in args.tags.split(',') if tag.strip()]
|
||
if not tags:
|
||
print("Warning: --tags specified but no valid tags found", file=sys.stderr)
|
||
tags = None
|
||
|
||
client = EmbyClient(args.server, args.api_key)
|
||
|
||
try:
|
||
print(f"Connecting to {args.server}...")
|
||
channels = client.get_live_tv_channels()
|
||
print(f"✓ Connected! Found {len(channels)} TV channels")
|
||
except Exception as e:
|
||
print(f"✗ Connection failed: {e}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
if args.clear:
|
||
if args.execute:
|
||
target = "first channel" if args.first_only else "all channels"
|
||
tag_info = f" with tags [{', '.join(tags)}]" if tags else ""
|
||
if args.non_interactive or input(f"\nCLEAR ALL LOGOS from {target}{tag_info}? (yes/no): ").lower() == 'yes':
|
||
clear_logos(client, dry_run=False, first_only=args.first_only, tags=tags)
|
||
else:
|
||
print("Cancelled.")
|
||
else:
|
||
clear_logos(client, dry_run=True, first_only=args.first_only, tags=tags)
|
||
else:
|
||
if args.execute:
|
||
target = "first channel" if args.first_only else "all channels"
|
||
action = "overwrite logos in" if args.force else "modify"
|
||
tag_info = f" with tags [{', '.join(tags)}]" if tags else ""
|
||
if args.non_interactive or input(f"\n{action.capitalize()} {target}{tag_info}? (yes/no): ").lower() == 'yes':
|
||
update_channel_logos(client, dry_run=False, first_only=args.first_only, force=args.force, tags=tags)
|
||
else:
|
||
print("Cancelled.")
|
||
else:
|
||
update_channel_logos(client, dry_run=True, first_only=args.first_only, force=args.force, tags=tags)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|