451 lines
18 KiB
Python
451 lines
18 KiB
Python
import requests
|
||
import json
|
||
import sys
|
||
from urllib.parse import urljoin
|
||
import argparse
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
import threading
|
||
try:
|
||
from tqdm import tqdm
|
||
except ImportError:
|
||
print("Installing tqdm for progress bars...")
|
||
import subprocess
|
||
subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
|
||
from tqdm import tqdm
|
||
|
||
def search_category(api_url, username, password, channel_name, category, pbar, pbar_lock):
|
||
"""
|
||
Search a single category for channels (for multithreading)
|
||
"""
|
||
category_id = category.get('category_id')
|
||
category_name = category.get('category_name', 'Unknown')
|
||
|
||
# Create a new session for this thread
|
||
session = requests.Session()
|
||
categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
|
||
|
||
matches_in_category = []
|
||
|
||
try:
|
||
# Get streams for this category
|
||
streams_params = {
|
||
'username': username,
|
||
'password': password,
|
||
'action': 'get_live_streams',
|
||
'category_id': category_id
|
||
}
|
||
|
||
streams_response = session.get(categories_url, params=streams_params)
|
||
streams_response.raise_for_status()
|
||
streams = streams_response.json()
|
||
|
||
# Check if channel exists in this category (fuzzy matching)
|
||
for stream in streams:
|
||
stream_name = stream.get('name', '').lower()
|
||
if channel_name.lower() in stream_name:
|
||
matches_in_category.append({
|
||
'group_name': category_name,
|
||
'group_id': category_id,
|
||
'channel_name': stream.get('name'),
|
||
'channel_id': stream.get('stream_id'),
|
||
'stream_type': stream.get('stream_type'),
|
||
'stream_icon': stream.get('stream_icon'),
|
||
'epg_channel_id': stream.get('epg_channel_id')
|
||
})
|
||
|
||
if matches_in_category:
|
||
with pbar_lock:
|
||
pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
with pbar_lock:
|
||
pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
|
||
|
||
finally:
|
||
with pbar_lock:
|
||
pbar.update(1)
|
||
|
||
return matches_in_category
|
||
|
||
def find_channel_in_groups(api_url, username, password, channel_name, max_workers=10):
|
||
"""
|
||
Find which groups contain a specific channel using Xtream Codes API (multithreaded)
|
||
|
||
Args:
|
||
api_url (str): Base URL for the Xtream Codes API
|
||
username (str): Username for authentication
|
||
password (str): Password for authentication
|
||
channel_name (str): Channel name to search for (supports fuzzy matching)
|
||
max_workers (int): Maximum number of threads to use
|
||
|
||
Returns:
|
||
list: List of categories containing the channel
|
||
"""
|
||
session = requests.Session()
|
||
|
||
try:
|
||
# Get all live categories
|
||
print("📡 Fetching live categories...")
|
||
categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
|
||
categories_params = {
|
||
'username': username,
|
||
'password': password,
|
||
'action': 'get_live_categories'
|
||
}
|
||
|
||
categories_response = session.get(categories_url, params=categories_params)
|
||
categories_response.raise_for_status()
|
||
categories = categories_response.json()
|
||
|
||
print(f"✅ Found {len(categories)} categories to search")
|
||
|
||
matching_groups = []
|
||
pbar_lock = threading.Lock()
|
||
|
||
# Search through categories with multithreading
|
||
with tqdm(total=len(categories), desc="🔍 Searching live categories", unit="cat",
|
||
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
|
||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||
# Submit all tasks
|
||
future_to_category = {
|
||
executor.submit(search_category, api_url, username, password,
|
||
channel_name, category, pbar, pbar_lock): category
|
||
for category in categories
|
||
}
|
||
|
||
# Collect results as they complete
|
||
for future in as_completed(future_to_category):
|
||
try:
|
||
category_matches = future.result()
|
||
matching_groups.extend(category_matches)
|
||
except Exception as e:
|
||
category = future_to_category[future]
|
||
category_name = category.get('category_name', 'Unknown')
|
||
with pbar_lock:
|
||
pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
|
||
|
||
return matching_groups
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"❌ Error connecting to Xtream Codes API: {e}")
|
||
return []
|
||
|
||
def get_api_info(api_url, username, password):
|
||
"""Get basic API information for debugging"""
|
||
session = requests.Session()
|
||
|
||
try:
|
||
# Test basic connectivity and get categories
|
||
print("🔧 Testing API connectivity...")
|
||
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
|
||
|
||
# Test live categories
|
||
categories_params = {
|
||
'username': username,
|
||
'password': password,
|
||
'action': 'get_live_categories'
|
||
}
|
||
|
||
categories_response = session.get(api_endpoint, params=categories_params)
|
||
print(f"Live categories endpoint status: {categories_response.status_code}")
|
||
|
||
if categories_response.status_code == 200:
|
||
categories = categories_response.json()
|
||
print(f"Found {len(categories)} live categories")
|
||
if categories:
|
||
print("Sample category structure:")
|
||
print(json.dumps(categories[0], indent=2))
|
||
|
||
# Test getting streams from first category
|
||
if categories_response.status_code == 200 and categories:
|
||
print("🔧 Testing streams endpoint...")
|
||
first_category_id = categories[0].get('category_id')
|
||
streams_params = {
|
||
'username': username,
|
||
'password': password,
|
||
'action': 'get_live_streams',
|
||
'category_id': first_category_id
|
||
}
|
||
|
||
streams_response = session.get(api_endpoint, params=streams_params)
|
||
print(f"Streams endpoint status: {streams_response.status_code}")
|
||
|
||
if streams_response.status_code == 200:
|
||
streams = streams_response.json()
|
||
print(f"Found {len(streams)} streams in first category")
|
||
if streams:
|
||
print("Sample stream structure:")
|
||
print(json.dumps(streams[0], indent=2))
|
||
|
||
except Exception as e:
|
||
print(f"❌ Error testing API: {e}")
|
||
|
||
def search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock):
|
||
"""Search a single category within a content type (for multithreading)"""
|
||
category_id = category.get('category_id')
|
||
category_name = category.get('category_name', 'Unknown')
|
||
|
||
session = requests.Session()
|
||
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
|
||
|
||
matches_in_category = []
|
||
|
||
try:
|
||
# Get streams for this category
|
||
stream_params = {
|
||
'username': username,
|
||
'password': password,
|
||
'action': stream_action,
|
||
'category_id': category_id
|
||
}
|
||
|
||
stream_response = session.get(api_endpoint, params=stream_params)
|
||
if stream_response.status_code != 200:
|
||
with pbar_lock:
|
||
pbar.write(f" ❌ Failed to get {content_type.lower()} from {category_name}")
|
||
return matches_in_category
|
||
|
||
streams = stream_response.json()
|
||
|
||
for stream in streams:
|
||
stream_name = stream.get('name', '').lower()
|
||
if search_term.lower() in stream_name:
|
||
matches_in_category.append({
|
||
'content_type': content_type,
|
||
'group_name': category_name,
|
||
'group_id': category_id,
|
||
'channel_name': stream.get('name'),
|
||
'channel_id': stream.get('stream_id') or stream.get('series_id'),
|
||
'stream_type': stream.get('stream_type'),
|
||
'stream_icon': stream.get('stream_icon'),
|
||
'epg_channel_id': stream.get('epg_channel_id')
|
||
})
|
||
|
||
if matches_in_category:
|
||
with pbar_lock:
|
||
pbar.write(f" ✅ Found {len(matches_in_category)} match(es) in {category_name}")
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
with pbar_lock:
|
||
pbar.write(f" ❌ Error in {category_name}: {str(e)[:50]}...")
|
||
|
||
finally:
|
||
with pbar_lock:
|
||
pbar.update(1)
|
||
|
||
return matches_in_category
|
||
|
||
def search_all_content_types(api_url, username, password, search_term, max_workers=15):
|
||
"""Search across live streams, VOD, and series (multithreaded)"""
|
||
session = requests.Session()
|
||
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
|
||
all_matches = []
|
||
|
||
content_types = [
|
||
('Live Streams', 'get_live_categories', 'get_live_streams'),
|
||
('VOD', 'get_vod_categories', 'get_vod_streams'),
|
||
('Series', 'get_series_categories', 'get_series')
|
||
]
|
||
|
||
for content_type, cat_action, stream_action in content_types:
|
||
try:
|
||
print(f"\n<EFBFBD> Fetching {content_type} categories...")
|
||
|
||
# Get categories
|
||
cat_params = {
|
||
'username': username,
|
||
'password': password,
|
||
'action': cat_action
|
||
}
|
||
|
||
cat_response = session.get(api_endpoint, params=cat_params)
|
||
if cat_response.status_code != 200:
|
||
print(f"❌ Could not get {content_type} categories (status: {cat_response.status_code})")
|
||
continue
|
||
|
||
categories = cat_response.json()
|
||
print(f"✅ Found {len(categories)} {content_type.lower()} categories")
|
||
|
||
if not categories:
|
||
continue
|
||
|
||
pbar_lock = threading.Lock()
|
||
|
||
# Use multithreading for this content type
|
||
with tqdm(total=len(categories), desc=f"🔍 Searching {content_type.lower()}", unit="cat",
|
||
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
|
||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||
# Submit all tasks for this content type
|
||
future_to_category = {
|
||
executor.submit(search_content_type_category, api_url, username, password,
|
||
search_term, category, content_type, stream_action, pbar, pbar_lock): category
|
||
for category in categories
|
||
}
|
||
|
||
# Collect results as they complete
|
||
for future in as_completed(future_to_category):
|
||
try:
|
||
category_matches = future.result()
|
||
all_matches.extend(category_matches)
|
||
except Exception as e:
|
||
category = future_to_category[future]
|
||
category_name = category.get('category_name', 'Unknown')
|
||
with pbar_lock:
|
||
pbar.write(f" ❌ Exception in {category_name}: {str(e)[:50]}...")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Error searching {content_type}: {e}")
|
||
continue
|
||
|
||
return all_matches
|
||
|
||
def interactive_search(api_url, username, password, max_workers=10):
|
||
"""Interactive search mode"""
|
||
print(f"Connected to: {api_url}")
|
||
print("Commands: live:<term> | vod:<term> | series:<term> | all:<term> | debug | quit")
|
||
print("-" * 50)
|
||
|
||
while True:
|
||
try:
|
||
query = input("\n🔍 Search: ").strip()
|
||
|
||
if not query:
|
||
continue
|
||
|
||
if query.lower() in ['quit', 'exit', 'q']:
|
||
print("👋 Goodbye!")
|
||
break
|
||
|
||
if query.lower() == 'debug':
|
||
print("\n=== API Debug Info ===")
|
||
get_api_info(api_url, username, password)
|
||
print("=====================")
|
||
continue
|
||
|
||
# Parse search type and term
|
||
search_all_types = False
|
||
search_term = query
|
||
|
||
if ':' in query:
|
||
search_type, search_term = query.split(':', 1)
|
||
search_type = search_type.lower().strip()
|
||
search_term = search_term.strip()
|
||
|
||
if search_type == 'all':
|
||
search_all_types = True
|
||
elif search_type in ['vod', 'series']:
|
||
# For now, we'll treat these as all-types searches
|
||
# You could implement specific VOD/series-only searches later
|
||
search_all_types = True
|
||
print(f"🔍 Searching {search_type.upper()} for '{search_term}'...")
|
||
elif search_type == 'live':
|
||
print(f"🔍 Searching Live Streams for '{search_term}'...")
|
||
else:
|
||
print(f"❓ Unknown search type '{search_type}', searching live streams for '{query}'...")
|
||
search_term = query
|
||
else:
|
||
print(f"🔍 Searching Live Streams for '{search_term}'...")
|
||
|
||
# Perform search
|
||
if search_all_types:
|
||
matching_groups = search_all_content_types(
|
||
api_url, username, password, search_term, max_workers
|
||
)
|
||
else:
|
||
matching_groups = find_channel_in_groups(
|
||
api_url, username, password, search_term, max_workers
|
||
)
|
||
|
||
# Display results
|
||
if matching_groups:
|
||
print(f"\n✅ Found {len(matching_groups)} result(s) for '{search_term}':")
|
||
for i, match in enumerate(matching_groups, 1):
|
||
content_type = match.get('content_type', 'Live Stream')
|
||
type_short = content_type.replace(' Streams', '').replace(' ', '')
|
||
print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")
|
||
else:
|
||
print(f"❌ No results found for '{search_term}'")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\n👋 Goodbye!")
|
||
break
|
||
except Exception as e:
|
||
print(f"❌ Error during search: {e}")
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='Xtream Codes Channel Search Tool')
|
||
parser.add_argument('--api-url', required=True, help='Xtream Codes API base URL')
|
||
parser.add_argument('--username', required=True, help='Username for authentication')
|
||
parser.add_argument('--password', required=True, help='Password for authentication')
|
||
|
||
# Optional one-time search arguments
|
||
parser.add_argument('--channel', help='Channel name to search for (one-time search)')
|
||
parser.add_argument('--debug', action='store_true', help='Show API debugging info')
|
||
parser.add_argument('--all-types', action='store_true', help='Search in Live, VOD, and Series (one-time search)')
|
||
parser.add_argument('--interactive', '-i', action='store_true', help='Start interactive search mode')
|
||
parser.add_argument('--max-workers', type=int, default=10, help='Maximum number of concurrent threads (default: 10)')
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Test connection first
|
||
print("🔧 Testing connection to Xtream Codes API...")
|
||
try:
|
||
session = requests.Session()
|
||
test_url = urljoin(args.api_url.rstrip('/'), '/player_api.php')
|
||
test_params = {
|
||
'username': args.username,
|
||
'password': args.password,
|
||
'action': 'get_live_categories'
|
||
}
|
||
response = session.get(test_url, params=test_params, timeout=10)
|
||
if response.status_code == 200:
|
||
print("✅ Connection successful!")
|
||
else:
|
||
print(f"❌ Connection failed with status {response.status_code}")
|
||
return
|
||
except Exception as e:
|
||
print(f"❌ Connection failed: {e}")
|
||
return
|
||
|
||
if args.debug:
|
||
print("\n=== API Debug Info ===")
|
||
get_api_info(args.api_url, args.username, args.password)
|
||
print("=====================\n")
|
||
|
||
# If channel is provided, do one-time search
|
||
if args.channel:
|
||
print(f"🔍 Searching for '{args.channel}' in Xtream Codes API...")
|
||
|
||
if args.all_types:
|
||
matching_groups = search_all_content_types(
|
||
args.api_url,
|
||
args.username,
|
||
args.password,
|
||
args.channel,
|
||
args.max_workers
|
||
)
|
||
else:
|
||
matching_groups = find_channel_in_groups(
|
||
args.api_url,
|
||
args.username,
|
||
args.password,
|
||
args.channel,
|
||
args.max_workers
|
||
)
|
||
|
||
print(f"\n{'='*50}")
|
||
if matching_groups:
|
||
print(f"✅ Found {len(matching_groups)} result(s) for '{args.channel}':")
|
||
for i, match in enumerate(matching_groups, 1):
|
||
content_type = match.get('content_type', 'Live Stream')
|
||
type_short = content_type.replace(' Streams', '').replace(' ', '')
|
||
print(f"{i:2d}. [{type_short}] {match['group_name']} → {match['channel_name']}")
|
||
else:
|
||
print(f"❌ No groups found containing '{args.channel}'")
|
||
|
||
# Start interactive mode if requested or no channel provided
|
||
if args.interactive or not args.channel:
|
||
interactive_search(args.api_url, args.username, args.password, args.max_workers)
|
||
|
||
if __name__ == "__main__":
|
||
main() |