Program.cs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. using System;
  2. using System.Data.Common;
  3. using System.Linq;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using Newtonsoft.Json;
  8. using Npgsql;
  9. using StackExchange.Redis;
  10. namespace Worker
  11. {
  12. public class Program
  13. {
  14. public static int Main(string[] args)
  15. {
  16. try
  17. {
  18. var pgsql = OpenDbConnection("Server=db;Username=postgres;Password=postgres;");
  19. var redisConn = OpenRedisConnection("redis");
  20. var redis = redisConn.GetDatabase();
  21. // Keep alive is not implemented in Npgsql yet. This workaround was recommended:
  22. // https://github.com/npgsql/npgsql/issues/1214#issuecomment-235828359
  23. var keepAliveCommand = pgsql.CreateCommand();
  24. keepAliveCommand.CommandText = "SELECT 1";
  25. var definition = new { vote = "", voter_id = "" };
  26. while (true)
  27. {
  28. // Slow down to prevent CPU spike, only query each 100ms
  29. Thread.Sleep(100);
  30. // Reconnect redis if down
  31. if (redisConn == null || !redisConn.IsConnected) {
  32. Console.WriteLine("Reconnecting Redis");
  33. redisConn = OpenRedisConnection("redis");
  34. redis = redisConn.GetDatabase();
  35. }
  36. string json = redis.ListLeftPopAsync("votes").Result;
  37. if (json != null)
  38. {
  39. var vote = JsonConvert.DeserializeAnonymousType(json, definition);
  40. Console.WriteLine($"Processing vote for '{vote.vote}' by '{vote.voter_id}'");
  41. // Reconnect DB if down
  42. if (!pgsql.State.Equals(System.Data.ConnectionState.Open))
  43. {
  44. Console.WriteLine("Reconnecting DB");
  45. pgsql = OpenDbConnection("Server=db;Username=postgres;Password=postgres;");
  46. }
  47. // Normal +1 vote requested
  48. UpdateVote(pgsql, vote.voter_id, vote.vote);
  49. }
  50. else
  51. {
  52. keepAliveCommand.ExecuteNonQuery();
  53. }
  54. }
  55. }
  56. catch (Exception ex)
  57. {
  58. Console.Error.WriteLine(ex.ToString());
  59. return 1;
  60. }
  61. }
  62. private static NpgsqlConnection OpenDbConnection(string connectionString)
  63. {
  64. NpgsqlConnection connection;
  65. while (true)
  66. {
  67. try
  68. {
  69. connection = new NpgsqlConnection(connectionString);
  70. connection.Open();
  71. break;
  72. }
  73. catch (SocketException)
  74. {
  75. Console.Error.WriteLine("Waiting for db");
  76. Thread.Sleep(1000);
  77. }
  78. catch (DbException)
  79. {
  80. Console.Error.WriteLine("Waiting for db");
  81. Thread.Sleep(1000);
  82. }
  83. }
  84. Console.Error.WriteLine("Connected to db");
  85. var command = connection.CreateCommand();
  86. command.CommandText = @"CREATE TABLE IF NOT EXISTS votes (
  87. id VARCHAR(255) NOT NULL UNIQUE,
  88. vote VARCHAR(255) NOT NULL
  89. )";
  90. command.ExecuteNonQuery();
  91. return connection;
  92. }
  93. private static ConnectionMultiplexer OpenRedisConnection(string hostname)
  94. {
  95. // Use IP address to workaround https://github.com/StackExchange/StackExchange.Redis/issues/410
  96. var ipAddress = GetIp(hostname);
  97. Console.WriteLine($"Found redis at {ipAddress}");
  98. while (true)
  99. {
  100. try
  101. {
  102. Console.Error.WriteLine("Connecting to redis");
  103. return ConnectionMultiplexer.Connect(ipAddress);
  104. }
  105. catch (RedisConnectionException)
  106. {
  107. Console.Error.WriteLine("Waiting for redis");
  108. Thread.Sleep(1000);
  109. }
  110. }
  111. }
  112. private static string GetIp(string hostname)
  113. => Dns.GetHostEntryAsync(hostname)
  114. .Result
  115. .AddressList
  116. .First(a => a.AddressFamily == AddressFamily.InterNetwork)
  117. .ToString();
  118. private static void UpdateVote(NpgsqlConnection connection, string voterId, string vote)
  119. {
  120. var command = connection.CreateCommand();
  121. try
  122. {
  123. command.CommandText = "INSERT INTO votes (id, vote) VALUES (@id, @vote)";
  124. command.Parameters.AddWithValue("@id", voterId);
  125. command.Parameters.AddWithValue("@vote", vote);
  126. command.ExecuteNonQuery();
  127. }
  128. catch (DbException)
  129. {
  130. command.CommandText = "UPDATE votes SET vote = @vote WHERE id = @id";
  131. command.ExecuteNonQuery();
  132. }
  133. finally
  134. {
  135. command.Dispose();
  136. }
  137. }
  138. }
  139. }