using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AprsSharp.Parsers.Aprs;
using GeoCoordinatePortable;
using Npgsql;
using NpgsqlTypes;
using skyscraper5.Aprs.AprsSharp;
using skyscraper5.Data.PostgreSql;
using skyscraper5.Skyscraper.Scraper.Storage.Utilities;

namespace skyscraper5.Aprs.AprsStorage
{
    /// <summary>
    /// PostgreSQL-backed implementation of <see cref="LX9SESStorage"/>.
    /// Read-side existence checks run synchronously on the calling thread; all INSERTs are
    /// queued and executed by a lazily started background writer thread inside one transaction.
    /// </summary>
    internal class AprsPostgresqlStorage : LX9SESStorage
    {
        private readonly PostgresqlToken _postgresql;

        public AprsPostgresqlStorage(PostgresqlToken postgresql)
        {
            _postgresql = postgresql;
        }

        /// <summary>
        /// Creates and opens a new connection; the caller owns (and must dispose) the result.
        /// </summary>
        private NpgsqlConnection OpenConnection()
        {
            // NOTE(review): connectionStringBuilder is not declared anywhere in this file and
            // _postgresql is never read - presumably the connection string should come from the
            // token. Confirm where connectionStringBuilder is supposed to be defined.
            NpgsqlConnection connection = new NpgsqlConnection(connectionStringBuilder.ToString());
            try
            {
                connection.Open();
                return connection;
            }
            catch
            {
                // Do not leak the connection object when Open() fails.
                connection.Dispose();
                throw;
            }
        }

        #region Background Worker

        private delegate void WriterTask(NpgsqlConnection connection);

        // Guards _writerTasks and _writerThread. A dedicated object is used so that the lock
        // exists before the first task is queued.
        private readonly object _writerLock = new object();
        private readonly Queue<WriterTask> _writerTasks = new Queue<WriterTask>();
        private Thread _writerThread;

        // Transaction currently used by the writer thread; only touched on that thread.
        private NpgsqlTransaction transaction;

        /// <summary>
        /// Queues a database write and starts the writer thread when none is running.
        /// </summary>
        /// <param name="task">Work item that receives the writer thread's open connection.</param>
        private void EnqueueTask(WriterTask task)
        {
            lock (_writerLock)
            {
                _writerTasks.Enqueue(task);
                if (_writerThread != null)
                {
                    return;
                }

                // Enqueue and thread start happen under the same lock that the writer uses to
                // decide it is finished, so a task can never be left behind without a thread.
                _writerThread = new Thread(WriterThreadFunction);
                _writerThread.Priority = ThreadPriority.Highest;
                _writerThread.Name = "PostgreSQL APRS Writer Thread";
                _writerThread.Start();
            }
        }

        /// <summary>
        /// Removes the next queued task, if any.
        /// </summary>
        /// <returns><c>true</c> when a task was dequeued; <c>false</c> when the queue is empty.</returns>
        private bool TryDequeueTask(out WriterTask task)
        {
            lock (_writerLock)
            {
                if (_writerTasks.Count == 0)
                {
                    task = null;
                    return false;
                }

                task = _writerTasks.Dequeue();
                return true;
            }
        }

