using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading.Tasks; using Minio; using Minio.DataModel; using Minio.DataModel.Args; using Minio.Exceptions; using skyscraper5.Dvb.DataBroadcasting.SkyscraperVfs; using skyscraper5.Skyscraper.Scraper.Storage.Split; using skyscraper8.DvbNip; using skyscraper8.Ietf.FLUTE; namespace skyscraper5.Data { public class MinioObjectStorage : ObjectStorage { private readonly IMinioClient _minioClient; private readonly string _minioBucket; private List _tasks; private List droppedFiles; public MinioObjectStorage(IMinioClient minioClient, string minioBucket) { _minioClient = minioClient; _minioBucket = minioBucket; _tasks = new List(); droppedFiles = new List(); } private void WriteObject(string fullName, Stream buffer, string mime = "application/octet-stream", Dictionary optionalData = null) { PutObjectArgs putObjectArgs = new PutObjectArgs(); putObjectArgs = putObjectArgs.WithBucket(_minioBucket); putObjectArgs = putObjectArgs.WithObject(fullName); putObjectArgs = putObjectArgs.WithStreamData(buffer); //putObjectArgs = putObjectArgs.WithObjectSize(buffer.Length); putObjectArgs = putObjectArgs.WithContentType(mime); putObjectArgs = putObjectArgs.WithHeaders(optionalData); lock (_tasks) { _tasks.Add(_minioClient.PutObjectAsync(putObjectArgs).ContinueWith(task => { droppedFiles.Add(fullName); definetlyKnownFiles.Add(fullName); definetlyMissingFiles.Remove(fullName); buffer.Close(); buffer.Dispose(); })); } CleanTaskList(); } private void CleanTaskList() { while (true) { bool done = true; lock (_tasks) { foreach (Task task in _tasks) { if (task.IsCompleted) { _tasks.Remove(task); Debug.WriteLine(String.Format("Removed completed Task: {0}", task.ToString())); done = false; break; } } } if (done) break; } } public bool ObjectCarouselFileArrival(VfsFile vfsFile, int transportStreamId, int networkId) { string combine = string.Join('/', "DSM-CC_Objects", networkId.ToString(), transportStreamId.ToString(), vfsFile.SourcePid.ToString(), vfsFile.ToString().Substring(1)); combine = combine.Replace('\\', '/'); if (FileExists(combine)) return false; Dictionary extendedInfo = new Dictionary(); extendedInfo.Add(nameof(vfsFile.ObjectLocation.CarouselId), vfsFile.ObjectLocation.CarouselId.ToString()); extendedInfo.Add(nameof(vfsFile.ObjectLocation.ModuleId), vfsFile.ObjectLocation.ModuleId.ToString()); extendedInfo.Add(nameof(vfsFile.ObjectLocation.Version), vfsFile.ObjectLocation.Version.ToString()); extendedInfo.Add(nameof(vfsFile.ObjectLocation.ObjectKey), BitConverter.ToString(vfsFile.ObjectLocation.ObjectKey)); WriteObject(combine,new MemoryStream(vfsFile.FileContent),"application/octet-stream",extendedInfo); return true; } public void DataCarouselModuleArrival(int currentNetworkId, int currentTransportStreamId, int elementaryPid, ushort moduleModuleId, byte moduleModuleVersion, Stream result) { string combine = string.Join('/', "DSM-CC_Data", currentNetworkId.ToString(), currentTransportStreamId.ToString(), elementaryPid.ToString(), String.Format("{0}_V{1}.bin", moduleModuleId, moduleModuleVersion)); if (FileExists(combine)) return; WriteObject(combine, result); WantedModuleCoordinate coordinate = new WantedModuleCoordinate(currentNetworkId, currentTransportStreamId, elementaryPid, moduleModuleId, moduleModuleVersion); knownModuleCoordinates.Add(coordinate); wantedModuleCoordinates.Remove(coordinate); } public bool FileExists(string combine) { if (definetlyMissingFiles == null) definetlyMissingFiles = new HashSet(); if (definetlyKnownFiles == null) definetlyKnownFiles = new HashSet(); if (definetlyKnownFiles.Contains(combine)) return true; if (definetlyMissingFiles.Contains(combine)) return false; if (droppedFiles != null) { if (droppedFiles.Contains(combine)) return true; } StatObjectArgs soa = new StatObjectArgs().WithBucket(_minioBucket).WithObject(combine); try { ObjectStat objectStat = _minioClient.StatObjectAsync(soa).Result; definetlyKnownFiles.Add(combine); return true; } catch (AggregateException e) { MinioException minioException = e.InnerExceptions[0] as MinioException; if (minioException.Message.Contains("Minio.Exceptions.ObjectNotFoundException")) { definetlyMissingFiles.Add(combine); return false; } else { throw minioException; } } } public bool IsDsmCcModuleWanted(int currentNetworkId, int currentTransportStreamId, int elementaryPid, ushort moduleId, byte moduleVersion) { if (wantedModuleCoordinates == null) wantedModuleCoordinates = new HashSet(); if (knownModuleCoordinates == null) knownModuleCoordinates = new HashSet(); WantedModuleCoordinate coordinate = new WantedModuleCoordinate(currentNetworkId, currentTransportStreamId, elementaryPid, moduleId, moduleVersion); if (wantedModuleCoordinates.Contains(coordinate)) return true; if (knownModuleCoordinates.Contains(coordinate)) return false; string combine = String.Join('/', "DSM-CC_Data", currentNetworkId.ToString(), currentTransportStreamId.ToString(), elementaryPid.ToString(), String.Format("{0}_V{1}.bin", moduleId, moduleVersion)); bool isWanted = !FileExists(combine); if (isWanted) { wantedModuleCoordinates.Add(coordinate); } else { knownModuleCoordinates.Add(coordinate); } return isWanted; } public bool TestForFramegrab(int currentNetworkId, int transportStreamId, ushort mappingProgramNumber, int mappingStreamElementaryPid) { string combine = string.Join('/', "Framegrabs", currentNetworkId.ToString(), transportStreamId.ToString(), String.Format("{1}_{0}.jpg", mappingStreamElementaryPid, mappingProgramNumber)); return FileExists(combine); } public void StoreFramegrab(int currentNetworkId, int transportStreamId, ushort mappingProgramNumber, ushort pid, byte[] imageData) { string combine = string.Join('/', "Framegrabs", currentNetworkId.ToString(), transportStreamId.ToString(), String.Format("{1}_{0}.jpg", pid, mappingProgramNumber)); WriteObject(combine, new MemoryStream(imageData), "image/jpeg"); } public void WaitForCompletion() { while (_tasks.Count > 0) { _tasks[0].Wait(); _tasks.RemoveAt(0); } wantedModuleCoordinates?.Clear(); knownModuleCoordinates?.Clear(); definetlyKnownFiles?.Clear(); } private HashSet wantedModuleCoordinates; private HashSet knownModuleCoordinates; private HashSet definetlyKnownFiles; struct WantedModuleCoordinate { private int currentNetworkId, currentTransportStreamId, elementaryPid; private ushort moduleId; private byte moduleVersion; public WantedModuleCoordinate(int currentNetworkId, int currentTransportStreamId, int elementaryPid, ushort moduleId, byte moduleVersion) { this.currentNetworkId = currentNetworkId; this.currentTransportStreamId = currentTransportStreamId; this.elementaryPid = elementaryPid; this.moduleId = moduleId; this.moduleVersion = moduleVersion; } public bool Equals(WantedModuleCoordinate other) { return currentNetworkId == other.currentNetworkId && currentTransportStreamId == other.currentTransportStreamId && elementaryPid == other.elementaryPid && moduleId == other.moduleId && moduleVersion == other.moduleVersion; } public override bool Equals(object obj) { return obj is WantedModuleCoordinate other && Equals(other); } public override int GetHashCode() { return HashCode.Combine(currentNetworkId, currentTransportStreamId, elementaryPid, moduleId, moduleVersion); } } private int uiVersion; private HashSet definetlyMissingFiles; public void UiSetVersion(int version) { this.uiVersion = version; } public object[] GetPluginConnector() { return new object[] { _minioClient, _minioBucket }; } public void Ping() { GetVersioningArgs args = new GetVersioningArgs().WithBucket(_minioBucket); VersioningConfiguration vc = _minioClient.GetVersioningAsync(args).Result; } public bool DvbNipTestForFile(string announcedFileContentLocation) { string path = "/nip/" + DvbNipUtilities.MakeFilename(announcedFileContentLocation); bool result = FileExists(path); return result; } public void DvbNipFileArrival(NipActualCarrierInformation carrier, FluteListener listener) { string path = "/nip/" + DvbNipUtilities.MakeFilename(listener.FileAssociation.ContentLocation); Stream stream = listener.ToStream(); WriteObject(path, stream); } } }