The FLUTE is now fast enough to be processed in real-time.

This commit is contained in:
feyris-tan 2025-06-21 12:15:39 +02:00
parent 8cd4e3d99f
commit 40c2ac21b6
5 changed files with 132 additions and 46 deletions

View File

@ -1,2 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation"> <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/Profiling/Configurations/=1/@EntryIndexedValue">&lt;data&gt;&lt;HostParameters type="LocalHostParameters" /&gt;&lt;Argument type="StandaloneArgument"&gt;&lt;Arguments IsNull="False"&gt;&lt;/Arguments&gt;&lt;FileName IsNull="False"&gt;&lt;/FileName&gt;&lt;WorkingDirectory IsNull="False"&gt;&lt;/WorkingDirectory&gt;&lt;Scope&gt;&lt;ProcessFilters /&gt;&lt;/Scope&gt;&lt;/Argument&gt;&lt;Info type="PerformanceInfo"&gt;&lt;MeasureType&gt;Sampling&lt;/MeasureType&gt;&lt;MeterKind&gt;Rdtsc&lt;/MeterKind&gt;&lt;InjectInfo&gt;&lt;SymbolSearch&gt;&lt;SearchPaths /&gt;&lt;/SymbolSearch&gt;&lt;Scope&gt;&lt;PatternFilters /&gt;&lt;DenyAttributeFilters /&gt;&lt;/Scope&gt;&lt;/InjectInfo&gt;&lt;/Info&gt;&lt;CoreOptions type="CoreOptions"&gt;&lt;CoreTempPath IsNull="False"&gt;&lt;/CoreTempPath&gt;&lt;RemoteEndPoint IsNull="False"&gt;&lt;/RemoteEndPoint&gt;&lt;AdditionalEnvironmentVariables /&gt;&lt;/CoreOptions&gt;&lt;HostOptions type="HostOptions"&gt;&lt;HostTempPath IsNull="False"&gt;&lt;/HostTempPath&gt;&lt;/HostOptions&gt;&lt;/data&gt;</s:String></wpf:ResourceDictionary> <s:String x:Key="/Default/Profiling/Configurations/=1/@EntryIndexedValue">&lt;data&gt;&lt;HostParameters type="LocalHostParameters" /&gt;&lt;Argument type="StandaloneArgument"&gt;&lt;Arguments IsNull="False"&gt;&lt;/Arguments&gt;&lt;FileName IsNull="False"&gt;&lt;/FileName&gt;&lt;WorkingDirectory IsNull="False"&gt;&lt;/WorkingDirectory&gt;&lt;Scope&gt;&lt;ProcessFilters /&gt;&lt;/Scope&gt;&lt;/Argument&gt;&lt;Info type="TimelineInfo" /&gt;&lt;CoreOptions type="CoreOptions"&gt;&lt;CoreTempPath IsNull="False"&gt;&lt;/CoreTempPath&gt;&lt;RemoteEndPoint IsNull="False"&gt;&lt;/RemoteEndPoint&gt;&lt;AdditionalEnvironmentVariables /&gt;&lt;/CoreOptions&gt;&lt;HostOptions type="HostOptions"&gt;&lt;HostTempPath IsNull="False"&gt;&lt;/HostTempPath&gt;&lt;/HostOptions&gt;&lt;/data&gt;</s:String></wpf:ResourceDictionary>

View File

