# xtream-search/xtream-search.py — Xtream Codes channel search tool
import requests
import json
import sys
from urllib.parse import urljoin
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Ensure tqdm is available for progress bars; on a missing install,
# best-effort bootstrap it via pip and retry the import.
try:
    from tqdm import tqdm
except ImportError:
    print("Installing tqdm for progress bars...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
    from tqdm import tqdm
def is_regex_pattern(pattern):
    """Return True when *pattern* looks like a /regex/ literal.

    A regex literal starts with a forward slash, is longer than two
    characters, and contains at least one more slash after the opener
    (so the closing delimiter exists somewhere).
    """
    if not pattern.startswith('/'):
        return False
    return len(pattern) > 2 and '/' in pattern[1:]
def matches_search(stream_name, search_term):
    """Check whether *stream_name* matches *search_term*.

    Plain terms match as case-insensitive substrings.  Terms of the form
    /pattern/flags are treated as regular expressions; flags may combine
    i (ignore case), m (multiline) and s (dotall), and unknown flags are
    ignored.  With no flags at all, matching defaults to case-insensitive.
    An invalid regex falls back to a plain substring match.
    """
    def substring_match():
        return search_term.lower() in stream_name.lower()

    # Inlined regex-literal detection: /.../ with a closing delimiter.
    looks_like_regex = (search_term.startswith('/')
                        and len(search_term) > 2
                        and '/' in search_term[1:])
    if not looks_like_regex:
        # Regular substring match
        return substring_match()

    last_slash = search_term.rfind('/')
    if last_slash <= 0:
        # Invalid pattern, fall back to substring match
        return substring_match()

    pattern = search_term[1:last_slash]
    flags_str = search_term[last_slash + 1:]

    # Map each recognized flag character; anything else is silently dropped.
    flag_map = {'i': re.IGNORECASE, 'm': re.MULTILINE, 's': re.DOTALL}
    flags = 0
    for ch in flags_str.lower():
        flags |= flag_map.get(ch, 0)
    if not flags_str:
        # Default to case-insensitive when no flags were specified.
        flags = re.IGNORECASE

    try:
        return re.search(pattern, stream_name, flags) is not None
    except re.error:
        # Invalid regex: degrade gracefully to a substring match.
        return substring_match()
def create_session_with_retries():
    """Build a requests.Session that retries GET requests on HTTP 503.

    Up to two retries are attempted with a 0.5s backoff factor; the retry
    adapter is mounted for both http and https URLs.
    """
    retries = Retry(
        total=2,                 # at most two retries per request
        status_forcelist=[503],  # only retry on Service Unavailable
        backoff_factor=0.5,      # short exponential backoff between tries
        allowed_methods=["GET"],
    )
    adapter = HTTPAdapter(max_retries=retries)
    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, adapter)
    return session
def search_category(api_url, username, password, channel_name, category, pbar, pbar_lock):
    """
    Search a single live-stream category for channels matching *channel_name*.

    Designed to run on a worker thread: all progress-bar output and updates
    are serialized through *pbar_lock*.

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        channel_name (str): Search term (plain substring or /regex/flags)
        category (dict): Category record with 'category_id'/'category_name'
        pbar: tqdm progress bar shared across worker threads
        pbar_lock (threading.Lock): Guards writes/updates to *pbar*

    Returns:
        list[dict]: One entry per matching stream in this category.
    """
    category_id = category.get('category_id')
    category_name = category.get('category_name', 'Unknown')
    # Each worker gets its own session: requests.Session is not thread-safe.
    session = create_session_with_retries()
    categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
    matches_in_category = []
    try:
        # Get streams for this category
        streams_params = {
            'username': username,
            'password': password,
            'action': 'get_live_streams',
            'category_id': category_id
        }
        # BUGFIX: a timeout keeps a stalled server from hanging this worker
        # thread forever (main() already uses one; this request did not).
        streams_response = session.get(categories_url, params=streams_params, timeout=30)
        streams_response.raise_for_status()
        streams = streams_response.json()
        # Check if channel exists in this category (supports regex matching)
        for stream in streams:
            stream_name = stream.get('name', '')
            if matches_search(stream_name, channel_name):
                matches_in_category.append({
                    'group_name': category_name,
                    'group_id': category_id,
                    'channel_name': stream.get('name'),
                    'channel_id': stream.get('stream_id'),
                    'stream_type': stream.get('stream_type'),
                    'stream_icon': stream.get('stream_icon'),
                    'epg_channel_id': stream.get('epg_channel_id')
                })
        if matches_in_category:
            with pbar_lock:
                pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")
    except requests.exceptions.RequestException as e:
        # Timeout is a RequestException subclass, so it lands here too.
        with pbar_lock:
            pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
    finally:
        # Advance the bar exactly once per category, success or failure.
        with pbar_lock:
            pbar.update(1)
    return matches_in_category
