201 lines
7.4 KiB
C#

using Npgsql;
using NpgsqlTypes;
using skyscraper5.Data.PostgreSql;
using skyscraper5.Dvb.Psi.Model;
using skyscraper5.Skyscraper.Scraper.Storage.Utilities;
using System.Data.Common;
using System.Text;
using Newtonsoft.Json;
namespace skyscraper8.EPGCollectorPort.SkyscraperSide.Freesat;
/// <summary>
/// PostgreSQL-backed implementation of <see cref="IFreesatTunnelDataStorage"/>.
/// Existence checks run synchronously on a connection obtained from the token,
/// while inserts are handed to the token's task queue via <c>EnqueueTask</c>.
/// Keys that are already known are cached in in-memory hash sets so repeated
/// lookups for the same key do not hit the database again.
/// </summary>
class FreesatTunnelDataStoragePostgresql : IFreesatTunnelDataStorage
{
    private readonly PostgresqlToken token;

    // All caches are created eagerly. Previously some of them were created lazily
    // inside the Test* methods only, so calling a Store* method first dereferenced
    // a null set and threw a NullReferenceException after the write was already queued.
    private readonly HashSet<DatabaseKeyEitEvent> knownEitEvents = new HashSet<DatabaseKeyEitEvent>();
    private readonly HashSet<DatabaseKeyNitNetwork> knownNitNetworks = new HashSet<DatabaseKeyNitNetwork>();
    private readonly HashSet<DatabaseKeyNitNetwork> _knownUpdatedNitNetworks = new HashSet<DatabaseKeyNitNetwork>();
    private readonly HashSet<DatabaseKeyNitTs> _knownNitTs = new HashSet<DatabaseKeyNitTs>();
    // Populated by StoreNitTransportStream, mirroring _knownUpdatedNitNetworks;
    // nothing in this class reads either "updated" set.
    private readonly HashSet<DatabaseKeyNitTs> _knownUpdatedNitTransportStream = new HashSet<DatabaseKeyNitTs>();

    /// <summary>
    /// Creates the storage on top of the given PostgreSQL token.
    /// </summary>
    /// <param name="t2">Token used to open connections and to queue write tasks.</param>
    public FreesatTunnelDataStoragePostgresql(PostgresqlToken t2)
    {
        token = t2;
    }

    /// <summary>
    /// Queues an EIT event for insertion unless it is already known.
    /// </summary>
    /// <param name="eitEvent">The event to store.</param>
    /// <returns>
    /// <c>true</c> if an insert was queued; <c>false</c> if the event's key was
    /// already cached or already present in <c>freesat_eit</c>.
    /// </returns>
    public bool StoreEitEvent(EitEvent eitEvent)
    {
        DatabaseKeyEitEvent key = CreateKey(eitEvent);
        if (knownEitEvents.Contains(key))
        {
            return false;
        }

        if (TestForEitEvent(key))
        {
            knownEitEvents.Add(key);
            return false;
        }

        // Cache only after the task was accepted, so a failing enqueue can be retried.
        token.EnqueueTask(x => WriteEitEvent(x, eitEvent));
        knownEitEvents.Add(key);
        return true;
    }

    /// <summary>
    /// Builds the lookup key (service id, transport stream id, original network id,
    /// event id, start time) for an EIT event.
    /// </summary>
    private static DatabaseKeyEitEvent CreateKey(EitEvent eitEvent)
    {
        return new DatabaseKeyEitEvent(eitEvent.ServiceId, eitEvent.TransportStreamId, eitEvent.OriginalNetworkId, eitEvent.EventId, eitEvent.StartTime);
    }

