Increase WebRTC file transfer speed by implementing the bufferamountlow event properly

This commit is contained in:
schlagmichdoch 2023-10-13 18:26:58 +02:00
parent 49b749f32b
commit 06d00668b8
2 changed files with 111 additions and 107 deletions

View File

@ -270,6 +270,9 @@ class Peer {
this._roomIds = {};
this._updateRoomIds(roomType, roomId);
this._chunkSize = 262144;
this._lowWaterMark = 2* this._chunkSize;
this._filesQueue = [];
this._busy = false;
@ -448,22 +451,9 @@ class Peer {
mime: file.type
});
this._chunker = new FileChunker(file,
chunk => this._send(chunk),
offset => this._onPartitionEnd(offset));
this._chunker.nextPartition();
}
_onPartitionEnd(offset) {
this.sendJSON({ type: 'partition', offset: offset });
}
_onReceivedPartitionEnd(offset) {
this.sendJSON({ type: 'partition-received', offset: offset });
}
_sendNextPartition() {
if (!this._chunker || this._chunker.isFileEnd()) return;
this._chunker.nextPartition();
this._chunkSize,
chunk => this._send(chunk));
this._chunker._readChunksIntoBuffer(this._channel ? this._channel.bufferedAmount : 0);
}
_sendProgress(progress) {
@ -483,12 +473,6 @@ class Peer {
case 'header':
this._onFileHeader(messageJSON);
break;
case 'partition':
this._onReceivedPartitionEnd(messageJSON);
break;
case 'partition-received':
this._sendNextPartition();
break;
case 'progress':
this._onDownloadProgress(messageJSON.progress);
break;
@ -572,7 +556,6 @@ class Peer {
_onChunkReceived(chunk) {
if(!this._digester || !(chunk.byteLength || chunk.size)) return;
this._digester.unchunk(chunk);
const progress = this._digester.progress;
@ -634,6 +617,8 @@ class Peer {
} else {
this._dequeueFile();
}
this._chunker._removeEventListener();
this._chunker = null;
}
_onFileTransferRequestResponded(message) {
@ -711,8 +696,7 @@ class RTCPeer extends Peer {
if (!this._conn) return;
const channel = this._conn.createDataChannel('data-channel', {
ordered: true,
reliable: true // Obsolete. See https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/reliable
ordered: false,
});
channel.onopen = e => this._onChannelOpened(e);
channel.onerror = e => this._onError(e);
@ -758,7 +742,15 @@ class RTCPeer extends Peer {
channel.binaryType = 'arraybuffer';
channel.onmessage = e => this._onMessage(e.data);
channel.onclose = _ => this._onChannelClosed();
this._chunkSize = Math.min(this._conn.sctp.maxMessageSize, 262144); // max chunk size: 256 KB
this._lowWaterMark = 2 * this._chunkSize;
channel.bufferedAmountLowThreshold = this._lowWaterMark;
channel.onbufferedamountlow = () => Events.fire("bufferedamountlow", channel.bufferedAmount);
this._channel = channel;
Events.on('beforeunload', e => this._onBeforeUnload(e));
Events.on('pagehide', _ => this._onPageHide());
Events.fire('peer-connected', {peerId: this._peerId, connectionHash: this.getConnectionHash()});
@ -1104,51 +1096,56 @@ class PeersManager {
class FileChunker {
constructor(file, onChunk, onPartitionEnd) {
this._chunkSize = 64000; // 64 KB
this._maxPartitionSize = 1e6; // 1 MB
this._offset = 0;
this._partitionSize = 0;
constructor(file, chunkSize, onChunk) {
this._file = file;
this._chunkSize = chunkSize;
this._highWaterMark = 8 * chunkSize;
this._bytesToSend = file.size;
this._sendProgress = 0;
this._onChunk = onChunk;
this._onPartitionEnd = onPartitionEnd;
this._reader = new FileReader();
this._reader.addEventListener('error', err => console.error('Error reading file:', err));
this._reader.addEventListener('abort', e => console.log('File reading aborted:', e));
this._reader.addEventListener('load', e => this._onChunkRead(e.target.result));
this.bufferedAmountCallback = e => this._readChunksIntoBuffer(e.detail);
Events.on('bufferedamountlow', this.bufferedAmountCallback);
}
nextPartition() {
this._partitionSize = 0;
_removeEventListener() {
Events.off('bufferedamountlow', this.bufferedAmountCallback);
}
_readChunksIntoBuffer(bufferedAmount) {
this._bufferedAmount = bufferedAmount;
this._readChunk();
}
_readChunk() {
const chunk = this._file.slice(this._offset, this._offset + this._chunkSize);
const chunk = this._file.slice(this._sendProgress, this._sendProgress + this._chunkSize);
this._reader.readAsArrayBuffer(chunk);
}
_onChunkRead(chunk) {
this._offset += chunk.byteLength;
this._partitionSize += chunk.byteLength;
this._onChunk(chunk);
if (this.isFileEnd()) return;
if (this._isPartitionEnd()) {
this._onPartitionEnd(this._offset);
return;
}
this._bufferedAmount += this._chunkSize;
this._sendProgress += this._chunkSize;
if (this._isBufferFull() || this._isFileEnd()) return;
this._readChunk();
}
repeatPartition() {
this._offset -= this._partitionSize;
this.nextPartition();
_isBufferFull() {
return this._bufferedAmount >= this._highWaterMark;
}
_isPartitionEnd() {
return this._partitionSize >= this._maxPartitionSize;
}
isFileEnd() {
return this._offset >= this._file.size;
_isFileEnd() {
return this._sendProgress >= this._bytesToSend;
}
}
@ -1172,6 +1169,7 @@ class FileDigester {
if (isNaN(this.progress)) this.progress = 1
if (this._bytesReceived < this._size) return;
// we are done
const blob = new Blob(this._buffer)
this._buffer = null;

View File

@ -158,8 +158,6 @@ class ServerConnection {
break;
case 'request':
case 'header':
case 'partition':
case 'partition-received':
case 'progress':
case 'files-transfer-response':
case 'file-transfer-complete':
@ -281,6 +279,9 @@ class Peer {
this._roomIds = {};
this._updateRoomIds(roomType, roomId);
this._chunkSize = 262144;
this._lowWaterMark = 2* this._chunkSize;
this._filesQueue = [];
this._busy = false;
@ -459,22 +460,9 @@ class Peer {
mime: file.type
});
this._chunker = new FileChunker(file,
chunk => this._send(chunk),
offset => this._onPartitionEnd(offset));
this._chunker.nextPartition();
}
_onPartitionEnd(offset) {
this.sendJSON({ type: 'partition', offset: offset });
}
_onReceivedPartitionEnd(offset) {
this.sendJSON({ type: 'partition-received', offset: offset });
}
_sendNextPartition() {
if (!this._chunker || this._chunker.isFileEnd()) return;
this._chunker.nextPartition();
this._chunkSize,
chunk => this._send(chunk));
this._chunker._readChunksIntoBuffer(this._channel ? this._channel.bufferedAmount : 0);
}
_sendProgress(progress) {
@ -494,12 +482,6 @@ class Peer {
case 'header':
this._onFileHeader(messageJSON);
break;
case 'partition':
this._onReceivedPartitionEnd(messageJSON);
break;
case 'partition-received':
this._sendNextPartition();
break;
case 'progress':
this._onDownloadProgress(messageJSON.progress);
break;
@ -507,7 +489,7 @@ class Peer {
this._onFileTransferRequestResponded(messageJSON);
break;
case 'file-transfer-complete':
this._onFileTransferCompleted();
this._onFileTransferCompleted(messageJSON);
break;
case 'message-transfer-complete':
this._onMessageTransferCompleted();
@ -563,6 +545,7 @@ class Peer {
_onFileHeader(header) {
if (this._requestAccepted && this._requestAccepted.header.length) {
this._lastProgress = 0;
this._timeStart = Date.now();
this._digester = new FileDigester({size: header.size, name: header.name, mime: header.mime},
this._requestAccepted.totalSize,
this._totalBytesReceived,
@ -582,7 +565,6 @@ class Peer {
_onChunkReceived(chunk) {
if(!this._digester || !(chunk.byteLength || chunk.size)) return;
this._digester.unchunk(chunk);
const progress = this._digester.progress;
@ -606,7 +588,13 @@ class Peer {
const acceptedHeader = this._requestAccepted.header.shift();
this._totalBytesReceived += fileBlob.size;
this.sendJSON({type: 'file-transfer-complete'});
let duration = (Date.now() - this._timeStart) / 1000;
let size = Math.round(10 * fileBlob.size / 1000000) / 10;
let speed = Math.round(100 * fileBlob.size / 1000000 / duration) / 100;
console.log(`File received.\n\nSize: ${size} MB\tDuration: ${duration} s\tSpeed: ${speed} MB/s`);
this.sendJSON({type: 'file-transfer-complete', size: size, duration: duration, speed: speed});
const sameSize = fileBlob.size === acceptedHeader.size;
const sameName = fileBlob.name === acceptedHeader.name
@ -627,8 +615,10 @@ class Peer {
}
}
_onFileTransferCompleted() {
this._chunker = null;
_onFileTransferCompleted(message) {
console.log(`File sent.\n\nSize: ${message.size} MB\tDuration: ${message.duration} s\tSpeed: ${message.speed} MB/s`);
if (!this._filesQueue.length) {
this._busy = false;
Events.fire('notify-user', Localization.getTranslation("notifications.file-transfer-completed"));
@ -636,6 +626,8 @@ class Peer {
} else {
this._dequeueFile();
}
this._chunker._removeEventListener();
this._chunker = null;
}
_onFileTransferRequestResponded(message) {
@ -713,8 +705,7 @@ class RTCPeer extends Peer {
if (!this._conn) return;
const channel = this._conn.createDataChannel('data-channel', {
ordered: true,
reliable: true // Obsolete. See https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/reliable
ordered: false,
});
channel.onopen = e => this._onChannelOpened(e);
channel.onerror = e => this._onError(e);
@ -760,7 +751,15 @@ class RTCPeer extends Peer {
channel.binaryType = 'arraybuffer';
channel.onmessage = e => this._onMessage(e.data);
channel.onclose = _ => this._onChannelClosed();
this._chunkSize = Math.min(this._conn.sctp.maxMessageSize, 262144); // max chunk size: 256 KB
this._lowWaterMark = 2 * this._chunkSize;
channel.bufferedAmountLowThreshold = this._lowWaterMark;
channel.onbufferedamountlow = () => Events.fire("bufferedamountlow", channel.bufferedAmount);
this._channel = channel;
Events.on('beforeunload', e => this._onBeforeUnload(e));
Events.on('pagehide', _ => this._onPageHide());
Events.fire('peer-connected', {peerId: this._peerId, connectionHash: this.getConnectionHash()});
@ -904,6 +903,7 @@ class WSPeer extends Peer {
type: 'ws-chunk',
chunk: arrayBufferToBase64(chunk)
});
Events.fire("bufferedamountlow", 0);
}
sendJSON(message) {
@ -992,7 +992,7 @@ class PeersManager {
}
if (window.isRtcSupported && rtcSupported) {
this.peers[peerId] = new RTCPeer(this._server,isCaller, peerId, roomType, roomId);
this.peers[peerId] = new RTCPeer(this._server, isCaller, peerId, roomType, roomId);
} else {
this.peers[peerId] = new WSPeer(this._server, isCaller, peerId, roomType, roomId);
}
@ -1167,51 +1167,56 @@ class PeersManager {
class FileChunker {
constructor(file, onChunk, onPartitionEnd) {
this._chunkSize = 64000; // 64 KB
this._maxPartitionSize = 1e6; // 1 MB
this._offset = 0;
this._partitionSize = 0;
constructor(file, chunkSize, onChunk) {
this._file = file;
this._chunkSize = chunkSize;
this._highWaterMark = 8 * chunkSize;
this._bytesToSend = file.size;
this._sendProgress = 0;
this._onChunk = onChunk;
this._onPartitionEnd = onPartitionEnd;
this._reader = new FileReader();
this._reader.addEventListener('error', err => console.error('Error reading file:', err));
this._reader.addEventListener('abort', e => console.log('File reading aborted:', e));
this._reader.addEventListener('load', e => this._onChunkRead(e.target.result));
this.bufferedAmountCallback = e => this._readChunksIntoBuffer(e.detail);
Events.on('bufferedamountlow', this.bufferedAmountCallback);
}
nextPartition() {
this._partitionSize = 0;
_removeEventListener() {
Events.off('bufferedamountlow', this.bufferedAmountCallback);
}
_readChunksIntoBuffer(bufferedAmount) {
this._bufferedAmount = bufferedAmount;
this._readChunk();
}
_readChunk() {
const chunk = this._file.slice(this._offset, this._offset + this._chunkSize);
const chunk = this._file.slice(this._sendProgress, this._sendProgress + this._chunkSize);
this._reader.readAsArrayBuffer(chunk);
}
_onChunkRead(chunk) {
this._offset += chunk.byteLength;
this._partitionSize += chunk.byteLength;
this._onChunk(chunk);
if (this.isFileEnd()) return;
if (this._isPartitionEnd()) {
this._onPartitionEnd(this._offset);
return;
}
this._bufferedAmount += this._chunkSize;
this._sendProgress += this._chunkSize;
if (this._isBufferFull() || this._isFileEnd()) return;
this._readChunk();
}
repeatPartition() {
this._offset -= this._partitionSize;
this.nextPartition();
_isBufferFull() {
return this._bufferedAmount >= this._highWaterMark;
}
_isPartitionEnd() {
return this._partitionSize >= this._maxPartitionSize;
}
isFileEnd() {
return this._offset >= this._file.size;
_isFileEnd() {
return this._sendProgress >= this._bytesToSend;
}
}
@ -1235,6 +1240,7 @@ class FileDigester {
if (isNaN(this.progress)) this.progress = 1
if (this._bytesReceived < this._size) return;
// we are done
const blob = new Blob(this._buffer)
this._buffer = null;