using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.ComponentModel;
using System.IO;
using System.Linq;
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NLog;
using SuperSocket.Common;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Command;
using SuperSocket.SocketBase.Config;
using SuperSocket.SocketBase.Protocol;
using SuperWebSocket.Command;
using SuperWebSocket.Config;
using SuperWebSocket.Protocol;
using SuperWebSocket.SubProtocol;

namespace SuperWebSocket
{
    /// <summary>
    /// WebSocket server interface
    /// </summary>
    public interface IWebSocketServer : IAppServer
    {
        /// <summary>
        /// Gets the web socket protocol processor.
        /// </summary>
        IProtocolProcessor WebSocketProtocolProcessor { get; }

        /// <summary>
        /// Validates the handshake request.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="origin">The origin.</param>
        /// <returns>the validation result</returns>
        bool ValidateHandshake(IWebSocketSession session, string origin);
    }

    /// <summary>
    /// WebSocket AppServer bound to the default <see cref="WebSocketSession"/> type.
    /// </summary>
    public class WebSocketServer : WebSocketServer<WebSocketSession>
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketServer"/> class.
        /// </summary>
        /// <param name="subProtocols">The sub protocols.</param>
        public WebSocketServer(IEnumerable<ISubProtocol<WebSocketSession>> subProtocols)
            : base(subProtocols)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketServer"/> class.
        /// </summary>
        /// <param name="subProtocol">The sub protocol.</param>
        public WebSocketServer(ISubProtocol<WebSocketSession> subProtocol)
            : base(subProtocol)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketServer"/> class
        /// without any pre-registered sub protocol.
        /// </summary>
        public WebSocketServer()
            : base(new List<ISubProtocol<WebSocketSession>>())
        {
        }
    }

    /// <summary>
    /// WebSocket AppServer
    /// </summary>
    /// <typeparam name="TWebSocketSession">The type of the web socket session.</typeparam>
    // NOTE(review): the generic type arguments throughout this file were restored from how each
    // member is used (the reviewed text had lost everything between angle brackets). Confirm them
    // against the project's declarations, in particular the base types below and the
    // GetChildConfig<SubProtocolConfigCollection> call in SetupSubProtocols.
    public abstract class WebSocketServer<TWebSocketSession> : AppServer<TWebSocketSession, IWebSocketFragment>, IWebSocketServer
        where TWebSocketSession : WebSocketSession<TWebSocketSession>, new()
    {
        private const int m_DefaultCheckingIntervalSeconds = 60;  // 1 minute
        private const int m_DefaultHandshakeTimeOutSeconds = 120; // 2 minutes

        // Largest interval (in seconds) that can be multiplied by 1000 without overflowing an int.
        private const int m_MaxCheckingIntervalSeconds = int.MaxValue / 1000;

        private const string m_HandshakeLogSeparator = "============================================================";

        private const string m_Tab = "\t";
        private const char m_Colon = ':';
        private const string m_Space = " ";
        private const char m_SpaceChar = ' ';
        private const string m_ValueSeparator = ", ";

        private static readonly ILogger logger = NLog.LogManager.GetCurrentClassLogger();

        private IBinaryDataConverter m_BinaryDataConverter;

        private Dictionary<string, ISubProtocol<TWebSocketSession>> m_SubProtocols =
            new Dictionary<string, ISubProtocol<TWebSocketSession>>(StringComparer.OrdinalIgnoreCase);

        private bool m_SubProtocolConfigured = false;

        private ConcurrentQueue<TWebSocketSession> m_OpenHandshakePendingQueue = new ConcurrentQueue<TWebSocketSession>();

        private ConcurrentQueue<TWebSocketSession> m_CloseHandshakePendingQueue = new ConcurrentQueue<TWebSocketSession>();

        /// <summary>
        /// The opening handshake timeout, in seconds
        /// </summary>
        private int m_OpenHandshakeTimeOut;

        /// <summary>
        /// The closing handshake timeout, in seconds
        /// </summary>
        private int m_CloseHandshakeTimeOut;

        /// <summary>
        /// The interval of checking handshake pending queue, in seconds
        /// </summary>
        private int m_HandshakePendingQueueCheckingInterval;

        private Timer m_HandshakePendingQueueCheckingTimer;

        private IProtocolProcessor m_WebSocketProtocolProcessor;

        private SessionHandler<TWebSocketSession, string> m_NewMessageReceived;

        private SessionHandler<TWebSocketSession, byte[]> m_NewDataReceived;

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketServer&lt;TWebSocketSession&gt;"/> class.
        /// </summary>
        /// <param name="subProtocols">The sub protocols to register; an empty sequence registers nothing.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subProtocols"/> is null.</exception>
        /// <exception cref="Exception">A sub protocol could not be registered (duplicate name).</exception>
        public WebSocketServer(IEnumerable<ISubProtocol<TWebSocketSession>> subProtocols)
            : this()
        {
            if (subProtocols == null)
                throw new ArgumentNullException("subProtocols");

            // Enumerate the sequence exactly once; it may be a lazy query.
            var registeredAny = false;

            foreach (var protocol in subProtocols)
            {
                if (!RegisterSubProtocol(protocol))
                    throw new Exception("Failed to register sub protocol!");

                registeredAny = true;
            }

            if (registeredAny)
                m_SubProtocolConfigured = true;
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketServer&lt;TWebSocketSession&gt;"/> class.
        /// </summary>
        /// <param name="subProtocol">The sub protocol.</param>
        public WebSocketServer(ISubProtocol<TWebSocketSession> subProtocol)
            : this(new List<ISubProtocol<TWebSocketSession>> { subProtocol })
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketServer&lt;TWebSocketSession&gt;"/> class.
        /// </summary>
        public WebSocketServer()
            : base(new WebSocketProtocol())
        {
        }

        /// <summary>
        /// Gets or sets the binary data converter.
        /// </summary>
        /// <value>
        /// The binary data converter. When still null at setup time, a UTF-8 text converter is installed.
        /// </value>
        protected IBinaryDataConverter BinaryDataConverter
        {
            get { return m_BinaryDataConverter; }
            set { m_BinaryDataConverter = value; }
        }

        /// <summary>
        /// Gets the sub protocol picked during setup: the first one registered through a constructor, if any.
        /// </summary>
        internal ISubProtocol<TWebSocketSession> DefaultSubProtocol { get; private set; }

        IProtocolProcessor IWebSocketServer.WebSocketProtocolProcessor
        {
            get { return m_WebSocketProtocolProcessor; }
        }

        /// <summary>
        /// Gets the request filter factory.
        /// </summary>
        public new WebSocketProtocol ReceiveFilterFactory
        {
            get { return (WebSocketProtocol)base.ReceiveFilterFactory; }
        }

        /// <summary>
        /// Gets the period of the handshake queue check in milliseconds.
        /// Setup keeps the interval within 1..<see cref="m_MaxCheckingIntervalSeconds"/>, so this cannot overflow.
        /// </summary>
        private int HandshakeCheckingPeriodMilliseconds
        {
            get { return m_HandshakePendingQueueCheckingInterval * 1000; }
        }

        /// <summary>
        /// Gets the sub protocol by sub protocol name (case-insensitive).
        /// </summary>
        /// <param name="name">The name.</param>
        /// <returns>The registered sub protocol, or null when the name is null or unknown.</returns>
        internal ISubProtocol<TWebSocketSession> GetSubProtocol(string name)
        {
            // A dictionary lookup with a null key throws; treat it as "not found" instead.
            if (name == null)
                return null;

            ISubProtocol<TWebSocketSession> subProtocol;

            if (m_SubProtocols.TryGetValue(name, out subProtocol))
                return subProtocol;

            return null;
        }

        bool IWebSocketServer.ValidateHandshake(IWebSocketSession session, string origin)
        {
            var s = (TWebSocketSession)session;
            s.Origin = origin;
            return ValidateHandshake(s, origin);
        }

        /// <summary>
        /// Validates the handshake request. The default implementation accepts every request.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="origin">The origin in the handshake request.</param>
        /// <returns>true to accept the handshake; otherwise false.</returns>
        protected virtual bool ValidateHandshake(TWebSocketSession session, string origin)
        {
            return true;
        }

        /// <summary>
        /// Adds a sub protocol to the registry.
        /// </summary>
        /// <param name="subProtocol">The sub protocol to register.</param>
        /// <returns>false when a sub protocol with the same name is already registered; otherwise true.</returns>
        bool RegisterSubProtocol(ISubProtocol<TWebSocketSession> subProtocol)
        {
            if (m_SubProtocols.ContainsKey(subProtocol.Name))
            {
                // This method is also called from the constructor. Presumably the base class only
                // assigns Logger during setup (TODO confirm), so guard against null to keep the
                // duplicate-name failure from surfacing as a NullReferenceException.
                var log = Logger;

                if (log != null && log.IsErrorEnabled)
                    log.ErrorFormat("Cannot register duplicate name sub protocol! Duplicate name: {0}.", subProtocol.Name);

                return false;
            }

            m_SubProtocols.Add(subProtocol.Name, subProtocol);
            return true;
        }

        /// <summary>
        /// Registers the sub protocols declared in the "subProtocols" child configuration,
        /// adds the default sub protocol when required, and initializes every registered sub protocol.
        /// </summary>
        /// <param name="config">The server configuration.</param>
        /// <returns>false when a sub protocol cannot be created, registered or initialized; otherwise true.</returns>
        private bool SetupSubProtocols(IServerConfig config)
        {
            //Preparing sub protocols' configuration
            var subProtocolConfigSection = config.GetChildConfig<SubProtocolConfigCollection>("subProtocols");

            var subProtocolConfigDict = new Dictionary<string, SubProtocolConfig>(
                subProtocolConfigSection == null ? 0 : subProtocolConfigSection.Count,
                StringComparer.OrdinalIgnoreCase);

            if (subProtocolConfigSection != null)
            {
                foreach (var protocolConfig in subProtocolConfigSection)
                {
                    string originalProtocolName = protocolConfig.Name;
                    string protocolName;

                    ISubProtocol<TWebSocketSession> subProtocolInstance;

                    if (!string.IsNullOrEmpty(originalProtocolName))
                    {
                        protocolName = originalProtocolName;

                        if (!string.IsNullOrEmpty(protocolConfig.Type))
                        {
                            // A custom sub protocol type: create it with the configured name as constructor argument.
                            try
                            {
                                subProtocolInstance = AssemblyUtil.CreateInstance<ISubProtocol<TWebSocketSession>>(protocolConfig.Type, new object[] { originalProtocolName });
                            }
                            catch (Exception e)
                            {
                                Logger.Error(e);
                                return false;
                            }

                            if (!RegisterSubProtocol(subProtocolInstance))
                                return false;
                        }
                        else
                        {
                            // Named entry without a type: use the basic sub protocol unless the name
                            // was already registered (for example through a constructor).
                            if (!m_SubProtocols.ContainsKey(protocolName))
                            {
                                subProtocolInstance = new BasicSubProtocol<TWebSocketSession>(protocolName);

                                if (!RegisterSubProtocol(subProtocolInstance))
                                    return false;
                            }
                        }
                    }
                    else
                    {
                        // An unnamed entry configures the default sub protocol and must not carry a type.
                        protocolName = BasicSubProtocol<TWebSocketSession>.DefaultName;

                        if (!string.IsNullOrEmpty(protocolConfig.Type))
                        {
                            if (Logger.IsErrorEnabled)
                                Logger.Error("You needn't set Type attribute for SubProtocol, if you don't set Name attribute!");
                            return false;
                        }
                    }

                    subProtocolConfigDict[protocolName] = protocolConfig;
                }

                if (subProtocolConfigDict.Values.Any())
                    m_SubProtocolConfigured = true;
            }

            if (m_SubProtocols.Count <= 0
                || (subProtocolConfigDict.ContainsKey(BasicSubProtocol<TWebSocketSession>.DefaultName)
                    && !m_SubProtocols.ContainsKey(BasicSubProtocol<TWebSocketSession>.DefaultName)))
            {
                if (!RegisterSubProtocol(BasicSubProtocol<TWebSocketSession>.CreateDefaultSubProtocol()))
                    return false;
            }

            //Initialize sub protocols
            foreach (var subProtocol in m_SubProtocols.Values)
            {
                SubProtocolConfig protocolConfig = null;

                // protocolConfig stays null when the sub protocol has no configuration entry.
                subProtocolConfigDict.TryGetValue(subProtocol.Name, out protocolConfig);

                bool initialized = false;

                try
                {
                    initialized = subProtocol.Initialize(this, protocolConfig, Logger);
                }
                catch (Exception e)
                {
                    initialized = false;
                    Logger.Error(e);
                }

                if (!initialized)
                {
                    if (Logger.IsErrorEnabled)
                        Logger.ErrorFormat("Failed to initialize the sub protocol '{0}'!", subProtocol.Name);
                    return false;
                }
            }

            return true;
        }

        /// <summary>
        /// Reads an integer option, expressed in seconds, from the server configuration.
        /// </summary>
        /// <param name="config">The server configuration.</param>
        /// <param name="optionName">The option name.</param>
        /// <param name="defaultValue">The value used when the option is missing, not a number, or out of range.</param>
        /// <param name="minValue">The smallest accepted value.</param>
        /// <param name="maxValue">The largest accepted value.</param>
        /// <returns>The configured value, or <paramref name="defaultValue"/>.</returns>
        private static int ReadSecondsOption(IServerConfig config, string optionName, int defaultValue, int minValue, int maxValue)
        {
            int value;

            if (!int.TryParse(config.Options.GetValue(optionName), out value))
                return defaultValue;

            if (value < minValue || value > maxValue)
                return defaultValue;

            return value;
        }

        /// <summary>
        /// Setups with the specified root config: picks the default sub protocol, builds the
        /// protocol processor chain, reads the handshake timing options and installs the
        /// default binary data converter.
        /// </summary>
        /// <param name="rootConfig">The root config.</param>
        /// <param name="config">The config.</param>
        /// <returns>Always true.</returns>
        protected override bool Setup(IRootConfig rootConfig, IServerConfig config)
        {
            if (m_SubProtocols != null && m_SubProtocols.Count > 0)
                DefaultSubProtocol = m_SubProtocols.Values.FirstOrDefault();

            m_WebSocketProtocolProcessor = new DraftHybi10Processor
            {
                NextProcessor = new Rfc6455Processor
                {
                    NextProcessor = new DraftHybi00Processor()
                }
            };

            SetupMultipleProtocolSwitch(m_WebSocketProtocolProcessor);

            // The interval must be at least 1 second: a zero period would make the checking callback
            // re-arm the timer with a zero due time (a busy loop) and a negative one makes Timer throw.
            // It is also capped so that the conversion to milliseconds cannot overflow.
            m_HandshakePendingQueueCheckingInterval = ReadSecondsOption(
                config, "handshakePendingQueueCheckingInterval", m_DefaultCheckingIntervalSeconds, 1, m_MaxCheckingIntervalSeconds);

            // Negative timeouts are meaningless and fall back to the default.
            m_OpenHandshakeTimeOut = ReadSecondsOption(
                config, "openHandshakeTimeOut", m_DefaultHandshakeTimeOutSeconds, 0, int.MaxValue);

            m_CloseHandshakeTimeOut = ReadSecondsOption(
                config, "closeHandshakeTimeOut", m_DefaultHandshakeTimeOutSeconds, 0, int.MaxValue);

            if (m_BinaryDataConverter == null)
            {
                m_BinaryDataConverter = new TextEncodingBinaryDataConverter(Encoding.UTF8);
            }

            return true;
        }

        /// <summary>
        /// Appends a <see cref="MultipleProtocolSwitchProcessor"/> to the end of the processor chain,
        /// giving it every positive version number found in the chain.
        /// </summary>
        /// <param name="rootProcessor">The first processor of the chain.</param>
        private void SetupMultipleProtocolSwitch(IProtocolProcessor rootProcessor)
        {
            var thisProcessor = rootProcessor;

            List<int> availableVersions = new List<int>();

            while (true)
            {
                if (thisProcessor.Version > 0)
                    availableVersions.Add(thisProcessor.Version);

                if (thisProcessor.NextProcessor == null)
                    break;

                thisProcessor = thisProcessor.NextProcessor;
            }

            // thisProcessor is now the last element of the chain.
            thisProcessor.NextProcessor = new MultipleProtocolSwitchProcessor(availableVersions.ToArray());
        }

        /// <summary>
        /// Called when [started]. Starts the periodic check of the handshake pending queues.
        /// </summary>
        protected override void OnStarted()
        {
            var period = HandshakeCheckingPeriodMilliseconds;

            m_HandshakePendingQueueCheckingTimer = new Timer(HandshakePendingQueueCheckingCallback, null, period, period);

            base.OnStarted();
        }

        /// <summary>
        /// Timer callback: closes sessions whose opening or closing handshake has timed out.
        /// </summary>
        /// <param name="state">Unused.</param>
        private void HandshakePendingQueueCheckingCallback(object state)
        {
            try
            {
                // Pause the timer while the queues are processed so that callbacks never overlap.
                m_HandshakePendingQueueCheckingTimer.Change(Timeout.Infinite, Timeout.Infinite);

                while (true)
                {
                    TWebSocketSession session;

                    if (!m_OpenHandshakePendingQueue.TryPeek(out session))
                        break;

                    if (session.Handshaked || !session.Connected)
                    {
                        //Handshaked or not connected
                        m_OpenHandshakePendingQueue.TryDequeue(out session);
                        continue;
                    }

                    // The queue is in enqueue order, so the scan stops at the first session
                    // that has not timed out yet.
                    if (DateTime.Now < session.StartTime.AddSeconds(m_OpenHandshakeTimeOut))
                        break;

                    //Timeout, dequeue and then close
                    m_OpenHandshakePendingQueue.TryDequeue(out session);
                    session.Close(CloseReason.TimeOut);
                }

                while (true)
                {
                    TWebSocketSession session;

                    if (!m_CloseHandshakePendingQueue.TryPeek(out session))
                        break;

                    if (!session.Connected)
                    {
                        //the session has been closed
                        m_CloseHandshakePendingQueue.TryDequeue(out session);
                        continue;
                    }

                    if (DateTime.Now < session.StartClosingHandshakeTime.AddSeconds(m_CloseHandshakeTimeOut))
                        break;

                    //Timeout, dequeue and then close
                    m_CloseHandshakePendingQueue.TryDequeue(out session);
                    //Needn't send closing handshake again
                    session.Close(CloseReason.ServerClosing);
                }
            }
            catch (Exception e)
            {
                if (Logger.IsErrorEnabled)
                    Logger.Error(e);
            }
            finally
            {
                // Re-arm the timer whatever happened above.
                var period = HandshakeCheckingPeriodMilliseconds;
                m_HandshakePendingQueueCheckingTimer.Change(period, period);
            }
        }

        /// <summary>
        /// Queues a session that has started its closing handshake so that the timeout check can close it.
        /// </summary>
        /// <param name="appSession">The session; must be a <typeparamref name="TWebSocketSession"/>.</param>
        internal void PushToCloseHandshakeQueue(IAppSession appSession)
        {
            m_CloseHandshakePendingQueue.Enqueue((TWebSocketSession)appSession);
        }

        /// <summary>
        /// Called when [new session connected]. The session is only queued for the opening handshake
        /// timeout check here; the base notification is raised through
        /// <see cref="FireOnNewSessionConnected"/>.
        /// </summary>
        /// <param name="session">The session.</param>
        protected override void OnNewSessionConnected(TWebSocketSession session)
        {
            m_OpenHandshakePendingQueue.Enqueue(session);
        }

        /// <summary>
        /// Raises the base class new-session notification for the given session.
        /// </summary>
        /// <param name="appSession">The session; must be a <typeparamref name="TWebSocketSession"/>.</param>
        internal void FireOnNewSessionConnected(IAppSession appSession)
        {
            base.OnNewSessionConnected((TWebSocketSession)appSession);
        }

        /// <summary>
        /// Occurs when [new request received]. Not supported; use <see cref="NewMessageReceived"/> instead.
        /// </summary>
        /// <exception cref="NotSupportedException">Always, on subscribe and on unsubscribe.</exception>
        [Browsable(false)]
        [EditorBrowsable(EditorBrowsableState.Never)]
        public override event RequestHandler<TWebSocketSession, IWebSocketFragment> NewRequestReceived
        {
            add { throw new NotSupportedException("Please use NewMessageReceived instead!"); }
            remove { throw new NotSupportedException("Please use NewMessageReceived instead!"); }
        }

        /// <summary>
        /// Occurs when [new message received].
        /// </summary>
        /// <exception cref="Exception">On subscribe, when any sub protocol has been configured.</exception>
        public event SessionHandler<TWebSocketSession, string> NewMessageReceived
        {
            add
            {
                if (m_SubProtocolConfigured)
                    throw new Exception("If you have defined any sub protocol, you cannot subscribe NewMessageReceived event!");

                m_NewMessageReceived += value;
            }
            remove
            {
                m_NewMessageReceived -= value;
            }
        }

        /// <summary>
        /// Parses a text message with the session's sub protocol and executes the resulting sub command.
        /// A session without a sub protocol is closed with a protocol error.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="message">The message text.</param>
        void ExecuteMessage(TWebSocketSession session, string message)
        {
            if (session.SubProtocol == null)
            {
                if (Logger.IsErrorEnabled)
                    Logger.Error("No SubProtocol selected! This session cannot process any message!");
                session.CloseWithHandshake(session.ProtocolProcessor.CloseStatusClode.ProtocolError, "No SubProtocol selected");
                return;
            }

            ExecuteSubCommand(session, session.SubProtocol.SubRequestParser.ParseRequestInfo(message));
        }

        /// <summary>
        /// Dispatches a received text message: to the <see cref="NewMessageReceived"/> subscribers when
        /// there are any, otherwise to the session's sub protocol. Exceptions are logged, not rethrown.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="message">The message text.</param>
        internal void OnNewMessageReceived(TWebSocketSession session, string message)
        {
            try
            {
                // Copy to a local so that an unsubscribe on another thread cannot null the
                // delegate between the check and the call.
                var handler = m_NewMessageReceived;

                if (handler == null)
                {
                    ExecuteMessage(session, message);
                }
                else
                {
                    handler(session, message);
                }
            }
            catch (Exception ex)
            {
                if (Logger.IsErrorEnabled)
                    Logger.Error(message + " " + ex.ToString());
            }
        }

        /// <summary>
        /// Occurs when [new data received].
        /// </summary>
        public event SessionHandler<TWebSocketSession, byte[]> NewDataReceived
        {
            add { m_NewDataReceived += value; }
            remove { m_NewDataReceived -= value; }
        }

        /// <summary>
        /// Dispatches received binary data: to the <see cref="NewDataReceived"/> subscribers when there
        /// are any; otherwise the data is converted to text and executed as a message.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="data">The binary data.</param>
        internal void OnNewDataReceived(TWebSocketSession session, byte[] data)
        {
            // Local copy for the same reason as in OnNewMessageReceived.
            var handler = m_NewDataReceived;

            if (handler == null)
            {
                var converter = m_BinaryDataConverter;

                if (converter != null)
                {
                    ExecuteMessage(session, converter.ToString(data, 0, data.Length));
                }

                return;
            }

            handler(session, data);
        }

        /// <summary>
        /// Parses the handshake request text: stores every header in <c>session.Items</c> and the
        /// request line parts in <c>Method</c>, <c>Path</c> and <c>HttpVersion</c>.
        /// Reading stops at the first empty line or at the end of the reader.
        /// </summary>
        /// <param name="session">The session receiving the parsed values.</param>
        /// <param name="reader">The reader positioned at the start of the handshake text.</param>
        /// <exception cref="IndexOutOfRangeException">
        /// The first line has fewer than three space-separated parts (including when the reader is empty).
        /// </exception>
        internal static void ParseHandshake(IWebSocketSession session, TextReader reader)
        {
            string line;
            string firstLine = string.Empty;
            string prevKey = string.Empty;

            // NOTE(review): every handshake line is written to the Info log verbatim, including any
            // Cookie/Authorization header the client sends - confirm this is acceptable.
            // The level is checked once so that no log strings are built when Info is disabled.
            var logLines = logger.IsInfoEnabled;

            if (logLines)
                logger.Info(m_HandshakeLogSeparator);

            while (!string.IsNullOrEmpty(line = reader.ReadLine()))
            {
                if (logLines)
                    logger.Info(session.SessionID + " " + line);

                if (string.IsNullOrEmpty(firstLine))
                {
                    firstLine = line;
                    continue;
                }

                // A line starting with a tab continues the value of the previous header.
                if (line.StartsWith(m_Tab, StringComparison.Ordinal) && !string.IsNullOrEmpty(prevKey))
                {
                    string currentValue = session.Items.GetValue<string>(prevKey, string.Empty);
                    session.Items[prevKey] = currentValue + line.Trim();
                    continue;
                }

                int pos = line.IndexOf(m_Colon);

                // No colon, or nothing before it: not a header line.
                if (pos <= 0)
                    continue;

                string key = line.Substring(0, pos);

                if (!string.IsNullOrEmpty(key))
                    key = key.Trim();

                var valueOffset = pos + 1;

                if (line.Length <= valueOffset) //No value in this line
                    continue;

                string value = line.Substring(valueOffset);

                // Drop the single space that conventionally follows the colon.
                if (!string.IsNullOrEmpty(value) && value.StartsWith(m_Space, StringComparison.Ordinal) && value.Length > 1)
                    value = value.Substring(1);

                if (string.IsNullOrEmpty(key))
                    continue;

                object oldValue;

                if (!session.Items.TryGetValue(key, out oldValue))
                {
                    session.Items.Add(key, value);
                }
                else
                {
                    // Repeated header: merge the values into one comma separated value.
                    session.Items[key] = oldValue + m_ValueSeparator + value;
                }

                prevKey = key;
            }

            if (logLines)
                logger.Info(m_HandshakeLogSeparator);

            // Request line: "<method> <path> <http version>". A malformed line throws here on purpose
            // of keeping the existing contract: the caller sees the failure instead of a half-filled session.
            var metaInfo = firstLine.Split(m_SpaceChar);

            session.Method = metaInfo[0];
            session.Path = metaInfo[1];
            session.HttpVersion = metaInfo[2];
        }

        /// <summary>
        /// Setups the commands: adds the built-in WebSocket frame commands to the discovered commands
        /// and then sets up the sub protocols.
        /// </summary>
        /// <param name="discoveredCommands">The discovered commands.</param>
        /// <returns>false when the sub protocol setup fails; otherwise true.</returns>
        protected override bool SetupCommands(Dictionary<string, ICommand<TWebSocketSession, IWebSocketFragment>> discoveredCommands)
        {
            var commands = new List<ICommand<TWebSocketSession, IWebSocketFragment>>
            {
                new HandShake<TWebSocketSession>(),
                new Text<TWebSocketSession>(),
                new Binary<TWebSocketSession>(),
                new Close<TWebSocketSession>(),
                new Ping<TWebSocketSession>(),
                new Pong<TWebSocketSession>(),
                new Continuation<TWebSocketSession>(),
                new Plain<TWebSocketSession>()
            };

            commands.ForEach(c => discoveredCommands.Add(c.Name, c));

            if (!SetupSubProtocols(Config))
                return false;

            return true;
        }

        /// <summary>
        /// Executes the command. While the session is closing, every fragment except the close
        /// fragment is ignored.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="requestInfo">The request info.</param>
        protected override void ExecuteCommand(TWebSocketSession session, IWebSocketFragment requestInfo)
        {
            if (session.InClosing)
            {
                //Only handle closing handshake if the session is in closing
                if (requestInfo.Key != OpCode.CloseTag)
                    return;
            }

            base.ExecuteCommand(session, requestInfo);
        }

        /// <summary>
        /// Looks up the sub command named by the request key in the session's sub protocol and executes it;
        /// unknown keys are passed to the session's unknown-command handler.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="requestInfo">The parsed sub request.</param>
        private void ExecuteSubCommand(TWebSocketSession session, SubRequestInfo requestInfo)
        {
            ISubCommand<TWebSocketSession> subCommand;

            if (session.SubProtocol.TryGetCommand(requestInfo.Key, out subCommand))
            {
                session.CurrentCommand = requestInfo.Key;
                subCommand.ExecuteCommand(session, requestInfo);
                session.PrevCommand = requestInfo.Key;

                if (Config.LogCommand && Logger.IsInfoEnabled)
                    Logger.Info(session, string.Format("Command - {0} - {1}", session.SessionID, requestInfo.Key));
            }
            else
            {
                session.HandleUnknownCommand(requestInfo);
            }

            session.LastActiveTime = DateTime.Now;
        }

        /// <summary>
        /// Broadcasts data to the specified sessions. Sessions that are not connected or whose
        /// protocol processor cannot send binary data are skipped. Each send runs as its own task.
        /// </summary>
        /// <param name="sessions">The sessions.</param>
        /// <param name="data">The data.</param>
        /// <param name="offset">The offset.</param>
        /// <param name="length">The length.</param>
        /// <param name="sendFeedback">The send feedback, called once per attempted send; may be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="sessions"/> is null.</exception>
        public void Broadcast(IEnumerable<TWebSocketSession> sessions, byte[] data, int offset, int length, Action<TWebSocketSession, bool> sendFeedback)
        {
            if (sessions == null)
                throw new ArgumentNullException("sessions");

            IList<ArraySegment<byte>> encodedPackage = null;

            IProtocolProcessor encodingProcessor = null;

            foreach (var s in sessions)
            {
                if (!s.Connected)
                    continue;

                var currentProtocolProcessor = s.ProtocolProcessor;

                if (currentProtocolProcessor == null || !currentProtocolProcessor.CanSendBinaryData)
                    continue;

                // Re-encode only when the processor differs from the one used for the previous session.
                if (encodedPackage == null || currentProtocolProcessor != encodingProcessor)
                {
                    encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Binary, data, offset, length);
                    encodingProcessor = currentProtocolProcessor;
                }

                Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback));
            }
        }

        /// <summary>
        /// Broadcasts message to the specified sessions. Sessions that are not connected or have no
        /// protocol processor are skipped. Each send runs as its own task.
        /// </summary>
        /// <param name="sessions">The sessions.</param>
        /// <param name="message">The message.</param>
        /// <param name="sendFeedback">The send feedback, called once per attempted send; may be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="sessions"/> is null.</exception>
        public void Broadcast(IEnumerable<TWebSocketSession> sessions, string message, Action<TWebSocketSession, bool> sendFeedback)
        {
            if (sessions == null)
                throw new ArgumentNullException("sessions");

            IList<ArraySegment<byte>> encodedPackage = null;

            IProtocolProcessor encodingProcessor = null;

            foreach (var s in sessions)
            {
                if (!s.Connected)
                    continue;

                var currentProtocolProcessor = s.ProtocolProcessor;

                if (currentProtocolProcessor == null)
                    continue;

                // Re-encode only when the processor differs from the one used for the previous session.
                if (encodedPackage == null || encodingProcessor != currentProtocolProcessor)
                {
                    encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Text, message);
                    encodingProcessor = currentProtocolProcessor;
                }

                Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback));
            }
        }

        /// <summary>
        /// Task body of a broadcast: sends the encoded package to one session and reports the result.
        /// A failing send is logged through the session's logger and reported as false.
        /// </summary>
        /// <param name="state">The <see cref="BroadcastState"/> of this send.</param>
        private void SendRawDataToSession(object state)
        {
            var param = (BroadcastState)state;
            var session = param.Session;
            var sendOk = false;

            try
            {
                sendOk = session.TrySendRawData(param.Data);
            }
            catch (Exception e)
            {
                session.Logger.Error(e);
            }

            // The feedback callback is optional.
            var sendFeedback = param.FeedbackFunc;

            if (sendFeedback != null)
                sendFeedback(session, sendOk);
        }

        #region JSON serialize/deserialize

        /// <summary>
        /// Serialize the target object by JSON
        /// </summary>
        /// <param name="target">The target.</param>
        /// <returns>The JSON text.</returns>
        public virtual string JsonSerialize(object target)
        {
            return JsonConvert.SerializeObject(target);
        }

        /// <summary>
        /// Deserialize the JSON string to target type object.
        /// </summary>
        /// <param name="json">The json.</param>
        /// <param name="type">The type.</param>
        /// <returns>The deserialized object.</returns>
        public virtual object JsonDeserialize(string json, Type type)
        {
            return JsonConvert.DeserializeObject(json, type);
        }

        #endregion

        /// <summary>
        /// Carries what a single broadcast send task needs: the target session, the encoded package
        /// and the optional feedback callback.
        /// </summary>
        class BroadcastState
        {
            public TWebSocketSession Session { get; private set; }

            public IList<ArraySegment<byte>> Data { get; private set; }

            public Action<TWebSocketSession, bool> FeedbackFunc { get; private set; }

            public BroadcastState(TWebSocketSession session, IList<ArraySegment<byte>> data, Action<TWebSocketSession, bool> feedbackFunc)
            {
                Session = session;
                Data = data;
                FeedbackFunc = feedbackFunc;
            }
        }
    }
}