1/***************************************************************************
2 * Copyright (C) 2005-2014 by the Quassel Project *
3 * devel@quassel-irc.org *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) version 3. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
19 ***************************************************************************/
20
21#include "compressor.h"
22
23#include <QTcpSocket>
24#include <QTimer>
25
26#ifdef HAVE_ZLIB
27# include <zlib.h>
28#else
29# define MINIZ_HEADER_FILE_ONLY
30# include "../../3rdparty/miniz/miniz.c"
31#endif
32
33const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
34const int ioBufferSize = 64 * 1024; // chunk size for inflate/deflate; should not be too large as we preallocate that space!
35
36Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
37 : QObject(parent),
38 _socket(socket),
39 _level(level),
40 _inflater(0),
41 _deflater(0)
42{
43 connect(socket, SIGNAL(readyRead()), SLOT(readData()));
44
45 bool ok = true;
46 if (level != NoCompression)
47 ok = initStreams();
48
49 if (!ok) {
50 // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
51 QTimer::singleShot(0, this, SIGNAL(error()));
52 return;
53 }
54
55 // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
56 // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
57 if (socket->bytesAvailable())
58 QTimer::singleShot(0, this, SLOT(readData()));
59}
60
61
62Compressor::~Compressor()
63{
64 // release resources allocated by zlib
65 if (_inflater) {
66 inflateEnd(_inflater);
67 delete _inflater;
68 }
69 if (_deflater) {
70 deflateEnd(_deflater);
71 delete _deflater;
72 }
73}
74
75
76bool Compressor::initStreams()
77{
78 int zlevel;
79 switch(compressionLevel()) {
80 case BestCompression:
81 zlevel = 9;
82 break;
83 case BestSpeed:
84 zlevel = 1;
85 break;
86 default:
87 zlevel = Z_DEFAULT_COMPRESSION;
88 }
89
90 _inflater = new z_stream;
91 memset(_inflater, 0, sizeof(z_stream));
92 if (Z_OK != inflateInit(_inflater)) {
93 qWarning() << "Could not initialize the inflate stream!";
94 return false;
95 }
96
97 _deflater = new z_stream;
98 memset(_deflater, 0, sizeof(z_stream));
99 if (Z_OK != deflateInit(_deflater, zlevel)) {
100 qWarning() << "Could not intialize the deflate stream!";
101 return false;
102 }
103
104 _inputBuffer.reserve(ioBufferSize); // pre-allocate space
105 _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
106
107 qDebug() << "Enabling compression...";
108
109 return true;
110}
111
112
113
114qint64 Compressor::bytesAvailable() const
115{
116 return _readBuffer.size();
117}
118
119
120qint64 Compressor::read(char *data, qint64 maxSize)
121{
122 if (maxSize <= 0)
123 maxSize = _readBuffer.size();
124
125 qint64 n = qMin(maxSize, (qint64)_readBuffer.size());
126 memcpy(data, _readBuffer.constData(), n);
127
128 // TODO: don't copy for every read
129 if (n == _readBuffer.size())
130 _readBuffer.clear();
131 else
132 _readBuffer = _readBuffer.mid(n);
133
134 // If there's still data left in the socket buffer, make sure to schedule a read
135 if (_socket->bytesAvailable())
136 QTimer::singleShot(0, this, SLOT(readData()));
137
138 return n;
139}
140
141
142// The usual usage pattern is to write a blocksize first, followed by the actual data.
143// By setting NoFlush, one can indicate that the write buffer should not immediately be
144// written, which should make things a bit more efficient.
145qint64 Compressor::write(const char *data, qint64 count, WriteBufferHint flush)
146{
147 int pos = _writeBuffer.size();
148 _writeBuffer.resize(pos + count);
149 memcpy(_writeBuffer.data() + pos, data, count);
150
151 if (flush != NoFlush)
152 writeData();
153
154 return count;
155}
156
157
158void Compressor::readData()
159{
160 // don't try to read more data if we're already closing
161 if (_socket->state() != QAbstractSocket::ConnectedState)
162 return;
163
164 if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
165 return;
166
167 if (compressionLevel() == NoCompression) {
168 _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
169 emit readyRead();
170 return;
171 }
172
173 // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
174 // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
175 // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
176 // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
177 // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
178
179 while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
180 _readBuffer.resize(_readBuffer.size() + ioBufferSize);
181 _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
182
183 _inflater->next_in = reinterpret_cast<unsigned char *>(_inputBuffer.data());
184 _inflater->avail_in = _inputBuffer.size();
185 _inflater->next_out = reinterpret_cast<unsigned char *>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
186 _inflater->avail_out = ioBufferSize;
187
188 const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output
189
190 int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
191
192 // adjust input and output buffers
193 _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char *>(_readBuffer.data()));
194 if (_inflater->avail_in > 0)
195 memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
196 _inputBuffer.resize(_inflater->avail_in);
197
198 if (_inflater->next_out != orig_out)
199 emit readyRead();
200
201 switch(status) {
202 case Z_NEED_DICT:
203 case Z_DATA_ERROR:
204 case Z_MEM_ERROR:
205 case Z_STREAM_ERROR:
206 qWarning() << "Error while decompressing stream:" << status;
207 emit error(StreamError);
208 return;
209 case Z_BUF_ERROR:
210 // means that we need more input to continue, so this is not an actual error
211 return;
212 case Z_STREAM_END:
213 qWarning() << "Reached end of zlib stream!"; // this should really never happen
214 return;
215 default:
216 // just try to get more out of the stream
217 break;
218 }
219 }
220 //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
221}
222
223
224void Compressor::writeData()
225{
226 if (compressionLevel() == NoCompression) {
227 _socket->write(_writeBuffer);
228 _writeBuffer.clear();
229 return;
230 }
231
232 _deflater->next_in = reinterpret_cast<unsigned char *>(_writeBuffer.data());
233 _deflater->avail_in = _writeBuffer.size();
234
235 int status;
236 do {
237 _deflater->next_out = reinterpret_cast<unsigned char *>(_outputBuffer.data());
238 _deflater->avail_out = ioBufferSize;
239 status = deflate(_deflater, Z_PARTIAL_FLUSH);
240 if (status != Z_OK && status != Z_BUF_ERROR) {
241 qWarning() << "Error while compressing stream:" << status;
242 emit error(StreamError);
243 return;
244 }
245
246 if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
247 continue; // nothing to write here
248
249 if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
250 qWarning() << "Error while writing to socket:" << _socket->errorString();
251 emit error(DeviceError);
252 return;
253 }
254 } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
255
256 if (_deflater->avail_in > 0) {
257 qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
258 emit error(StreamError);
259 }
260
261 _writeBuffer.resize(0);
262
263 //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
264}
265
266
267void Compressor::flush()
268{
269 if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
270 _socket->flush();
271
272 // FIXME: missing impl for enabled compression; but then we're not using this method yet
273}
274