552 lines
23 KiB
Python
552 lines
23 KiB
Python
import requests
|
|
import json
|
|
import sys
|
|
from urllib.parse import urljoin
|
|
import argparse
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import threading
|
|
import time
|
|
import re
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
# tqdm is a hard requirement for the progress bars below; when it is missing
# it is installed on the fly with pip for the running interpreter.
try:
    from tqdm import tqdm
except ImportError:
    print("Installing tqdm for progress bars...")
    import importlib
    import subprocess

    subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
    # The package was created after this interpreter started, so the import
    # system's cached directory listings may not show it yet; clear them
    # before retrying the import.
    importlib.invalidate_caches()
    from tqdm import tqdm
|
|
|
|
def is_regex_pattern(pattern):
    """Tell whether *pattern* is written in the ``/regex/flags`` search syntax.

    A term qualifies when it starts with a forward slash, is longer than two
    characters, and contains at least one more forward slash after the first.
    """
    if not pattern.startswith('/'):
        return False
    if len(pattern) <= 2:
        return False
    # Look for a second slash anywhere past the opening one.
    return pattern.find('/', 1) != -1
|
|
|
|
def matches_search(stream_name, search_term):
    """Check whether *stream_name* matches *search_term*.

    Two search syntaxes are supported:

    * plain text - case-insensitive substring match;
    * ``/pattern/flags`` (as recognised by ``is_regex_pattern``) - regular
      expression search. Flags: ``i`` = ignore case, ``m`` = multiline,
      ``s`` = dotall; unknown flag letters are ignored. When no flags are
      given at all the search is case-insensitive.

    Args:
        stream_name: Name to test. ``None`` (a JSON ``null`` name) is treated
            as an empty name; any other non-string value is converted with
            ``str()``.
        search_term (str): Plain text or ``/pattern/flags``.

    Returns:
        bool: True when the name matches. If the regular expression does not
        compile, the whole raw term is used as a case-insensitive substring
        instead.
    """
    # Names come straight from API JSON and can be null; never crash on them.
    if stream_name is None:
        stream_name = ''
    elif not isinstance(stream_name, str):
        stream_name = str(stream_name)

    if not is_regex_pattern(search_term):
        # Regular substring match
        return search_term.lower() in stream_name.lower()

    # is_regex_pattern guarantees a slash after index 0, so the closing
    # delimiter always exists and lies to the right of the opening one.
    closing_slash = search_term.rfind('/')
    pattern = search_term[1:closing_slash]
    flags_str = search_term[closing_slash + 1:]

    known_flags = {
        'i': re.IGNORECASE,
        'm': re.MULTILINE,
        's': re.DOTALL,
    }
    if flags_str:
        flags = 0
        for letter in flags_str.lower():
            # Unknown flag letters contribute nothing.
            flags |= known_flags.get(letter, 0)
    else:
        # Default to case-insensitive if no flags specified
        flags = re.IGNORECASE

    try:
        return re.search(pattern, stream_name, flags) is not None
    except re.error:
        # If regex is invalid, fall back to substring match
        return search_term.lower() in stream_name.lower()
