using System;
using System.Collections.Generic;
using UnityEngine;
using Unity.Profiling;
using MLAPI.Configuration;
using MLAPI.Profiling;

namespace MLAPI.Messaging
{
    /// <summary>
    /// RpcQueueProcessor
    /// Handles processing of RPC queues:
    /// inbound queue items are invoked, outbound queue items are sent.
    /// </summary>
    internal class RpcQueueProcessor
    {
#if DEVELOPMENT_BUILD || UNITY_EDITOR
        private static ProfilerMarker s_ProcessReceiveQueue = new ProfilerMarker($"{nameof(RpcQueueProcessor)}.{nameof(ProcessReceiveQueue)}");
        private static ProfilerMarker s_ProcessSendQueue = new ProfilerMarker($"{nameof(RpcQueueProcessor)}.{nameof(ProcessSendQueue)}");
#endif

        // Batcher object used to manage the RPC batching on the send side
        private readonly RpcBatcher m_RpcBatcher = new RpcBatcher();
        private const int k_BatchThreshold = 512;

        //NSS-TODO: Need to determine how we want to handle all other MLAPI send types
        //Temporary place to keep internal MLAPI messages
        private readonly List<RpcFrameQueueItem> m_InternalMLAPISendQueue = new List<RpcFrameQueueItem>();

        /// <summary>
        /// ProcessReceiveQueue
        /// Public facing interface method to start processing all RPCs in the current inbound frame
        /// </summary>
        /// <param name="currentStage">network update stage whose inbound frame is processed</param>
        public void ProcessReceiveQueue(NetworkUpdateStage currentStage)
        {
            bool advanceFrameHistory = false;
            var rpcQueueContainer = NetworkManager.Singleton.RpcQueueContainer;
            if (rpcQueueContainer != null)
            {
#if DEVELOPMENT_BUILD || UNITY_EDITOR
                s_ProcessReceiveQueue.Begin();
#endif
                var currentFrame = rpcQueueContainer.GetQueueHistoryFrame(RpcQueueHistoryFrame.QueueFrameType.Inbound, currentStage);
                var nextFrame = rpcQueueContainer.GetQueueHistoryFrame(RpcQueueHistoryFrame.QueueFrameType.Inbound, currentStage, true);

                // The next frame comes from the same lookup as the current frame, which is
                // null-checked below, so it gets the same guard before it is dereferenced.
                if (nextFrame != null && nextFrame.IsDirty && nextFrame.HasLoopbackData)
                {
                    advanceFrameHistory = true;
                }

                if (currentFrame != null && currentFrame.IsDirty)
                {
                    var currentQueueItem = currentFrame.GetFirstQueueItem();

                    // A queue item of type None marks the end of the frame's items
                    while (currentQueueItem.QueueItemType != RpcQueueContainer.QueueItemType.None)
                    {
                        advanceFrameHistory = true;

                        if (rpcQueueContainer.IsTesting())
                        {
                            Debug.Log($"RPC invoked during the {currentStage} update stage.");
                        }

                        NetworkManager.InvokeRpc(currentQueueItem);
                        ProfilerStatManager.RpcsQueueProc.Record();
                        PerformanceDataManager.Increment(ProfilerConstants.RpcQueueProcessed);
                        currentQueueItem = currentFrame.GetNextQueueItem();
                    }

                    //We call this to dispose of the shared stream writer and stream
                    currentFrame.CloseQueue();
                }

                if (advanceFrameHistory)
                {
                    rpcQueueContainer.AdvanceFrameHistory(RpcQueueHistoryFrame.QueueFrameType.Inbound);
                }
#if DEVELOPMENT_BUILD || UNITY_EDITOR
                s_ProcessReceiveQueue.End();
#endif
            }
        }

        /// <summary>
        /// ProcessSendQueue
        /// Called to send both performance RPC and internal messages and then flush the outbound queue
        /// </summary>
        public void ProcessSendQueue()
        {
#if DEVELOPMENT_BUILD || UNITY_EDITOR
            s_ProcessSendQueue.Begin();
#endif
            RpcQueueSendAndFlush();
#if DEVELOPMENT_BUILD || UNITY_EDITOR
            s_ProcessSendQueue.End();
#endif
            // NOTE: the profiler marker above only covers the RPC queue; internal messages are sent outside of it
            InternalMessagesSendAndFlush();
        }

        /// <summary>
        /// QueueInternalMLAPICommand
        /// Added this as an example of how to add internal messages to the outbound send queue
        /// </summary>
        /// <param name="queueItem">message queue item to add</param>
        public void QueueInternalMLAPICommand(RpcFrameQueueItem queueItem)
        {
            m_InternalMLAPISendQueue.Add(queueItem);
        }

