QueueWorker.cs 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. using System;
  2. using System.Threading;
  3. using Microsoft.Extensions.Configuration;
  4. using Microsoft.Extensions.Logging;
  5. using NATS.Client;
  6. using Worker.Data;
  7. using Worker.Messaging;
  8. using Worker.Messaging.Messages;
  namespace Worker.Workers
  {
      /// <summary>
      /// Subscribes to <see cref="VoteCastEvent"/> messages on the message queue and
      /// hands each received vote to <see cref="IVoteData"/> for storage.
      /// </summary>
      public class QueueWorker
      {
          // Start() blocks on this handle to keep the connection and subscription open.
          // Nothing in this class ever signals it.
          private static readonly ManualResetEvent _ResetEvent = new ManualResetEvent(false);

          // Queue group name passed to SubscribeAsync.
          private const string QUEUE_GROUP = "save-handler";

          private readonly IMessageQueue _messageQueue;
          private readonly IConfiguration _config;
          private readonly IVoteData _data;
          protected readonly ILogger _logger;

          /// <summary>
          /// Creates a worker from its collaborators.
          /// </summary>
          /// <param name="messageQueue">Factory for the message queue connection.</param>
          /// <param name="data">Store that receives each vote via <c>Set</c>.</param>
          /// <param name="config">Configuration; only <c>MessageQueue:Url</c> is read here, for logging.</param>
          /// <param name="logger">Logger used for all diagnostics of this worker.</param>
          public QueueWorker(IMessageQueue messageQueue, IVoteData data, IConfiguration config, ILogger<QueueWorker> logger)
          {
              _messageQueue = messageQueue;
              _data = data;
              _config = config;
              _logger = logger;
          }

          /// <summary>
          /// Opens a connection, subscribes to <see cref="VoteCastEvent.MessageSubject"/> in the
          /// <c>save-handler</c> queue group, and then blocks the calling thread on a wait handle.
          /// </summary>
          /// <remarks>
          /// The wait handle is never set inside this class, so the statements after the wait
          /// only run if something outside this file signals it.
          /// </remarks>
          public void Start()
          {
              _logger.LogInformation($"Connecting to message queue url: {_config.GetValue<string>("MessageQueue:Url")}");
              using (var connection = _messageQueue.CreateConnection())
              {
                  var subscription = connection.SubscribeAsync(VoteCastEvent.MessageSubject, QUEUE_GROUP);
                  subscription.MessageHandler += SaveVote;
                  subscription.Start();
                  _logger.LogInformation($"Listening on subject: {VoteCastEvent.MessageSubject}, queue: {QUEUE_GROUP}");

                  // Park this thread; messages are delivered to SaveVote by the subscription.
                  _ResetEvent.WaitOne();
                  connection.Close();
              }
          }

          /// <summary>
          /// Message handler: deserializes the payload into a <see cref="VoteCastEvent"/> and
          /// stores the vote. Every failure is logged and swallowed so that one bad message
          /// does not throw out of the handler.
          /// </summary>
          /// <param name="sender">Event source (unused).</param>
          /// <param name="e">Event arguments carrying the received message.</param>
          private void SaveVote(object sender, MsgHandlerEventArgs e)
          {
              _logger.LogDebug($"Received message, subject: {e.Message.Subject}");

              VoteCastEvent voteMessage;
              try
              {
                  // Deserialization is guarded too: a malformed payload must be logged here
                  // rather than escaping the handler.
                  voteMessage = MessageHelper.FromData<VoteCastEvent>(e.Message.Data);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, $"Vote message could not be deserialized, subject: {e.Message.Subject}");
                  return;
              }

              if (voteMessage == null)
              {
                  // Nothing to store, and dereferencing below would throw.
                  _logger.LogError($"Vote message deserialized to null, subject: {e.Message.Subject}");
                  return;
              }

              _logger.LogInformation($"Processing vote for '{voteMessage.Vote}' by '{voteMessage.VoterId}'");
              try
              {
                  _data.Set(voteMessage.VoterId, voteMessage.Vote);
                  _logger.LogDebug($"Succesffuly processed vote by '{voteMessage.VoterId}'");
              }
              catch (Exception ex)
              {
                  // Pass the exception object itself so logging providers receive it,
                  // instead of flattening it into the message text.
                  _logger.LogError(ex, $"Vote processing FAILED for '{voteMessage.VoterId}'");
              }
          }
      }
  }