|
|
|
|
def create_session_with_retries():
    """Build a ``requests.Session`` that retries GET requests on HTTP 503.

    Up to two retries are made with a 0.5 backoff factor. The same adapter is
    mounted for both the ``http://`` and ``https://`` prefixes.
    """
    retry_policy = Retry(
        total=2,
        backoff_factor=0.5,
        status_forcelist=[503],  # only 503 responses trigger a retry
        allowed_methods=["GET"],
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    for prefix in ("http://", "https://"):
        session.mount(prefix, retrying_adapter)
    return session
|
|
|
|
def search_category(api_url, username, password, channel_name, category, pbar, pbar_lock, timeout=30):
    """
    Search a single live category for matching channels (run on a worker thread).

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        channel_name (str): Search term passed to ``matches_search``
        category (dict): Category entry; ``category_id`` and ``category_name``
            are read from it
        pbar: Progress bar; ``write()`` is used for messages and ``update(1)``
            is called exactly once per invocation
        pbar_lock: Lock guarding every access to ``pbar``
        timeout (float): Seconds to wait for the server before giving up

    Returns:
        list: One dict per matching stream with the keys ``group_name``,
        ``group_id``, ``channel_name``, ``channel_id``, ``stream_type``,
        ``stream_icon`` and ``epg_channel_id``. Empty on any request error.
    """
    category_id = category.get('category_id')
    category_name = category.get('category_name', 'Unknown')
    streams_url = urljoin(api_url.rstrip('/'), '/player_api.php')

    matches_in_category = []

    try:
        # Get streams for this category
        streams_params = {
            'username': username,
            'password': password,
            'action': 'get_live_streams',
            'category_id': category_id,
        }

        # Each worker uses its own session; the context manager releases its
        # pooled connections once the response has been read.
        with create_session_with_retries() as session:
            # Without a timeout a stalled server would block this worker forever.
            streams_response = session.get(streams_url, params=streams_params, timeout=timeout)
            streams_response.raise_for_status()
            streams = streams_response.json()

        if not isinstance(streams, list):
            # Anything but a JSON array cannot be a stream listing.
            with pbar_lock:
                pbar.write(f" ❌ Unexpected response for {category_name}")
            streams = []

        # Check if channel exists in this category (supports regex matching)
        for stream in streams:
            if not isinstance(stream, dict):
                continue
            # A null name is searched as an empty string.
            stream_name = stream.get('name') or ''
            if matches_search(stream_name, channel_name):
                matches_in_category.append({
                    'group_name': category_name,
                    'group_id': category_id,
                    'channel_name': stream.get('name'),
                    'channel_id': stream.get('stream_id'),
                    'stream_type': stream.get('stream_type'),
                    'stream_icon': stream.get('stream_icon'),
                    'epg_channel_id': stream.get('epg_channel_id'),
                })

        if matches_in_category:
            with pbar_lock:
                pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")

    except requests.exceptions.RequestException as e:
        with pbar_lock:
            pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")

    except ValueError as e:
        # Body was not valid JSON (raised as a plain ValueError by some
        # requests versions).
        with pbar_lock:
            pbar.write(f" ❌ Invalid response from {category_name}: {str(e)[:50]}...")

    finally:
        with pbar_lock:
            pbar.update(1)

    return matches_in_category
|
|
|
|
def find_channel_in_groups(api_url, username, password, channel_name, max_workers=10, timeout=30):
    """
    Find which live groups contain a channel using the Xtream Codes API (multithreaded)

    The live category list is fetched once; every category is then searched
    concurrently by ``search_category`` on a thread pool.

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        channel_name (str): Search term: a case-insensitive substring, or a
            ``/pattern/flags`` regular expression
        max_workers (int): Maximum number of threads to use (values below 1
            are treated as 1)
        timeout (float): Seconds to wait for the category-list request

    Returns:
        list: One dict per matching stream, as produced by ``search_category``.
        Empty when the category list cannot be retrieved.
    """
    categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
    categories_params = {
        'username': username,
        'password': password,
        'action': 'get_live_categories',
    }

    try:
        # Get all live categories
        print("📡 Fetching live categories...")
        # This session is only needed for the listing; workers open their own.
        with create_session_with_retries() as session:
            categories_response = session.get(categories_url, params=categories_params, timeout=timeout)
            categories_response.raise_for_status()
            categories = categories_response.json()
    except requests.exceptions.RequestException as e:
        print(f"❌ Error connecting to Xtream Codes API: {e}")
        return []
    except ValueError as e:
        # Body was not valid JSON.
        print(f"❌ Error connecting to Xtream Codes API: invalid JSON response ({e})")
        return []

    if not isinstance(categories, list):
        print("❌ Error connecting to Xtream Codes API: unexpected response (expected a list of categories)")
        return []
    # Workers call .get() on each entry, so anything that is not a dict is dropped.
    categories = [entry for entry in categories if isinstance(entry, dict)]

    print(f"✅ Found {len(categories)} categories to search")

    matching_groups = []
    pbar_lock = threading.Lock()
    counter_lock = threading.Lock()
    thread_start_counter = 0

    def search_with_delay(category, pbar):
        """Wrapper to add small staggered delay for first few requests only"""
        nonlocal thread_start_counter
        with counter_lock:
            start_position = thread_start_counter
            thread_start_counter += 1

        # Only the first 20 tasks are staggered (10 ms apart) to prevent an
        # initial thundering herd; the very first one starts immediately.
        if 0 < start_position < 20:
            time.sleep(start_position * 0.01)

        return search_category(api_url, username, password, channel_name, category, pbar, pbar_lock)

    # ThreadPoolExecutor rejects a worker count below 1.
    worker_count = max_workers if max_workers is None else max(1, max_workers)

    # Search through categories with multithreading
    with tqdm(total=len(categories), desc="🔍 Searching live categories", unit="cat",
              bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed} | ETA: {remaining}]",
              smoothing=0.1) as pbar:
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            # Submit all tasks
            future_to_category = {
                executor.submit(search_with_delay, category, pbar): category
                for category in categories
            }

            # Collect results as they complete
            for future in as_completed(future_to_category):
                try:
                    matching_groups.extend(future.result())
                except Exception as e:
                    # One failing category must not abort the whole search.
                    category = future_to_category[future]
                    category_name = category.get('category_name', 'Unknown')
                    with pbar_lock:
                        pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")

    return matching_groups
