diff --git a/skyscraper8.sln.DotSettings.user b/skyscraper8.sln.DotSettings.user index d3202ca..f031d72 100644 --- a/skyscraper8.sln.DotSettings.user +++ b/skyscraper8.sln.DotSettings.user @@ -1,2 +1,2 @@  - <data><HostParameters type="LocalHostParameters" /><Argument type="StandaloneArgument"><Arguments IsNull="False"></Arguments><FileName IsNull="False"></FileName><WorkingDirectory IsNull="False"></WorkingDirectory><Scope><ProcessFilters /></Scope></Argument><Info type="PerformanceInfo"><MeasureType>Sampling</MeasureType><MeterKind>Rdtsc</MeterKind><InjectInfo><SymbolSearch><SearchPaths /></SymbolSearch><Scope><PatternFilters /><DenyAttributeFilters /></Scope></InjectInfo></Info><CoreOptions type="CoreOptions"><CoreTempPath IsNull="False"></CoreTempPath><RemoteEndPoint IsNull="False"></RemoteEndPoint><AdditionalEnvironmentVariables /></CoreOptions><HostOptions type="HostOptions"><HostTempPath IsNull="False"></HostTempPath></HostOptions></data> \ No newline at end of file + <data><HostParameters type="LocalHostParameters" /><Argument type="StandaloneArgument"><Arguments IsNull="False"></Arguments><FileName IsNull="False"></FileName><WorkingDirectory IsNull="False"></WorkingDirectory><Scope><ProcessFilters /></Scope></Argument><Info type="TimelineInfo" /><CoreOptions type="CoreOptions"><CoreTempPath IsNull="False"></CoreTempPath><RemoteEndPoint IsNull="False"></RemoteEndPoint><AdditionalEnvironmentVariables /></CoreOptions><HostOptions type="HostOptions"><HostTempPath IsNull="False"></HostTempPath></HostOptions></data> \ No newline at end of file diff --git a/skyscraper8/DvbNip/DvbNipReceiver.cs b/skyscraper8/DvbNip/DvbNipReceiver.cs index 1cfe628..42b1592 100644 --- a/skyscraper8/DvbNip/DvbNipReceiver.cs +++ b/skyscraper8/DvbNip/DvbNipReceiver.cs @@ -1,4 +1,5 @@ -using skyscraper5.Ietf.Rfc768; +using log4net; +using skyscraper5.Ietf.Rfc768; using skyscraper5.Ietf.Rfc971; using skyscraper5.Skyscraper.IO; using skyscraper5.Skyscraper.Plugins; @@ -15,6 +16,8 @@ namespace skyscraper8.DvbNip [SkyscraperPlugin] internal class DvbNipReceiver : ISkyscraperMpePlugin { + private static readonly ILog logger = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType.Name); + public bool CanHandlePacket(InternetHeader internetHeader, byte[] ipv4Packet) { if (!internetHeader.IsDestinationMulticast) @@ -32,7 +35,8 @@ namespace skyscraper8.DvbNip private bool bootstrapped; private uint fluteHits, fluteMisses; - private List flutes = new List(); + + private Dictionary, FluteListener> flutes; private static IPAddress dvbServiceDiscovery = IPAddress.Parse("224.0.23.14"); public void HandlePacket(InternetHeader internetHeader, byte[] ipv4Packet) @@ -60,32 +64,21 @@ namespace skyscraper8.DvbNip LctFrame lctFrame = new LctFrame(udpPacket.Payload); if (flutes == null) - flutes = new List(); + flutes = new Dictionary, FluteListener>(); - FluteListener fluteListener = null; - foreach(FluteListener listenerCandidate in flutes) + Tuple fluteCoordinate = new Tuple(internetHeader.DestinationAddress, udpPacket.DestinationPort, lctFrame.LctHeader.TransportSessionIdentifier, lctFrame.LctHeader.TransportObjectIdentifier); + FluteListener fluteListener; + if (flutes.ContainsKey(fluteCoordinate)) { - if (!listenerCandidate.DestinationAddress.Equals(internetHeader.DestinationAddress)) - continue; - if (listenerCandidate.DestinationPort != udpPacket.DestinationPort) - continue; - if (listenerCandidate.DestinationTsi != lctFrame.LctHeader.TransportSessionIdentifier) - continue; - if (listenerCandidate.DestinationToi != lctFrame.LctHeader.TransportObjectIdentifier) - continue; - fluteListener = listenerCandidate; - break; - } - - if (fluteListener == null) - { - fluteListener = new FluteListener(internetHeader.DestinationAddress, udpPacket.DestinationPort, lctFrame.LctHeader.TransportSessionIdentifier, lctFrame.LctHeader.TransportObjectIdentifier); - flutes.Add(fluteListener); - fluteMisses++; + fluteListener = flutes[fluteCoordinate]; + fluteHits++; } else { - fluteHits++; + fluteListener = new FluteListener(internetHeader.DestinationAddress, udpPacket.DestinationPort, lctFrame.LctHeader.TransportSessionIdentifier, lctFrame.LctHeader.TransportObjectIdentifier); + flutes.Add(fluteCoordinate, fluteListener); + fluteMisses++; + TryPruneCache(); } fluteListener.PushPacket(lctFrame); if (fluteListener.IsComplete()) @@ -108,7 +101,7 @@ namespace skyscraper8.DvbNip } fluteStream.Close(); fluteStream.Dispose(); - flutes.Remove(fluteListener); + flutes.Remove(fluteCoordinate); return; } if (fluteListener.FileAssociation == null) @@ -147,30 +140,80 @@ namespace skyscraper8.DvbNip } } - private void SetFileAssociations(FluteListener sourceListener, FDTInstanceType fdtAnnouncement) + private void TryPruneCache() { + + /* + DateTime now = DateTime.Now; + List staleListeners = flutes.Where(x => x.LastTouched != DateTime.MinValue) + .Where(x => x.FileAssociation != null) + .Where(x => (now - x.LastTouched).TotalMinutes >= 1.0) + .Where(x => DvbNipUtilities.IsContinuousFileType(x.FileAssociation)) + .ToList(); + if (staleListeners.Count > 0) + { + int prevItems = flutes.Count; + foreach (FluteListener staleListener in staleListeners) + { + staleListener.Disabled = true; + flutes.Remove(staleListener); + } + + int nowItems = flutes.Count; + + logger.DebugFormat(String.Format("Removed {0} stale segments from FLUTE cache. Cache slimmed from {1} to {2} items.", staleListeners.Count,prevItems,nowItems)); + }*/ + } + + private bool SetFileAssociations(FluteListener sourceListener, FDTInstanceType fdtAnnouncement) + { + bool result = false; foreach(FileType announcedFile in fdtAnnouncement.File) { if (string.IsNullOrEmpty(announcedFile.TOI)) continue; ulong targetToi = ulong.Parse(announcedFile.TOI); - FluteListener? targetListener = flutes.Find(x => + Tuple fluteCoordinate = new Tuple(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi); + if (flutes.ContainsKey(fluteCoordinate)) + { + FluteListener listener = flutes[fluteCoordinate]; + if (listener.FileAssociation == null) + { + listener.FileAssociation = announcedFile; + result = true; + } + } + else + { + FluteListener listener = new FluteListener(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi); + listener.FileAssociation = announcedFile; + flutes.Add(fluteCoordinate, listener); + result = true; + } + /*FluteListener? targetListener = flutes.Find(x => x.DestinationAddress.Equals(sourceListener.DestinationAddress) && x.DestinationPort == sourceListener.DestinationPort && x.DestinationTsi == sourceListener.DestinationTsi && x.DestinationToi == targetToi); if (targetListener != null) { - targetListener.FileAssociation = announcedFile; + if (targetListener.FileAssociation != null) + { + targetListener.FileAssociation = announcedFile; + result = true; + } } else { targetListener = new FluteListener(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi); targetListener.FileAssociation = announcedFile; flutes.Add(targetListener); - } + result = true; + }*/ } + + return result; } public void SetContext(DateTime? currentTime, object skyscraperContext) @@ -185,7 +228,7 @@ namespace skyscraper8.DvbNip public bool StopProcessingAfterThis() { - return false; + return bootstrapped; } public NipActualCarrierInformation CurrentCarrierInformation { get; private set; } diff --git a/skyscraper8/DvbNip/DvbNipUtilities.cs b/skyscraper8/DvbNip/DvbNipUtilities.cs index 345ec27..636e28c 100644 --- a/skyscraper8/DvbNip/DvbNipUtilities.cs +++ b/skyscraper8/DvbNip/DvbNipUtilities.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using System.Xml.Serialization; +using skyscraper8.Ietf.FLUTE; namespace skyscraper8.DvbNip { @@ -51,5 +52,10 @@ namespace skyscraper8.DvbNip return false; } } + + public static bool IsContinuousFileType(FileType filteType) + { + return IsContinuousFileType(Path.GetExtension(filteType.ContentLocation)); + } } } diff --git a/skyscraper8/Ietf/FLUTE/FluteListener.cs b/skyscraper8/Ietf/FLUTE/FluteListener.cs index 7fb79a5..38f1dce 100644 --- a/skyscraper8/Ietf/FLUTE/FluteListener.cs +++ b/skyscraper8/Ietf/FLUTE/FluteListener.cs @@ -9,6 +9,7 @@ using System.Runtime.Serialization; using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Tasks; +using skyscraper5.Teletext.Wss; namespace skyscraper8.Ietf.FLUTE { @@ -27,6 +28,8 @@ namespace skyscraper8.Ietf.FLUTE public ulong DestinationTsi { get; } public ulong DestinationToi { get; } + public DateTime LastTouched { get; private set; } + public override string ToString() { if (_disabled) @@ -42,6 +45,8 @@ namespace skyscraper8.Ietf.FLUTE internal void PushPacket(LctFrame lctFrame) { + LastTouched = DateTime.Now; + if (_disabled) return; @@ -61,10 +66,11 @@ namespace skyscraper8.Ietf.FLUTE { return; } - transferLength = lctFrame.LctHeader.FecObjectTransmissionInformation.TransferLength; + transferLength = (long)lctFrame.LctHeader.FecObjectTransmissionInformation.TransferLength; } + if (blocks == null) - blocks = new List(); + blocks = new Dictionary, FluteBlock>(); ushort sbn = lctFrame.FecHeader.SourceBlockNumber; ushort esi = lctFrame.FecHeader.EncodingSymbolId; @@ -73,7 +79,7 @@ namespace skyscraper8.Ietf.FLUTE //x.SourceBlockNumer == sbn && //x.EncodingSymbolId == esi); - FluteBlock fluteBlock = null; + /*FluteBlock fluteBlock = null foreach (FluteBlock candidateBlock in blocks) { if (candidateBlock.SourceBlockNumer == sbn) @@ -94,7 +100,19 @@ namespace skyscraper8.Ietf.FLUTE else { blocksDeduplicated++; + }*/ + + Tuple coordinate = new Tuple(sbn, esi); + if (blocks.ContainsKey(coordinate)) + { + blocksDeduplicated++; + return; } + + FluteBlock fluteBlock = new FluteBlock(sbn, esi, lctFrame.Payload); + blocks.Add(coordinate, fluteBlock); + _dataWritten += (uint)fluteBlock.Payload.Length; + } internal bool IsComplete() @@ -112,8 +130,8 @@ namespace skyscraper8.Ietf.FLUTE return DataWritten >= transferLength; } - private ulong _dataWritten; - public ulong DataWritten + private long _dataWritten; + public long DataWritten { get { @@ -135,12 +153,13 @@ namespace skyscraper8.Ietf.FLUTE Stream level1; if (blocks.Count == 1) { - level1 = new MemoryStream(blocks[0].Payload); + level1 = new MemoryStream(blocks.Values.First().Payload); } else { - blocks.Sort(new FluteBlockComparer()); - level1 = new FluteListenerStream(blocks); + List blockPayloads = blocks.Values.ToList(); + blockPayloads.Sort(new FluteBlockComparer()); + level1 = new FluteListenerStream(blockPayloads); } if (FileAssociation != null) @@ -149,6 +168,7 @@ namespace skyscraper8.Ietf.FLUTE { case null: case "null": + case "ll": break; case "gzip": GZipStream level2 = new GZipStream(level1, CompressionMode.Decompress, false); @@ -174,8 +194,8 @@ namespace skyscraper8.Ietf.FLUTE fileStream.Dispose(); } - private ulong transferLength; - private List blocks; + private long transferLength; + private Dictionary,FluteBlock> blocks; private uint blocksDeduplicated; public class FluteBlock @@ -211,7 +231,7 @@ namespace skyscraper8.Ietf.FLUTE { if (transferLength == 0) { - transferLength = _fileAssocitation.ContentLength; + transferLength = (long)_fileAssocitation.ContentLength; } } } @@ -231,8 +251,11 @@ namespace skyscraper8.Ietf.FLUTE if (value) { transferLength = 0; - blocks.Clear(); - blocks = null; + if (blocks != null) + { + blocks.Clear(); + blocks = null; + } } _disabled = value; } diff --git a/skyscraper8/Skyscraper/IO/M3U8Stream.cs b/skyscraper8/Skyscraper/IO/M3U8Stream.cs index df939de..f02f9f9 100644 --- a/skyscraper8/Skyscraper/IO/M3U8Stream.cs +++ b/skyscraper8/Skyscraper/IO/M3U8Stream.cs @@ -1,4 +1,5 @@ -using System; +using log4net; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -10,6 +11,8 @@ namespace skyscraper8.Skyscraper.IO { public class M3U8Stream : Stream { + private static readonly ILog logger = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType.Name); + public M3U8Stream(string source) { this.segmentIndex = 0; @@ -66,10 +69,21 @@ namespace skyscraper8.Skyscraper.IO } } + private DateTime lastSegmentSwitch; private Stream GetSegmentStream(string segmentName) { - Console.WriteLine("open segment {0}", segmentName); - Debug.WriteLine(String.Format("open segment {0}", segmentName)); + if (lastSegmentSwitch == DateTime.MinValue) + { + lastSegmentSwitch = DateTime.Now; + logger.DebugFormat("open segment {0}", segmentName); + } + else + { + TimeSpan timeTaken = DateTime.Now - lastSegmentSwitch; + lastSegmentSwitch = DateTime.Now; + logger.DebugFormat("open segment {0}, processing took {1}", segmentName, timeTaken.ToString()); + } + //Debug.WriteLine(String.Format("open segment {0}", segmentName)); if (source.StartsWith("http://") || source.StartsWith("https://")) { return webClient.GetStreamAsync(segmentName).Result;