#!/usr/bin/env python3
"""
Emby Live TV Channel Logo Updater

Copies 'Logo (Dark Version)' to 'Logo (Light Version)' and
'Logo (Light with Colour)'.

In API terms the script copies each channel's 'Primary' image into the
'LogoLight' and 'LogoLightColor' image slots, or (with --clear) deletes all
three. Nothing is modified unless --execute is given.
"""

import argparse
import base64
import json
import sys
import traceback
from typing import Callable, Dict, List, Optional
import urllib.request
import urllib.error
import urllib.parse

# Horizontal rule printed around the banner and the summary.
_RULE = "=" * 80

# Image slots that receive a copy of the Primary image.
_LIGHT_LOGO_TYPES = ('LogoLight', 'LogoLightColor')

# Image slots removed by --clear, in deletion order.
_CLEARABLE_IMAGE_TYPES = ('Primary', 'LogoLight', 'LogoLightColor')


class EmbyError(Exception):
    """Raised when the Emby server answers a request with an HTTP error."""


class EmbyClient:
    """Client for interacting with the Emby Server API."""

    def __init__(self, server_url: str, api_key: str):
        """Store the server base URL (trailing slashes removed) and API key."""
        self.server_url = server_url.rstrip('/')
        self.api_key = api_key

    @staticmethod
    def _guess_content_type(image_data: bytes) -> str:
        """Return a MIME type for *image_data* based on its leading bytes.

        Recognises PNG, JPEG and GIF signatures; anything else is reported
        as 'application/octet-stream'.
        """
        if image_data.startswith(b'\x89PNG'):
            return 'image/png'
        if image_data.startswith(b'\xff\xd8'):
            return 'image/jpeg'
        if image_data.startswith(b'GIF'):
            return 'image/gif'
        return 'application/octet-stream'

    def _image_url(self, item_id: str, image_type: str) -> str:
        """Build the /Items/{id}/Images/{type} URL with quoted path segments."""
        quoted_id = urllib.parse.quote(str(item_id), safe='')
        quoted_type = urllib.parse.quote(image_type, safe='')
        return f"{self.server_url}/Items/{quoted_id}/Images/{quoted_type}"

    def _send(self, request: urllib.request.Request,
              failure_context: Optional[str] = None) -> bytes:
        """Send *request* with the API token header and return the raw body.

        Args:
            request: The prepared request; the token header is added here.
            failure_context: Optional prefix for the error message, e.g.
                "Failed to upload LogoLight".

        Raises:
            EmbyError: If the server responds with an HTTP error status.
                Other failures from ``urlopen`` (e.g. connection errors)
                propagate unchanged.
        """
        # The key travels only in a header so it never ends up in URLs.
        request.add_header('X-Emby-Token', self.api_key)
        try:
            with urllib.request.urlopen(request) as response:
                return response.read()
        except urllib.error.HTTPError as e:
            # errors='replace' keeps a non-UTF-8 error page from masking the
            # real HTTP failure with a UnicodeDecodeError.
            error_body = (e.read().decode('utf-8', errors='replace')
                          if e.fp else 'No error details')
            if failure_context:
                message = f"{failure_context}: HTTP {e.code} - {error_body}"
            else:
                message = f"HTTP {e.code}: {error_body}"
            raise EmbyError(message) from e

    def _make_request(self, endpoint: str, method: str = 'GET',
                      data: Optional[Dict] = None) -> Dict:
        """Call a JSON endpoint and return the decoded response.

        Args:
            endpoint: Path (and optional query string) appended to the
                server URL.
            method: HTTP method.
            data: Optional payload, sent as a JSON body.

        Returns:
            The decoded JSON document, or ``{}`` when the response body is
            empty (for example HTTP 204).

        Raises:
            EmbyError: If the server responds with an HTTP error status.
        """
        request = urllib.request.Request(f"{self.server_url}{endpoint}",
                                         method=method)
        if data is not None:
            request.add_header('Content-Type', 'application/json')
            request.data = json.dumps(data).encode('utf-8')
        body = self._send(request)
        if not body.strip():
            return {}
        return json.loads(body.decode('utf-8'))

    def get_live_tv_channels(self) -> List[Dict]:
        """Return the 'Items' list of the Live TV channel query (or [])."""
        response = self._make_request(
            '/LiveTv/Channels?Type=TV&EnableImages=true')
        return response.get('Items') or []

    def get_channel_by_id(self, channel_id: str) -> Dict:
        """Return the JSON document for a single Live TV channel."""
        quoted_id = urllib.parse.quote(str(channel_id), safe='')
        return self._make_request(f'/LiveTv/Channels/{quoted_id}')

    def download_image(self, item_id: str, image_type: str) -> bytes:
        """Download the raw bytes of one image slot of an item.

        Raises:
            EmbyError: If the server responds with an HTTP error status.
        """
        request = urllib.request.Request(self._image_url(item_id, image_type))
        return self._send(request, f"Failed to download {image_type}")

    def upload_image(self, item_id: str, image_type: str,
                     image_data: bytes) -> None:
        """Upload *image_data* into one image slot of an item.

        The body is sent base64-encoded, with a Content-Type derived from
        the image's leading bytes.

        Raises:
            EmbyError: If the server responds with an HTTP error status.
        """
        request = urllib.request.Request(
            self._image_url(item_id, image_type),
            data=base64.b64encode(image_data),
            method='POST',
        )
        request.add_header('Content-Type',
                           self._guess_content_type(image_data))
        self._send(request, f"Failed to upload {image_type}")

    def delete_image(self, item_id: str, image_type: str) -> None:
        """Delete one image slot of an item.

        Raises:
            EmbyError: If the server responds with an HTTP error status.
        """
        request = urllib.request.Request(
            self._image_url(item_id, image_type), method='DELETE')
        self._send(request, f"Failed to delete {image_type}")