@ -1,4 +1,5 @@
using skyscraper5.Ietf.Rfc768; using log4net;
using skyscraper5.Ietf.Rfc768;
using skyscraper5.Ietf.Rfc971; using skyscraper5.Ietf.Rfc971;
using skyscraper5.Skyscraper.IO; using skyscraper5.Skyscraper.IO;
using skyscraper5.Skyscraper.Plugins; using skyscraper5.Skyscraper.Plugins;
@ -15,6 +16,8 @@ namespace skyscraper8.DvbNip
[SkyscraperPlugin] [SkyscraperPlugin]
internal class DvbNipReceiver : ISkyscraperMpePlugin internal class DvbNipReceiver : ISkyscraperMpePlugin
{ {
private static readonly ILog logger = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType.Name);
public bool CanHandlePacket(InternetHeader internetHeader, byte[] ipv4Packet) public bool CanHandlePacket(InternetHeader internetHeader, byte[] ipv4Packet)
{ {
if (!internetHeader.IsDestinationMulticast) if (!internetHeader.IsDestinationMulticast)
@ -32,7 +35,8 @@ namespace skyscraper8.DvbNip
private bool bootstrapped; private bool bootstrapped;
private uint fluteHits, fluteMisses; private uint fluteHits, fluteMisses;
private List<FluteListener> flutes = new List<FluteListener>();
private Dictionary<Tuple<IPAddress, ushort, ulong, ulong>, FluteListener> flutes;
private static IPAddress dvbServiceDiscovery = IPAddress.Parse("224.0.23.14"); private static IPAddress dvbServiceDiscovery = IPAddress.Parse("224.0.23.14");
public void HandlePacket(InternetHeader internetHeader, byte[] ipv4Packet) public void HandlePacket(InternetHeader internetHeader, byte[] ipv4Packet)
@ -60,32 +64,21 @@ namespace skyscraper8.DvbNip
LctFrame lctFrame = new LctFrame(udpPacket.Payload); LctFrame lctFrame = new LctFrame(udpPacket.Payload);
if (flutes == null) if (flutes == null)
flutes = new List<FluteListener>(); flutes = new Dictionary<Tuple<IPAddress, ushort, ulong, ulong>, FluteListener>();
FluteListener fluteListener = null; Tuple<IPAddress, ushort, ulong, ulong> fluteCoordinate = new Tuple<IPAddress, ushort, ulong, ulong>(internetHeader.DestinationAddress, udpPacket.DestinationPort, lctFrame.LctHeader.TransportSessionIdentifier, lctFrame.LctHeader.TransportObjectIdentifier);
foreach(FluteListener listenerCandidate in flutes) FluteListener fluteListener;
if (flutes.ContainsKey(fluteCoordinate))
{ {
if (!listenerCandidate.DestinationAddress.Equals(internetHeader.DestinationAddress)) fluteListener = flutes[fluteCoordinate];
continue; fluteHits++;
if (listenerCandidate.DestinationPort != udpPacket.DestinationPort)
continue;
if (listenerCandidate.DestinationTsi != lctFrame.LctHeader.TransportSessionIdentifier)
continue;
if (listenerCandidate.DestinationToi != lctFrame.LctHeader.TransportObjectIdentifier)
continue;
fluteListener = listenerCandidate;
break;
}
if (fluteListener == null)
{
fluteListener = new FluteListener(internetHeader.DestinationAddress, udpPacket.DestinationPort, lctFrame.LctHeader.TransportSessionIdentifier, lctFrame.LctHeader.TransportObjectIdentifier);
flutes.Add(fluteListener);
fluteMisses++;
} }
else else
{ {
fluteHits++; fluteListener = new FluteListener(internetHeader.DestinationAddress, udpPacket.DestinationPort, lctFrame.LctHeader.TransportSessionIdentifier, lctFrame.LctHeader.TransportObjectIdentifier);
flutes.Add(fluteCoordinate, fluteListener);
fluteMisses++;
TryPruneCache();
} }
fluteListener.PushPacket(lctFrame); fluteListener.PushPacket(lctFrame);
if (fluteListener.IsComplete()) if (fluteListener.IsComplete())
@ -108,7 +101,7 @@ namespace skyscraper8.DvbNip
} }
fluteStream.Close(); fluteStream.Close();
fluteStream.Dispose(); fluteStream.Dispose();
flutes.Remove(fluteListener); flutes.Remove(fluteCoordinate);
return; return;
} }
if (fluteListener.FileAssociation == null) if (fluteListener.FileAssociation == null)
@ -147,30 +140,80 @@ namespace skyscraper8.DvbNip
} }
} }
private void SetFileAssociations(FluteListener sourceListener, FDTInstanceType fdtAnnouncement) private void TryPruneCache()
{ {
/*
DateTime now = DateTime.Now;
List<FluteListener> staleListeners = flutes.Where(x => x.LastTouched != DateTime.MinValue)
.Where(x => x.FileAssociation != null)
.Where(x => (now - x.LastTouched).TotalMinutes >= 1.0)
.Where(x => DvbNipUtilities.IsContinuousFileType(x.FileAssociation))
.ToList();
if (staleListeners.Count > 0)
{
int prevItems = flutes.Count;
foreach (FluteListener staleListener in staleListeners)
{
staleListener.Disabled = true;
flutes.Remove(staleListener);
}
int nowItems = flutes.Count;
logger.DebugFormat(String.Format("Removed {0} stale segments from FLUTE cache. Cache slimmed from {1} to {2} items.", staleListeners.Count,prevItems,nowItems));
}*/
}
private bool SetFileAssociations(FluteListener sourceListener, FDTInstanceType fdtAnnouncement)
{
bool result = false;
foreach(FileType announcedFile in fdtAnnouncement.File) foreach(FileType announcedFile in fdtAnnouncement.File)
{ {
if (string.IsNullOrEmpty(announcedFile.TOI)) if (string.IsNullOrEmpty(announcedFile.TOI))
continue; continue;
ulong targetToi = ulong.Parse(announcedFile.TOI); ulong targetToi = ulong.Parse(announcedFile.TOI);
FluteListener? targetListener = flutes.Find(x => Tuple<IPAddress, ushort, ulong, ulong> fluteCoordinate = new Tuple<IPAddress, ushort, ulong, ulong>(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi);
if (flutes.ContainsKey(fluteCoordinate))
{
FluteListener listener = flutes[fluteCoordinate];
if (listener.FileAssociation == null)
{
listener.FileAssociation = announcedFile;
result = true;
}
}
else
{
FluteListener listener = new FluteListener(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi);
listener.FileAssociation = announcedFile;
flutes.Add(fluteCoordinate, listener);
result = true;
}
/*FluteListener? targetListener = flutes.Find(x =>
x.DestinationAddress.Equals(sourceListener.DestinationAddress) && x.DestinationAddress.Equals(sourceListener.DestinationAddress) &&
x.DestinationPort == sourceListener.DestinationPort && x.DestinationPort == sourceListener.DestinationPort &&
x.DestinationTsi == sourceListener.DestinationTsi && x.DestinationTsi == sourceListener.DestinationTsi &&
x.DestinationToi == targetToi); x.DestinationToi == targetToi);
if (targetListener != null) if (targetListener != null)
{ {
targetListener.FileAssociation = announcedFile; if (targetListener.FileAssociation != null)
{
targetListener.FileAssociation = announcedFile;
result = true;
}
} }
else else
{ {
targetListener = new FluteListener(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi); targetListener = new FluteListener(sourceListener.DestinationAddress, sourceListener.DestinationPort, sourceListener.DestinationTsi, targetToi);
targetListener.FileAssociation = announcedFile; targetListener.FileAssociation = announcedFile;
flutes.Add(targetListener); flutes.Add(targetListener);
} result = true;
}*/
} }
return result;
} }
public void SetContext(DateTime? currentTime, object skyscraperContext) public void SetContext(DateTime? currentTime, object skyscraperContext)
@ -185,7 +228,7 @@ namespace skyscraper8.DvbNip
public bool StopProcessingAfterThis() public bool StopProcessingAfterThis()
{ {
return false; return bootstrapped;
} }
public NipActualCarrierInformation CurrentCarrierInformation { get; private set; } public NipActualCarrierInformation CurrentCarrierInformation { get; private set; }

View File

@ -4,6 +4,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Xml.Serialization; using System.Xml.Serialization;
using skyscraper8.Ietf.FLUTE;
namespace skyscraper8.DvbNip namespace skyscraper8.DvbNip
{ {
@ -51,5 +52,10 @@ namespace skyscraper8.DvbNip
return false; return false;
} }
} }
public static bool IsContinuousFileType(FileType filteType)
{
return IsContinuousFileType(Path.GetExtension(filteType.ContentLocation));
}
} }
} }

