Files
NebulaLauncher/Nebula.Shared/Services/ContentService.Download.cs

275 lines
9.7 KiB
C#
Raw Permalink Normal View History

2025-01-05 17:05:23 +03:00
using System.Buffers.Binary;
2024-12-27 19:15:33 +03:00
using System.Globalization;
using System.Net.Http.Headers;
using System.Numerics;
2025-03-13 14:58:47 +03:00
using Nebula.Shared.FileApis;
2025-01-05 17:05:23 +03:00
using Nebula.Shared.FileApis.Interfaces;
using Nebula.Shared.Models;
using Nebula.Shared.Utils;
2024-12-27 19:15:33 +03:00
2025-01-05 17:05:23 +03:00
namespace Nebula.Shared.Services;
2024-12-27 19:15:33 +03:00
public partial class ContentService
{
/// <summary>
/// File API obtained from <c>fileService.CreateFileApi("content")</c>.
/// It is the backing store handed to every <see cref="HashApi"/> built by <see cref="CreateHashApi"/>.
/// </summary>
public readonly IReadWriteFileApi ContentFileApi = fileService.CreateFileApi("content");
/// <summary>
/// File API obtained from <c>fileService.CreateFileApi("manifest")</c>.
/// Manifests are saved, opened and removed through it using the manifest hash as the key.
/// </summary>
public readonly IReadWriteFileApi ManifestFileApi = fileService.CreateFileApi("manifest");
2025-07-02 21:32:51 +03:00
/// <summary>
/// Records <paramref name="hash"/> as the manifest hash for <paramref name="address"/> in the
/// <c>ServerManifestHash</c> config value.
/// </summary>
/// <remarks>
/// When the address already maps to a different hash, the manifest stored under that previous hash is
/// removed from <see cref="ManifestFileApi"/> before the mapping is overwritten. When the stored hash
/// already equals <paramref name="hash"/>, nothing is written.
/// </remarks>
/// <param name="address">Key under which the hash is stored.</param>
/// <param name="hash">Manifest hash to associate with <paramref name="address"/>.</param>
public void SetServerHash(string address, string hash)
{
    var knownHashes = varService.GetConfigValue(CurrentConVar.ServerManifestHash)!;

    if (knownHashes.TryGetValue(address, out var previousHash))
    {
        if (previousHash == hash)
        {
            // Mapping is already up to date; leave both the config and the stored manifest alone.
            return;
        }

        // The address moved to another manifest, so the old manifest file is dropped.
        ManifestFileApi.Remove(previousHash);
    }

    knownHashes[address] = hash;
    varService.SetConfigValue(CurrentConVar.ServerManifestHash, knownHashes);
}
2025-07-02 21:32:51 +03:00
/// <summary>
/// Builds a <see cref="HashApi"/> over <paramref name="manifestItems"/> that is backed by
/// <see cref="ContentFileApi"/>.
/// </summary>
/// <param name="manifestItems">Manifest entries the returned API should cover.</param>
/// <returns>A new <see cref="HashApi"/> instance.</returns>
public HashApi CreateHashApi(List<RobustManifestItem> manifestItems) =>
    new HashApi(manifestItems, ContentFileApi);
2025-03-13 14:58:47 +03:00
/// <summary>
/// Reads every item from <paramref name="manifestReader"/>, downloads the ones the content store does
/// not have yet, and returns a <see cref="HashApi"/> covering the whole manifest.
/// </summary>
/// <param name="manifestReader">Source of manifest items; it is read until <c>TryReadItem</c> returns false.</param>
/// <param name="downloadUri">Endpoint passed to <see cref="Download"/> for the missing items.</param>
/// <param name="loadingHandler">Receives the progress message and download jobs.</param>
/// <param name="cancellationToken">Checked for every manifest item read and passed on to the download.</param>
/// <returns>The <see cref="HashApi"/> created for all items of the manifest.</returns>
/// <exception cref="TaskCanceledException">Cancellation was requested while the manifest was being read.</exception>
public async Task<HashApi> EnsureItems(ManifestReader manifestReader, Uri downloadUri,
    ILoadingHandler loadingHandler,
    CancellationToken cancellationToken)
{
    List<RobustManifestItem> manifestItems = [];

    while (manifestReader.TryReadItem(out var entry))
    {
        if (cancellationToken.IsCancellationRequested)
        {
            throw new TaskCanceledException();
        }

        manifestItems.Add(entry.Value);
    }

    var hashApi = CreateHashApi(manifestItems);

    // Only entries the hash API does not report as present need to be fetched.
    var missingItems = manifestItems.FindAll(entry => !hashApi.Has(entry));

    var countMessage = "Download Count:" + missingItems.Count;
    loadingHandler.SetLoadingMessage(countMessage);
    _logger.Log(countMessage);

    await Download(downloadUri, missingItems, hashApi, loadingHandler, cancellationToken);

    return hashApi;
}
2025-03-13 14:58:47 +03:00
/// <summary>
/// Makes sure the manifest described by <paramref name="info"/> and all of its items are available
/// locally, and returns a <see cref="HashApi"/> for them.
/// </summary>
/// <remarks>
/// If a manifest is already stored under <c>info.Hash</c> in <see cref="ManifestFileApi"/> it is read
/// from there. Otherwise the server hash is recorded through <see cref="SetServerHash"/>, the manifest
/// is fetched from <c>info.ManifestUri</c>, saved under <c>info.Hash</c> and then processed.
/// </remarks>
/// <param name="info">Manifest hash, manifest URI and download URI of the server content.</param>
/// <param name="loadingHandler">Receives progress messages and download jobs.</param>
/// <param name="cancellationToken">Token passed to the HTTP request and to the item download.</param>
/// <returns>The <see cref="HashApi"/> covering every item of the manifest.</returns>
/// <exception cref="HttpRequestException">The manifest request returned a non-success status code.</exception>
public async Task<HashApi> EnsureItems(RobustManifestInfo info, ILoadingHandler loadingHandler,
    CancellationToken cancellationToken)
{
    _logger.Log("Getting manifest: " + info.Hash);
    loadingHandler.SetLoadingMessage("Getting manifest: " + info.Hash);

    if (ManifestFileApi.TryOpen(info.Hash, out var stream))
    {
        _logger.Log("Loading manifest from: " + info.Hash);

        // The cached manifest stream and its reader are released once the items have been ensured,
        // mirroring what the freshly-downloaded path below does.
        await using var cachedStream = stream;
        using var cachedReader = new ManifestReader(stream);
        return await EnsureItems(cachedReader, info.DownloadUri, loadingHandler, cancellationToken);
    }

    SetServerHash(info.ManifestUri.ToString(), info.Hash);

    _logger.Log("Fetching manifest from: " + info.ManifestUri);
    loadingHandler.SetLoadingMessage("Fetching manifest from: " + info.ManifestUri);

    using var response = await _http.GetAsync(info.ManifestUri, cancellationToken);

    // Throws HttpRequestException carrying the status code instead of a message-less Exception.
    response.EnsureSuccessStatusCode();

    await using var streamContent = await response.Content.ReadAsStreamAsync(cancellationToken);
    ManifestFileApi.Save(info.Hash, streamContent);

    // Save consumed the stream; rewind so the manifest can be parsed from the start.
    streamContent.Seek(0, SeekOrigin.Begin);

    using var manifestReader = new ManifestReader(streamContent);
    return await EnsureItems(manifestReader, info.DownloadUri, loadingHandler, cancellationToken);
}
2025-04-20 15:43:57 +03:00
/// <summary>
/// Copies every item of <paramref name="hashApi"/>'s manifest into <paramref name="otherApi"/>,
/// saving each one under its manifest path.
/// </summary>
/// <remarks>
/// Items are processed in parallel with at most 10 concurrent workers. An item that cannot be opened
/// through <paramref name="hashApi"/> is logged as an error and skipped. Once all items have been
/// processed, <paramref name="loadingHandler"/> is disposed if it implements <see cref="IDisposable"/>.
/// </remarks>
/// <param name="hashApi">Source of the manifest and of the stored blobs.</param>
/// <param name="otherApi">Destination that receives each blob under <c>item.Path</c>.</param>
/// <param name="loadingHandler">Progress sink; one job is resolved per manifest item.</param>
public void Unpack(HashApi hashApi, IWriteFileApi otherApi, ILoadingHandler loadingHandler)
{
    _logger.Log("Unpack manifest files");

    var items = hashApi.Manifest.Values.ToList();
    loadingHandler.AppendJob(items.Count);

    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = 10
    };