|
|
|
|
def get_api_info(api_url, username, password, timeout=30):
    """Get basic API information for debugging

    Requests the live category list and, if it is non-empty, the streams of
    the first category, printing the HTTP status, the item count and one
    sample item for each. Nothing is returned; any error is printed rather
    than raised, since this is a best-effort diagnostic.

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        timeout (float): Seconds to wait for each request
    """
    try:
        # Test basic connectivity and get categories
        print("🔧 Testing API connectivity...")
        api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')

        # The context manager closes the session's connections when done.
        with create_session_with_retries() as session:
            # Test live categories
            categories_params = {
                'username': username,
                'password': password,
                'action': 'get_live_categories',
            }

            categories_response = session.get(api_endpoint, params=categories_params, timeout=timeout)
            print(f"Live categories endpoint status: {categories_response.status_code}")

            if categories_response.status_code != 200:
                return

            categories = categories_response.json()
            print(f"Found {len(categories)} live categories")
            if not categories:
                # Nothing to sample and no category to probe for streams.
                return

            print("Sample category structure:")
            print(json.dumps(categories[0], indent=2))

            # Test getting streams from first category
            print("🔧 Testing streams endpoint...")
            first_category_id = categories[0].get('category_id')
            streams_params = {
                'username': username,
                'password': password,
                'action': 'get_live_streams',
                'category_id': first_category_id,
            }

            streams_response = session.get(api_endpoint, params=streams_params, timeout=timeout)
            print(f"Streams endpoint status: {streams_response.status_code}")

            if streams_response.status_code == 200:
                streams = streams_response.json()
                print(f"Found {len(streams)} streams in first category")
                if streams:
                    print("Sample stream structure:")
                    print(json.dumps(streams[0], indent=2))

    except Exception as e:
        # Deliberately broad: report whatever went wrong instead of raising.
        print(f"❌ Error testing API: {e}")
