Skip to content

Commit 66b7adf

Browse files
committed
Fix mod communication (probably)
1 parent 76637b1 commit 66b7adf

3 files changed

Lines changed: 93 additions & 92 deletions

File tree

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace Torch.Mod.Messages
6+
{
7+
/// <summary>
8+
/// shim to store incoming message data
9+
/// </summary>
10+
internal class IncomingMessage : MessageBase
11+
{
12+
public IncomingMessage()
13+
{
14+
}
15+
16+
public override void ProcessClient()
17+
{
18+
throw new Exception();
19+
}
20+
21+
public override void ProcessServer()
22+
{
23+
throw new Exception();
24+
}
25+
}
26+
}

‎Torch.Mod/ModCommunication.cs‎

Lines changed: 66 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Sandbox.ModAPI;
99
using Torch.Mod.Messages;
1010
using VRage;
11+
using VRage.Collections;
1112
using VRage.Game.ModAPI;
1213
using VRage.Utils;
1314
using Task = ParallelTasks.Task;
@@ -18,40 +19,37 @@ public static class ModCommunication
1819
{
1920
public const ushort NET_ID = 4352;
2021
private static bool _closing;
21-
private static ConcurrentQueue<MessageBase> _outgoing;
22-
private static ConcurrentQueue<byte[]> _incoming;
22+
private static BlockingCollection<MessageBase> _processing;
23+
private static MyConcurrentPool<IncomingMessage> _messagePool;
2324
private static List<IMyPlayer> _playerCache;
24-
private static FastResourceLock _lock;
25-
private static Task _task;
2625

2726
public static void Register()
2827
{
2928
MyLog.Default.WriteLineAndConsole("TORCH MOD: Registering mod communication.");
30-
_outgoing = new ConcurrentQueue<MessageBase>();
31-
_incoming = new ConcurrentQueue<byte[]>();
29+
_processing = new BlockingCollection<MessageBase>(new ConcurrentQueue<MessageBase>());
3230
_playerCache = new List<IMyPlayer>();
33-
_lock = new FastResourceLock();
34-
31+
_messagePool = new MyConcurrentPool<IncomingMessage>(8);
3532

3633
MyAPIGateway.Multiplayer.RegisterMessageHandler(NET_ID, MessageHandler);
3734
//background thread to handle de/compression and processing
38-
_task = MyAPIGateway.Parallel.StartBackground(DoProcessing);
35+
MyAPIGateway.Parallel.StartBackground(DoProcessing);
3936
MyLog.Default.WriteLineAndConsole("TORCH MOD: Mod communication registered successfully.");
4037
}
4138

4239
public static void Unregister()
4340
{
4441
MyLog.Default.WriteLineAndConsole("TORCH MOD: Unregistering mod communication.");
4542
MyAPIGateway.Multiplayer?.UnregisterMessageHandler(NET_ID, MessageHandler);
46-
ReleaseLock();
43+
_processing.CompleteAdding();
4744
_closing = true;
4845
//_task.Wait();
4946
}
5047

5148
private static void MessageHandler(byte[] bytes)
5249
{
53-
_incoming.Enqueue(bytes);
54-
ReleaseLock();
50+
var m = _messagePool.Get();
51+
m.CompressedData = bytes;
52+
_processing.Add(m);
5553
}
5654

5755
public static void DoProcessing()
@@ -60,90 +58,84 @@ public static void DoProcessing()
6058
{
6159
try
6260
{
63-
byte[] incoming;
64-
while (_incoming.TryDequeue(out incoming))
61+
var m = _processing.Take();
62+
if (m is IncomingMessage)
6563
{
66-
MessageBase m;
64+
MessageBase i;
6765
try
6866
{
69-
var o = MyCompression.Decompress(incoming);
70-
m = MyAPIGateway.Utilities.SerializeFromBinary<MessageBase>(o);
67+
var o = MyCompression.Decompress(m.CompressedData);
68+
m.CompressedData = null;
69+
_messagePool.Return((IncomingMessage)m);
70+
i = MyAPIGateway.Utilities.SerializeFromBinary<MessageBase>(o);
7171
}
7272
catch (Exception ex)
7373
{
7474
MyLog.Default.WriteLineAndConsole($"TORCH MOD: Failed to deserialize message! {ex}");
7575
continue;
7676
}
77+
7778
if (MyAPIGateway.Multiplayer.IsServer)
78-
m.ProcessServer();
79+
i.ProcessServer();
7980
else
80-
m.ProcessClient();
81+
i.ProcessClient();
8182
}
82-
83-
if (!_outgoing.IsEmpty)
83+
else
8484
{
85-
List<MessageBase> tosend = new List<MessageBase>(_outgoing.Count);
86-
MessageBase outMessage;
87-
while (_outgoing.TryDequeue(out outMessage))
88-
{
89-
var b = MyAPIGateway.Utilities.SerializeToBinary(outMessage);
90-
outMessage.CompressedData = MyCompression.Compress(b);
91-
tosend.Add(outMessage);
92-
}
85+
var b = MyAPIGateway.Utilities.SerializeToBinary(m);
86+
m.CompressedData = MyCompression.Compress(b);
9387

9488
MyAPIGateway.Utilities.InvokeOnGameThread(() =>
95-
{
96-
MyAPIGateway.Players.GetPlayers(_playerCache);
97-
foreach (var outgoing in tosend)
98-
{
99-
switch (outgoing.TargetType)
100-
{
101-
case MessageTarget.Single:
102-
MyAPIGateway.Multiplayer.SendMessageTo(NET_ID, outgoing.CompressedData, outgoing.Target);
103-
break;
104-
case MessageTarget.Server:
105-
MyAPIGateway.Multiplayer.SendMessageToServer(NET_ID, outgoing.CompressedData);
106-
break;
107-
case MessageTarget.AllClients:
108-
foreach (var p in _playerCache)
109-
{
110-
if (p.SteamUserId == MyAPIGateway.Multiplayer.MyId)
111-
continue;
112-
MyAPIGateway.Multiplayer.SendMessageTo(NET_ID, outgoing.CompressedData, p.SteamUserId);
113-
}
114-
break;
115-
case MessageTarget.AllExcept:
116-
foreach (var p in _playerCache)
117-
{
118-
if (p.SteamUserId == MyAPIGateway.Multiplayer.MyId || outgoing.Ignore.Contains(p.SteamUserId))
119-
continue;
120-
MyAPIGateway.Multiplayer.SendMessageTo(NET_ID, outgoing.CompressedData, p.SteamUserId);
121-
}
122-
break;
123-
default:
124-
throw new Exception();
125-
}
126-
}
127-
_playerCache.Clear();
128-
});
129-
}
89+
{
13090

131-
AcquireLock();
91+
switch (m.TargetType)
92+
{
93+
case MessageTarget.Single:
94+
MyAPIGateway.Multiplayer.SendMessageTo(NET_ID, m.CompressedData, m.Target);
95+
break;
96+
case MessageTarget.Server:
97+
MyAPIGateway.Multiplayer.SendMessageToServer(NET_ID, m.CompressedData);
98+
break;
99+
case MessageTarget.AllClients:
100+
MyAPIGateway.Players.GetPlayers(_playerCache);
101+
foreach (var p in _playerCache)
102+
{
103+
if (p.SteamUserId == MyAPIGateway.Multiplayer.MyId)
104+
continue;
105+
MyAPIGateway.Multiplayer.SendMessageTo(NET_ID, m.CompressedData, p.SteamUserId);
106+
}
107+
break;
108+
case MessageTarget.AllExcept:
109+
MyAPIGateway.Players.GetPlayers(_playerCache);
110+
foreach (var p in _playerCache)
111+
{
112+
if (p.SteamUserId == MyAPIGateway.Multiplayer.MyId || m.Ignore.Contains(p.SteamUserId))
113+
continue;
114+
MyAPIGateway.Multiplayer.SendMessageTo(NET_ID, m.CompressedData, p.SteamUserId);
115+
}
116+
break;
117+
default:
118+
throw new Exception();
119+
}
120+
_playerCache.Clear();
121+
});
122+
}
132123
}
133124
catch (Exception ex)
134125
{
135126
MyLog.Default.WriteLineAndConsole($"TORCH MOD: Exception occurred in communication thread! {ex}");
136127
}
137128
}
138129

