using Npgsql;
using NpgsqlTypes;
using skyscraper5.Data.PostgreSql;
using skyscraper5.Dvb.Psi.Model;
using skyscraper5.Skyscraper.Scraper.Storage.Utilities;
using System.Data.Common;
using System.Text;
using Newtonsoft.Json;

namespace skyscraper8.EPGCollectorPort.SkyscraperSide.Freesat;

/// <summary>
/// PostgreSQL-backed implementation of <see cref="IFreesatTunnelDataStorage"/>.
/// EIT events, NIT networks and NIT transport streams are serialized to JSON and
/// written to the <c>freesat_eit</c>, <c>freesat_nit</c> and
/// <c>freesat_nit_transport_stream</c> tables. Writes are handed to the
/// <see cref="PostgresqlToken"/> task queue; existence checks run synchronously on a
/// connection obtained from the token and are memoized in in-memory key sets.
/// </summary>
class FreesatTunnelDataStoragePostgresql : IFreesatTunnelDataStorage
{
    private readonly PostgresqlToken token;

    // The key caches are created eagerly: the Store* methods add to them and must not
    // depend on one of the Test* methods having been called beforehand.
    private readonly HashSet<DatabaseKeyEitEvent> knownEitEvents = new HashSet<DatabaseKeyEitEvent>();
    private readonly HashSet<DatabaseKeyNitNetwork> knownNitNetworks = new HashSet<DatabaseKeyNitNetwork>();
    private readonly HashSet<DatabaseKeyNitNetwork> _knownUpdatedNitNetworks = new HashSet<DatabaseKeyNitNetwork>();
    private readonly HashSet<DatabaseKeyNitTs> _knownNitTs = new HashSet<DatabaseKeyNitTs>();

    // NOTE(review): nothing in this class ever instantiates this set, so
    // StoreNitTransportStream never records anything in it. Left as-is on purpose;
    // confirm whether it was meant to be initialized like _knownUpdatedNitNetworks.
    private HashSet<DatabaseKeyNitTs> _knownUpdatedNitTransportStream;

    /// <summary>
    /// Creates a storage instance that uses the given token for connections and queued writes.
    /// </summary>
    /// <param name="t2">Token supplying connections and the write task queue.</param>
    public FreesatTunnelDataStoragePostgresql(PostgresqlToken t2)
    {
        token = t2;
    }

    /// <summary>
    /// Queues an EIT event for insertion unless it is already known.
    /// </summary>
    /// <param name="eitEvent">The event to store.</param>
    /// <returns>
    /// <c>true</c> if the event was queued for insertion; <c>false</c> if its key was
    /// already in the in-memory cache or already present in <c>freesat_eit</c>.
    /// </returns>
    public bool StoreEitEvent(EitEvent eitEvent)
    {
        DatabaseKeyEitEvent key = CreateKey(eitEvent);
        if (knownEitEvents.Contains(key))
        {
            return false;
        }

        if (TestForEitEvent(key))
        {
            // Remember the hit so the database is not queried again for this key.
            knownEitEvents.Add(key);
            return false;
        }

        token.EnqueueTask(x => WriteEitEvent(x, eitEvent));
        knownEitEvents.Add(key);
        return true;
    }

    /// <summary>Builds the lookup key identifying an EIT event.</summary>
    private static DatabaseKeyEitEvent CreateKey(EitEvent eitEvent)
    {
        return new DatabaseKeyEitEvent(eitEvent.ServiceId, eitEvent.TransportStreamId, eitEvent.OriginalNetworkId, eitEvent.EventId, eitEvent.StartTime);
    }

