1
0

Implement retries & Regex search

This commit is contained in:
2025-10-17 12:45:42 -04:00
parent a51d71f4de
commit ac69867548
2 changed files with 143 additions and 21 deletions

View File

@@ -5,6 +5,10 @@ from urllib.parse import urljoin
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
try:
from tqdm import tqdm
except ImportError:
@@ -13,6 +17,62 @@ except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
from tqdm import tqdm
def is_regex_pattern(pattern):
    """Return True when *pattern* uses the `/body/flags` delimiter form for regex search."""
    if not pattern.startswith('/'):
        return False
    # Require at least one character between the opening slash and some later slash.
    return len(pattern) > 2 and '/' in pattern[1:]
def matches_search(stream_name, search_term):
    """Check if stream name matches search term (supports regex with /pattern/flags)."""
    def _substring_match():
        # Plain case-insensitive containment — also the fallback for bad regexes.
        return search_term.lower() in stream_name.lower()

    # Regex form: "/body/flags" with at least one char between the slashes.
    looks_like_regex = (search_term.startswith('/')
                        and len(search_term) > 2
                        and '/' in search_term[1:])
    if not looks_like_regex:
        return _substring_match()

    closing = search_term.rfind('/')
    if closing <= 0:
        # Defensive: malformed delimiter layout — treat as plain text.
        return _substring_match()

    body = search_term[1:closing]
    suffix = search_term[closing + 1:]

    # Map recognized single-letter flags onto re flags; unknown letters contribute nothing.
    known_flags = {'i': re.IGNORECASE, 'm': re.MULTILINE, 's': re.DOTALL}
    combined = 0
    for letter in suffix.lower():
        combined |= known_flags.get(letter, 0)
    if not suffix:
        # No flag suffix at all → case-insensitive by default.
        combined = re.IGNORECASE

    try:
        return re.search(body, stream_name, combined) is not None
    except re.error:
        # Invalid regex → degrade gracefully to substring matching.
        return _substring_match()