View File

@ -9,6 +9,7 @@ using System.Runtime.Serialization;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using skyscraper5.Teletext.Wss;
namespace skyscraper8.Ietf.FLUTE namespace skyscraper8.Ietf.FLUTE
{ {
@ -27,6 +28,8 @@ namespace skyscraper8.Ietf.FLUTE
public ulong DestinationTsi { get; } public ulong DestinationTsi { get; }
public ulong DestinationToi { get; } public ulong DestinationToi { get; }
public DateTime LastTouched { get; private set; }
public override string ToString() public override string ToString()
{ {
if (_disabled) if (_disabled)
@ -42,6 +45,8 @@ namespace skyscraper8.Ietf.FLUTE
internal void PushPacket(LctFrame lctFrame) internal void PushPacket(LctFrame lctFrame)
{ {
LastTouched = DateTime.Now;
if (_disabled) if (_disabled)
return; return;
@ -61,10 +66,11 @@ namespace skyscraper8.Ietf.FLUTE
{ {
return; return;
} }
transferLength = lctFrame.LctHeader.FecObjectTransmissionInformation.TransferLength; transferLength = (long)lctFrame.LctHeader.FecObjectTransmissionInformation.TransferLength;
} }
if (blocks == null) if (blocks == null)
blocks = new List<FluteBlock>(); blocks = new Dictionary<Tuple<ushort, ushort>, FluteBlock>();
ushort sbn = lctFrame.FecHeader.SourceBlockNumber; ushort sbn = lctFrame.FecHeader.SourceBlockNumber;
ushort esi = lctFrame.FecHeader.EncodingSymbolId; ushort esi = lctFrame.FecHeader.EncodingSymbolId;
@ -73,7 +79,7 @@ namespace skyscraper8.Ietf.FLUTE
//x.SourceBlockNumer == sbn && //x.SourceBlockNumer == sbn &&
//x.EncodingSymbolId == esi); //x.EncodingSymbolId == esi);
FluteBlock fluteBlock = null; /*FluteBlock fluteBlock = null
foreach (FluteBlock candidateBlock in blocks) foreach (FluteBlock candidateBlock in blocks)
{ {
if (candidateBlock.SourceBlockNumer == sbn) if (candidateBlock.SourceBlockNumer == sbn)
@ -94,7 +100,19 @@ namespace skyscraper8.Ietf.FLUTE
else else
{ {
blocksDeduplicated++; blocksDeduplicated++;
}*/
Tuple<ushort, ushort> coordinate = new Tuple<ushort, ushort>(sbn, esi);
if (blocks.ContainsKey(coordinate))
{
blocksDeduplicated++;
return;
} }
FluteBlock fluteBlock = new FluteBlock(sbn, esi, lctFrame.Payload);
blocks.Add(coordinate, fluteBlock);
_dataWritten += (uint)fluteBlock.Payload.Length;
} }
internal bool IsComplete() internal bool IsComplete()
@ -112,8 +130,8 @@ namespace skyscraper8.Ietf.FLUTE
return DataWritten >= transferLength; return DataWritten >= transferLength;
} }
private ulong _dataWritten; private long _dataWritten;
public ulong DataWritten public long DataWritten
{ {
get get
{ {
@ -135,12 +153,13 @@ namespace skyscraper8.Ietf.FLUTE
Stream level1; Stream level1;
if (blocks.Count == 1) if (blocks.Count == 1)
{ {
level1 = new MemoryStream(blocks[0].Payload); level1 = new MemoryStream(blocks.Values.First().Payload);
} }
else else
{ {
blocks.Sort(new FluteBlockComparer()); List<FluteBlock> blockPayloads = blocks.Values.ToList();
level1 = new FluteListenerStream(blocks); blockPayloads.Sort(new FluteBlockComparer());
level1 = new FluteListenerStream(blockPayloads);
} }
if (FileAssociation != null) if (FileAssociation != null)
@ -149,6 +168,7 @@ namespace skyscraper8.Ietf.FLUTE
{ {
case null: case null:
case "null": case "null":
case "ll":
break; break;
case "gzip": case "gzip":
GZipStream level2 = new GZipStream(level1, CompressionMode.Decompress, false); GZipStream level2 = new GZipStream(level1, CompressionMode.Decompress, false);
@ -174,8 +194,8 @@ namespace skyscraper8.Ietf.FLUTE
fileStream.Dispose(); fileStream.Dispose();
} }
private ulong transferLength; private long transferLength;
private List<FluteBlock> blocks; private Dictionary<Tuple<ushort,ushort>,FluteBlock> blocks;
private uint blocksDeduplicated; private uint blocksDeduplicated;
public class FluteBlock public class FluteBlock
@ -211,7 +231,7 @@ namespace skyscraper8.Ietf.FLUTE
{ {
if (transferLength == 0) if (transferLength == 0)
{ {
transferLength = _fileAssocitation.ContentLength; transferLength = (long)_fileAssocitation.ContentLength;
} }
} }
} }
@ -231,8 +251,11 @@ namespace skyscraper8.Ietf.FLUTE
if (value) if (value)
{ {
transferLength = 0; transferLength = 0;
blocks.Clear(); if (blocks != null)
blocks = null; {
blocks.Clear();
blocks = null;
}
} }
_disabled = value; _disabled = value;
} }

