using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.ComponentModel;
using System.IO;
using System.Linq;
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NLog;
using SuperSocket.Common;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Command;
using SuperSocket.SocketBase.Config;
using SuperSocket.SocketBase.Protocol;
using SuperWebSocket.Command;
using SuperWebSocket.Config;
using SuperWebSocket.Protocol;
using SuperWebSocket.SubProtocol;
namespace SuperWebSocket
{
///
/// WebSocket server interface
///
public interface IWebSocketServer : IAppServer
{
///
/// Gets the web socket protocol processor.
///
IProtocolProcessor WebSocketProtocolProcessor { get; }
///
/// Validates the handshake request.
///
/// The session.
/// The origin.
/// the validation result
bool ValidateHandshake(IWebSocketSession session, string origin);
}
///
/// WebSocket AppServer
///
public class WebSocketServer : WebSocketServer
{
///
/// Initializes a new instance of the class.
///
/// The sub protocols.
public WebSocketServer(IEnumerable> subProtocols)
: base(subProtocols)
{
}
///
/// Initializes a new instance of the class.
///
/// The sub protocol.
public WebSocketServer(ISubProtocol subProtocol)
: base(subProtocol)
{
}
///
/// Initializes a new instance of the class.
///
public WebSocketServer()
: base(new List>())
{
}
}
///
/// WebSocket AppServer
///
/// The type of the web socket session.
public abstract class WebSocketServer : AppServer, IWebSocketServer
where TWebSocketSession : WebSocketSession, new()
{
private IBinaryDataConverter m_BinaryDataConverter;
///
/// Gets or sets the binary data converter.
///
///
/// The binary data converter.
///
protected IBinaryDataConverter BinaryDataConverter
{
get { return m_BinaryDataConverter; }
set { m_BinaryDataConverter = value; }
}
///
/// Initializes a new instance of the class.
///
/// The sub protocols.
public WebSocketServer(IEnumerable> subProtocols)
: this()
{
if (!subProtocols.Any())
return;
foreach (var protocol in subProtocols)
{
if (!RegisterSubProtocol(protocol))
throw new Exception("Failed to register sub protocol!");
}
m_SubProtocolConfigured = true;
}
///
/// Initializes a new instance of the class.
///
/// The sub protocol.
public WebSocketServer(ISubProtocol subProtocol)
: this(new List> { subProtocol })
{
}
///
/// Initializes a new instance of the class.
///
public WebSocketServer()
: base(new WebSocketProtocol())
{
}
static private ILogger logger = NLog.LogManager.GetCurrentClassLogger();
private Dictionary> m_SubProtocols = new Dictionary>(StringComparer.OrdinalIgnoreCase);
internal ISubProtocol DefaultSubProtocol { get; private set; }
private bool m_SubProtocolConfigured = false;
private ConcurrentQueue m_OpenHandshakePendingQueue = new ConcurrentQueue();
private ConcurrentQueue m_CloseHandshakePendingQueue = new ConcurrentQueue();
///
/// The openning handshake timeout, in seconds
///
private int m_OpenHandshakeTimeOut;
///
/// The closing handshake timeout, in seconds
///
private int m_CloseHandshakeTimeOut;
///
/// The interval of checking handshake pending queue, in seconds
///
private int m_HandshakePendingQueueCheckingInterval;
private Timer m_HandshakePendingQueueCheckingTimer;
///
/// Gets the sub protocol by sub protocol name.
///
/// The name.
///
internal ISubProtocol GetSubProtocol(string name)
{
ISubProtocol subProtocol;
if (m_SubProtocols.TryGetValue(name, out subProtocol))
return subProtocol;
else
return null;
}
private IProtocolProcessor m_WebSocketProtocolProcessor;
IProtocolProcessor IWebSocketServer.WebSocketProtocolProcessor
{
get { return m_WebSocketProtocolProcessor; }
}
///
/// Gets the request filter factory.
///
public new WebSocketProtocol ReceiveFilterFactory
{
get
{
return (WebSocketProtocol)base.ReceiveFilterFactory;
}
}
bool IWebSocketServer.ValidateHandshake(IWebSocketSession session, string origin)
{
var s = (TWebSocketSession)session;
s.Origin = origin;
return ValidateHandshake(s, origin);
}
///
/// Validates the handshake request.
///
/// The session.
/// The origin in the handshake request.
///
protected virtual bool ValidateHandshake(TWebSocketSession session, string origin)
{
return true;
}
bool RegisterSubProtocol(ISubProtocol subProtocol)
{
if (m_SubProtocols.ContainsKey(subProtocol.Name))
{
if (Logger.IsErrorEnabled)
Logger.ErrorFormat("Cannot register duplicate name sub protocol! Duplicate name: {0}.", subProtocol.Name);
return false;
}
m_SubProtocols.Add(subProtocol.Name, subProtocol);
return true;
}
private bool SetupSubProtocols(IServerConfig config)
{
//Preparing sub protocols' configuration
var subProtocolConfigSection = config.GetChildConfig("subProtocols");
var subProtocolConfigDict = new Dictionary(subProtocolConfigSection == null ? 0 : subProtocolConfigSection.Count, StringComparer.OrdinalIgnoreCase);
if (subProtocolConfigSection != null)
{
foreach (var protocolConfig in subProtocolConfigSection)
{
string originalProtocolName = protocolConfig.Name;
string protocolName;
ISubProtocol subProtocolInstance;
if (!string.IsNullOrEmpty(originalProtocolName))
{
protocolName = originalProtocolName;
if (!string.IsNullOrEmpty(protocolConfig.Type))
{
try
{
subProtocolInstance = AssemblyUtil.CreateInstance>(protocolConfig.Type, new object[] { originalProtocolName });
}
catch (Exception e)
{
Logger.Error(e);
return false;
}
if (!RegisterSubProtocol(subProtocolInstance))
return false;
}
else
{
if (!m_SubProtocols.ContainsKey(protocolName))
{
subProtocolInstance = new BasicSubProtocol(protocolName);
if (!RegisterSubProtocol(subProtocolInstance))
return false;
}
}
}
else
{
protocolName = BasicSubProtocol.DefaultName;
if (!string.IsNullOrEmpty(protocolConfig.Type))
{
if (Logger.IsErrorEnabled)
Logger.Error("You needn't set Type attribute for SubProtocol, if you don't set Name attribute!");
return false;
}
}
subProtocolConfigDict[protocolName] = protocolConfig;
}
if (subProtocolConfigDict.Values.Any())
m_SubProtocolConfigured = true;
}
if (m_SubProtocols.Count <= 0 || (subProtocolConfigDict.ContainsKey(BasicSubProtocol.DefaultName) && !m_SubProtocols.ContainsKey(BasicSubProtocol.DefaultName)))
{
if (!RegisterSubProtocol(BasicSubProtocol.CreateDefaultSubProtocol()))
return false;
}
//Initialize sub protocols
foreach (var subProtocol in m_SubProtocols.Values)
{
SubProtocolConfig protocolConfig = null;
subProtocolConfigDict.TryGetValue(subProtocol.Name, out protocolConfig);
bool initialized = false;
try
{
initialized = subProtocol.Initialize(this, protocolConfig, Logger);
}
catch (Exception e)
{
initialized = false;
Logger.Error(e);
}
if (!initialized)
{
if (Logger.IsErrorEnabled)
Logger.ErrorFormat("Failed to initialize the sub protocol '{0}'!", subProtocol.Name);
return false;
}
}
return true;
}
///
/// Setups with the specified root config.
///
/// The root config.
/// The config.
///
protected override bool Setup(IRootConfig rootConfig, IServerConfig config)
{
if (m_SubProtocols != null && m_SubProtocols.Count > 0)
DefaultSubProtocol = m_SubProtocols.Values.FirstOrDefault();
m_WebSocketProtocolProcessor = new DraftHybi10Processor
{
NextProcessor = new Rfc6455Processor
{
NextProcessor = new DraftHybi00Processor()
}
};
SetupMultipleProtocolSwitch(m_WebSocketProtocolProcessor);
if (!int.TryParse(config.Options.GetValue("handshakePendingQueueCheckingInterval"), out m_HandshakePendingQueueCheckingInterval))
m_HandshakePendingQueueCheckingInterval = 60;// 1 minute default
if (!int.TryParse(config.Options.GetValue("openHandshakeTimeOut"), out m_OpenHandshakeTimeOut))
m_OpenHandshakeTimeOut = 120;// 2 minute default
if (!int.TryParse(config.Options.GetValue("closeHandshakeTimeOut"), out m_CloseHandshakeTimeOut))
m_CloseHandshakeTimeOut = 120;// 2 minute default
if (m_BinaryDataConverter == null)
{
m_BinaryDataConverter = new TextEncodingBinaryDataConverter(Encoding.UTF8);
}
return true;
}
private void SetupMultipleProtocolSwitch(IProtocolProcessor rootProcessor)
{
var thisProcessor = rootProcessor;
List availableVersions = new List();
while (true)
{
if (thisProcessor.Version > 0)
availableVersions.Add(thisProcessor.Version);
if (thisProcessor.NextProcessor == null)
break;
thisProcessor = thisProcessor.NextProcessor;
}
thisProcessor.NextProcessor = new MultipleProtocolSwitchProcessor(availableVersions.ToArray());
}
///
/// Called when [started].
///
protected override void OnStarted()
{
m_HandshakePendingQueueCheckingTimer = new Timer(HandshakePendingQueueCheckingCallback, null, m_HandshakePendingQueueCheckingInterval * 1000, m_HandshakePendingQueueCheckingInterval * 1000);
base.OnStarted();
}
private void HandshakePendingQueueCheckingCallback(object state)
{
try
{
m_HandshakePendingQueueCheckingTimer.Change(Timeout.Infinite, Timeout.Infinite);
while (true)
{
TWebSocketSession session;
if (!m_OpenHandshakePendingQueue.TryPeek(out session))
break;
if (session.Handshaked || !session.Connected)
{
//Handshaked or not connected
m_OpenHandshakePendingQueue.TryDequeue(out session);
continue;
}
if (DateTime.Now < session.StartTime.AddSeconds(m_OpenHandshakeTimeOut))
break;
//Timeout, dequeue and then close
m_OpenHandshakePendingQueue.TryDequeue(out session);
session.Close(CloseReason.TimeOut);
}
while (true)
{
TWebSocketSession session;
if (!m_CloseHandshakePendingQueue.TryPeek(out session))
break;
if (!session.Connected)
{
//the session has been closed
m_CloseHandshakePendingQueue.TryDequeue(out session);
continue;
}
if (DateTime.Now < session.StartClosingHandshakeTime.AddSeconds(m_CloseHandshakeTimeOut))
break;
//Timeout, dequeue and then close
m_CloseHandshakePendingQueue.TryDequeue(out session);
//Needn't send closing handshake again
session.Close(CloseReason.ServerClosing);
}
}
catch (Exception e)
{
if (Logger.IsErrorEnabled)
Logger.Error(e);
}
finally
{
m_HandshakePendingQueueCheckingTimer.Change(m_HandshakePendingQueueCheckingInterval * 1000, m_HandshakePendingQueueCheckingInterval * 1000);
}
}
internal void PushToCloseHandshakeQueue(IAppSession appSession)
{
m_CloseHandshakePendingQueue.Enqueue((TWebSocketSession)appSession);
}
///
/// Called when [new session connected].
///
/// The session.
protected override void OnNewSessionConnected(TWebSocketSession session)
{
m_OpenHandshakePendingQueue.Enqueue(session);
}
internal void FireOnNewSessionConnected(IAppSession appSession)
{
base.OnNewSessionConnected((TWebSocketSession)appSession);
}
///
/// Occurs when [new request received].
///
///
[Browsable(false)]
[EditorBrowsable(EditorBrowsableState.Never)]
public override event RequestHandler NewRequestReceived
{
add { throw new NotSupportedException("Please use NewMessageReceived instead!"); }
remove { throw new NotSupportedException("Please use NewMessageReceived instead!"); }
}
private SessionHandler m_NewMessageReceived;
///
/// Occurs when [new message received].
///
public event SessionHandler NewMessageReceived
{
add
{
if (m_SubProtocolConfigured)
throw new Exception("If you have defined any sub protocol, you cannot subscribe NewMessageReceived event!");
m_NewMessageReceived += value;
}
remove
{
m_NewMessageReceived -= value;
}
}
void ExecuteMessage(TWebSocketSession session, string message)
{
if (session.SubProtocol == null)
{
if (Logger.IsErrorEnabled)
Logger.Error("No SubProtocol selected! This session cannot process any message!");
session.CloseWithHandshake(session.ProtocolProcessor.CloseStatusClode.ProtocolError, "No SubProtocol selected");
return;
}
ExecuteSubCommand(session, session.SubProtocol.SubRequestParser.ParseRequestInfo(message));
}
internal void OnNewMessageReceived(TWebSocketSession session, string message)
{
try
{
if (m_NewMessageReceived == null)
{
ExecuteMessage(session, message);
}
else
{
m_NewMessageReceived(session, message);
}
}
catch (Exception ex)
{
if (Logger.IsErrorEnabled)
Logger.Error(message + " " + ex.ToString());
}
}
private SessionHandler m_NewDataReceived;
///
/// Occurs when [new data received].
///
public event SessionHandler NewDataReceived
{
add
{
m_NewDataReceived += value;
}
remove
{
m_NewDataReceived -= value;
}
}
internal void OnNewDataReceived(TWebSocketSession session, byte[] data)
{
if (m_NewDataReceived == null)
{
var converter = m_BinaryDataConverter;
if (converter != null)
{
ExecuteMessage(session, converter.ToString(data, 0, data.Length));
}
return;
}
m_NewDataReceived(session, data);
}
private const string m_Tab = "\t";
private const char m_Colon = ':';
private const string m_Space = " ";
private const char m_SpaceChar = ' ';
private const string m_ValueSeparator = ", ";
internal static void ParseHandshake(IWebSocketSession session, TextReader reader)
{
string line;
string firstLine = string.Empty;
string prevKey = string.Empty;
logger.Info("============================================================");
while (!string.IsNullOrEmpty(line = reader.ReadLine()))
{
logger.Info(session.SessionID + " " + line);
if (string.IsNullOrEmpty(firstLine))
{
firstLine = line;
continue;
}
if (line.StartsWith(m_Tab) && !string.IsNullOrEmpty(prevKey))
{
string currentValue = session.Items.GetValue(prevKey, string.Empty);
session.Items[prevKey] = currentValue + line.Trim();
continue;
}
int pos = line.IndexOf(m_Colon);
if (pos <= 0)
continue;
string key = line.Substring(0, pos);
if (!string.IsNullOrEmpty(key))
key = key.Trim();
var valueOffset = pos + 1;
if (line.Length <= valueOffset) //No value in this line
continue;
string value = line.Substring(valueOffset);
if (!string.IsNullOrEmpty(value) && value.StartsWith(m_Space) && value.Length > 1)
value = value.Substring(1);
if (string.IsNullOrEmpty(key))
continue;
object oldValue;
if (!session.Items.TryGetValue(key, out oldValue))
{
session.Items.Add(key, value);
}
else
{
session.Items[key] = oldValue + m_ValueSeparator + value;
}
prevKey = key;
}
logger.Info("============================================================");
var metaInfo = firstLine.Split(m_SpaceChar);
session.Method = metaInfo[0];
session.Path = metaInfo[1];
session.HttpVersion = metaInfo[2];
}
///
/// Setups the commands.
///
/// The discovered commands.
///
protected override bool SetupCommands(Dictionary> discoveredCommands)
{
var commands = new List>
{
new HandShake(),
new Text(),
new Binary(),
new Close(),
new Ping(),
new Pong(),
new Continuation(),
new Plain()
};
commands.ForEach(c => discoveredCommands.Add(c.Name, c));
if (!SetupSubProtocols(Config))
return false;
return true;
}
///
/// Executes the command.
///
/// The session.
/// The request info.
protected override void ExecuteCommand(TWebSocketSession session, IWebSocketFragment requestInfo)
{
if (session.InClosing)
{
//Only handle closing handshake if the session is in closing
if (requestInfo.Key != OpCode.CloseTag)
return;
}
base.ExecuteCommand(session, requestInfo);
}
private void ExecuteSubCommand(TWebSocketSession session, SubRequestInfo requestInfo)
{
ISubCommand subCommand;
if (session.SubProtocol.TryGetCommand(requestInfo.Key, out subCommand))
{
session.CurrentCommand = requestInfo.Key;
subCommand.ExecuteCommand(session, requestInfo);
session.PrevCommand = requestInfo.Key;
if (Config.LogCommand && Logger.IsInfoEnabled)
Logger.Info(session, string.Format("Command - {0} - {1}", session.SessionID, requestInfo.Key));
}
else
{
//session.SubProtocol.TryGetCommand("OCPP", out subCommand);
//session.CurrentCommand = "OCPP";
//subCommand.ExecuteCommand(session, requestInfo);
//session.PrevCommand = "OCPP";
session.HandleUnknownCommand(requestInfo);
}
session.LastActiveTime = DateTime.Now;
}
///
/// Broadcasts data to the specified sessions.
///
/// The sessions.
/// The data.
/// The offset.
/// The length.
/// The send feedback.
public void Broadcast(IEnumerable sessions, byte[] data, int offset, int length, Action sendFeedback)
{
IList> encodedPackage = null;
IProtocolProcessor encodingProcessor = null;
foreach (var s in sessions)
{
if (!s.Connected)
continue;
var currentProtocolProcessor = s.ProtocolProcessor;
if (currentProtocolProcessor == null || !currentProtocolProcessor.CanSendBinaryData)
continue;
if (encodedPackage == null || currentProtocolProcessor != encodingProcessor)
{
encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Binary, data, offset, length);
encodingProcessor = currentProtocolProcessor;
}
Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback));
}
}
///
/// Broadcasts message to the specified sessions.
///
/// The sessions.
/// The message.
/// The send feedback.
public void Broadcast(IEnumerable sessions, string message, Action sendFeedback)
{
IList> encodedPackage = null;
IProtocolProcessor encodingProcessor = null;
foreach (var s in sessions)
{
if (!s.Connected)
continue;
var currentProtocolProcessor = s.ProtocolProcessor;
if (currentProtocolProcessor == null)
continue;
if (encodedPackage == null || encodingProcessor != currentProtocolProcessor)
{
encodedPackage = currentProtocolProcessor.GetEncodedPackage(OpCode.Text, message);
encodingProcessor = currentProtocolProcessor;
}
Task.Factory.StartNew(SendRawDataToSession, new BroadcastState(s, encodedPackage, sendFeedback));
}
}
private void SendRawDataToSession(object state)
{
var param = state as BroadcastState;
var session = param.Session;
var sendFeedback = param.FeedbackFunc;
var sendOk = false;
try
{
sendOk = session.TrySendRawData(param.Data);
}
catch (Exception e)
{
session.Logger.Error(e);
}
sendFeedback(session, sendOk);
}
#region JSON serialize/deserialize
///
/// Serialize the target object by JSON
///
/// The target.
///
public virtual string JsonSerialize(object target)
{
return JsonConvert.SerializeObject(target);
}
///
/// Deserialize the JSON string to target type object.
///
/// The json.
/// The type.
///
public virtual object JsonDeserialize(string json, Type type)
{
return JsonConvert.DeserializeObject(json, type);
}
#endregion
class BroadcastState
{
public TWebSocketSession Session { get; private set; }
public IList> Data { get; private set; }
public Action FeedbackFunc { get; private set; }
public BroadcastState(TWebSocketSession session, IList> data, Action feedbackFunc)
{
Session = session;
Data = data;
FeedbackFunc = feedbackFunc;
}
}
}
}