427 lines
16 KiB
C#

using AprsSharp.Parsers.Aprs;
using GeoCoordinatePortable;
using Npgsql;
using NpgsqlTypes;
using skyscraper5.Aprs.AprsSharp;
using skyscraper5.Skyscraper.Scraper.Storage.Utilities;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using skyscraper5.Data.PostgreSql;
namespace skyscraper5.Aprs.AprsStorage
{
internal class AprsPostgresqlStorage : LX9SESStorage
{
/// <summary>
/// Source of database connections; every query and the writer thread obtain their
/// connection by calling <c>GetConnection()</c> on it.
/// </summary>
private readonly PostgresqlToken _postgresql;

/// <summary>
/// Creates the storage on top of the given PostgreSQL token.
/// </summary>
/// <param name="postgresql">Token that is kept and later asked for connections.</param>
public AprsPostgresqlStorage(PostgresqlToken postgresql) => _postgresql = postgresql;
#region Background Worker
/// <summary>
/// Guards <see cref="_writerTasks"/> and <see cref="_writerThread"/> so that enqueueing, dequeueing and the
/// decision to start or retire the writer thread are atomic with respect to each other.
/// </summary>
private readonly object _writerGate = new object();

/// <summary>The thread currently draining the queue, or null while no writer is running.</summary>
private Thread _writerThread;

/// <summary>A unit of database work that is executed with the writer's connection.</summary>
private delegate void WriterTask(NpgsqlConnection connection);

/// <summary>Pending work, created on first use. Only accessed while holding <see cref="_writerGate"/>.</summary>
private Queue<WriterTask> _writerTasks;

/// <summary>
/// Queues a task and makes sure a writer thread exists to execute it.
/// </summary>
/// <param name="task">Work to run on the writer thread inside the writer's transaction.</param>
private void EnqueueTask(WriterTask task)
{
    // Queue creation, enqueue and the "is a writer running?" check happen under one lock;
    // otherwise a writer that is just about to exit could miss the task and nobody would restart it.
    lock (_writerGate)
    {
        if (_writerTasks == null)
        {
            _writerTasks = new Queue<WriterTask>();
        }
        _writerTasks.Enqueue(task);

        if (_writerThread == null)
        {
            Thread writer = new Thread(WriterThreadFunction);
            writer.Priority = ThreadPriority.Highest;
            writer.Name = "PostgreSQL APRS Writer Thread";
            writer.Start();
            // Published only after a successful Start(); the new thread cannot inspect this field
            // before we leave the lock, so it never observes the intermediate state.
            _writerThread = writer;
        }
    }
}

/// <summary>
/// Transaction currently open on the writer connection. Assigned by the writer thread when it
/// starts a batch and replaced by <see cref="CommitTransaction"/>.
/// </summary>
private NpgsqlTransaction transaction;

/// <summary>
/// Entry point of the writer thread: drains the queue in batches (one connection and one
/// transaction per batch) and retires itself once the queue is observed empty under the lock.
/// An exception thrown by a task is not swallowed; it propagates out of the thread.
/// </summary>
private void WriterThreadFunction()
{
    bool retired = false;
    try
    {
        while (!retired)
        {
            DrainWriterQueue();

            lock (_writerGate)
            {
                // Tasks may have arrived while the batch was being committed. Only give up the
                // writer role if the queue is still empty; otherwise run another batch.
                if (_writerTasks.Count == 0)
                {
                    _writerThread = null;
                    retired = true;
                }
            }
        }
    }
    finally
    {
        if (!retired)
        {
            // Leaving because of an exception: clear the marker so that a later
            // EnqueueTask is able to start a fresh writer.
            lock (_writerGate)
            {
                _writerThread = null;
            }
        }
    }
}

/// <summary>
/// Opens a connection, executes queued tasks inside a transaction until the queue is empty,
/// then commits. The transaction is disposed even when a task throws.
/// </summary>
private void DrainWriterQueue()
{
    using (NpgsqlConnection connection = _postgresql.GetConnection())
    {
        connection.Open();
        transaction = connection.BeginTransaction();
        try
        {
            while (true)
            {
                WriterTask task;
                lock (_writerGate)
                {
                    if (_writerTasks.Count == 0)
                    {
                        break;
                    }
                    task = _writerTasks.Dequeue();
                }
                // Run outside the lock so producers are not blocked by database work.
                task(connection);
            }
            transaction.Commit();
        }
        finally
        {
            // A task may have swapped the field via CommitTransaction; dispose whichever is current.
            transaction.Dispose();
        }
        connection.Close();
    }
}

/// <summary>
/// Commits the current writer transaction and immediately begins a new one on the given connection.
/// The signature matches <see cref="WriterTask"/>.
/// </summary>
/// <param name="conn">Connection on which the replacement transaction is started.</param>
private void CommitTransaction(NpgsqlConnection conn)
{
    transaction.Commit();
    transaction.Dispose();
    transaction = conn.BeginTransaction();
}
#endregion
/// <summary>Senders whose stored positions have already been loaded into <see cref="knownAprsPositions"/>.</summary>
private HashSet<string> loadedAprsPositionHistories;

/// <summary>Keys of positions that are already stored or already queued for writing.</summary>
private HashSet<DatabaseKeyAprsPosition> knownAprsPositions;

/// <summary>
/// Records a position report unless the coordinates are null or the same
/// (time, sender, longitude, latitude) combination is already known.
/// </summary>
/// <param name="currentTime">Timestamp stored with the position.</param>
/// <param name="packetSender">Sender of the packet.</param>
/// <param name="positionCoordinates">Reported coordinates.</param>
/// <param name="piComment">Comment passed through to the insert.</param>
/// <returns>true when a write was queued; false when the report was skipped.</returns>
public bool AprsPosition(DateTime currentTime, string packetSender, GeoCoordinate positionCoordinates, string piComment)
{
    if (positionCoordinates.IsNull)
    {
        return false;
    }

    loadedAprsPositionHistories ??= new HashSet<string>();
    knownAprsPositions ??= new HashSet<DatabaseKeyAprsPosition>();

    // First contact with this sender: pull its stored positions into the cache.
    bool historyLoaded = loadedAprsPositionHistories.Contains(packetSender);
    if (!historyLoaded)
    {
        FetchAprsPositionHistory(packetSender);
    }

    DatabaseKeyAprsPosition positionKey = new DatabaseKeyAprsPosition(
        currentTime, packetSender, positionCoordinates.Longitude, positionCoordinates.Latitude);

    bool isNew = !knownAprsPositions.Contains(positionKey);
    if (isNew)
    {
        EnqueueTask(connection => WriteAprsPosition(connection, currentTime, packetSender, positionCoordinates, piComment));
        knownAprsPositions.Add(positionKey);
    }
    return isNew;
}
/// <summary>
/// Inserts one row into <c>aprs_positions</c> using the supplied connection.
/// </summary>
/// <param name="connection">Connection the command is created on.</param>
/// <param name="currentTime">Value for the <c>ctime</c> column.</param>
/// <param name="packetSender">Value for the <c>sender</c> column.</param>
/// <param name="positionCoordinates">Source of the <c>lon</c> and <c>lat</c> values.</param>
/// <param name="piComment">Comment text; may be null, in which case NULL is written.</param>
private void WriteAprsPosition(NpgsqlConnection connection, DateTime currentTime, string packetSender, GeoCoordinate positionCoordinates, string piComment)
{
    // The command is disposed deterministically instead of being left to the garbage collector.
    using (NpgsqlCommand command = connection.CreateCommand())
    {
        command.CommandText =
            "INSERT INTO aprs_positions (ctime, sender, lon, lat, comment) " +
            "VALUES (@ctime, @sender, @lon, @lat, @comment)";
        command.Parameters.AddWithValue("@ctime", NpgsqlDbType.Timestamp, currentTime);
        command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
        command.Parameters.AddWithValue("@lon", NpgsqlDbType.Double, positionCoordinates.Longitude);
        command.Parameters.AddWithValue("@lat", NpgsqlDbType.Double, positionCoordinates.Latitude);
        // Remove NUL characters anywhere in the comment before it is sent.
        command.Parameters.AddWithValue("@comment", NpgsqlDbType.Text, piComment?.Replace("\0", ""));
        // Converts null values to DBNull so they are written as SQL NULL.
        CheckNulls(command.Parameters);
        command.ExecuteNonQuery();
    }
}
/// <summary>
/// Normalizes parameter values before a command is executed: null becomes <see cref="DBNull.Value"/>,
/// and string values of <c>Text</c>/<c>Varchar</c> parameters have every NUL character removed.
/// </summary>
/// <param name="collection">Parameters to normalize in place.</param>
public static void CheckNulls(NpgsqlParameterCollection collection)
{
    foreach (NpgsqlParameter parameter in collection)
    {
        object? value = parameter.Value;
        if (value == null)
        {
            parameter.Value = DBNull.Value;
            continue;
        }

        // Pattern match instead of a hard cast: DBNull and non-string values are simply left alone.
        if (value is string text &&
            (parameter.NpgsqlDbType == NpgsqlDbType.Text || parameter.NpgsqlDbType == NpgsqlDbType.Varchar))
        {
            // Replace rather than Trim: a NUL in the middle of the string is just as
            // unacceptable to the server as one at either end.
            parameter.Value = text.Replace("\0", string.Empty);
        }
    }
}
/// <summary>
/// Loads every stored position of one sender into <c>knownAprsPositions</c> and then marks the
/// sender as loaded in <c>loadedAprsPositionHistories</c>. Uses its own short-lived connection.
/// </summary>
/// <param name="packetSender">Sender whose rows are read from <c>aprs_positions</c>.</param>
private void FetchAprsPositionHistory(string packetSender)
{
    using (NpgsqlConnection connection = _postgresql.GetConnection())
    {
        connection.Open();
        // Command and reader are scoped with 'using' so they are released even if the query
        // or a row read throws.
        using (NpgsqlCommand command = connection.CreateCommand())
        {
            command.CommandText = "SELECT ctime, lon, lat FROM aprs_positions WHERE sender = @sender";
            command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
            using (NpgsqlDataReader dataReader = command.ExecuteReader())
            {
                while (dataReader.Read())
                {
                    DateTime ctime = dataReader.GetDateTime(0);
                    double lon = dataReader.GetDouble(1);
                    double lat = dataReader.GetDouble(2);
                    knownAprsPositions.Add(new DatabaseKeyAprsPosition(ctime, packetSender, lon, lat));
                }
            }
        }
        connection.Close();
    }
    // Only reached when the load succeeded, so a failed load is retried on the next call.
    loadedAprsPositionHistories.Add(packetSender);
}
/// <summary>Senders whose stored weather reports have already been loaded into <see cref="knownAprsWeatherReports"/>.</summary>
private HashSet<string> knownAprsWeatherReporters;

/// <summary>Keys (sender, time) of weather reports that are already stored or already queued.</summary>
private HashSet<DatabaseKeyAprsWeatherReport> knownAprsWeatherReports;

/// <summary>
/// Records a weather report unless a report with the same sender and time is already known.
/// </summary>
/// <param name="currentTime">Report time; part of the de-duplication key.</param>
/// <param name="packetSender">Sender of the packet; part of the de-duplication key.</param>
/// <param name="wi">Weather data handed to the writer.</param>
/// <returns>true when a write was queued; false when the report was already known.</returns>
public bool AprsWeatherReport(DateTime currentTime, string packetSender, WeatherInfo wi)
{
    knownAprsWeatherReporters ??= new HashSet<string>();
    knownAprsWeatherReports ??= new HashSet<DatabaseKeyAprsWeatherReport>();

    // First report from this sender: load what is already stored for it.
    if (!knownAprsWeatherReporters.Contains(packetSender))
    {
        SelectAprsWeatherReports(packetSender);
    }

    DatabaseKeyAprsWeatherReport reportKey = new DatabaseKeyAprsWeatherReport(packetSender, currentTime);
    if (knownAprsWeatherReports.Contains(reportKey))
    {
        return false;
    }

    EnqueueTask(connection => WriteAprsWeatherReport(connection, currentTime, packetSender, wi));
    knownAprsWeatherReports.Add(reportKey);
    return true;
}
/// <summary>
/// Loads the report times stored for one sender into <c>knownAprsWeatherReports</c> and then
/// marks the sender as loaded in <c>knownAprsWeatherReporters</c>. Uses its own short-lived connection.
/// </summary>
/// <param name="sender">Sender whose rows are read from <c>aprs_weather_reports</c>.</param>
private void SelectAprsWeatherReports(string sender)
{
    using (NpgsqlConnection connection = _postgresql.GetConnection())
    {
        connection.Open();
        // 'using' guarantees command and reader are released even when the query throws.
        using (NpgsqlCommand command = connection.CreateCommand())
        {
            command.CommandText = "SELECT rtime FROM aprs_weather_reports WHERE sender = @sender";
            command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, sender);
            using (NpgsqlDataReader dataReader = command.ExecuteReader())
            {
                while (dataReader.Read())
                {
                    knownAprsWeatherReports.Add(new DatabaseKeyAprsWeatherReport(sender, dataReader.GetDateTime(0)));
                }
            }
        }
        connection.Close();
    }
    // Only reached when the load succeeded, so a failed load is retried on the next call.
    knownAprsWeatherReporters.Add(sender);
}
/// <summary>
/// Inserts one row into <c>aprs_weather_reports</c> using the supplied connection.
/// Latitude/longitude are written as NULL when the report carries no usable position;
/// any other null value is converted to NULL by <see cref="CheckNulls"/>.
/// </summary>
/// <param name="connection">Connection the command is created on.</param>
/// <param name="currentTime">Value for the <c>rtime</c> column.</param>
/// <param name="packetSender">Value for the <c>sender</c> column.</param>
/// <param name="wi">Weather data providing the remaining column values.</param>
private void WriteAprsWeatherReport(NpgsqlConnection connection, DateTime currentTime, string packetSender, WeatherInfo wi)
{
    // The command is disposed deterministically instead of being left to the garbage collector.
    using (NpgsqlCommand command = connection.CreateCommand())
    {
        command.CommandText =
            "insert into aprs_weather_reports (sender, rtime, lat, lon, has_messaging, ts, comment, wind_direction,\r\n wind_speed, wind_gust, temperature, rainfall_hour, rainfall_day,\r\n rainfall_since_midnight, humidity, barometric_pressure, luminosity, rain_raw, snow) " +
            "values " +
            "(@sender,@rtime,@lat,@lon,@has_messaging,@ts,@comment,@wind_direction,@wind_speed,@wind_gust,@temperature,@rainfall_hour,@rainfall_day,@rainfall_since_midnight,@humidity,@barometric_pressure,@luminosity,@rain_raw,@snow)";
        command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
        command.Parameters.AddWithValue("@rtime", NpgsqlDbType.Timestamp, currentTime);

        // Start from an empty coordinate and only take the report's coordinates when they are set.
        GeoCoordinate positionCoordinates = new GeoCoordinate();
        if (wi.Position != null)
        {
            if (!wi.Position.Coordinates.IsNull)
                positionCoordinates = wi.Position.Coordinates;
        }
        command.Parameters.AddWithValue("@lat", NpgsqlDbType.Double, positionCoordinates.IsNull ? DBNull.Value : positionCoordinates.Latitude);
        command.Parameters.AddWithValue("@lon", NpgsqlDbType.Double, positionCoordinates.IsNull ? DBNull.Value : positionCoordinates.Longitude);
        command.Parameters.AddWithValue("@has_messaging", NpgsqlDbType.Boolean, wi.HasMessaging);
        command.Parameters.AddWithValue("@ts", NpgsqlDbType.TimestampTz, wi.Timestamp?.DateTime.ToUniversalTime());
        // Null-conditional: a report without a comment is written as NULL instead of throwing
        // on the writer thread, matching how WriteAprsPosition treats its comment.
        command.Parameters.AddWithValue("@comment", NpgsqlDbType.Text, wi.Comment?.Replace("\0", ""));
        command.Parameters.AddWithValue("@wind_direction", NpgsqlDbType.Integer, wi.WindDirection);
        command.Parameters.AddWithValue("@wind_speed", NpgsqlDbType.Integer, wi.WindSpeed);
        command.Parameters.AddWithValue("@wind_gust", NpgsqlDbType.Integer, wi.WindGust);
        command.Parameters.AddWithValue("@temperature", NpgsqlDbType.Integer, wi.Temperature);
        command.Parameters.AddWithValue("@rainfall_hour", NpgsqlDbType.Integer, wi.Rainfall1Hour);
        command.Parameters.AddWithValue("@rainfall_day", NpgsqlDbType.Integer, wi.Rainfall24Hour);
        command.Parameters.AddWithValue("@rainfall_since_midnight", NpgsqlDbType.Integer, wi.RainfallSinceMidnight);
        command.Parameters.AddWithValue("@humidity", NpgsqlDbType.Integer, wi.Humidity);
        command.Parameters.AddWithValue("@barometric_pressure", NpgsqlDbType.Integer, wi.BarometricPressure);
        command.Parameters.AddWithValue("@luminosity", NpgsqlDbType.Integer, wi.Luminosity);
        command.Parameters.AddWithValue("@rain_raw", NpgsqlDbType.Integer, wi.RainRaw);
        command.Parameters.AddWithValue("@snow", NpgsqlDbType.Integer, wi.Snow);
        CheckNulls(command.Parameters);
        command.ExecuteNonQuery();
    }
}
/// <summary>Senders whose capabilities are already stored or already queued for writing.</summary>
private HashSet<string> knownAprsStationCapabilities;

/// <summary>
/// Records the capabilities of a station the first time the station is seen.
/// </summary>
/// <param name="packetSender">Sender of the packet; capabilities are kept once per sender.</param>
/// <param name="stationCapabilities">Capabilities handed to the writer.</param>
/// <returns>true when a write was queued; false when the sender was cached or already in the database.</returns>
public bool AprsStationCapabilities(string packetSender, StationCapabilities stationCapabilities)
{
    knownAprsStationCapabilities ??= new HashSet<string>();

    if (knownAprsStationCapabilities.Contains(packetSender))
    {
        return false;
    }

    // Not cached yet: ask the database, queue a write only if it has no row either,
    // and remember the sender in both cases.
    bool alreadyStored = TestForAprsStationCapabilities(packetSender);
    if (!alreadyStored)
    {
        EnqueueTask(connection => WriteAprsStationCapabilities(connection, packetSender, stationCapabilities));
    }
    knownAprsStationCapabilities.Add(packetSender);
    return !alreadyStored;
}
/// <summary>
/// Inserts one row into <c>aprs_station_capabilities</c> using the supplied connection.
/// </summary>
/// <param name="connection">Connection the command is created on.</param>
/// <param name="packetSender">Value for the <c>sender</c> column.</param>
/// <param name="stationCapabilities">Source of the remaining column values.</param>
private void WriteAprsStationCapabilities(NpgsqlConnection connection, string packetSender, StationCapabilities stationCapabilities)
{
    // The command is disposed deterministically instead of being left to the garbage collector.
    using (NpgsqlCommand command = connection.CreateCommand())
    {
        command.CommandText =
            "INSERT INTO aprs_station_capabilities (sender, isl_gate, local_stations, message_count) " +
            "VALUES (@sender, @isl_gate, @local_stations, @message_count)";
        command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, packetSender);
        command.Parameters.AddWithValue("@isl_gate", NpgsqlDbType.Boolean, stationCapabilities.IsIGate);
        command.Parameters.AddWithValue("@local_stations", NpgsqlDbType.Integer, stationCapabilities.LocalStations);
        command.Parameters.AddWithValue("@message_count", NpgsqlDbType.Integer, stationCapabilities.MessageCount);
        // Same normalization the other writers apply: a null value is sent as SQL NULL
        // rather than as an unset parameter.
        CheckNulls(command.Parameters);
        command.ExecuteNonQuery();
    }
}
/// <summary>
/// Checks whether <c>aprs_station_capabilities</c> already holds a row for the sender.
/// Uses its own short-lived connection.
/// </summary>
/// <param name="sender">Sender to look up.</param>
/// <returns>true when at least one row exists.</returns>
private bool TestForAprsStationCapabilities(string sender)
{
    using (NpgsqlConnection conn = _postgresql.GetConnection())
    {
        conn.Open();
        // 'using' guarantees command and reader are released even when the query throws.
        using (NpgsqlCommand command = conn.CreateCommand())
        {
            command.CommandText = "SELECT dateadded FROM aprs_station_capabilities WHERE sender = @sender";
            command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, sender);
            using (NpgsqlDataReader dataReader = command.ExecuteReader())
            {
                // A single successful Read() is enough: we only care about existence.
                return dataReader.Read();
            }
        }
    }
}
/// <summary>
/// Immutable snapshot of one GPS sentence. Two instances are considered equal when their
/// time and sender match; course, magnetic variation and speed do not take part in equality.
/// </summary>
class CachedGpsSentence
{
    public CachedGpsSentence(DateTime currentTime, string packetSender, double? course, double? magneticVariation, double? speed)
    {
        Speed = speed;
        MagneticVariation = magneticVariation;
        Course = course;
        PacketSender = packetSender;
        CurrentTime = currentTime;
    }

