file-storage/FileStorageService.www/Streams/FileBlockStream.cs

158 lines
3.7 KiB
C#

using FileStorageService.www.Data;
using Microsoft.EntityFrameworkCore;
namespace FileStorageService.www.Streams;
public class FileBlockStream(Guid fileId, ApplicationDbContext dbContext)
: Stream
{
// storing position
private int _nextBlockIndex;
private byte[]? _currentBlockData;
private int _currentOffset;
private readonly Queue<byte[]> _blockQueue = new();
// stream functionality flags
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
// not implemented props
public override long Length =>
throw new NotSupportedException("Length is unknown.");
public override long Position
{
get => throw new NotSupportedException("Position is not supported.");
set =>
throw new NotSupportedException("Position setting is not supported.");
}
// constructor
public override int Read(byte[] buffer, int offset, int count)
{
var bytesRead = 0;
while (count > 0)
{
if (_currentBlockData == null || _currentOffset >= _currentBlockData.Length)
{
var nextBlock = GetNextBlock();
if (nextBlock == null)
break;
_currentBlockData = nextBlock;
_currentOffset = 0;
_nextBlockIndex++;
}
var bytesToCopy = Math.Min(count, _currentBlockData.Length - _currentOffset);
Array.Copy(_currentBlockData, _currentOffset, buffer, offset, bytesToCopy);
offset += bytesToCopy;
count -= bytesToCopy;
bytesRead += bytesToCopy;
_currentOffset += bytesToCopy;
}
return bytesRead;
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
CancellationToken cancellationToken)
{
var bytesRead = 0;
while (count > 0)
{
if (_currentBlockData == null || _currentOffset >= _currentBlockData.Length)
{
var nextBlock = await GetNextBlockAsync();
if (nextBlock == null)
break;
_currentBlockData = nextBlock;
_currentOffset = 0;
_nextBlockIndex++;
}
var bytesToCopy = Math.Min(count, _currentBlockData.Length - _currentOffset);
Array.Copy(_currentBlockData, _currentOffset, buffer, offset, bytesToCopy);
offset += bytesToCopy;
count -= bytesToCopy;
bytesRead += bytesToCopy;
_currentOffset += bytesToCopy;
}
return bytesRead;
}
private byte[]? GetNextBlock()
{
if (_blockQueue.Count == 0)
{
var newBlocks = dbContext.FileBlocks
.Where(b =>
b.FileHandle.Id == fileId && b.BlockNumber >= _nextBlockIndex)
.OrderBy(b => b.BlockNumber)
.Take(10)
.Select(b => b)
.ToArray();
if (newBlocks.Length == 0) return null;
foreach (var newBlock in newBlocks)
{
_blockQueue.Enqueue(newBlock.Data);
}
}
// this is redundant, but chatGPT suggests that if this is ran parallel this could have changed.
if (_blockQueue.Count == 0) return null;
var block = _blockQueue.Dequeue();
return block;
}
private async Task<byte[]?> GetNextBlockAsync()
{
if (_blockQueue.Count == 0)
{
var newBlocks = await dbContext.FileBlocks
.Where(b =>
b.FileHandle.Id == fileId && b.BlockNumber >= _nextBlockIndex)
.OrderBy(b => b.BlockNumber)
.Take(10)
.Select(b => b)
.ToArrayAsync();
if (newBlocks.Length == 0) return null;
foreach (var newBlock in newBlocks)
{
_blockQueue.Enqueue(newBlock.Data);
}
}
var block = _blockQueue.Dequeue();
return block;
}
public override void Flush() { }
public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException("Seek not supported.");
public override void SetLength(long value) =>
throw new NotSupportedException("SetLength not supported.");
public override void Write(byte[] buffer, int offset, int count) =>
throw new NotSupportedException("Write not supported.");
}