using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Specialized; using System.ComponentModel; using System.IO; using System.Linq; using System.Net; using System.Security.Cryptography; using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using SuperSocket.Common; using SuperSocket.SocketBase; using SuperSocket.SocketBase.Command; using SuperSocket.SocketBase.Config; using SuperSocket.SocketBase.Protocol; using SuperWebSocket.Command; using SuperWebSocket.Config; using SuperWebSocket.Protocol; using SuperWebSocket.SubProtocol; namespace SuperWebSocket { /// <summary> /// WebSocket server interface /// </summary> public interface IWebSocketServer : IAppServer { /// <summary> /// Gets the web socket protocol processor. /// </summary> IProtocolProcessor WebSocketProtocolProcessor { get; } /// <summary> /// Validates the handshake request. /// </summary> /// <param name="session">The session.</param> /// <param name="origin">The origin.</param> /// <returns>the validation result</returns> bool ValidateHandshake(IWebSocketSession session, string origin); } /// <summary> /// WebSocket AppServer /// </summary> public class WebSocketServer : WebSocketServer<WebSocketSession> { /// <summary> /// Initializes a new instance of the <see cref="WebSocketServer"/> class. /// </summary> /// <param name="subProtocols">The sub protocols.</param> public WebSocketServer(IEnumerable<ISubProtocol<WebSocketSession>> subProtocols) : base(subProtocols) { } /// <summary> /// Initializes a new instance of the <see cref="WebSocketServer"/> class. /// </summary> /// <param name="subProtocol">The sub protocol.</param> public WebSocketServer(ISubProtocol<WebSocketSession> subProtocol) : base(subProtocol) { } /// <summary> /// Initializes a new instance of the <see cref="WebSocketServer"/> class. /// </summary> public WebSocketServer() : base(new List<ISubProtocol<WebSocketSession>>()) { } } /// <summary> /// WebSocket AppServer /// </summary> /// <typeparam name="TWebSocketSession">The type of the web socket session.</typeparam> public abstract class WebSocketServer<TWebSocketSession> : AppServer<TWebSocketSession, IWebSocketFragment>, IWebSocketServer where TWebSocketSession : WebSocketSession<TWebSocketSession>, new() { private IBinaryDataConverter m_BinaryDataConverter; /// <summary> /// Gets or sets the binary data converter. /// </summary> /// <value> /// The binary data converter. /// </value> protected IBinaryDataConverter BinaryDataConverter { get { return m_BinaryDataConverter; } set { m_BinaryDataConverter = value; } } /// <summary> /// Initializes a new instance of the <see cref="WebSocketServer<TWebSocketSession>"/> class. /// </summary> /// <param name="subProtocols">The sub protocols.</param> public WebSocketServer(IEnumerable<ISubProtocol<TWebSocketSession>> subProtocols) : this() { if (!subProtocols.Any()) return; foreach (var protocol in subProtocols) { if (!RegisterSubProtocolInternal(protocol)) throw new Exception("Failed to register sub protocol!"); } m_SubProtocolConfigured = true; } /// <summary> /// Initializes a new instance of the <see cref="WebSocketServer<TWebSocketSession>"/> class. /// </summary> /// <param name="subProtocol">The sub protocol.</param> public WebSocketServer(ISubProtocol<TWebSocketSession> subProtocol) : this(new List<ISubProtocol<TWebSocketSession>> { subProtocol }) { } /// <summary> /// Initializes a new instance of the <see cref="WebSocketServer<TWebSocketSession>"/> class. /// </summary> public WebSocketServer() : base(new WebSocketProtocol()) { } //static private ILogger logger; private Dictionary<string, ISubProtocol<TWebSocketSession>> m_SubProtocols = new Dictionary<string, ISubProtocol<TWebSocketSession>>(StringComparer.OrdinalIgnoreCase); internal ISubProtocol<TWebSocketSession> DefaultSubProtocol { get; private set; } private bool m_SubProtocolConfigured = false; private ConcurrentQueue<TWebSocketSession> m_OpenHandshakePendingQueue = new ConcurrentQueue<TWebSocketSession>(); private ConcurrentQueue<TWebSocketSession> m_CloseHandshakePendingQueue = new ConcurrentQueue<TWebSocketSession>(); /// <summary> /// The openning handshake timeout, in seconds /// </summary> private int m_OpenHandshakeTimeOut; /// <summary> /// The closing handshake timeout, in seconds /// </summary> private int m_CloseHandshakeTimeOut; /// <summary> /// The interval of checking handshake pending queue, in seconds /// </summary> private int m_HandshakePendingQueueCheckingInterval; private Timer m_HandshakePendingQueueCheckingTimer; /// <summary> /// Gets the sub protocol by sub protocol name. /// </summary> /// <param name="name">The name.</param> /// <returns></returns> internal ISubProtocol<TWebSocketSession> GetSubProtocol(string name) { ISubProtocol<TWebSocketSession> subProtocol; if (m_SubProtocols.TryGetValue(name, out subProtocol)) return subProtocol; else return null; } private IProtocolProcessor m_WebSocketProtocolProcessor; IProtocolProcessor IWebSocketServer.WebSocketProtocolProcessor { get { return m_WebSocketProtocolProcessor; } } /// <summary> /// Gets the request filter factory. /// </summary> public new WebSocketProtocol ReceiveFilterFactory { get { return (WebSocketProtocol)base.ReceiveFilterFactory; } } /// <summary> /// Register sub protocols, Only use after empty create /// </summary> /// <param name="subProtocols"></param> /// <exception cref="Exception"></exception> public void RegisterSubProtocol(IEnumerable<ISubProtocol<TWebSocketSession>> subProtocols) { if (!subProtocols.Any()) return; foreach (var protocol in subProtocols) { if (!RegisterSubProtocolInternal(protocol)) throw new Exception("Failed to register sub protocol!"); } m_SubProtocolConfigured = true; } bool IWebSocketServer.ValidateHandshake(IWebSocketSession session, string origin) { var s = (TWebSocketSession)session; s.Origin = origin; return ValidateHandshake(s, origin); } /// <summary> /// Validates the handshake request. /// </summary> /// <param name="session">The session.</param> /// <param name="origin">The origin in the handshake request.</param> /// <returns></returns> protected virtual bool ValidateHandshake(TWebSocketSession session, string origin) { return true; } bool RegisterSubProtocolInternal(ISubProtocol<TWebSocketSession> subProtocol) { if (m_SubProtocols.ContainsKey(subProtocol.Name)) { Logger.LogError("Cannot register duplicate name sub protocol! Duplicate name: {0}.", subProtocol.Name); return false; } m_SubProtocols.Add(subProtocol.Name, subProtocol); return true; } private bool SetupSubProtocols(IServerConfig config) { //Preparing sub protocols' configuration var subProtocolConfigSection = config.GetChildConfig<SubProtocolConfigCollection>("subProtocols"); var subProtocolConfigDict = new Dictionary<string, SubProtocolConfig>(subProtocolConfigSection == null ? 0 : subProtocolConfigSection.Count, StringComparer.OrdinalIgnoreCase); if (subProtocolConfigSection != null) { foreach (var protocolConfig in subProtocolConfigSection) { string originalProtocolName = protocolConfig.Name; string protocolName; ISubProtocol<TWebSocketSession> subProtocolInstance; if (!string.IsNullOrEmpty(originalProtocolName)) { protocolName = originalProtocolName; if (!string.IsNullOrEmpty(protocolConfig.Type)) { try { subProtocolInstance = AssemblyUtil.CreateInstance<ISubProtocol<TWebSocketSession>>(protocolConfig.Type, new object[] { originalProtocolName }); } catch (Exception e) { Logger.LogError(e.Message); return false; } if (!RegisterSubProtocolInternal(subProtocolInstance)) return false; } else { if (!m_SubProtocols.ContainsKey(protocolName)) { subProtocolInstance = new BasicSubProtocol<TWebSocketSession>(protocolName); if (!RegisterSubProtocolInternal(subProtocolInstance)) return false; } } } else { protocolName = BasicSubProtocol<TWebSocketSession>.DefaultName; if (!string.IsNullOrEmpty(protocolConfig.Type)) { Logger.LogError("You needn't set Type attribute for SubProtocol, if you don't set Name attribute!"); return false; } } subProtocolConfigDict[protocolName] = protocolConfig; } if (subProtocolConfigDict.Values.Any()) m_SubProtocolConfigured = true; } if (m_SubProtocols.Count <= 0 || (subProtocolConfigDict.ContainsKey(BasicSubProtocol<TWebSocketSession>.DefaultName) && !m_SubProtocols.ContainsKey(BasicSubProtocol<TWebSocketSession>.DefaultName))) { if (!RegisterSubProtocolInternal(BasicSubProtocol<TWebSocketSession>.CreateDefaultSubProtocol())) return false; } //Initialize sub protocols foreach (var subProtocol in m_SubProtocols.Values) { SubProtocolConfig protocolConfig = null; subProtocolConfigDict.TryGetValue(subProtocol.Name, out protocolConfig); bool initialized = false; try { initialized = subProtocol.Initialize(this, protocolConfig, Logger); } catch (Exception e) { initialized = false; Logger.LogError(e,e.Message); } if (!initialized) { Logger.LogError("Failed to initialize the sub protocol '{0}'!", subProtocol.Name); return false; } } return true; } /// <summary> /// Setups with the specified root config. /// </summary> /// <param name="rootConfig">The root config.</param> /// <param name="config">The config.</param> /// <returns></returns> protected override bool Setup(IRootConfig rootConfig, IServerConfig config) { if (m_SubProtocols != null && m_SubProtocols.Count > 0) DefaultSubProtocol = m_SubProtocols.Values.FirstOrDefault(); m_WebSocketProtocolProcessor = new DraftHybi10Processor { NextProcessor = new Rfc6455Processor { NextProcessor = new DraftHybi00Processor() } }; SetupMultipleProtocolSwitch(m_WebSocketProtocolProcessor); if (!int.TryParse(config.Options.GetValue("handshakePendingQueueCheckingInterval"), out m_HandshakePendingQueueCheckingInterval)) m_HandshakePendingQueueCheckingInterval = 60;// 1 minute default if (!int.TryParse(config.Options.GetValue("openHandshakeTimeOut"), out m_OpenHandshakeTimeOut)) m_OpenHandshakeTimeOut = 120;// 2 minute default if (!int.TryParse(config.Options.GetValue("closeHandshakeTimeOut"), out m_CloseHandshakeTimeOut)) m_CloseHandshakeTimeOut = 120;// 2 minute default if (m_BinaryDataConverter == null) { m_BinaryDataConverter = new TextEncodingBinaryDataConverter(Encoding.UTF8); } return true; } private void SetupMultipleProtocolSwitch(IProtocolProcessor rootProcessor) { var thisProcessor = rootProcessor; List<int> availableVersions = new List<int>(); while (true) { if (thisProcessor.Version > 0) availableVersions.Add(thisProcessor.Version); if (thisProcessor.NextProcessor == null) break; thisProcessor = thisProcessor.NextProcessor; } thisProcessor.NextProcessor = new MultipleProtocolSwitchProcessor(availableVersions.ToArray()); } /// <summary> /// Called when [started]. /// </summary> protected override void OnStarted() { m_HandshakePendingQueueCheckingTimer = new Timer(HandshakePendingQueueCheckingCallback, null, m_HandshakePendingQueueCheckingInterval * 1000, m_HandshakePendingQueueCheckingInterval * 1000); base.OnStarted(); } private void HandshakePendingQueueCheckingCallback(object state) { try { m_HandshakePendingQueueCheckingTimer.Change(Timeout.Infinite, Timeout.Infinite); while (true) { TWebSocketSession session; if (!m_OpenHandshakePendingQueue.TryPeek(out session)) break; if (session.Handshaked || !session.Connected) { //Handshaked or not connected m_OpenHandshakePendingQueue.TryDequeue(out session); continue; } if (DateTime.UtcNow < session.StartTime.AddSeconds(m_OpenHandshakeTimeOut)) break; //Timeout, dequeue and then close m_OpenHandshakePendingQueue.TryDequeue(out session); session.Close(CloseReason.TimeOut); } while (true) { TWebSocketSession session; if (!m_CloseHandshakePendingQueue.TryPeek(out session)) break; if (!session.Connected) { //the session has been closed m_CloseHandshakePendingQueue.TryDequeue(out session); continue; } if (DateTime.UtcNow < session.StartClosingHandshakeTime.AddSeconds(m_CloseHandshakeTimeOut)) break; //Timeout, dequeue and then close m_CloseHandshakePendingQueue.TryDequeue(out session); //Needn't send closing handshake again session.Close(CloseReason.ServerClosing); } } catch (Exception e) { Logger.LogError(e,e.Message); } finally { m_HandshakePendingQueueCheckingTimer.Change(m_HandshakePendingQueueCheckingInterval * 1000, m_HandshakePendingQueueCheckingInterval * 1000); } } internal void PushToCloseHandshakeQueue(IAppSession appSession) { m_CloseHandshakePendingQueue.Enqueue((TWebSocketSession)appSession); } /// <summary> /// Called when [new session connected]. /// </summary> /// <param name="session">The session.</param> protected override void OnNewSessionConnected(TWebSocketSession session) { m_OpenHandshakePendingQueue.Enqueue(session); } internal void FireOnNewSessionConnected(IAppSession appSession) { base.OnNewSessionConnected((TWebSocketSession)appSession); } /// <summary> /// Occurs when [new request received]. /// </summary> /// <exception cref="System.NotSupportedException"></exception> [Browsable(false)] [EditorBrowsable(EditorBrowsableState.Never)] public override event RequestHandler<TWebSocketSession, IWebSocketFragment> NewRequestReceived { add { throw new NotSupportedException("Please use NewMessageReceived instead!"); } remove { throw new NotSupportedException("Please use NewMessageReceived instead!"); } } private SessionHandler<TWebSocketSession, string> m_NewMessageReceived; /// <summary> /// Occurs when [new message received]. /// </summary> public event SessionHandler<TWebSocketSession, string> NewMessageReceived { add { if (m_SubProtocolConfigured) throw new Exception("If you have defined any sub protocol, you cannot subscribe NewMessageReceived event!"); m_NewMessageReceived += value; } remove { m_NewMessageReceived -= value; } } void ExecuteMessage(TWebSocketSession session, string message) { if (session.SubProtocol == null) { Logger.LogError("No SubProtocol selected! This session cannot process any message!"); session.CloseWithHandshake(session.ProtocolProcessor.CloseStatusClode.ProtocolError, "No SubProtocol selected"); return; } ExecuteSubCommand(session, session.SubProtocol.SubRequestParser.ParseRequestInfo(message)); } internal void OnNewMessageReceived(TWebSocketSession session, string message) { try { if (m_NewMessageReceived == null) { ExecuteMessage(session, message); } else { m_NewMessageReceived(session, message); } } catch (Exception ex) { Logger.LogError(message + " " + ex.ToString()); } } private SessionHandler<TWebSocketSession, byte[]> m_NewDataReceived; /// <summary> /// Occurs when [new data received]. /// </summary> public event SessionHandler<TWebSocketSession, byte[]> NewDataReceived { add { m_NewDataReceived += value; } remove { m_NewDataReceived -= value; } } internal void OnNewDataReceived(TWebSocketSession session, byte[] data) { if (m_NewDataReceived == null) { var converter = m_BinaryDataConverter; if (converter != null) { ExecuteMessage(session, converter.ToString(data, 0, data.Length)); } return; } m_NewDataReceived(session, data); } private const string m_Tab = "\t"; private const char m_Colon = ':'; private const string m_Space = " "; private const char m_SpaceChar = ' '; private const string m_ValueSeparator = ", "; internal static void ParseHandshake(IWebSocketSession session, TextReader reader) { string line; string firstLine = string.Empty; string prevKey = string.Empty; //logger.LogInformation("============================================================"); while (!string.IsNullOrEmpty(line = reader.ReadLine())) { //logger.LogInformation(session.SessionID + " " + line); if (string.IsNullOrEmpty(firstLine)) { firstLine = line; continue; } if (line.StartsWith(m_Tab) && !string.IsNullOrEmpty(prevKey)) { string currentValue = session.Items.GetValue<string>(prevKey, string.Empty); session.Items[prevKey] = currentValue + line.Trim(); continue; } int pos = line.IndexOf(m_Colon); if (pos <= 0) continue; string key = line.Substring(0, pos); if (!string.IsNullOrEmpty(key)) key = key.Trim(); var valueOffset = pos + 1; if (line.Length <= valueOffset) //No value in this line continue; string value = line.Substring(valueOffset); if (!string.IsNullOrEmpty(value) && value.StartsWith(m_Space) && value.Length > 1) value = value.Substring(1); if (string.IsNullOrEmpty(key)) continue; object oldValue; if (!session.Items.TryGetValue(key, out oldValue)) { session.Items.Add(key, value); } else { session.Items[key] = oldValue + m_ValueSeparator + value; } prevKey = key; } //logger.LogInformation("============================================================"); var metaInfo = firstLine.Split(m_SpaceChar); session.Method = metaInfo[0]; session.Path = metaInfo[1]; session.HttpVersion = metaInfo[2]; } /// <summary> /// Setups the commands. /// </summary> /// <param name="discoveredCommands">The discovered commands.</param> /// <returns></returns> protected override bool SetupCommands(Dictionary<string, ICommand<TWebSocketSession, IWebSocketFragment>> discoveredCommands) { var commands = new List<ICommand<TWebSocketSession, IWebSocketFragment>> { new HandShake<TWebSocketSession>(), new Text<TWebSocketSession>(), new Binary<TWebSocketSession>(), new Close<TWebSocketSession>(), new Ping<TWebSocketSession>(), new Pong<TWebSocketSession>(), new Continuation<TWebSocketSession>(), new Plain<TWebSocketSession>() }; commands.ForEach(c => discoveredCommands.Add(c.Name, c)); if (!SetupSubProtocols(Config)) return false; return true; } /// <summary> /// Executes the command. /// </summary> /// <param name="session">The session.</param> /// <param name="requestInfo">The request info.</param> protected override void ExecuteCommand(TWebSocketSession session, IWebSocketFragment requestInfo) { if (session.InClosing) { //Only handle closing handshake if the session is in closing if (requestInfo.Key != OpCode.CloseTag) return; } base.ExecuteCommand(session, requestInfo); } private void ExecuteSubCommand(TWebSocketSession session, SubRequestInfo requestInfo) { ISubCommand<TWebSocketSession> subCommand; if (session.SubProtocol.TryGetCommand(requestInfo.Key, out subCommand)) { session.CurrentCommand = requestInfo.Key; subCommand.ExecuteCommand(session, requestInfo); session.PrevCommand = requestInfo.Key; Logger.LogInfo(session, string.Format("Command - {0} - {1}", session.SessionID, requestInfo.Key)); } else { //session.SubProtocol.TryGetCommand("OCPP", out subCommand); //session.CurrentCommand = "OCPP"; //subCommand.ExecuteCommand(session, requestInfo); //session.PrevCommand = "OCPP"; session.HandleUnknownCommand(requestInfo); } session.LastActiveTime = DateTime.UtcNow; } /// <summary> /// Broadcasts data to the specified sessions. /// </summary> /// <param name="sessions">The sessions.</param> /// <param name="data">The data.</param> /// <param name="offset">The offset.</param> /// <param name="length">The length.</param> /// <param name="sendFeedback">The send feedback.</param> public void Broadcast(IEnumerable<TWebSocketSession> sessions, byte[] data, int offset, int length, Action<TWebSocketSession, bool> sendFeedback) { IList<ArraySegment<byte>> encodedPackage = null; IProtocolProcessor encodingProcessor = null; foreach (var s in sessions) { if (!s.Connected) continue; var currentProtocolProcessor = s.ProtocolProcessor; if (currentProtocolProcessor == null || !currentProtocolProcessor.CanSendBinaryData) continue; if (encodedPackage == null || currentProtocolProcessor != encodingProcessor) { encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Binary, data, offset, length); encodingProcessor = currentProtocolProcessor; } Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback)); } } /// <summary> /// Broadcasts message to the specified sessions. /// </summary> /// <param name="sessions">The sessions.</param> /// <param name="message">The message.</param> /// <param name="sendFeedback">The send feedback.</param> public void Broadcast(IEnumerable<TWebSocketSession> sessions, string message, Action<TWebSocketSession, bool> sendFeedback) { IList<ArraySegment<byte>> encodedPackage = null; IProtocolProcessor encodingProcessor = null; foreach (var s in sessions) { if (!s.Connected) continue; var currentProtocolProcessor = s.ProtocolProcessor; if (currentProtocolProcessor == null) continue; if (encodedPackage == null || encodingProcessor != currentProtocolProcessor) { encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Text, message); encodingProcessor = currentProtocolProcessor; } Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback)); } } private void SendRawDataToSession(object state) { var param = state as BroadcastState; var session = param.Session; var sendFeedback = param.FeedbackFunc; var sendOk = false; try { sendOk = session.TrySendRawData(param.Data); } catch (Exception e) { session.Logger.LogError(e,e.Message); } sendFeedback(session, sendOk); } #region JSON serialize/deserialize /// <summary> /// Serialize the target object by JSON /// </summary> /// <param name="target">The target.</param> /// <returns></returns> public virtual string JsonSerialize(object target) { return JsonConvert.SerializeObject(target); } /// <summary> /// Deserialize the JSON string to target type object. /// </summary> /// <param name="json">The json.</param> /// <param name="type">The type.</param> /// <returns></returns> public virtual object JsonDeserialize(string json, Type type) { return JsonConvert.DeserializeObject(json, type); } #endregion class BroadcastState { public TWebSocketSession Session { get; private set; } public IList<ArraySegment<byte>> Data { get; private set; } public Action<TWebSocketSession, bool> FeedbackFunc { get; private set; } public BroadcastState(TWebSocketSession session, IList<ArraySegment<byte>> data, Action<TWebSocketSession, bool> feedbackFunc) { Session = session; Data = data; FeedbackFunc = feedbackFunc; } } } }