        /// <summary>
        /// Body of the writer thread: drains the queue inside a transaction, commits, and
        /// retires itself once the queue is observed empty under the lock.
        /// </summary>
        private void WriterThreadFunction()
        {
            using (NpgsqlConnection connection = OpenConnection())
            {
                while (true)
                {
                    transaction = connection.BeginTransaction();
                    while (TryDequeueTask(out WriterTask task))
                    {
                        task(connection);
                    }

                    transaction.Commit();
                    transaction.Dispose();

                    lock (_writerLock)
                    {
                        if (_writerTasks.Count == 0)
                        {
                            // Cleared under the lock and only after the commit: a producer
                            // arriving later starts a fresh thread, one arriving earlier is
                            // picked up by the next loop iteration.
                            _writerThread = null;
                            break;
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Commits the writer's current transaction and opens a new one on <paramref name="conn"/>.
        /// Has the <see cref="WriterTask"/> signature so it can be queued like any other task.
        /// </summary>
        private void CommitTransaction(NpgsqlConnection conn)
        {
            transaction.Commit();
            transaction.Dispose();
            transaction = conn.BeginTransaction();
        }

        #endregion

        /// <summary>
        /// Replaces null parameter values with <see cref="DBNull.Value"/> and trims leading and
        /// trailing NUL characters from string values of Text/Varchar parameters.
        /// </summary>
        /// <param name="collection">Parameters of the command about to be executed.</param>
        public static void CheckNulls(NpgsqlParameterCollection collection)
        {
            foreach (NpgsqlParameter parameter in collection)
            {
                if (parameter.Value == null)
                {
                    parameter.Value = DBNull.Value;
                    continue;
                }

                // The pattern match skips DBNull and any non-string payload instead of casting blindly.
                if (parameter.Value is string text
                    && (parameter.NpgsqlDbType == NpgsqlDbType.Text || parameter.NpgsqlDbType == NpgsqlDbType.Varchar))
                {
                    parameter.Value = text.Trim('\0');
                }
            }
        }

        // Senders whose stored positions have already been loaded into knownAprsPositions.
        private readonly HashSet<string> loadedAprsPositionHistories = new HashSet<string>();
        private readonly HashSet<DatabaseKeyAprsPosition> knownAprsPositions = new HashSet<DatabaseKeyAprsPosition>();

        /// <summary>
        /// Records a position report unless the same (time, sender, lon, lat) key is already known.
        /// </summary>
        /// <returns><c>true</c> when an INSERT was queued; <c>false</c> for null coordinates or a known key.</returns>
        public bool AprsPosition(DateTime currentTime, string packetSender, GeoCoordinate positionCoordinates, string piComment)
        {
            if (positionCoordinates.IsNull)
            {
                return false;
            }

            if (!loadedAprsPositionHistories.Contains(packetSender))
            {
                FetchAprsPositionHistory(packetSender);
            }

            DatabaseKeyAprsPosition key = new DatabaseKeyAprsPosition(currentTime, packetSender, positionCoordinates.Longitude, positionCoordinates.Latitude);
            if (knownAprsPositions.Contains(key))
            {
                return false;
            }

            EnqueueTask(x => WriteAprsPosition(x, currentTime, packetSender, positionCoordinates, piComment));
            knownAprsPositions.Add(key);
            return true;
        }

        /// <summary>
        /// Inserts one row into aprs_positions. Runs on the writer thread.
        /// </summary>
        private void WriteAprsPosition(NpgsqlConnection connection, DateTime currentTime, string packetSender, GeoCoordinate positionCoordinates, string piComment)
        {
            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText =
                    "INSERT INTO aprs_positions (ctime, sender, lon, lat, comment) " +
                    "VALUES (@ctime, @sender, @lon, @lat, @comment)";
                command.Parameters.AddWithValue("@ctime", NpgsqlDbType.Timestamp, currentTime);
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
                command.Parameters.AddWithValue("@lon", NpgsqlDbType.Double, positionCoordinates.Longitude);
                command.Parameters.AddWithValue("@lat", NpgsqlDbType.Double, positionCoordinates.Latitude);
                // Embedded NUL characters are stripped; a null comment becomes DBNull in CheckNulls.
                command.Parameters.AddWithValue("@comment", NpgsqlDbType.Text, piComment?.Replace("\0", ""));
                CheckNulls(command.Parameters);
                command.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Loads every stored position key of <paramref name="packetSender"/> into the cache
        /// and marks the sender as loaded.
        /// </summary>
        private void FetchAprsPositionHistory(string packetSender)
        {
            using (NpgsqlConnection connection = OpenConnection())
            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText = "SELECT ctime, lon, lat FROM aprs_positions WHERE sender = @sender";
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
                using (NpgsqlDataReader dataReader = command.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        DateTime ctime = dataReader.GetDateTime(0);
                        double lon = dataReader.GetDouble(1);
                        double lat = dataReader.GetDouble(2);
                        knownAprsPositions.Add(new DatabaseKeyAprsPosition(ctime, packetSender, lon, lat));
                    }
                }
            }

            loadedAprsPositionHistories.Add(packetSender);
        }

        // Senders whose stored weather report keys have already been loaded.
        private readonly HashSet<string> knownAprsWeatherReporters = new HashSet<string>();
        private readonly HashSet<DatabaseKeyAprsWeatherReport> knownAprsWeatherReports = new HashSet<DatabaseKeyAprsWeatherReport>();

        /// <summary>
        /// Records a weather report unless one with the same (sender, time) key is already known.
        /// </summary>
        /// <returns><c>true</c> when an INSERT was queued; <c>false</c> for a known key.</returns>
        public bool AprsWeatherReport(DateTime currentTime, string packetSender, WeatherInfo wi)
        {
            if (!knownAprsWeatherReporters.Contains(packetSender))
            {
                SelectAprsWeatherReports(packetSender);
            }

            DatabaseKeyAprsWeatherReport key = new DatabaseKeyAprsWeatherReport(packetSender, currentTime);
            if (knownAprsWeatherReports.Contains(key))
            {
                return false;
            }

            EnqueueTask(x => WriteAprsWeatherReport(x, currentTime, packetSender, wi));
            knownAprsWeatherReports.Add(key);
            return true;
        }

        /// <summary>
        /// Loads every stored weather report key of <paramref name="sender"/> into the cache
        /// and marks the sender as loaded.
        /// </summary>
        private void SelectAprsWeatherReports(string sender)
        {
            using (NpgsqlConnection connection = OpenConnection())
            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText = "SELECT rtime FROM aprs_weather_reports WHERE sender = @sender";
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, sender);
                using (NpgsqlDataReader dataReader = command.ExecuteReader())
                {
                    while (dataReader.Read())
                    {
                        knownAprsWeatherReports.Add(new DatabaseKeyAprsWeatherReport(sender, dataReader.GetDateTime(0)));
                    }
                }
            }

            knownAprsWeatherReporters.Add(sender);
        }

        /// <summary>
        /// Inserts one row into aprs_weather_reports. Runs on the writer thread.
        /// lat/lon are written as NULL when the report carries no usable position.
        /// </summary>
        private void WriteAprsWeatherReport(NpgsqlConnection connection, DateTime currentTime, string packetSender, WeatherInfo wi)
        {
            GeoCoordinate positionCoordinates = new GeoCoordinate();
            if (wi.Position != null && !wi.Position.Coordinates.IsNull)
            {
                positionCoordinates = wi.Position.Coordinates;
            }

            bool hasPosition = !positionCoordinates.IsNull;

            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText =
                    "insert into aprs_weather_reports (sender, rtime, lat, lon, has_messaging, ts, comment, wind_direction,\r\n wind_speed, wind_gust, temperature, rainfall_hour, rainfall_day,\r\n rainfall_since_midnight, humidity, barometric_pressure, luminosity, rain_raw, snow) " +
                    "values " +
                    "(@sender,@rtime,@lat,@lon,@has_messaging,@ts,@comment,@wind_direction,@wind_speed,@wind_gust,@temperature,@rainfall_hour,@rainfall_day,@rainfall_since_midnight,@humidity,@barometric_pressure,@luminosity,@rain_raw,@snow)";
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
                command.Parameters.AddWithValue("@rtime", NpgsqlDbType.Timestamp, currentTime);
                command.Parameters.AddWithValue("@lat", NpgsqlDbType.Double, hasPosition ? (object)positionCoordinates.Latitude : DBNull.Value);
                command.Parameters.AddWithValue("@lon", NpgsqlDbType.Double, hasPosition ? (object)positionCoordinates.Longitude : DBNull.Value);
                command.Parameters.AddWithValue("@has_messaging", NpgsqlDbType.Boolean, wi.HasMessaging);
                command.Parameters.AddWithValue("@ts", NpgsqlDbType.TimestampTz, wi.Timestamp?.DateTime.ToUniversalTime());
                // A report without a comment must not crash the writer; null becomes DBNull in CheckNulls.
                command.Parameters.AddWithValue("@comment", NpgsqlDbType.Text, wi.Comment?.Replace("\0", ""));
                command.Parameters.AddWithValue("@wind_direction", NpgsqlDbType.Integer, wi.WindDirection);
                command.Parameters.AddWithValue("@wind_speed", NpgsqlDbType.Integer, wi.WindSpeed);
                command.Parameters.AddWithValue("@wind_gust", NpgsqlDbType.Integer, wi.WindGust);
                command.Parameters.AddWithValue("@temperature", NpgsqlDbType.Integer, wi.Temperature);
                command.Parameters.AddWithValue("@rainfall_hour", NpgsqlDbType.Integer, wi.Rainfall1Hour);
                command.Parameters.AddWithValue("@rainfall_day", NpgsqlDbType.Integer, wi.Rainfall24Hour);
                command.Parameters.AddWithValue("@rainfall_since_midnight", NpgsqlDbType.Integer, wi.RainfallSinceMidnight);
                command.Parameters.AddWithValue("@humidity", NpgsqlDbType.Integer, wi.Humidity);
                command.Parameters.AddWithValue("@barometric_pressure", NpgsqlDbType.Integer, wi.BarometricPressure);
                command.Parameters.AddWithValue("@luminosity", NpgsqlDbType.Integer, wi.Luminosity);
                command.Parameters.AddWithValue("@rain_raw", NpgsqlDbType.Integer, wi.RainRaw);
                command.Parameters.AddWithValue("@snow", NpgsqlDbType.Integer, wi.Snow);
                CheckNulls(command.Parameters);
                command.ExecuteNonQuery();
            }
        }

        // Senders known to have (or to be getting) a row in aprs_station_capabilities.
        private readonly HashSet<string> knownAprsStationCapabilities = new HashSet<string>();

        /// <summary>
        /// Records the capabilities of a station once per sender.
        /// </summary>
        /// <returns><c>true</c> when an INSERT was queued; <c>false</c> when the sender is already cached or stored.</returns>
        public bool AprsStationCapabilities(string packetSender, StationCapabilities stationCapabilities)
        {
            if (knownAprsStationCapabilities.Contains(packetSender))
            {
                return false;
            }

            bool inDb = TestForAprsStationCapabilities(packetSender);
            if (!inDb)
            {
                EnqueueTask(x => WriteAprsStationCapabilities(x, packetSender, stationCapabilities));
            }

            // Cached in both cases so the database is asked only once per sender.
            knownAprsStationCapabilities.Add(packetSender);
            return !inDb;
        }

        /// <summary>
        /// Inserts one row into aprs_station_capabilities. Runs on the writer thread.
        /// </summary>
        private void WriteAprsStationCapabilities(NpgsqlConnection connection, string packetSender, StationCapabilities stationCapabilities)
        {
            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText =
                    "INSERT INTO aprs_station_capabilities (sender, isl_gate, local_stations, message_count) " +
                    "VALUES (@sender, @isl_gate, @local_stations, @message_count)";
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
                command.Parameters.AddWithValue("@isl_gate", NpgsqlDbType.Boolean, stationCapabilities.IsIGate);
                command.Parameters.AddWithValue("@local_stations", NpgsqlDbType.Integer, stationCapabilities.LocalStations);
                command.Parameters.AddWithValue("@message_count", NpgsqlDbType.Integer, stationCapabilities.MessageCount);
                // Same null handling as the other writers.
                CheckNulls(command.Parameters);
                command.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Checks whether aprs_station_capabilities already holds a row for <paramref name="sender"/>.
        /// </summary>
        private bool TestForAprsStationCapabilities(string sender)
        {
            using (NpgsqlConnection conn = OpenConnection())
            using (NpgsqlCommand command = conn.CreateCommand())
            {
                command.CommandText = "SELECT dateadded FROM aprs_station_capabilities WHERE sender = @sender";
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, sender);
                using (NpgsqlDataReader dataReader = command.ExecuteReader())
                {
                    return dataReader.Read();
                }
            }
        }

        /// <summary>
        /// Value holder for one GPS sentence. Equality and hashing use only
        /// <see cref="CurrentTime"/> and <see cref="PacketSender"/>.
        /// </summary>
        class CachedGpsSentence
        {
            public CachedGpsSentence(DateTime currentTime, string packetSender, double? course, double? magneticVariation, double? speed)
            {
                CurrentTime = currentTime;
                PacketSender = packetSender;
                Course = course;
                MagneticVariation = magneticVariation;
                Speed = speed;
            }

            public DateTime CurrentTime { get; }
            public string PacketSender { get; }
            public double? Course { get; }
            public double? MagneticVariation { get; }
            public double? Speed { get; }

            public override bool Equals(object? obj)
            {
                return obj is CachedGpsSentence sentence &&
                       CurrentTime == sentence.CurrentTime &&
                       PacketSender == sentence.PacketSender;
            }

            public override int GetHashCode()
            {
                return HashCode.Combine(CurrentTime, PacketSender);
            }
        }

        // Sentences known to be stored or queued; only touched on the calling thread.
        private readonly HashSet<CachedGpsSentence> _gpsSentences = new HashSet<CachedGpsSentence>();

        /// <summary>
        /// Records a GPS sentence unless one with the same (sender, time) is already known.
        /// </summary>
        /// <returns><c>true</c> when an INSERT was queued; <c>false</c> for a known sentence.</returns>
        public bool AprsGpsSentence(DateTime currentTime, string packetSender, double? course, double? magneticVariation, double? speed)
        {
            CachedGpsSentence cgs = new CachedGpsSentence(currentTime, packetSender, course, magneticVariation, speed);
            if (TestForAprsGpsSentence(cgs))
            {
                return false;
            }

            EnqueueTask(x => WriteAprsGpsSentence(x, cgs));
            // Cached at enqueue time, like the other Aprs* methods: the set is never mutated
            // from the writer thread and the same sentence cannot be queued twice while the
            // first INSERT is still pending.
            _gpsSentences.Add(cgs);
            return true;
        }

        /// <summary>
        /// Inserts one row into aprs_gps_sentences. Runs on the writer thread.
        /// </summary>
        private void WriteAprsGpsSentence(NpgsqlConnection conn, CachedGpsSentence cgs)
        {
            using (NpgsqlCommand command = conn.CreateCommand())
            {
                command.CommandText = "insert into aprs_gps_sentences (callsign, calltime, course, magnetic_variation, speed) values (@sign,@time,@course,@magnetic_variation, @speed)";
                command.Parameters.AddWithValue("@sign", NpgsqlDbType.Varchar, cgs.PacketSender);
                command.Parameters.AddWithValue("@time", NpgsqlDbType.Timestamp, cgs.CurrentTime);
                command.Parameters.AddWithValue("@course", NpgsqlDbType.Double, cgs.Course);
                command.Parameters.AddWithValue("@magnetic_variation", NpgsqlDbType.Double, cgs.MagneticVariation);
                command.Parameters.AddWithValue("@speed", NpgsqlDbType.Double, cgs.Speed);
                // course / magnetic_variation / speed are nullable and box to null; map them to DBNull.
                CheckNulls(command.Parameters);
                command.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Checks the cache and then aprs_gps_sentences for a sentence with the same sender and
        /// time; a database hit is added to the cache.
        /// </summary>
        private bool TestForAprsGpsSentence(CachedGpsSentence cgs)
        {
            if (_gpsSentences.Contains(cgs))
            {
                return true;
            }

            bool result;
            using (NpgsqlConnection conn = OpenConnection())
            using (NpgsqlCommand command = conn.CreateCommand())
            {
                command.CommandText = "SELECT dateadded FROM aprs_gps_sentences WHERE callsign = @sign AND calltime = @time";
                command.Parameters.AddWithValue("@sign", NpgsqlDbType.Varchar, cgs.PacketSender);
                command.Parameters.AddWithValue("@time", NpgsqlDbType.Timestamp, cgs.CurrentTime);
                using (NpgsqlDataReader dataReader = command.ExecuteReader())
                {
                    result = dataReader.Read();
                }
            }

            if (result)
            {
                _gpsSentences.Add(cgs);
            }

            return result;
        }

        /// <summary>
        /// Queues an APRS message for insertion; no duplicate check is performed.
        /// </summary>
        public void AprsMessage(string sender, DateTime currentTime, string receiver, string message)
        {
            EnqueueTask(x => WriteAprsMessage(x, sender, currentTime, receiver, message));
        }

        /// <summary>
        /// Inserts one row into aprs_messages, using the next free ctime_sort value for the
        /// message's timestamp. Runs on the writer thread.
        /// </summary>
        private void WriteAprsMessage(NpgsqlConnection connection, string sender, DateTime currentTime, string receiver, string message)
        {
            // The lookup's reader is closed before the INSERT runs on the same connection.
            int ctime_sort = SelectAprsMessageCtimeSort(connection, currentTime);

            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText =
                    "INSERT INTO aprs_messages (ctime,ctime_sort,sender,receiver,message) " +
                    "VALUES (@ctime,@ctime_sort,@sender,@receiver,@message)";
                command.Parameters.AddWithValue("@ctime", NpgsqlDbType.Timestamp, currentTime);
                command.Parameters.AddWithValue("@ctime_sort", NpgsqlDbType.Integer, ctime_sort);
                command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, sender);
                command.Parameters.AddWithValue("@receiver", NpgsqlDbType.Varchar, receiver);
                // A null message becomes DBNull in CheckNulls instead of throwing here.
                command.Parameters.AddWithValue("@message", NpgsqlDbType.Text, message?.Replace("\0", ""));
                CheckNulls(command.Parameters);
                command.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Returns MAX(ctime_sort) + 1 for messages stored with <paramref name="currentTime"/>,
        /// or 0 when there are none.
        /// </summary>
        private int SelectAprsMessageCtimeSort(NpgsqlConnection connection, DateTime currentTime)
        {
            using (NpgsqlCommand command = connection.CreateCommand())
            {
                command.CommandText = "SELECT MAX(ctime_sort) FROM aprs_messages WHERE ctime = @ctime";
                command.Parameters.AddWithValue("@ctime", NpgsqlDbType.Timestamp, currentTime);
                using (NpgsqlDataReader dataReader = command.ExecuteReader())
                {
                    if (dataReader.Read() && !dataReader.IsDBNull(0))
                    {
                        return dataReader.GetInt32(0) + 1;
                    }

                    return 0;
                }
            }
        }
    }
}