    /// <summary>
    /// Inserts an EIT event into <c>freesat_eit</c> on the supplied connection.
    /// The row is re-checked first and skipped if it already exists.
    /// </summary>
    /// <exception cref="NotImplementedException">
    /// The insert did not affect exactly one row.
    /// </exception>
    private void WriteEitEvent(NpgsqlConnection connection, EitEvent eitEvent)
    {
        // Re-check on the writer's connection: the row may have been inserted
        // between the caller's test and the execution of this queued task.
        if (TestForEitEventEx(CreateKey(eitEvent), connection))
        {
            return;
        }

        using NpgsqlCommand command = connection.CreateCommand();
        command.CommandText =
            "INSERT INTO freesat_eit " +
            "(sid, tsid, onid, event_id, start_time, event) " +
            "VALUES " +
            "(@sid, @tsid, @onid, @event_id, @start_time, @event)";
        command.Parameters.AddWithValue("@sid", NpgsqlDbType.Integer, (int)eitEvent.ServiceId);
        command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)eitEvent.TransportStreamId);
        command.Parameters.AddWithValue("@onid", NpgsqlDbType.Integer, (int)eitEvent.OriginalNetworkId);
        command.Parameters.AddWithValue("@event_id", NpgsqlDbType.Integer, (int)eitEvent.EventId);
        command.Parameters.AddWithValue("@start_time", NpgsqlDbType.Timestamp, eitEvent.StartTime);
        command.Parameters.AddWithValue("@event", NpgsqlDbType.Json, JsonConvert.SerializeObject(eitEvent));

        int affectedRows = command.ExecuteNonQuery();
        if (affectedRows != 1)
        {
            // Exception type kept as-is so existing handlers keep working.
            throw new NotImplementedException(string.Format("Wanted to insert 1 line, but it was {0}", affectedRows));
        }
    }

    /// <summary>
    /// Checks on a fresh connection whether a row for the given EIT key exists.
    /// </summary>
    private bool TestForEitEvent(DatabaseKeyEitEvent eitEvent)
    {
        using (NpgsqlConnection conn = token.GetConnection())
        {
            conn.Open();
            return TestForEitEventEx(eitEvent, conn);
        }
    }

    /// <summary>
    /// Checks on an already opened connection whether <c>freesat_eit</c> contains
    /// a row matching the given key.
    /// </summary>
    private static bool TestForEitEventEx(DatabaseKeyEitEvent eitEvent, NpgsqlConnection conn)
    {
        using NpgsqlCommand command = conn.CreateCommand();
        command.CommandText =
            "SELECT dateadded FROM freesat_eit WHERE sid = @sid AND tsid = @tsid AND onid = @onid AND event_id = @event_id AND start_time = @start_time";
        command.Parameters.AddWithValue("@sid", NpgsqlDbType.Integer, (int)eitEvent.ServiceId);
        command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)eitEvent.TransportStreamId);
        command.Parameters.AddWithValue("@onid", NpgsqlDbType.Integer, (int)eitEvent.OriginalNetworkId);
        command.Parameters.AddWithValue("@event_id", NpgsqlDbType.Integer, (int)eitEvent.EventId);
        command.Parameters.AddWithValue("@start_time", NpgsqlDbType.Timestamp, eitEvent.StartTime);
        return HasRows(command);
    }

    /// <summary>
    /// Executes the command and reports whether it yields at least one row.
    /// The reader is disposed even when execution or reading throws.
    /// </summary>
    private static bool HasRows(NpgsqlCommand command)
    {
        using NpgsqlDataReader reader = command.ExecuteReader();
        return reader.Read();
    }

    /// <summary>
    /// Queues a NIT network for insertion and marks its id as known.
    /// No existence check is performed here.
    /// </summary>
    /// <param name="nitNetwork">The network to store.</param>
    public void StoreNitNetwork(NitNetwork nitNetwork)
    {
        DatabaseKeyNitNetwork key = new DatabaseKeyNitNetwork(nitNetwork.NetworkId);
        token.EnqueueTask(x => WriteNit(x, nitNetwork));
        knownNitNetworks.Add(key);
        _knownUpdatedNitNetworks.Add(key);
    }

    /// <summary>
    /// Inserts a NIT network (id plus JSON-serialized payload) into <c>freesat_nit</c>.
    /// </summary>
    private void WriteNit(NpgsqlConnection connection, NitNetwork nitNetwork)
    {
        using NpgsqlCommand command = connection.CreateCommand();
        command.CommandText =
            "INSERT INTO freesat_nit (id, network) " +
            "VALUES (@id, @network)";
        command.Parameters.AddWithValue("@id", NpgsqlDbType.Integer, (int)nitNetwork.NetworkId);
        command.Parameters.AddWithValue("@network", NpgsqlDbType.Json, JsonConvert.SerializeObject(nitNetwork));
        command.ExecuteNonQuery();
    }

    /// <summary>
    /// Reports whether a NIT network with the same id is already known, consulting
    /// the in-memory cache first and <c>freesat_nit</c> otherwise.
    /// </summary>
    /// <param name="nitNetwork">The network whose id is looked up.</param>
    /// <returns><c>true</c> if the network id is cached or present in the table.</returns>
    public bool TestNitNetwork(NitNetwork nitNetwork)
    {
        DatabaseKeyNitNetwork key = new DatabaseKeyNitNetwork(nitNetwork.NetworkId);
        if (knownNitNetworks.Contains(key))
        {
            return true;
        }

        bool exists;
        using (NpgsqlConnection conn = token.GetConnection())
        {
            conn.Open();
            using NpgsqlCommand command = conn.CreateCommand();
            command.CommandText = "SELECT dateadded FROM freesat_nit WHERE id = @id";
            command.Parameters.AddWithValue("@id", NpgsqlDbType.Integer, (int)nitNetwork.NetworkId);
            exists = HasRows(command);
        }

        // Only positive answers are cached; a miss is re-queried next time.
        if (exists)
        {
            knownNitNetworks.Add(key);
        }
        return exists;
    }

    /// <summary>
    /// Reports whether the (network id, transport stream id) pair is already known,
    /// consulting the in-memory cache first and
    /// <c>freesat_nit_transport_stream</c> otherwise.
    /// </summary>
    /// <param name="networkId">Network id the transport stream belongs to.</param>
    /// <param name="transportStream">Transport stream whose id is looked up.</param>
    /// <returns><c>true</c> if the pair is cached or present in the table.</returns>
    public bool TestForNitTransportStream(ushort networkId, NitTransportStream transportStream)
    {
        DatabaseKeyNitTs key = new DatabaseKeyNitTs(networkId, transportStream.TransportStreamId);
        if (_knownNitTs.Contains(key))
        {
            return true;
        }

        bool exists;
        using (NpgsqlConnection conn = token.GetConnection())
        {
            conn.Open();
            using NpgsqlCommand command = conn.CreateCommand();
            command.CommandText =
                "SELECT dateadded FROM freesat_nit_transport_stream WHERE nid = @nid AND tsid = @tsid";
            command.Parameters.AddWithValue("@nid", NpgsqlDbType.Integer, (int)networkId);
            command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)transportStream.TransportStreamId);
            exists = HasRows(command);
        }

        // Only positive answers are cached; a miss is re-queried next time.
        if (exists)
        {
            _knownNitTs.Add(key);
        }
        return exists;
    }

    /// <summary>
    /// Queues a NIT transport stream for insertion and marks its key as known.
    /// No existence check is performed here.
    /// </summary>
    /// <param name="networkId">Network id the transport stream belongs to.</param>
    /// <param name="transportStream">The transport stream to store.</param>
    public void StoreNitTransportStream(ushort networkId, NitTransportStream transportStream)
    {
        token.EnqueueTask(x => WriteNitTransportStream(x, networkId, transportStream));
        DatabaseKeyNitTs ts = new DatabaseKeyNitTs(networkId, transportStream.TransportStreamId);
        _knownNitTs.Add(ts);
        _knownUpdatedNitTransportStream.Add(ts);
    }

    /// <summary>
    /// Inserts a NIT transport stream (network id, transport stream id and
    /// JSON-serialized payload) into <c>freesat_nit_transport_stream</c>.
    /// </summary>
    private void WriteNitTransportStream(NpgsqlConnection conn, ushort networkId, NitTransportStream transportStream)
    {
        using NpgsqlCommand command = conn.CreateCommand();
        command.CommandText =
            "insert into freesat_nit_transport_stream (nid, tsid, transport_stream) " +
            "values " +
            "(@nid,@tsid,@transport_stream);";
        command.Parameters.AddWithValue("@nid", NpgsqlDbType.Integer, (int)networkId);
        command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)transportStream.TransportStreamId);
        command.Parameters.AddWithValue("@transport_stream", NpgsqlDbType.Json, JsonConvert.SerializeObject(transportStream));
        command.ExecuteNonQuery();
    }
}