From 5715e047a3ffd9f8dc1d2f02cec320aacb781a06 Mon Sep 17 00:00:00 2001 From: feyris-tan <4116042+feyris-tan@users.noreply.github.com> Date: Mon, 4 May 2026 22:12:17 +0200 Subject: [PATCH] Modified the ReutersWneExtractor to use an external event handler. --- skyscraper8/ReutersWne/IWneHandler.cs | 16 ++++ skyscraper8/ReutersWne/ReutersWneExtractor.cs | 25 +++++-- skyscraper8/ReutersWne/WneStory.cs | 75 +++++++++++++++++-- skyscraper8/Skyscraper/IO/M3U8Stream.cs | 8 ++ 4 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 skyscraper8/ReutersWne/IWneHandler.cs diff --git a/skyscraper8/ReutersWne/IWneHandler.cs b/skyscraper8/ReutersWne/IWneHandler.cs new file mode 100644 index 0000000..006a9cc --- /dev/null +++ b/skyscraper8/ReutersWne/IWneHandler.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace skyscraper8.ReutersWne +{ + internal interface IWneHandler + { + void OnWneStoryComplete(WneStoryProgress wneStoryProgress, Stream value); + void OnWneStoryDetect(uint embeddedSessionId); + void OnWneStoryFail(uint sessionId); + void OnWneStoryProgress(WneStoryProgress wneStoryProgress) + } +} diff --git a/skyscraper8/ReutersWne/ReutersWneExtractor.cs b/skyscraper8/ReutersWne/ReutersWneExtractor.cs index 2fab337..6c9a61c 100644 --- a/skyscraper8/ReutersWne/ReutersWneExtractor.cs +++ b/skyscraper8/ReutersWne/ReutersWneExtractor.cs @@ -6,7 +6,7 @@ using skyscraper5.Skyscraper.Plugins; namespace skyscraper8.ReutersWne; [SkyscraperPlugin] -public class ReutersWneExtractor : ISkyscraperMpePlugin +internal class ReutersWneExtractor : ISkyscraperMpePlugin { private ILog _logger; public void ConnectToStorage(object[] connector) @@ -169,6 +169,9 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin private void DeliverFile(WneStory story) { + Handler.OnWneStoryComplete(new WneStoryProgress(story), story.GetStream()); + + /* //_logger.InfoFormat("Attempting to deliver story #{0}", story.SessionId); //_logger.InfoFormat("Expected number of blocks: {0}", story.ExpectedPayloadBlock); @@ -193,7 +196,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin FileStream fileStream = outFileInfo.OpenWrite(); listByteArrayStream.CopyTo(fileStream); fileStream.Flush(); - fileStream.Close(); + fileStream.Close();*/ } private bool ParsePacketType3(Span udpPayload) @@ -226,6 +229,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin currentStory = new WneStory(sessionId); wneStories.Add(sessionId, currentStory); _logger.InfoFormat("Found new WNE story #{0}", sessionId); + Handler?.OnWneStoryDetect(sessionId); } else { @@ -293,6 +297,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin if (!wneStories.ContainsKey(sessionId)) { _logger.InfoFormat("Found new WNE story #{0}", sessionId); + Handler?.OnWneStoryDetect(sessionId); WneStory newStory = new WneStory(sessionId); newStory.EccGroup = udpPayload[8]; if (udpPayload[11] == 0x07) @@ -330,6 +335,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin if (fourthUint != 0) { OnError("Unexpected payload packet in story #{0}. Expected {1}, got {2}.", sessionId, 0, fourthUint); + Handler?.OnWneStoryFail(sessionId); return false; } uint thirdUint = udpPayload.ReadUInt32LittleEndian(8); @@ -339,11 +345,13 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin { OnError("Expected payload block {0} but got {1}. This story ({2}) is incomplete and can not be recovered.", wneStories[sessionId].ExpectedPayloadBlock, thirdUint, sessionId); wneStories[sessionId].Corrupted = true; + Handler?.OnWneStoryFail(sessionId); } return false; } wneStories[sessionId].AppendPayloadBlock(udpPayload.Slice(16)); + Handler.OnWneStoryProgress(new WneStoryProgress(wneStories[sessionId])); return true; } } @@ -353,6 +361,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin if (!wneStories.ContainsKey(sessionId)) { OnError("Missed announcement of story #{0}", sessionId); + Handler.OnWneStoryFail(sessionId); return false; } outerStory = wneStories[sessionId]; @@ -394,10 +403,12 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin { wneStories.Add(embeddedSessionId, new WneStory(embeddedSessionId)); _logger.InfoFormat("Found new embedded WNE story #{0}", embeddedSessionId); + Handler?.OnWneStoryDetect(embeddedSessionId); } else { OnError("Missed announcement of embedded WNE story #{0}", embeddedSessionId); + Handler?.OnWneStoryFail(outerStory.SessionId); return false; } } @@ -413,6 +424,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin if (outerStory.ExpectedEccGroup != udpPayload[8]) { OnError("Expected ECC group {0} but got {1}", outerStory.ExpectedEccGroup, udpPayload[8]); + Handler?.OnWneStoryFail(outerStory.SessionId); return false; } @@ -431,8 +443,8 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin { if (udpPayload[12] != 0x01) { - OnError("Expected Continuity Counter {0} but got {1}", - outerStory.ExpectedContinuityCounter, udpPayload[11]); + OnError("Expected Continuity Counter {0} but got {1}", outerStory.ExpectedContinuityCounter, udpPayload[11]); + Handler?.OnWneStoryFail(outerStory.SessionId); return false; } @@ -478,10 +490,12 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin { OnError("Expected payload block {0} but got {1}. This story ({2}) is incomplete and can not be recovered.", wneStories[sessionId].ExpectedPayloadBlock, thirdUint, sessionId); wneStories[sessionId].Corrupted = true; + Handler?.OnWneStoryFail(sessionId); return false; } wneStories[sessionId].AppendPayloadBlock(udpPayload.Slice(16)); + Handler?.OnWneStoryProgress(new WneStoryProgress(wneStories[sessionId])); return true; } @@ -498,8 +512,9 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin private HashSet loggedErrors; private Dictionary wneStories; + public IWneHandler Handler { get; set; } - private void OnError(string message, params object[] args) + private void OnError(string message, params object[] args) { if (loggedErrors == null) loggedErrors = new HashSet(); diff --git a/skyscraper8/ReutersWne/WneStory.cs b/skyscraper8/ReutersWne/WneStory.cs index e063242..59c2d9c 100644 --- a/skyscraper8/ReutersWne/WneStory.cs +++ b/skyscraper8/ReutersWne/WneStory.cs @@ -21,16 +21,46 @@ internal class WneStory : IDisposable private List payloadBlocks; public bool Corrupted; public bool Delivered; - public uint FileSize; + + private MemoryStream preallocated; + private uint _fileSize; + public uint FileSize + { + get + { + return _fileSize; + } + set + { + if (payloadBlocks != null) + { + throw new NotImplementedException("Can't set the file size when a block was already delivered"); + } + preallocated = new MemoryStream(new byte[_fileSize]); + _fileSize = value; + } + } + + public long CaughtPayloadBytes; public void AppendPayloadBlock(Span slice) { - if (payloadBlocks == null) + if (preallocated != null) { - payloadBlocks = new List(); + preallocated.Write(slice); + ExpectedPayloadBlock++; + CaughtPayloadBytes += slice.Length; + } + else + { + if (payloadBlocks == null) + { + payloadBlocks = new List(); + } + payloadBlocks.Add(slice.ToArray()); + ExpectedPayloadBlock++; + CaughtPayloadBytes += slice.Length; } - payloadBlocks.Add(slice.ToArray()); - ExpectedPayloadBlock++; } public bool IsEmpty() @@ -56,13 +86,22 @@ internal class WneStory : IDisposable return true; } - public ListByteArrayStream ToStream() + public Stream ToStream() { if (Delivered) { throw new InvalidOperationException("Cannot convert a delivered story to a stream."); } - return new ListByteArrayStream(payloadBlocks); + + if (preallocated != null) + { + preallocated.Position = 0; + return preallocated; + } + else + { + return new ListByteArrayStream(payloadBlocks); + } } public void Dispose() @@ -71,4 +110,26 @@ internal class WneStory : IDisposable payloadBlocks = null; Delivered = true; } + + public WneStoryProgress GetProgress() + { + return new WneStoryProgress(this); + } + } + +struct WneStoryProgress +{ + internal WneStoryProgress(WneStory source) + { + this.Filename = source.SourceFileName; + this.SessionId = source.SessionId; + this.FileSize = source.FileSize; + this.FileCaught = source.CaughtPayloadBytes; + } + + public string Filename { get; } + public uint SessionId { get; } + public uint FileSize { get; } + public long FileCaught { get; } +} \ No newline at end of file diff --git a/skyscraper8/Skyscraper/IO/M3U8Stream.cs b/skyscraper8/Skyscraper/IO/M3U8Stream.cs index 59ac4b7..a2f5be7 100644 --- a/skyscraper8/Skyscraper/IO/M3U8Stream.cs +++ b/skyscraper8/Skyscraper/IO/M3U8Stream.cs @@ -151,6 +151,14 @@ namespace skyscraper8.Skyscraper.IO continue; if (finishedSegments.Contains(segmentNames[i])) continue; + if (segmentNames[i].EndsWith(".m3u8")) + { + this.source = String.Format("{0}{1}",GetRoot(source), segmentNames[i]); + ReloadSegmentList(); + return GetNextSegment(); + } + if (string.IsNullOrEmpty(segmentNames[i])) + continue; this.segmentIndex = i; this.finishedSegments.Add(segmentNames[i]);