    /// <summary>Time of the sentence; part of the identity.</summary>
    public DateTime CurrentTime { get; }

    /// <summary>Sender of the sentence; part of the identity.</summary>
    public string PacketSender { get; }

    public double? Course { get; }

    public double? MagneticVariation { get; }

    public double? Speed { get; }

    /// <summary>Equality on (CurrentTime, PacketSender) only.</summary>
    public override bool Equals(object? obj)
    {
        if (obj is CachedGpsSentence other)
        {
            return other.CurrentTime == CurrentTime
                && other.PacketSender == PacketSender;
        }
        return false;
    }

    /// <summary>Hash over the same two members that <see cref="Equals(object)"/> compares.</summary>
    public override int GetHashCode() => HashCode.Combine(CurrentTime, PacketSender);
}
/// <summary>
/// Records a GPS sentence unless one with the same sender and time is already cached or stored.
/// </summary>
/// <param name="currentTime">Time of the sentence; part of the de-duplication key.</param>
/// <param name="packetSender">Sender of the packet; part of the de-duplication key.</param>
/// <param name="course">Course, or null when not reported.</param>
/// <param name="magneticVariation">Magnetic variation, or null when not reported.</param>
/// <param name="speed">Speed, or null when not reported.</param>
/// <returns>true when a write was queued; false when the sentence was already known.</returns>
public bool AprsGpsSentence(DateTime currentTime, string packetSender, double? course, double? magneticVariation,
    double? speed)
{
    CachedGpsSentence cgs = new CachedGpsSentence(currentTime, packetSender, course, magneticVariation, speed);
    if (TestForAprsGpsSentence(cgs))
    {
        return false;
    }

    EnqueueTask(x => WriteAprsGpsSentence(x, cgs));
    // Remember the sentence right away, on the caller's thread. Waiting for the writer to do it
    // would let a repeat that arrives before the insert runs be queued a second time, and would
    // have the writer thread mutate a HashSet the caller is reading concurrently.
    _gpsSentences.Add(cgs);
    return true;
}