    /// <summary>
    /// Inserts an EIT event into <c>freesat_eit</c>, skipping the insert when a row with
    /// the same key already exists.
    /// </summary>
    /// <param name="connection">Connection the insert is executed on.</param>
    /// <param name="eitEvent">The event to insert; it is stored as JSON in the <c>event</c> column.</param>
    /// <exception cref="NotImplementedException">The insert did not affect exactly one row.</exception>
    private void WriteEitEvent(NpgsqlConnection connection, EitEvent eitEvent)
    {
        // Re-check on the writing connection: the row may have been inserted between
        // the check in StoreEitEvent and the execution of this queued task.
        if (TestForEitEventEx(CreateKey(eitEvent), connection))
        {
            return;
        }

        using NpgsqlCommand command = connection.CreateCommand();
        command.CommandText =
            "INSERT INTO freesat_eit " +
            "(sid, tsid, onid, event_id, start_time, event) " +
            "VALUES " +
            "(@sid, @tsid, @onid, @event_id, @start_time, @event)";
        command.Parameters.AddWithValue("@sid", NpgsqlDbType.Integer, (int)eitEvent.ServiceId);
        command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)eitEvent.TransportStreamId);
        command.Parameters.AddWithValue("@onid", NpgsqlDbType.Integer, (int)eitEvent.OriginalNetworkId);
        command.Parameters.AddWithValue("@event_id", NpgsqlDbType.Integer, (int)eitEvent.EventId);
        command.Parameters.AddWithValue("@start_time", NpgsqlDbType.Timestamp, eitEvent.StartTime);
        command.Parameters.AddWithValue("@event", NpgsqlDbType.Json, JsonConvert.SerializeObject(eitEvent));

        int executeNonQuery = command.ExecuteNonQuery();
        if (executeNonQuery != 1)
        {
            throw new NotImplementedException(string.Format("Wanted to insert 1 line, but it was {0}", executeNonQuery));
        }
    }

    /// <summary>
    /// Checks on a freshly opened connection whether an EIT event with the given key exists.
    /// </summary>
    /// <param name="eitEvent">Key of the event to look up.</param>
    /// <returns><c>true</c> if a matching row exists in <c>freesat_eit</c>.</returns>
    private bool TestForEitEvent(DatabaseKeyEitEvent eitEvent)
    {
        using (NpgsqlConnection conn = token.GetConnection())
        {
            conn.Open();
            return TestForEitEventEx(eitEvent, conn);
        }
    }

    /// <summary>
    /// Checks on the supplied connection whether an EIT event with the given key exists.
    /// </summary>
    /// <param name="eitEvent">Key of the event to look up.</param>
    /// <param name="conn">Connection to run the query on; it is not opened or closed here.</param>
    /// <returns><c>true</c> if the query returned at least one row.</returns>
    private static bool TestForEitEventEx(DatabaseKeyEitEvent eitEvent, NpgsqlConnection conn)
    {
        using NpgsqlCommand command = conn.CreateCommand();
        command.CommandText = "SELECT dateadded FROM freesat_eit WHERE sid = @sid AND tsid = @tsid AND onid = @onid AND event_id = @event_id AND start_time = @start_time";
        command.Parameters.AddWithValue("@sid", NpgsqlDbType.Integer, (int)eitEvent.ServiceId);
        command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)eitEvent.TransportStreamId);
        command.Parameters.AddWithValue("@onid", NpgsqlDbType.Integer, (int)eitEvent.OriginalNetworkId);
        command.Parameters.AddWithValue("@event_id", NpgsqlDbType.Integer, (int)eitEvent.EventId);
        command.Parameters.AddWithValue("@start_time", NpgsqlDbType.Timestamp, eitEvent.StartTime);

        // The reader is disposed before the command, also when Read() throws.
        using NpgsqlDataReader dataReader = command.ExecuteReader();
        return dataReader.Read();
    }

    /// <summary>
    /// Queues a NIT network for insertion and records its key as known.
    /// No existence check is performed here.
    /// </summary>
    /// <param name="nitNetwork">The network to store.</param>
    public void StoreNitNetwork(NitNetwork nitNetwork)
    {
        DatabaseKeyNitNetwork key = new DatabaseKeyNitNetwork(nitNetwork.NetworkId);
        token.EnqueueTask(x => WriteNit(x, nitNetwork));
        knownNitNetworks.Add(key);
        _knownUpdatedNitNetworks.Add(key);
    }

    /// <summary>
    /// Inserts a NIT network into <c>freesat_nit</c>; the network is stored as JSON.
    /// </summary>
    /// <param name="connection">Connection the insert is executed on.</param>
    /// <param name="nitNetwork">The network to insert.</param>
    private void WriteNit(NpgsqlConnection connection, NitNetwork nitNetwork)
    {
        using NpgsqlCommand command = connection.CreateCommand();
        command.CommandText =
            "INSERT INTO freesat_nit (id, network) " +
            "VALUES (@id, @network)";
        command.Parameters.AddWithValue("@id", NpgsqlDbType.Integer, (int)nitNetwork.NetworkId);
        command.Parameters.AddWithValue("@network", NpgsqlDbType.Json, JsonConvert.SerializeObject(nitNetwork));
        command.ExecuteNonQuery();
    }

    /// <summary>
    /// Tests whether a NIT network is already known, consulting the in-memory cache
    /// first and <c>freesat_nit</c> second.
    /// </summary>
    /// <param name="nitNetwork">The network to look up by its network id.</param>
    /// <returns><c>true</c> if the network is cached or a matching row exists.</returns>
    public bool TestNitNetwork(NitNetwork nitNetwork)
    {
        DatabaseKeyNitNetwork key = new DatabaseKeyNitNetwork(nitNetwork.NetworkId);
        if (knownNitNetworks.Contains(key))
        {
            return true;
        }

        bool result;
        using (NpgsqlConnection conn = token.GetConnection())
        {
            conn.Open();
            using NpgsqlCommand command = conn.CreateCommand();
            command.CommandText = "SELECT dateadded FROM freesat_nit WHERE id = @id";
            command.Parameters.AddWithValue("@id", NpgsqlDbType.Integer, (int)nitNetwork.NetworkId);
            using NpgsqlDataReader dataReader = command.ExecuteReader();
            result = dataReader.Read();
        }

        // Only positive results are cached; a miss is re-queried on the next call.
        if (result)
        {
            knownNitNetworks.Add(key);
        }

        return result;
    }

    /// <summary>
    /// Tests whether a NIT transport stream is already known, consulting the in-memory
    /// cache first and <c>freesat_nit_transport_stream</c> second.
    /// </summary>
    /// <param name="networkId">Id of the network the transport stream belongs to.</param>
    /// <param name="transportStream">The transport stream to look up by its transport stream id.</param>
    /// <returns><c>true</c> if the transport stream is cached or a matching row exists.</returns>
    public bool TestForNitTransportStream(ushort networkId, NitTransportStream transportStream)
    {
        DatabaseKeyNitTs key = new DatabaseKeyNitTs(networkId, transportStream.TransportStreamId);
        if (_knownNitTs.Contains(key))
        {
            return true;
        }

        bool result;
        using (NpgsqlConnection conn = token.GetConnection())
        {
            conn.Open();
            using NpgsqlCommand command = conn.CreateCommand();
            command.CommandText = "SELECT dateadded FROM freesat_nit_transport_stream WHERE nid = @nid AND tsid = @tsid";
            command.Parameters.AddWithValue("@nid", NpgsqlDbType.Integer, (int)networkId);
            command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)transportStream.TransportStreamId);
            using NpgsqlDataReader dataReader = command.ExecuteReader();
            result = dataReader.Read();
        }

        // Only positive results are cached; a miss is re-queried on the next call.
        if (result)
        {
            _knownNitTs.Add(key);
        }

        return result;
    }

    /// <summary>
    /// Queues a NIT transport stream for insertion and records its key as known.
    /// No existence check is performed here.
    /// </summary>
    /// <param name="networkId">Id of the network the transport stream belongs to.</param>
    /// <param name="transportStream">The transport stream to store.</param>
    public void StoreNitTransportStream(ushort networkId, NitTransportStream transportStream)
    {
        token.EnqueueTask(x => WriteNitTransportStream(x, networkId, transportStream));
        DatabaseKeyNitTs ts = new DatabaseKeyNitTs(networkId, transportStream.TransportStreamId);
        _knownNitTs.Add(ts);

        if (_knownUpdatedNitTransportStream == null)
        {
            return;
        }

        _knownUpdatedNitTransportStream.Add(ts);
    }

    /// <summary>
    /// Inserts a NIT transport stream into <c>freesat_nit_transport_stream</c>; the
    /// transport stream is stored as JSON.
    /// </summary>
    /// <param name="conn">Connection the insert is executed on.</param>
    /// <param name="networkId">Id of the network the transport stream belongs to.</param>
    /// <param name="transportStream">The transport stream to insert.</param>
    private void WriteNitTransportStream(NpgsqlConnection conn, ushort networkId, NitTransportStream transportStream)
    {
        // Unlike WriteEitEvent, the insert is issued without re-checking for an existing row.
        using NpgsqlCommand command = conn.CreateCommand();
        command.CommandText =
            "insert into freesat_nit_transport_stream (nid, tsid, transport_stream) " +
            "values " +
            "(@nid,@tsid,@transport_stream);";
        command.Parameters.AddWithValue("@nid", NpgsqlDbType.Integer, (int)networkId);
        command.Parameters.AddWithValue("@tsid", NpgsqlDbType.Integer, (int)transportStream.TransportStreamId);
        command.Parameters.AddWithValue("@transport_stream", NpgsqlDbType.Json, JsonConvert.SerializeObject(transportStream));
        command.ExecuteNonQuery();
    }
}