139-
MyLog.Default.WriteLineAndConsole("TORCH MOD: COMMUNICATION THREAD: EXIT SIGNAL RECIEVED!");
130+
MyLog.Default.WriteLineAndConsole("TORCH MOD: COMMUNICATION THREAD: EXIT SIGNAL RECEIVED!");
140131
//exit signal received. Clean everything and GTFO
141-
_outgoing = null;
142-
_incoming = null;
132+
_processing.Dispose();
133+
_processing = null;
134+
_messagePool.Clean();
135+
_messagePool = null;
143136
_playerCache = null;
144-
_lock = null;
145137
}
146-
138+
147139
public static void SendMessageTo(MessageBase message, ulong target)
148140
{
149141
if (!MyAPIGateway.Multiplayer.IsServer)
@@ -155,8 +147,7 @@ public static void SendMessageTo(MessageBase message, ulong target)
155147
message.Target = target;
156148
message.TargetType = MessageTarget.Single;
157149
MyLog.Default.WriteLineAndConsole($"Sending message of type {message.GetType().FullName}");
158-
_outgoing.Enqueue(message);
159-
ReleaseLock();
150+
_processing.Add(message);
160151
}
161152

162153
public static void SendMessageToClients(MessageBase message)
@@ -168,8 +159,7 @@ public static void SendMessageToClients(MessageBase message)
168159
return;
169160

