Program.cs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. using System;
  2. using System.Data.Common;
  3. using System.Linq;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using Newtonsoft.Json;
  8. using Npgsql;
  9. using StackExchange.Redis;
  namespace Worker
  {
      /// <summary>
      /// Background worker that moves votes from the Redis list <c>votes</c> into
      /// the Postgres table <c>votes</c>.
      /// </summary>
      public class Program
      {
          private const string DbConnectionString = "Server=db;Username=postgres;Password=postgres;";
          private const string RedisHostname = "redis";
          private const string VotesQueueKey = "votes";

          /// <summary>
          /// Entry point. Connects to Postgres and Redis, then polls the Redis list
          /// every 100 ms and stores each popped vote in the database.
          /// </summary>
          /// <param name="args">Command-line arguments (unused).</param>
          /// <returns>
          /// 1 when an unhandled exception ends the processing loop. The loop has no
          /// normal exit, so no other value is ever returned.
          /// </returns>
          public static int Main(string[] args)
          {
              try
              {
                  var pgsql = OpenDbConnection(DbConnectionString);
                  var redisConn = OpenRedisConnection(RedisHostname);
                  var redis = redisConn.GetDatabase();

                  // Keep alive is not implemented in Npgsql yet. This workaround was recommended:
                  // https://github.com/npgsql/npgsql/issues/1214#issuecomment-235828359
                  var keepAliveCommand = CreateKeepAliveCommand(pgsql);

                  // Shape of the JSON payload pushed onto the Redis list.
                  var definition = new { vote = "", voter_id = "" };

                  while (true)
                  {
                      // Slow down to prevent CPU spike, only query each 100ms
                      Thread.Sleep(100);

                      // Reconnect redis if down
                      if (redisConn == null || !redisConn.IsConnected)
                      {
                          Console.WriteLine("Reconnecting Redis");
                          // Release the dead multiplexer so its sockets and timers do not leak.
                          redisConn?.Dispose();
                          redisConn = OpenRedisConnection(RedisHostname);
                          redis = redisConn.GetDatabase();
                      }

                      // Synchronous pop: avoids blocking on a Task via .Result.
                      string json = redis.ListLeftPop(VotesQueueKey);
                      if (json == null)
                      {
                          // Nothing queued: ping the database so the connection stays alive.
                          keepAliveCommand.ExecuteNonQuery();
                          continue;
                      }

                      var vote = JsonConvert.DeserializeAnonymousType(json, definition);
                      Console.WriteLine($"Processing vote for '{vote.vote}' by '{vote.voter_id}'");

                      // Reconnect DB if down
                      if (pgsql.State != System.Data.ConnectionState.Open)
                      {
                          Console.WriteLine("Reconnecting DB");
                          keepAliveCommand.Dispose();
                          pgsql.Dispose();
                          pgsql = OpenDbConnection(DbConnectionString);
                          // The keep-alive command must target the new connection,
                          // otherwise the next idle ping runs against the dead one.
                          keepAliveCommand = CreateKeepAliveCommand(pgsql);
                      }

                      // The vote has already been removed from Redis, so it must be
                      // stored even when the connection had to be re-established first.
                      UpdateVote(pgsql, vote.voter_id, vote.vote);
                  }
              }
              catch (Exception ex)
              {
                  Console.Error.WriteLine(ex.ToString());
                  return 1;
              }
          }

          /// <summary>
          /// Creates the <c>SELECT 1</c> command used to ping the database while idle.
          /// </summary>
          /// <param name="connection">Open connection the command is bound to.</param>
          /// <returns>A command owned by the caller, who must dispose it.</returns>
          private static DbCommand CreateKeepAliveCommand(NpgsqlConnection connection)
          {
              var command = connection.CreateCommand();
              command.CommandText = "SELECT 1";
              return command;
          }

          /// <summary>
          /// Opens a Postgres connection, retrying once per second until it succeeds,
          /// and makes sure the <c>votes</c> table exists.
          /// </summary>
          /// <param name="connectionString">Npgsql connection string.</param>
          /// <returns>An open connection; the caller owns and disposes it.</returns>
          private static NpgsqlConnection OpenDbConnection(string connectionString)
          {
              NpgsqlConnection connection;
              while (true)
              {
                  connection = new NpgsqlConnection(connectionString);
                  try
                  {
                      connection.Open();
                      break;
                  }
                  catch (Exception ex) when (ex is SocketException || ex is DbException)
                  {
                      // Dispose the failed attempt; a fresh connection is created on retry.
                      connection.Dispose();
                      Console.Error.WriteLine("Waiting for db");
                      Thread.Sleep(1000);
                  }
              }

              Console.Error.WriteLine("Connected to db");

              using (var command = connection.CreateCommand())
              {
                  command.CommandText = @"CREATE TABLE IF NOT EXISTS votes (
                                              id VARCHAR(255) NOT NULL UNIQUE,
                                              vote VARCHAR(255) NOT NULL
                                          )";
                  command.ExecuteNonQuery();
              }

              return connection;
          }

          /// <summary>
          /// Resolves <paramref name="hostname"/> and connects to Redis, retrying once
          /// per second while the name cannot be resolved or the connection fails.
          /// </summary>
          /// <param name="hostname">DNS name of the Redis server.</param>
          /// <returns>A connected multiplexer; the caller owns and disposes it.</returns>
          private static ConnectionMultiplexer OpenRedisConnection(string hostname)
          {
              while (true)
              {
                  try
                  {
                      // Use IP address to workaround https://github.com/StackExchange/StackExchange.Redis/issues/410
                      // Resolved inside the retry loop so a host name that is not
                      // resolvable yet is retried instead of crashing the worker.
                      var ipAddress = GetIp(hostname);
                      Console.WriteLine($"Found redis at {ipAddress}");

                      Console.Error.WriteLine("Connecting to redis");
                      return ConnectionMultiplexer.Connect(ipAddress);
                  }
                  catch (Exception ex) when (
                      ex is RedisConnectionException ||
                      (ex is AggregateException && ex.InnerException is SocketException))
                  {
                      // AggregateException comes from the blocking .Result in GetIp
                      // and wraps the SocketException raised by a failed DNS lookup.
                      Console.Error.WriteLine("Waiting for redis");
                      Thread.Sleep(1000);
                  }
              }
          }

          /// <summary>
          /// Returns the first IPv4 address of <paramref name="hostname"/> as a string.
          /// </summary>
          /// <param name="hostname">DNS name to resolve.</param>
          /// <returns>Dotted IPv4 address.</returns>
          /// <exception cref="AggregateException">The DNS lookup failed (raised by <c>.Result</c>).</exception>
          /// <exception cref="InvalidOperationException">The host has no IPv4 address.</exception>
          private static string GetIp(string hostname)
              => Dns.GetHostEntryAsync(hostname)
                  .Result
                  .AddressList
                  .First(a => a.AddressFamily == AddressFamily.InterNetwork)
                  .ToString();

          /// <summary>
          /// Stores a vote for the given voter: tries an INSERT first and falls back
          /// to an UPDATE of the existing row when the INSERT fails.
          /// </summary>
          /// <param name="connection">Open database connection.</param>
          /// <param name="voterId">Voter id, stored in the unique <c>id</c> column.</param>
          /// <param name="vote">Vote value to store.</param>
          private static void UpdateVote(NpgsqlConnection connection, string voterId, string vote)
          {
              using (var command = connection.CreateCommand())
              {
                  // Both statements use the same two parameters, so bind them once.
                  command.Parameters.AddWithValue("@id", voterId);
                  command.Parameters.AddWithValue("@vote", vote);

                  try
                  {
                      command.CommandText = "INSERT INTO votes (id, vote) VALUES (@id, @vote)";
                      command.ExecuteNonQuery();
                  }
                  catch (DbException)
                  {
                      // Any DbException from the INSERT lands here; in the expected
                      // case the voter already has a row, so overwrite its vote.
                      command.CommandText = "UPDATE votes SET vote = @vote WHERE id = @id";
                      command.ExecuteNonQuery();
                  }
              }
          }
      }
  }