|
|
|
|
def search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock, timeout=30):
    """Search a single category within a content type (for multithreading)

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        search_term (str): Search term passed to ``matches_search``
        category (dict): Category entry; ``category_id`` and ``category_name``
            are read from it
        content_type (str): Label stored in each result and used in messages
        stream_action (str): API ``action`` used to list the category's items
        pbar: Progress bar; ``write()`` is used for messages and ``update(1)``
            is called exactly once per invocation
        pbar_lock: Lock guarding every access to ``pbar``
        timeout (float): Seconds to wait for the server before giving up

    Returns:
        list: One dict per matching item with the keys ``content_type``,
        ``group_name``, ``group_id``, ``channel_name``, ``channel_id``,
        ``stream_type``, ``stream_icon`` and ``epg_channel_id``. Empty when
        the request fails or returns a non-200 status.
    """
    category_id = category.get('category_id')
    category_name = category.get('category_name', 'Unknown')
    api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')

    matches_in_category = []

    try:
        # Get streams for this category
        stream_params = {
            'username': username,
            'password': password,
            'action': stream_action,
            'category_id': category_id,
        }

        # Each worker uses its own session; the context manager releases its
        # pooled connections once the response has been read.
        with create_session_with_retries() as session:
            # Without a timeout a stalled server would block this worker forever.
            stream_response = session.get(api_endpoint, params=stream_params, timeout=timeout)
            if stream_response.status_code != 200:
                with pbar_lock:
                    pbar.write(f" ❌ Failed to get {content_type.lower()} from {category_name}")
                return matches_in_category

            streams = stream_response.json()

        if not isinstance(streams, list):
            # Anything but a JSON array cannot be an item listing.
            with pbar_lock:
                pbar.write(f" ❌ Unexpected response for {category_name}")
            streams = []

        for stream in streams:
            if not isinstance(stream, dict):
                continue
            # A null name is searched as an empty string.
            stream_name = stream.get('name') or ''
            if matches_search(stream_name, search_term):
                matches_in_category.append({
                    'content_type': content_type,
                    'group_name': category_name,
                    'group_id': category_id,
                    'channel_name': stream.get('name'),
                    # Falls back to series_id when stream_id is missing or falsy.
                    'channel_id': stream.get('stream_id') or stream.get('series_id'),
                    'stream_type': stream.get('stream_type'),
                    'stream_icon': stream.get('stream_icon'),
                    'epg_channel_id': stream.get('epg_channel_id'),
                })

        if matches_in_category:
            with pbar_lock:
                pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")

    except requests.exceptions.RequestException as e:
        with pbar_lock:
            pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")

    except ValueError as e:
        # Body was not valid JSON (raised as a plain ValueError by some
        # requests versions).
        with pbar_lock:
            pbar.write(f" ❌ Invalid response from {category_name}: {str(e)[:50]}...")

    finally:
        # Runs on every exit path, including the early return above.
        with pbar_lock:
            pbar.update(1)

    return matches_in_category
|
|
|
|
def search_all_content_types(api_url, username, password, search_term, max_workers=15, timeout=30):
    """Search across live streams, VOD, and series (multithreaded)

    For each content type the category list is fetched, then every category is
    searched concurrently by ``search_content_type_category``. A failure in
    one content type is reported and the remaining types are still searched.

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        search_term (str): Plain text or ``/pattern/flags`` search term
        max_workers (int): Maximum number of threads per content type (values
            below 1 are treated as 1)
        timeout (float): Seconds to wait for each category-list request

    Returns:
        list: Match dicts from all content types, as produced by
        ``search_content_type_category``.
    """
    api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
    all_matches = []

    # (label, action listing the categories, action listing a category's items)
    content_types = [
        ('Live Streams', 'get_live_categories', 'get_live_streams'),
        ('VOD', 'get_vod_categories', 'get_vod_streams'),
        ('Series', 'get_series_categories', 'get_series'),
    ]

    # ThreadPoolExecutor rejects a worker count below 1.
    worker_count = max_workers if max_workers is None else max(1, max_workers)

    # Same retrying session as the rest of the tool; closed when the search ends.
    with create_session_with_retries() as session:
        for content_type, cat_action, stream_action in content_types:
            try:
                print(f"\n📺 Fetching {content_type} categories...")

                # Get categories
                cat_params = {
                    'username': username,
                    'password': password,
                    'action': cat_action,
                }

                cat_response = session.get(api_endpoint, params=cat_params, timeout=timeout)
                if cat_response.status_code != 200:
                    print(f"❌ Could not get {content_type} categories (status: {cat_response.status_code})")
                    continue

                categories = cat_response.json()
                if not isinstance(categories, list):
                    print(f"❌ Could not get {content_type} categories (unexpected response)")
                    continue
                # Workers call .get() on each entry, so anything that is not a dict is dropped.
                categories = [entry for entry in categories if isinstance(entry, dict)]
                print(f"✅ Found {len(categories)} {content_type.lower()} categories")

                if not categories:
                    continue

                pbar_lock = threading.Lock()
                counter_lock = threading.Lock()
                thread_start_counter = 0

                def search_with_delay(category, content_type, stream_action, pbar, pbar_lock):
                    """Wrapper to add small staggered delay for first few requests only"""
                    nonlocal thread_start_counter
                    with counter_lock:
                        start_position = thread_start_counter
                        thread_start_counter += 1

                    # Only the first 20 tasks are staggered (10 ms apart) to
                    # prevent an initial thundering herd.
                    if 0 < start_position < 20:
                        time.sleep(start_position * 0.01)

                    return search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock)

                # Use multithreading for this content type
                with tqdm(total=len(categories), desc=f"🔍 Searching {content_type.lower()}", unit="cat",
                          bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed} | ETA: {remaining}]",
                          smoothing=0.1) as pbar:
                    with ThreadPoolExecutor(max_workers=worker_count) as executor:
                        # Submit all tasks for this content type
                        future_to_category = {
                            executor.submit(search_with_delay, category, content_type,
                                            stream_action, pbar, pbar_lock): category
                            for category in categories
                        }

                        # Collect results as they complete
                        for future in as_completed(future_to_category):
                            try:
                                all_matches.extend(future.result())
                            except Exception as e:
                                # One failing category must not abort the others.
                                category = future_to_category[future]
                                category_name = category.get('category_name', 'Unknown')
                                with pbar_lock:
                                    pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")

            except Exception as e:
                # Best effort: report and move on to the next content type.
                print(f"❌ Error searching {content_type}: {e}")
                continue

    return all_matches
