Commit 0650f856 by wyc

消息队列处理存储过程

parent 2c602f14
......@@ -48,6 +48,8 @@ public static void AddDependencyInjectionConfiguration(this IServiceCollection s
//services.AddHostedService<QueuedHostedService>();
//services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddSingleton<TaskQueueService>();
services.AddHostedService<TaskProcessorService>(); // 注册后台任务处理服务
services.AddSingleton<IHubNotificationQueue, HubNotificationQueue>();
services
......
......@@ -102,7 +102,6 @@ public void ConfigureServices(IServiceCollection services)
});
});
services.AddTransient<ExtractGenerateJob>();
services.AddTransient<ExtractDataJob>();
services.AddTransient<ClearLoggerJob>();
......
......@@ -7,8 +7,8 @@
},
"AppConnection": {
//"PerformanceConnectionString": "server=112.124.13.17;database=db_performance;uid=suvalue;pwd=suvalue2016;pooling=true;charset=utf8;convert zero datetime=true;port=3306;connection timeout=120;max pool size=512;allow user variables=true;",
//"PerformanceConnectionString": "server=192.168.18.166;database=db_test_beiliu;uid=root;pwd=1234qwer;pooling=true;charset=utf8;convert zero datetime=true;port=3306;connection timeout=120;max pool size=512;allow user variables=true;"
"PerformanceConnectionString": "server=112.124.15.49;database=db_performance_cxjx;uid=cxjx;pwd=Suvalue@cxjx;pooling=true;charset=utf8;convert zero datetime=true;port=3306;connection timeout=120;max pool size=512;allow user variables=true;"
"PerformanceConnectionString": "server=192.168.18.166;database=db_test_beiliu;uid=root;pwd=1234qwer;pooling=true;charset=utf8;convert zero datetime=true;port=3306;connection timeout=120;max pool size=512;allow user variables=true;"
//"PerformanceConnectionString": "server=112.124.15.49;database=db_performance_cxjx;uid=cxjx;pwd=Suvalue@cxjx;pooling=true;charset=utf8;convert zero datetime=true;port=3306;connection timeout=120;max pool size=512;allow user variables=true;"
},
"AppOptions": {
"OpenOAuth": true
......
using System;
using System.Collections.Generic;
using System.Linq;
using AutoMapper;
using AutoMapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Performance.DtoModels;
......@@ -9,6 +6,9 @@
using Performance.EntityModels;
using Performance.Infrastructure;
using Performance.Repository;
using System;
using System.Collections.Generic;
using System.Linq;
namespace Performance.Services.AllotCompute
{
......@@ -35,6 +35,7 @@ public class ResultComputeService : IAutoInjection
private readonly PerforImemployeeclinicRepository perforImemployeeclinicRepository;
private readonly PerforImemployeelogisticsRepository perforImemployeelogisticsRepository;
private readonly ILogger logger;
private readonly TaskQueueService _taskQueueService;
public ResultComputeService(
IMapper mapper,
......@@ -52,7 +53,8 @@ public class ResultComputeService : IAutoInjection
PerforAgsecondallotRepository perforAgsecondallotRepository,
PerforImemployeeclinicRepository perforImemployeeclinicRepository,
PerforImemployeelogisticsRepository perforImemployeelogisticsRepository,
ILogger<ResultComputeService> logger)
ILogger<ResultComputeService> logger,
TaskQueueService taskQueueService)
{
_mapper = mapper;
_options = options;
......@@ -72,6 +74,7 @@ public class ResultComputeService : IAutoInjection
this.perforImemployeeclinicRepository = perforImemployeeclinicRepository;
this.perforImemployeelogisticsRepository = perforImemployeelogisticsRepository;
this.logger = logger;
_taskQueueService = taskQueueService;
}
/// <summary>
......@@ -205,7 +208,7 @@ public void SpecialUnitCompute(PerExcel excel, per_allot allot, List<res_baiscno
QuantitativeIndicatorsValue = item.QuantitativeIndicatorsValue,
QuantitativeFee = item.Quantity * item.QuantitativeIndicatorsValue * headcount,
PermanentStaff = dept?.PermanentStaff ?? 0,//定科人数(在册人数)
Drg= dept?.Drg ?? 0,//DRG绩效
Drg = dept?.Drg ?? 0,//DRG绩效
//ScoringAverage = scoreAverage.HasValue ? scoreAverage : dept?.ScoringAverage,
ScoringAverage = dept?.ScoringAverage ?? 1,
//OtherPerfor = dept?.OtherPerfor1,
......@@ -712,9 +715,12 @@ public bool IssuedChangeSecond(per_allot allot, List<ag_secondallot> secondList)
}
perforAgsecondallotRepository.UpdateRange(updSeconds.ToArray());
}
_service.FreezeAllotSync(allot.ID);
_service.SecondUseTempRestore();
_service.RestoreSecondAllot();
//_service.FreezeAllotSync(allot.ID);
//_service.SecondUseTempRestore();
//_service.RestoreSecondAllot();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("数据静态存储", "call proc_freeze_allot(@allotId);", new { allotId = allot.ID })).Wait();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("二次分配模板修复", "call proc_second_restore();", new { })).Wait();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("科室改名历史数据处理", "call proc_restore_secondallot();", new { })).Wait();
//if (promptSeconds.Any(t => t.IssueStatus == 1))
//{
// //删除
......
......@@ -57,6 +57,7 @@ public class AllotService : IAutoInjection
private readonly ReportService reportService;
private readonly QueryDataService queryDataService;
private readonly TaskQueueService _taskQueueService;
public AllotService(
IMapper mapper,
......@@ -91,7 +92,8 @@ public class AllotService : IAutoInjection
PerforReportRepository reportRepository,
PerforPeremployeeRepository perforPeremployeeRepository,
PerforPerbatchRepository batchRepository,
QueryDataService queryDataService)
QueryDataService queryDataService
, TaskQueueService taskQueueService)
{
_mapper = mapper;
_service = service;
......@@ -128,6 +130,7 @@ public class AllotService : IAutoInjection
_perforPeremployeeRepository = perforPeremployeeRepository;
_batchRepository = batchRepository;
this.queryDataService = queryDataService;
_taskQueueService = taskQueueService;
}
#region 基础功能
......@@ -474,11 +477,14 @@ public void Generate(per_allot allot)
logManageService.WriteMsg("正在生成绩效", "待下发科室准备", 1, allot.ID, "ReceiveMessage", true);
// 科室创建但不下发
resultComputeService.GenerateSecondAllot(allot);
_service.SecondUseTempRestore();
_service.RestoreSecondAllot();
//_service.SecondUseTempRestore();
//_service.RestoreSecondAllot();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("二次分配模板修复", "call proc_second_restore();", new { })).Wait();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("科室改名历史数据处理", "call proc_restore_secondallot();", new { })).Wait();
_service.ClearAllot();
logManageService.WriteMsg("正在生成绩效", "绩效数据存储", 1, allot.ID, "ReceiveMessage", true);
_service.FreezeAllotSync(allot.ID);
//_service.FreezeAllotSync(allot.ID);
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("数据静态存储", "call proc_freeze_allot(@allotId);", new { allotId = allot.ID })).Wait();
logManageService.WriteMsg("绩效生成结束", "绩效生成成功", 1, allot.ID, "ReceiveMessage", true);
_service.UpdateAllotStates(allot.ID, (int)AllotStates.绩效结果解析成功, EnumHelper.GetDescription(AllotStates.绩效结果解析成功), generate);
......
......@@ -31,20 +31,20 @@ public DapperService(IOptions<AppConnection> options, ILogger<DapperService> log
/// </summary>
/// <param name="allotId"></param>
/// <returns></returns>
public void FreezeAllotSync(int allotId)
{
Task.Factory.StartNew(() =>
{
using (var connection = new MySqlConnection(_options.Value.PerformanceConnectionString))
{
if (connection.State != ConnectionState.Open) connection.Open();
string sql = $@"call proc_freeze_allot({allotId})";
connection.Execute(sql, commandTimeout: 60 * 60);
}
});
}
//public void FreezeAllotSync(int allotId)
//{
// Task.Factory.StartNew(() =>
// {
// using (var connection = new MySqlConnection(_options.Value.PerformanceConnectionString))
// {
// if (connection.State != ConnectionState.Open) connection.Open();
// string sql = $@"call proc_freeze_allot({allotId})";
// connection.Execute(sql, commandTimeout: 60 * 60);
// }
// });
//}
/// <summary>
/// 审核医院其他绩效时,使用异步
/// 数据静态存储
/// </summary>
/// <param name="allotId"></param>
/// <returns></returns>
......
......@@ -54,10 +54,7 @@ public class EmployeeService : IAutoInjection
private readonly PerAprAmountTypeRepository perAprAmountTypeRepository;
private readonly PerAprAmountTypeHideRepository perAprAmountTypeHIdeRepository;
private readonly IConfiguration _configuration;
private readonly TaskQueueService _taskQueueService;
public EmployeeService(
IMapper mapper,
......@@ -88,8 +85,8 @@ public class EmployeeService : IAutoInjection
DownloadService downloadService,
PerAprAmountTypeRepository perAprAmountTypeRepository,
PerAprAmountTypeHideRepository perAprAmountTypeHideRepository,
IConfiguration configuration
IConfiguration configuration,
TaskQueueService taskQueueService
)
{
_mapper = mapper;
......@@ -120,6 +117,7 @@ IConfiguration configuration
this.perAprAmountTypeRepository = perAprAmountTypeRepository;
this.perAprAmountTypeHIdeRepository = perAprAmountTypeHideRepository;
_configuration = configuration;
_taskQueueService = taskQueueService;
}
#region 行政人员
......@@ -682,7 +680,8 @@ public bool CommitApr(int allotId, string department, int status, DateTime date)
}
}
}
await _service.FreezeAllotSyncAsync(request.AllotId);
//await _service.FreezeAllotSyncAsync(request.AllotId);
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("数据静态存储", "call proc_freeze_allot(@allotId);", new { allotId = request.AllotId })).Wait();
return new ApiResponse(ResponseType.OK, "");
}
......@@ -1336,7 +1335,9 @@ public ApiResponse ConfirmAuditHide(int userid, AprAmountAuditRequest request)
}
}
}
_service.FreezeAllotSync(request.AllotId);
//_service.FreezeAllotSync(request.AllotId);
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("数据静态存储", "call proc_freeze_allot(@allotId);", new { allotId = request.AllotId })).Wait();
return new ApiResponse(ResponseType.OK, "");
}
/// <summary>
......
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using AutoMapper;
using AutoMapper;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
......@@ -17,6 +11,12 @@
using Performance.Infrastructure;
using Performance.Repository;
using Performance.Services.ExtractExcelService;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
namespace Performance.Services
{
......@@ -59,6 +59,7 @@ public partial class SecondAllotService : IAutoInjection
private readonly IWebHostEnvironment _evn;
private readonly PersonService personService;
private readonly List<ag_tempitem> tempitems = new List<ag_tempitem>();
private readonly TaskQueueService _taskQueueService;
public SecondAllotService(
IMapper mapper,
......@@ -94,7 +95,8 @@ public partial class SecondAllotService : IAutoInjection
PerforCofagainRepository cofagainRepository,
PersonService personService,
ComputeService computeService,
IWebHostEnvironment evn
IWebHostEnvironment evn,
TaskQueueService taskQueueService
)
{
_mapper = mapper;
......@@ -131,6 +133,7 @@ IWebHostEnvironment evn
this.computeService = computeService;
_evn = evn;
this.personService = personService;
_taskQueueService = taskQueueService;
}
#region 二次绩效列表与数据保存
......@@ -1820,9 +1823,12 @@ public bool ConfirmAudit(int userId, SecondAuditRequest request)
// 无论驳回还是通过,都需要清空该科室历史数据
agsecondallotRepository.DeleteComputeHistory(request.SecondId);
_service.FreezeAllotSync(second.AllotId.Value);
_service.SecondUseTempRestore();
_service.RestoreSecondAllot();
//_service.FreezeAllotSync(second.AllotId.Value);
//_service.SecondUseTempRestore();
//_service.RestoreSecondAllot();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("数据静态存储", "call proc_freeze_allot(@allotId);", new { allotId = second.AllotId.Value })).Wait();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("二次分配模板修复", "call proc_second_restore();", new { })).Wait();
_taskQueueService.EnqueueTaskAsync(new ProcQueueMessage("科室改名历史数据处理", "call proc_restore_secondallot();", new { })).Wait();
return result;
}
......
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Performance.Services
{
public class TaskProcessorService : BackgroundService
{
private readonly TaskQueueService _taskQueueService;
private readonly ILogger<TaskProcessorService> _logger;
public TaskProcessorService(TaskQueueService taskQueueService, ILogger<TaskProcessorService> logger)
{
_taskQueueService = taskQueueService;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await _taskQueueService.ProcessTasksAsync(stoppingToken); // 获取待处理任务的委托
try
{
await workItem(stoppingToken); // 执行任务
}
catch (Exception ex)
{
_logger.LogError(ex, "执行任务时发生的错误: {WorkItem}.", nameof(workItem));
}
}
}
}
}
\ No newline at end of file
using Dapper;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
using Newtonsoft.Json;
using Performance.DtoModels.AppSettings;
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
namespace Performance.Services
{
public record ProcQueueMessage(string Title, string Script, object Params);
public class TaskQueueService
{
private readonly BlockingCollection<ProcQueueMessage> _queue = new(); // FIFO 队列
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<TaskQueueService> _logger;
public TaskQueueService(IServiceScopeFactory serviceScopeFactory, ILogger<TaskQueueService> logger)
{
_serviceScopeFactory = serviceScopeFactory;
_logger = logger;
}
// 入队任务并自动启动任务处理
public async Task EnqueueTaskAsync(ProcQueueMessage message)
{
// 异步执行添加任务到队列的操作
await Task.Run(() => _queue.Add(message));
_logger.LogInformation("将存储过程【{Title}】添加到队列", message.Title);
}
// 处理下一个任务
public async Task<Func<CancellationToken, Task>> ProcessTasksAsync(CancellationToken stoppingToken)
{
// 等待队列中有任务可用
var message = await Task.Run(() => _queue.Take(stoppingToken), stoppingToken);
// 返回一个异步委托,执行任务
return async (token) => await ExecuteStoredProcedureAsync(message);
}
// 执行存储过程
private async Task ExecuteStoredProcedureAsync(ProcQueueMessage message)
{
try
{
using var scope = _serviceScopeFactory.CreateScope();
var options = scope.ServiceProvider.GetRequiredService<IOptions<AppConnection>>();
string serializedParams = JsonConvert.SerializeObject(message.Params);
_logger.LogInformation("存储过程【{Title}】开始执行:{Script}; {Params}", message.Title, message.Script, serializedParams);
using (var connection = new MySqlConnection(options.Value.PerformanceConnectionString))
{
if (connection.State != ConnectionState.Open)
await connection.OpenAsync();
await connection.ExecuteAsync(message.Script, message.Params, commandTimeout: 60 * 60);
}
_logger.LogInformation("存储过程【{Title}】执行结束:{Script}; {Params}", message.Title, message.Script, serializedParams);
}
catch (Exception ex)
{
_logger.LogError(ex, "存储过程【{Title}】任务执行异常处理:{Script}; {Params}; {Ex}", message.Title, message.Script, JsonConvert.SerializeObject(message.Params));
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment