1
0

initial commit

This commit is contained in:
2025-10-17 12:10:57 -04:00
commit ac3416a1d1
3 changed files with 594 additions and 0 deletions

451
xtream-search.py Normal file
View File

@@ -0,0 +1,451 @@
import requests
import json
import sys
from urllib.parse import urljoin
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
try:
from tqdm import tqdm
except ImportError:
print("Installing tqdm for progress bars...")
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
from tqdm import tqdm
def search_category(api_url, username, password, channel_name, category, pbar, pbar_lock):
"""
Search a single category for channels (for multithreading)
"""
category_id = category.get('category_id')
category_name = category.get('category_name', 'Unknown')
# Create a new session for this thread
session = requests.Session()
categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
matches_in_category = []
try:
# Get streams for this category
streams_params = {
'username': username,
'password': password,
'action': 'get_live_streams',
'category_id': category_id
}
streams_response = session.get(categories_url, params=streams_params)
streams_response.raise_for_status()
streams = streams_response.json()
# Check if channel exists in this category (fuzzy matching)
for stream in streams:
stream_name = stream.get('name', '').lower()
if channel_name.lower() in stream_name:
matches_in_category.append({
'group_name': category_name,
'group_id': category_id,
'channel_name': stream.get('name'),
'channel_id': stream.get('stream_id'),
'stream_type': stream.get('stream_type'),
'stream_icon': stream.get('stream_icon'),
'epg_channel_id': stream.get('epg_channel_id')
})
if matches_in_category:
with pbar_lock:
pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")
except requests.exceptions.RequestException as e:
with pbar_lock:
pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
finally:
with pbar_lock:
pbar.update(1)
return matches_in_category
def find_channel_in_groups(api_url, username, password, channel_name, max_workers=10):
"""
Find which groups contain a specific channel using Xtream Codes API (multithreaded)
Args:
api_url (str): Base URL for the Xtream Codes API
username (str): Username for authentication
password (str): Password for authentication
channel_name (str): Channel name to search for (supports fuzzy matching)
max_workers (int): Maximum number of threads to use
Returns:
list: List of categories containing the channel
"""
session = requests.Session()
try:
# Get all live categories
print("📡 Fetching live categories...")
categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
categories_params = {
'username': username,
'password': password,
'action': 'get_live_categories'
}
categories_response = session.get(categories_url, params=categories_params)
categories_response.raise_for_status()
categories = categories_response.json()
print(f"✅ Found {len(categories)} categories to search")
matching_groups = []
pbar_lock = threading.Lock()
# Search through categories with multithreading
with tqdm(total=len(categories), desc="🔍 Searching live categories", unit="cat",
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_category = {
executor.submit(search_category, api_url, username, password,
channel_name, category, pbar, pbar_lock): category
for category in categories
}
# Collect results as they complete
for future in as_completed(future_to_category):
try:
category_matches = future.result()
matching_groups.extend(category_matches)
except Exception as e:
category = future_to_category[future]
category_name = category.get('category_name', 'Unknown')
with pbar_lock:
pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
return matching_groups
except requests.exceptions.RequestException as e:
print(f"❌ Error connecting to Xtream Codes API: {e}")
return []
def get_api_info(api_url, username, password):
"""Get basic API information for debugging"""
session = requests.Session()
try:
# Test basic connectivity and get categories
print("🔧 Testing API connectivity...")
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
# Test live categories
categories_params = {
'username': username,
'password': password,
'action': 'get_live_categories'
}
categories_response = session.get(api_endpoint, params=categories_params)
print(f"Live categories endpoint status: {categories_response.status_code}")
if categories_response.status_code == 200:
categories = categories_response.json()
print(f"Found {len(categories)} live categories")
if categories:
print("Sample category structure:")
print(json.dumps(categories[0], indent=2))
# Test getting streams from first category
if categories_response.status_code == 200 and categories:
print("🔧 Testing streams endpoint...")
first_category_id = categories[0].get('category_id')
streams_params = {
'username': username,
'password': password,
'action': 'get_live_streams',
'category_id': first_category_id
}
streams_response = session.get(api_endpoint, params=streams_params)
print(f"Streams endpoint status: {streams_response.status_code}")
if streams_response.status_code == 200:
streams = streams_response.json()
print(f"Found {len(streams)} streams in first category")
if streams:
print("Sample stream structure:")
print(json.dumps(streams[0], indent=2))
except Exception as e:
print(f"❌ Error testing API: {e}")
def search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock):
"""Search a single category within a content type (for multithreading)"""
category_id = category.get('category_id')
category_name = category.get('category_name', 'Unknown')
session = requests.Session()
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
matches_in_category = []
try:
# Get streams for this category
stream_params = {
'username': username,
'password': password,
'action': stream_action,
'category_id': category_id
}
stream_response = session.get(api_endpoint, params=stream_params)
if stream_response.status_code != 200:
with pbar_lock:
pbar.write(f" ❌ Failed to get {content_type.lower()} from {category_name}")
return matches_in_category
streams = stream_response.json()
for stream in streams:
stream_name = stream.get('name', '').lower()
if search_term.lower() in stream_name:
matches_in_category.append({
'content_type': content_type,
'group_name': category_name,
'group_id': category_id,
'channel_name': stream.get('name'),
'channel_id': stream.get('stream_id') or stream.get('series_id'),
'stream_type': stream.get('stream_type'),
'stream_icon': stream.get('stream_icon'),
'epg_channel_id': stream.get('epg_channel_id')
})
if matches_in_category:
with pbar_lock:
pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")
except requests.exceptions.RequestException as e:
with pbar_lock:
pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
finally:
with pbar_lock:
pbar.update(1)
return matches_in_category
def search_all_content_types(api_url, username, password, search_term, max_workers=15):
"""Search across live streams, VOD, and series (multithreaded)"""
session = requests.Session()
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
all_matches = []
content_types = [
('Live Streams', 'get_live_categories', 'get_live_streams'),
('VOD', 'get_vod_categories', 'get_vod_streams'),
('Series', 'get_series_categories', 'get_series')
]
for content_type, cat_action, stream_action in content_types:
try:
print(f"\n<EFBFBD> Fetching {content_type} categories...")
# Get categories
cat_params = {
'username': username,
'password': password,
'action': cat_action
}
cat_response = session.get(api_endpoint, params=cat_params)
if cat_response.status_code != 200:
print(f"❌ Could not get {content_type} categories (status: {cat_response.status_code})")
continue
categories = cat_response.json()
print(f"✅ Found {len(categories)} {content_type.lower()} categories")
if not categories:
continue
pbar_lock = threading.Lock()
# Use multithreading for this content type
with tqdm(total=len(categories), desc=f"🔍 Searching {content_type.lower()}", unit="cat",
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks for this content type
future_to_category = {
executor.submit(search_content_type_category, api_url, username, password,
search_term, category, content_type, stream_action, pbar, pbar_lock): category
for category in categories
}
# Collect results as they complete
for future in as_completed(future_to_category):
try:
category_matches = future.result()
all_matches.extend(category_matches)
except Exception as e:
category = future_to_category[future]
category_name = category.get('category_name', 'Unknown')
with pbar_lock:
pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
except Exception as e:
print(f"❌ Error searching {content_type}: {e}")
continue
return all_matches
def interactive_search(api_url, username, password, max_workers=10):
"""Interactive search mode"""
print(f"Connected to: {api_url}")
print("Commands: live:<term> | vod:<term> | series:<term> | all:<term> | debug | quit")
print("-" * 50)
while True:
try:
query = input("\n🔍 Search: ").strip()
if not query:
continue
if query.lower() in ['quit', 'exit', 'q']:
print("👋 Goodbye!")
break
if query.lower() == 'debug':
print("\n=== API Debug Info ===")
get_api_info(api_url, username, password)
print("=====================")
continue
# Parse search type and term
search_all_types = False
search_term = query
if ':' in query:
search_type, search_term = query.split(':', 1)
search_type = search_type.lower().strip()
search_term = search_term.strip()
if search_type == 'all':
search_all_types = True
elif search_type in ['vod', 'series']:
# For now, we'll treat these as all-types searches
# You could implement specific VOD/series-only searches later
search_all_types = True
print(f"🔍 Searching {search_type.upper()} for '{search_term}'...")
elif search_type == 'live':
print(f"🔍 Searching Live Streams for '{search_term}'...")
else:
print(f"❓ Unknown search type '{search_type}', searching live streams for '{query}'...")
search_term = query
else:
print(f"🔍 Searching Live Streams for '{search_term}'...")
# Perform search
if search_all_types:
matching_groups = search_all_content_types(
api_url, username, password, search_term, max_workers
)
else:
matching_groups = find_channel_in_groups(
api_url, username, password, search_term, max_workers
)
# Display results
if matching_groups:
print(f"\n✅ Found {len(matching_groups)} result(s) for '{search_term}':")
for i, match in enumerate(matching_groups, 1):
content_type = match.get('content_type', 'Live Stream')
type_short = content_type.replace(' Streams', '').replace(' ', '')
print(f"{i:2d}. [{type_short}] {match['group_name']}{match['channel_name']}")
else:
print(f"❌ No results found for '{search_term}'")
except KeyboardInterrupt:
print("\n\n👋 Goodbye!")
break
except Exception as e:
print(f"❌ Error during search: {e}")
def main():
parser = argparse.ArgumentParser(description='Xtream Codes Channel Search Tool')
parser.add_argument('--api-url', required=True, help='Xtream Codes API base URL')
parser.add_argument('--username', required=True, help='Username for authentication')
parser.add_argument('--password', required=True, help='Password for authentication')
# Optional one-time search arguments
parser.add_argument('--channel', help='Channel name to search for (one-time search)')
parser.add_argument('--debug', action='store_true', help='Show API debugging info')
parser.add_argument('--all-types', action='store_true', help='Search in Live, VOD, and Series (one-time search)')
parser.add_argument('--interactive', '-i', action='store_true', help='Start interactive search mode')
parser.add_argument('--max-workers', type=int, default=10, help='Maximum number of concurrent threads (default: 10)')
args = parser.parse_args()
# Test connection first
print("🔧 Testing connection to Xtream Codes API...")
try:
session = requests.Session()
test_url = urljoin(args.api_url.rstrip('/'), '/player_api.php')
test_params = {
'username': args.username,
'password': args.password,
'action': 'get_live_categories'
}
response = session.get(test_url, params=test_params, timeout=10)
if response.status_code == 200:
print("✅ Connection successful!")
else:
print(f"❌ Connection failed with status {response.status_code}")
return
except Exception as e:
print(f"❌ Connection failed: {e}")
return
if args.debug:
print("\n=== API Debug Info ===")
get_api_info(args.api_url, args.username, args.password)
print("=====================\n")
# If channel is provided, do one-time search
if args.channel:
print(f"🔍 Searching for '{args.channel}' in Xtream Codes API...")
if args.all_types:
matching_groups = search_all_content_types(
args.api_url,
args.username,
args.password,
args.channel,
args.max_workers
)
else:
matching_groups = find_channel_in_groups(
args.api_url,
args.username,
args.password,
args.channel,
args.max_workers
)
print(f"\n{'='*50}")
if matching_groups:
print(f"✅ Found {len(matching_groups)} result(s) for '{args.channel}':")
for i, match in enumerate(matching_groups, 1):
content_type = match.get('content_type', 'Live Stream')
type_short = content_type.replace(' Streams', '').replace(' ', '')
print(f"{i:2d}. [{type_short}] {match['group_name']}{match['channel_name']}")
else:
print(f"❌ No groups found containing '{args.channel}'")
# Start interactive mode if requested or no channel provided
if args.interactive or not args.channel:
interactive_search(args.api_url, args.username, args.password, args.max_workers)
if __name__ == "__main__":
main()