        /// <summary>
        /// Generic Sending Method for Internal Messages
        /// TODO: Will need to open this up for discussion, but we will want to determine if this is how we want internal MLAPI command
        /// messages to be sent. We might want specific commands to occur during specific network update regions (see NetworkUpdateStage)
        /// </summary>
        public void InternalMessagesSendAndFlush()
        {
            foreach (RpcFrameQueueItem queueItem in m_InternalMLAPISendQueue)
            {
                var poolStream = queueItem.NetworkBuffer;

                // Items are only sent while the NetworkManager is listening; otherwise they are dropped
                if (NetworkManager.Singleton.IsListening)
                {
                    switch (queueItem.QueueItemType)
                    {
                        case RpcQueueContainer.QueueItemType.CreateObject:
                            {
                                foreach (ulong clientId in queueItem.ClientNetworkIds)
                                {
                                    InternalMessageSender.Send(clientId, NetworkConstants.ADD_OBJECT, queueItem.NetworkChannel, poolStream);
                                }

                                PerformanceDataManager.Increment(ProfilerConstants.RpcSent, queueItem.ClientNetworkIds.Length);
                                ProfilerStatManager.RpcsSent.Record(queueItem.ClientNetworkIds.Length);
                                break;
                            }
                        case RpcQueueContainer.QueueItemType.DestroyObject:
                            {
                                foreach (ulong clientId in queueItem.ClientNetworkIds)
                                {
                                    InternalMessageSender.Send(clientId, NetworkConstants.DESTROY_OBJECT, queueItem.NetworkChannel, poolStream);
                                }

                                PerformanceDataManager.Increment(ProfilerConstants.RpcSent, queueItem.ClientNetworkIds.Length);
                                ProfilerStatManager.RpcsSent.Record(queueItem.ClientNetworkIds.Length);
                                break;
                            }
                    }
                }

                // The buffer is disposed whether or not the item was sent
                poolStream.Dispose();
            }

            m_InternalMLAPISendQueue.Clear();
        }

        /// <summary>
        /// RPCQueueSendAndFlush
        /// Sends all RPC queue items in the current outbound frame
        /// </summary>
        private void RpcQueueSendAndFlush()
        {
            var advanceFrameHistory = false;
            var rpcQueueContainer = NetworkManager.Singleton.RpcQueueContainer;
            if (rpcQueueContainer != null)
            {
                var currentFrame = rpcQueueContainer.GetCurrentFrame(RpcQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate);

                if (currentFrame != null)
                {
                    var currentQueueItem = currentFrame.GetFirstQueueItem();

                    // A queue item of type None marks the end of the frame's items
                    while (currentQueueItem.QueueItemType != RpcQueueContainer.QueueItemType.None)
                    {
                        advanceFrameHistory = true;
                        if (rpcQueueContainer.IsUsingBatching())
                        {
                            m_RpcBatcher.QueueItem(currentQueueItem);

                            m_RpcBatcher.SendItems(k_BatchThreshold, SendCallback);
                        }
                        else
                        {
                            SendFrameQueueItem(currentQueueItem);
                        }

                        currentQueueItem = currentFrame.GetNextQueueItem();
                    }

                    //If the size is < k_BatchThreshold then just send the messages
                    if (advanceFrameHistory && rpcQueueContainer.IsUsingBatching())
                    {
                        m_RpcBatcher.SendItems(0, SendCallback);
                    }
                }

                //If we processed any RPCs, then advance to the next frame
                if (advanceFrameHistory)
                {
                    rpcQueueContainer.AdvanceFrameHistory(RpcQueueHistoryFrame.QueueFrameType.Outbound);
                }
            }
        }

        /// <summary>
        /// SendCallback
        /// This is the callback from the batcher when it needs to send a batch
        /// </summary>
        /// <param name="clientId">clientId to send to</param>
        /// <param name="sendStream">the stream to send</param>
        private static void SendCallback(ulong clientId, RpcBatcher.SendStream sendStream)
        {
            var length = (int)sendStream.Buffer.Length;
            var bytes = sendStream.Buffer.GetBuffer();

            // Only the first `length` bytes of the backing array are handed to the transport
            // NOTE(review): assumes GetBuffer() returns byte[] - confirm against the buffer type
            var sendBuffer = new ArraySegment<byte>(bytes, 0, length);

            NetworkManager.Singleton.NetworkConfig.NetworkTransport.Send(clientId, sendBuffer, sendStream.NetworkChannel);
        }

        /// <summary>
        /// SendFrameQueueItem
        /// Sends the RPC Queue Item to the specified destination
        /// </summary>
        /// <param name="queueItem">Information on what to send</param>
        private void SendFrameQueueItem(RpcFrameQueueItem queueItem)
        {
            switch (queueItem.QueueItemType)
            {
                case RpcQueueContainer.QueueItemType.ServerRpc:
                    {
                        NetworkManager.Singleton.NetworkConfig.NetworkTransport.Send(queueItem.NetworkId, queueItem.MessageData, queueItem.NetworkChannel);

                        //For each packet sent, we want to record how much data we have sent
                        PerformanceDataManager.Increment(ProfilerConstants.ByteSent, (int)queueItem.StreamSize);
                        PerformanceDataManager.Increment(ProfilerConstants.RpcSent);
                        ProfilerStatManager.BytesSent.Record((int)queueItem.StreamSize);
                        ProfilerStatManager.RpcsSent.Record();
                        break;
                    }
                case RpcQueueContainer.QueueItemType.ClientRpc:
                    {
                        foreach (ulong clientid in queueItem.ClientNetworkIds)
                        {
                            NetworkManager.Singleton.NetworkConfig.NetworkTransport.Send(clientid, queueItem.MessageData, queueItem.NetworkChannel);

                            //For each packet sent, we want to record how much data we have sent
                            PerformanceDataManager.Increment(ProfilerConstants.ByteSent, (int)queueItem.StreamSize);
                            ProfilerStatManager.BytesSent.Record((int)queueItem.StreamSize);
                        }

                        //For each client we send to, we want to record how many RPCs we have sent
                        PerformanceDataManager.Increment(ProfilerConstants.RpcSent, queueItem.ClientNetworkIds.Length);
                        ProfilerStatManager.RpcsSent.Record(queueItem.ClientNetworkIds.Length);
                        break;
                    }
            }
        }
    }
}