polygone/Library/PackageCache/com.unity.multiplayer.mlapi@3e3aef6aa0/Runtime/Messaging/RpcBatcher.cs
2021-08-02 05:44:37 -04:00

225 lines
No EOL
9.4 KiB
C#

using System;
using System.IO;
using System.Linq;
using System.Collections.Generic;
using MLAPI.Serialization.Pooled;
using MLAPI.Serialization;
using MLAPI.Configuration;
using MLAPI.Profiling;
using MLAPI.Transports;
namespace MLAPI.Messaging
{
internal class RpcBatcher
{
public class SendStream
{
public NetworkChannel NetworkChannel;
public PooledNetworkBuffer Buffer;
public PooledNetworkWriter Writer;
public bool IsEmpty = true;
public SendStream()
{
Buffer = PooledNetworkBuffer.Get();
Writer = PooledNetworkWriter.Get(Buffer);
}
}
// Stores the stream of batched RPC to send to each client, by ClientId
private readonly Dictionary<ulong, SendStream> k_SendDict = new Dictionary<ulong, SendStream>();
// Used to store targets, internally
private ulong[] m_TargetList = new ulong[0];
// Used to mark longer lengths. Works because we can't have zero-sized messages
private const byte k_LongLenMarker = 0;
private void PushLength(int length, ref PooledNetworkWriter writer)
{
// If length is single byte we write it
if (length < 256)
{
writer.WriteByte((byte)length); // write the amounts of bytes that are coming up
}
else
{
// otherwise we write a two-byte length
writer.WriteByte(k_LongLenMarker); // mark larger size
writer.WriteByte((byte)(length % 256)); // write the length modulo 256
writer.WriteByte((byte)(length / 256)); // write the length divided by 256
}
}
private int PopLength(in NetworkBuffer messageBuffer)
{
int read = messageBuffer.ReadByte();
// if we read a non-zero value, we have a single byte length
// or a -1 error we can return
if (read != k_LongLenMarker)
{
return read;
}
// otherwise, a two-byte length follows. We'll read in len1, len2
int len1 = messageBuffer.ReadByte();
if (len1 < 0)
{
// pass errors back to caller
return len1;
}
int len2 = messageBuffer.ReadByte();
if (len2 < 0)
{
// pass errors back to caller
return len2;
}
return len1 + len2 * 256;
}
/// <summary>
/// FillTargetList
/// Fills a list with the ClientId's an item is targeted to
/// </summary>
/// <param name="queueItem">the FrameQueueItem we want targets for</param>
/// <param name="networkIdList">the list to fill</param>
private static void FillTargetList(in RpcFrameQueueItem queueItem, ref ulong[] networkIdList)
{
switch (queueItem.QueueItemType)
{
// todo: revisit .resize() and .ToArry() usage, for performance
case RpcQueueContainer.QueueItemType.ServerRpc:
Array.Resize(ref networkIdList, 1);
networkIdList[0] = queueItem.NetworkId;
break;
default:
// todo: consider the implications of default usage of queueItem.clientIds
case RpcQueueContainer.QueueItemType.ClientRpc:
// copy the list
networkIdList = queueItem.ClientNetworkIds.ToArray();
break;
}
}
/// <summary>
/// QueueItem
/// Add a FrameQueueItem to be sent
/// </summary>queueItem
/// <param name="queueItem">the threshold in bytes</param>
public void QueueItem(in RpcFrameQueueItem queueItem)
{
FillTargetList(queueItem, ref m_TargetList);
foreach (ulong clientId in m_TargetList)
{
if (!k_SendDict.ContainsKey(clientId))
{
// todo: consider what happens if many clients join and leave the game consecutively
// we probably need a cleanup mechanism at some point
k_SendDict[clientId] = new SendStream();
}
if (k_SendDict[clientId].IsEmpty)
{
k_SendDict[clientId].IsEmpty = false;
k_SendDict[clientId].NetworkChannel = queueItem.NetworkChannel;
switch (queueItem.QueueItemType)
{
// 8 bits are used for the message type, which is an NetworkConstants
case RpcQueueContainer.QueueItemType.ServerRpc:
k_SendDict[clientId].Writer.WriteByte(NetworkConstants.SERVER_RPC); // MessageType
break;
case RpcQueueContainer.QueueItemType.ClientRpc:
k_SendDict[clientId].Writer.WriteByte(NetworkConstants.CLIENT_RPC); // MessageType
break;
}
}
// write the amounts of bytes that are coming up
PushLength(queueItem.MessageData.Count, ref k_SendDict[clientId].Writer);
// write the message to send
k_SendDict[clientId].Writer.WriteBytes(queueItem.MessageData.Array, queueItem.MessageData.Count, queueItem.MessageData.Offset);
ProfilerStatManager.BytesSent.Record(queueItem.MessageData.Count);
ProfilerStatManager.RpcsSent.Record();
PerformanceDataManager.Increment(ProfilerConstants.ByteSent, queueItem.MessageData.Count);
PerformanceDataManager.Increment(ProfilerConstants.RpcSent);
}
}
public delegate void SendCallbackType(ulong clientId, SendStream messageStream);
public delegate void ReceiveCallbackType(NetworkBuffer messageStream, RpcQueueContainer.QueueItemType messageType, ulong clientId, float receiveTime);
/// <summary>
/// SendItems
/// Send any batch of RPC that are of length above threshold
/// </summary>
/// <param name="thresholdBytes"> the threshold in bytes</param>
/// <param name="sendCallback"> the function to call for sending the batch</param>
public void SendItems(int thresholdBytes, SendCallbackType sendCallback)
{
foreach (KeyValuePair<ulong, SendStream> entry in k_SendDict)
{
if (!entry.Value.IsEmpty)
{
// read the queued message
int length = (int)k_SendDict[entry.Key].Buffer.Length;
if (length >= thresholdBytes)
{
sendCallback(entry.Key, entry.Value);
// clear the batch that was sent from the SendDict
entry.Value.Buffer.SetLength(0);
entry.Value.Buffer.Position = 0;
entry.Value.IsEmpty = true;
ProfilerStatManager.RpcBatchesSent.Record();
PerformanceDataManager.Increment(ProfilerConstants.RpcBatchesSent);
}
}
}
}
/// <summary>
/// ReceiveItems
/// Process the messageStream and call the callback with individual RPC messages
/// </summary>
/// <param name="messageBuffer"> the messageStream containing the batched RPC</param>
/// <param name="receiveCallback"> the callback to call has type int f(message, type, clientId, time) </param>
/// <param name="messageType"> the message type to pass back to callback</param>
/// <param name="clientId"> the clientId to pass back to callback</param>
/// <param name="receiveTime"> the packet receive time to pass back to callback</param>
public void ReceiveItems(in NetworkBuffer messageBuffer, ReceiveCallbackType receiveCallback, RpcQueueContainer.QueueItemType messageType, ulong clientId, float receiveTime)
{
using (var copy = PooledNetworkBuffer.Get())
{
do
{
// read the length of the next RPC
int rpcSize = PopLength(messageBuffer);
if (rpcSize < 0)
{
// abort if there's an error reading lengths
return;
}
// copy what comes after current stream position
long position = messageBuffer.Position;
copy.SetLength(rpcSize);
copy.Position = 0;
Buffer.BlockCopy(messageBuffer.GetBuffer(), (int)position, copy.GetBuffer(), 0, rpcSize);
receiveCallback(copy, messageType, clientId, receiveTime);
// seek over the RPC
// RPCReceiveQueueItem peeks at content, it doesn't advance
messageBuffer.Seek(rpcSize, SeekOrigin.Current);
} while (messageBuffer.Position < messageBuffer.Length);
}
}
}
}