def copy_primary_to_light_logos(client: EmbyClient, channel_id: str,
                                channel_name: str,
                                dry_run: bool = True) -> bool:
    """Copy a channel's Primary image into its missing light-logo slots.

    Args:
        client: Emby client used for all server calls.
        channel_id: Id of the channel to process.
        channel_name: Channel display name (currently unused; kept for
            interface compatibility).
        dry_run: When true, only report what would be uploaded.

    Returns:
        True if the channel already has both logos, would be updated (dry
        run) or was updated; False if it has no Primary image or an error
        occurred.
    """
    # Per-channel boundary: any failure is reported and turned into False
    # so one bad channel does not abort the whole batch.
    try:
        channel_data = client.get_channel_by_id(channel_id)
        # 'or {}' also covers an explicit null ImageTags value.
        image_tags = channel_data.get('ImageTags') or {}

        if not image_tags.get('Primary'):
            print(" ⚠️ No Primary image, skipping")
            return False

        missing = [image_type for image_type in _LIGHT_LOGO_TYPES
                   if image_type not in image_tags]
        if not missing:
            print(" ✓ Already has both logos")
            return True

        if dry_run:
            print(f" → Would upload: {', '.join(missing)}")
            return True

        print(" 📥 Downloading Primary...")
        primary_image_data = client.download_image(channel_id, 'Primary')
        for image_type in missing:
            print(f" 📤 Uploading {image_type}...")
            client.upload_image(channel_id, image_type, primary_image_data)

        print(" ✓ Successfully uploaded")
        return True
    except Exception as e:
        print(f" ✗ Error: {e}")
        traceback.print_exc()
        return False


def clear_channel_logos(client: EmbyClient, channel_id: str,
                        channel_name: str, dry_run: bool = True) -> bool:
    """Delete a channel's Primary, LogoLight and LogoLightColor images.

    Args:
        client: Emby client used for all server calls.
        channel_id: Id of the channel to process.
        channel_name: Channel display name (currently unused; kept for
            interface compatibility).
        dry_run: When true, only report what would be deleted.

    Returns:
        True if there was nothing to clear, the images would be deleted
        (dry run) or were deleted; False if an error occurred.
    """
    # Per-channel boundary: report the failure and keep the batch going.
    try:
        channel_data = client.get_channel_by_id(channel_id)
        # 'or {}' also covers an explicit null ImageTags value.
        image_tags = channel_data.get('ImageTags') or {}

        existing = [image_type for image_type in _CLEARABLE_IMAGE_TYPES
                    if image_type in image_tags]
        if not existing:
            print(" ℹ️ No logos to clear")
            return True

        if dry_run:
            print(f" → Would delete: {', '.join(existing)}")
            return True

        for image_type in existing:
            print(f" 🗑️ Deleting {image_type}...")
            client.delete_image(channel_id, image_type)

        print(f" ✓ Successfully cleared {len(existing)} logo(s)")
        return True
    except Exception as e:
        print(f" ✗ Error: {e}")
        traceback.print_exc()
        return False