170161
message.TargetType = MessageTarget.AllClients;
171-
_outgoing.Enqueue(message);
172-
ReleaseLock();
162+
_processing.Add(message);
173163
}
174164

175165
public static void SendMessageExcept(MessageBase message, params ulong[] ignoredUsers)
@@ -182,8 +172,7 @@ public static void SendMessageExcept(MessageBase message, params ulong[] ignored
182172

183173
message.TargetType = MessageTarget.AllExcept;
184174
message.Ignore = ignoredUsers;
185-
_outgoing.Enqueue(message);
186-
ReleaseLock();
175+
_processing.Add(message);
187176
}
188177

189178
public static void SendMessageToServer(MessageBase message)
@@ -192,22 +181,7 @@ public static void SendMessageToServer(MessageBase message)
192181
return;
193182

194183
message.TargetType = MessageTarget.Server;
195-
_outgoing.Enqueue(message);
196-
ReleaseLock();
197-
}
198-
199-
private static void ReleaseLock()
200-
{
201-
while(_lock?.TryAcquireExclusive() == false)
202-
_lock?.ReleaseExclusive();
203-
_lock?.ReleaseExclusive();
204-
}
205-
206-
private static void AcquireLock()
207-
{
208-
ReleaseLock();
209-
_lock?.AcquireExclusive();
210-
_lock?.AcquireExclusive();
184+
_processing.Add(message);
211185
}
212186
}
213187
}

‎Torch.Mod/Torch.Mod.projitems‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<Import_RootNamespace>Torch.Mod</Import_RootNamespace>
1010
</PropertyGroup>
1111
<ItemGroup>
12+
<Compile Include="$(MSBuildThisFileDirectory)Messages\IncomingMessage.cs" />
1213
<Compile Include="$(MSBuildThisFileDirectory)Messages\NotificationMessage.cs" />
1314
<Compile Include="$(MSBuildThisFileDirectory)Messages\DialogMessage.cs" />
1415
<Compile Include="$(MSBuildThisFileDirectory)Messages\MessageBase.cs" />

0 commit comments

Comments
 (0)