View File

@ -1,4 +1,5 @@
using System; using log4net;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
@ -10,6 +11,8 @@ namespace skyscraper8.Skyscraper.IO
{ {
public class M3U8Stream : Stream public class M3U8Stream : Stream
{ {
private static readonly ILog logger = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType.Name);
public M3U8Stream(string source) public M3U8Stream(string source)
{ {
this.segmentIndex = 0; this.segmentIndex = 0;
@ -66,10 +69,21 @@ namespace skyscraper8.Skyscraper.IO
} }
} }
private DateTime lastSegmentSwitch;
private Stream GetSegmentStream(string segmentName) private Stream GetSegmentStream(string segmentName)
{ {
Console.WriteLine("open segment {0}", segmentName); if (lastSegmentSwitch == DateTime.MinValue)
Debug.WriteLine(String.Format("open segment {0}", segmentName)); {
lastSegmentSwitch = DateTime.Now;
logger.DebugFormat("open segment {0}", segmentName);
}
else
{
TimeSpan timeTaken = DateTime.Now - lastSegmentSwitch;
lastSegmentSwitch = DateTime.Now;
logger.DebugFormat("open segment {0}, processing took {1}", segmentName, timeTaken.ToString());
}
//Debug.WriteLine(String.Format("open segment {0}", segmentName));
if (source.StartsWith("http://") || source.StartsWith("https://")) if (source.StartsWith("http://") || source.StartsWith("https://"))
{ {
return webClient.GetStreamAsync(segmentName).Result; return webClient.GetStreamAsync(segmentName).Result;