def find_channel_in_groups(api_url, username, password, channel_name, max_workers=10):
    """
    Find which groups contain a specific channel using Xtream Codes API (multithreaded)

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        channel_name (str): Channel name to search for (supports fuzzy matching)
        max_workers (int): Maximum number of threads to use

    Returns:
        list: List of categories containing the channel (empty on API error)
    """
    session = create_session_with_retries()
    try:
        # Get all live categories
        print("📡 Fetching live categories...")
        categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
        categories_params = {
            'username': username,
            'password': password,
            'action': 'get_live_categories'
        }
        # BUGFIX: timeout so a dead server cannot hang the whole search.
        categories_response = session.get(categories_url, params=categories_params, timeout=30)
        categories_response.raise_for_status()
        categories = categories_response.json()
        print(f"✅ Found {len(categories)} categories to search")
        matching_groups = []
        pbar_lock = threading.Lock()
        thread_start_counter = 0
        counter_lock = threading.Lock()

        def search_with_delay(api_url, username, password, channel_name, category, pbar, pbar_lock):
            """Wrapper to add small staggered delay for first few requests only"""
            nonlocal thread_start_counter
            with counter_lock:
                # Only delay the first 20 threads to prevent initial thundering herd
                if thread_start_counter < 20:
                    delay = thread_start_counter * 0.01  # 10ms steps for first 20 threads
                else:
                    delay = 0
                thread_start_counter += 1
            # Sleep outside the lock so other workers are not serialized.
            if delay > 0:
                time.sleep(delay)
            return search_category(api_url, username, password, channel_name, category, pbar, pbar_lock)

        # Search through categories with multithreading
        with tqdm(total=len(categories), desc="🔍 Searching live categories", unit="cat",
                  bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed} | ETA: {remaining}]",
                  smoothing=0.1) as pbar:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tasks
                future_to_category = {
                    executor.submit(search_with_delay, api_url, username, password,
                                    channel_name, category, pbar, pbar_lock): category
                    for category in categories
                }
                # Collect results as they complete
                for future in as_completed(future_to_category):
                    try:
                        matching_groups.extend(future.result())
                    except Exception as e:
                        category = future_to_category[future]
                        category_name = category.get('category_name', 'Unknown')
                        with pbar_lock:
                            pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
        return matching_groups
    except requests.exceptions.RequestException as e:
        print(f"❌ Error connecting to Xtream Codes API: {e}")
        return []
