Modified the ReutersWneExtractor to use an external event handler.
Some checks failed
🚀 Pack skyscraper8 / make-zip (push) Failing after 13s

This commit is contained in:
feyris-tan 2026-05-04 22:12:17 +02:00
parent 0c0932e193
commit 5715e047a3
4 changed files with 112 additions and 12 deletions

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace skyscraper8.ReutersWne
{
internal interface IWneHandler
{
void OnWneStoryComplete(WneStoryProgress wneStoryProgress, Stream value);
void OnWneStoryDetect(uint embeddedSessionId);
void OnWneStoryFail(uint sessionId);
void OnWneStoryProgress(WneStoryProgress wneStoryProgress)
}
}

View File

@ -6,7 +6,7 @@ using skyscraper5.Skyscraper.Plugins;
namespace skyscraper8.ReutersWne;
[SkyscraperPlugin]
public class ReutersWneExtractor : ISkyscraperMpePlugin
internal class ReutersWneExtractor : ISkyscraperMpePlugin
{
private ILog _logger;
public void ConnectToStorage(object[] connector)
@ -169,6 +169,9 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
private void DeliverFile(WneStory story)
{
Handler.OnWneStoryComplete(new WneStoryProgress(story), story.GetStream());
/*
//_logger.InfoFormat("Attempting to deliver story #{0}", story.SessionId);
//_logger.InfoFormat("Expected number of blocks: {0}", story.ExpectedPayloadBlock);
@ -193,7 +196,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
FileStream fileStream = outFileInfo.OpenWrite();
listByteArrayStream.CopyTo(fileStream);
fileStream.Flush();
fileStream.Close();
fileStream.Close();*/
}
private bool ParsePacketType3(Span<byte> udpPayload)
@ -226,6 +229,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
currentStory = new WneStory(sessionId);
wneStories.Add(sessionId, currentStory);
_logger.InfoFormat("Found new WNE story #{0}", sessionId);
Handler?.OnWneStoryDetect(sessionId);
}
else
{
@ -293,6 +297,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
if (!wneStories.ContainsKey(sessionId))
{
_logger.InfoFormat("Found new WNE story #{0}", sessionId);
Handler?.OnWneStoryDetect(sessionId);
WneStory newStory = new WneStory(sessionId);
newStory.EccGroup = udpPayload[8];
if (udpPayload[11] == 0x07)
@ -330,6 +335,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
if (fourthUint != 0)
{
OnError("Unexpected payload packet in story #{0}. Expected {1}, got {2}.", sessionId, 0, fourthUint);
Handler?.OnWneStoryFail(sessionId);
return false;
}
uint thirdUint = udpPayload.ReadUInt32LittleEndian(8);
@ -339,11 +345,13 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
{
OnError("Expected payload block {0} but got {1}. This story ({2}) is incomplete and can not be recovered.", wneStories[sessionId].ExpectedPayloadBlock, thirdUint, sessionId);
wneStories[sessionId].Corrupted = true;
Handler?.OnWneStoryFail(sessionId);
}
return false;
}
wneStories[sessionId].AppendPayloadBlock(udpPayload.Slice(16));
Handler.OnWneStoryProgress(new WneStoryProgress(wneStories[sessionId]));
return true;
}
}
@ -353,6 +361,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
if (!wneStories.ContainsKey(sessionId))
{
OnError("Missed announcement of story #{0}", sessionId);
Handler.OnWneStoryFail(sessionId);
return false;
}
outerStory = wneStories[sessionId];
@ -394,10 +403,12 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
{
wneStories.Add(embeddedSessionId, new WneStory(embeddedSessionId));
_logger.InfoFormat("Found new embedded WNE story #{0}", embeddedSessionId);
Handler?.OnWneStoryDetect(embeddedSessionId);
}
else
{
OnError("Missed announcement of embedded WNE story #{0}", embeddedSessionId);
Handler?.OnWneStoryFail(outerStory.SessionId);
return false;
}
}
@ -413,6 +424,7 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
if (outerStory.ExpectedEccGroup != udpPayload[8])
{
OnError("Expected ECC group {0} but got {1}", outerStory.ExpectedEccGroup, udpPayload[8]);
Handler?.OnWneStoryFail(outerStory.SessionId);
return false;
}
@ -431,8 +443,8 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
{
if (udpPayload[12] != 0x01)
{
OnError("Expected Continuity Counter {0} but got {1}",
outerStory.ExpectedContinuityCounter, udpPayload[11]);
OnError("Expected Continuity Counter {0} but got {1}", outerStory.ExpectedContinuityCounter, udpPayload[11]);
Handler?.OnWneStoryFail(outerStory.SessionId);
return false;
}
@ -478,10 +490,12 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
{
OnError("Expected payload block {0} but got {1}. This story ({2}) is incomplete and can not be recovered.", wneStories[sessionId].ExpectedPayloadBlock, thirdUint, sessionId);
wneStories[sessionId].Corrupted = true;
Handler?.OnWneStoryFail(sessionId);
return false;
}
wneStories[sessionId].AppendPayloadBlock(udpPayload.Slice(16));
Handler?.OnWneStoryProgress(new WneStoryProgress(wneStories[sessionId]));
return true;
}
@ -498,8 +512,9 @@ public class ReutersWneExtractor : ISkyscraperMpePlugin
private HashSet<string> loggedErrors;
private Dictionary<uint, WneStory> wneStories;
public IWneHandler Handler { get; set; }
private void OnError(string message, params object[] args)
private void OnError(string message, params object[] args)
{
if (loggedErrors == null)
loggedErrors = new HashSet<string>();

View File

@ -21,16 +21,46 @@ internal class WneStory : IDisposable
private List<byte[]> payloadBlocks;
public bool Corrupted;
public bool Delivered;
public uint FileSize;
private MemoryStream preallocated;
private uint _fileSize;
public uint FileSize
{
get
{
return _fileSize;
}
set
{
if (payloadBlocks != null)
{
throw new NotImplementedException("Can't set the file size when a block was already delivered");
}
preallocated = new MemoryStream(new byte[_fileSize]);
_fileSize = value;
}
}
public long CaughtPayloadBytes;
public void AppendPayloadBlock(Span<byte> slice)
{
if (payloadBlocks == null)
if (preallocated != null)
{
payloadBlocks = new List<byte[]>();
preallocated.Write(slice);
ExpectedPayloadBlock++;
CaughtPayloadBytes += slice.Length;
}
else
{
if (payloadBlocks == null)
{
payloadBlocks = new List<byte[]>();
}
payloadBlocks.Add(slice.ToArray());
ExpectedPayloadBlock++;
CaughtPayloadBytes += slice.Length;
}
payloadBlocks.Add(slice.ToArray());
ExpectedPayloadBlock++;
}
public bool IsEmpty()
@ -56,13 +86,22 @@ internal class WneStory : IDisposable
return true;
}
public ListByteArrayStream ToStream()
public Stream ToStream()
{
if (Delivered)
{
throw new InvalidOperationException("Cannot convert a delivered story to a stream.");
}
return new ListByteArrayStream(payloadBlocks);
if (preallocated != null)
{
preallocated.Position = 0;
return preallocated;
}
else
{
return new ListByteArrayStream(payloadBlocks);
}
}
public void Dispose()
@ -71,4 +110,26 @@ internal class WneStory : IDisposable
payloadBlocks = null;
Delivered = true;
}
public WneStoryProgress GetProgress()
{
return new WneStoryProgress(this);
}
}
struct WneStoryProgress
{
internal WneStoryProgress(WneStory source)
{
this.Filename = source.SourceFileName;
this.SessionId = source.SessionId;
this.FileSize = source.FileSize;
this.FileCaught = source.CaughtPayloadBytes;
}
public string Filename { get; }
public uint SessionId { get; }
public uint FileSize { get; }
public long FileCaught { get; }
}

View File

@ -151,6 +151,14 @@ namespace skyscraper8.Skyscraper.IO
continue;
if (finishedSegments.Contains(segmentNames[i]))
continue;
if (segmentNames[i].EndsWith(".m3u8"))
{
this.source = String.Format("{0}{1}",GetRoot(source), segmentNames[i]);
ReloadSegmentList();
return GetNextSegment();
}
if (string.IsNullOrEmpty(segmentNames[i]))
continue;
this.segmentIndex = i;
this.finishedSegments.Add(segmentNames[i]);