1 | /*************************************************************************** |
2 | * Copyright (C) 2005-2014 by the Quassel Project * |
3 | * devel@quassel-irc.org * |
4 | * * |
5 | * This program is free software; you can redistribute it and/or modify * |
6 | * it under the terms of the GNU General Public License as published by * |
7 | * the Free Software Foundation; either version 2 of the License, or * |
8 | * (at your option) version 3. * |
9 | * * |
10 | * This program is distributed in the hope that it will be useful, * |
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of * |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * |
13 | * GNU General Public License for more details. * |
14 | * * |
15 | * You should have received a copy of the GNU General Public License * |
16 | * along with this program; if not, write to the * |
17 | * Free Software Foundation, Inc., * |
18 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * |
19 | ***************************************************************************/ |
20 | |
21 | #include <QtEndian> |
22 | |
23 | #include <QHostAddress> |
24 | #include <QTimer> |
25 | |
26 | #ifdef HAVE_SSL |
27 | # include <QSslSocket> |
28 | #else |
29 | # include <QTcpSocket> |
30 | #endif |
31 | |
32 | #include "remotepeer.h" |
33 | |
34 | using namespace Protocol; |
35 | |
36 | const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk |
37 | |
38 | RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent) |
39 | : Peer(authHandler, parent), |
40 | _socket(socket), |
41 | _compressor(new Compressor(socket, level, this)), |
42 | _signalProxy(0), |
43 | _heartBeatTimer(new QTimer(this)), |
44 | _heartBeatCount(0), |
45 | _lag(0), |
46 | _msgSize(0) |
47 | { |
48 | socket->setParent(this); |
49 | connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); |
50 | connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError))); |
51 | connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); |
52 | |
53 | #ifdef HAVE_SSL |
54 | QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket); |
55 | if (sslSocket) |
56 | connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged())); |
57 | #endif |
58 | |
59 | connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead())); |
60 | connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error))); |
61 | |
62 | connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); |
63 | } |
64 | |
65 | |
66 | void RemotePeer::onSocketStateChanged(QAbstractSocket::SocketState state) |
67 | { |
68 | if (state == QAbstractSocket::ClosingState) { |
69 | emit statusMessage(tr("Disconnecting..." )); |
70 | } |
71 | } |
72 | |
73 | |
74 | void RemotePeer::onSocketError(QAbstractSocket::SocketError error) |
75 | { |
76 | emit socketError(error, socket()->errorString()); |
77 | } |
78 | |
79 | |
80 | void RemotePeer::onCompressionError(Compressor::Error error) |
81 | { |
82 | close(QString("Compression error %1" ).arg(error)); |
83 | } |
84 | |
85 | |
86 | QString RemotePeer::description() const |
87 | { |
88 | if (socket()) |
89 | return socket()->peerAddress().toString(); |
90 | |
91 | return QString(); |
92 | } |
93 | |
94 | |
95 | ::SignalProxy *RemotePeer::signalProxy() const |
96 | { |
97 | return _signalProxy; |
98 | } |
99 | |
100 | |
101 | void RemotePeer::setSignalProxy(::SignalProxy *proxy) |
102 | { |
103 | if (proxy == _signalProxy) |
104 | return; |
105 | |
106 | if (!proxy) { |
107 | _heartBeatTimer->stop(); |
108 | disconnect(signalProxy(), 0, this, 0); |
109 | _signalProxy = 0; |
110 | if (isOpen()) |
111 | close(); |
112 | } |
113 | else { |
114 | if (signalProxy()) { |
115 | qWarning() << Q_FUNC_INFO << "Setting another SignalProxy not supported, ignoring!" ; |
116 | return; |
117 | } |
118 | _signalProxy = proxy; |
119 | connect(proxy, SIGNAL(heartBeatIntervalChanged(int)), SLOT(changeHeartBeatInterval(int))); |
120 | _heartBeatTimer->setInterval(proxy->heartBeatInterval() * 1000); |
121 | _heartBeatTimer->start(); |
122 | } |
123 | } |
124 | |
125 | |
126 | void RemotePeer::changeHeartBeatInterval(int secs) |
127 | { |
128 | if(secs <= 0) |
129 | _heartBeatTimer->stop(); |
130 | else { |
131 | _heartBeatTimer->setInterval(secs * 1000); |
132 | _heartBeatTimer->start(); |
133 | } |
134 | } |
135 | |
136 | |
137 | int RemotePeer::lag() const |
138 | { |
139 | return _lag; |
140 | } |
141 | |
142 | |
143 | QTcpSocket *RemotePeer::socket() const |
144 | { |
145 | return _socket; |
146 | } |
147 | |
148 | |
149 | bool RemotePeer::isSecure() const |
150 | { |
151 | if (socket()) { |
152 | if (isLocal()) |
153 | return true; |
154 | #ifdef HAVE_SSL |
155 | QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket()); |
156 | if (sslSocket && sslSocket->isEncrypted()) |
157 | return true; |
158 | #endif |
159 | } |
160 | return false; |
161 | } |
162 | |
163 | |
164 | bool RemotePeer::isLocal() const |
165 | { |
166 | if (socket()) { |
167 | if (socket()->peerAddress() == QHostAddress::LocalHost || socket()->peerAddress() == QHostAddress::LocalHostIPv6) |
168 | return true; |
169 | } |
170 | return false; |
171 | } |
172 | |
173 | |
174 | bool RemotePeer::isOpen() const |
175 | { |
176 | return socket() && socket()->state() == QTcpSocket::ConnectedState; |
177 | } |
178 | |
179 | |
180 | void RemotePeer::close(const QString &reason) |
181 | { |
182 | if (!reason.isEmpty()) { |
183 | qWarning() << "Disconnecting:" << reason; |
184 | } |
185 | |
186 | if (socket() && socket()->state() != QTcpSocket::UnconnectedState) { |
187 | socket()->disconnectFromHost(); |
188 | } |
189 | } |
190 | |
191 | |
192 | void RemotePeer::onReadyRead() |
193 | { |
194 | QByteArray msg; |
195 | while (readMessage(msg)) |
196 | processMessage(msg); |
197 | } |
198 | |
199 | |
200 | bool RemotePeer::readMessage(QByteArray &msg) |
201 | { |
202 | if (_msgSize == 0) { |
203 | if (_compressor->bytesAvailable() < 4) |
204 | return false; |
205 | _compressor->read((char*)&_msgSize, 4); |
206 | _msgSize = qFromBigEndian<quint32>(_msgSize); |
207 | |
208 | if (_msgSize > maxMessageSize) { |
209 | close("Peer tried to send package larger than max package size!" ); |
210 | return false; |
211 | } |
212 | |
213 | if (_msgSize == 0) { |
214 | close("Peer tried to send an empty message!" ); |
215 | return false; |
216 | } |
217 | } |
218 | |
219 | if (_compressor->bytesAvailable() < _msgSize) { |
220 | emit transferProgress(socket()->bytesAvailable(), _msgSize); |
221 | return false; |
222 | } |
223 | |
224 | emit transferProgress(_msgSize, _msgSize); |
225 | |
226 | msg.resize(_msgSize); |
227 | qint64 bytesRead = _compressor->read(msg.data(), _msgSize); |
228 | if (bytesRead != _msgSize) { |
229 | close("Premature end of data stream!" ); |
230 | return false; |
231 | } |
232 | |
233 | _msgSize = 0; |
234 | return true; |
235 | } |
236 | |
237 | |
238 | void RemotePeer::writeMessage(const QByteArray &msg) |
239 | { |
240 | quint32 size = qToBigEndian<quint32>(msg.size()); |
241 | _compressor->write((const char*)&size, 4, Compressor::NoFlush); |
242 | _compressor->write(msg.constData(), msg.size()); |
243 | } |
244 | |
245 | |
246 | void RemotePeer::handle(const HeartBeat &heartBeat) |
247 | { |
248 | dispatch(HeartBeatReply(heartBeat.timestamp)); |
249 | } |
250 | |
251 | |
252 | void RemotePeer::handle(const HeartBeatReply &heartBeatReply) |
253 | { |
254 | _heartBeatCount = 0; |
255 | #if QT_VERSION >= 0x040700 |
256 | emit lagUpdated(heartBeatReply.timestamp.msecsTo(QDateTime::currentDateTime().toUTC()) / 2); |
257 | #else |
258 | emit lagUpdated(heartBeatReply.timestamp.time().msecsTo(QDateTime::currentDateTime().toUTC().time()) / 2); |
259 | #endif |
260 | } |
261 | |
262 | |
263 | void RemotePeer::sendHeartBeat() |
264 | { |
265 | if (signalProxy()->maxHeartBeatCount() > 0 && _heartBeatCount >= signalProxy()->maxHeartBeatCount()) { |
266 | qWarning() << "Disconnecting peer:" << description() |
267 | << "(didn't receive a heartbeat for over" << _heartBeatCount *_heartBeatTimer->interval() / 1000 << "seconds)" ; |
268 | socket()->close(); |
269 | _heartBeatTimer->stop(); |
270 | return; |
271 | } |
272 | |
273 | if (_heartBeatCount > 0) { |
274 | _lag = _heartBeatCount * _heartBeatTimer->interval(); |
275 | emit lagUpdated(_lag); |
276 | } |
277 | |
278 | dispatch(HeartBeat(QDateTime::currentDateTime().toUTC())); |
279 | ++_heartBeatCount; |
280 | } |
281 | |