半半拉拉

parent cad61c3d
using System; using Microsoft.Extensions.Hosting;
using System.Collections.Concurrent; using Microsoft.Extensions.Logging;
using Performance.Services.Queues;
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Performance.Api namespace Performance.Api
{ {
public interface IBackgroundTaskQueue
{
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems = new ConcurrentQueue<Func<CancellationToken, Task>>();
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
return workItem;
}
}
public class QueuedHostedService : BackgroundService public class QueuedHostedService : BackgroundService
{ {
private readonly ILogger<QueuedHostedService> _logger; private readonly ILogger<QueuedHostedService> _logger;
...@@ -87,4 +55,5 @@ public class QueuedHostedService : BackgroundService ...@@ -87,4 +55,5 @@ public class QueuedHostedService : BackgroundService
await base.StopAsync(stoppingToken); await base.StopAsync(stoppingToken);
} }
} }
}
\ No newline at end of file }
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Performance.DtoModels; using Performance.DtoModels;
using Performance.Services; using Performance.Services;
using System;
using System.Threading.Tasks;
namespace Performance.Api.Controllers namespace Performance.Api.Controllers
{ {
...@@ -9,25 +12,44 @@ namespace Performance.Api.Controllers ...@@ -9,25 +12,44 @@ namespace Performance.Api.Controllers
public class ModExtractController : Controller public class ModExtractController : Controller
{ {
private readonly ClaimService _claim; private readonly ClaimService _claim;
private readonly CustomExtractService _service; private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IBackgroundTaskQueue _backgroundTaskQueue;
public ModExtractController( public ModExtractController(
ClaimService claim, ClaimService claim,
CustomExtractService service) IServiceScopeFactory serviceScopeFactory,
IBackgroundTaskQueue backgroundTaskQueue)
{ {
_claim = claim; _claim = claim;
_service = service; _serviceScopeFactory = serviceScopeFactory;
_backgroundTaskQueue = backgroundTaskQueue;
} }
[HttpPost("custom/{allotId}")] [HttpPost("custom/{allotId}")]
public ApiResponse CustomExtract(int allotId) public ApiResponse CustomExtract(int allotId)
{ {
if (_service.ExtractData(_claim.GetUserId(), allotId, out string resultFilePath)) _backgroundTaskQueue.QueueBackgroundWorkItem(async token =>
{ {
using (var scope = _serviceScopeFactory.CreateScope())
{
var scopedServices = scope.ServiceProvider.GetRequiredService<CustomExtractService>();
return new ApiResponse(ResponseType.OK, new { path = resultFilePath }); if (scopedServices.ExtractData(_claim.GetUserId(), allotId, out string resultFilePath))
} {
return new ApiResponse(ResponseType.Fail, new { path = "" });
new ApiResponse(ResponseType.OK, new { path = resultFilePath });
}
else
{
}
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
});
return new ApiResponse(ResponseType.OK);
} }
} }
} }
\ No newline at end of file
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Performance.Services.Queues
{
public interface IBackgroundTaskQueue
{
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems = new ConcurrentQueue<Func<CancellationToken, Task>>();
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
return workItem;
}
}
}
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Performance.Services.Queues
{
public class HubNotificationQueue
{
private readonly ILogger<HubNotificationQueue> _logger;
private IHubContext<AllotLogHub> _hubContext;
private BlockingCollection<Notification> _queue;
public HubNotificationQueue(
ILogger<HubNotificationQueue> logger,
IHubContext<AllotLogHub> hubContext)
{
_logger = logger;
_hubContext = hubContext;
_queue = new BlockingCollection<Notification>();
Task.Factory.StartNew(() => Consumer());
}
public void Send(Notification message)
{
_queue.TryAdd(message);
}
private void Consumer()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
_logger.LogInformation("ConectionId GroupName:{ConectionId};消息推送:{Method};内容:{Body}", item.UserId.ToString(), item.Method, _converter.Serialize(item.Body));
_hubContext.Clients.Group(item.UserId.ToString()).SendAsync(item.Method, item.Body);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment