"""Search an Xtream Codes API for live channels, VOD and series by name.

Searches are case-insensitive substring matches by default; a term written as
``/pattern/flags`` is treated as a regular expression.  The tool can run a
single search from the command line or an interactive prompt.
"""
import argparse
import json
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from urllib.parse import urljoin

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

try:
    from tqdm import tqdm
except ImportError:
    print("Installing tqdm for progress bars...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
    from tqdm import tqdm


# (connect, read) timeout in seconds; without one a stalled server would
# block a worker thread forever.
REQUEST_TIMEOUT = (10, 30)

# Only the first few tasks are staggered, to avoid hitting the server with a
# burst of simultaneous requests when the pool starts.
STAGGERED_TASKS = 20
STAGGER_STEP_SECONDS = 0.01

PROGRESS_BAR_FORMAT = (
    "{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} "
    "[Elapsed: {elapsed} | ETA: {remaining}]"
)

# (display name, action listing categories, action listing streams)
CONTENT_TYPES = [
    ('Live Streams', 'get_live_categories', 'get_live_streams'),
    ('VOD', 'get_vod_categories', 'get_vod_streams'),
    ('Series', 'get_series_categories', 'get_series'),
]

_REGEX_FLAGS = {'i': re.IGNORECASE, 'm': re.MULTILINE, 's': re.DOTALL}


def is_regex_pattern(pattern):
    """Return True if *pattern* has the ``/pattern/flags`` regex form.

    The term must start with a slash, be longer than two characters and
    contain at least one further slash.
    """
    return pattern.startswith('/') and len(pattern) > 2 and '/' in pattern[1:]


@lru_cache(maxsize=64)
def _compile_search_term(search_term):
    """Compile a ``/pattern/flags`` term; return None if it is not a usable regex.

    Cached because the same term is tested against every stream of a search.
    """
    if not is_regex_pattern(search_term):
        return None

    last_slash = search_term.rfind('/')
    pattern = search_term[1:last_slash]
    flags_str = search_term[last_slash + 1:]

    if flags_str:
        flags = 0
        for flag in flags_str.lower():
            # Unknown flag letters are ignored.
            flags |= _REGEX_FLAGS.get(flag, 0)
    else:
        # No flags given: default to a case-insensitive match.
        flags = re.IGNORECASE

    try:
        return re.compile(pattern, flags)
    except re.error:
        return None


def matches_search(stream_name, search_term):
    """Return True if *stream_name* matches *search_term*.

    A ``/pattern/flags`` term is applied with ``re.search`` (flags ``i``,
    ``m`` and ``s`` are recognised).  Any other term, or a regex that fails
    to compile, is matched as a case-insensitive substring.  A missing name
    (``None``) is treated as an empty string instead of raising.
    """
    name = '' if stream_name is None else str(stream_name)

    compiled = _compile_search_term(search_term)
    if compiled is not None:
        return compiled.search(name) is not None
    return search_term.lower() in name.lower()


def create_session_with_retries():
    """Return a ``requests.Session`` that retries GET requests on HTTP 503.

    Up to two retries are made with a 0.5 backoff factor; no other status
    code is retried.
    """
    session = requests.Session()
    retry_strategy = Retry(
        total=2,
        status_forcelist=[503],
        backoff_factor=0.5,
        allowed_methods=["GET"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def _api_endpoint(api_url):
    """Return the player API URL for *api_url*."""
    return urljoin(api_url.rstrip('/'), '/player_api.php')


def _json_list(response):
    """Decode *response* as JSON and return it if it is a list.

    Raises:
        ValueError: if the body is not valid JSON or is not a JSON list.
    """
    payload = response.json()
    if not isinstance(payload, list):
        raise ValueError(f"expected a JSON list, got {type(payload).__name__}")
    return payload


def _category_list(response):
    """Return the category dicts in *response*, dropping malformed entries.

    Raises:
        ValueError: if the body is not valid JSON or is not a JSON list.
    """
    return [entry for entry in _json_list(response) if isinstance(entry, dict)]


class _StartStagger:
    """Delay the first few tasks by a small, increasing amount."""

    def __init__(self, limit=STAGGERED_TASKS, step=STAGGER_STEP_SECONDS):
        self._lock = threading.Lock()
        self._started = 0
        self._limit = limit
        self._step = step

    def wait(self):
        """Sleep ``slot * step`` seconds if this is one of the first tasks."""
        with self._lock:
            slot = self._started
            self._started += 1
        if 0 < slot < self._limit:
            time.sleep(slot * self._step)


def _run_category_search(categories, description, max_workers, worker):
    """Run ``worker(category, pbar, pbar_lock)`` for every category in a pool.

    Returns the concatenated match lists.  An exception escaping a worker is
    reported on the progress bar and does not abort the other categories.
    """
    if max_workers is not None:
        # ThreadPoolExecutor rejects values below 1.
        max_workers = max(1, max_workers)

    matches = []
    pbar_lock = threading.Lock()
    stagger = _StartStagger()

    with tqdm(total=len(categories), desc=description, unit="cat",
              bar_format=PROGRESS_BAR_FORMAT, smoothing=0.1) as pbar:

        def task(category):
            stagger.wait()
            return worker(category, pbar, pbar_lock)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_category = {
                executor.submit(task, category): category
                for category in categories
            }
            for future in as_completed(future_to_category):
                try:
                    matches.extend(future.result())
                except Exception as e:
                    category = future_to_category[future]
                    category_name = category.get('category_name', 'Unknown')
                    with pbar_lock:
                        pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
    return matches


def search_category(api_url, username, password, channel_name, category, pbar, pbar_lock):
    """Search one live category for matching channels (thread-pool worker).

    Args:
        api_url: Base URL of the Xtream Codes API.
        username: Account user name.
        password: Account password.
        channel_name: Search term (substring or ``/pattern/flags``).
        category: Category dict with ``category_id`` / ``category_name``.
        pbar: Progress bar, advanced by one when the category is done.
        pbar_lock: Lock guarding every access to *pbar*.

    Returns:
        A list of match dicts; empty when nothing matched or the request
        failed (failures are reported on the progress bar).
    """
    category_id = category.get('category_id')
    category_name = category.get('category_name', 'Unknown')
    matches_in_category = []

    try:
        streams_params = {
            'username': username,
            'password': password,
            'action': 'get_live_streams',
            'category_id': category_id,
        }
        # One session per task; the context manager releases its connections.
        with create_session_with_retries() as session:
            streams_response = session.get(
                _api_endpoint(api_url), params=streams_params, timeout=REQUEST_TIMEOUT
            )
            streams_response.raise_for_status()
            streams = _json_list(streams_response)

        for stream in streams:
            if not isinstance(stream, dict):
                continue
            if matches_search(stream.get('name'), channel_name):
                matches_in_category.append({
                    'group_name': category_name,
                    'group_id': category_id,
                    'channel_name': stream.get('name'),
                    'channel_id': stream.get('stream_id'),
                    'stream_type': stream.get('stream_type'),
                    'stream_icon': stream.get('stream_icon'),
                    'epg_channel_id': stream.get('epg_channel_id'),
                })

        if matches_in_category:
            with pbar_lock:
                pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")

    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not JSON or not a list.
        with pbar_lock:
            pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
    finally:
        with pbar_lock:
            pbar.update(1)

    return matches_in_category


def find_channel_in_groups(api_url, username, password, channel_name, max_workers=10):
    """Find the live categories that contain a matching channel (multithreaded).

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        channel_name (str): Search term (substring or ``/pattern/flags`` regex)
        max_workers (int): Maximum number of threads to use

    Returns:
        list: One dict per matching channel; empty if the category list
        could not be fetched.
    """
    try:
        print("📡 Fetching live categories...")
        categories_params = {
            'username': username,
            'password': password,
            'action': 'get_live_categories',
        }
        with create_session_with_retries() as session:
            categories_response = session.get(
                _api_endpoint(api_url), params=categories_params, timeout=REQUEST_TIMEOUT
            )
            categories_response.raise_for_status()
            categories = _category_list(categories_response)
    except requests.exceptions.RequestException as e:
        print(f"❌ Error connecting to Xtream Codes API: {e}")
        return []
    except ValueError as e:
        # e.g. an error object returned instead of the category list
        print(f"❌ Unexpected response from Xtream Codes API: {e}")
        return []

    print(f"✅ Found {len(categories)} categories to search")

    def worker(category, pbar, pbar_lock):
        return search_category(api_url, username, password, channel_name,
                               category, pbar, pbar_lock)

    return _run_category_search(categories, "🔍 Searching live categories",
                                max_workers, worker)


def get_api_info(api_url, username, password):
    """Print basic API information for debugging.

    Fetches the live categories and, if any exist, the streams of the first
    category, printing status codes and one sample entry of each.  Errors
    are printed rather than raised.
    """
    try:
        print("🔧 Testing API connectivity...")
        api_endpoint = _api_endpoint(api_url)

        with create_session_with_retries() as session:
            categories_params = {
                'username': username,
                'password': password,
                'action': 'get_live_categories',
            }
            categories_response = session.get(
                api_endpoint, params=categories_params, timeout=REQUEST_TIMEOUT
            )
            print(f"Live categories endpoint status: {categories_response.status_code}")

            if categories_response.status_code != 200:
                return

            categories = categories_response.json()
            print(f"Found {len(categories)} live categories")
            if not categories:
                return

            print("Sample category structure:")
            print(json.dumps(categories[0], indent=2))

            print("🔧 Testing streams endpoint...")
            streams_params = {
                'username': username,
                'password': password,
                'action': 'get_live_streams',
                'category_id': categories[0].get('category_id'),
            }
            streams_response = session.get(
                api_endpoint, params=streams_params, timeout=REQUEST_TIMEOUT
            )
            print(f"Streams endpoint status: {streams_response.status_code}")

            if streams_response.status_code == 200:
                streams = streams_response.json()
                print(f"Found {len(streams)} streams in first category")
                if streams:
                    print("Sample stream structure:")
                    print(json.dumps(streams[0], indent=2))

    except Exception as e:
        # Debug helper: report whatever went wrong instead of crashing.
        print(f"❌ Error testing API: {e}")


def search_content_type_category(api_url, username, password, search_term, category,
                                 content_type, stream_action, pbar, pbar_lock):
    """Search one category of a content type (thread-pool worker).

    Args:
        api_url: Base URL of the Xtream Codes API.
        username: Account user name.
        password: Account password.
        search_term: Search term (substring or ``/pattern/flags``).
        category: Category dict with ``category_id`` / ``category_name``.
        content_type: Display name stored in each match (e.g. ``'VOD'``).
        stream_action: API action that lists the category's entries.
        pbar: Progress bar, advanced by one when the category is done.
        pbar_lock: Lock guarding every access to *pbar*.

    Returns:
        A list of match dicts; empty when nothing matched or the request
        failed (failures are reported on the progress bar).
    """
    category_id = category.get('category_id')
    category_name = category.get('category_name', 'Unknown')
    matches_in_category = []

    try:
        stream_params = {
            'username': username,
            'password': password,
            'action': stream_action,
            'category_id': category_id,
        }
        with create_session_with_retries() as session:
            stream_response = session.get(
                _api_endpoint(api_url), params=stream_params, timeout=REQUEST_TIMEOUT
            )
            if stream_response.status_code != 200:
                with pbar_lock:
                    pbar.write(f" ❌ Failed to get {content_type.lower()} from {category_name}")
                return matches_in_category
            streams = _json_list(stream_response)

        for stream in streams:
            if not isinstance(stream, dict):
                continue
            if matches_search(stream.get('name'), search_term):
                matches_in_category.append({
                    'content_type': content_type,
                    'group_name': category_name,
                    'group_id': category_id,
                    'channel_name': stream.get('name'),
                    # Series entries carry series_id instead of stream_id.
                    'channel_id': stream.get('stream_id') or stream.get('series_id'),
                    'stream_type': stream.get('stream_type'),
                    'stream_icon': stream.get('stream_icon'),
                    'epg_channel_id': stream.get('epg_channel_id'),
                })

        if matches_in_category:
            with pbar_lock:
                pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")

    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not JSON or not a list.
        with pbar_lock:
            pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
    finally:
        with pbar_lock:
            pbar.update(1)

    return matches_in_category


def search_all_content_types(api_url, username, password, search_term, max_workers=15):
    """Search live streams, VOD and series for *search_term* (multithreaded).

    Each content type is searched in turn; a failure in one type is printed
    and the remaining types are still searched.

    Returns:
        list: Match dicts from all content types, each with a
        ``content_type`` key.
    """
    api_endpoint = _api_endpoint(api_url)
    all_matches = []

    with create_session_with_retries() as session:
        for content_type, cat_action, stream_action in CONTENT_TYPES:
            try:
                print(f"\n📺 Fetching {content_type} categories...")
                cat_params = {
                    'username': username,
                    'password': password,
                    'action': cat_action,
                }
                cat_response = session.get(
                    api_endpoint, params=cat_params, timeout=REQUEST_TIMEOUT
                )
                if cat_response.status_code != 200:
                    print(f"❌ Could not get {content_type} categories (status: {cat_response.status_code})")
                    continue

                categories = _category_list(cat_response)
                print(f"✅ Found {len(categories)} {content_type.lower()} categories")
                if not categories:
                    continue

                # The pool finishes before the loop advances, so the closure
                # always sees this iteration's content_type / stream_action.
                def worker(category, pbar, pbar_lock):
                    return search_content_type_category(
                        api_url, username, password, search_term, category,
                        content_type, stream_action, pbar, pbar_lock
                    )

                all_matches.extend(_run_category_search(
                    categories, f"🔍 Searching {content_type.lower()}",
                    max_workers, worker
                ))

            except Exception as e:
                # Best effort: one failing content type must not stop the rest.
                print(f"❌ Error searching {content_type}: {e}")
                continue

    return all_matches


def _print_matches(matches):
    """Print one numbered ``[type] group → channel`` line per match."""
    for i, match in enumerate(matches, 1):
        # Matches from the live-only search carry no content_type key.
        content_type = match.get('content_type', 'Live Streams')
        type_short = content_type.replace(' Streams', '').replace(' ', '')
        print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")


def _parse_query(query):
    """Split an interactive query into ``(search_all_types, search_term)``.

    Recognised prefixes are ``live:``, ``vod:``, ``series:`` and ``all:``.
    Prints which search is about to run.
    """
    search_type, separator, remainder = query.partition(':')

    # A /regex/ may itself contain ':'; it is never a type prefix.
    if not separator or is_regex_pattern(query):
        print(f"🔍 Searching Live Streams for '{query}'...")
        return False, query

    search_type = search_type.lower().strip()
    search_term = remainder.strip()

    if search_type == 'all':
        return True, search_term
    if search_type in ('vod', 'series'):
        # For now these are treated as all-types searches.
        print(f"🔍 Searching {search_type.upper()} for '{search_term}'...")
        return True, search_term
    if search_type == 'live':
        print(f"🔍 Searching Live Streams for '{search_term}'...")
        return False, search_term

    print(f"❓ Unknown search type '{search_type}', searching live streams for '{query}'...")
    return False, query


def interactive_search(api_url, username, password, max_workers=10):
    """Run the interactive search prompt until the user quits.

    Ctrl-C, end of input (Ctrl-D / exhausted stdin) and the commands
    ``quit``, ``exit`` and ``q`` leave the loop; any other error is printed
    and the prompt continues.
    """
    print(f"Connected to: {api_url}")
    print("Commands: live: | vod: | series: | all: | /regex/ | debug | quit")
    print("Tip: Use /pattern/flags for regex (e.g., /^CNN/i or /HD$/) - flags: i=case-insensitive, m=multiline, s=dotall")
    print("-" * 50)

    while True:
        try:
            query = input("\n🔍 Search: ").strip()
            if not query:
                continue

            command = query.lower()
            if command in ('quit', 'exit', 'q'):
                print("👋 Goodbye!")
                break
            if command == 'debug':
                print("\n=== API Debug Info ===")
                get_api_info(api_url, username, password)
                print("=====================")
                continue

            search_all_types, search_term = _parse_query(query)

            if search_all_types:
                matching_groups = search_all_content_types(
                    api_url, username, password, search_term, max_workers
                )
            else:
                matching_groups = find_channel_in_groups(
                    api_url, username, password, search_term, max_workers
                )

            if matching_groups:
                print(f"\n✅ Found {len(matching_groups)} result(s) for '{search_term}':")
                _print_matches(matching_groups)
            else:
                print(f"❌ No results found for '{search_term}'")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: input() would raise it again on
            # every iteration once stdin is exhausted.
            print("\n\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"❌ Error during search: {e}")


def main():
    """Parse the command line, test the connection and run the search(es)."""
    parser = argparse.ArgumentParser(description='Xtream Codes Channel Search Tool')
    parser.add_argument('--api-url', required=True, help='Xtream Codes API base URL')
    parser.add_argument('--username', required=True, help='Username for authentication')
    parser.add_argument('--password', required=True, help='Password for authentication')

    # Optional one-time search arguments
    parser.add_argument('--channel', help='Channel name to search for (one-time search)')
    parser.add_argument('--debug', action='store_true', help='Show API debugging info')
    parser.add_argument('--all-types', action='store_true',
                        help='Search in Live, VOD, and Series (one-time search)')
    parser.add_argument('--interactive', '-i', action='store_true',
                        help='Start interactive search mode')
    parser.add_argument('--max-workers', type=int, default=10,
                        help='Maximum number of concurrent threads (default: 10)')

    args = parser.parse_args()

    # Test connection first
    print("🔧 Testing connection to Xtream Codes API...")
    try:
        test_params = {
            'username': args.username,
            'password': args.password,
            'action': 'get_live_categories',
        }
        with create_session_with_retries() as session:
            response = session.get(_api_endpoint(args.api_url), params=test_params, timeout=10)
        if response.status_code == 200:
            print("✅ Connection successful!")
        else:
            print(f"❌ Connection failed with status {response.status_code}")
            return
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return

    if args.debug:
        print("\n=== API Debug Info ===")
        get_api_info(args.api_url, args.username, args.password)
        print("=====================\n")

    # If channel is provided, do one-time search
    if args.channel:
        print(f"🔍 Searching for '{args.channel}' in Xtream Codes API...")

        if args.all_types:
            matching_groups = search_all_content_types(
                args.api_url, args.username, args.password, args.channel, args.max_workers
            )
        else:
            matching_groups = find_channel_in_groups(
                args.api_url, args.username, args.password, args.channel, args.max_workers
            )

        print(f"\n{'='*50}")
        if matching_groups:
            print(f"✅ Found {len(matching_groups)} result(s) for '{args.channel}':")
            _print_matches(matching_groups)
        else:
            print(f"❌ No groups found containing '{args.channel}'")

    # Start interactive mode if requested or no channel provided
    if args.interactive or not args.channel:
        interactive_search(args.api_url, args.username, args.password, args.max_workers)


if __name__ == "__main__":
    main()