Qore Programming Language  0.9.3.2
qore_socket_private.h
1 /* -*- mode: c++; indent-tabs-mode: nil -*- */
2 /*
3  qore_socket_private.h
4 
5  Qore Programming Language
6 
7  Copyright (C) 2003 - 2019 Qore Technologies, s.r.o.
8 
9  Permission is hereby granted, free of charge, to any person obtaining a
10  copy of this software and associated documentation files (the "Software"),
11  to deal in the Software without restriction, including without limitation
12  the rights to use, copy, modify, merge, publish, distribute, sublicense,
13  and/or sell copies of the Software, and to permit persons to whom the
14  Software is furnished to do so, subject to the following conditions:
15 
16  The above copyright notice and this permission notice shall be included in
17  all copies or substantial portions of the Software.
18 
19  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25  DEALINGS IN THE SOFTWARE.
26 
27  Note that the Qore library is released under a choice of three open-source
28  licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
29  information.
30 */
31 
32 #ifndef _QORE_QORE_SOCKET_PRIVATE_H
33 #define _QORE_QORE_SOCKET_PRIVATE_H
34 
35 #include "qore/intern/SSLSocketHelper.h"
36 
37 #include "qore/intern/QC_Queue.h"
38 
39 #include <cctype>
40 #include <cctype>
41 #include <cerrno>
42 #include <cstdlib>
43 #include <cstring>
44 #include <strings.h>
45 
46 #include <openssl/ssl.h>
47 #include <openssl/err.h>
48 
49 #if defined HAVE_POLL
50 #include <poll.h>
51 #elif defined HAVE_SYS_SELECT_H
52 #include <sys/select.h>
53 #elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
54 #define HAVE_SELECT 1
55 #else
56 #error no async socket I/O APIs available
57 #endif
58 
59 #ifndef DEFAULT_SOCKET_BUFSIZE
60 #define DEFAULT_SOCKET_BUFSIZE 4096
61 #endif
62 
63 #ifndef QORE_MAX_HEADER_SIZE
64 #define QORE_MAX_HEADER_SIZE 16384
65 #endif
66 
67 #define CHF_HTTP11 (1 << 0)
68 #define CHF_PROCESS (1 << 1)
69 #define CHF_REQUEST (1 << 2)
70 
71 #ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
72 #define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
73 #endif
74 
75 DLLLOCAL void concat_target(QoreString& str, const struct sockaddr *addr, const char* type = "target");
76 DLLLOCAL int do_read_error(qore_offset_t rc, const char* method_name, int timeout_ms, ExceptionSink* xsink);
77 DLLLOCAL int sock_get_raw_error();
78 DLLLOCAL int sock_get_error();
79 DLLLOCAL void qore_socket_error(ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = 0, const char* host = 0, const char* svc = 0, const struct sockaddr *addr = 0);
80 DLLLOCAL void qore_socket_error_intern(int rc, ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = 0, const char* host = 0, const char* svc = 0, const struct sockaddr *addr = 0);
81 DLLLOCAL void se_in_op(const char* cname, const char* meth, ExceptionSink* xsink);
82 DLLLOCAL void se_in_op_thread(const char* cname, const char* meth, ExceptionSink* xsink);
83 DLLLOCAL void se_not_open(const char* cname, const char* meth, ExceptionSink* xsink);
84 DLLLOCAL void se_timeout(const char* cname, const char* meth, int timeout_ms, ExceptionSink* xsink);
85 DLLLOCAL void se_closed(const char* cname, const char* mname, ExceptionSink* xsink);
86 
87 #ifdef _Q_WINDOWS
88 #define GETSOCKOPT_ARG_4 char*
89 #define SETSOCKOPT_ARG_4 const char*
90 #define SHUTDOWN_ARG SD_BOTH
91 #define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
92 #define QORE_SOCKET_ERROR SOCKET_ERROR
93 DLLLOCAL int check_windows_rc(int rc);
94 
95 #ifndef ECONNRESET
96 #define ECONNRESET WSAECONNRESET
97 #endif
98 
99 #else
100 // UNIX/Cygwin
101 #define GETSOCKOPT_ARG_4 void*
102 #define SETSOCKOPT_ARG_4 void*
103 #define SHUTDOWN_ARG SHUT_RDWR
104 #define QORE_INVALID_SOCKET -1
105 #define QORE_SOCKET_ERROR -1
106 #endif
107 
108 hashdecl qore_socketsource_private {
109  QoreStringNode* address;
110  QoreStringNode* hostname;
111 
112  DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
113  }
114 
115  DLLLOCAL ~qore_socketsource_private() {
116  if (address) address->deref();
117  if (hostname) hostname->deref();
118  }
119 
120  DLLLOCAL void setAddress(QoreStringNode* addr) {
121  assert(!address);
122  address = addr;
123  }
124 
125  DLLLOCAL void setAddress(const char* addr) {
126  assert(!address);
127  address = new QoreStringNode(addr);
128  }
129 
130  DLLLOCAL void setHostName(const char* host) {
131  assert(!hostname);
132  hostname = new QoreStringNode(host);
133  }
134 
135  DLLLOCAL void setAll(QoreObject* o, ExceptionSink* xsink) {
136  if (address) {
137  o->setValue("source", address, xsink);
138  address = 0;
139  }
140 
141  if (hostname) {
142  o->setValue("source_host", hostname, xsink);
143  hostname = 0;
144  }
145  }
146 };
147 
148 class OptionalNonBlockingHelper {
149 public:
150  qore_socket_private& sock;
151  ExceptionSink* xsink;
152  bool set;
153 
154  DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s, bool n_set, ExceptionSink* xs);
155  DLLLOCAL ~OptionalNonBlockingHelper();
156 };
157 
158 class PrivateQoreSocketTimeoutBase {
159 protected:
160  hashdecl qore_socket_private* sock;
161  int64 start;
162 
163 public:
164  DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
165  }
166 };
167 
168 class PrivateQoreSocketTimeoutHelper : public PrivateQoreSocketTimeoutBase {
169 protected:
170  const char* op;
171 public:
172  DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s, const char* op);
173  DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
174 };
175 
176 class PrivateQoreSocketThroughputHelper : public PrivateQoreSocketTimeoutBase {
177 protected:
178  bool send;
179 public:
180  DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s, bool snd);
181  DLLLOCAL ~PrivateQoreSocketThroughputHelper();
182 
183  DLLLOCAL void finalize(int64 bytes);
184 };
185 
186 hashdecl qore_socket_private;
187 
188 hashdecl qore_socket_op_helper {
189 protected:
190  qore_socket_private* s;
191 
192 public:
193  DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
194  DLLLOCAL ~qore_socket_op_helper();
195 };
196 
197 class SSLSocketHelperHelper {
198 protected:
199  qore_socket_private* s;
200  SSLSocketHelper* ssl;
201  bool context_saved = false;
202 
203 public:
204  DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock, bool set_thread_context = false);
205 
206  DLLLOCAL ~SSLSocketHelperHelper();
207 
208  DLLLOCAL void error();
209 };
210 
211 hashdecl qore_socket_private {
212  friend class PrivateQoreSocketTimeoutHelper;
213  friend class PrivateQoreSocketThroughputHelper;
214 
215  // for client certificate capture
216  static thread_local qore_socket_private* current_socket;
217 
218  int sock, sfamily, port, stype, sprot;
219 
220  // issue #3558: connection sequence to show when a connection has been reestablished
221  int64 connection_id = 0;
222 
223  const QoreEncoding* enc;
224 
225  std::string socketname;
226  // issue #3053: client target for SNI
227  std::string client_target;
228  SSLSocketHelper* ssl = nullptr;
229  Queue* cb_queue = nullptr,
230  * warn_queue = nullptr;
231 
232  // socket buffer for buffered reads
233  char rbuf[DEFAULT_SOCKET_BUFSIZE];
234 
235  // current buffer size
236  size_t buflen = 0,
237  bufoffset = 0;
238 
239  int64 tl_warning_us = 0; // timeout threshold for network action warning in microseconds
240  double tp_warning_bs = 0; // throughput warning threshold in B/s
241  int64 tp_bytes_sent = 0, // throughput: bytes sent
242  tp_bytes_recv = 0, // throughput: bytes received
243  tp_us_sent = 0, // throughput: time sending
244  tp_us_recv = 0, // throughput: time receiving
245  tp_us_min = 0 // throughput: minimum time for transfer to be considered
246  ;
247 
248  QoreValue callback_arg;
249  bool del = false,
250  http_exp_chunked_body = false,
251  ssl_accept_all_certs = false,
252  ssl_capture_remote_cert = false;
253  int in_op = -1,
254  ssl_verify_mode = SSL_VERIFY_NONE;
255 
256  // issue #3512: the remote certificate captured
257  QoreObject* remote_cert = nullptr;
258 
259  DLLLOCAL qore_socket_private(int n_sock = QORE_INVALID_SOCKET, int n_sfamily = AF_UNSPEC, int n_stype = SOCK_STREAM, int n_prot = 0, const QoreEncoding* n_enc = QCS_DEFAULT) :
260  sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
261  }
262 
263  DLLLOCAL ~qore_socket_private() {
264  close_internal();
265 
266  // must be dereferenced and removed before deleting
267  assert(!cb_queue);
268  assert(!warn_queue);
269  }
270 
271  DLLLOCAL bool isOpen() {
272  return sock != QORE_INVALID_SOCKET;
273  }
274 
275  DLLLOCAL int close() {
276  int rc = close_internal();
277  if (in_op >= 0)
278  in_op = -1;
279  if (http_exp_chunked_body)
280  http_exp_chunked_body = false;
281  sfamily = AF_UNSPEC;
282  stype = SOCK_STREAM;
283  sprot = 0;
284 
285  return rc;
286  }
287 
288  DLLLOCAL int close_and_reset() {
289  assert(sock != QORE_INVALID_SOCKET);
290  int rc;
291  while (true) {
292 #ifdef _Q_WINDOWS
293  rc = ::closesocket(sock);
294 #else
295  rc = ::close(sock);
296 #endif
297  // try again if close was interrupted by a signal
298  if (!rc || sock_get_error() != EINTR)
299  break;
300  }
301  //printd(5, "qore_socket_private::close_and_reset(this: %p) close(%d) returned %d\n", this, sock, rc);
302  sock = QORE_INVALID_SOCKET;
303  if (buflen)
304  buflen = 0;
305  if (bufoffset)
306  bufoffset = 0;
307  if (del)
308  del = false;
309  if (port != -1)
310  port = -1;
311  // issue #3053: clear hostname for SNI
312  client_target.clear();
313  return rc;
314  }
315 
316  DLLLOCAL int close_internal() {
317  //printd(5, "qore_socket_private::close_internal(this: %p) sock: %d\n", this, sock);
318  if (remote_cert) {
319  remote_cert->deref(nullptr);
320  remote_cert = nullptr;
321  }
322  if (sock >= 0) {
323  // if an SSL connection has been established, shut it down first
324  if (ssl) {
325  ssl->shutdown();
326  ssl->deref();
327  ssl = nullptr;
328  }
329 
330  if (!socketname.empty()) {
331  if (del)
332  unlink(socketname.c_str());
333  socketname.clear();
334  }
335  do_close_event();
336  // issue #3558: increment the connection sequence here. so the connection sequence is different as soon as
337  // it's closed
338  ++connection_id;
339 
340  return close_and_reset();
341  } else {
342  return 0;
343  }
344  }
345 
346  DLLLOCAL int getSendTimeout() const {
347  hashdecl timeval tv;
348 
349 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
350  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
351  // but the library expects a 32-bit value
352  int size = sizeof(hashdecl timeval);
353 #else
354  socklen_t size = sizeof(hashdecl timeval);
355 #endif
356 
357  if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
358  return -1;
359 
360  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
361  }
362 
363  DLLLOCAL int getRecvTimeout() const {
364  hashdecl timeval tv;
365 
366 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
367  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
368  // but the library expects a 32-bit value
369  int size = sizeof(hashdecl timeval);
370 #else
371  socklen_t size = sizeof(hashdecl timeval);
372 #endif
373 
374  if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
375  return -1;
376 
377  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
378  }
379 
380  DLLLOCAL int getPort() {
381  // if we don't need to find out what port we are, then return current value
382  if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
383  return port;
384 
385  // otherwise find out what port we're connected to
386  hashdecl sockaddr_storage addr;
387 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
388  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
389  int size = sizeof addr;
390 #else
391  socklen_t size = sizeof addr;
392 #endif
393 
394  if (getsockname(sock, (struct sockaddr *)&addr, (socklen_t *)&size) < 0)
395  return -1;
396 
397  port = q_get_port_from_addr((const struct sockaddr *)&addr);
398  return port;
399  }
400 
401  DLLLOCAL static void do_header(const char* key, QoreString& hdr, const QoreValue& v) {
402  switch (v.getType()) {
403  case NT_STRING:
404  hdr.sprintf("%s: %s\r\n", key, v.get<const QoreStringNode>()->c_str());
405  break;
406  case NT_INT:
407  hdr.sprintf("%s: " QLLD "\r\n", key, v.getAsBigInt());
408  break;
409  case NT_FLOAT: {
410  hdr.sprintf("%s: ", key);
411  size_t offset = hdr.size();
412  hdr.sprintf("%f\r\n", v.getAsFloat());
413  // issue 1556: external modules that call setlocale() can change
414  // the decimal point character used here from '.' to ','
415  // only search the double added, QoreString::sprintf() concatenates
416  q_fix_decimal(&hdr, offset);
417  break;
418  }
419  case NT_NUMBER:
420  hdr.sprintf("%s: ", key);
421  v.get<const QoreNumberNode>()->toString(hdr);
422  hdr.concat("\r\n");
423  break;
424  case NT_BOOLEAN:
425  hdr.sprintf("%s: %d\r\n", key, (int)v.getAsBool());
426  break;
427  }
428  }
429 
430  DLLLOCAL static void do_headers(QoreString& hdr, const QoreHashNode* headers, qore_size_t size, bool addsize = false) {
431  // RFC-2616 4.4 (http://tools.ietf.org/html/rfc2616#section-4.4)
432  // add Content-Length: 0 to headers for responses without a body where there is no transfer-encoding
433  if (headers) {
434  ConstHashIterator hi(headers);
435 
436  while (hi.next()) {
437  const QoreValue v = hi.get();
438  const char* key = hi.getKey();
439  if (addsize && !strcasecmp(key, "transfer-encoding"))
440  addsize = false;
441  if (addsize && !strcasecmp(key, "content-length"))
442  addsize = false;
443  if (v.getType() == NT_LIST) {
444  ConstListIterator li(v.get<const QoreListNode>());
445  while (li.next())
446  do_header(key, hdr, li.getValue());
447  }
448  else
449  do_header(key, hdr, v);
450  }
451  }
452  // add data and content-length header if necessary
453  if (size || addsize)
454  hdr.sprintf("Content-Length: " QSD "\r\n", size);
455 
456  hdr.concat("\r\n");
457  }
458 
459  DLLLOCAL int listen(int backlog = 20) {
460  if (sock == QORE_INVALID_SOCKET)
461  return QSE_NOT_OPEN;
462  if (in_op >= 0)
463  return QSE_IN_OP;
464 #ifdef _Q_WINDOWS
465  if (::listen(sock, backlog)) {
466  // set errno
467  sock_get_error();
468  return -1;
469  }
470  return 0;
471 #else
472  return ::listen(sock, backlog);
473 #endif
474  }
475 
476  DLLLOCAL int accept_intern(ExceptionSink* xsink, struct sockaddr *addr, socklen_t *size, int timeout_ms = -1) {
477  //printd(5, "qore_socket_private::accept_intern() to: %d\n", timeout_ms);
478  assert(xsink);
479  while (true) {
480  if (timeout_ms >= 0 && !isDataAvailable(timeout_ms, "accept", xsink)) {
481  if (*xsink)
482  return -1;
483  // do not throw exception here, NOTHING will be returned in Qore on timeout
484  return QSE_TIMEOUT; // -3
485  }
486 
487  int rc = ::accept(sock, addr, size);
488  if (rc != QORE_INVALID_SOCKET)
489  return rc;
490 
491  // retry if interrupted by a signal
492  if (sock_get_error() == EINTR)
493  continue;
494 
495  qore_socket_error(xsink, "SOCKET-ACCEPT-ERROR", "error in accept()", 0, 0, 0, addr);
496  return -1;
497  }
498  }
499 
500  // returns a new socket
501  DLLLOCAL int accept_internal(ExceptionSink* xsink, SocketSource *source, int timeout_ms = -1) {
502  assert(xsink);
503  if (sock == QORE_INVALID_SOCKET) {
504  // FIXME: remove check
505  if (xsink)
506  xsink->raiseException("SOCKET-NOT-OPEN", "socket must be opened, bound, and in a listening state before new connections can be accepted");
507  return QSE_NOT_OPEN;
508  }
509  if (in_op >= 0) {
510  if (in_op == gettid()) {
511  se_in_op("Socket", "accept", xsink);
512  return QSE_IN_OP;
513  }
514  se_in_op_thread("Socket", "accept", xsink);
515  return QSE_IN_OP_THREAD;
516  }
517 
518  int rc;
519  if (sfamily == AF_UNIX) {
520 #ifdef _Q_WINDOWS
521  // FIXME: remove check
522  if (xsink)
523  xsink->raiseException("SOCKET-ACCEPT-ERROR", "UNIX sockets are not available under Windows");
524  return -1;
525 #else
526  hashdecl sockaddr_un addr_un;
527 
528 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
529  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
530  // but the library expects a 32-bit value
531  int size = sizeof(hashdecl sockaddr_un);
532 #else
533  socklen_t size = sizeof(hashdecl sockaddr_un);
534 #endif
535  rc = accept_intern(xsink, (struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
536  //printd(1, "qore_socket_private::accept_internal() " QSD " bytes returned\n", size);
537 
538  if (rc >= 0 && source) {
539  QoreStringNode* addr = new QoreStringNode(enc);
540  addr->sprintf("UNIX socket: %s", socketname.c_str());
541  source->priv->setAddress(addr);
542  source->priv->setHostName("localhost");
543  }
544 #endif // windows
545  } else if (sfamily == AF_INET || sfamily == AF_INET6) {
546  hashdecl sockaddr_storage addr_in;
547 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
548  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
549  // but the library expects a 32-bit value
550  int size = sizeof(addr_in);
551 #else
552  socklen_t size = sizeof(addr_in);
553 #endif
554 
555  rc = accept_intern(xsink, (struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
556  //printd(1, "qore_socket_private::accept_internal() rc: %d, %d bytes returned\n", rc, size);
557 
558  if (rc >= 0 && source) {
559  char host[NI_MAXHOST + 1];
560  char service[NI_MAXSERV + 1];
561 
562  if (!getnameinfo((struct sockaddr *)&addr_in, qore_get_in_len((struct sockaddr *)&addr_in), host, sizeof(host), service, sizeof(service), NI_NUMERICSERV)) {
563  source->priv->setHostName(host);
564  }
565 
566  // get ipv4 or ipv6 address
567  char ifname[INET6_ADDRSTRLEN];
568  if (inet_ntop(addr_in.ss_family, qore_get_in_addr((struct sockaddr *)&addr_in), ifname, sizeof(ifname))) {
569  //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
570  source->priv->setAddress(ifname);
571  }
572  }
573  } else {
574  // should not happen
575  // FIXME: remove check
576  if (xsink)
577  xsink->raiseException("SOCKET-ACCEPT-ERROR", "do not know how to accept connections with address family %d", sfamily);
578  rc = -1;
579  }
580  return rc;
581  }
582 
583  DLLLOCAL void cleanup(ExceptionSink* xsink) {
584  if (cb_queue) {
585  // close the socket before the delete message is put on the queue
586  // the socket would be closed anyway in the destructor
587  close_internal();
588 
589  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
590  h->setKeyValue("event", QORE_EVENT_DELETED, 0);
591  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
592  h->setKeyValue("id", (int64)this, 0);
593  cb_queue->pushAndTakeRef(h);
594 
595  // deref and remove event queue
596  cb_queue->deref(xsink);
597  cb_queue = 0;
598  }
599  if (warn_queue) {
600  warn_queue->deref(xsink);
601  warn_queue = 0;
602  if (callback_arg) {
603  callback_arg.discard(xsink);
604  callback_arg = QoreValue();
605  }
606  }
607  }
608 
609  DLLLOCAL void setEventQueue(Queue* cbq, ExceptionSink* xsink) {
610  if (cb_queue)
611  cb_queue->deref(xsink);
612  cb_queue = cbq;
613  }
614 
615  DLLLOCAL void do_start_ssl_event() {
616  if (cb_queue) {
617  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
618  h->setKeyValue("event", QORE_EVENT_START_SSL, 0);
619  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
620  h->setKeyValue("id", (int64)this, 0);
621  cb_queue->pushAndTakeRef(h);
622  }
623  }
624 
625  DLLLOCAL void do_ssl_established_event() {
626  if (cb_queue) {
627  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
628  h->setKeyValue("event", QORE_EVENT_SSL_ESTABLISHED, 0);
629  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
630  h->setKeyValue("id", (int64)this, 0);
631  h->setKeyValue("cipher", new QoreStringNode(ssl->getCipherName()), 0);
632  h->setKeyValue("cipher_version", new QoreStringNode(ssl->getCipherVersion()), 0);
633  cb_queue->pushAndTakeRef(h);
634  }
635  }
636 
637  DLLLOCAL void do_connect_event(int af, const struct sockaddr* addr, const char* target, const char* service = 0, int prt = -1) {
638  if (cb_queue) {
639  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
640  h->setKeyValue("event", QORE_EVENT_CONNECTING, 0);
641  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
642  h->setKeyValue("id", (int64)this, 0);
643  QoreStringNode* str = q_addr_to_string2(addr);
644  if (str)
645  h->setKeyValue("address", str, 0);
646  else
647  h->setKeyValue("error", q_strerror(sock_get_error()), 0);
648  q_af_to_hash(af, *h, 0);
649  h->setKeyValue("target", new QoreStringNode(target), 0);
650  if (service)
651  h->setKeyValue("service", new QoreStringNode(service), 0);
652  if (prt != -1)
653  h->setKeyValue("port", prt, 0);
654  cb_queue->pushAndTakeRef(h);
655  }
656  }
657 
658  DLLLOCAL void do_connected_event() {
659  if (cb_queue) {
660  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
661  h->setKeyValue("event", QORE_EVENT_CONNECTED, 0);
662  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
663  h->setKeyValue("id", (int64)this, 0);
664  cb_queue->pushAndTakeRef(h);
665  }
666  }
667 
668  DLLLOCAL void do_chunked_read(int event, qore_size_t bytes, qore_size_t total_read, int source) {
669  if (cb_queue) {
670  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
671  h->setKeyValue("event", event, 0);
672  h->setKeyValue("source", source, 0);
673  h->setKeyValue("id", (int64)this, 0);
674  if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
675  h->setKeyValue("read", bytes, 0);
676  else
677  h->setKeyValue("size", bytes, 0);
678  h->setKeyValue("total_read", total_read, 0);
679  cb_queue->pushAndTakeRef(h);
680  }
681  }
682 
683  DLLLOCAL void do_read_http_header(int event, const QoreHashNode* headers, int source) {
684  if (cb_queue) {
685  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
686  h->setKeyValue("event", event, 0);
687  h->setKeyValue("source", source, 0);
688  h->setKeyValue("id", (int64)this, 0);
689  h->setKeyValue("headers", headers->hashRefSelf(), 0);
690  cb_queue->pushAndTakeRef(h);
691  }
692  }
693 
694  DLLLOCAL void do_send_http_message(const QoreString& str, const QoreHashNode* headers, int source) {
695  if (cb_queue) {
696  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
697  h->setKeyValue("event", QORE_EVENT_HTTP_SEND_MESSAGE, 0);
698  h->setKeyValue("source", source, 0);
699  h->setKeyValue("id", (int64)this, 0);
700  h->setKeyValue("message", new QoreStringNode(str), 0);
701  //printd(5, "do_send_http_message() str='%s' headers: %p (%d %s)\n", str.getBuffer(), headers, headers->getType(), headers->getTypeName());
702  h->setKeyValue("headers", headers->hashRefSelf(), 0);
703  cb_queue->pushAndTakeRef(h);
704  }
705  }
706 
707  DLLLOCAL void do_close_event() {
708  if (cb_queue) {
709  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
710  h->setKeyValue("event", QORE_EVENT_CHANNEL_CLOSED, 0);
711  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
712  h->setKeyValue("id", (int64)this, 0);
713  cb_queue->pushAndTakeRef(h);
714  }
715  }
716 
717  DLLLOCAL void do_read_event(qore_size_t bytes_read, qore_size_t total_read, qore_size_t bufsize = 0) {
718  // post bytes read on event queue, if any
719  if (cb_queue) {
720  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
721  h->setKeyValue("event", QORE_EVENT_PACKET_READ, 0);
722  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
723  h->setKeyValue("id", (int64)this, 0);
724  h->setKeyValue("read", bytes_read, 0);
725  h->setKeyValue("total_read", total_read, 0);
726  // set total bytes to read and remaining bytes if bufsize > 0
727  if (bufsize > 0)
728  h->setKeyValue("total_to_read", bufsize, 0);
729  cb_queue->pushAndTakeRef(h);
730  }
731  }
732 
733  DLLLOCAL void do_send_event(int bytes_sent, int total_sent, int bufsize) {
734  // post bytes sent on event queue, if any
735  if (cb_queue) {
736  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
737  h->setKeyValue("event", QORE_EVENT_PACKET_SENT, 0);
738  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
739  h->setKeyValue("id", (int64)this, 0);
740  h->setKeyValue("sent", bytes_sent, 0);
741  h->setKeyValue("total_sent", total_sent, 0);
742  h->setKeyValue("total_to_send", bufsize, 0);
743  cb_queue->pushAndTakeRef(h);
744  }
745  }
746 
747  DLLLOCAL void do_resolve_event(const char* host, const char* service = 0) {
748  // post bytes sent on event queue, if any
749  if (cb_queue) {
750  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
751  h->setKeyValue("event", QORE_EVENT_HOSTNAME_LOOKUP, 0);
752  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
753  h->setKeyValue("id", (int64)this, 0);
754  if (host)
755  h->setKeyValue("name", new QoreStringNode(host), 0);
756  if (service)
757  h->setKeyValue("service", new QoreStringNode(service), 0);
758  cb_queue->pushAndTakeRef(h);
759  }
760  }
761 
762  DLLLOCAL void do_resolved_event(const struct sockaddr* addr) {
763  // post bytes sent on event queue, if any
764  if (cb_queue) {
765  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
766  h->setKeyValue("event", QORE_EVENT_HOSTNAME_RESOLVED, 0);
767  h->setKeyValue("source", QORE_SOURCE_SOCKET, 0);
768  h->setKeyValue("id", (int64)this, 0);
769  QoreStringNode* str = q_addr_to_string2(addr);
770  if (str)
771  h->setKeyValue("address", str, 0);
772  else
773  h->setKeyValue("error", q_strerror(sock_get_error()), 0);
774  int prt = q_get_port_from_addr(addr);
775  if (prt > 0)
776  h->setKeyValue("port", prt, 0);
777  q_af_to_hash(addr->sa_family, *h, 0);
778  cb_queue->pushAndTakeRef(h);
779  }
780  }
781 
782  DLLLOCAL int64 getObjectIDForEvents() const {
783  return (int64)this;
784  }
785 
786  DLLLOCAL int connectUNIX(const char* p, int sock_type, int protocol, ExceptionSink* xsink) {
787  assert(xsink);
788  assert(p);
789  QORE_TRACE("connectUNIX()");
790 
791 #ifdef _Q_WINDOWS
792  // FIXME: remove check
793  if (xsink)
794  xsink->raiseException("SOCKET-CONNECTUNIX-ERROR", "UNIX sockets are not available under Windows");
795  return -1;
796 #else
797  // close socket if already open
798  close();
799 
800  printd(5, "qore_socket_private::connectUNIX(%s)\n", p);
801 
802  hashdecl sockaddr_un addr;
803 
804  addr.sun_family = AF_UNIX;
805  // copy path and terminate if necessary
806  strncpy(addr.sun_path, p, sizeof(addr.sun_path) - 1);
807  addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
808  if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
809  // FIXME: remove check
810  if (xsink)
811  xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "error connecting to UNIX socket: '%s'", p);
812  return -1;
813  }
814 
815  do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
816  while (true) {
817  if (!::connect(sock, (const sockaddr *)&addr, sizeof(struct sockaddr_un)))
818  break;
819 
820  // try again if we were interrupted by a signal
821  if (sock_get_error() == EINTR)
822  continue;
823 
824  // otherwise close the socket and return an exception with the error code
825  // do not have to worry about windows API calls here; this is a UNIX-only function
826  close_and_reset();
827  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, p);
828 
829  return -1;
830  }
831 
832  // save file name for deleting when socket is closed
833  socketname = addr.sun_path;
834  sfamily = AF_UNIX;
835 
836  do_connected_event();
837 
838  return 0;
839 #endif // windows
840  }
841 
842  // socket must be open or -1 is returned and a Qore-language exception is raised
843  /* return values:
844  -1: error
845  0: timeout
846  > 0: I/O can continue
847  */
848  DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, const char* cname, const char* mname, ExceptionSink* xsink) const {
849  assert(xsink);
850  assert(read || write);
851  if (sock == QORE_INVALID_SOCKET) {
852  se_not_open(cname, mname, xsink);
853  return -1;
854  }
855 
856  return asyncIoWait(timeout_ms, read, write, xsink);
857  }
858 
859  DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, ExceptionSink* xsink) const {
860  assert(xsink);
861 #if defined HAVE_POLL
862  return poll_intern(xsink, timeout_ms, read, write);
863 #elif defined HAVE_SELECT
864  return select_intern(xsink, timeout_ms, read, write);
865 #else
866 #error no async socket operations supported
867 #endif
868  }
869 
870 #if defined HAVE_POLL
871  DLLLOCAL int poll_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
872  int rc;
873  short arg = 0;
874  if (read)
875  arg |= POLLIN;
876  if (write)
877  arg |= POLLOUT;
878  pollfd fds = {sock, arg, 0};
879  while (true) {
880  rc = poll(&fds, 1, timeout_ms);
881  if (rc == -1 && errno == EINTR)
882  continue;
883  break;
884  }
885  if (rc < 0)
886  qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "poll(2) returned an error");
887  else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
888  rc = -1;
889 
890  return rc;
891  }
892 #elif defined HAVE_SELECT
893  DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
894  bool aborted = false;
895  int rc = select_intern(xsink, timeout_ms, read, write, aborted);
896  if (rc != QORE_SOCKET_ERROR && aborted)
897  rc = -1;
898  return rc;
899  }
900 
901  DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write, bool& aborted) const {
902  assert(xsink);
903  assert(!aborted);
904  // windows does not use FD_SETSIZE to limit the value of the highest socket descriptor in the set
905  // instead it has a maximum of 64 sockets in the set; we only need one anyway
906 #ifndef _Q_WINDOWS
907  // select is inherently broken since it can only handle descriptors < FD_SETSIZE, which is 1024 on Linux for example
908  if (sock >= FD_SETSIZE) {
909  // FIXME: remove check
910  if (xsink)
911  xsink->raiseException("SOCKET-SELECT-ERROR", "fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
912  return -1;
913  }
914 #endif
915  hashdecl timeval tv;
916  int rc;
917  while (true) {
918  // to be safe, we set the file descriptor arg after each EINTR (required on Linux for example)
919  fd_set sfs, err;
920 
921  FD_ZERO(&sfs);
922  FD_ZERO(&err);
923  FD_SET(sock, &sfs);
924  FD_SET(sock, &err);
925 
926  tv.tv_sec = timeout_ms / 1000;
927  tv.tv_usec = (timeout_ms % 1000) * 1000;
928 
929  fd_set* readfd = read ? &sfs : 0;
930  fd_set* writefd = write ? &sfs : 0;
931 
932  rc = select(sock + 1, readfd, writefd, &err, &tv);
933  //printd(5, "select_intern() rc: %d err: %d\n", rc, FD_ISSET(sock, &err));
934  if (rc != QORE_SOCKET_ERROR) {
935  if (FD_ISSET(sock, &err))
936  aborted = true;
937  break;
938  }
939  if (sock_get_error() != EINTR)
940  break;
941  }
942  if (rc == QORE_SOCKET_ERROR) {
943  // do not close the socket here, even in case of EBADF, just return an error
944  rc = 0;
945  qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "select(2) returned an error");
946  }
947 
948  return rc;
949  }
950 #endif
951 
952  DLLLOCAL bool tryReadSocketData(const char* mname, ExceptionSink* xsink) {
953  assert(xsink);
954  assert(!buflen);
955  if (!ssl) {
956  // issue #3564 see if any data is available on the socket
957  return asyncIoWait(0, true, false, "Socket", mname, xsink);
958  }
959  // select can return true if there is protocol negotiation data available,
960  // so we try to peek 1 byte of applicatoin data with a timeout of 0 with the SSL connection
961  int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK, false);
962  if (*xsink || (rc == QSE_TIMEOUT)) {
963  return false;
964  }
965  return rc > 0 ? true : false;
966  }
967 
968  DLLLOCAL bool isSocketDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
969  return asyncIoWait(timeout_ms, true, false, "Socket", mname, xsink);
970  }
971 
972  DLLLOCAL bool isDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
973  if (buflen)
974  return true;
975  return isSocketDataAvailable(timeout_ms, mname, xsink);
976  }
977 
978  DLLLOCAL bool isWriteFinished(int timeout_ms, const char* mname, ExceptionSink* xsink) {
979  return asyncIoWait(timeout_ms, false, true, "Socket", mname, xsink);
980  }
981 
982  DLLLOCAL int close_and_exit() {
983  if (sock != QORE_INVALID_SOCKET)
984  close_and_reset();
985  return -1;
986  }
987 
988  DLLLOCAL int connectINETTimeout(int timeout_ms, const struct sockaddr* ai_addr, qore_size_t ai_addrlen, ExceptionSink* xsink, bool only_timeout) {
989  assert(xsink);
990  PrivateQoreSocketTimeoutHelper toh(this, "connect");
991 
992  while (true) {
993  if (!::connect(sock, ai_addr, ai_addrlen))
994  return 0;
995 
996 #ifdef _Q_WINDOWS
997  if (sock_get_error() != EAGAIN) {
998  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
999  break;
1000  }
1001 #else
1002  // try again if we were interrupted by a signal
1003  if (errno == EINTR)
1004  continue;
1005 
1006  if (errno != EINPROGRESS)
1007  break;
1008 #endif
1009 
1010  //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d errno: %d\n", timeout_ms, errno);
1011 
1012  // check for timeout or connection with EINPROGRESS
1013  while (true) {
1014 #ifdef _Q_WINDOWS
1015  bool aborted = false;
1016  int rc = select_intern(xsink, timeout_ms, false, true, aborted);
1017 
1018  //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d rc: %d aborted: %d\n", timeout_ms, rc, aborted);
1019 
1020  // windows select() returns an error in the error socket set instead of an WSAECONNREFUSED error like UNIX,
1021  // so we simulate it here
1022  if (rc != QORE_SOCKET_ERROR && aborted) {
1023  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1024  return -1;
1025  }
1026 #else
1027  int rc = asyncIoWait(timeout_ms, false, true, "Socket", "connectINETTimeout", xsink);
1028 #endif
1029  if (*xsink)
1030  return -1;
1031 
1032  //printd(5, "asyncIoWait(%d) returned %d\n", timeout_ms, rc);
1033  if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1034  if (!only_timeout)
1035  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in asyncIoWait() with Socket::connect() with timeout", 0, 0, 0, ai_addr);
1036  return -1;
1037  } else if (rc > 0) {
1038  // socket selected for write
1039  socklen_t lon = sizeof(int);
1040  int val;
1041 
1042  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1043  if (!only_timeout)
1044  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", 0, 0, 0, ai_addr);
1045  return -1;
1046  }
1047 
1048  if (val) {
1049  if (only_timeout) {
1050  errno = val;
1051  return -1;
1052  }
1053  qore_socket_error_intern(val, xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", 0, 0, 0, ai_addr);
1054  return -1;
1055  }
1056 
1057  // connected successfully within the timeout period
1058  return 0;
1059  } else {
1060  SimpleRefHolder<QoreStringNode> desc(new QoreStringNodeMaker("timeout in connection after %dms", timeout_ms));
1061  concat_target(*(*desc), ai_addr);
1062  // FIXME: remove check
1063  if (xsink)
1064  xsink->raiseException("SOCKET-CONNECT-ERROR", desc.release());
1065  return -1;
1066  }
1067  }
1068  }
1069 
1070  return -1;
1071  }
1072 
1073  DLLLOCAL int sock_errno_err(const char* err, const char* desc, ExceptionSink* xsink) {
1074  sock = QORE_INVALID_SOCKET;
1075  qore_socket_error(xsink, err, desc);
1076  return -1;
1077  }
1078 
1079  DLLLOCAL int set_non_blocking(bool non_blocking, ExceptionSink* xsink) {
1080  assert(xsink);
1081 
1082  // ignore call when socket already closed
1083  if (sock == QORE_INVALID_SOCKET) {
1084  assert(!xsink || *xsink);
1085  return -1;
1086  }
1087 
1088 #ifdef _Q_WINDOWS
1089  u_long mode = non_blocking ? 1 : 0;
1090  int rc = ioctlsocket(sock, FIONBIO, &mode);
1091  if (check_windows_rc(rc))
1092  return sock_errno_err("SOCKET-CONNECT-ERROR", "error in ioctlsocket(FIONBIO)", xsink);
1093 #else
1094  int arg;
1095 
1096  // get socket descriptor status flags
1097  if ((arg = fcntl(sock, F_GETFL, 0)) < 0)
1098  return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() getting socket descriptor status flag", xsink);
1099 
1100  if (non_blocking) // set non-blocking
1101  arg |= O_NONBLOCK;
1102  else // set blocking
1103  arg &= ~O_NONBLOCK;
1104 
1105  if (fcntl(sock, F_SETFL, arg) < 0)
1106  return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() setting socket descriptor status flag", xsink);
1107 #endif
1108 
1109  return 0;
1110  }
1111 
1112  DLLLOCAL int connectINET(const char* host, const char* service, int timeout_ms, ExceptionSink* xsink, int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0) {
1113  assert(xsink);
1114  family = q_get_af(family);
1115  type = q_get_sock_type(type);
1116 
1117  QORE_TRACE("qore_socket_private::connectINET()");
1118 
1119  // close socket if already open
1120  close();
1121 
1122  printd(5, "qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1123 
1124  do_resolve_event(host, service);
1125 
1126  QoreAddrInfo ai;
1127  if (ai.getInfo(xsink, host, service, family, 0, type, protocol))
1128  return -1;
1129 
1130  hashdecl addrinfo *aip = ai.getAddrInfo();
1131 
1132  // emit all "resolved" events
1133  if (cb_queue)
1134  for (struct addrinfo *p = aip; p; p = p->ai_next)
1135  do_resolved_event(p->ai_addr);
1136 
1137  int prt = q_get_port_from_addr(aip->ai_addr);
1138 
1139  for (struct addrinfo *p = aip; p; p = p->ai_next) {
1140  if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype, p->ai_protocol, prt, timeout_ms, xsink, true))
1141  return 0;
1142  if (*xsink)
1143  break;
1144  }
1145 
1146  if (!*xsink)
1147  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1148  return -1;
1149  }
1150 
1151  DLLLOCAL int connectINETIntern(const char* host, const char* service, int ai_family, struct sockaddr* ai_addr, size_t ai_addrlen, int ai_socktype, int ai_protocol, int prt, int timeout_ms, ExceptionSink* xsink, bool only_timeout = false) {
1152  assert(xsink);
1153  printd(5, "qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host, service, ai_family, timeout_ms);
1154  if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1155  // FIXME: remove check
1156  if (xsink)
1157  xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "cannot establish a connection to %s:%s", host, service);
1158  return -1;
1159  }
1160 
1161  //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) sock: %d\n", this, host, port, timeout_ms, sock);
1162 
1163  int rc;
1164 
1165  // perform connect with timeout if a non-negative timeout was passed
1166  if (timeout_ms >= 0) {
1167  // set non-blocking
1168  if (set_non_blocking(true, xsink))
1169  return close_and_exit();
1170 
1171  do_connect_event(ai_family, ai_addr, host, service, prt);
1172 
1173  rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1174  //printd(5, "qore_socket_private::connectINETIntern() errno: %d rc: %d, xsink: %d\n", errno, rc, xsink && *xsink);
1175 
1176  // set blocking
1177  if (set_non_blocking(false, xsink))
1178  return close_and_exit();
1179  } else {
1180  do_connect_event(ai_family, ai_addr, host, service, prt);
1181 
1182  while (true) {
1183  rc = ::connect(sock, ai_addr, ai_addrlen);
1184 
1185  // try again if rc == -1 and errno == EINTR
1186  if (!rc || sock_get_error() != EINTR)
1187  break;
1188  }
1189  }
1190 
1191  if (rc < 0) {
1192  if (!only_timeout || errno == ETIMEDOUT)
1193  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1194 
1195  return close_and_exit();
1196  }
1197 
1198  sfamily = ai_family;
1199  stype = ai_socktype;
1200  sprot = ai_protocol;
1201  port = prt;
1202  //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) success, rc: %d, sock: %d\n", this, host, port, timeout_ms, rc, sock);
1203 
1204  do_connected_event();
1205 
1206  // issue #3053: save hostname for SNI
1207  client_target = host;
1208  return 0;
1209  }
1210 
1211  DLLLOCAL int upgradeClientToSSLIntern(const char* mname, const char* sni_target_host, X509* cert, EVP_PKEY* pkey, int timeout_ms, ExceptionSink* xsink) {
1212  assert(!ssl);
1213  SSLSocketHelperHelper sshh(this, true);
1214 
1215  int rc;
1216  do_start_ssl_event();
1217  // issue #3053: send target hostname to support SNI
1218  if (!sni_target_host && !client_target.empty()) {
1219  sni_target_host = client_target.c_str();
1220  }
1221  if ((rc = ssl->setClient(mname, sni_target_host, sock, cert, pkey, xsink)) || ssl->connect(mname, timeout_ms, xsink)) {
1222  sshh.error();
1223  return rc ? rc : -1;
1224  }
1225  do_ssl_established_event();
1226 
1227  return 0;
1228  }
1229 
1230  DLLLOCAL int upgradeServerToSSLIntern(const char* mname, X509* cert, EVP_PKEY* pkey, int timeout_ms, ExceptionSink* xsink) {
1231  assert(!ssl);
1232  //printd(5, "qore_socket_private::upgradeServerToSSLIntern() this: %p mode: %d\n", this, ssl_verify_mode);
1233  SSLSocketHelperHelper sshh(this, true);
1234 
1235  do_start_ssl_event();
1236  if (ssl->setServer(mname, sock, cert, pkey, xsink) || ssl->accept(mname, timeout_ms, xsink)) {
1237  sshh.error();
1238  return -1;
1239  }
1240  do_ssl_established_event();
1241 
1242  return 0;
1243  }
1244 
1245  // returns 0 = success, -1 = error
1246  DLLLOCAL int openUNIX(int sock_type = SOCK_STREAM, int protocol = 0) {
1247  if (sock != QORE_INVALID_SOCKET)
1248  close();
1249 
1250  if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1251  return -1;
1252  }
1253 
1254  sfamily = AF_UNIX;
1255  stype = sock_type;
1256  sprot = protocol;
1257  port = -1;
1258  return 0;
1259  }
1260 
1261  // returns 0 = success, -1 = error
1262  DLLLOCAL int openINET(int family = AF_INET, int sock_type = SOCK_STREAM, int protocol = 0) {
1263  if (sock != QORE_INVALID_SOCKET)
1264  close();
1265 
1266  if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1267  return -1;
1268 
1269  sfamily = family;
1270  stype = sock_type;
1271  sprot = protocol;
1272  port = -1;
1273  return 0;
1274  }
1275 
1276  DLLLOCAL int reuse(int opt) {
1277  //printf("qore_socket_private::reuse(%s)\n", opt ? "true" : "false");
1278  return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt, sizeof(int));
1279  }
1280 
1281  // the only place where xsink is optional
1282  DLLLOCAL int bindIntern(struct sockaddr* ai_addr, size_t ai_addrlen, int prt, bool reuseaddr, ExceptionSink* xsink = 0) {
1283  reuse(reuseaddr);
1284 
1285  if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1286  if (xsink)
1287  qore_socket_error(xsink, "SOCKET-BIND-ERROR", "error in bind()", 0, 0, 0, ai_addr);
1288  close();
1289  return -1;
1290  }
1291 
1292  // set port number
1293  if (prt)
1294  port = prt;
1295  else {
1296  // get port number
1297 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1298  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1299  int len = ai_addrlen;
1300 #else
1301  socklen_t len = ai_addrlen;
1302 #endif
1303 
1304  if (getsockname(sock, ai_addr, &len))
1305  port = -1;
1306  else
1307  port = q_get_port_from_addr(ai_addr);
1308  }
1309  return 0;
1310  }
1311 
1312  // bind to UNIX domain socket file
1313  DLLLOCAL int bindUNIX(ExceptionSink* xsink, const char* name, int socktype = SOCK_STREAM, int protocol = 0) {
1314  assert(xsink);
1315 #ifdef _Q_WINDOWS
1316  // FIXME: remove check
1317  if (xsink)
1318  xsink->raiseException("SOCKET-BINDUNIX-ERROR", "UNIX sockets are not available under Windows");
1319  return -1;
1320 #else
1321  close();
1322 
1323  // try to open socket if necessary
1324  if (openUNIX(socktype, protocol)) {
1325  // FIXME: remove check
1326  if (xsink)
1327  xsink->raiseErrnoException("SOCKET-BIND-ERROR", errno, "error opening UNIX socket ('%s') for bind", name);
1328  return -1;
1329  }
1330 
1331  hashdecl sockaddr_un addr;
1332  addr.sun_family = AF_UNIX;
1333  // copy path and terminate if necessary
1334  strncpy(addr.sun_path, name, sizeof(addr.sun_path) - 1);
1335  addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1336 
1337  if (bindIntern((sockaddr*)&addr, sizeof(struct sockaddr_un), -1, false, xsink))
1338  return -1;
1339 
1340  // save socket file name for deleting on close
1341  socketname = addr.sun_path;
1342  // delete UNIX domain socket on close
1343  del = true;
1344  return 0;
1345 #endif // windows
1346  }
1347 
1348  DLLLOCAL int bindINET(ExceptionSink* xsink, const char* name, const char* service, bool reuseaddr = true, int family = AF_UNSPEC, int socktype = SOCK_STREAM, int protocol = 0) {
1349  assert(xsink);
1350  family = q_get_af(family);
1351  socktype = q_get_sock_type(socktype);
1352 
1353  close();
1354 
1355  QoreAddrInfo ai;
1356  do_resolve_event(name, service);
1357  if (ai.getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1358  return -1;
1359 
1360  hashdecl addrinfo *aip = ai.getAddrInfo();
1361  // first emit all "resolved" events
1362  if (cb_queue)
1363  for (struct addrinfo *p = aip; p; p = p->ai_next)
1364  do_resolved_event(p->ai_addr);
1365 
1366  // try to open socket if necessary
1367  if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1368  qore_socket_error(xsink, "SOCKET-BINDINET-ERROR", "error opening socket for bind", 0, name, service);
1369  return -1;
1370  }
1371 
1372  int prt = q_get_port_from_addr(aip->ai_addr);
1373 
1374  int en = 0;
1375  // iterate through addresses and bind to the first interface possible
1376  for (struct addrinfo *p = aip; p; p = p->ai_next) {
1377  if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1378  //printd(5, "qore_socket_private::bindINET(family: %d) bound: name: %s service: %s f: %d st: %d p: %d\n", family, name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol);
1379  return 0;
1380  }
1381 
1382  en = sock_get_raw_error();
1383  //printd(5, "qore_socket_private::bindINET() failed to bind: name: %s service: %s f: %d st: %d p: %d, errno: %d (%s)\n", name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol, en, strerror(en));
1384  }
1385 
1386  // if no bind was possible, then raise an exception
1387  qore_socket_error_intern(en, xsink, "SOCKET-BIND-ERROR", "error binding on socket", 0, name, service);
1388  return -1;
1389  }
1390 
1391  // only called from qore-bound code - always with xsink
1392  DLLLOCAL QoreHashNode* getPeerInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1393  assert(xsink);
1394  if (sock == QORE_INVALID_SOCKET) {
1395  se_not_open("Socket", "getPeerInfo", xsink);
1396  return 0;
1397  }
1398 
1399  hashdecl sockaddr_storage addr;
1400  socklen_t len = sizeof addr;
1401  if (getpeername(sock, (struct sockaddr*)&addr, &len)) {
1402  qore_socket_error(xsink, "SOCKET-GETPEERINFO-ERROR", "error in getpeername()");
1403  return 0;
1404  }
1405 
1406  return getAddrInfo(addr, len, host_lookup);
1407  }
1408 
1409  // only called from qore-bound code - always with xsink
1410  DLLLOCAL QoreHashNode* getSocketInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1411  assert(xsink);
1412  if (sock == QORE_INVALID_SOCKET) {
1413  se_not_open("Socket", "getSocketInfo", xsink);
1414  return 0;
1415  }
1416 
1417  hashdecl sockaddr_storage addr;
1418 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1419  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1420  int len = sizeof addr;
1421 #else
1422  socklen_t len = sizeof addr;
1423 #endif
1424 
1425  if (getsockname(sock, (struct sockaddr*)&addr, &len)) {
1426  qore_socket_error(xsink, "SOCKET-GETSOCKETINFO-ERROR", "error in getsockname()");
1427  return 0;
1428  }
1429 
1430  return getAddrInfo(addr, len, host_lookup);
1431  }
1432 
1433  DLLLOCAL QoreHashNode* getAddrInfo(const struct sockaddr_storage& addr, socklen_t len, bool host_lookup = true) const {
1434  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
1435 
1436  if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1437  if (host_lookup) {
1438  char host[NI_MAXHOST + 1];
1439 
1440  if (!getnameinfo((struct sockaddr*)&addr, qore_get_in_len((struct sockaddr*)&addr), host, sizeof(host), 0, 0, 0)) {
1441  QoreStringNode* hoststr = new QoreStringNode(host);
1442  h->setKeyValue("hostname", hoststr, 0);
1443  h->setKeyValue("hostname_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, hoststr->getBuffer()), 0);
1444  }
1445  }
1446 
1447  // get ipv4 or ipv6 address
1448  char ifname[INET6_ADDRSTRLEN];
1449  if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr*)&addr), ifname, sizeof(ifname))) {
1450  QoreStringNode* addrstr = new QoreStringNode(ifname);
1451  h->setKeyValue("address", addrstr, 0);
1452  h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1453  }
1454 
1455  int tport;
1456  if (addr.ss_family == AF_INET) {
1457  hashdecl sockaddr_in* s = (hashdecl sockaddr_in*)&addr;
1458  tport = ntohs(s->sin_port);
1459  } else {
1460  hashdecl sockaddr_in6* s = (hashdecl sockaddr_in6*)&addr;
1461  tport = ntohs(s->sin6_port);
1462  }
1463 
1464  h->setKeyValue("port", tport, 0);
1465  }
1466 #ifndef _Q_WINDOWS
1467  else if (addr.ss_family == AF_UNIX) {
1468  assert(!socketname.empty());
1469  QoreStringNode* addrstr = new QoreStringNode(socketname);
1470  h->setKeyValue("address", addrstr, 0);
1471  h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1472  }
1473 #endif
1474 
1475  h->setKeyValue("family", addr.ss_family, 0);
1476  h->setKeyValue("familystr", new QoreStringNode(QoreAddrInfo::getFamilyName(addr.ss_family)), 0);
1477 
1478  return h;
1479  }
1480 
1481  // set backwards-compatible object members on accept
1482  // to be (hopefully) deleted in a future version of qore
1483  DLLLOCAL void setAccept(QoreObject* o) {
1484  hashdecl sockaddr_storage addr;
1485 
1486  socklen_t len = sizeof addr;
1487  if (getpeername(sock, (struct sockaddr*)&addr, &len))
1488  return;
1489 
1490  if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1491  // get ipv4 or ipv6 address
1492  char ifname[INET6_ADDRSTRLEN];
1493  if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr *)&addr), ifname, sizeof(ifname))) {
1494  //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
1495  o->setValue("source", new QoreStringNode(ifname), 0);
1496  }
1497 
1498  char host[NI_MAXHOST + 1];
1499  if (!getnameinfo((struct sockaddr *)&addr, qore_get_in_len((struct sockaddr *)&addr), host, sizeof(host), 0, 0, 0))
1500  o->setValue("source_host", new QoreStringNode(host), 0);
1501  }
1502 #ifndef _Q_WINDOWS
1503  else if (addr.ss_family == AF_UNIX) {
1504  QoreStringNode* astr = new QoreStringNode(enc);
1505  hashdecl sockaddr_un *addr_un = (hashdecl sockaddr_un *)&addr;
1506  astr->sprintf("UNIX socket: %s", addr_un->sun_path);
1507  o->setValue("source", astr, 0);
1508  o->setValue("source_host", new QoreStringNode("localhost"), 0);
1509  }
1510 #endif
1511  }
1512 
1513  // buffered reads for high performance
1514  DLLLOCAL qore_offset_t brecv(ExceptionSink* xsink, const char* meth, char*& buf, qore_size_t bs, int flags, int timeout, bool do_event = true) {
1515  assert(xsink);
1516  // must be checked if open/connected before this function is called
1517  assert(sock != QORE_INVALID_SOCKET);
1518  assert(meth);
1519 
1520  // always returned buffered data first
1521  if (buflen) {
1522  buf = rbuf + bufoffset;
1523  if (buflen <= bs) {
1524  bs = buflen;
1525  buflen = 0;
1526  bufoffset = 0;
1527  } else {
1528  buflen -= bs;
1529  bufoffset += bs;
1530  }
1531  return (qore_offset_t)bs;
1532  }
1533 
1534  // real socket reads are only done when the buffer is empty
1535 
1536  //printd(5, "qore_socket_private::brecv(buf: %p, bs: %d, flags: %d, timeout: %d, do_event: %d) this: %p ssl: %d\n", buf, (int)bs, flags, timeout, (int)do_event, this, ssl);
1537 
1538  qore_offset_t rc;
1539  if (!ssl) {
1540  if (timeout != -1 && !isDataAvailable(timeout, meth, xsink)) {
1541  if (*xsink)
1542  return -1;
1543  se_timeout("Socket", meth, timeout, xsink);
1544 
1545  return QSE_TIMEOUT;
1546  }
1547 
1548  while (true) {
1549 #ifdef DEBUG
1550  errno = 0;
1551 #endif
1552  rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1553  if (rc == QORE_SOCKET_ERROR) {
1554  sock_get_error();
1555  if (errno == EINTR)
1556  continue;
1557 #ifdef ECONNRESET
1558  if (errno == ECONNRESET) {
1559  se_closed("Socket", meth, xsink);
1560  close();
1561  } else
1562 #endif
1563  qore_socket_error(xsink, "SOCKET-RECV-ERROR", "error in recv()", meth);
1564  break;
1565  }
1566  //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1567  // try again if we were interrupted by a signal
1568  if (rc >= 0)
1569  break;
1570  }
1571  } else
1572  rc = ssl->read(meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, xsink);
1573 
1574  //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1575  if (rc > 0) {
1576  buf = rbuf;
1577  assert(!buflen);
1578  assert(!bufoffset);
1579  if (rc > (qore_offset_t)bs) {
1580  buflen = rc - bs;
1581  bufoffset = bs;
1582  rc = bs;
1583  }
1584 
1585  // register event
1586  if (do_event)
1587  do_read_event(rc, rc);
1588  } else {
1589 #ifdef DEBUG
1590  buf = 0;
1591 #endif
1592  if (!rc)
1593  close();
1594  }
1595 
1596  return rc;
1597  }
1598 
1600  DLLLOCAL QoreStringNode* readHTTPData(ExceptionSink* xsink, const char* meth, int timeout, qore_offset_t& rc, bool exit_early = false) {
1601  assert(xsink);
1602  assert(meth);
1603  if (sock == QORE_INVALID_SOCKET) {
1604  se_not_open("Socket", meth, xsink);
1605  rc = QSE_NOT_OPEN;
1606  return 0;
1607  }
1608 
1609  PrivateQoreSocketThroughputHelper th(this, false);
1610 
1611  // state:
1612  // 0 = '\r' received
1613  // 1 = '\r\n' received
1614  // 2 = '\r\n\r' received
1615  // 3 = '\n' received
1616  // read in HHTP header until \r\n\r\n or \n\n from socket
1617  int state = -1;
1618  QoreStringNodeHolder hdr(new QoreStringNode(enc));
1619 
1620  qore_size_t count = 0;
1621 
1622  while (true) {
1623  char* buf;
1624  rc = brecv(xsink, meth, buf, 1, 0, timeout, false);
1625  //printd(5, "qore_socket_private::readHTTPData() this: %p Socket::%s(): rc: " QLLD " read char: %c (%03d) (old state: %d)\n", this, meth, rc, rc > 0 && buf[0] > 31 ? buf[0] : '?', rc > 0 ? buf[0] : 0, state);
1626  if (rc <= 0) {
1627  //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (len: %d), rc=" QSD ", errno: %d: '%s'\n", timeout, hdr->getBuffer(), hdr->strlen(), rc, errno, strerror(errno));
1628 
1629  if (!*xsink) {
1630  if (!count) {
1631  //printd(5, "qore_socket_private::readHTTPData() this: %p rc: %d count: %d (%d) timeout: %d\n", this, rc, count, hdr->size(), timeout);
1632  se_closed("Socket", meth, xsink);
1633  } else {
1634  xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "socket closed on remote end while reading header data after reading " QSD " byte%s", count, count == 1 ? "" : "s");
1635  }
1636  }
1637  return 0;
1638  }
1639  char c = buf[0];
1640  if (++count == QORE_MAX_HEADER_SIZE) {
1641  // FIXME: remove check
1642  if (xsink)
1643  xsink->raiseException("SOCKET-HTTP-ERROR", "header size cannot exceed " QSD " bytes", count);
1644  return 0;
1645  }
1646 
1647  // check if we can progress to the next state
1648  if (c == '\n') {
1649  if (state == -1) {
1650  state = 3;
1651  continue;
1652  }
1653  if (!state) {
1654  if (exit_early && hdr->empty())
1655  return 0;
1656  state = 1;
1657  continue;
1658  }
1659  assert(state > 0);
1660  break;
1661  } else if (c == '\r') {
1662  if (state == -1) {
1663  state = 0;
1664  continue;
1665  }
1666  if (!state)
1667  break;
1668  if (state == 1) {
1669  state = 2;
1670  continue;
1671  }
1672  }
1673 
1674  if (state != -1) {
1675  switch (state) {
1676  case 0: hdr->concat('\r'); break;
1677  case 1: hdr->concat("\r\n"); break;
1678  case 2: hdr->concat("\r\n\r"); break;
1679  case 3: hdr->concat('\n'); break;
1680  }
1681  state = -1;
1682  }
1683  hdr->concat(c);
1684  }
1685  hdr->concat('\n');
1686 
1687  //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (%d)\n", timeout, hdr->getBuffer(), hdr->size());
1688 
1689  th.finalize(hdr->size());
1690 
1691  return hdr.release();
1692  }
1693 
1694  DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, qore_offset_t bufsize, int timeout, qore_offset_t& rc) {
1695  assert(xsink);
1696  if (sock == QORE_INVALID_SOCKET) {
1697  se_not_open("Socket", "recv", xsink);
1698  rc = QSE_NOT_OPEN;
1699  return 0;
1700  }
1701  if (in_op >= 0) {
1702  if (in_op == gettid()) {
1703  se_in_op("Socket", "recv", xsink);
1704  return 0;
1705  }
1706  se_in_op_thread("Socket", "recv", xsink);
1707  return 0;
1708  }
1709 
1710  PrivateQoreSocketThroughputHelper th(this, false);
1711 
1712  qore_size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
1713 
1714  QoreStringNodeHolder str(new QoreStringNode(enc));
1715 
1716  char* buf;
1717 
1718  while (true) {
1719  rc = brecv(xsink, "recv", buf, bs, 0, timeout, false);
1720 
1721  if (rc <= 0) {
1722  printd(5, "qore_socket_private::recv(" QSD ", %d) bs=" QSD ", br=" QSD ", rc=" QSD ", errno: %d (%s)\n", bufsize, timeout, bs, str->size(), rc, errno, strerror(errno));
1723  break;
1724  }
1725 
1726  str->concat(buf, rc);
1727 
1728  // register event
1729  do_read_event(rc, str->size(), bufsize);
1730 
1731  if (bufsize > 0) {
1732  if (str->size() >= (qore_size_t)bufsize)
1733  break;
1734  if ((bufsize - str->size()) < bs)
1735  bs = bufsize - str->size();
1736  }
1737  }
1738 
1739  printd(5, "qore_socket_private::recv() received " QSD " byte(s), bufsize=" QSD ", strlen=" QSD " str='%s'\n", str->size(), bufsize, (str ? str->strlen() : 0), str ? str->getBuffer() : "n/a");
1740 
1741  // "fix" return code value if no error occurred
1742  if (rc >= 0)
1743  rc = str->size();
1744 
1745  th.finalize(str->size());
1746 
1747  return *xsink ? 0 : str.release();
1748  }
1749 
1750  DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, int timeout, qore_offset_t& rc) {
1751  assert(xsink);
1752  if (sock == QORE_INVALID_SOCKET) {
1753  se_not_open("Socket", "recv", xsink);
1754  rc = QSE_NOT_OPEN;
1755  return 0;
1756  }
1757  if (in_op >= 0) {
1758  if (in_op == gettid()) {
1759  se_in_op("Socket", "recv", xsink);
1760  return 0;
1761  }
1762  se_in_op_thread("Socket", "recv", xsink);
1763  return 0;
1764  }
1765 
1766  PrivateQoreSocketThroughputHelper th(this, false);
1767 
1768  QoreStringNodeHolder str(new QoreStringNode(enc));
1769 
1770  // perform first read with timeout
1771  char* buf;
1772  rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
1773  if (rc <= 0)
1774  return 0;
1775 
1776  str->concat(buf, rc);
1777 
1778  // register event
1779  do_read_event(rc, rc);
1780 
1781  // keep reading data until no more data is available without a timeout
1782  if (isDataAvailable(0, "recv", xsink)) {
1783  do {
1784  rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
1785  //printd(5, "qore_socket_private::recv(to: %d) rc=" QSD " rd=" QSD "\n", timeout, rc, str->size());
1786  // if the remote end has closed the connection, return what we have
1787  if (!rc)
1788  break;
1789  if (rc < 0) {
1790  th.finalize(str->size());
1791  return 0;
1792  }
1793  str->concat(buf, rc);
1794 
1795  // register event
1796  do_read_event(rc, str->size());
1797  } while (isDataAvailable(0, "recv", xsink));
1798  }
1799 
1800  th.finalize(str->size());
1801 
1802  if (*xsink)
1803  return 0;
1804 
1805  rc = str->size();
1806  return str.release();
1807  }
1808 
1809  DLLLOCAL int recv(int fd, qore_offset_t size, int timeout_ms, ExceptionSink* xsink);
1810 
1811  DLLLOCAL BinaryNode* recvBinary(qore_offset_t bufsize, int timeout, qore_offset_t& rc, ExceptionSink* xsink) {
1812  assert(xsink);
1813  if (sock == QORE_INVALID_SOCKET) {
1814  se_not_open("Socket", "recvBinary", xsink);
1815  rc = QSE_NOT_OPEN;
1816  return 0;
1817  }
1818  if (in_op >= 0) {
1819  if (in_op == gettid()) {
1820  se_in_op("Socket", "recvBinary", xsink);
1821  return 0;
1822  }
1823  se_in_op_thread("Socket", "recvBinary", xsink);
1824  return 0;
1825  }
1826 
1827  PrivateQoreSocketThroughputHelper th(this, false);
1828 
1829  qore_size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
1830 
1832 
1833  char* buf;
1834  while (true) {
1835  rc = brecv(xsink, "recvBinary", buf, bs, 0, timeout);
1836  if (rc <= 0)
1837  break;
1838 
1839  b->append(buf, rc);
1840 
1841  if (bufsize > 0) {
1842  if (b->size() >= (qore_size_t)bufsize)
1843  break;
1844  if ((bufsize - b->size()) < bs)
1845  bs = bufsize - b->size();
1846  }
1847  }
1848 
1849  th.finalize(b->size());
1850 
1851  if (*xsink)
1852  return 0;
1853 
1854  // "fix" return code value if no error occurred
1855  if (rc >= 0)
1856  rc = b->size();
1857 
1858  printd(5, "qore_socket_private::recvBinary() received " QSD " byte(s), bufsize=" QSD ", blen=" QSD "\n", b->size(), bufsize, b->size());
1859  return b.release();
1860  }
1861 
1862  DLLLOCAL BinaryNode* recvBinary(int timeout, qore_offset_t& rc, ExceptionSink* xsink) {
1863  assert(xsink);
1864  if (sock == QORE_INVALID_SOCKET) {
1865  se_not_open("Socket", "recvBinary", xsink);
1866  rc = QSE_NOT_OPEN;
1867  return 0;
1868  }
1869  if (in_op >= 0) {
1870  if (in_op == gettid()) {
1871  se_in_op("Socket", "recvBinary", xsink);
1872  return 0;
1873  }
1874  se_in_op_thread("Socket", "recvBinary", xsink);
1875  return 0;
1876  }
1877 
1878  PrivateQoreSocketThroughputHelper th(this, false);
1879 
1881 
1882  //printd(5, "QoreSocket::recvBinary(%d, " QSD ") this: %p\n", timeout, rc, this);
1883  // perform first read with timeout
1884  char* buf;
1885  rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
1886  if (rc <= 0)
1887  return 0;
1888 
1889  b->append(buf, rc);
1890 
1891  // register event
1892  do_read_event(rc, rc);
1893 
1894  // keep reading data until no more data is available without a timeout
1895  if (isDataAvailable(0, "recvBinary", xsink)) {
1896  do {
1897  rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
1898  // if the remote end has closed the connection, return what we have
1899  if (!rc)
1900  break;
1901  if (rc < 0) {
1902  th.finalize(b->size());
1903  return 0;
1904  }
1905 
1906  b->append(buf, rc);
1907 
1908  // register event
1909  do_read_event(rc, b->size());
1910  } while (isDataAvailable(0, "recvBinary", xsink));
1911  }
1912 
1913  th.finalize(b->size());
1914 
1915  if (*xsink)
1916  return 0;
1917 
1918  rc = b->size();
1919  //printd(5, "qore_socket_private() this: %p b: %p size: %lld\n", this, b->getPtr(), rc);
1920  return b.release();
1921  }
1922 
1923  DLLLOCAL void recvToOutputStream(OutputStream *os, int64 size, int64 timeout, ExceptionSink *xsink, QoreThreadLock* l) {
1924  if (sock == QORE_INVALID_SOCKET) {
1925  se_not_open("Socket", "recvToOutputStream", xsink);
1926  return;
1927  }
1928  if (in_op >= 0) {
1929  if (in_op == gettid()) {
1930  se_in_op("Socket", "recvToOutputStream", xsink);
1931  return;
1932  }
1933  se_in_op_thread("Socket", "recvToOutputStream", xsink);
1934  return;
1935  }
1936 
1937  qore_socket_op_helper oh(this);
1938 
1939  char* buf;
1940  qore_offset_t br = 0;
1941  while (size < 0 || br < size) {
1942  // calculate bytes needed
1943  int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
1944 
1945  qore_offset_t rc = brecv(xsink, "recvToOutputStream", buf, bn, 0, timeout);
1946  if (rc < 0) {
1947  //error - already reported in xsink
1948  return;
1949  }
1950  if (rc == 0) {
1951  //eof
1952  if (size >= 0) {
1953  //not all size bytes were read
1954  xsink->raiseException("SOCKET-RECV-ERROR", "Unexpected end of stream");
1955  }
1956  return;
1957  }
1958 
1959  // write buffer to the stream
1960  {
1961  AutoUnlocker al(l);
1962  os->write(buf, rc, xsink);
1963  if (*xsink) {
1964  return;
1965  }
1966  }
1967 
1968  br += rc;
1969  }
1970  }
1971 
1972  DLLLOCAL QoreStringNode* readHTTPHeaderString(ExceptionSink* xsink, int timeout, int source) {
1973  assert(xsink);
1974  qore_offset_t rc;
1975  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeaderString", timeout, rc));
1976  if (!hdr) {
1977  assert(*xsink);
1978  return 0;
1979  }
1980  assert(rc > 0);
1981  return hdr.release();
1982  }
1983 
1984  DLLLOCAL QoreHashNode* readHTTPHeader(ExceptionSink* xsink, QoreHashNode* info, int timeout,
1985  qore_offset_t& rc, int source, const char* headers_raw_key = "headers-raw") {
1986  assert(xsink);
1987  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeader", timeout, rc));
1988  if (!hdr) {
1989  assert(*xsink);
1990  return nullptr;
1991  }
1992  assert(rc > 0);
1993 
1994  const char* buf = hdr->getBuffer();
1995 
1996  char* p;
1997  if ((p = (char*)strstr(buf, "\r\n"))) {
1998  *p = '\0';
1999  p += 2;
2000  } else if ((p = (char*)strchr(buf, '\n'))) {
2001  *p = '\0';
2002  ++p;
2003  } else if ((p = (char*)strchr(buf, '\r'))) {
2004  *p = '\0';
2005  ++p;
2006  } else {
2007  // readHTTPData will only return a string that satisifies one of the above conditions,
2008  // however an embedded 0 could have been sent which would make the above searches invalid
2009  xsink->raiseException("SOCKET-HTTP-ERROR", "invalid header received with embedded nulls in Socket::readHTTPHeader()");
2010  return nullptr;
2011  }
2012 
2013  char* t1;
2014  if (!(t1 = (char*)strstr(buf, "HTTP/"))) {
2015  xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "missing HTTP version string in first header line in Socket::readHTTPHeader()");
2016  return nullptr;
2017  }
2018 
2019  ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2020 
2021 #if 0
2022  h->setKeyValue("dbg_hdr", new QoreStringNode(buf), 0);
2023 #endif
2024 
2025  // process header flags
2026  int flags = CHF_PROCESS;
2027 
2028  // get version
2029  {
2030  QoreStringNode* hv = new QoreStringNode(t1 + 5, 3, enc);
2031  h->setKeyValue("http_version", hv, 0);
2032  if (*hv == "1.1")
2033  flags |= CHF_HTTP11;
2034  }
2035 
2036  // if we are getting a response
2037  if (t1 == buf) {
2038  char* t2 = (char*)strchr(buf + 8, ' ');
2039  if (t2) {
2040  t2++;
2041  if (isdigit(*(t2))) {
2042  h->setKeyValue("status_code", atoi(t2), 0);
2043  if (strlen(t2) > 4) {
2044  h->setKeyValue("status_message", new QoreStringNode(t2 + 4), 0);
2045  }
2046  }
2047  }
2048  if (info)
2049  info->setKeyValue("response-uri", new QoreStringNode(buf), 0);
2050  } else { // get method and path
2051  char* t2 = (char*)strchr(buf, ' ');
2052  if (t2) {
2053  *t2 = '\0';
2054  h->setKeyValue("method", new QoreStringNode(buf), 0);
2055  t2++;
2056  t1 = strchr(t2, ' ');
2057  if (t1) {
2058  *t1 = '\0';
2059  //printd(5, "found path '%s'\n", t2);
2060  // the path is returned as-is with no decodings - use decode_url() to decode
2061  h->setKeyValue("path", new QoreStringNode(t2, enc), 0);
2062  }
2063  if (info)
2064  info->setKeyValue("request-uri", new QoreStringNode(buf), 0);
2065  }
2066  flags |= CHF_REQUEST;
2067  }
2068 
2069  bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2070  do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2071 
2072  // process header info
2073  if ((flags & CHF_REQUEST) && info)
2074  info->setKeyValue("close", close, 0);
2075 
2076  return h.release();
2077  }
2078 
2079  // info must be already referenced for the assignment, if present
2080  DLLLOCAL int runHeaderCallback(ExceptionSink* xsink, const char* cname, const char* mname, const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreHashNode* hdr, QoreHashNode* info, bool send_aborted = false, QoreObject* obj = nullptr) {
2081  assert(xsink);
2082  assert(obj);
2083  ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2084  QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2085  arg->setKeyValue("hdr", hdr ? hdr->refSelf() : nullptr, xsink);
2086  arg->setKeyValue("info", info, xsink);
2087  if (obj)
2088  arg->setKeyValue("obj", obj->refSelf(), xsink);
2089  arg->setKeyValue("send_aborted", send_aborted, xsink);
2090  args->push(arg, nullptr);
2091 
2092  ValueHolder rv(xsink);
2093  return runCallback(xsink, cname, mname, rv, callback, l, *args);
2094  }
2095 
2096  DLLLOCAL int runTrailerCallback(ExceptionSink* xsink, const char* cname, const char* mname, const ResolvedCallReferenceNode& callback, QoreThreadLock* l, ReferenceHolder<QoreHashNode>& hdr) {
2097  ValueHolder rv(xsink);
2098  if (runCallback(xsink, cname, mname, rv, callback, l, nullptr))
2099  return -1;
2100 
2101  switch (rv->getType()) {
2102  case NT_NOTHING:
2103  break;
2104  case NT_HASH: {
2105  hdr = rv.release().get<QoreHashNode>();
2106  break;
2107  }
2108  default:
2109  xsink->raiseException("HTTP-TRAILER-ERROR", "chunked callback returned type '%s'; expecting 'hash' or 'NOTHING'", rv->getTypeName());
2110  return -1;
2111  }
2112  return 0;
2113  }
2114 
2115  DLLLOCAL int runDataCallback(ExceptionSink* xsink, const char* cname, const char* mname, const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const AbstractQoreNode* data, bool chunked) {
2116  assert(xsink);
2117  ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2118  QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2119  arg->setKeyValue("data", data->realCopy(), xsink);
2120  arg->setKeyValue("chunked", chunked, xsink);
2121  args->push(arg, nullptr);
2122 
2123  ValueHolder rv(xsink);
2124  return runCallback(xsink, cname, mname, rv, callback, l, *args);
2125  }
2126 
2127  DLLLOCAL int runCallback(ExceptionSink* xsink, const char* cname, const char* mname, ValueHolder& res, const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreListNode* args = 0) {
2128  assert(xsink);
2129  // FIXME: subtract callback execution time from socket performance measurement
2130 
2131  // unlock and execute callback
2132  {
2133  AutoUnlocker al(l);
2134  res = callback.execValue(args, xsink);
2135  }
2136 
2137  // check exception and socket status
2138  assert(xsink);
2139  if (*xsink)
2140  return -1;
2141 
2142  if (sock == QORE_INVALID_SOCKET) {
2143  se_not_open(cname, mname, xsink);
2144  return QSE_NOT_OPEN;
2145  }
2146 
2147  return 0;
2148  }
2149 
2150  DLLLOCAL int sendHttpChunkedWithCallback(ExceptionSink* xsink, const char* cname, const char* mname, const ResolvedCallReferenceNode& send_callback, QoreThreadLock& l, int source, int timeout_ms = -1, bool* aborted = 0) {
2151  assert(xsink);
2152  assert(!aborted || !(*aborted));
2153 
2154  if (sock == QORE_INVALID_SOCKET) {
2155  se_not_open(cname, mname, xsink);
2156  return QSE_NOT_OPEN;
2157  }
2158  if (in_op >= 0) {
2159  if (in_op == gettid()) {
2160  se_in_op(cname, mname, xsink);
2161  return 0;
2162  }
2163  se_in_op_thread(cname, mname, xsink);
2164  return 0;
2165  }
2166 
2167  PrivateQoreSocketThroughputHelper th(this, true);
2168 
2169  // set the non-blocking flag (for use with non-ssl connections)
2170  bool nb = (timeout_ms >= 0);
2171  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2172  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2173  if (*xsink)
2174  return -1;
2175 
2176  qore_socket_op_helper oh(this);
2177 
2178  qore_offset_t rc;
2179  int64 total = 0;
2180  bool done = false;
2181 
2182  while (!done) {
2183  // if we have response data already, then we assume an error and abort
2184  if (aborted) {
2185  bool data_available = tryReadSocketData(mname, xsink);
2186  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2187  if (data_available || *xsink) {
2188  *aborted = true;
2189  return *xsink ? -1 : 0;
2190  }
2191  }
2192 
2193  // FIXME: subtract callback execution time from socket performance measurement
2194  ValueHolder res(xsink);
2195  rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2196  if (rc)
2197  return rc;
2198 
2199  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p res: %s\n", this, get_type_name(*res));
2200 
2201  // check callback return val
2202  QoreString buf;
2203 
2204  switch (res->getType()) {
2205  case NT_STRING: {
2206  const QoreStringNode* str = res->get<const QoreStringNode>();
2207  if (str->empty()) {
2208  done = true;
2209  break;
2210  }
2211  buf.sprintf("%x\r\n", (int)str->size());
2212  buf.concat(str->getBuffer(), str->size());
2213  break;
2214  }
2215 
2216  case NT_BINARY: {
2217  const BinaryNode* b = res->get<const BinaryNode>();
2218  if (b->empty()) {
2219  done = true;
2220  break;
2221  }
2222  buf.sprintf("%x\r\n", (int)b->size());
2223  buf.concat((const char*)b->getPtr(), b->size());
2224  break;
2225  }
2226 
2227  case NT_HASH: {
2228  buf.concat("0\r\n");
2229 
2230  ConstHashIterator hi(res->get<const QoreHashNode>());
2231 
2232  while (hi.next()) {
2233  const QoreValue v = hi.get();
2234  const char* key = hi.getKey();
2235 
2236  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p trailer %s\n", this, key);
2237 
2238  if (v.getType() == NT_LIST) {
2239  ConstListIterator li(v.get<const QoreListNode>());
2240  while (li.next())
2241  do_header(key, buf, li.getValue());
2242  }
2243  else
2244  do_header(key, buf, v);
2245  }
2246 
2247  // fall through to next case
2248  }
2249 
2250  case NT_NOTHING:
2251  case NT_NULL:
2252  done = true;
2253  break;
2254 
2255  default:
2256  xsink->raiseException("SOCKET-CALLBACK-ERROR", "HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2257  return -1;
2258  }
2259 
2260  if (buf.empty())
2261  buf.concat("0\r\n");
2262 
2263  // add trailing \r\n
2264  buf.concat("\r\n");
2265 
2266  // send chunk buffer data
2267  rc = sendIntern(xsink, cname, mname, buf.getBuffer(), buf.size(), timeout_ms, total, true);
2268 
2269  if (rc < 0) {
2270  // if we have a socket I/O error, but also data to be read on the socket, then clear the exception and return 0
2271  if (aborted && *xsink) {
2272  bool data_available = tryReadSocketData(mname, xsink);
2273  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2274  if (data_available) {
2275  *aborted = true;
2276  return *xsink ? -1 : 0;
2277  }
2278  }
2279 
2280  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p rc: %d sock: %d xsink: %d\n", this, rc, sock, xsink->isException());
2281  }
2282 
2283  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p sent: %s\n", this, buf.getBuffer());
2284 
2285  if (rc < 0 || sock == QORE_INVALID_SOCKET)
2286  break;
2287  }
2288 
2289  th.finalize(total);
2290 
2291  return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2292  }
2293 
2294  DLLLOCAL int sendIntern(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, qore_size_t size, int timeout_ms, int64& total, bool stream = false) {
2295  assert(xsink);
2296  qore_offset_t rc;
2297  qore_size_t bs = 0;
2298 
2299  // set the non-blocking flag (for use with non-ssl connections)
2300  bool nb = (timeout_ms >= 0);
2301 
2302  while (true) {
2303  if (ssl) {
2304  // SSL_MODE_ENABLE_PARTIAL_WRITE is enabled so we can get finer-grained socket events for do_send_event() below
2305  rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2306  } else {
2307  while (true) {
2308  rc = ::send(sock, buf + bs, size - bs, 0);
2309  //printd(5, "qore_socket_private::send() this: %p Socket::%s() buf: %p size: " QLLD " timeout_ms: %d ssl: %p nb: %d bs: " QLLD " rc: " QLLD "\n", this, mname, buf, size, timeout_ms, ssl, nb, bs, rc);
2310  // try again if we were interrupted by a signal
2311  if (rc >= 0)
2312  break;
2313  sock_get_error();
2314  // check that the send finishes before the timeout if we are using non-blocking I/O
2315  if (nb && (errno == EAGAIN
2316 #ifdef EWOULDBLOCK
2317  || errno == EWOULDBLOCK
2318 #endif
2319  )) {
2320  if (!isWriteFinished(timeout_ms, mname, xsink)) {
2321  if (*xsink)
2322  return -1;
2323  se_timeout("Socket", mname, timeout_ms, xsink);
2324  rc = QSE_TIMEOUT;
2325  break;
2326  }
2327  continue;
2328  }
2329  if (errno != EINTR) {
2330  //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d sock: %d\n", bs, rc, size - bs, size, errno, sock);
2331  // FIXME: remove check
2332  if (xsink)
2333  xsink->raiseErrnoException("SOCKET-SEND-ERROR", errno, "error while executing %s::%s()", cname, mname);
2334 
2335  // do not close the socket even if we have EPIPE or ECONNRESET in case there is data to be read when streaming
2336 #ifdef EPIPE
2337  if (!stream && errno == EPIPE)
2338  close();
2339 #endif
2340 #ifdef ECONNRESET
2341  if (!stream && errno == ECONNRESET)
2342  close();
2343 #endif
2344  break;
2345  }
2346  }
2347  }
2348 
2349  total += rc;
2350 
2351  //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d\n", bs, rc, size - bs, size, errno);
2352  if (rc < 0 || sock == QORE_INVALID_SOCKET)
2353  break;
2354 
2355  bs += rc;
2356 
2357  do_send_event(rc, bs, size);
2358 
2359  if (bs >= size)
2360  break;
2361  }
2362 
2363  return rc;
2364  }
2365 
2366  DLLLOCAL int send(ExceptionSink* xsink, const char* mname, const char* buf, qore_size_t size, int timeout_ms = -1) {
2367  return send(xsink, "Socket", mname, buf, size, timeout_ms);
2368  }
2369 
2370  DLLLOCAL int send(int fd, qore_offset_t size, int timeout_ms, ExceptionSink* xsink);
2371 
2372  DLLLOCAL int send(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, qore_size_t size, int timeout_ms = -1) {
2373  assert(xsink);
2374  if (sock == QORE_INVALID_SOCKET) {
2375  se_not_open(cname, mname, xsink);
2376 
2377  return QSE_NOT_OPEN;
2378  }
2379  if (in_op >= 0) {
2380  if (in_op == gettid()) {
2381  se_in_op(cname, mname, xsink);
2382  return 0;
2383  }
2384  se_in_op_thread(cname, mname, xsink);
2385  return 0;
2386  }
2387 
2388  PrivateQoreSocketThroughputHelper th(this, true);
2389 
2390  // set the non-blocking flag (for use with non-ssl connections)
2391  bool nb = (timeout_ms >= 0);
2392  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2393  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2394  if (*xsink)
2395  return -1;
2396 
2397  int64 total = 0;
2398  qore_offset_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2399  th.finalize(total);
2400 
2401  return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2402  }
2403 
2404  DLLLOCAL void sendFromInputStream(InputStream *is, int64 size, int64 timeout, ExceptionSink *xsink, QoreThreadLock* l) {
2405  if (sock == QORE_INVALID_SOCKET) {
2406  se_not_open("Socket", "sendFromInputStream", xsink);
2407  return;
2408  }
2409  if (in_op >= 0) {
2410  if (in_op == gettid()) {
2411  se_in_op("Socket", "sendFromInputStream", xsink);
2412  return;
2413  }
2414  se_in_op_thread("Socket", "sendFromInputStream", xsink);
2415  return;
2416  }
2417 
2418  qore_socket_op_helper oh(this);
2419 
2420  PrivateQoreSocketThroughputHelper th(this, true);
2421 
2422  // set the non-blocking flag (for use with non-ssl connections)
2423  bool nb = (timeout >= 0);
2424  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2425  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2426  if (*xsink)
2427  return;
2428 
2429  char buf[DEFAULT_SOCKET_BUFSIZE];
2430  int64 sent = 0;
2431  int64 total = 0;
2432  while (size < 0 || sent < size) {
2433  int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2434  int64 r;
2435  {
2436  AutoUnlocker al(l);
2437  r = is->read(buf, toRead, xsink);
2438  if (*xsink) {
2439  return;
2440  }
2441  }
2442  if (r == 0) {
2443  //eof
2444  if (size >= 0) {
2445  //not all size bytes were sent
2446  xsink->raiseException("SOCKET-SEND-ERROR", "Unexpected end of stream");
2447  return;
2448  }
2449  break;
2450  }
2451 
2452  qore_offset_t rc = sendIntern(xsink, "Socket", "sendFromInputStream", buf, r, timeout, total);
2453  if (rc < 0) {
2454  return;
2455  }
2456  sent += r;
2457  }
2458  th.finalize(total);
2459  }
2460 
2461  DLLLOCAL void sendHttpChunkedBodyFromInputStream(InputStream* is, size_t max_chunk_size, int timeout, ExceptionSink* xsink, QoreThreadLock* l, const ResolvedCallReferenceNode* trailer_callback) {
2462  if (sock == QORE_INVALID_SOCKET) {
2463  se_not_open("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2464  return;
2465  }
2466  if (in_op >= 0) {
2467  if (in_op == gettid()) {
2468  se_in_op("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2469  return;
2470  }
2471  se_in_op_thread("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2472  return;
2473  }
2474 
2475  qore_socket_op_helper oh(this);
2476 
2477  PrivateQoreSocketThroughputHelper th(this, true);
2478 
2479  // set the non-blocking flag (for use with non-ssl connections)
2480  bool nb = (timeout >= 0);
2481  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2482  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2483  if (*xsink)
2484  return;
2485 
2487  // reserve enough space for the maximum size of the buffer + HTTP overhead
2488  buf->preallocate(max_chunk_size);
2489  int64 total = 0;
2490  while (true) {
2491  int64 r;
2492  {
2493  AutoUnlocker al(l);
2494  r = is->read((void*)buf->getPtr(), sizeof(max_chunk_size), xsink);
2495  if (*xsink)
2496  return;
2497  }
2498 
2499  // send HTTP chunk prelude with chunk size
2500  QoreString str;
2501  str.sprintf("%x\r\n", (int)r);
2502  int rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2503  if (rc < 0)
2504  return;
2505 
2506  bool trailers = false;
2507 
2508  // send chunk data, if any
2509  if (r) {
2510  rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", (const char*)buf->getPtr(), r, timeout, total, true);
2511  if (rc < 0)
2512  return;
2513  } else if (trailer_callback) {
2514  // get and send chunk trailers, if any
2516 
2517  if (runTrailerCallback(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
2518  return;
2519  if (h) {
2520  str.clear();
2521  do_headers(str, *h, 0);
2522 
2523  rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2524  if (rc < 0)
2525  return;
2526 
2527  trailers = true;
2528  }
2529  }
2530 
2531  // close chunk if we sent no trailers
2532  if (!trailers) {
2533  str.set("\r\n");
2534  rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2535  if (rc < 0)
2536  return;
2537  }
2538 
2539  if (!r) {
2540  // end of stream
2541  break;
2542  }
2543  }
2544  th.finalize(total);
2545  }
2546 
2547  DLLLOCAL void sendHttpChunkedBodyTrailer(const QoreHashNode* headers, int timeout, ExceptionSink* xsink) {
2548  if (sock == QORE_INVALID_SOCKET) {
2549  se_not_open("Socket", "sendHttpChunkedBodyTrailer", xsink);
2550  return;
2551  }
2552  if (in_op >= 0) {
2553  if (in_op == gettid()) {
2554  se_in_op("Socket", "sendHttpChunkedBodyTrailer", xsink);
2555  return;
2556  }
2557  se_in_op_thread("Socket", "sendHttpChunkedBodyTrailer", xsink);
2558  return;
2559  }
2560 
2561  QoreString buf;
2562  if (!headers) {
2563  ConstHashIterator hi(headers);
2564 
2565  while (hi.next()) {
2566  const QoreValue v = hi.get();
2567  const char* key = hi.getKey();
2568 
2569  if (v.getType() == NT_LIST) {
2570  ConstListIterator li(v.get<const QoreListNode>());
2571  while (li.next())
2572  do_header(key, buf, li.getValue());
2573  }
2574  else
2575  do_header(key, buf, v);
2576  }
2577  }
2578  buf.concat("\r\n");
2579  int64 total;
2580  sendIntern(xsink, "Socket", "sendHttpChunkedBodyTrailer", buf.getBuffer(), buf.size(), timeout, total, true);
2581  }
2582 
2583  DLLLOCAL int sendHttpMessage(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname, const char* method, const char* path, const char* http_version, const QoreHashNode* headers, const void *data, qore_size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* is, size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1, QoreThreadLock* l = 0, bool* aborted = 0) {
2584  assert(xsink);
2585  assert(!(data && send_callback));
2586  assert(!(data && is));
2587  assert(!(send_callback && is));
2588 
2589  // prepare header string
2590  QoreString hdr(enc);
2591 
2592  hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
2593 
2594  // write request-uri key if info hash is non-null
2595  if (info)
2596  info->setKeyValue("request-uri", new QoreStringNode(hdr), 0);
2597 
2598  do_send_http_message(hdr, headers, source);
2599  hdr.concat("\r\n");
2600 
2601  // insert headers
2602  do_headers(hdr, headers, size && data ? size : 0);
2603 
2604  //printd(5, "qore_socket_private::sendHttpMessage() hdr: %s\n", hdr.getBuffer());
2605 
2606  int rc;
2607  if ((rc = send(xsink, cname, mname, hdr.getBuffer(), hdr.strlen(), timeout_ms)))
2608  return rc;
2609 
2610  if (size && data)
2611  return send(xsink, cname, mname, (char*)data, size, timeout_ms);
2612  else if (send_callback) {
2613  assert(l);
2614  assert(!aborted || !(*aborted));
2615  return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
2616  } else if (is) {
2617  assert(l);
2618  assert(!aborted || !(*aborted));
2619 
2620  sendHttpChunkedBodyFromInputStream(is, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
2621  return *xsink ? -1 : 0;
2622  }
2623 
2624  return 0;
2625  }
2626 
2627  DLLLOCAL int sendHttpResponse(ExceptionSink* xsink, const char* cname, const char* mname, int code, const char* desc, const char* http_version, const QoreHashNode* headers, const void *data, qore_size_t size, const ResolvedCallReferenceNode* send_callback, int source, int timeout_ms = -1, QoreThreadLock* l = 0, bool* aborted = 0) {
2628  assert(!(data && send_callback));
2629  // prepare header string
2630  QoreString hdr(enc);
2631 
2632  hdr.sprintf("HTTP/%s %03d %s", http_version, code, desc);
2633 
2634  do_send_http_message(hdr, headers, source);
2635 
2636  hdr.concat("\r\n");
2637 
2638  do_headers(hdr, headers, size && data ? size : 0, true);
2639 
2640  //printd(5, "QoreSocket::sendHTTPResponse() this: %p data: %p size: %ld send_callback: %p hdr: %s", this, data, size, send_callback, hdr.getBuffer());
2641 
2642  int rc;
2643  if ((rc = send(xsink, cname, mname, hdr.getBuffer(), hdr.strlen(), timeout_ms)))
2644  return rc;
2645 
2646  if (size && data)
2647  return send(xsink, cname, mname, (char*)data, size, timeout_ms);
2648  else if (send_callback) {
2649  assert(l);
2650  assert(!aborted || !(*aborted));
2651  return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
2652  }
2653 
2654  return 0;
2655  }
2656 
2657  DLLLOCAL QoreHashNode* readHttpChunkedBodyBinary(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = nullptr, QoreThreadLock* l = nullptr, QoreObject* obj = nullptr, OutputStream* os = nullptr) {
2658  assert(xsink);
2659 
2660  if (sock == QORE_INVALID_SOCKET) {
2661  se_not_open(cname, "readHTTPChunkedBodyBinary", xsink);
2662  return 0;
2663  }
2664  if (in_op >= 0) {
2665  if (in_op == gettid()) {
2666  se_in_op(cname, "readHTTPChunkedBodyBinary", xsink);
2667  return 0;
2668  }
2669  se_in_op_thread(cname, "readHTTPChunkedBodyBinary", xsink);
2670  return 0;
2671  }
2672 
2673  // reset "expecting HTTP chunked body" flag
2674  if (http_exp_chunked_body)
2675  http_exp_chunked_body = false;
2676 
2677  qore_socket_op_helper oh(this);
2678 
2679  SimpleRefHolder<BinaryNode> b(os ? nullptr : new BinaryNode);
2680  QoreString str; // for reading the size of each chunk
2681 
2682  qore_offset_t rc;
2683  // read the size then read the data and append to buffer
2684  while (true) {
2685  // state = 0, nothing
2686  // state = 1, \r received
2687  int state = 0;
2688  while (true) {
2689  char* buf;
2690  rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 1, 0, timeout, false);
2691  if (rc <= 0) {
2692  if (!*xsink) {
2693  assert(!rc);
2694  se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
2695  }
2696  return 0;
2697  }
2698 
2699  char c = buf[0];
2700 
2701  if (!state && c == '\r')
2702  state = 1;
2703  else if (state && c == '\n')
2704  break;
2705  else {
2706  if (state) {
2707  state = 0;
2708  str.concat('\r');
2709  }
2710  str.concat(c);
2711  }
2712  }
2713  // DEBUG
2714  //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
2715 
2716  // terminate string at ';' char if present
2717  char* p = (char*)strchr(str.getBuffer(), ';');
2718  if (p)
2719  *p = '\0';
2720  long size = strtol(str.getBuffer(), 0, 16);
2721  do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
2722 
2723  if (!size)
2724  break;
2725 
2726  if (size < 0) {
2727  xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
2728  return 0;
2729  }
2730 
2731  // prepare string for chunk
2732  //str.allocate(size + 1);
2733 
2734  qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
2735  qore_offset_t br = 0; // bytes received
2736  while (true) {
2737  char* buf;
2738  rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, bs, 0, timeout, false);
2739  //printd(5, "qore_socket_private::readHTTPChunkedBodyBinary() str: '%s' bs: %lld rc: %lld b: %p (%lld) recv_callback: %p\n", str.c_str(), bs, rc, *b, b->size(), recv_callback);
2740  if (rc <= 0) {
2741  if (!*xsink) {
2742  assert(!rc);
2743  se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
2744  }
2745  return 0;
2746  }
2747 
2748  if (os) {
2749  AutoUnlocker al(l);
2750  os->write(buf, rc, xsink);
2751  if (*xsink)
2752  return nullptr;
2753  } else {
2754  b->append(buf, rc);
2755  }
2756  br += rc;
2757 
2758  if (br >= size)
2759  break;
2760  if (size - br < bs)
2761  bs = size - br;
2762  }
2763 
2764  // DEBUG
2765  //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): received binary chunk: size: %d br=" QSD " total=" QSD "\n", size, br, b->size());
2766 
2767  // read crlf after chunk
2768  // FIXME: bytes read are not checked if they equal CRLF
2769  br = 0;
2770  while (br < 2) {
2771  char* buf;
2772  rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout, false);
2773  if (rc <= 0) {
2774  if (!*xsink) {
2775  assert(!rc);
2776  se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
2777  }
2778  return 0;
2779  }
2780  br += rc;
2781  }
2782 
2783  do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
2784 
2785  if (recv_callback && !os) {
2786  if (runDataCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, *b, true))
2787  return 0;
2788  if (b)
2789  b->clear();
2790  }
2791 
2792  // ensure string is blanked for next read
2793  str.clear();
2794  }
2795 
2796  // read footers or nothing
2797  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBodyBinary", timeout, rc, true));
2798  if (*xsink)
2799  return 0;
2800 
2801  ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2802  if (!recv_callback && !os)
2803  h->setKeyValue("body", b.release(), xsink);
2804 
2805  ReferenceHolder<QoreHashNode> info(xsink);
2806 
2807  if (hdr) {
2808  if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
2809  return recv_callback ? 0 : h.release();
2810 
2811  if (recv_callback) {
2812  info = new QoreHashNode(autoTypeInfo);
2813  }
2814  convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
2815  do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
2816  }
2817 
2818  if (recv_callback) {
2819  runHeaderCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, h->empty() ? nullptr : *h,
2820  info.release(), false, obj);
2821  return 0;
2822  }
2823 
2824  return h.release();
2825  }
2826 
2827  // receive a message in HTTP chunked format
2828  DLLLOCAL QoreHashNode* readHttpChunkedBody(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = 0, QoreThreadLock* l = 0, QoreObject* obj = 0) {
2829  assert(xsink);
2830 
2831  if (sock == QORE_INVALID_SOCKET) {
2832  se_not_open(cname, "readHTTPChunkedBody", xsink);
2833  return 0;
2834  }
2835  if (in_op >= 0) {
2836  if (in_op == gettid()) {
2837  se_in_op(cname, "readHTTPChunkedBody", xsink);
2838  return 0;
2839  }
2840  se_in_op_thread(cname, "readHTTPChunkedBody", xsink);
2841  return 0;
2842  }
2843 
2844  // reset "expecting HTTP chunked body" flag
2845  if (http_exp_chunked_body)
2846  http_exp_chunked_body = false;
2847 
2848  qore_socket_op_helper oh(this);
2849 
2850  QoreStringNodeHolder buf(new QoreStringNode(enc));
2851  QoreString str; // for reading the size of each chunk
2852 
2853  qore_offset_t rc;
2854  // read the size then read the data and append to buf
2855  while (true) {
2856  // state = 0, nothing
2857  // state = 1, \r received
2858  int state = 0;
2859  while (true) {
2860  char* tbuf;
2861  rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 1, 0, timeout, false);
2862  if (rc <= 0) {
2863  if (!*xsink) {
2864  assert(!rc);
2865  se_closed(cname, "readHTTPChunkedBody", xsink);
2866  }
2867  return 0;
2868  }
2869 
2870  char c = tbuf[0];
2871 
2872  if (!state && c == '\r')
2873  state = 1;
2874  else if (state && c == '\n')
2875  break;
2876  else {
2877  if (state) {
2878  state = 0;
2879  str.concat('\r');
2880  }
2881  str.concat(c);
2882  }
2883  }
2884  // DEBUG
2885  //printd(5, "got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
2886 
2887  // terminate string at ';' char if present
2888  char* p = (char*)strchr(str.getBuffer(), ';');
2889  if (p)
2890  *p = '\0';
2891  qore_offset_t size = strtol(str.getBuffer(), 0, 16);
2892  do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
2893 
2894  if (!size)
2895  break;
2896 
2897  if (size < 0) {
2898  xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
2899  return 0;
2900  }
2901  // ensure string is blanked for next read
2902  str.clear();
2903 
2904  // prepare string for chunk
2905  //buf->allocate((unsigned)(buf->strlen() + size + 1));
2906 
2907  // read chunk directly into string buffer
2908  qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
2909  qore_offset_t br = 0; // bytes received
2910  str.clear();
2911  while (true) {
2912  char* tbuf;
2913  rc = brecv(xsink, "readHTTPChunkedBody", tbuf, bs, 0, timeout, false);
2914  if (rc <= 0) {
2915  if (!*xsink) {
2916  assert(!rc);
2917  se_closed(cname, "readHTTPChunkedBody", xsink);
2918  }
2919  return 0;
2920  }
2921  br += rc;
2922  buf->concat(tbuf, rc);
2923 
2924  if (br >= size)
2925  break;
2926  if (size - br < bs)
2927  bs = size - br;
2928  }
2929 
2930  // DEBUG
2931  //printd(5, "got chunk (" QSD " bytes): %s\n", br, buf->getBuffer() + buf->strlen() - size);
2932 
2933  // read crlf after chunk
2934  // FIXME: bytes read are not checked if they equal CRLF
2935  br = 0;
2936  while (br < 2) {
2937  char* tbuf;
2938  rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 2 - br, 0, timeout, false);
2939  if (rc <= 0) {
2940  if (!*xsink) {
2941  assert(!rc);
2942  se_closed(cname, "readHTTPChunkedBody", xsink);
2943  }
2944  return 0;
2945  }
2946  br += rc;
2947  }
2948 
2949  do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
2950 
2951  if (recv_callback) {
2952  if (runDataCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, *buf, true))
2953  return 0;
2954  buf->clear();
2955  }
2956  }
2957 
2958  // read footers or nothing
2959  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBody", timeout, rc, true));
2960  if (*xsink)
2961  return 0;
2962 
2963  //printd(5, "chunked body encoding: %s\n", buf->getEncoding()->getCode());
2964  ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2965  if (!recv_callback)
2966  h->setKeyValue("body", buf.release(), xsink);
2967 
2968  ReferenceHolder<QoreHashNode> info(xsink);
2969 
2970  if (hdr) {
2971  if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
2972  return recv_callback ? 0 : h.release();
2973 
2974  if (recv_callback) {
2975  info = new QoreHashNode(autoTypeInfo);
2976  }
2977  convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
2978  do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
2979  }
2980 
2981  if (recv_callback) {
2982  runHeaderCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, h->empty() ? nullptr : *h,
2983  info.release(), false, obj);
2984  return 0;
2985  }
2986 
2987  return h.release();
2988  }
2989 
2990  DLLLOCAL static void do_accept_encoding(char* t, QoreHashNode& info) {
2991  ReferenceHolder<QoreListNode> l(new QoreListNode(autoTypeInfo), 0);
2992 
2993  char* a = t;
2994  bool ok = true;
2995  while (*a) {
2996  if (ok) {
2997  ok = false;
2999  while (*a && *a != ';' && *a != ',')
3000  str->concat(*(a++));
3001  str->trim();
3002  if (!str->empty())
3003  l->push(str.release(), nullptr);
3004  continue;
3005  }
3006  else if (*a == ',')
3007  ok = true;
3008 
3009  ++a;
3010  }
3011 
3012  if (!l->empty())
3013  info.setKeyValue("accept-encoding", l.release(), 0);
3014  }
3015 
3016  DLLLOCAL bool do_accept_charset(char* t, QoreHashNode& info) {
3017  bool acceptcharset = false;
3018 
3019  // see if we have "*" or utf8 or utf-8, in which case set it
3020  // otherwise set the first charset in the list
3021  char* a = t;
3022  char* div = 0;
3023  bool utf8 = false;
3024  bool ok = true;
3025  while (*a) {
3026  if (ok) {
3027  if (*a == '*') {
3028  utf8 = true;
3029  break;
3030  }
3031  ok = false;
3032  if (*a == 'u' || *a == 'U') {
3033  ++a;
3034  if (*a == 't' || *a == 'T') {
3035  ++a;
3036  if (*a == 'f' || *a == 'F') {
3037  ++a;
3038  if (*a == '-')
3039  ++a;
3040  if (*a == '8') {
3041  utf8 = true;
3042  break;
3043  }
3044  }
3045  }
3046  continue;
3047  }
3048  } else if (*a == ',') {
3049  if (!div)
3050  div = a;
3051  ok = true;
3052  } else if (*a == ';') {
3053  if (!div)
3054  div = a;
3055  }
3056 
3057  ++a;
3058  }
3059  if (utf8) {
3060  info.setKeyValue("accept-charset", new QoreStringNode("utf8"), 0);
3061  acceptcharset = true;
3062  } else {
3064  if (div)
3065  ac->concat(t, div - t);
3066  else
3067  ac->concat(t);
3068  ac->trim();
3069  if (!ac->empty()) {
3070  info.setKeyValue("accept-charset", ac.release(), 0);
3071  acceptcharset = true;
3072  }
3073  }
3074 
3075  return acceptcharset;
3076  }
3077 
3078  // returns true if the connection should be closed, false if not
3079  DLLLOCAL bool convertHeaderToHash(QoreHashNode* h, char* p, int flags = 0, QoreHashNode* info = nullptr,
3080  bool* chunked = nullptr, const char* headers_raw_key = "headers-raw") {
3081  bool close = !(flags & CHF_HTTP11);
3082  // socket encoding
3083  const char* senc = 0;
3084  // accept-charset
3085  bool acceptcharset = false;
3086 
3087  QoreHashNode* raw_hdr = nullptr;
3088  if (info) {
3089  info->setKeyValue(headers_raw_key, raw_hdr = new QoreHashNode(autoTypeInfo), nullptr);
3090  }
3091 
3092  // raw key for setting raw headers
3093  std::string raw_key;
3094 
3095  while (*p) {
3096  char* buf = p;
3097 
3098  if ((p = strstr(buf, "\r\n"))) {
3099  *p = '\0';
3100  p += 2;
3101  } else if ((p = strchr(buf, '\n'))) {
3102  *p = '\0';
3103  p++;
3104  } else if ((p = strchr(buf, '\r'))) {
3105  *p = '\0';
3106  p++;
3107  } else
3108  break;
3109  char* t = strchr(buf, ':');
3110  if (!t)
3111  break;
3112  *t = '\0';
3113  t++;
3114  while (t && qore_isblank(*t))
3115  t++;
3116  if (raw_hdr) {
3117  raw_key = buf;
3118  }
3119  strtolower(buf);
3120  //printd(5, "setting %s = '%s'\n", buf, t);
3121 
3122  ReferenceHolder<> val(new QoreStringNode(t), nullptr);
3123 
3124  if (flags & CHF_PROCESS) {
3125  if (!strcmp(buf, "connection")) {
3126  if (flags & CHF_HTTP11) {
3127  if (strcasestr(t, "close"))
3128  close = true;
3129  } else {
3130  if (strcasestr(t, "keep-alive"))
3131  close = false;
3132  }
3133  } else if (!strcmp(buf, "content-type")) {
3134  char* a = strcasestr(t, "charset=");
3135  if (a) {
3136  // find end
3137  char* e = strchr(a + 8, ';');
3138 
3139  QoreString cs;
3140  if (e)
3141  cs.concat(a + 8, e - a - 8);
3142  else
3143  cs.concat(a + 8);
3144  cs.trim();
3145  senc = cs.getBuffer();
3146  //printd(5, "got encoding '%s' from request\n", senc);
3147  enc = QEM.findCreate(senc);
3148 
3149  if (info) {
3150  qore_size_t len = cs.size();
3151  info->setKeyValue("charset", new QoreStringNode(cs.giveBuffer(), len, len + 1, QCS_DEFAULT), nullptr);
3152  }
3153 
3154  if (info) {
3156  // remove any whitespace and ';' before charset=
3157  if (a != t) {
3158  do {
3159  --a;
3160  } while (a > t && (*a == ' ' || *a == ';'));
3161  }
3162 
3163  if (a == t) {
3164  if (e)
3165  ct->concat(e + 1);
3166  } else {
3167  ct->concat(t, a - t + 1);
3168  if (e)
3169  ct->concat(e);
3170  }
3171  ct->trim();
3172  if (!ct->empty())
3173  info->setKeyValue("body-content-type", ct.release(), nullptr);
3174  }
3175  } else if (info) {
3176  info->setKeyValue("charset", new QoreStringNode("iso-8859-1"), nullptr);
3177  info->setKeyValue("body-content-type", val->refSelf(), nullptr);
3178  }
3179  } else if (chunked && !strcmp(buf, "transfer-encoding") && !strcasecmp(t, "chunked")) {
3180  *chunked = true;
3181  } else if (info) {
3182  if (!strcmp(buf, "accept-charset"))
3183  acceptcharset = do_accept_charset(t, *info);
3184  else if ((flags & CHF_REQUEST) && !strcmp(buf, "accept-encoding"))
3185  do_accept_encoding(t, *info);
3186  }
3187  }
3188 
3189  ReferenceHolder<> val_copy(nullptr);
3190  if (raw_hdr && val) {
3191  val_copy = val->realCopy();
3192  }
3193 
3194  // see if header exists, and if so make it a list and add value to the list
3195  hash_assignment_priv ha(*h, buf);
3196  if (!(*ha).isNothing()) {
3197  QoreListNode* l;
3198  if ((*ha).getType() == NT_LIST) {
3199  l = (*ha).get<QoreListNode>();
3200  } else {
3201  l = new QoreListNode(autoTypeInfo);
3202  l->push(ha.swap(l), nullptr);
3203  }
3204  l->push(val.release(), nullptr);
3205  } else // otherwise set header normally
3206  ha.assign(val.release(), 0);
3207 
3208  // set raw headers if applicable
3209  if (raw_hdr) {
3210  hash_assignment_priv ha(*raw_hdr, raw_key);
3211  if (!(*ha).isNothing()) {
3212  QoreListNode* l;
3213  if ((*ha).getType() == NT_LIST) {
3214  l = (*ha).get<QoreListNode>();
3215  } else {
3216  l = new QoreListNode(autoTypeInfo);
3217  l->push(ha.swap(l), nullptr);
3218  }
3219  l->push(val_copy.release(), nullptr);
3220  } else // otherwise set header normally
3221  ha.assign(val_copy.release(), nullptr);
3222  }
3223  }
3224 
3225  if ((flags & CHF_PROCESS)) {
3226  if (!senc)
3227  enc = QEM.findCreate("iso-8859-1");
3228  // according to RFC-2616 section 14.2, "If no Accept-Charset header is present, the default is that any character set is acceptable" so we will use utf-8
3229  if (info && !acceptcharset)
3230  info->setKeyValue("accept-charset", new QoreStringNode("utf8"), nullptr);
3231  }
3232 
3233  return close;
3234  }
3235 
3236  DLLLOCAL int recvix(const char* meth, int len, void* targ, int timeout_ms, ExceptionSink* xsink) {
3237  assert(xsink);
3238  if (sock == QORE_INVALID_SOCKET) {
3239  se_not_open("Socket", meth, xsink);
3240  return QSE_NOT_OPEN;
3241  }
3242  if (in_op >= 0) {
3243  if (in_op == gettid()) {
3244  se_in_op("Socket", meth, xsink);
3245  return 0;
3246  }
3247  se_in_op_thread("Socket", meth, xsink);
3248  return 0;
3249  }
3250 
3251  PrivateQoreSocketThroughputHelper th(this, false);
3252 
3253  char* buf;
3254  qore_offset_t br = 0;
3255  while (true) {
3256  qore_offset_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3257  if (rc <= 0) {
3258  do_read_error(rc, meth, timeout_ms, xsink);
3259  return (int)rc;
3260  }
3261 
3262  memcpy(targ, buf, rc);
3263 
3264  br += rc;
3265  if (br >= len)
3266  break;
3267  }
3268 
3269  th.finalize(br);
3270 
3271  return (int)br;
3272  }
3273 
3274  DLLLOCAL void clearWarningQueue(ExceptionSink* xsink) {
3275  if (warn_queue) {
3276  if (callback_arg) {
3277  callback_arg.discard(xsink);
3278  callback_arg = QoreValue();
3279  }
3280  warn_queue->deref(xsink);
3281  warn_queue = 0;
3282  tl_warning_us = 0;
3283  tp_warning_bs = 0.0;
3284  tp_us_min = 0;
3285  }
3286  }
3287 
3288  DLLLOCAL void setWarningQueue(ExceptionSink* xsink, int64 warning_ms, int64 warning_bs, Queue* wq, QoreValue arg, int64 min_ms = 1000) {
3289  ReferenceHolder<Queue> qholder(wq, xsink);
3290  ValueHolder holder(arg, xsink);
3291  if (warning_ms <= 0 && warning_bs <= 0) {
3292  xsink->raiseException("SOCKET-SETWARNINGQUEUE-ERROR", "Socket::setWarningQueue() at least one of warning ms argument: " QLLD " and warning B/s argument: " QLLD " must be greater than zero; to clear, call Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3293  return;
3294  }
3295 
3296  if (warning_ms < 0)
3297  warning_ms = 0;
3298  if (warning_bs < 0)
3299  warning_bs = 0;
3300 
3301  if (warn_queue) {
3302  warn_queue->deref(xsink);
3303  callback_arg.discard(xsink);
3304  }
3305 
3306  warn_queue = qholder.release();
3307  callback_arg = holder.release();
3308  tl_warning_us = (int64)warning_ms * 1000;
3309  tp_warning_bs = warning_bs;
3310  tp_us_min = min_ms * 1000;
3311  }
3312 
3313  DLLLOCAL void getUsageInfo(QoreHashNode& h, qore_socket_private& s) const {
3314  if (warn_queue) {
3315  h.setKeyValue("arg", callback_arg.refSelf(), 0);
3316  h.setKeyValue("timeout", tl_warning_us, 0);
3317  h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3318  h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3319  }
3320 
3321  h.setKeyValue("bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3322  h.setKeyValue("bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3323  h.setKeyValue("us_sent", tp_us_sent + s.tp_us_sent, 0);
3324  h.setKeyValue("us_recv", tp_us_recv + s.tp_us_recv, 0);
3325  }
3326 
3327  DLLLOCAL void getUsageInfo(QoreHashNode& h) const {
3328  if (warn_queue) {
3329  h.setKeyValue("arg", callback_arg.refSelf(), 0);
3330  h.setKeyValue("timeout", tl_warning_us, 0);
3331  h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3332  h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3333  }
3334 
3335  h.setKeyValue("bytes_sent", tp_bytes_sent, 0);
3336  h.setKeyValue("bytes_recv", tp_bytes_recv, 0);
3337  h.setKeyValue("us_sent", tp_us_sent, 0);
3338  h.setKeyValue("us_recv", tp_us_recv, 0);
3339  }
3340 
3341  DLLLOCAL QoreHashNode* getUsageInfo() const {
3342  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3343  getUsageInfo(*h);
3344  return h;
3345  }
3346 
3347  DLLLOCAL void clearStats() {
3348  tp_bytes_sent = 0;
3349  tp_bytes_recv = 0;
3350  tp_us_sent = 0;
3351  tp_us_recv = 0;
3352  }
3353 
3354  DLLLOCAL void doTimeoutWarning(const char* op, int64 dt) {
3355  assert(warn_queue);
3356  assert(dt > tl_warning_us);
3357 
3358  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3359 
3360  h->setKeyValue("type", new QoreStringNode("SOCKET-OPERATION-WARNING"), 0);
3361  h->setKeyValue("operation", new QoreStringNode(op), 0);
3362  h->setKeyValue("us", dt, 0);
3363  h->setKeyValue("timeout", tl_warning_us, 0);
3364  if (callback_arg)
3365  h->setKeyValue("arg", callback_arg.refSelf(), 0);
3366 
3367  warn_queue->pushAndTakeRef(h);
3368  }
3369 
3370  DLLLOCAL void doThroughputWarning(bool send, int64 bytes, int64 dt, double bs) {
3371  assert(warn_queue);
3372  assert(bs < tp_warning_bs);
3373 
3374  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3375 
3376  h->setKeyValue("type", new QoreStringNode("SOCKET-THROUGHPUT-WARNING"), 0);
3377  h->setKeyValue("dir", new QoreStringNode(send ? "send" : "recv"), 0);
3378  h->setKeyValue("bytes", bytes, 0);
3379  h->setKeyValue("us", dt, 0);
3380  h->setKeyValue("bytes_sec", bs, 0);
3381  h->setKeyValue("threshold", (int64)tp_warning_bs, 0);
3382  if (callback_arg)
3383  h->setKeyValue("arg", callback_arg.refSelf(), 0);
3384 
3385  warn_queue->pushAndTakeRef(h);
3386  }
3387 
3388  DLLLOCAL bool pendingHttpChunkedBody() const {
3389  return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
3390  }
3391 
3392  DLLLOCAL void setSslVerifyMode(int mode) {
3393  //printd(5, "qore_socket_private::setSslVerifyMode() this: %p mode: %d\n", this, mode);
3394  ssl_verify_mode = mode;
3395  if (ssl)
3396  ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs);
3397  }
3398 
3399  DLLLOCAL void acceptAllCertificates(bool accept_all = true) {
3400  ssl_accept_all_certs = accept_all;
3401  if (ssl)
3402  ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs);
3403  }
3404 
3405  DLLLOCAL static void getUsageInfo(const QoreSocket& sock, QoreHashNode& h, const QoreSocket& s) {
3406  sock.priv->getUsageInfo(h, *s.priv);
3407  }
3408 
3409  DLLLOCAL static qore_socket_private* get(QoreSocket& sock) {
3410  return sock.priv;
3411  }
3412 
3413  DLLLOCAL static const qore_socket_private* get(const QoreSocket& sock) {
3414  return sock.priv;
3415  }
3416 
3417  DLLLOCAL static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
3418 };
3419 
3420 #endif
const qore_type_t NT_BINARY
type value for BinaryNode
Definition: node_types.h:49
DLLEXPORT const char * c_str() const
returns the string&#39;s buffer; this data should not be changed
Qore&#39;s arbitrary-precision number value type, dynamically-allocated only, reference counted...
Definition: QoreNumberNode.h:51
defines string encoding functions in Qore
Definition: QoreEncoding.h:83
a helper class for getting socket origination information
Definition: QoreSocket.h:73
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
This is the hash or associative list container type in Qore, dynamically allocated only...
Definition: QoreHashNode.h:50
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the &#39;arg&#39; member (this object takes over the ...
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT int gettid() noexcept
returns the current TID number
const qore_type_t NT_LIST
type value for QoreListNode
Definition: node_types.h:50
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT int sprintf(const char *fmt,...)
this will concatentate a formatted string to the existing string according to the format string and t...
The base class for all value and parse types in Qore expression trees.
Definition: AbstractQoreNode.h:54
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition: node_types.h:42
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
virtual int64 read(void *ptr, int64 limit, ExceptionSink *xsink)=0
Reads up to `limit` bytes from the input stream.
size_t qore_size_t
used for sizes (same range as a pointer)
Definition: common.h:73
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown ...
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
DLLLOCAL T * release()
releases the pointer to the caller
Definition: ReferenceHolder.h:91
DLLLOCAL hashdecl addrinfo * getAddrInfo() const
returns the hashdecl addrinfo * being managed (may by 0)
Definition: QoreNet.h:159
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
provides an interface to getaddrinfo
Definition: QoreNet.h:132
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
const qore_type_t NT_NULL
type value for QoreNullNode
Definition: node_types.h:48
Interface for private data of output streams.
Definition: OutputStream.h:44
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
Qore&#39;s string type supported by the QoreEncoding class.
Definition: QoreString.h:81
Qore&#39;s string value type, reference counted, dynamically-allocated only.
Definition: QoreStringNode.h:50
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
DLLEXPORT const char * getBuffer() const
returns the string&#39;s buffer; this data should not be changed
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition: QoreListNode.h:52
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
DLLEXPORT qore_size_t strlen() const
returns number of bytes in the string (not including the null pointer)
const qore_type_t NT_HASH
type value for QoreHashNode
Definition: node_types.h:51
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn&#39;...
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:262
const qore_type_t NT_STRING
type value for QoreStringNode
Definition: node_types.h:45
provides access to sockets using Qore data structures
Definition: QoreSocket.h:126
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
the implementation of Qore&#39;s object data type, reference counted, dynamically-allocated only ...
Definition: QoreObject.h:61
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT void clear()
frees any managed memory and sets the size to 0
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:46
Interface for private data of input streams.
Definition: InputStream.h:44
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition: node_types.h:44
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition: node_types.h:47
static void strtolower(char *str)
convert a string to lower-case in place
Definition: QoreLib.h:270
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition: node_types.h:43
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false, otherwise does nothing
long long int64
64bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition: common.h:260
DLLEXPORT const void * getPtr() const
returns the pointer to the data
intptr_t qore_offset_t
used for offsets that could be negative
Definition: common.h:76
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
base class for resolved call references
Definition: CallReferenceNode.h:105
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition: QoreLib.h:538
constant iterator class for QoreHashNode, to be only created on the stack
Definition: QoreHashNode.h:563
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:47
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6, "::1" = "ipv6[::1]")
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition: node_types.h:53
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
DLLEXPORT qore_size_t size() const
returns the number of bytes in the object
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change ...
DLLEXPORT qore_size_t size() const
returns number of bytes in the string (not including the null pointer)
holds an object and dereferences it in the destructor
Definition: QoreValue.h:452
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself
DLLEXPORT void append(const void *nptr, qore_size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values ...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition: QoreThreadLock.h:187
holds arbitrary binary data
Definition: BinaryNode.h:41
For use on the stack only: iterates through elements of a const QoreListNode.
Definition: QoreListNode.h:566