From d124b0104e9d3713c48588789794101d39438c81 Mon Sep 17 00:00:00 2001 From: eric sciple Date: Fri, 7 Jun 2024 13:54:33 -0700 Subject: [PATCH] . --- src/Runner.Common/BrokerServer.cs | 6 ++++ src/Runner.Listener/BrokerMessageListener.cs | 25 ++++++++++++- src/Runner.Listener/Runner.cs | 4 +-- src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 37 ++++++++++++++++++-- 4 files changed, 65 insertions(+), 7 deletions(-) diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index 5e1311715..b2b20f089 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -75,6 +75,12 @@ namespace GitHub.Runner.Common await _brokerHttpClient.DeleteSessionAsync(cancellationToken); } + public async Task DeleteRunnerMessageAsync(Guid sessionId, string jobMessageKey, CancellationToken cancellationToken) + { + CheckConnection(); + await _brokerHttpClient.DeleteRunnerMessageAsync(sessionId, jobMessageKey, cancellationToken); + } + public Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials) { if (_brokerUri != serverUri || !_hasConnection) diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 8f44fe843..71cb4b0b1 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -314,7 +314,30 @@ namespace GitHub.Runner.Listener public async Task DeleteMessageAsync(TaskAgentMessage message) { - await Task.CompletedTask; + Trace.Entering(); + ArgUtil.NotNull(_session, nameof(_session)); + + if (message == null || _session.SessionId == Guid.Empty) + { + return; + } + + var jobMessageKey = ""; + if (MessageUtil.IsRunServiceJob(message.MessageType)) + { + var messageRef = StringUtil.ConvertFromJson(message.Body); + jobMessageKey = messageRef.RunnerRequestId; + } + else + { + // Broker currently doesn't support delete for other message types + return; + } + + using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30))) + { + await _brokerServer.DeleteRunnerMessageAsync(_session.SessionId, jobMessageKey, cs.Token); + } } private bool IsGetNextMessageExceptionRetriable(Exception ex) diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 9acad8395..d610db74a 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -563,9 +563,7 @@ namespace GitHub.Runner.Listener await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); try { - jobRequestMessage = - await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, - messageQueueLoopTokenSource.Token); + jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); } catch (TaskOrchestrationJobAlreadyAcquiredException) { diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index e9ad938fb..2a6ecc7eb 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -63,8 +63,7 @@ namespace GitHub.Actions.RunService.WebApi string os = null, string architecture = null, bool? disableUpdate = null, - CancellationToken cancellationToken = default - ) + CancellationToken cancellationToken = default) { var requestUri = new Uri(Client.BaseAddress, "message"); @@ -123,8 +122,40 @@ namespace GitHub.Actions.RunService.WebApi throw new Exception($"Failed to get job message: {result.Error}"); } - public async Task CreateSessionAsync( + public async Task GetRunnerMessageAsync( + Guid? sessionId, + string jobMessageKey, + CancellationToken cancellationToken = default) + { + var requestUri = new Uri(Client.BaseAddress, "message"); + List> queryParams = new List>(); + + if (sessionId != null) + { + queryParams.Add("sessionId", sessionId.Value.ToString()); + } + + if (!string.IsNullOrEmpty(jobMessageKey)) + { + queryParams.Add("jobMessageKey", jobMessageKey); + } + + var result = await SendAsync( + new HttpMethod("DELETE"), + requestUri: requestUri, + queryParameters: queryParams, + cancellationToken: cancellationToken); + + if (result.IsSuccess) + { + return result.Value; + } + + throw new Exception($"Failed to get job message: StatusCode={result.StatusCode} Error={result.Error}"); + } + + public async Task CreateSessionAsync( TaskAgentSession session, CancellationToken cancellationToken = default) {