/// <summary>
/// Inserts one row into <c>aprs_gps_sentences</c> using the supplied connection.
/// </summary>
/// <param name="conn">Connection the command is created on.</param>
/// <param name="cgs">Sentence to store.</param>
private void WriteAprsGpsSentence(NpgsqlConnection conn, CachedGpsSentence cgs)
{
    using (NpgsqlCommand command = conn.CreateCommand())
    {
        command.CommandText = "insert into aprs_gps_sentences (callsign, calltime, course, magnetic_variation, speed) values (@sign,@time,@course,@magnetic_variation, @speed)";
        command.Parameters.AddWithValue("@sign", NpgsqlDbType.Varchar, cgs.PacketSender);
        command.Parameters.AddWithValue("@time", NpgsqlDbType.Timestamp, cgs.CurrentTime);
        command.Parameters.AddWithValue("@course", NpgsqlDbType.Double, cgs.Course);
        command.Parameters.AddWithValue("@magnetic_variation", NpgsqlDbType.Double, cgs.MagneticVariation);
        command.Parameters.AddWithValue("@speed", NpgsqlDbType.Double, cgs.Speed);
        // Course, variation and speed are nullable; convert absent values to DBNull so they are
        // written as SQL NULL, as the other writers in this class do.
        CheckNulls(command.Parameters);
        command.ExecuteNonQuery();
    }
}

