以下为内部源码。可以看出,消费一条消息后,会先尝试通过 TryComplete 直接完成回调;若未能完成,则将回调函数放到 ThreadPool 中去执行。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;

namespace StackExchange.Redis
{
    public partial class ConnectionMultiplexer
    {
        // Registered subscriptions keyed by channel. OnMessage takes a lock on this
        // dictionary instance while it looks up the target subscription.
        private readonly Dictionary<RedisChannel, Subscription> subscriptions = new Dictionary<RedisChannel, Subscription>();

        /// <summary>
        /// Hands <paramref name="completable"/> to the thread pool, using
        /// <c>s_CompleteAsWorker</c> as the work-item callback and the completable as its state.
        /// </summary>
        /// <param name="completable">The item to queue; a null value is ignored.</param>
        internal static void CompleteAsWorker(ICompletable completable)
        {
            if (completable == null)
            {
                return;
            }

            ThreadPool.QueueUserWorkItem(s_CompleteAsWorker, completable);
        }

        /// <summary>
        /// Dispatches a received message to whatever is registered for <paramref name="subscription"/>.
        /// </summary>
        /// <param name="subscription">Key used to look up the registered <c>Subscription</c>.</param>
        /// <param name="channel">Channel value forwarded to the subscription and to any message queues.</param>
        /// <param name="payload">Message payload forwarded to the subscription and to any message queues.</param>
        internal void OnMessage(in RedisChannel subscription, in RedisChannel channel, in RedisValue payload)
        {
            ICompletable pending = null;
            ChannelMessageQueue targetQueues = null;

            // Only the lookup and ForInvoke happen under the lock; queue writes and
            // completion run after it has been released.
            lock (subscriptions)
            {
                if (subscriptions.TryGetValue(subscription, out Subscription match))
                {
                    pending = match.ForInvoke(channel, payload, out targetQueues);
                }
            }

            if (targetQueues != null)
            {
                ChannelMessageQueue.WriteAll(ref targetQueues, channel, payload);
            }

            if (pending == null)
            {
                return;
            }

            // Try to complete directly first; when TryComplete reports false, fall back
            // to the thread pool. NOTE(review): the meaning of the 'false' argument is
            // defined by ICompletable, which is not visible here - confirm.
            if (!pending.TryComplete(false))
            {
                CompleteAsWorker(pending);
            }
        }
    }
}