def get_api_info(api_url, username, password):
    """Get basic API information for debugging

    Probes the live-categories and live-streams endpoints, printing HTTP
    status codes, result counts, and a sample record from each so the user
    can inspect the provider's response structure.
    """
    session = create_session_with_retries()
    try:
        # Test basic connectivity and get categories
        print("🔧 Testing API connectivity...")
        api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
        # Test live categories
        categories_params = {
            'username': username,
            'password': password,
            'action': 'get_live_categories'
        }
        # BUGFIX: timeout keeps the debug probe from hanging on a dead server.
        categories_response = session.get(api_endpoint, params=categories_params, timeout=15)
        print(f"Live categories endpoint status: {categories_response.status_code}")
        if categories_response.status_code == 200:
            categories = categories_response.json()
            print(f"Found {len(categories)} live categories")
            if categories:
                print("Sample category structure:")
                print(json.dumps(categories[0], indent=2))
        # Test getting streams from first category
        # (short-circuit: `categories` only exists when status was 200)
        if categories_response.status_code == 200 and categories:
            print("🔧 Testing streams endpoint...")
            first_category_id = categories[0].get('category_id')
            streams_params = {
                'username': username,
                'password': password,
                'action': 'get_live_streams',
                'category_id': first_category_id
            }
            streams_response = session.get(api_endpoint, params=streams_params, timeout=15)
            print(f"Streams endpoint status: {streams_response.status_code}")
            if streams_response.status_code == 200:
                streams = streams_response.json()
                print(f"Found {len(streams)} streams in first category")
                if streams:
                    print("Sample stream structure:")
                    print(json.dumps(streams[0], indent=2))
    except Exception as e:
        # Broad catch is deliberate: this is a best-effort diagnostic helper.
        print(f"❌ Error testing API: {e}")
def search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock):
    """Search a single category within a content type (for multithreading)

    Args:
        api_url (str): Base URL for the Xtream Codes API
        username (str): Username for authentication
        password (str): Password for authentication
        search_term (str): Search term (plain substring or /regex/flags)
        category (dict): Category record with 'category_id'/'category_name'
        content_type (str): Human-readable label ('Live Streams'/'VOD'/'Series')
        stream_action (str): API action used to list this type's streams
        pbar: tqdm progress bar shared across worker threads
        pbar_lock (threading.Lock): Guards writes/updates to *pbar*

    Returns:
        list[dict]: Matching entries, each tagged with 'content_type'.
    """
    category_id = category.get('category_id')
    category_name = category.get('category_name', 'Unknown')
    # Per-thread session: requests.Session is not thread-safe.
    session = create_session_with_retries()
    api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
    matches_in_category = []
    try:
        # Get streams for this category
        stream_params = {
            'username': username,
            'password': password,
            'action': stream_action,
            'category_id': category_id
        }
        # BUGFIX: timeout prevents a stalled server from hanging the worker.
        stream_response = session.get(api_endpoint, params=stream_params, timeout=30)
        if stream_response.status_code != 200:
            with pbar_lock:
                pbar.write(f" ❌ Failed to get {content_type.lower()} from {category_name}")
            return matches_in_category
        streams = stream_response.json()
        for stream in streams:
            stream_name = stream.get('name', '')
            if matches_search(stream_name, search_term):
                matches_in_category.append({
                    'content_type': content_type,
                    'group_name': category_name,
                    'group_id': category_id,
                    'channel_name': stream.get('name'),
                    # Series records carry 'series_id' instead of 'stream_id'.
                    'channel_id': stream.get('stream_id') or stream.get('series_id'),
                    'stream_type': stream.get('stream_type'),
                    'stream_icon': stream.get('stream_icon'),
                    'epg_channel_id': stream.get('epg_channel_id')
                })
        if matches_in_category:
            with pbar_lock:
                pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")
    except requests.exceptions.RequestException as e:
        with pbar_lock:
            pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
    finally:
        # Runs even on the early return above: one tick per category.
        with pbar_lock:
            pbar.update(1)
    return matches_in_category
