This commit is contained in:
eric sciple
2024-06-07 13:54:33 -07:00
parent edfdbb9661
commit d124b0104e
4 changed files with 65 additions and 7 deletions

View File

@@ -75,6 +75,12 @@ namespace GitHub.Runner.Common
await _brokerHttpClient.DeleteSessionAsync(cancellationToken);
}
public async Task DeleteRunnerMessageAsync(Guid sessionId, string jobMessageKey, CancellationToken cancellationToken)
{
CheckConnection();
await _brokerHttpClient.DeleteRunnerMessageAsync(sessionId, jobMessageKey, cancellationToken);
}
public Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials)
{
if (_brokerUri != serverUri || !_hasConnection)

View File

@@ -314,7 +314,30 @@ namespace GitHub.Runner.Listener
public async Task DeleteMessageAsync(TaskAgentMessage message)
{
await Task.CompletedTask;
Trace.Entering();
ArgUtil.NotNull(_session, nameof(_session));
if (message == null || _session.SessionId == Guid.Empty)
{
return;
}
var jobMessageKey = "";
if (MessageUtil.IsRunServiceJob(message.MessageType))
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
jobMessageKey = messageRef.RunnerRequestId;
}
else
{
// Broker currently doesn't support delete for other message types
return;
}
using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{
await _brokerServer.DeleteRunnerMessageAsync(_session.SessionId, jobMessageKey, cs.Token);
}
}
private bool IsGetNextMessageExceptionRetriable(Exception ex)

View File

@@ -563,9 +563,7 @@ namespace GitHub.Runner.Listener
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
try
{
jobRequestMessage =
await runServer.GetJobMessageAsync(messageRef.RunnerRequestId,
messageQueueLoopTokenSource.Token);
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
catch (TaskOrchestrationJobAlreadyAcquiredException)
{

View File

@@ -63,8 +63,7 @@ namespace GitHub.Actions.RunService.WebApi
string os = null,
string architecture = null,
bool? disableUpdate = null,
CancellationToken cancellationToken = default
)
CancellationToken cancellationToken = default)
{
var requestUri = new Uri(Client.BaseAddress, "message");
@@ -123,8 +122,40 @@ namespace GitHub.Actions.RunService.WebApi
throw new Exception($"Failed to get job message: {result.Error}");
}
public async Task<TaskAgentSession> CreateSessionAsync(
public async Task<TaskAgentMessage> GetRunnerMessageAsync(
Guid? sessionId,
string jobMessageKey,
CancellationToken cancellationToken = default)
{
var requestUri = new Uri(Client.BaseAddress, "message");
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (sessionId != null)
{
queryParams.Add("sessionId", sessionId.Value.ToString());
}
if (!string.IsNullOrEmpty(jobMessageKey))
{
queryParams.Add("jobMessageKey", jobMessageKey);
}
var result = await SendAsync<TaskAgentMessage>(
new HttpMethod("DELETE"),
requestUri: requestUri,
queryParameters: queryParams,
cancellationToken: cancellationToken);
if (result.IsSuccess)
{
return result.Value;
}
throw new Exception($"Failed to get job message: StatusCode={result.StatusCode} Error={result.Error}");
}
public async Task<TaskAgentSession> CreateSessionAsync(
TaskAgentSession session,
CancellationToken cancellationToken = default)
{