using skyscraper5.Skyscraper.Scraper.Storage.Split;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MySqlConnector;
using skyscraper5.Dvb.Descriptors;
using skyscraper5.Dvb.Psi.Model;

namespace skyscraper5.Data.MySql
{
    /// <summary>
    /// Part of <see cref="MySqlDataStorage"/> that persists EIT events into the
    /// <c>dvb_eit</c> table and its child tables. Events accepted by
    /// <see cref="StoreEitEvent"/> are queued and written by a background thread.
    /// </summary>
    public partial class MySqlDataStorage : DataStorage
    {
        /// <summary>
        /// Adds the four parameters (<c>@onid</c>, <c>@tsid</c>, <c>@service_id</c>, <c>@start_time</c>)
        /// that every EIT statement in this file uses to address an event.
        /// </summary>
        private static void AddEitEventKeyParameters(MySqlCommand command, EitEvent eitEvent)
        {
            command.Parameters.AddWithValue("@onid", eitEvent.OriginalNetworkId);
            command.Parameters.AddWithValue("@tsid", eitEvent.TransportStreamId);
            command.Parameters.AddWithValue("@service_id", eitEvent.ServiceId);
            command.Parameters.AddWithValue("@start_time", eitEvent.StartTime);
        }

        /// <summary>
        /// Checks whether <c>dvb_eit</c> already contains a row with the event's
        /// onid / tsid / service_id / start_time.
        /// </summary>
        /// <param name="connector">Open connection to query on.</param>
        /// <param name="eitEvent">Event whose key is looked up.</param>
        /// <param name="transaction">Transaction to run the query in; may be <c>null</c>.</param>
        /// <returns><c>true</c> if at least one matching row exists.</returns>
        private bool TestForEitEvent(MySqlConnection connector, EitEvent eitEvent, MySqlTransaction transaction)
        {
            using (MySqlCommand command = connector.CreateCommand())
            {
                if (transaction != null)
                    command.Transaction = transaction;
                command.CommandText = "SELECT dateadded FROM dvb_eit WHERE onid = @onid AND tsid = @tsid AND service_id = @service_id AND start_time = @start_time";
                AddEitEventKeyParameters(command, eitEvent);

                // The reader must be released before the next command can run on this connection,
                // so it is disposed even when Read() throws.
                using (MySqlDataReader dataReader = command.ExecuteReader())
                {
                    return dataReader.Read();
                }
            }
        }