def _process_channels(client: EmbyClient,
                      worker: Callable[[EmbyClient, str, str, bool], bool],
                      *, dry_run: bool, first_only: bool,
                      channels: Optional[List[Dict]], action: str,
                      done_label: str, dry_run_hint: str) -> None:
    """Run *worker* over the selected channels and print banner and summary.

    Args:
        client: Emby client handed to the worker.
        worker: Per-channel function returning True on success.
        dry_run: Forwarded to the worker; also selects the closing hint.
        first_only: Process only the first channel, if there is one.
        channels: Channel list to use; fetched from the server when None.
        action: Banner verb phrase, e.g. "DRY RUN: SIMULATING".
        done_label: Summary label for the success count.
        dry_run_hint: Hint printed after a dry run.
    """
    if channels is None:
        channels = client.get_live_tv_channels()
    # Slicing (rather than channels[0]) keeps an empty channel list from
    # raising IndexError when first_only is set.
    selected = channels[:1] if first_only else channels

    print("\n" + _RULE)
    if first_only:
        print(f"{action} FIRST CHANNEL ONLY")
    else:
        print(f"{action} ALL {len(channels)} CHANNELS")
    print(_RULE)

    success_count = 0
    skip_count = 0
    for index, channel in enumerate(selected, 1):
        name = channel.get('Name', 'Unknown')
        print(f"\n[{index}/{len(selected)}] {name}")
        if worker(client, channel['Id'], name, dry_run):
            success_count += 1
        else:
            skip_count += 1

    print("\n" + _RULE)
    print("SUMMARY")
    print(_RULE)
    print(f"Total: {len(selected)}")
    print(f"{done_label}: {success_count}")
    print(f"Skipped: {skip_count}")

    if dry_run:
        print(f"\n💡 {dry_run_hint}")
    if first_only and not dry_run:
        print("\n💡 Processed first channel only. "
              "Remove --first-only to process all.")


def update_channel_logos(client: EmbyClient, dry_run: bool = True,
                         first_only: bool = False,
                         channels: Optional[List[Dict]] = None) -> None:
    """Copy Primary images to the light-logo slots and print a summary.

    Args:
        client: Emby client used for all server calls.
        dry_run: When true, only report what would be uploaded.
        first_only: Process only the first channel.
        channels: Optional pre-fetched channel list; fetched from the
            server when omitted.
    """
    if not dry_run:
        action = 'PROCESSING'
    elif first_only:
        action = 'DRY RUN: TESTING ON'
    else:
        action = 'DRY RUN: SIMULATING'
    _process_channels(
        client, copy_primary_to_light_logos,
        dry_run=dry_run, first_only=first_only, channels=channels,
        action=action,
        done_label='Would update' if dry_run else 'Updated',
        dry_run_hint='Dry run. Use --execute to perform actual updates.',
    )


def clear_logos(client: EmbyClient, dry_run: bool = True,
                first_only: bool = False,
                channels: Optional[List[Dict]] = None) -> None:
    """Delete the logo images of the channels and print a summary.

    Args:
        client: Emby client used for all server calls.
        dry_run: When true, only report what would be deleted.
        first_only: Process only the first channel.
        channels: Optional pre-fetched channel list; fetched from the
            server when omitted.
    """
    _process_channels(
        client, clear_channel_logos,
        dry_run=dry_run, first_only=first_only, channels=channels,
        action='DRY RUN: CLEARING' if dry_run else 'CLEARING',
        done_label='Would clear' if dry_run else 'Cleared',
        dry_run_hint='Dry run. Use --execute to perform actual deletions.',
    )


def _confirm(prompt: str) -> bool:
    """Return True only if the user answers 'yes' (case-insensitive)."""
    try:
        return input(prompt).strip().lower() == 'yes'
    except EOFError:
        # No interactive stdin: treat as "no" instead of crashing.
        return False


def main():
    """Parse the command line, verify the connection and run the action."""
    parser = argparse.ArgumentParser(
        description='Copy Emby Live TV channel dark logos to light versions')
    parser.add_argument('--server', required=True, help='Emby server URL')
    parser.add_argument('--api-key', required=True, help='Emby API key')
    parser.add_argument('--execute', action='store_true',
                        help='Actually perform updates')
    parser.add_argument('--first-only', action='store_true',
                        help='Only process first channel')
    parser.add_argument(
        '--clear', action='store_true',
        help='Clear all logos (Primary, LogoLight, LogoLightColor)')
    args = parser.parse_args()

    client = EmbyClient(args.server, args.api_key)

    # Top-level boundary: any failure here means the server is unusable.
    try:
        print(f"Connecting to {args.server}...")
        channels = client.get_live_tv_channels()
        print(f"✓ Connected! Found {len(channels)} TV channels")
    except Exception as e:
        print(f"✗ Connection failed: {e}", file=sys.stderr)
        sys.exit(1)

    if args.execute:
        target = "first channel" if args.first_only else "all channels"
        if args.clear:
            prompt = f"\nCLEAR ALL LOGOS from {target}? (yes/no): "
        else:
            prompt = f"\nModify {target}? (yes/no): "
        if not _confirm(prompt):
            print("Cancelled.")
            return

    action = clear_logos if args.clear else update_channel_logos
    # Reuse the list fetched for the connection check instead of asking
    # the server for it a second time.
    action(client, dry_run=not args.execute, first_only=args.first_only,
           channels=channels)


if __name__ == '__main__':
    main()