/// <summary>GPS sentences that are already stored or already queued. Only touched by the calling thread.</summary>
private HashSet<CachedGpsSentence> _gpsSentences;

/// <summary>
/// Determines whether a sentence is already known, first from the in-memory cache and then
/// from <c>aprs_gps_sentences</c>. A database hit is added to the cache.
/// </summary>
/// <param name="cgs">Sentence to look up by sender and time.</param>
/// <returns>true when the sentence is cached or a matching row exists.</returns>
private bool TestForAprsGpsSentence(CachedGpsSentence cgs)
{
    _gpsSentences ??= new HashSet<CachedGpsSentence>();
    if (_gpsSentences.Contains(cgs))
    {
        return true;
    }

    bool result;
    using (NpgsqlConnection conn = _postgresql.GetConnection())
    {
        conn.Open();
        // 'using' guarantees command and reader are released even when the query throws.
        using (NpgsqlCommand command = conn.CreateCommand())
        {
            command.CommandText = "SELECT dateadded FROM aprs_gps_sentences WHERE callsign = @sign AND calltime = @time";
            command.Parameters.AddWithValue("@sign", NpgsqlDbType.Varchar, cgs.PacketSender);
            command.Parameters.AddWithValue("@time", NpgsqlDbType.Timestamp, cgs.CurrentTime);
            using (NpgsqlDataReader dataReader = command.ExecuteReader())
            {
                result = dataReader.Read();
            }
        }
        conn.Close();
    }

    if (result)
    {
        _gpsSentences.Add(cgs);
    }
    return result;
}
/// <summary>
/// Queues an APRS message for storage. No de-duplication is performed here.
/// </summary>
/// <param name="sender">Sender of the message.</param>
/// <param name="currentTime">Timestamp stored with the message.</param>
/// <param name="receiver">Addressee of the message.</param>
/// <param name="message">Message text.</param>
public void AprsMessage(string sender, DateTime currentTime, string receiver, string message)
    => EnqueueTask(connection => WriteAprsMessage(connection, sender, currentTime, receiver, message));