        /// <summary>
        /// Inserts one <c>dvb_eit_ca_systems</c> row per entry of <c>eitEvent.CaSystems</c>.
        /// Does nothing when the array is null or empty.
        /// </summary>
        private void InsertEitCaSystem(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            if (eitEvent.CaSystems == null || eitEvent.CaSystems.Length == 0)
                return;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit_ca_systems (onid, tsid, service_id, start_time, ca_system) VALUES (@onid, @tsid, @service_id, @start_time, @ca_system)";
                AddEitEventKeyParameters(command, eitEvent);
                command.Parameters.Add("@ca_system", MySqlDbType.Int32);

                foreach (ushort caSystem in eitEvent.CaSystems)
                {
                    command.Parameters["@ca_system"].Value = caSystem;
                    command.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Inserts one <c>dvb_eit_component</c> row per component of the event.
        /// Only the first component seen for each component tag is written; later
        /// components carrying an already-used tag are skipped.
        /// Does nothing when the component list is null or empty.
        /// </summary>
        private void InsertEitComponent(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            if (eitEvent.Components == null || eitEvent.Components.Count == 0)
                return;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit_component (onid, tsid, service_id, start_time, text, iso_639_langauge_code, component_tag, component_type, stream_content, stream_content_ext) " + "VALUES (@onid, @tsid, @service_id, @start_time, @text, @iso_639_langauge_code, @component_tag, @component_type, @stream_content, @stream_content_ext)";
                AddEitEventKeyParameters(command, eitEvent);
                command.Parameters.Add("@text", MySqlDbType.VarChar);
                command.Parameters.Add("@iso_639_langauge_code", MySqlDbType.VarChar);
                command.Parameters.Add("@component_tag", MySqlDbType.Int16);
                command.Parameters.Add("@component_type", MySqlDbType.Int16);
                command.Parameters.Add("@stream_content", MySqlDbType.Int32);
                command.Parameters.Add("@stream_content_ext", MySqlDbType.Int32);

                // Tracks which component tags were already written for this event.
                bool[] usedComponentTags = new bool[256];
                foreach (ComponentDescriptor component in eitEvent.Components)
                {
                    if (usedComponentTags[component.ComponentTag])
                        continue;
                    usedComponentTags[component.ComponentTag] = true;

                    command.Parameters["@text"].Value = component.Text;
                    command.Parameters["@iso_639_langauge_code"].Value = component.Iso639LanguageCode;
                    command.Parameters["@component_tag"].Value = component.ComponentTag;
                    command.Parameters["@component_type"].Value = component.ComponentType;
                    command.Parameters["@stream_content"].Value = component.StreamContent;
                    command.Parameters["@stream_content_ext"].Value = component.StreamContentExt;
                    command.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Inserts one <c>dvb_eit_content</c> row per entry of <c>eitEvent.Content</c>,
        /// mapping the tuple items 1..3 to the columns <c>l</c>, <c>m</c> and <c>r</c>.
        /// Does nothing when the array is null or empty.
        /// </summary>
        private void InsertEitContent(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            if (eitEvent.Content == null || eitEvent.Content.Length == 0)
                return;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit_content (onid, tsid, service_id, start_time, l, m, r) VALUES (@onid, @tsid, @service_id, @start_time, @l, @m, @r)";
                AddEitEventKeyParameters(command, eitEvent);
                command.Parameters.Add("@l", MySqlDbType.Int32);
                command.Parameters.Add("@m", MySqlDbType.Int32);
                command.Parameters.Add("@r", MySqlDbType.Int32);

                foreach (var tuple in eitEvent.Content)
                {
                    command.Parameters["@l"].Value = tuple.Item1;
                    command.Parameters["@m"].Value = tuple.Item2;
                    command.Parameters["@r"].Value = tuple.Item3;
                    command.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Inserts one <c>dvb_eit_crids</c> row per entry of <c>eitEvent.Crids</c>;
        /// the list index is stored as <c>ordinal</c>.
        /// Does nothing when the list is null or empty.
        /// </summary>
        private void InsertEitCrids(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            if (eitEvent.Crids == null || eitEvent.Crids.Count == 0)
                return;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit_crids (onid, tsid, service_id, start_time, ordinal, crid_type, crid_location, crid_bytes) " + "VALUES (@onid, @tsid, @service_id, @start_time, @ordinal, @crid_type, @crid_location, @crid_bytes)";
                AddEitEventKeyParameters(command, eitEvent);
                command.Parameters.Add("@ordinal", MySqlDbType.Int32);
                command.Parameters.Add("@crid_type", MySqlDbType.Int32);
                command.Parameters.Add("@crid_location", MySqlDbType.Int32);
                command.Parameters.Add("@crid_bytes", MySqlDbType.TinyBlob);

                for (int i = 0; i < eitEvent.Crids.Count; i++)
                {
                    var crid = eitEvent.Crids[i];
                    command.Parameters["@ordinal"].Value = i;
                    command.Parameters["@crid_type"].Value = (int)crid.CridType;
                    command.Parameters["@crid_location"].Value = (int)crid.CridLocation;
                    command.Parameters["@crid_bytes"].Value = crid.CridBytes;
                    command.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Inserts one <c>dvb_eit_event_items</c> row per entry of <c>eitEvent.Items</c>;
        /// the list index is stored as <c>ordinal</c>. Entries whose description or item
        /// is null are skipped (their ordinal is simply not written).
        /// Does nothing when the list is null or empty.
        /// </summary>
        private void InsertEitEventItems(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            if (eitEvent.Items == null || eitEvent.Items.Count == 0)
                return;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit_event_items (onid, tsid, service_id, start_time, ordinal, description, item) " + "VALUES (@onid, @tsid, @service_id, @start_time, @ordinal, @description, @item)";
                AddEitEventKeyParameters(command, eitEvent);
                command.Parameters.Add("@ordinal", MySqlDbType.Int32);
                command.Parameters.Add("@description", MySqlDbType.VarChar);
                command.Parameters.Add("@item", MySqlDbType.MediumText);

                for (int i = 0; i < eitEvent.Items.Count; i++)
                {
                    var entry = eitEvent.Items[i];
                    if (entry.Description == null || entry.Item == null)
                        continue;

                    command.Parameters["@ordinal"].Value = i;
                    command.Parameters["@description"].Value = entry.Description;
                    command.Parameters["@item"].Value = entry.Item;
                    // SetNulls is defined elsewhere in this partial class; presumably it replaces
                    // null parameter values with DBNull - TODO confirm.
                    SetNulls(command);
                    command.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Inserts one <c>dvb_eit_parental_ratings</c> row per non-null entry of
        /// <c>eitEvent.ParentalRatings</c> (tuple item 1 is stored as <c>country</c>, item 2 as <c>age</c>).
        /// Does nothing when the array is null or empty.
        /// </summary>
        private void InsertEitParentalRatings(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            if (eitEvent.ParentalRatings == null || eitEvent.ParentalRatings.Length == 0)
                return;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit_parental_ratings (onid, tsid, service_id, start_time, country, age) " + "VALUES (@onid, @tsid, @service_id, @start_time, @country, @age)";
                AddEitEventKeyParameters(command, eitEvent);
                command.Parameters.Add("@country", MySqlDbType.VarChar);
                command.Parameters.Add("@age", MySqlDbType.Int32);

                foreach (var parentalRating in eitEvent.ParentalRatings)
                {
                    if (parentalRating == null)
                        continue;
                    command.Parameters["@country"].Value = parentalRating.Item1;
                    command.Parameters["@age"].Value = parentalRating.Item2;
                    command.ExecuteNonQuery();
                }
            }
        }

        // Events accepted by StoreEitEvent that still have to be written; also used as the lock object.
        private Queue<EitEvent> queuedEitEvents;
        // (onid, tsid, sid, first-of-month) buckets whose existing rows were already loaded into the cache.
        private HashSet<EitCoordinates> eitCoordinatesCacheIndex;
        // Exact coordinates of events known to be stored or queued.
        private HashSet<EitCoordinates> eitCoordinatesCache;
        private Thread eitWriterThread;

        /// <summary>
        /// Tells whether a writer thread has been created and is still alive.
        /// </summary>
        private bool IsEitWriterThreadRunning()
        {
            return eitWriterThread != null && eitWriterThread.IsAlive;
        }

        /// <summary>
        /// Body of the "EIT Writer" thread: opens its own connection, drains
        /// <c>queuedEitEvents</c> inside a single transaction, commits and exits once
        /// the queue is observed empty.
        /// </summary>
        /// <remarks>
        /// NOTE(review): an event enqueued after this thread has seen the queue empty but
        /// before the thread has terminated is not picked up here; it stays queued until a
        /// later <see cref="StoreEitEvent"/> call starts a new writer thread.
        /// </remarks>
        private void EitWriterThreadFunction()
        {
            using (MySqlConnection connection = new MySqlConnection(_mcsb.ToString()))
            {
                connection.Open();
                using (MySqlTransaction transaction = connection.BeginTransaction())
                {
                    while (true)
                    {
                        EitEvent eitEvent;
                        lock (queuedEitEvents)
                        {
                            if (queuedEitEvents.Count == 0)
                                break;
                            eitEvent = queuedEitEvents.Dequeue();
                        }

                        if (!TestForEitEvent(connection, eitEvent, transaction))
                            WriteQueuedEitEvent(connection, transaction, eitEvent);
                    }

                    transaction.Commit();
                }
                connection.Close();
            }
        }

        /// <summary>
        /// Writes one event: the <c>dvb_eit</c> row itself, followed by its CA systems,
        /// components, content, CRIDs, event items, parental ratings and (when present) linkages.
        /// </summary>
        private void WriteQueuedEitEvent(MySqlConnection connection, MySqlTransaction transaction, EitEvent eitEvent)
        {
            bool hasLinkages = HasLinkages(eitEvent.Linkages);
            // The uuid ties the dvb_eit row to the linkage rows written by InsertSdtLinkages.
            Guid guid = hasLinkages ? Guid.NewGuid() : Guid.Empty;

            using (MySqlCommand command = connection.CreateCommand())
            {
                command.Transaction = transaction;
                command.CommandText = "INSERT INTO dvb_eit " + " (onid, tsid, service_id, event_id, start_time, duration, running_status, free_ca, iso_639_language_code, event_name, text, extended_text, pdc, private_data_specifier, vps, uuid, reference_service_id, reference_event_id, control_remove_access_over_internet, do_not_apply_revocation, do_not_scramble)" + "VALUES" + " (@onid, @tsid, @service_id, @event_id, @start_time, @duration, @running_status, @free_ca, @iso_639_language_code, @event_name, @text, @extended_text, @pdc, @private_data_specifier, " + " @vps, @uuid, @reference_service_id, @reference_event_id, @control_remove_access_over_internet, @do_not_apply_revocation, @do_not_scramble)";
                command.Parameters.AddWithValue("@onid", eitEvent.OriginalNetworkId);
                command.Parameters.AddWithValue("@tsid", eitEvent.TransportStreamId);
                command.Parameters.AddWithValue("@service_id", eitEvent.ServiceId);
                command.Parameters.AddWithValue("@event_id", eitEvent.EventId);
                command.Parameters.AddWithValue("@start_time", eitEvent.StartTime);
                command.Parameters.AddWithValue("@duration", (int)eitEvent.Duration.TotalSeconds);
                command.Parameters.AddWithValue("@running_status", (int)eitEvent.RunningStatus);
                command.Parameters.AddWithValue("@free_ca", eitEvent.FreeCa);
                command.Parameters.AddWithValue("@iso_639_language_code", eitEvent.Iso639LanguageCode);
                command.Parameters.AddWithValue("@event_name", eitEvent.EventName);
                command.Parameters.AddWithValue("@text", eitEvent.Text);
                if (eitEvent.ExtendedText != null)
                    command.Parameters.AddWithValue("@extended_text", string.Join("", eitEvent.ExtendedText));
                else
                    command.Parameters.AddWithValue("@extended_text", DBNull.Value);
                command.Parameters.AddWithValue("@pdc", eitEvent.Pdc);
                command.Parameters.AddWithValue("@private_data_specifier", eitEvent.PrivateDataSpecifier);
                command.Parameters.AddWithValue("@vps", eitEvent.VpsString);
                if (hasLinkages)
                    command.Parameters.AddWithValue("@uuid", guid);
                else
                    command.Parameters.AddWithValue("@uuid", DBNull.Value);
                command.Parameters.AddWithValue("@reference_service_id", eitEvent.ReferenceServiceId);
                command.Parameters.AddWithValue("@reference_event_id", eitEvent.ReferenceEventId);
                command.Parameters.AddWithValue("@control_remove_access_over_internet", eitEvent.ControlRemoteAccessOverInternet);
                command.Parameters.AddWithValue("@do_not_apply_revocation", eitEvent.DoNotApplyRevocation);
                command.Parameters.AddWithValue("@do_not_scramble", eitEvent.DoNotScramble);
                // SetNulls is defined elsewhere in this partial class; presumably it replaces
                // null parameter values with DBNull - TODO confirm.
                SetNulls(command);
                command.ExecuteNonQuery();
            }

            InsertEitCaSystem(connection, transaction, eitEvent);
            InsertEitComponent(connection, transaction, eitEvent);
            InsertEitContent(connection, transaction, eitEvent);
            InsertEitCrids(connection, transaction, eitEvent);
            InsertEitEventItems(connection, transaction, eitEvent);
            InsertEitParentalRatings(connection, transaction, eitEvent);
            if (hasLinkages)
                InsertSdtLinkages(connection, guid, eitEvent.Linkages, transaction);
        }

        /// <summary>
        /// Key of an EIT event (onid, tsid, sid, start time). Used both for exact
        /// look-ups and, after <see cref="Round"/>, as a per-month bucket key.
        /// Implements <see cref="IEquatable{T}"/> so hash-set look-ups do not box.
        /// </summary>
        private readonly struct EitCoordinates : IEquatable<EitCoordinates>
        {
            public readonly ushort onid, tsid, sid;
            public readonly DateTime starttime;

            // Builds a bucket key: start time is the first day of the given month at 00:00:00.
            private EitCoordinates(ushort onid1, ushort tsid1, ushort sid1, int starttimeYear, int starttimeMonth)
            {
                onid = onid1;
                tsid = tsid1;
                sid = sid1;
                starttime = new DateTime(starttimeYear, starttimeMonth, 1, 0, 0, 0);
            }

            public EitCoordinates(ushort onid, ushort tsid, ushort sid, DateTime starttime)
            {
                this.onid = onid;
                this.tsid = tsid;
                this.sid = sid;
                this.starttime = starttime;
            }

            public EitCoordinates(EitEvent eitEvent)
            {
                onid = eitEvent.OriginalNetworkId;
                tsid = eitEvent.TransportStreamId;
                sid = eitEvent.ServiceId;
                starttime = eitEvent.StartTime;
            }

            // Copies the ids from a bucket key and attaches an exact start time.
            public EitCoordinates(EitCoordinates coordinatesRounded, DateTime getDateTime)
            {
                onid = coordinatesRounded.onid;
                tsid = coordinatesRounded.tsid;
                sid = coordinatesRounded.sid;
                starttime = getDateTime;
            }

            public bool Equals(EitCoordinates other)
            {
                return onid == other.onid && tsid == other.tsid && sid == other.sid && starttime.Equals(other.starttime);
            }

            public override bool Equals(object obj)
            {
                return obj is EitCoordinates other && Equals(other);
            }

            public override int GetHashCode()
            {
                return HashCode.Combine(onid, tsid, sid, starttime);
            }

            /// <summary>
            /// Returns the bucket key for this event: same ids, start time truncated to the first of its month.
            /// </summary>
            public EitCoordinates Round()
            {
                return new EitCoordinates(onid, tsid, sid, starttime.Year, starttime.Month);
            }
        }

        /// <summary>
        /// Loads the start times of all <c>dvb_eit</c> rows of one service in one calendar
        /// month into <c>eitCoordinatesCache</c> and marks that bucket as loaded.
        /// </summary>
        /// <param name="coordinatesRounded">Bucket key as produced by <see cref="EitCoordinates.Round"/>.</param>
        private void PrecacheEitEntries(EitCoordinates coordinatesRounded)
        {
            if (eitCoordinatesCache == null)
                eitCoordinatesCache = new HashSet<EitCoordinates>();
            // Do not rely on the caller having created the index.
            if (eitCoordinatesCacheIndex == null)
                eitCoordinatesCacheIndex = new HashSet<EitCoordinates>();

            using (MySqlConnection connection = new MySqlConnection(_mcsb.ToString()))
            {
                connection.Open();
                using (MySqlCommand command = connection.CreateCommand())
                {
                    command.CommandText = "SELECT start_time FROM dvb_eit WHERE onid = @onid AND tsid = @tsid AND service_id = @service_id AND YEAR(start_time) = @year AND MONTH(start_time) = @month";
                    command.Parameters.AddWithValue("@onid", coordinatesRounded.onid);
                    command.Parameters.AddWithValue("@tsid", coordinatesRounded.tsid);
                    command.Parameters.AddWithValue("@service_id", coordinatesRounded.sid);
                    command.Parameters.AddWithValue("@year", coordinatesRounded.starttime.Year);
                    command.Parameters.AddWithValue("@month", coordinatesRounded.starttime.Month);

                    using (MySqlDataReader dataReader = command.ExecuteReader())
                    {
                        while (dataReader.Read())
                        {
                            eitCoordinatesCache.Add(new EitCoordinates(coordinatesRounded, dataReader.GetDateTime(0)));
                        }
                    }
                }
                connection.Close();
            }

            eitCoordinatesCacheIndex.Add(coordinatesRounded);
        }

        /// <summary>
        /// Queues an EIT event for writing unless an event with the same
        /// onid / tsid / service id / start time is already stored or queued.
        /// Starts the writer thread when none is currently alive.
        /// </summary>
        /// <param name="eitEvent">Event to store.</param>
        /// <returns><c>true</c> if the event was queued, <c>false</c> if it was already known.</returns>
        public bool StoreEitEvent(EitEvent eitEvent)
        {
            EitCoordinates coordinates = new EitCoordinates(eitEvent);
            EitCoordinates coordinatesRounded = coordinates.Round();

            if (eitCoordinatesCacheIndex == null)
                eitCoordinatesCacheIndex = new HashSet<EitCoordinates>();

            // First event of this service/month: pull the existing rows from the database once.
            if (!eitCoordinatesCacheIndex.Contains(coordinatesRounded))
            {
                PrecacheEitEntries(coordinatesRounded);
            }

            if (eitCoordinatesCache.Contains(coordinates))
                return false;

            if (queuedEitEvents == null)
                queuedEitEvents = new Queue<EitEvent>();

            lock (queuedEitEvents)
            {
                queuedEitEvents.Enqueue(eitEvent);
            }

            // Remember the event immediately so a repeat is rejected before the writer has stored it.
            eitCoordinatesCache.Add(coordinates);

            if (!IsEitWriterThreadRunning())
            {
                eitWriterThread = new Thread(EitWriterThreadFunction);
                eitWriterThread.Name = "EIT Writer";
                eitWriterThread.Priority = ThreadPriority.Lowest;
                eitWriterThread.Start();
            }

            return true;
        }
    }
}