|
|
|
|
def interactive_search(api_url, username, password, max_workers=10):
    """Interactive search mode

    Reads queries from standard input until the user quits (``quit``,
    ``exit``, ``q``), presses Ctrl-C, or standard input reaches end-of-file.

    Query forms:
        ``<term>`` or ``live:<term>``   search live streams only
        ``all:<term>``                  search live, VOD and series
        ``vod:<term>``/``series:<term>`` currently search all content types
        ``/pattern/flags``              regular-expression term
        ``debug``                       print API diagnostics

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        max_workers (int): Maximum number of threads used per search
    """
    print(f"Connected to: {api_url}")
    print("Commands: live:<term> | vod:<term> | series:<term> | all:<term> | /regex/ | debug | quit")
    print("Tip: Use /pattern/flags for regex (e.g., /^CNN/i or /HD$/) - flags: i=case-insensitive, m=multiline, s=dotall")
    print("-" * 50)

    while True:
        try:
            query = input("\n🔍 Search: ").strip()

            if not query:
                continue

            command = query.lower()
            if command in ('quit', 'exit', 'q'):
                print("👋 Goodbye!")
                break

            if command == 'debug':
                print("\n=== API Debug Info ===")
                get_api_info(api_url, username, password)
                print("=====================")
                continue

            # Parse search type and term
            search_all_types = False
            search_term = query

            # A bare /regex/ can itself contain ':' (e.g. "(?:...)"), so a
            # "<type>:" prefix is only looked for when the query does not
            # start with the regex delimiter.
            if ':' in query and not query.startswith('/'):
                search_type, search_term = query.split(':', 1)
                search_type = search_type.lower().strip()
                search_term = search_term.strip()

                if search_type == 'all':
                    search_all_types = True
                elif search_type in ('vod', 'series'):
                    # For now, we'll treat these as all-types searches
                    # You could implement specific VOD/series-only searches later
                    search_all_types = True
                    print(f"🔍 Searching {search_type.upper()} for '{search_term}'...")
                elif search_type == 'live':
                    print(f"🔍 Searching Live Streams for '{search_term}'...")
                else:
                    print(f"❓ Unknown search type '{search_type}', searching live streams for '{query}'...")
                    search_term = query
            else:
                print(f"🔍 Searching Live Streams for '{search_term}'...")

            # Perform search
            if search_all_types:
                matching_groups = search_all_content_types(
                    api_url, username, password, search_term, max_workers
                )
            else:
                matching_groups = find_channel_in_groups(
                    api_url, username, password, search_term, max_workers
                )

            # Display results
            if matching_groups:
                print(f"\n✅ Found {len(matching_groups)} result(s) for '{search_term}':")
                for i, match in enumerate(matching_groups, 1):
                    content_type = match.get('content_type', 'Live Stream')
                    type_short = content_type.replace(' Streams', '').replace(' ', '')
                    print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")
            else:
                print(f"❌ No results found for '{search_term}'")

        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed (Ctrl-D or exhausted piped input).
            # It must end the loop; otherwise every following input() call
            # fails the same way and the loop never terminates.
            print("\n\n👋 Goodbye!")
            break
        except Exception as e:
            # Keep the prompt alive after a failed search.
            print(f"❌ Error during search: {e}")
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, verifies that the API answers a
    ``get_live_categories`` request with HTTP 200, then optionally prints
    debug information, runs a one-time search (``--channel``) and/or starts
    the interactive prompt (``--interactive``, or when no channel is given).
    Returns early, after printing a message, when the connection test fails.
    """
    parser = argparse.ArgumentParser(description='Xtream Codes Channel Search Tool')
    parser.add_argument('--api-url', required=True, help='Xtream Codes API base URL')
    parser.add_argument('--username', required=True, help='Username for authentication')
    parser.add_argument('--password', required=True, help='Password for authentication')

    # Optional one-time search arguments
    parser.add_argument('--channel', help='Channel name to search for (one-time search)')
    parser.add_argument('--debug', action='store_true', help='Show API debugging info')
    parser.add_argument('--all-types', action='store_true', help='Search in Live, VOD, and Series (one-time search)')
    parser.add_argument('--interactive', '-i', action='store_true', help='Start interactive search mode')
    parser.add_argument('--max-workers', type=int, default=10, help='Maximum number of concurrent threads (default: 10)')

    args = parser.parse_args()

    # The thread pool rejects a worker count below 1; fail with a usage error
    # up front instead of a traceback in the middle of a search.
    if args.max_workers < 1:
        parser.error('--max-workers must be at least 1')

    # Test connection first
    print("🔧 Testing connection to Xtream Codes API...")
    try:
        test_url = urljoin(args.api_url.rstrip('/'), '/player_api.php')
        test_params = {
            'username': args.username,
            'password': args.password,
            'action': 'get_live_categories',
        }
        # The session is only needed for this probe; close it right after.
        with create_session_with_retries() as session:
            response = session.get(test_url, params=test_params, timeout=10)
        if response.status_code == 200:
            print("✅ Connection successful!")
        else:
            print(f"❌ Connection failed with status {response.status_code}")
            return
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return

    if args.debug:
        print("\n=== API Debug Info ===")
        get_api_info(args.api_url, args.username, args.password)
        print("=====================\n")

    # If channel is provided, do one-time search
    if args.channel:
        print(f"🔍 Searching for '{args.channel}' in Xtream Codes API...")

        if args.all_types:
            matching_groups = search_all_content_types(
                args.api_url,
                args.username,
                args.password,
                args.channel,
                args.max_workers
            )
        else:
            matching_groups = find_channel_in_groups(
                args.api_url,
                args.username,
                args.password,
                args.channel,
                args.max_workers
            )

        print(f"\n{'='*50}")
        if matching_groups:
            print(f"✅ Found {len(matching_groups)} result(s) for '{args.channel}':")
            for i, match in enumerate(matching_groups, 1):
                # Live-only results carry no content_type key.
                content_type = match.get('content_type', 'Live Stream')
                type_short = content_type.replace(' Streams', '').replace(' ', '')
                print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")
        else:
            print(f"❌ No groups found containing '{args.channel}'")

    # Start interactive mode if requested or no channel provided
    if args.interactive or not args.channel:
        interactive_search(args.api_url, args.username, args.password, args.max_workers)
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()