/// <summary>
/// Inserts one row into <c>aprs_messages</c> using the supplied connection. The row's
/// <c>ctime_sort</c> value is obtained from <see cref="SelectAprsMessageCtimeSort"/> on the same connection.
/// </summary>
/// <param name="connection">Connection used for both the lookup and the insert.</param>
/// <param name="sender">Value for the <c>sender</c> column.</param>
/// <param name="currentTime">Value for the <c>ctime</c> column.</param>
/// <param name="receiver">Value for the <c>receiver</c> column.</param>
/// <param name="message">Message text; may be null, in which case NULL is sent.</param>
private void WriteAprsMessage(NpgsqlConnection connection, string sender, DateTime currentTime, string receiver, string message)
{
    int ctime_sort = SelectAprsMessageCtimeSort(connection, currentTime);
    // The command is disposed deterministically instead of being left to the garbage collector.
    using (NpgsqlCommand command = connection.CreateCommand())
    {
        command.CommandText =
            "INSERT INTO aprs_messages (ctime,ctime_sort,sender,receiver,message) " +
            "VALUES (@ctime,@ctime_sort,@sender,@receiver,@message)";
        command.Parameters.AddWithValue("@ctime", NpgsqlDbType.Timestamp, currentTime);
        command.Parameters.AddWithValue("@ctime_sort", NpgsqlDbType.Integer, ctime_sort);
        command.Parameters.AddWithValue("@sender", NpgsqlDbType.Varchar, sender);
        command.Parameters.AddWithValue("@receiver", NpgsqlDbType.Varchar, receiver);
        // Null-conditional: a missing message body must not throw on the writer thread;
        // CheckNulls below turns the null into DBNull.
        command.Parameters.AddWithValue("@message", NpgsqlDbType.Text, message?.Replace("\0", ""));
        CheckNulls(command.Parameters);
        command.ExecuteNonQuery();
    }
}
/// <summary>
/// Computes the next <c>ctime_sort</c> value for messages sharing the given timestamp:
/// one more than the current maximum, or 0 when no message with that timestamp exists.
/// </summary>
/// <param name="connection">Connection the query runs on.</param>
/// <param name="currentTime">Timestamp to look up in <c>aprs_messages</c>.</param>
/// <returns>The sort index to use for the next message with this timestamp.</returns>
private int SelectAprsMessageCtimeSort(NpgsqlConnection connection, DateTime currentTime)
{
    int result = 0;
    // The reader must be closed before the caller can issue its INSERT on the same connection,
    // so both command and reader are scoped with 'using' and released even if reading throws.
    using (NpgsqlCommand command = connection.CreateCommand())
    {
        command.CommandText = "SELECT MAX(ctime_sort) FROM aprs_messages WHERE ctime = @ctime";
        command.Parameters.AddWithValue("@ctime", NpgsqlDbType.Timestamp, currentTime);
        using (NpgsqlDataReader dataReader = command.ExecuteReader())
        {
            // MAX() yields a single row whose value is NULL when nothing matched.
            if (dataReader.Read() && !dataReader.IsDBNull(0))
            {
                result = dataReader.GetInt32(0) + 1;
            }
        }
    }
    return result;
}
}
}