158 lines
3.7 KiB
C#
158 lines
3.7 KiB
C#
using FileStorageService.www.Data;
|
|
using Microsoft.EntityFrameworkCore;
|
|
|
|
namespace FileStorageService.www.Streams;
|
|
|
|
public class FileBlockStream(Guid fileId, ApplicationDbContext dbContext)
|
|
: Stream
|
|
{
|
|
|
|
// storing position
|
|
private int _nextBlockIndex;
|
|
private byte[]? _currentBlockData;
|
|
private int _currentOffset;
|
|
private readonly Queue<byte[]> _blockQueue = new();
|
|
|
|
// stream functionality flags
|
|
public override bool CanRead => true;
|
|
public override bool CanSeek => false;
|
|
public override bool CanWrite => false;
|
|
|
|
// not implemented props
|
|
public override long Length =>
|
|
throw new NotSupportedException("Length is unknown.");
|
|
|
|
public override long Position
|
|
{
|
|
get => throw new NotSupportedException("Position is not supported.");
|
|
set =>
|
|
throw new NotSupportedException("Position setting is not supported.");
|
|
}
|
|
|
|
// constructor
|
|
|
|
public override int Read(byte[] buffer, int offset, int count)
|
|
{
|
|
var bytesRead = 0;
|
|
|
|
while (count > 0)
|
|
{
|
|
if (_currentBlockData == null || _currentOffset >= _currentBlockData.Length)
|
|
{
|
|
var nextBlock = GetNextBlock();
|
|
|
|
if (nextBlock == null)
|
|
break;
|
|
|
|
_currentBlockData = nextBlock;
|
|
_currentOffset = 0;
|
|
_nextBlockIndex++;
|
|
}
|
|
|
|
var bytesToCopy = Math.Min(count, _currentBlockData.Length - _currentOffset);
|
|
Array.Copy(_currentBlockData, _currentOffset, buffer, offset, bytesToCopy);
|
|
|
|
offset += bytesToCopy;
|
|
count -= bytesToCopy;
|
|
bytesRead += bytesToCopy;
|
|
_currentOffset += bytesToCopy;
|
|
}
|
|
return bytesRead;
|
|
}
|
|
|
|
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
var bytesRead = 0;
|
|
|
|
while (count > 0)
|
|
{
|
|
if (_currentBlockData == null || _currentOffset >= _currentBlockData.Length)
|
|
{
|
|
var nextBlock = await GetNextBlockAsync();
|
|
|
|
if (nextBlock == null)
|
|
break;
|
|
|
|
_currentBlockData = nextBlock;
|
|
_currentOffset = 0;
|
|
_nextBlockIndex++;
|
|
}
|
|
|
|
var bytesToCopy = Math.Min(count, _currentBlockData.Length - _currentOffset);
|
|
Array.Copy(_currentBlockData, _currentOffset, buffer, offset, bytesToCopy);
|
|
|
|
offset += bytesToCopy;
|
|
count -= bytesToCopy;
|
|
bytesRead += bytesToCopy;
|
|
_currentOffset += bytesToCopy;
|
|
}
|
|
return bytesRead;
|
|
}
|
|
|
|
private byte[]? GetNextBlock()
|
|
{
|
|
|
|
if (_blockQueue.Count == 0)
|
|
{
|
|
var newBlocks = dbContext.FileBlocks
|
|
.Where(b =>
|
|
b.FileHandle.Id == fileId && b.BlockNumber >= _nextBlockIndex)
|
|
.OrderBy(b => b.BlockNumber)
|
|
.Take(10)
|
|
.Select(b => b)
|
|
.ToArray();
|
|
|
|
if (newBlocks.Length == 0) return null;
|
|
|
|
foreach (var newBlock in newBlocks)
|
|
{
|
|
_blockQueue.Enqueue(newBlock.Data);
|
|
}
|
|
}
|
|
|
|
// this is redundant, but chatGPT suggests that if this is ran parallel this could have changed.
|
|
if (_blockQueue.Count == 0) return null;
|
|
|
|
var block = _blockQueue.Dequeue();
|
|
|
|
return block;
|
|
}
|
|
|
|
private async Task<byte[]?> GetNextBlockAsync()
|
|
{
|
|
|
|
if (_blockQueue.Count == 0)
|
|
{
|
|
var newBlocks = await dbContext.FileBlocks
|
|
.Where(b =>
|
|
b.FileHandle.Id == fileId && b.BlockNumber >= _nextBlockIndex)
|
|
.OrderBy(b => b.BlockNumber)
|
|
.Take(10)
|
|
.Select(b => b)
|
|
.ToArrayAsync();
|
|
|
|
if (newBlocks.Length == 0) return null;
|
|
|
|
foreach (var newBlock in newBlocks)
|
|
{
|
|
_blockQueue.Enqueue(newBlock.Data);
|
|
}
|
|
}
|
|
|
|
var block = _blockQueue.Dequeue();
|
|
|
|
return block;
|
|
}
|
|
|
|
public override void Flush() { }
|
|
|
|
public override long Seek(long offset, SeekOrigin origin) =>
|
|
throw new NotSupportedException("Seek not supported.");
|
|
|
|
public override void SetLength(long value) =>
|
|
throw new NotSupportedException("SetLength not supported.");
|
|
|
|
public override void Write(byte[] buffer, int offset, int count) =>
|
|
throw new NotSupportedException("Write not supported.");
|
|
} |