def search_all_content_types(api_url, username, password, search_term, max_workers=15):
    """Search across live streams, VOD, and series (multithreaded)

    Fetches the category list for each content type sequentially, then fans
    out the per-category stream searches across a thread pool.

    Returns:
        list[dict]: All matches across every content type.
    """
    # BUGFIX: use the shared retry-enabled session so transient 503s on the
    # category requests are retried, consistent with every other code path
    # (this previously used a bare requests.Session()).
    session = create_session_with_retries()
    api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
    all_matches = []
    content_types = [
        ('Live Streams', 'get_live_categories', 'get_live_streams'),
        ('VOD', 'get_vod_categories', 'get_vod_streams'),
        ('Series', 'get_series_categories', 'get_series')
    ]
    for content_type, cat_action, stream_action in content_types:
        try:
            print(f"\n📺 Fetching {content_type} categories...")
            # Get categories
            cat_params = {
                'username': username,
                'password': password,
                'action': cat_action
            }
            # BUGFIX: timeout so one dead endpoint cannot stall the search.
            cat_response = session.get(api_endpoint, params=cat_params, timeout=30)
            if cat_response.status_code != 200:
                print(f"❌ Could not get {content_type} categories (status: {cat_response.status_code})")
                continue
            categories = cat_response.json()
            print(f"✅ Found {len(categories)} {content_type.lower()} categories")
            if not categories:
                continue
            pbar_lock = threading.Lock()
            thread_start_counter = 0
            counter_lock = threading.Lock()

            def search_with_delay(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock):
                """Wrapper to add small staggered delay for first few requests only"""
                nonlocal thread_start_counter
                with counter_lock:
                    # Only delay the first 20 threads to prevent initial thundering herd
                    if thread_start_counter < 20:
                        delay = thread_start_counter * 0.01  # 10ms steps for first 20 threads
                    else:
                        delay = 0
                    thread_start_counter += 1
                # Sleep outside the lock so other workers are not serialized.
                if delay > 0:
                    time.sleep(delay)
                return search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock)

            # Use multithreading for this content type
            with tqdm(total=len(categories), desc=f"🔍 Searching {content_type.lower()}", unit="cat",
                      bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed} | ETA: {remaining}]",
                      smoothing=0.1) as pbar:
                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    # Submit all tasks for this content type
                    future_to_category = {
                        executor.submit(search_with_delay, api_url, username, password,
                                        search_term, category, content_type, stream_action, pbar, pbar_lock): category
                        for category in categories
                    }
                    # Collect results as they complete
                    for future in as_completed(future_to_category):
                        try:
                            all_matches.extend(future.result())
                        except Exception as e:
                            category = future_to_category[future]
                            category_name = category.get('category_name', 'Unknown')
                            with pbar_lock:
                                pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
        except Exception as e:
            # One failing content type must not abort the other two.
            print(f"❌ Error searching {content_type}: {e}")
            continue
    return all_matches