    Parallel.ForEach(items, options, item =>
    {
        if (hashApi.TryOpen(item, out var stream))
        {
            // `using` closes the blob stream even when Save throws, so a failed write no longer
            // leaks the open handle.
            using (stream)
            {
                _logger.Log($"Unpack {item.Hash} to: {item.Path}");
                otherApi.Save(item.Path, stream);
            }
        }
        else
        {
            _logger.Error("OH FUCK!! " + item.Path);
        }

        // Counted as resolved whether or not the item could be opened.
        loadingHandler.AppendResolvedJob();
    });

    if (loadingHandler is IDisposable disposable)
    {
        disposable.Dispose();
    }
}
2025-03-13 14:58:47 +03:00
/// <summary>
/// Requests <paramref name="toDownload"/> from <paramref name="contentCdn"/> in a single POST and
/// stores every returned blob through <paramref name="hashApi"/>.
/// </summary>
/// <remarks>
/// The request body is the list of item ids, each written as a little-endian int32. The response
/// starts with a 4-byte flags header, followed by one record per requested item in request order:
/// an int32 uncompressed length, plus an int32 compressed length when the
/// <c>PreCompressed</c> flag is set, followed by the blob bytes.
/// The method returns without downloading when the list is empty or cancellation was already
/// requested, and stops quietly (after logging) when cancellation is observed between items.
/// </remarks>
/// <param name="contentCdn">Endpoint the download request is posted to.</param>
/// <param name="toDownload">Items to fetch; their order defines the order of blobs in the response.</param>
/// <param name="hashApi">Receives each downloaded blob via <c>Save</c>.</param>
/// <param name="loadingHandler">Receives the status message, the query job and one job per item.</param>
/// <param name="cancellationToken">Cancels the HTTP request and is polled before every item.</param>
/// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
/// <exception cref="InvalidDataException">
/// A blob declared a negative length, or a compressed blob did not decompress to its declared length.
/// </exception>
public async Task Download(Uri contentCdn, List<RobustManifestItem> toDownload, HashApi hashApi, ILoadingHandler loadingHandler,
    CancellationToken cancellationToken)
{
    if (toDownload.Count == 0 || cancellationToken.IsCancellationRequested)
    {
        _logger.Log("Nothing to download! Fuck this!");
        return;
    }

    HttpResponseMessage response;
    var downloadJobWatch = loadingHandler.GetQueryJob();
    try
    {
        loadingHandler.SetLoadingMessage("Downloading from: " + contentCdn);
        _logger.Log("Downloading from: " + contentCdn);

        // Request body: the id of every wanted item as a little-endian int32.
        var requestBody = new byte[toDownload.Count * 4];
        var reqI = 0;
        foreach (var wanted in toDownload)
        {
            BinaryPrimitives.WriteInt32LittleEndian(requestBody.AsSpan(reqI, 4), wanted.Id);
            reqI += 4;
        }

        using var request = new HttpRequestMessage(HttpMethod.Post, contentCdn);
        request.Headers.Add(
            "X-Robust-Download-Protocol",
            varService.GetConfigValue(CurrentConVar.ManifestDownloadProtocolVersion)
                .ToString(CultureInfo.InvariantCulture));
        request.Content = new ByteArrayContent(requestBody);
        request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
        request.Headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("zstd"));

        response = await _http.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
    }
    finally
    {
        // The query job ends as soon as the request is over, including when it failed or was
        // cancelled; previously those paths left the job handle undisposed.
        downloadJobWatch.Dispose();
    }

    // Only headers were read so far; disposing the response releases it on every exit path.
    using var responseDispose = response;

    if (cancellationToken.IsCancellationRequested)
    {
        _logger.Log("Downloading cancelled!");
        return;
    }

    response.EnsureSuccessStatusCode();

    var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
    stream = new BandwidthStream(stream);
    if (response.Content.Headers.ContentEncoding.Contains("zstd"))
    {
        stream = new ZStdDecompressStream(stream);
    }

    await using var streamDispose = stream;

    // Read flags header
    var streamHeader = await stream.ReadExactAsync(4, null);
    var streamFlags = (DownloadStreamHeaderFlags)BinaryPrimitives.ReadInt32LittleEndian(streamHeader);
    var preCompressed = (streamFlags & DownloadStreamHeaderFlags.PreCompressed) != 0;

    // Normal file header:
    // <int32> uncompressed length
    // When preCompressed is set, we add:
    // <int32> compressed length
    var fileHeader = new byte[preCompressed ? 8 : 4];

    // Pre-compressed blobs are decompressed before being saved; blobs of a stream that is not
    // pre-compressed are saved exactly as received, so no compression context is needed.
    var decompressContext = preCompressed ? new ZStdDCtx() : null;
    try
    {
        // Buffer for storing compressed ZStd data.
        var compressBuffer = new byte[1024];

        // Buffer for storing uncompressed data.
        var readBuffer = new byte[1024];

        var i = 0;

        loadingHandler.AppendJob(toDownload.Count);

        foreach (var item in toDownload)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                // The finally block below releases the decompression context exactly once.
                _logger.Log("Downloading cancelled!");
                return;
            }

            // Read file header.
            await stream.ReadExactAsync(fileHeader, null);

            var length = BinaryPrimitives.ReadInt32LittleEndian(fileHeader.AsSpan(0, 4));
            if (length < 0)
            {
                // The length comes straight from the network; reject it here with a clear error.
                throw new InvalidDataException($"Blob {i} declared a negative length ({length})!");
            }

            EnsureBuffer(ref readBuffer, length);
            var data = readBuffer.AsMemory(0, length);

            if (preCompressed)
            {
                // Compressed length from extended header.
                var compressedLength = BinaryPrimitives.ReadInt32LittleEndian(fileHeader.AsSpan(4, 4));

                if (compressedLength > 0)
                {
                    EnsureBuffer(ref compressBuffer, compressedLength);
                    var compressedData = compressBuffer.AsMemory(0, compressedLength);
                    await stream.ReadExactAsync(compressedData, null);

                    // Decompress into the read buffer and check the result against the declared length.
                    var decompressedLength = decompressContext!.Decompress(data.Span, compressedData.Span);

                    if (decompressedLength != data.Length)
                    {
                        throw new InvalidDataException($"Compressed blob {i} had incorrect decompressed size!");
                    }
                }
                else
                {
                    // A non-positive compressed length means the blob follows uncompressed.
                    await stream.ReadExactAsync(data, null);
                }
            }
            else
            {
                await stream.ReadExactAsync(data, null);
            }

            // Copy out of the shared read buffer, which is reused for the next item.
            using var fileStream = new MemoryStream(data.ToArray());
            hashApi.Save(item, fileStream);

            _logger.Log("file saved:" + item.Path);
            loadingHandler.AppendResolvedJob();
            i += 1;
        }
    }
    finally
    {
        decompressContext?.Dispose();
    }
}
/// <summary>
/// Makes sure <paramref name="buf"/> can hold at least <paramref name="needsFit"/> bytes, replacing
/// it with a larger array when it cannot.
/// </summary>
/// <remarks>
/// The replacement array is sized to the next power of two that fits <paramref name="needsFit"/>
/// (capped at <see cref="Array.MaxLength"/>). The previous contents are NOT copied over; the buffer
/// is treated as scratch space.
/// </remarks>
/// <param name="buf">Buffer to check; reassigned when it is too small.</param>
/// <param name="needsFit">Number of bytes the buffer must be able to hold.</param>
private static void EnsureBuffer(ref byte[] buf, int needsFit)
{
    if (buf.Length >= needsFit)
    {
        return;
    }

    // Computed as long: with int arithmetic "2 << 30" (any needsFit above 2^30) wraps to a negative
    // value and the allocation fails with an OverflowException.
    var rounded = 2L << BitOperations.Log2((uint)needsFit - 1);

    // Cap at the largest allocatable array, but never go below the requested size so that an
    // impossible request fails in the allocation itself rather than yielding a too-small buffer.
    var newLen = Math.Max(needsFit, Math.Min(rounded, Array.MaxLength));
    buf = new byte[newLen];
}
}