123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855 |
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Collections.Specialized;
- using System.ComponentModel;
- using System.IO;
- using System.Linq;
- using System.Net;
- using System.Security.Cryptography;
- using System.Text;
- using System.Text.RegularExpressions;
- using System.Threading;
- using System.Threading.Tasks;
- using Newtonsoft.Json;
- using NLog;
- using SuperSocket.Common;
- using SuperSocket.SocketBase;
- using SuperSocket.SocketBase.Command;
- using SuperSocket.SocketBase.Config;
- using SuperSocket.SocketBase.Protocol;
- using SuperWebSocket.Command;
- using SuperWebSocket.Config;
- using SuperWebSocket.Protocol;
- using SuperWebSocket.SubProtocol;
- namespace SuperWebSocket
- {
- /// <summary>
- /// WebSocket server interface
- /// </summary>
- public interface IWebSocketServer : IAppServer
- {
- /// <summary>
- /// Gets the web socket protocol processor.
- /// </summary>
- IProtocolProcessor WebSocketProtocolProcessor { get; }
- /// <summary>
- /// Validates the handshake request.
- /// </summary>
- /// <param name="session">The session.</param>
- /// <param name="origin">The origin.</param>
- /// <returns>the validation result</returns>
- bool ValidateHandshake(IWebSocketSession session, string origin);
- }
- /// <summary>
- /// WebSocket AppServer
- /// </summary>
- public class WebSocketServer : WebSocketServer<WebSocketSession>
- {
- /// <summary>
- /// Initializes a new instance of the <see cref="WebSocketServer"/> class.
- /// </summary>
- /// <param name="subProtocols">The sub protocols.</param>
- public WebSocketServer(IEnumerable<ISubProtocol<WebSocketSession>> subProtocols)
- : base(subProtocols)
- {
- }
- /// <summary>
- /// Initializes a new instance of the <see cref="WebSocketServer"/> class.
- /// </summary>
- /// <param name="subProtocol">The sub protocol.</param>
- public WebSocketServer(ISubProtocol<WebSocketSession> subProtocol)
- : base(subProtocol)
- {
- }
- /// <summary>
- /// Initializes a new instance of the <see cref="WebSocketServer"/> class.
- /// </summary>
- public WebSocketServer()
- : base(new List<ISubProtocol<WebSocketSession>>())
- {
- }
- }
- /// <summary>
- /// WebSocket AppServer
- /// </summary>
- /// <typeparam name="TWebSocketSession">The type of the web socket session.</typeparam>
- public abstract class WebSocketServer<TWebSocketSession> : AppServer<TWebSocketSession, IWebSocketFragment>, IWebSocketServer
- where TWebSocketSession : WebSocketSession<TWebSocketSession>, new()
- {
- private IBinaryDataConverter m_BinaryDataConverter;
- /// <summary>
- /// Gets or sets the binary data converter.
- /// </summary>
- /// <value>
- /// The binary data converter.
- /// </value>
- protected IBinaryDataConverter BinaryDataConverter
- {
- get { return m_BinaryDataConverter; }
- set { m_BinaryDataConverter = value; }
- }
- /// <summary>
- /// Initializes a new instance of the <see cref="WebSocketServer<TWebSocketSession>"/> class.
- /// </summary>
- /// <param name="subProtocols">The sub protocols.</param>
- public WebSocketServer(IEnumerable<ISubProtocol<TWebSocketSession>> subProtocols)
- : this()
- {
- if (!subProtocols.Any())
- return;
- foreach (var protocol in subProtocols)
- {
- if (!RegisterSubProtocol(protocol))
- throw new Exception("Failed to register sub protocol!");
- }
- m_SubProtocolConfigured = true;
- }
- /// <summary>
- /// Initializes a new instance of the <see cref="WebSocketServer<TWebSocketSession>"/> class.
- /// </summary>
- /// <param name="subProtocol">The sub protocol.</param>
- public WebSocketServer(ISubProtocol<TWebSocketSession> subProtocol)
- : this(new List<ISubProtocol<TWebSocketSession>> { subProtocol })
- {
- }
- /// <summary>
- /// Initializes a new instance of the <see cref="WebSocketServer<TWebSocketSession>"/> class.
- /// </summary>
- public WebSocketServer()
- : base(new WebSocketProtocol())
- {
- }
- static private ILogger logger = NLog.LogManager.GetCurrentClassLogger();
- private Dictionary<string, ISubProtocol<TWebSocketSession>> m_SubProtocols = new Dictionary<string, ISubProtocol<TWebSocketSession>>(StringComparer.OrdinalIgnoreCase);
- internal ISubProtocol<TWebSocketSession> DefaultSubProtocol { get; private set; }
- private bool m_SubProtocolConfigured = false;
- private ConcurrentQueue<TWebSocketSession> m_OpenHandshakePendingQueue = new ConcurrentQueue<TWebSocketSession>();
- private ConcurrentQueue<TWebSocketSession> m_CloseHandshakePendingQueue = new ConcurrentQueue<TWebSocketSession>();
- /// <summary>
- /// The openning handshake timeout, in seconds
- /// </summary>
- private int m_OpenHandshakeTimeOut;
- /// <summary>
- /// The closing handshake timeout, in seconds
- /// </summary>
- private int m_CloseHandshakeTimeOut;
- /// <summary>
- /// The interval of checking handshake pending queue, in seconds
- /// </summary>
- private int m_HandshakePendingQueueCheckingInterval;
- private Timer m_HandshakePendingQueueCheckingTimer;
- /// <summary>
- /// Gets the sub protocol by sub protocol name.
- /// </summary>
- /// <param name="name">The name.</param>
- /// <returns></returns>
- internal ISubProtocol<TWebSocketSession> GetSubProtocol(string name)
- {
- ISubProtocol<TWebSocketSession> subProtocol;
- if (m_SubProtocols.TryGetValue(name, out subProtocol))
- return subProtocol;
- else
- return null;
- }
- private IProtocolProcessor m_WebSocketProtocolProcessor;
- IProtocolProcessor IWebSocketServer.WebSocketProtocolProcessor
- {
- get { return m_WebSocketProtocolProcessor; }
- }
- /// <summary>
- /// Gets the request filter factory.
- /// </summary>
- public new WebSocketProtocol ReceiveFilterFactory
- {
- get
- {
- return (WebSocketProtocol)base.ReceiveFilterFactory;
- }
- }
- bool IWebSocketServer.ValidateHandshake(IWebSocketSession session, string origin)
- {
- var s = (TWebSocketSession)session;
- s.Origin = origin;
- return ValidateHandshake(s, origin);
- }
- /// <summary>
- /// Validates the handshake request.
- /// </summary>
- /// <param name="session">The session.</param>
- /// <param name="origin">The origin in the handshake request.</param>
- /// <returns></returns>
- protected virtual bool ValidateHandshake(TWebSocketSession session, string origin)
- {
- return true;
- }
- bool RegisterSubProtocol(ISubProtocol<TWebSocketSession> subProtocol)
- {
- if (m_SubProtocols.ContainsKey(subProtocol.Name))
- {
- if (Logger.IsErrorEnabled)
- Logger.ErrorFormat("Cannot register duplicate name sub protocol! Duplicate name: {0}.", subProtocol.Name);
- return false;
- }
- m_SubProtocols.Add(subProtocol.Name, subProtocol);
- return true;
- }
- private bool SetupSubProtocols(IServerConfig config)
- {
- //Preparing sub protocols' configuration
- var subProtocolConfigSection = config.GetChildConfig<SubProtocolConfigCollection>("subProtocols");
- var subProtocolConfigDict = new Dictionary<string, SubProtocolConfig>(subProtocolConfigSection == null ? 0 : subProtocolConfigSection.Count, StringComparer.OrdinalIgnoreCase);
- if (subProtocolConfigSection != null)
- {
- foreach (var protocolConfig in subProtocolConfigSection)
- {
- string originalProtocolName = protocolConfig.Name;
- string protocolName;
- ISubProtocol<TWebSocketSession> subProtocolInstance;
- if (!string.IsNullOrEmpty(originalProtocolName))
- {
- protocolName = originalProtocolName;
- if (!string.IsNullOrEmpty(protocolConfig.Type))
- {
- try
- {
- subProtocolInstance = AssemblyUtil.CreateInstance<ISubProtocol<TWebSocketSession>>(protocolConfig.Type, new object[] { originalProtocolName });
- }
- catch (Exception e)
- {
- Logger.Error(e);
- return false;
- }
- if (!RegisterSubProtocol(subProtocolInstance))
- return false;
- }
- else
- {
- if (!m_SubProtocols.ContainsKey(protocolName))
- {
- subProtocolInstance = new BasicSubProtocol<TWebSocketSession>(protocolName);
- if (!RegisterSubProtocol(subProtocolInstance))
- return false;
- }
- }
- }
- else
- {
- protocolName = BasicSubProtocol<TWebSocketSession>.DefaultName;
- if (!string.IsNullOrEmpty(protocolConfig.Type))
- {
- if (Logger.IsErrorEnabled)
- Logger.Error("You needn't set Type attribute for SubProtocol, if you don't set Name attribute!");
- return false;
- }
- }
- subProtocolConfigDict[protocolName] = protocolConfig;
- }
- if (subProtocolConfigDict.Values.Any())
- m_SubProtocolConfigured = true;
- }
- if (m_SubProtocols.Count <= 0 || (subProtocolConfigDict.ContainsKey(BasicSubProtocol<TWebSocketSession>.DefaultName) && !m_SubProtocols.ContainsKey(BasicSubProtocol<TWebSocketSession>.DefaultName)))
- {
- if (!RegisterSubProtocol(BasicSubProtocol<TWebSocketSession>.CreateDefaultSubProtocol()))
- return false;
- }
- //Initialize sub protocols
- foreach (var subProtocol in m_SubProtocols.Values)
- {
- SubProtocolConfig protocolConfig = null;
- subProtocolConfigDict.TryGetValue(subProtocol.Name, out protocolConfig);
- bool initialized = false;
- try
- {
- initialized = subProtocol.Initialize(this, protocolConfig, Logger);
- }
- catch (Exception e)
- {
- initialized = false;
- Logger.Error(e);
- }
- if (!initialized)
- {
- if (Logger.IsErrorEnabled)
- Logger.ErrorFormat("Failed to initialize the sub protocol '{0}'!", subProtocol.Name);
- return false;
- }
- }
- return true;
- }
- /// <summary>
- /// Setups with the specified root config.
- /// </summary>
- /// <param name="rootConfig">The root config.</param>
- /// <param name="config">The config.</param>
- /// <returns></returns>
- protected override bool Setup(IRootConfig rootConfig, IServerConfig config)
- {
- if (m_SubProtocols != null && m_SubProtocols.Count > 0)
- DefaultSubProtocol = m_SubProtocols.Values.FirstOrDefault();
- m_WebSocketProtocolProcessor = new DraftHybi10Processor
- {
- NextProcessor = new Rfc6455Processor
- {
- NextProcessor = new DraftHybi00Processor()
- }
- };
- SetupMultipleProtocolSwitch(m_WebSocketProtocolProcessor);
- if (!int.TryParse(config.Options.GetValue("handshakePendingQueueCheckingInterval"), out m_HandshakePendingQueueCheckingInterval))
- m_HandshakePendingQueueCheckingInterval = 60;// 1 minute default
- if (!int.TryParse(config.Options.GetValue("openHandshakeTimeOut"), out m_OpenHandshakeTimeOut))
- m_OpenHandshakeTimeOut = 120;// 2 minute default
- if (!int.TryParse(config.Options.GetValue("closeHandshakeTimeOut"), out m_CloseHandshakeTimeOut))
- m_CloseHandshakeTimeOut = 120;// 2 minute default
- if (m_BinaryDataConverter == null)
- {
- m_BinaryDataConverter = new TextEncodingBinaryDataConverter(Encoding.UTF8);
- }
- return true;
- }
- private void SetupMultipleProtocolSwitch(IProtocolProcessor rootProcessor)
- {
- var thisProcessor = rootProcessor;
- List<int> availableVersions = new List<int>();
- while (true)
- {
- if (thisProcessor.Version > 0)
- availableVersions.Add(thisProcessor.Version);
- if (thisProcessor.NextProcessor == null)
- break;
- thisProcessor = thisProcessor.NextProcessor;
- }
- thisProcessor.NextProcessor = new MultipleProtocolSwitchProcessor(availableVersions.ToArray());
- }
- /// <summary>
- /// Called when [started].
- /// </summary>
- protected override void OnStarted()
- {
- m_HandshakePendingQueueCheckingTimer = new Timer(HandshakePendingQueueCheckingCallback, null, m_HandshakePendingQueueCheckingInterval * 1000, m_HandshakePendingQueueCheckingInterval * 1000);
- base.OnStarted();
- }
- private void HandshakePendingQueueCheckingCallback(object state)
- {
- try
- {
- m_HandshakePendingQueueCheckingTimer.Change(Timeout.Infinite, Timeout.Infinite);
- while (true)
- {
- TWebSocketSession session;
- if (!m_OpenHandshakePendingQueue.TryPeek(out session))
- break;
- if (session.Handshaked || !session.Connected)
- {
- //Handshaked or not connected
- m_OpenHandshakePendingQueue.TryDequeue(out session);
- continue;
- }
- if (DateTime.Now < session.StartTime.AddSeconds(m_OpenHandshakeTimeOut))
- break;
- //Timeout, dequeue and then close
- m_OpenHandshakePendingQueue.TryDequeue(out session);
- session.Close(CloseReason.TimeOut);
- }
- while (true)
- {
- TWebSocketSession session;
- if (!m_CloseHandshakePendingQueue.TryPeek(out session))
- break;
- if (!session.Connected)
- {
- //the session has been closed
- m_CloseHandshakePendingQueue.TryDequeue(out session);
- continue;
- }
- if (DateTime.Now < session.StartClosingHandshakeTime.AddSeconds(m_CloseHandshakeTimeOut))
- break;
- //Timeout, dequeue and then close
- m_CloseHandshakePendingQueue.TryDequeue(out session);
- //Needn't send closing handshake again
- session.Close(CloseReason.ServerClosing);
- }
- }
- catch (Exception e)
- {
- if (Logger.IsErrorEnabled)
- Logger.Error(e);
- }
- finally
- {
- m_HandshakePendingQueueCheckingTimer.Change(m_HandshakePendingQueueCheckingInterval * 1000, m_HandshakePendingQueueCheckingInterval * 1000);
- }
- }
- internal void PushToCloseHandshakeQueue(IAppSession appSession)
- {
- m_CloseHandshakePendingQueue.Enqueue((TWebSocketSession)appSession);
- }
- /// <summary>
- /// Called when [new session connected].
- /// </summary>
- /// <param name="session">The session.</param>
- protected override void OnNewSessionConnected(TWebSocketSession session)
- {
- m_OpenHandshakePendingQueue.Enqueue(session);
- }
- internal void FireOnNewSessionConnected(IAppSession appSession)
- {
- base.OnNewSessionConnected((TWebSocketSession)appSession);
- }
- /// <summary>
- /// Occurs when [new request received].
- /// </summary>
- /// <exception cref="System.NotSupportedException"></exception>
- [Browsable(false)]
- [EditorBrowsable(EditorBrowsableState.Never)]
- public override event RequestHandler<TWebSocketSession, IWebSocketFragment> NewRequestReceived
- {
- add { throw new NotSupportedException("Please use NewMessageReceived instead!"); }
- remove { throw new NotSupportedException("Please use NewMessageReceived instead!"); }
- }
- private SessionHandler<TWebSocketSession, string> m_NewMessageReceived;
- /// <summary>
- /// Occurs when [new message received].
- /// </summary>
- public event SessionHandler<TWebSocketSession, string> NewMessageReceived
- {
- add
- {
- if (m_SubProtocolConfigured)
- throw new Exception("If you have defined any sub protocol, you cannot subscribe NewMessageReceived event!");
- m_NewMessageReceived += value;
- }
- remove
- {
- m_NewMessageReceived -= value;
- }
- }
- void ExecuteMessage(TWebSocketSession session, string message)
- {
- if (session.SubProtocol == null)
- {
- if (Logger.IsErrorEnabled)
- Logger.Error("No SubProtocol selected! This session cannot process any message!");
- session.CloseWithHandshake(session.ProtocolProcessor.CloseStatusClode.ProtocolError, "No SubProtocol selected");
- return;
- }
- ExecuteSubCommand(session, session.SubProtocol.SubRequestParser.ParseRequestInfo(message));
- }
- internal void OnNewMessageReceived(TWebSocketSession session, string message)
- {
- try
- {
- if (m_NewMessageReceived == null)
- {
- ExecuteMessage(session, message);
- }
- else
- {
- m_NewMessageReceived(session, message);
- }
- }
- catch (Exception ex)
- {
- if (Logger.IsErrorEnabled)
- Logger.Error(message + " " + ex.ToString());
- }
- }
- private SessionHandler<TWebSocketSession, byte[]> m_NewDataReceived;
- /// <summary>
- /// Occurs when [new data received].
- /// </summary>
- public event SessionHandler<TWebSocketSession, byte[]> NewDataReceived
- {
- add
- {
- m_NewDataReceived += value;
- }
- remove
- {
- m_NewDataReceived -= value;
- }
- }
- internal void OnNewDataReceived(TWebSocketSession session, byte[] data)
- {
- if (m_NewDataReceived == null)
- {
- var converter = m_BinaryDataConverter;
- if (converter != null)
- {
- ExecuteMessage(session, converter.ToString(data, 0, data.Length));
- }
- return;
- }
- m_NewDataReceived(session, data);
- }
- private const string m_Tab = "\t";
- private const char m_Colon = ':';
- private const string m_Space = " ";
- private const char m_SpaceChar = ' ';
- private const string m_ValueSeparator = ", ";
- internal static void ParseHandshake(IWebSocketSession session, TextReader reader)
- {
- string line;
- string firstLine = string.Empty;
- string prevKey = string.Empty;
- logger.Info("============================================================");
- while (!string.IsNullOrEmpty(line = reader.ReadLine()))
- {
- logger.Info(session.SessionID + " " + line);
- if (string.IsNullOrEmpty(firstLine))
- {
- firstLine = line;
- continue;
- }
- if (line.StartsWith(m_Tab) && !string.IsNullOrEmpty(prevKey))
- {
- string currentValue = session.Items.GetValue<string>(prevKey, string.Empty);
- session.Items[prevKey] = currentValue + line.Trim();
- continue;
- }
- int pos = line.IndexOf(m_Colon);
- if (pos <= 0)
- continue;
- string key = line.Substring(0, pos);
- if (!string.IsNullOrEmpty(key))
- key = key.Trim();
- var valueOffset = pos + 1;
- if (line.Length <= valueOffset) //No value in this line
- continue;
- string value = line.Substring(valueOffset);
- if (!string.IsNullOrEmpty(value) && value.StartsWith(m_Space) && value.Length > 1)
- value = value.Substring(1);
- if (string.IsNullOrEmpty(key))
- continue;
- object oldValue;
- if (!session.Items.TryGetValue(key, out oldValue))
- {
- session.Items.Add(key, value);
- }
- else
- {
- session.Items[key] = oldValue + m_ValueSeparator + value;
- }
- prevKey = key;
- }
- logger.Info("============================================================");
- var metaInfo = firstLine.Split(m_SpaceChar);
- session.Method = metaInfo[0];
- session.Path = metaInfo[1];
- session.HttpVersion = metaInfo[2];
- }
- /// <summary>
- /// Setups the commands.
- /// </summary>
- /// <param name="discoveredCommands">The discovered commands.</param>
- /// <returns></returns>
- protected override bool SetupCommands(Dictionary<string, ICommand<TWebSocketSession, IWebSocketFragment>> discoveredCommands)
- {
- var commands = new List<ICommand<TWebSocketSession, IWebSocketFragment>>
- {
- new HandShake<TWebSocketSession>(),
- new Text<TWebSocketSession>(),
- new Binary<TWebSocketSession>(),
- new Close<TWebSocketSession>(),
- new Ping<TWebSocketSession>(),
- new Pong<TWebSocketSession>(),
- new Continuation<TWebSocketSession>(),
- new Plain<TWebSocketSession>()
- };
- commands.ForEach(c => discoveredCommands.Add(c.Name, c));
- if (!SetupSubProtocols(Config))
- return false;
- return true;
- }
- /// <summary>
- /// Executes the command.
- /// </summary>
- /// <param name="session">The session.</param>
- /// <param name="requestInfo">The request info.</param>
- protected override void ExecuteCommand(TWebSocketSession session, IWebSocketFragment requestInfo)
- {
- if (session.InClosing)
- {
- //Only handle closing handshake if the session is in closing
- if (requestInfo.Key != OpCode.CloseTag)
- return;
- }
- base.ExecuteCommand(session, requestInfo);
- }
- private void ExecuteSubCommand(TWebSocketSession session, SubRequestInfo requestInfo)
- {
- ISubCommand<TWebSocketSession> subCommand;
- if (session.SubProtocol.TryGetCommand(requestInfo.Key, out subCommand))
- {
- session.CurrentCommand = requestInfo.Key;
- subCommand.ExecuteCommand(session, requestInfo);
- session.PrevCommand = requestInfo.Key;
- if (Config.LogCommand && Logger.IsInfoEnabled)
- Logger.Info(session, string.Format("Command - {0} - {1}", session.SessionID, requestInfo.Key));
- }
- else
- {
- //session.SubProtocol.TryGetCommand("OCPP", out subCommand);
- //session.CurrentCommand = "OCPP";
- //subCommand.ExecuteCommand(session, requestInfo);
- //session.PrevCommand = "OCPP";
- session.HandleUnknownCommand(requestInfo);
- }
- session.LastActiveTime = DateTime.Now;
- }
- /// <summary>
- /// Broadcasts data to the specified sessions.
- /// </summary>
- /// <param name="sessions">The sessions.</param>
- /// <param name="data">The data.</param>
- /// <param name="offset">The offset.</param>
- /// <param name="length">The length.</param>
- /// <param name="sendFeedback">The send feedback.</param>
- public void Broadcast(IEnumerable<TWebSocketSession> sessions, byte[] data, int offset, int length, Action<TWebSocketSession, bool> sendFeedback)
- {
- IList<ArraySegment<byte>> encodedPackage = null;
- IProtocolProcessor encodingProcessor = null;
- foreach (var s in sessions)
- {
- if (!s.Connected)
- continue;
- var currentProtocolProcessor = s.ProtocolProcessor;
- if (currentProtocolProcessor == null || !currentProtocolProcessor.CanSendBinaryData)
- continue;
- if (encodedPackage == null || currentProtocolProcessor != encodingProcessor)
- {
- encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Binary, data, offset, length);
- encodingProcessor = currentProtocolProcessor;
- }
- Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback));
- }
- }
- /// <summary>
- /// Broadcasts message to the specified sessions.
- /// </summary>
- /// <param name="sessions">The sessions.</param>
- /// <param name="message">The message.</param>
- /// <param name="sendFeedback">The send feedback.</param>
- public void Broadcast(IEnumerable<TWebSocketSession> sessions, string message, Action<TWebSocketSession, bool> sendFeedback)
- {
- IList<ArraySegment<byte>> encodedPackage = null;
- IProtocolProcessor encodingProcessor = null;
- foreach (var s in sessions)
- {
- if (!s.Connected)
- continue;
- var currentProtocolProcessor = s.ProtocolProcessor;
- if (currentProtocolProcessor == null)
- continue;
- if (encodedPackage == null || encodingProcessor != currentProtocolProcessor)
- {
- encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Text, message);
- encodingProcessor = currentProtocolProcessor;
- }
- Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback));
- }
- }
- private void SendRawDataToSession(object state)
- {
- var param = state as BroadcastState;
- var session = param.Session;
- var sendFeedback = param.FeedbackFunc;
- var sendOk = false;
- try
- {
- sendOk = session.TrySendRawData(param.Data);
- }
- catch (Exception e)
- {
- session.Logger.Error(e);
- }
- sendFeedback(session, sendOk);
- }
- #region JSON serialize/deserialize
- /// <summary>
- /// Serialize the target object by JSON
- /// </summary>
- /// <param name="target">The target.</param>
- /// <returns></returns>
- public virtual string JsonSerialize(object target)
- {
- return JsonConvert.SerializeObject(target);
- }
- /// <summary>
- /// Deserialize the JSON string to target type object.
- /// </summary>
- /// <param name="json">The json.</param>
- /// <param name="type">The type.</param>
- /// <returns></returns>
- public virtual object JsonDeserialize(string json, Type type)
- {
- return JsonConvert.DeserializeObject(json, type);
- }
- #endregion
- class BroadcastState
- {
- public TWebSocketSession Session { get; private set; }
- public IList<ArraySegment<byte>> Data { get; private set; }
- public Action<TWebSocketSession, bool> FeedbackFunc { get; private set; }
- public BroadcastState(TWebSocketSession session, IList<ArraySegment<byte>> data, Action<TWebSocketSession, bool> feedbackFunc)
- {
- Session = session;
- Data = data;
- FeedbackFunc = feedbackFunc;
- }
- }
- }
- }
|