def create_session_with_retries():
    """Build a requests.Session that transparently retries GET requests on HTTP 503.

    Retry policy: up to 2 attempts beyond the first, exponential backoff
    starting at 0.5s, triggered only by 503 responses on GET.
    """
    retry_policy = Retry(
        total=2,
        status_forcelist=[503],
        backoff_factor=0.5,
        allowed_methods=["GET"],
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    # One shared adapter handles both plaintext and TLS endpoints.
    for scheme in ("http://", "https://"):
        session.mount(scheme, retrying_adapter)
    return session
def search_category(api_url, username, password, channel_name, category, pbar, pbar_lock):
"""
Search a single category for channels (for multithreading)
@@ -20,8 +80,8 @@ def search_category(api_url, username, password, channel_name, category, pbar, p
category_id = category.get('category_id')
category_name = category.get('category_name', 'Unknown')
# Create a new session for this thread
session = requests.Session()
# Create a new session for this thread with retry logic
session = create_session_with_retries()
categories_url = urljoin(api_url.rstrip('/'), '/player_api.php')
matches_in_category = []
@@ -39,10 +99,10 @@ def search_category(api_url, username, password, channel_name, category, pbar, p
streams_response.raise_for_status()
streams = streams_response.json()
# Check if channel exists in this category (fuzzy matching)
# Check if channel exists in this category (supports regex matching)
for stream in streams:
stream_name = stream.get('name', '').lower()
if channel_name.lower() in stream_name:
stream_name = stream.get('name', '')
if matches_search(stream_name, channel_name):
matches_in_category.append({
'group_name': category_name,
'group_id': category_id,
@@ -81,7 +141,7 @@ def find_channel_in_groups(api_url, username, password, channel_name, max_worker
Returns:
list: List of categories containing the channel
"""
session = requests.Session()
session = create_session_with_retries()
try:
# Get all live categories
@@ -101,14 +161,34 @@ def find_channel_in_groups(api_url, username, password, channel_name, max_worker
matching_groups = []
pbar_lock = threading.Lock()
thread_start_counter = 0
counter_lock = threading.Lock()
def search_with_delay(api_url, username, password, channel_name, category, pbar, pbar_lock):
"""Wrapper to add small staggered delay for first few requests only"""
nonlocal thread_start_counter
with counter_lock:
# Only delay the first 20 threads to prevent initial thundering herd
if thread_start_counter < 20:
delay = thread_start_counter * 0.01 # 10ms delay for first 20 threads
thread_start_counter += 1
else:
delay = 0
thread_start_counter += 1
if delay > 0:
time.sleep(delay)
return search_category(api_url, username, password, channel_name, category, pbar, pbar_lock)
# Search through categories with multithreading
with tqdm(total=len(categories), desc="🔍 Searching live categories", unit="cat",
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed} | ETA: {remaining}]",
smoothing=0.1) as pbar:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_category = {
executor.submit(search_category, api_url, username, password,
executor.submit(search_with_delay, api_url, username, password,
channel_name, category, pbar, pbar_lock): category
for category in categories
}
@@ -132,7 +212,7 @@ def find_channel_in_groups(api_url, username, password, channel_name, max_worker
def get_api_info(api_url, username, password):
"""Get basic API information for debugging"""
session = requests.Session()
session = create_session_with_retries()
try:
# Test basic connectivity and get categories
@@ -185,7 +265,7 @@ def search_content_type_category(api_url, username, password, search_term, categ
category_id = category.get('category_id')
category_name = category.get('category_name', 'Unknown')
session = requests.Session()
session = create_session_with_retries()
api_endpoint = urljoin(api_url.rstrip('/'), '/player_api.php')
matches_in_category = []
@@ -208,8 +288,8 @@ def search_content_type_category(api_url, username, password, search_term, categ
streams = stream_response.json()
for stream in streams:
stream_name = stream.get('name', '').lower()
if search_term.lower() in stream_name:
stream_name = stream.get('name', '')
if matches_search(stream_name, search_term):
matches_in_category.append({
'content_type': content_type,
'group_name': category_name,
@@ -249,7 +329,7 @@ def search_all_content_types(api_url, username, password, search_term, max_worke
for content_type, cat_action, stream_action in content_types:
try:
print(f"\n<EFBFBD> Fetching {content_type} categories...")
print(f"\n📺 Fetching {content_type} categories...")
# Get categories
cat_params = {
@@ -270,14 +350,34 @@ def search_all_content_types(api_url, username, password, search_term, max_worke
continue
pbar_lock = threading.Lock()
thread_start_counter = 0
counter_lock = threading.Lock()
def search_with_delay(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock):
"""Wrapper to add small staggered delay for first few requests only"""
nonlocal thread_start_counter
with counter_lock:
# Only delay the first 20 threads to prevent initial thundering herd
if thread_start_counter < 20:
delay = thread_start_counter * 0.01 # 10ms delay for first 20 threads
thread_start_counter += 1
else:
delay = 0
thread_start_counter += 1
if delay > 0:
time.sleep(delay)
return search_content_type_category(api_url, username, password, search_term, category, content_type, stream_action, pbar, pbar_lock)
# Use multithreading for this content type
with tqdm(total=len(categories), desc=f"🔍 Searching {content_type.lower()}", unit="cat",
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed} | ETA: {remaining}]",
smoothing=0.1) as pbar:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks for this content type
future_to_category = {
executor.submit(search_content_type_category, api_url, username, password,
executor.submit(search_with_delay, api_url, username, password,
search_term, category, content_type, stream_action, pbar, pbar_lock): category
for category in categories
}
@@ -302,7 +402,8 @@ def search_all_content_types(api_url, username, password, search_term, max_worke
def interactive_search(api_url, username, password, max_workers=10):
"""Interactive search mode"""
print(f"Connected to: {api_url}")
print("Commands: live:<term> | vod:<term> | series:<term> | all:<term> | debug | quit")
print("Commands: live:<term> | vod:<term> | series:<term> | all:<term> | /regex/ | debug | quit")
print("Tip: Use /pattern/flags for regex (e.g., /^CNN/i or /HD$/) - flags: i=case-insensitive, m=multiline, s=dotall")
print("-" * 50)
while True:
@@ -390,7 +491,7 @@ def main():
# Test connection first
print("🔧 Testing connection to Xtream Codes API...")
try:
session = requests.Session()
session = create_session_with_retries()
test_url = urljoin(args.api_url.rstrip('/'), '/player_api.php')
test_params = {
'username': args.username,