using System; using System.IO; using System.Linq; using System.Collections.Generic; using MLAPI.Serialization.Pooled; using MLAPI.Serialization; using MLAPI.Configuration; using MLAPI.Profiling; using MLAPI.Transports; namespace MLAPI.Messaging { internal class RpcBatcher { public class SendStream { public NetworkChannel NetworkChannel; public PooledNetworkBuffer Buffer; public PooledNetworkWriter Writer; public bool IsEmpty = true; public SendStream() { Buffer = PooledNetworkBuffer.Get(); Writer = PooledNetworkWriter.Get(Buffer); } } // Stores the stream of batched RPC to send to each client, by ClientId private readonly Dictionary k_SendDict = new Dictionary(); // Used to store targets, internally private ulong[] m_TargetList = new ulong[0]; // Used to mark longer lengths. Works because we can't have zero-sized messages private const byte k_LongLenMarker = 0; private void PushLength(int length, ref PooledNetworkWriter writer) { // If length is single byte we write it if (length < 256) { writer.WriteByte((byte)length); // write the amounts of bytes that are coming up } else { // otherwise we write a two-byte length writer.WriteByte(k_LongLenMarker); // mark larger size writer.WriteByte((byte)(length % 256)); // write the length modulo 256 writer.WriteByte((byte)(length / 256)); // write the length divided by 256 } } private int PopLength(in NetworkBuffer messageBuffer) { int read = messageBuffer.ReadByte(); // if we read a non-zero value, we have a single byte length // or a -1 error we can return if (read != k_LongLenMarker) { return read; } // otherwise, a two-byte length follows. We'll read in len1, len2 int len1 = messageBuffer.ReadByte(); if (len1 < 0) { // pass errors back to caller return len1; } int len2 = messageBuffer.ReadByte(); if (len2 < 0) { // pass errors back to caller return len2; } return len1 + len2 * 256; } /// /// FillTargetList /// Fills a list with the ClientId's an item is targeted to /// /// the FrameQueueItem we want targets for /// the list to fill private static void FillTargetList(in RpcFrameQueueItem queueItem, ref ulong[] networkIdList) { switch (queueItem.QueueItemType) { // todo: revisit .resize() and .ToArry() usage, for performance case RpcQueueContainer.QueueItemType.ServerRpc: Array.Resize(ref networkIdList, 1); networkIdList[0] = queueItem.NetworkId; break; default: // todo: consider the implications of default usage of queueItem.clientIds case RpcQueueContainer.QueueItemType.ClientRpc: // copy the list networkIdList = queueItem.ClientNetworkIds.ToArray(); break; } } /// /// QueueItem /// Add a FrameQueueItem to be sent /// queueItem /// the threshold in bytes public void QueueItem(in RpcFrameQueueItem queueItem) { FillTargetList(queueItem, ref m_TargetList); foreach (ulong clientId in m_TargetList) { if (!k_SendDict.ContainsKey(clientId)) { // todo: consider what happens if many clients join and leave the game consecutively // we probably need a cleanup mechanism at some point k_SendDict[clientId] = new SendStream(); } if (k_SendDict[clientId].IsEmpty) { k_SendDict[clientId].IsEmpty = false; k_SendDict[clientId].NetworkChannel = queueItem.NetworkChannel; switch (queueItem.QueueItemType) { // 8 bits are used for the message type, which is an NetworkConstants case RpcQueueContainer.QueueItemType.ServerRpc: k_SendDict[clientId].Writer.WriteByte(NetworkConstants.SERVER_RPC); // MessageType break; case RpcQueueContainer.QueueItemType.ClientRpc: k_SendDict[clientId].Writer.WriteByte(NetworkConstants.CLIENT_RPC); // MessageType break; } } // write the amounts of bytes that are coming up PushLength(queueItem.MessageData.Count, ref k_SendDict[clientId].Writer); // write the message to send k_SendDict[clientId].Writer.WriteBytes(queueItem.MessageData.Array, queueItem.MessageData.Count, queueItem.MessageData.Offset); ProfilerStatManager.BytesSent.Record(queueItem.MessageData.Count); ProfilerStatManager.RpcsSent.Record(); PerformanceDataManager.Increment(ProfilerConstants.ByteSent, queueItem.MessageData.Count); PerformanceDataManager.Increment(ProfilerConstants.RpcSent); } } public delegate void SendCallbackType(ulong clientId, SendStream messageStream); public delegate void ReceiveCallbackType(NetworkBuffer messageStream, RpcQueueContainer.QueueItemType messageType, ulong clientId, float receiveTime); /// /// SendItems /// Send any batch of RPC that are of length above threshold /// /// the threshold in bytes /// the function to call for sending the batch public void SendItems(int thresholdBytes, SendCallbackType sendCallback) { foreach (KeyValuePair entry in k_SendDict) { if (!entry.Value.IsEmpty) { // read the queued message int length = (int)k_SendDict[entry.Key].Buffer.Length; if (length >= thresholdBytes) { sendCallback(entry.Key, entry.Value); // clear the batch that was sent from the SendDict entry.Value.Buffer.SetLength(0); entry.Value.Buffer.Position = 0; entry.Value.IsEmpty = true; ProfilerStatManager.RpcBatchesSent.Record(); PerformanceDataManager.Increment(ProfilerConstants.RpcBatchesSent); } } } } /// /// ReceiveItems /// Process the messageStream and call the callback with individual RPC messages /// /// the messageStream containing the batched RPC /// the callback to call has type int f(message, type, clientId, time) /// the message type to pass back to callback /// the clientId to pass back to callback /// the packet receive time to pass back to callback public void ReceiveItems(in NetworkBuffer messageBuffer, ReceiveCallbackType receiveCallback, RpcQueueContainer.QueueItemType messageType, ulong clientId, float receiveTime) { using (var copy = PooledNetworkBuffer.Get()) { do { // read the length of the next RPC int rpcSize = PopLength(messageBuffer); if (rpcSize < 0) { // abort if there's an error reading lengths return; } // copy what comes after current stream position long position = messageBuffer.Position; copy.SetLength(rpcSize); copy.Position = 0; Buffer.BlockCopy(messageBuffer.GetBuffer(), (int)position, copy.GetBuffer(), 0, rpcSize); receiveCallback(copy, messageType, clientId, receiveTime); // seek over the RPC // RPCReceiveQueueItem peeks at content, it doesn't advance messageBuffer.Seek(rpcSize, SeekOrigin.Current); } while (messageBuffer.Position < messageBuffer.Length); } } } }