def interactive_search(api_url, username, password, max_workers=10):
    """Interactive search mode

    Reads queries from stdin in a loop until the user quits.
    Query syntax: live:<term> | vod:<term> | series:<term> | all:<term>,
    a bare term (live-stream search), 'debug' for API diagnostics, or
    quit/exit/q to leave.
    """
    print(f"Connected to: {api_url}")
    print("Commands: live:<term> | vod:<term> | series:<term> | all:<term> | /regex/ | debug | quit")
    print("Tip: Use /pattern/flags for regex (e.g., /^CNN/i or /HD$/) - flags: i=case-insensitive, m=multiline, s=dotall")
    print("-" * 50)
    while True:
        try:
            query = input("\n🔍 Search: ").strip()
            if not query:
                continue
            if query.lower() in ['quit', 'exit', 'q']:
                print("👋 Goodbye!")
                break
            if query.lower() == 'debug':
                print("\n=== API Debug Info ===")
                get_api_info(api_url, username, password)
                print("=====================")
                continue
            # Parse search type and term
            # NOTE(review): a bare regex containing ':' will be mis-split
            # here and treated as a typed search — confirm intended behavior.
            search_all_types = False
            search_term = query
            if ':' in query:
                search_type, search_term = query.split(':', 1)
                search_type = search_type.lower().strip()
                search_term = search_term.strip()
                if search_type == 'all':
                    search_all_types = True
                elif search_type in ['vod', 'series']:
                    # For now, we'll treat these as all-types searches
                    # You could implement specific VOD/series-only searches later
                    search_all_types = True
                    print(f"🔍 Searching {search_type.upper()} for '{search_term}'...")
                elif search_type == 'live':
                    print(f"🔍 Searching Live Streams for '{search_term}'...")
                else:
                    print(f"❓ Unknown search type '{search_type}', searching live streams for '{query}'...")
                    search_term = query
            else:
                print(f"🔍 Searching Live Streams for '{search_term}'...")
            # Perform search
            if search_all_types:
                matching_groups = search_all_content_types(
                    api_url, username, password, search_term, max_workers
                )
            else:
                matching_groups = find_channel_in_groups(
                    api_url, username, password, search_term, max_workers
                )
            # Display results
            if matching_groups:
                print(f"\n✅ Found {len(matching_groups)} result(s) for '{search_term}':")
                for i, match in enumerate(matching_groups, 1):
                    content_type = match.get('content_type', 'Live Stream')
                    type_short = content_type.replace(' Streams', '').replace(' ', '')
                    # BUGFIX: group and channel names were printed with no
                    # separator between them; add an arrow for readability.
                    print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")
            else:
                print(f"❌ No results found for '{search_term}'")
        except KeyboardInterrupt:
            print("\n\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"❌ Error during search: {e}")
def main():
    """Command-line entry point: parse arguments, verify API connectivity,
    then run a one-time search and/or the interactive search loop."""
    parser = argparse.ArgumentParser(description='Xtream Codes Channel Search Tool')
    parser.add_argument('--api-url', required=True, help='Xtream Codes API base URL')
    parser.add_argument('--username', required=True, help='Username for authentication')
    parser.add_argument('--password', required=True, help='Password for authentication')
    # Optional one-time search arguments
    parser.add_argument('--channel', help='Channel name to search for (one-time search)')
    parser.add_argument('--debug', action='store_true', help='Show API debugging info')
    parser.add_argument('--all-types', action='store_true', help='Search in Live, VOD, and Series (one-time search)')
    parser.add_argument('--interactive', '-i', action='store_true', help='Start interactive search mode')
    parser.add_argument('--max-workers', type=int, default=10, help='Maximum number of concurrent threads (default: 10)')
    args = parser.parse_args()
    # Test connection first
    print("🔧 Testing connection to Xtream Codes API...")
    try:
        session = create_session_with_retries()
        test_url = urljoin(args.api_url.rstrip('/'), '/player_api.php')
        test_params = {
            'username': args.username,
            'password': args.password,
            'action': 'get_live_categories'
        }
        response = session.get(test_url, params=test_params, timeout=10)
        if response.status_code == 200:
            print("✅ Connection successful!")
        else:
            print(f"❌ Connection failed with status {response.status_code}")
            return
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return
    if args.debug:
        print("\n=== API Debug Info ===")
        get_api_info(args.api_url, args.username, args.password)
        print("=====================\n")
    # If channel is provided, do one-time search
    if args.channel:
        print(f"🔍 Searching for '{args.channel}' in Xtream Codes API...")
        if args.all_types:
            matching_groups = search_all_content_types(
                args.api_url,
                args.username,
                args.password,
                args.channel,
                args.max_workers
            )
        else:
            matching_groups = find_channel_in_groups(
                args.api_url,
                args.username,
                args.password,
                args.channel,
                args.max_workers
            )
        print(f"\n{'='*50}")
        if matching_groups:
            print(f"✅ Found {len(matching_groups)} result(s) for '{args.channel}':")
            for i, match in enumerate(matching_groups, 1):
                content_type = match.get('content_type', 'Live Stream')
                type_short = content_type.replace(' Streams', '').replace(' ', '')
                # BUGFIX: group and channel names were printed with no
                # separator between them; add an arrow for readability.
                print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")
        else:
            print(f"❌ No groups found containing '{args.channel}'")
    # Start interactive mode if requested or no channel provided
    if args.interactive or not args.channel:
        interactive_search(args.api_url, args.username, args.password, args.max_workers)


if __name__ == "__main__":
    main()