Qore Programming Language 1.19.5
Loading...
Searching...
No Matches
qore_socket_private.h
1/* -*- mode: c++; indent-tabs-mode: nil -*- */
2/*
3 qore_socket_private.h
4
5 Qore Programming Language
6
7 Copyright (C) 2003 - 2023 Qore Technologies, s.r.o.
8
9 Permission is hereby granted, free of charge, to any person obtaining a
10 copy of this software and associated documentation files (the "Software"),
11 to deal in the Software without restriction, including without limitation
12 the rights to use, copy, modify, merge, publish, distribute, sublicense,
13 and/or sell copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following conditions:
15
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25 DEALINGS IN THE SOFTWARE.
26
27 Note that the Qore library is released under a choice of three open-source
28 licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
29 information.
30*/
31
32#ifndef _QORE_QORE_SOCKET_PRIVATE_H
33#define _QORE_QORE_SOCKET_PRIVATE_H
34
35#include "qore/AbstractPollState.h"
36#include "qore/QoreSocket.h"
37#include "qore/InputStream.h"
38#include "qore/OutputStream.h"
39
40#include "qore/intern/SSLSocketHelper.h"
41#include "qore/intern/QC_Queue.h"
42
43#include <cctype>
44#include <cctype>
45#include <cerrno>
46#include <cstdlib>
47#include <cstring>
48#include <strings.h>
49
50#include <openssl/ssl.h>
51#include <openssl/err.h>
52
53#if defined HAVE_POLL
54#include <poll.h>
55#elif defined HAVE_SYS_SELECT_H
56#include <sys/select.h>
57#elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
58#define HAVE_SELECT 1
59#else
60#error no async socket I/O APIs available
61#endif
62
63#ifndef DEFAULT_SOCKET_BUFSIZE
64#define DEFAULT_SOCKET_BUFSIZE (64 * 1024)
65#endif
66
67#ifndef QORE_MAX_HEADER_SIZE
68#define QORE_MAX_HEADER_SIZE 16384
69#endif
70
71#define CHF_HTTP11 (1 << 0)
72#define CHF_PROCESS (1 << 1)
73#define CHF_REQUEST (1 << 2)
74
75#ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
76#define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
77#endif
78
// Bit flags describing socket poll conditions; each constant occupies its own bit so values can be OR'ed together.
// NOTE(review): the names mirror POLLIN / POLLOUT / POLLERR (readable / writable / error) - confirm at the use sites.
79static constexpr int SOCK_POLLIN = (1 << 0);
80static constexpr int SOCK_POLLOUT = (1 << 1);
81static constexpr int SOCK_POLLERR = (1 << 2);
82
// Library-internal helper functions; only the declarations are given here, the definitions are not in this header.
83DLLLOCAL void concat_target(QoreString& str, const struct sockaddr *addr, const char* type = "target");
84DLLLOCAL int do_read_error(ssize_t rc, const char* method_name, int timeout_ms, ExceptionSink* xsink);
85DLLLOCAL int sock_get_raw_error();
// the result of sock_get_error() is compared against errno-style codes such as EINTR in this header
86DLLLOCAL int sock_get_error();
// raises a socket exception with error code "err" and description "cdesc"; the optional arguments add context
// NOTE(review): presumably takes the error number from sock_get_error() - confirm in the implementation
87DLLLOCAL void qore_socket_error(ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = nullptr,
88 const char* host = nullptr, const char* svc = nullptr, const struct sockaddr *addr = nullptr);
// NOTE(review): "rc" is presumably the error number to report - confirm in the implementation
89DLLLOCAL void qore_socket_error_intern(int rc, ExceptionSink* xsink, const char* err, const char* cdesc,
90 const char* mname = nullptr, const char* host = nullptr, const char* svc = nullptr,
91 const struct sockaddr* addr = nullptr);
// se_in_op() / se_in_op_thread() are called in this header right before QSE_IN_OP / QSE_IN_OP_THREAD is returned
92DLLLOCAL void se_in_op(const char* cname, const char* meth, ExceptionSink* xsink);
93DLLLOCAL void se_in_op_thread(const char* cname, const char* meth, ExceptionSink* xsink);
94DLLLOCAL void se_not_open(const char* cname, const char* meth, ExceptionSink* xsink, const char* extra = nullptr);
95DLLLOCAL void se_timeout(const char* cname, const char* meth, int timeout_ms, ExceptionSink* xsink,
96 const char* extra = nullptr);
97DLLLOCAL void se_closed(const char* cname, const char* mname, ExceptionSink* xsink);
98
99#ifdef _Q_WINDOWS
100#define GETSOCKOPT_ARG_4 char*
101#define SETSOCKOPT_ARG_4 const char*
102#define SHUTDOWN_ARG SD_BOTH
103#define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
104#define QORE_SOCKET_ERROR SOCKET_ERROR
105DLLLOCAL int check_windows_rc(int rc);
106DLLLOCAL int windows_set_errno();
107
108#ifndef ECONNRESET
109#define ECONNRESET WSAECONNRESET
110#endif
111
112#else
113// UNIX/Cygwin
114#define GETSOCKOPT_ARG_4 void*
115#define SETSOCKOPT_ARG_4 void*
116#define SHUTDOWN_ARG SHUT_RDWR
117#define QORE_INVALID_SOCKET -1
118#define QORE_SOCKET_ERROR -1
119#endif
120
121template <typename T>
122class PrivateDataListHolder {
123public:
124 DLLLOCAL PrivateDataListHolder(ExceptionSink* xsink) : xsink(xsink) {
125 }
126
127 DLLLOCAL ~PrivateDataListHolder() {
128 for (auto& i : pd_vec)
129 i->deref(xsink);
130 }
131
132 DLLLOCAL T* add(const QoreObject* o, qore_classid_t cid) {
133 T* pd = static_cast<T*>(o->getReferencedPrivateData(cid, xsink));
134 if (!pd)
135 return nullptr;
136 pd_vec.push_back(pd);
137 return pd;
138 }
139
140private:
141 typedef std::vector<T*> pd_vec_t;
142 pd_vec_t pd_vec;
143 ExceptionSink* xsink;
144};
145
146hashdecl qore_socketsource_private {
147 QoreStringNode* address;
148 QoreStringNode* hostname;
149
150 DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
151 }
152
153 DLLLOCAL ~qore_socketsource_private() {
154 if (address) address->deref();
155 if (hostname) hostname->deref();
156 }
157
158 DLLLOCAL void setAddress(QoreStringNode* addr) {
159 assert(!address);
160 address = addr;
161 }
162
163 DLLLOCAL void setAddress(const char* addr) {
164 assert(!address);
165 address = new QoreStringNode(addr);
166 }
167
168 DLLLOCAL void setHostName(const char* host) {
169 assert(!hostname);
170 hostname = new QoreStringNode(host);
171 }
172
173 DLLLOCAL void setAll(QoreObject* o, ExceptionSink* xsink) {
174 if (address) {
175 o->setValue("source", address, xsink);
176 address = 0;
177 }
178
179 if (hostname) {
180 o->setValue("source_host", hostname, xsink);
181 hostname = 0;
182 }
183 }
184};
185
// Scope helper for a socket; constructor and destructor are only declared here and are defined elsewhere.
// NOTE(review): the name suggests the socket is optionally switched to non-blocking mode for the helper's
// lifetime - confirm in the implementation.
186class OptionalNonBlockingHelper {
187public:
188 qore_socket_private& sock; // socket the helper refers to
189 ExceptionSink* xsink; // NOTE(review): presumably the "xs" constructor argument
190 bool set; // NOTE(review): presumably the "n_set" constructor argument
191
192 DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s, bool n_set, ExceptionSink* xs);
193 DLLLOCAL ~OptionalNonBlockingHelper();
194};
195
// Base class for the timing helpers: stores the socket pointer and a start timestamp.
196class PrivateQoreSocketTimeoutBase {
197public:
// "start" is taken from q_clock_getmicros() only when a socket is given; with a null socket it is 0
198 DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
199 }
200
201protected:
202 hashdecl qore_socket_private* sock; // may be null
203 int64 start; // value of q_clock_getmicros() at construction, or 0
204};
205
// Timing helper built on PrivateQoreSocketTimeoutBase that additionally stores an operation name.
// NOTE(review): presumably the destructor compares the elapsed time with the socket's tl_warning_us
// threshold - confirm in the implementation, which is not part of this header.
206class PrivateQoreSocketTimeoutHelper : public PrivateQoreSocketTimeoutBase {
207public:
208 DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s, const char* op);
209 DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
210
211protected:
212 const char* op; // NOTE(review): presumably the "op" constructor argument, stored without copying
213};
214
// Timing helper built on PrivateQoreSocketTimeoutBase for transfers; finalize() takes the byte count.
// NOTE(review): presumably feeds the socket's tp_* throughput counters - confirm in the implementation,
// which is not part of this header.
215class PrivateQoreSocketThroughputHelper : public PrivateQoreSocketTimeoutBase {
216public:
217 DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s, bool snd);
218 DLLLOCAL ~PrivateQoreSocketThroughputHelper();
219
220 DLLLOCAL void finalize(int64 bytes);
221
222protected:
223 bool send; // NOTE(review): presumably true for a send operation and false for a receive ("snd" argument)
224};
225
226hashdecl qore_socket_private;
227
// Scope helper bound to a socket; constructor and destructor are defined elsewhere.
// NOTE(review): presumably marks the socket as "in operation" (see qore_socket_private::in_op) for the
// helper's lifetime - confirm in the implementation.
228hashdecl qore_socket_op_helper {
229protected:
230 qore_socket_private* s;
231
232public:
233 DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
234 DLLLOCAL ~qore_socket_op_helper();
235};
236
// Scope helper used while an SSL helper is being set up on a socket; all members are defined elsewhere.
// NOTE(review): presumably creates the socket's SSLSocketHelper and error() undoes that - confirm in the
// implementation.
237class SSLSocketHelperHelper {
238protected:
239 qore_socket_private* s;
240 SSLSocketHelper* ssl;
241 bool context_saved = false; // NOTE(review): presumably set when "set_thread_context" was honored
242
243public:
244 DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock, bool set_thread_context = false);
245
246 DLLLOCAL ~SSLSocketHelperHelper();
247
248 DLLLOCAL void error();
249};
250
// State values for the connect poll-state classes; their "state" members start at SCIPS_CONNECT.
251constexpr int SCIPS_CONNECT = 0;
252constexpr int SCIPS_CHECK_CONNECT = 1;
253
// Poll state for a connection to a host / service pair; all member functions are defined elsewhere.
254class SocketConnectInetPollState : public AbstractPollState {
255public:
256 DLLLOCAL SocketConnectInetPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* host,
257 const char* service, int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0);
258
265 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
266
267private:
268 QoreAddrInfo ai;
269 qore_socket_private* sock;
270 std::string host, service;
// NOTE(review): presumably the address entry currently being tried - confirm in the implementation
271 hashdecl addrinfo* p = nullptr;
272 int prt = -1;
// starts in the SCIPS_CONNECT state
273 int state = SCIPS_CONNECT;
274
275 DLLLOCAL int doConnect(ExceptionSink* xsink);
276
277 // returns 0 = connected, 1 = try again, -1 = error
278 DLLLOCAL int checkConnection(ExceptionSink* xsink);
279
281 DLLLOCAL int next(ExceptionSink* xsink);
282
284 DLLLOCAL int nextIntern(ExceptionSink* xsink);
285};
286
287#ifndef _Q_WINDOWS
// Poll state for a connection to a named UNIX socket; only declared when _Q_WINDOWS is not defined.
// All member functions are defined elsewhere.
288class SocketConnectUnixPollState : public AbstractPollState {
289public:
290 DLLLOCAL SocketConnectUnixPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* name,
291 int type = SOCK_STREAM, int protocol = 0);
292
299 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
300
301private:
302 qore_socket_private* sock;
303 std::string name;
304 hashdecl sockaddr_un addr;
// starts in the SCIPS_CONNECT state
305 int state = SCIPS_CONNECT;
306
307 DLLLOCAL int doConnect(ExceptionSink* xsink);
308
309 // returns 0 = connected, 1 = try again, -1 = error
310 DLLLOCAL int checkConnection(ExceptionSink* xsink);
311};
312#endif
313
// Poll state for an SSL connection on a socket, with an optional certificate and private key.
// All member functions are defined elsewhere.
314class SocketConnectSslPollState : public AbstractPollState {
315public:
316 DLLLOCAL SocketConnectSslPollState(ExceptionSink* xsink, qore_socket_private* sock,
317 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr);
318
325 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
326
327private:
328 qore_socket_private* sock;
329
330 // returns 0 = connected, 1 = try again, -1 = error
331 DLLLOCAL int checkConnection(ExceptionSink* xsink);
332};
333
334#if 0
// Disabled code: this class sits inside an "#if 0" region and is never compiled.
// Only a constructor is declared; no definition is visible here.
335class SocketAcceptPollState : public AbstractPollState {
336public:
337 DLLLOCAL SocketAcceptPollState(ExceptionSink* xsink, qore_socket_private* sock);
338private:
339 qore_socket_private* sock;
340};
341
// Disabled code: this class sits inside an "#if 0" region and is never compiled.
// NOTE(review): it would not compile if enabled as written - the constructor initializes a "sock" member and
// uses an "ssl" name that the class does not declare, and the constructor is private because no access
// specifier precedes it.
342class SocketAcceptSslPollState : public AbstractPollState {
343 DLLLOCAL SocketAcceptSslPollState(ExceptionSink* xsink, qore_socket_private* sock,
344 QoreSSLCertificate* cert = nullptr,QoreSSLPrivateKey* pkey = nullptr) : sock(sock) {
345 assert(!sock->ssl);
346 SSLSocketHelperHelper sshh(sock, true);
347
348 sock->do_start_ssl_event();
349 int rc;
// a non-zero result from setServer() is treated as failure; an exception is expected to be set
350 if (rc = sock->ssl->setServer(xsink, "acceptSSL", sock->sock, cert, pkey)) {
351 sshh.error();
352 assert(*xsink);
353 return;
354 }
355
356 ssl->startAccept(xsink);
357 }
358};
359#endif
360
// Poll state for sending "size" bytes from "data" on a socket; the member functions are defined elsewhere.
361class SocketSendPollState : public AbstractPollState {
362public:
363 DLLLOCAL SocketSendPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* data, size_t size);
364
371 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
372
373private:
374 qore_socket_private* sock;
// NOTE(review): raw pointer - presumably the caller must keep the buffer alive while the send is pending
375 const char* data;
376 size_t size;
// starts at 0; NOTE(review): presumably the number of bytes sent so far
377 size_t sent = 0;
378};
379
380class SocketRecvPollState : public AbstractPollState {
381public:
382 DLLLOCAL SocketRecvPollState(ExceptionSink* xsink, qore_socket_private* sock, size_t size);
383
390 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
391
393 DLLLOCAL virtual QoreValue takeOutput() {
394 QoreValue rv = bin.release();
395 bin = nullptr;
396 return rv;
397 }
398
399private:
400 qore_socket_private* sock;
402 size_t size;
403 size_t received = 0;
404};
405
406class SocketRecvUntilBytesPollState : public AbstractPollState {
407public:
408 DLLLOCAL SocketRecvUntilBytesPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* bytes,
409 size_t size);
410
417 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
418
420 DLLLOCAL virtual QoreValue takeOutput() {
421 size_t len = bin->size();
422 BinaryNode* rv = new BinaryNode(bin->giveBuffer(), len);
423 bin = nullptr;
424 return rv;
425 }
426
427private:
428 qore_socket_private* sock;
429 // we are using QoreStringNode as it has a much better append / concat implementation than BinaryNode
431 const char* bytes;
432 size_t size;
433 size_t matched = 0;
434
435 DLLLOCAL int doRecv(ExceptionSink* xsink);
436};
437
438
// Private implementation data and inline operations for a socket.
439hashdecl qore_socket_private {
440 friend class PrivateQoreSocketTimeoutHelper;
441 friend class PrivateQoreSocketThroughputHelper;
442 friend class SocketConnectInetPollState;
443
444 // for client certificate capture
445 static thread_local qore_socket_private* current_socket;
446
// socket descriptor (QORE_INVALID_SOCKET when closed), address family, port (-1 when unknown),
// socket type, and protocol
447 int sock, sfamily, port, stype, sprot;
448
449 // issue #3558: connection sequence to show when a connection has been reestablished
450 int64 connection_id = 0;
451
// encoding given to the constructor; used when creating strings for this socket
452 const QoreEncoding* enc;
453
// UNIX socket path; the file is unlinked on close when "del" is set
454 std::string socketname;
455 // issue #3053: client target for SNI
456 std::string client_target;
// SSL helper; shut down and dereferenced when the socket is closed
457 SSLSocketHelper* ssl = nullptr;
// queue receiving event hashes, and a second queue released together with warn_callback_arg in cleanup()
458 Queue* event_queue = nullptr,
459 * warn_queue = nullptr;
460
461 // issue #3633: HTTP encoding to assume
462 std::string assume_http_encoding = "ISO-8859-1";
463
464 // socket buffer for buffered reads
465 char rbuf[DEFAULT_SOCKET_BUFSIZE];
466
467 // current buffer size
// both values are reset to 0 when the socket is closed
468 size_t buflen = 0,
469 bufoffset = 0;
470
471 int64 tl_warning_us = 0; // timeout threshold for network action warning in microseconds
472 double tp_warning_bs = 0; // throughput warning threshold in B/s
473 int64 tp_bytes_sent = 0, // throughput: bytes sent
474 tp_bytes_recv = 0, // throughput: bytes received
475 tp_us_sent = 0, // throughput: time sending
476 tp_us_recv = 0, // throughput: time receiving
477 tp_us_min = 0 // throughput: minimum time for transfer to be considered
478 ;
479
// value discarded together with warn_queue in cleanup()
481 QoreValue warn_callback_arg;
// when set, added to every event hash under the "arg" key
483 QoreValue event_arg;
// del: unlink the UNIX socket file on close; event_data: data and header events are only posted when true
484 bool del = false,
485 http_exp_chunked_body = false,
486 ssl_accept_all_certs = false,
487 ssl_capture_remote_cert = false,
488 event_data = false;
// in_op: compared with q_gettid() in accept_internal(); values >= 0 mean an operation is in progress
489 int in_op = -1,
490 ssl_verify_mode = SSL_VERIFY_NONE;
491
492 // issue #3512: the remote certificate captured
493 QoreObject* remote_cert = nullptr;
494
495 // issue #3818: verbose certificate verification error info
496 QoreStringNode* ssl_err_str = nullptr;
497
// Stores the descriptor, address family, socket type, protocol and encoding given; the port starts at -1.
498 DLLLOCAL qore_socket_private(int n_sock = QORE_INVALID_SOCKET, int n_sfamily = AF_UNSPEC,
499 int n_stype = SOCK_STREAM, int n_prot = 0, const QoreEncoding* n_enc = QCS_DEFAULT) :
500 sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
501 }
502
// Closes the socket through close_internal(); both queues must already have been released, which cleanup() does.
503 DLLLOCAL ~qore_socket_private() {
504 close_internal();
505
506 // must be dereferenced and removed before deleting
507 assert(!event_queue);
508 assert(!warn_queue);
509 }
510
511 DLLLOCAL bool isOpen() {
512 return sock != QORE_INVALID_SOCKET;
513 }
514
515 DLLLOCAL int close() {
516 int rc = close_internal();
517 if (in_op >= 0)
518 in_op = -1;
519 if (http_exp_chunked_body)
520 http_exp_chunked_body = false;
521 sfamily = AF_UNSPEC;
522 stype = SOCK_STREAM;
523 sprot = 0;
524
525 return rc;
526 }
527
528 DLLLOCAL int close_and_reset() {
529 assert(sock != QORE_INVALID_SOCKET);
530 int rc;
531 while (true) {
532#ifdef _Q_WINDOWS
533 rc = ::closesocket(sock);
534#else
535 rc = ::close(sock);
536#endif
537 // try again if close was interrupted by a signal
538 if (!rc || sock_get_error() != EINTR)
539 break;
540 }
541 //printd(5, "qore_socket_private::close_and_reset(this: %p) close(%d) returned %d\n", this, sock, rc);
542 sock = QORE_INVALID_SOCKET;
543 if (buflen)
544 buflen = 0;
545 if (bufoffset)
546 bufoffset = 0;
547 if (del)
548 del = false;
549 if (port != -1)
550 port = -1;
551 // issue #3053: clear hostname for SNI
552 client_target.clear();
553 return rc;
554 }
555
556 DLLLOCAL int close_internal() {
557 //printd(5, "qore_socket_private::close_internal(this: %p) sock: %d\n", this, sock);
558 if (ssl_err_str) {
559 ssl_err_str->deref();
560 ssl_err_str = nullptr;
561 }
562 if (remote_cert) {
563 remote_cert->deref(nullptr);
564 remote_cert = nullptr;
565 }
566 if (sock >= 0) {
567 // if an SSL connection has been established, shut it down first
568 if (ssl) {
569 ssl->shutdown();
570 ssl->deref();
571 ssl = nullptr;
572 }
573
574 if (!socketname.empty()) {
575 if (del)
576 unlink(socketname.c_str());
577 socketname.clear();
578 }
579 do_close_event();
580 // issue #3558: increment the connection sequence here. so the connection sequence is different as soon as
581 // it's closed
582 ++connection_id;
583
584 return close_and_reset();
585 } else {
586 return 0;
587 }
588 }
589
590 DLLLOCAL void setAssumedEncoding(const char* str) {
591 assume_http_encoding = str;
592 }
593
594 DLLLOCAL const char* getAssumedEncoding() const {
595 return assume_http_encoding.c_str();
596 }
597
598 DLLLOCAL int getSendTimeout() const {
599 hashdecl timeval tv;
600
601#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
602 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
603 // but the library expects a 32-bit value
604 int size = sizeof(hashdecl timeval);
605#else
606 socklen_t size = sizeof(hashdecl timeval);
607#endif
608
609 if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
610 return -1;
611
612 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
613 }
614
615 DLLLOCAL int getRecvTimeout() const {
616 hashdecl timeval tv;
617
618#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
619 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
620 // but the library expects a 32-bit value
621 int size = sizeof(hashdecl timeval);
622#else
623 socklen_t size = sizeof(hashdecl timeval);
624#endif
625
626 if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
627 return -1;
628
629 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
630 }
631
632 DLLLOCAL int getPort() {
633 // if we don't need to find out what port we are, then return current value
634 if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
635 return port;
636
637 // otherwise find out what port we're connected to
638 hashdecl sockaddr_storage addr;
639#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
640 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
641 int size = sizeof addr;
642#else
643 socklen_t size = sizeof addr;
644#endif
645
646 if (getsockname(sock, (struct sockaddr *)&addr, (socklen_t *)&size) < 0)
647 return -1;
648
649 port = q_get_port_from_addr((const struct sockaddr *)&addr);
650 return port;
651 }
652
653 DLLLOCAL static void do_header(const char* key, QoreString& hdr, const QoreValue& v) {
654 switch (v.getType()) {
655 case NT_STRING:
656 hdr.sprintf("%s: %s\r\n", key, v.get<const QoreStringNode>()->c_str());
657 break;
658 case NT_INT:
659 hdr.sprintf("%s: " QLLD "\r\n", key, v.getAsBigInt());
660 break;
661 case NT_FLOAT: {
662 hdr.sprintf("%s: ", key);
663 size_t offset = hdr.size();
664 hdr.sprintf("%f\r\n", v.getAsFloat());
665 // issue 1556: external modules that call setlocale() can change
666 // the decimal point character used here from '.' to ','
667 // only search the double added, QoreString::sprintf() concatenates
668 q_fix_decimal(&hdr, offset);
669 break;
670 }
671 case NT_NUMBER:
672 hdr.sprintf("%s: ", key);
673 v.get<const QoreNumberNode>()->toString(hdr);
674 hdr.concat("\r\n");
675 break;
676 case NT_BOOLEAN:
677 hdr.sprintf("%s: %d\r\n", key, (int)v.getAsBool());
678 break;
679 }
680 }
681
682 // issue #3879: must add Content-Length if not present, even if there is no message body
685 DLLLOCAL static void do_headers(QoreString& hdr, const QoreHashNode* headers, size_t size, bool addsize = true) {
686 // RFC-2616 4.4 (http://tools.ietf.org/html/rfc2616#section-4.4)
687 // add Content-Length: 0 to headers for responses without a body where there is no transfer-encoding
688 if (headers) {
689 ConstHashIterator hi(headers);
690
691 while (hi.next()) {
692 const QoreValue v = hi.get();
693 const char* key = hi.getKey();
694 if (addsize && !strcasecmp(key, "transfer-encoding"))
695 addsize = false;
696 if ((addsize || size) && !strcasecmp(key, "content-length")) {
697 // ignore Content-Length given manually
698 continue;
699 }
700 if (v.getType() == NT_LIST) {
701 ConstListIterator li(v.get<const QoreListNode>());
702 while (li.next())
703 do_header(key, hdr, li.getValue());
704 } else
705 do_header(key, hdr, v);
706 }
707 }
708 // add data and content-length header if necessary
709 if (size || addsize) {
710 hdr.sprintf("Content-Length: %zu\r\n", size);
711 //printd(5, "qore_socket_private::do_headers() added Content-Length: %zu\n", size);
712 }
713
714 hdr.concat("\r\n");
715 }
716
717 DLLLOCAL int listen(int backlog = 20) {
718 if (sock == QORE_INVALID_SOCKET)
719 return QSE_NOT_OPEN;
720 if (in_op >= 0)
721 return QSE_IN_OP;
722#ifdef _Q_WINDOWS
723 if (::listen(sock, backlog)) {
724 // set errno
725 sock_get_error();
726 return -1;
727 }
728 return 0;
729#else
730 return ::listen(sock, backlog);
731#endif
732 }
733
734 DLLLOCAL int accept_intern(ExceptionSink* xsink, struct sockaddr *addr, socklen_t *size, int timeout_ms = -1) {
735 //printd(5, "qore_socket_private::accept_intern() to: %d\n", timeout_ms);
736 assert(xsink);
737 while (true) {
738 if (timeout_ms >= 0 && !isDataAvailable(timeout_ms, "accept", xsink)) {
739 if (*xsink)
740 return -1;
741 // do not throw exception here, NOTHING will be returned in Qore on timeout
742 return QSE_TIMEOUT; // -3
743 }
744
745 int rc = ::accept(sock, addr, size);
746 if (rc != QORE_INVALID_SOCKET)
747 return rc;
748
749 // retry if interrupted by a signal
750 if (sock_get_error() == EINTR)
751 continue;
752
753 qore_socket_error(xsink, "SOCKET-ACCEPT-ERROR", "error in accept()", 0, 0, 0, addr);
754 return -1;
755 }
756 }
757
// Accepts one connection on this socket and optionally records where it came from.
//
// xsink: exception sink, must not be null
// source: when not null and the accept succeeds, receives the peer address and host name
// timeout_ms: passed to accept_intern()
//
// Return value: the descriptor returned by accept_intern() on success; QSE_NOT_OPEN (with an exception) when
// the socket is not open; QSE_IN_OP or QSE_IN_OP_THREAD (with an exception) when an operation is already in
// progress; -1 with an exception for an unsupported address family or for AF_UNIX under Windows; otherwise
// whatever negative value accept_intern() returned.
758 // returns a new socket
759 DLLLOCAL int accept_internal(ExceptionSink* xsink, SocketSource *source, int timeout_ms = -1) {
760 assert(xsink);
761 if (sock == QORE_INVALID_SOCKET) {
762 xsink->raiseException("SOCKET-NOT-OPEN", "socket must be opened, bound, and in a listening state before "
763 "new connections can be accepted");
764 return QSE_NOT_OPEN;
765 }
// in_op >= 0: an operation is in progress; distinguish the current thread from another thread
766 if (in_op >= 0) {
767 if (in_op == q_gettid()) {
768 se_in_op("Socket", "accept", xsink);
769 return QSE_IN_OP;
770 }
771 se_in_op_thread("Socket", "accept", xsink);
772 return QSE_IN_OP_THREAD;
773 }
774
775 int rc;
776 if (sfamily == AF_UNIX) {
777#ifdef _Q_WINDOWS
778 xsink->raiseException("SOCKET-ACCEPT-ERROR", "UNIX sockets are not available under Windows");
779 return -1;
780#else
781 hashdecl sockaddr_un addr_un;
782
783#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
784 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
785 // but the library expects a 32-bit value
786 int size = sizeof(hashdecl sockaddr_un);
787#else
788 socklen_t size = sizeof(hashdecl sockaddr_un);
789#endif
790 rc = accept_intern(xsink, (struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
791 //printd(1, "qore_socket_private::accept_internal() " QSD " bytes returned\n", size);
792
// for UNIX sockets the recorded address is this socket's own path and the host name is "localhost"
793 if (rc >= 0 && source) {
794 QoreStringNode* addr = new QoreStringNode(enc);
795 addr->sprintf("UNIX socket: %s", socketname.c_str());
796 source->priv->setAddress(addr);
797 source->priv->setHostName("localhost");
798 }
799#endif // windows
800 } else if (sfamily == AF_INET || sfamily == AF_INET6) {
801 hashdecl sockaddr_storage addr_in;
802#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
803 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
804 // but the library expects a 32-bit value
805 int size = sizeof(addr_in);
806#else
807 socklen_t size = sizeof(addr_in);
808#endif
809
810 rc = accept_intern(xsink, (struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
811 //printd(1, "qore_socket_private::accept_internal() rc: %d, %d bytes returned\n", rc, size);
812
813 if (rc >= 0 && source) {
814 char host[NI_MAXHOST + 1];
815 char service[NI_MAXSERV + 1];
816
// the host name is only recorded when getnameinfo() succeeds (returns 0)
817 if (!getnameinfo((struct sockaddr *)&addr_in, qore_get_in_len((struct sockaddr *)&addr_in), host,
818 sizeof(host), service, sizeof(service), NI_NUMERICSERV)) {
819 source->priv->setHostName(host);
820 }
821
822 // get ipv4 or ipv6 address
823 char ifname[INET6_ADDRSTRLEN];
824 if (inet_ntop(addr_in.ss_family, qore_get_in_addr((struct sockaddr *)&addr_in), ifname,
825 sizeof(ifname))) {
826 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
827 source->priv->setAddress(ifname);
828 }
829 }
830 } else {
831 // should not happen
832 xsink->raiseException("SOCKET-ACCEPT-ERROR", "do not know how to accept connections with address "
833 "family %d", sfamily);
834 rc = -1;
835 }
836 return rc;
837 }
838
839 DLLLOCAL QoreHashNode* getEvent(int event, int source = QORE_SOURCE_SOCKET) const {
840 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
841 if (event_arg) {
842 h->setKeyValue("arg", event_arg.refSelf(), nullptr);
843 }
844
845 h->setKeyValue("event", event, nullptr);
846 h->setKeyValue("source", source, nullptr);
847 h->setKeyValue("id", (int64)this, nullptr);
848
849 return h;
850 }
851
852 DLLLOCAL void cleanup(ExceptionSink* xsink) {
853 if (event_queue) {
854 // close the socket before the delete message is put on the queue
855 // the socket would be closed anyway in the destructor
856 close_internal();
857
858 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_DELETED));
859
860 // deref and remove event queue
861 event_queue->deref(xsink);
862 event_queue = nullptr;
863 }
864 if (warn_queue) {
865 warn_queue->deref(xsink);
866 warn_queue = nullptr;
867 if (warn_callback_arg) {
868 warn_callback_arg.discard(xsink);
869 warn_callback_arg.clear();
870 }
871 }
872 }
873
874 DLLLOCAL void setEventQueue(ExceptionSink* xsink, Queue* q, QoreValue arg, bool with_data) {
875 if (event_queue) {
876 if (event_arg) {
877 event_arg.discard(xsink);
878 }
879 event_queue->deref(xsink);
880 }
881 event_queue = q;
882 event_arg = arg;
883 event_data = with_data;
884 }
885
886 DLLLOCAL void do_start_ssl_event() {
887 if (event_queue) {
888 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_START_SSL));
889 }
890 }
891
892 DLLLOCAL void do_ssl_established_event() {
893 if (event_queue) {
894 QoreHashNode* h = getEvent(QORE_EVENT_SSL_ESTABLISHED);
895 h->setKeyValue("cipher", new QoreStringNode(ssl->getCipherName()), nullptr);
896 h->setKeyValue("cipher_version", new QoreStringNode(ssl->getCipherVersion()), nullptr);
897 event_queue->pushAndTakeRef(h);
898 }
899 }
900
901 DLLLOCAL void do_connect_event(int af, const struct sockaddr* addr, const char* target,
902 const char* service = nullptr, int prt = -1) {
903 if (event_queue) {
904 QoreHashNode* h = getEvent(QORE_EVENT_CONNECTING);
905 QoreStringNode* str = q_addr_to_string2(addr);
906 if (str) {
907 h->setKeyValue("address", str, nullptr);
908 } else {
909 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
910 }
911 q_af_to_hash(af, *h, nullptr);
912 h->setKeyValue("target", new QoreStringNode(target), nullptr);
913 if (service)
914 h->setKeyValue("service", new QoreStringNode(service), nullptr);
915 if (prt != -1)
916 h->setKeyValue("port", prt, nullptr);
917 event_queue->pushAndTakeRef(h);
918 }
919 }
920
921 DLLLOCAL void do_connected_event() {
922 if (event_queue) {
923 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CONNECTED));
924 }
925 }
926
927 DLLLOCAL void do_data_event_intern(int event, int source, const QoreStringNode& str) const {
928 assert(event_queue && event_data && str.size());
929 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
930 h->setKeyValue("data", str.refSelf(), nullptr);
931 event_queue->pushAndTakeRef(h.release());
932 }
933
934 DLLLOCAL void do_data_event(int event, int source, const QoreStringNode& str) const {
935 if (event_queue && event_data && str.size()) {
936 do_data_event_intern(event, source, str);
937 }
938 }
939
940 DLLLOCAL void do_data_event(int event, int source, const BinaryNode& b) const {
941 if (event_queue && event_data && b.size()) {
942 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
943 h->setKeyValue("data", b.refSelf(), nullptr);
944 event_queue->pushAndTakeRef(h.release());
945 }
946 }
947
948 DLLLOCAL void do_data_event(int event, int source, const void* data, size_t size) const {
949 if (event_queue && event_data && size) {
950 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
952 b->append(data, size);
953 h->setKeyValue("data", b.release(), nullptr);
954 event_queue->pushAndTakeRef(h.release());
955 }
956 }
957
958 DLLLOCAL void do_header_event(int event, int source, const QoreHashNode& hdr) const {
959 if (event_queue && event_data && !hdr.empty()) {
960 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
961 h->setKeyValue("headers", hdr.refSelf(), nullptr);
962 event_queue->pushAndTakeRef(h.release());
963 }
964 }
965
966 DLLLOCAL void do_chunked_read(int event, size_t bytes, size_t total_read, int source) {
967 if (event_queue) {
968 QoreHashNode* h = getEvent(event, source);
969 if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
970 h->setKeyValue("read", bytes, nullptr);
971 else
972 h->setKeyValue("size", bytes, nullptr);
973 h->setKeyValue("total_read", total_read, nullptr);
974 event_queue->pushAndTakeRef(h);
975 }
976 }
977
978 DLLLOCAL void do_read_http_header(int event, const QoreHashNode* headers, int source) {
979 if (event_queue) {
980 QoreHashNode* h = getEvent(event, source);
981 h->setKeyValue("headers", headers->hashRefSelf(), nullptr);
982 event_queue->pushAndTakeRef(h);
983 }
984 }
985
986 DLLLOCAL void do_send_http_message_event(const QoreString& str, const QoreHashNode* headers, int source) {
987 if (event_queue) {
988 QoreHashNode* h = getEvent(QORE_EVENT_HTTP_SEND_MESSAGE, source);
989 h->setKeyValue("message", new QoreStringNode(str), nullptr);
990 //printd(5, "do_send_http_message_event() str='%s' headers: %p (%d %s)\n", str.getBuffer(), headers, headers->getType(), headers->getTypeName());
991 h->setKeyValue("headers", headers->copy(), nullptr);
992 event_queue->pushAndTakeRef(h);
993 }
994 }
995
996 DLLLOCAL void do_close_event() {
997 if (event_queue) {
998 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CHANNEL_CLOSED));
999 }
1000 }
1001
1002 DLLLOCAL void do_read_event(size_t bytes_read, size_t total_read, size_t bufsize = 0, int source = QORE_SOURCE_SOCKET) {
1003 // post bytes read on event queue, if any
1004 if (event_queue) {
1005 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_READ, source);
1006 h->setKeyValue("read", bytes_read, nullptr);
1007 h->setKeyValue("total_read", total_read, nullptr);
1008 // set total bytes to read and remaining bytes if bufsize > 0
1009 if (bufsize > 0)
1010 h->setKeyValue("total_to_read", bufsize, nullptr);
1011 event_queue->pushAndTakeRef(h);
1012 }
1013 }
1014
1015 DLLLOCAL void do_send_event(int bytes_sent, int total_sent, int bufsize) {
1016 // post bytes sent on event queue, if any
1017 if (event_queue) {
1018 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_SENT);
1019 h->setKeyValue("sent", bytes_sent, nullptr);
1020 h->setKeyValue("total_sent", total_sent, nullptr);
1021 h->setKeyValue("total_to_send", bufsize, nullptr);
1022 event_queue->pushAndTakeRef(h);
1023 }
1024 }
1025
1026 DLLLOCAL void do_resolve_event(const char* host, const char* service = 0) {
1027 // post bytes sent on event queue, if any
1028 if (event_queue) {
1029 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_LOOKUP);
1030 if (host)
1031 h->setKeyValue("name", new QoreStringNode(host), nullptr);
1032 if (service)
1033 h->setKeyValue("service", new QoreStringNode(service), nullptr);
1034 event_queue->pushAndTakeRef(h);
1035 }
1036 }
1037
1038 DLLLOCAL void do_resolved_event(const struct sockaddr* addr) {
1039 // post bytes sent on event queue, if any
1040 if (event_queue) {
1041 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_RESOLVED);
1042 QoreStringNode* str = q_addr_to_string2(addr);
1043 if (str)
1044 h->setKeyValue("address", str, nullptr);
1045 else
1046 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
1047 int prt = q_get_port_from_addr(addr);
1048 if (prt > 0)
1049 h->setKeyValue("port", prt, nullptr);
1050 q_af_to_hash(addr->sa_family, *h, nullptr);
1051 event_queue->pushAndTakeRef(h);
1052 }
1053 }
1054
1055 DLLLOCAL int64 getObjectIDForEvents() const {
1056 return (int64)this;
1057 }
1058
1059 DLLLOCAL int connectUNIX(const char* p, int sock_type, int protocol, ExceptionSink* xsink) {
1060 assert(xsink);
1061 assert(p);
1062 QORE_TRACE("connectUNIX()");
1063
1064#ifdef _Q_WINDOWS
1065 xsink->raiseException("SOCKET-CONNECTUNIX-ERROR", "UNIX sockets are not available under Windows");
1066 return -1;
1067#else
1068 // close socket if already open
1069 close();
1070
1071 printd(5, "qore_socket_private::connectUNIX(%s)\n", p);
1072
1073 hashdecl sockaddr_un addr;
1074
1075 addr.sun_family = AF_UNIX;
1076 // copy path and terminate if necessary
1077 strncpy(addr.sun_path, p, sizeof(addr.sun_path) - 1);
1078 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1079 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
1080 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "error connecting to UNIX socket: '%s'", p);
1081 return -1;
1082 }
1083
1084 do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
1085 while (true) {
1086 if (!::connect(sock, (const sockaddr *)&addr, sizeof(struct sockaddr_un)))
1087 break;
1088
1089 // try again if we were interrupted by a signal
1090 if (sock_get_error() == EINTR)
1091 continue;
1092
1093 // otherwise close the socket and return an exception with the error code
1094 // do not have to worry about windows API calls here; this is a UNIX-only function
1095 close_and_reset();
1096 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, p);
1097
1098 return -1;
1099 }
1100
1101 // save file name for deleting when socket is closed
1102 socketname = addr.sun_path;
1103 sfamily = AF_UNIX;
1104
1105 do_connected_event();
1106
1107 return 0;
1108#endif // windows
1109 }
1110
1111 // socket must be open or -1 is returned and a Qore-language exception is raised
1112 /* return values:
1113 -1: error
1114 0: timeout
1115 > 0: I/O can continue
1116 */
1117 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, const char* cname, const char* mname,
1118 ExceptionSink* xsink) const {
1119 assert(xsink);
1120 assert(read || write);
1121 if (sock == QORE_INVALID_SOCKET) {
1122 se_not_open(cname, mname, xsink, "asyncIoWait");
1123 return -1;
1124 }
1125
1126 return asyncIoWait(timeout_ms, read, write, xsink);
1127 }
1128
1129 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, ExceptionSink* xsink) const {
1130 assert(xsink);
1131#if defined HAVE_POLL
1132 return poll_intern(xsink, timeout_ms, read, write);
1133#elif defined HAVE_SELECT
1134 return select_intern(xsink, timeout_ms, read, write);
1135#else
1136#error no async socket operations supported
1137#endif
1138 }
1139
1140#if defined HAVE_POLL
1141 DLLLOCAL int poll_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1142 int rc;
1143 short arg = 0;
1144 if (read)
1145 arg |= POLLIN;
1146 if (write)
1147 arg |= POLLOUT;
1148 pollfd fds = {sock, arg, 0};
1149 while (true) {
1150 rc = ::poll(&fds, 1, timeout_ms);
1151 if (rc == -1 && errno == EINTR)
1152 continue;
1153 break;
1154 }
1155 if (rc < 0)
1156 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "poll(2) returned an error");
1157 else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
1158 rc = -1;
1159
1160 return rc;
1161 }
1162#elif defined HAVE_SELECT
1163 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1164 bool aborted = false;
1165 int rc = select_intern(xsink, timeout_ms, read, write, aborted);
1166 if (rc != QORE_SOCKET_ERROR && aborted)
1167 rc = -1;
1168 return rc;
1169 }
1170
1171 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write, bool& aborted) const {
1172 assert(xsink);
1173 assert(!aborted);
1174 // windows does not use FD_SETSIZE to limit the value of the highest socket descriptor in the set
1175 // instead it has a maximum of 64 sockets in the set; we only need one anyway
1176#ifndef _Q_WINDOWS
1177 // select is inherently broken since it can only handle descriptors < FD_SETSIZE, which is 1024 on Linux for example
1178 if (sock >= FD_SETSIZE) {
1179 xsink->raiseException("SOCKET-SELECT-ERROR", "fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
1180 return -1;
1181 }
1182#endif
1183 hashdecl timeval tv;
1184 int rc;
1185 while (true) {
1186 // to be safe, we set the file descriptor arg after each EINTR (required on Linux for example)
1187 fd_set sfs, err;
1188
1189 FD_ZERO(&sfs);
1190 FD_ZERO(&err);
1191 FD_SET(sock, &sfs);
1192 FD_SET(sock, &err);
1193
1194 tv.tv_sec = timeout_ms / 1000;
1195 tv.tv_usec = (timeout_ms % 1000) * 1000;
1196
1197 fd_set* readfd = read ? &sfs : 0;
1198 fd_set* writefd = write ? &sfs : 0;
1199
1200 rc = select(sock + 1, readfd, writefd, &err, &tv);
1201 //printd(5, "select_intern() rc: %d err: %d\n", rc, FD_ISSET(sock, &err));
1202 if (rc != QORE_SOCKET_ERROR) {
1203 if (FD_ISSET(sock, &err))
1204 aborted = true;
1205 break;
1206 }
1207 if (sock_get_error() != EINTR)
1208 break;
1209 }
1210 if (rc == QORE_SOCKET_ERROR) {
1211 // do not close the socket here, even in case of EBADF, just return an error
1212 rc = 0;
1213 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "select(2) returned an error");
1214 }
1215
1216 return rc;
1217 }
1218#endif
1219
1220 DLLLOCAL bool tryReadSocketData(const char* mname, ExceptionSink* xsink) {
1221 assert(xsink);
1222 assert(!buflen);
1223 if (!ssl) {
1224 // issue #3564: see if any data is available on the socket
1225 return asyncIoWait(0, true, false, "Socket", mname, xsink);
1226 }
1227 // select can return true if there is protocol negotiation data available,
1228 // so we try to peek 1 byte of application data with a timeout of 0 with the SSL connection
1229 int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK, false);
1230 if (*xsink || (rc == QSE_TIMEOUT)) {
1231 return false;
1232 }
1233 return rc > 0 ? true : false;
1234 }
1235
1236 DLLLOCAL bool isSocketDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1237 return asyncIoWait(timeout_ms, true, false, "Socket", mname, xsink);
1238 }
1239
1240 DLLLOCAL bool isDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1241 if (buflen)
1242 return true;
1243 return isSocketDataAvailable(timeout_ms, mname, xsink);
1244 }
1245
1246 DLLLOCAL bool isWriteFinished(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1247 return asyncIoWait(timeout_ms, false, true, "Socket", mname, xsink);
1248 }
1249
1250 DLLLOCAL int close_and_exit() {
1251 if (sock != QORE_INVALID_SOCKET)
1252 close_and_reset();
1253 return -1;
1254 }
1255
// Calls connect() on the already-created socket `sock` and, when the connection does not
// complete immediately, waits up to timeout_ms for the socket to become writable.
//
// timeout_ms:   maximum time to wait for the connection, in milliseconds
// ai_addr:      address to connect to
// ai_addrlen:   size of ai_addr
// xsink:        receives Qore-language exceptions; must not be null
// only_timeout: when true, exceptions are suppressed for failures of the wait call and are
//               passed on to checkConnected(); the timeout exception is still raised
//
// Returns 0 when connected, -1 otherwise. Note that on non-Windows platforms a connect()
// failure other than EINTR / EINPROGRESS returns -1 without raising an exception here.
// NOTE(review): PrivateQoreSocketTimeoutHelper is declared elsewhere; presumably it tracks the
// time spent in the operation - confirm.
1256 DLLLOCAL int connectINETTimeout(int timeout_ms, const struct sockaddr* ai_addr, size_t ai_addrlen,
1257 ExceptionSink* xsink, bool only_timeout) {
1258 assert(xsink);
1259 PrivateQoreSocketTimeoutHelper toh(this, "connect");
1260
1261 while (true) {
// connected immediately
1262 if (!::connect(sock, ai_addr, ai_addrlen))
1263 return 0;
1264
1265#ifdef _Q_WINDOWS
// on Windows any error other than EAGAIN is reported as an exception and ends the attempt
1266 if (sock_get_error() != EAGAIN) {
1267 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1268 break;
1269 }
1270#else
1271 // try again if we were interrupted by a signal
1272 if (errno == EINTR)
1273 continue;
1274
// any error other than "connection in progress" ends the attempt; no exception is raised here
1275 if (errno != EINPROGRESS)
1276 break;
1277#endif
1278
1279 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d errno: %d\n", timeout_ms, errno);
1280
1281 // check for timeout or connection with EINPROGRESS
// every branch of this inner loop returns, so it runs at most once
1282 while (true) {
1283#ifdef _Q_WINDOWS
1284 bool aborted = false;
1285 int rc = select_intern(xsink, timeout_ms, false, true, aborted);
1286
1287 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d rc: %d aborted: %d\n",
1288 // timeout_ms, rc, aborted);
1289
1290 // windows select() returns an error in the error socket set instead of an WSAECONNREFUSED error like
1291 // UNIX, so we simulate it here
1292 if (rc != QORE_SOCKET_ERROR && aborted) {
1293 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1294 return -1;
1295 }
1296#else
// wait for the socket to become writable, which signals that the connect attempt has finished
1297 int rc = asyncIoWait(timeout_ms, false, true, "Socket", "connectINETTimeout", xsink);
1298#endif
// an exception raised by the wait call ends the attempt
1299 if (*xsink)
1300 return -1;
1301
1302 //printd(5, "asyncIoWait(%d) returned %d\n", timeout_ms, rc);
1303 if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1304 if (!only_timeout)
1305 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in asyncIoWait() with "
1306 "Socket::connect() with timeout", 0, 0, 0, ai_addr);
1307 return -1;
1308 } else if (rc > 0) {
// socket is writable: checkConnected() reads SO_ERROR to find out if the connection succeeded
1309 return checkConnected(xsink, nullptr, ai_addr, only_timeout);
1310 } else {
// the wait returned without the socket becoming writable: report a timeout
1311 SimpleRefHolder<QoreStringNode> desc(new QoreStringNodeMaker("timeout in connection after %dms",
1312 timeout_ms));
1313 concat_target(*(*desc), ai_addr);
1314 xsink->raiseException("SOCKET-CONNECT-ERROR", desc.release());
1315 return -1;
1316 }
1317 }
1318 }
1319
1320 return -1;
1321 }
1322
1323 DLLLOCAL int checkConnected(ExceptionSink* xsink, const char* hostsvc, const struct sockaddr* ai_addr = nullptr,
1324 bool only_timeout = false) {
1325 assert(sock);
1326
1327 // socket selected for write
1328 socklen_t lon = sizeof(int);
1329 int val;
1330
1331 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1332 if (!only_timeout) {
1333 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc, nullptr,
1334 ai_addr);
1335 }
1336 return -1;
1337 }
1338
1339 if (val) {
1340 if (only_timeout) {
1341 errno = val;
1342 return -1;
1343 }
1344 qore_socket_error_intern(val, xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc,
1345 nullptr, ai_addr);
1346 return -1;
1347 }
1348
1349 // connected successfully within the timeout period
1350 return 0;
1351 }
1352
1353 DLLLOCAL void confirmConnected(const char* host) {
1354 do_connected_event();
1355
1356 // issue #3053: save hostname for SNI
1357 if (host) {
1358 client_target = host;
1359 }
1360 }
1361
1362 DLLLOCAL int sock_errno_err(const char* err, const char* desc, ExceptionSink* xsink) {
1363 //sock = QORE_INVALID_SOCKET;
1364 qore_socket_error(xsink, err, desc);
1365 return -1;
1366 }
1367
1368 DLLLOCAL int set_non_blocking(bool non_blocking, ExceptionSink* xsink) {
1369 assert(xsink);
1370 // ignore call when socket already closed
1371 if (sock == QORE_INVALID_SOCKET) {
1372 assert(*xsink);
1373 return -1;
1374 }
1375
1376#ifdef _Q_WINDOWS
1377 u_long mode = non_blocking ? 1 : 0;
1378 int rc = ioctlsocket(sock, FIONBIO, &mode);
1379 if (check_windows_rc(rc)) {
1380 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in ioctlsocket(FIONBIO)", xsink);
1381 }
1382#else
1383 int arg;
1384
1385 // get socket descriptor status flags
1386 if ((arg = fcntl(sock, F_GETFL, 0)) < 0) {
1387 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() getting socket descriptor status "
1388 "flag", xsink);
1389 }
1390
1391 if (non_blocking) { // set non-blocking
1392 arg |= O_NONBLOCK;
1393 } else { // set blocking
1394 arg &= ~O_NONBLOCK;
1395 }
1396
1397 if (fcntl(sock, F_SETFL, arg) < 0) {
1398 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() setting socket descriptor status "
1399 "flag", xsink);
1400 }
1401#endif
1402 //printd(5, "qore_socket_private::set_non_blocking() set: %d\n", non_blocking);
1403
1404 return 0;
1405 }
1406
1407 DLLLOCAL int connectINET(const char* host, const char* service, int timeout_ms, ExceptionSink* xsink,
1408 int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0) {
1409 assert(xsink);
1410 family = q_get_af(family);
1411 type = q_get_sock_type(type);
1412
1413 QORE_TRACE("qore_socket_private::connectINET()");
1414
1415 // close socket if already open
1416 close();
1417
1418 printd(5, "qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1419
1420 do_resolve_event(host, service);
1421
1422 QoreAddrInfo ai;
1423 if (ai.getInfo(xsink, host, service, family, 0, type, protocol))
1424 return -1;
1425
1426 hashdecl addrinfo* aip = ai.getAddrInfo();
1427
1428 // emit all "resolved" events
1429 if (event_queue)
1430 for (struct addrinfo* p = aip; p; p = p->ai_next)
1431 do_resolved_event(p->ai_addr);
1432
1433 int prt = q_get_port_from_addr(aip->ai_addr);
1434
1435 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1436 if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype,
1437 p->ai_protocol, prt, timeout_ms, xsink, true)) {
1438 return 0;
1439 }
1440 if (*xsink) {
1441 break;
1442 }
1443 }
1444
1445 if (!*xsink) {
1446 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1447 }
1448 return -1;
1449 }
1450
// Creates a socket for one resolved address and connects it.
//
// host, service: target names, used for events, error messages and (host) for SNI
// ai_family, ai_addr, ai_addrlen, ai_socktype, ai_protocol: the resolved address to connect to
// prt:          port number stored in `port` on success
// timeout_ms:   when >= 0 the connect is done in non-blocking mode with this timeout;
//               when negative a blocking connect() is used
// xsink:        receives Qore-language exceptions; must not be null
// only_timeout: when true, a failed connect raises an exception here only if errno is
//               ETIMEDOUT
//
// Returns 0 on success after storing sfamily / stype / sprot / port and calling
// confirmConnected(); returns -1 on failure, in which case the socket has been closed unless
// socket() itself failed.
1451 DLLLOCAL int connectINETIntern(const char* host, const char* service, int ai_family, struct sockaddr* ai_addr,
1452 size_t ai_addrlen, int ai_socktype, int ai_protocol, int prt, int timeout_ms, ExceptionSink* xsink,
1453 bool only_timeout = false) {
1454 assert(xsink);
1455 printd(5, "qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host,
1456 service, ai_family, timeout_ms);
1457 if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1458 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "cannot establish a connection to %s:%s", host,
1459 service);
1460 return -1;
1461 }
1462
1463 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host: '%s', port: %d, timeout_ms: %d) "
1464 // "sock: %d\n", this, host, port, timeout_ms, sock);
1465
1466 int rc;
1467
1468 // perform connect with timeout if a non-negative timeout was passed
1469 if (timeout_ms >= 0) {
1470 // set non-blocking
1471 if (set_non_blocking(true, xsink))
1472 return close_and_exit();
1473
1474 do_connect_event(ai_family, ai_addr, host, service, prt);
1475
1476 rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1477 //printd(5, "qore_socket_private::connectINETIntern() errno: %d rc: %d, xsink: %d\n", errno, rc, xsink && *xsink);
1478
// the socket is switched back to blocking mode whether or not the connect succeeded
1479 // set blocking
1480 if (set_non_blocking(false, xsink))
1481 return close_and_exit();
1482 } else {
1483 do_connect_event(ai_family, ai_addr, host, service, prt);
1484
1485 while (true) {
1486 rc = ::connect(sock, ai_addr, ai_addrlen);
1487
1488 // try again if rc == -1 and errno == EINTR
1489 if (!rc || sock_get_error() != EINTR)
1490 break;
1491 }
1492 }
1493
1494 if (rc < 0) {
// NOTE(review): this relies on errno still holding the connect error at this point - confirm
// that the calls made since the failure cannot overwrite it
1495 if (!only_timeout || errno == ETIMEDOUT)
1496 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1497
1498 return close_and_exit();
1499 }
1500
// connected: record the parameters of the established connection
1501 sfamily = ai_family;
1502 stype = ai_socktype;
1503 sprot = ai_protocol;
1504 port = prt;
1505 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) success, "
1506 // "rc: %d, sock: %d\n", this, host, port, timeout_ms, rc, sock);
1507
1508 confirmConnected(host);
1509 return 0;
1510 }
1511
1512 DLLLOCAL int upgradeClientToSSLIntern(ExceptionSink* xsink, const char* mname, const char* sni_target_host,
1513 int timeout_ms, QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr) {
1514 assert(!ssl);
1515 SSLSocketHelperHelper sshh(this, true);
1516
1517 int rc;
1518 do_start_ssl_event();
1519 // issue #3053: send target hostname to support SNI
1520 if (!sni_target_host && !client_target.empty()) {
1521 sni_target_host = client_target.c_str();
1522 }
1523 if ((rc = ssl->setClient(xsink, mname, sni_target_host, sock, cert, pkey))
1524 || ssl->connect(mname, timeout_ms,
1525 xsink)) {
1526 sshh.error();
1527 return rc ? rc : -1;
1528 }
1529 do_ssl_established_event();
1530
1531 return 0;
1532 }
1533
1534 DLLLOCAL int upgradeServerToSSLIntern(ExceptionSink* xsink, const char* mname, int timeout_ms,
1535 QoreSSLCertificate* cert = nullptr, QoreSSLPrivateKey* pkey = nullptr) {
1536 assert(!ssl);
1537 //printd(5, "qore_socket_private::upgradeServerToSSLIntern() this: %p mode: %d\n", this, ssl_verify_mode);
1538 SSLSocketHelperHelper sshh(this, true);
1539
1540 do_start_ssl_event();
1541 if (ssl->setServer(xsink, mname, sock, cert, pkey) || ssl->accept(mname, timeout_ms, xsink)) {
1542 sshh.error();
1543 return -1;
1544 }
1545 do_ssl_established_event();
1546
1547 return 0;
1548 }
1549
1550 // returns 0 = success, -1 = error
1551 DLLLOCAL int openUNIX(int sock_type = SOCK_STREAM, int protocol = 0) {
1552 if (sock != QORE_INVALID_SOCKET)
1553 close();
1554
1555 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1556 return -1;
1557 }
1558
1559 sfamily = AF_UNIX;
1560 stype = sock_type;
1561 sprot = protocol;
1562 port = -1;
1563 return 0;
1564 }
1565
1566 // returns 0 = success, -1 = error
1567 DLLLOCAL int openINET(int family = AF_INET, int sock_type = SOCK_STREAM, int protocol = 0) {
1568 if (sock != QORE_INVALID_SOCKET)
1569 close();
1570
1571 if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1572 return -1;
1573
1574 sfamily = family;
1575 stype = sock_type;
1576 sprot = protocol;
1577 port = -1;
1578 return 0;
1579 }
1580
1581 DLLLOCAL int reuse(int opt) {
1582 //printf("qore_socket_private::reuse(%s)\n", opt ? "true" : "false");
1583 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt, sizeof(int));
1584 }
1585
1586 // the only place where xsink is optional
1587 DLLLOCAL int bindIntern(struct sockaddr* ai_addr, size_t ai_addrlen, int prt, bool reuseaddr, ExceptionSink* xsink = 0) {
1588 reuse(reuseaddr);
1589
1590 if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1591 if (xsink)
1592 qore_socket_error(xsink, "SOCKET-BIND-ERROR", "error in bind()", 0, 0, 0, ai_addr);
1593 close();
1594 return -1;
1595 }
1596
1597 // set port number
1598 if (prt)
1599 port = prt;
1600 else {
1601 // get port number
1602#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1603 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1604 int len = ai_addrlen;
1605#else
1606 socklen_t len = ai_addrlen;
1607#endif
1608
1609 if (getsockname(sock, ai_addr, &len))
1610 port = -1;
1611 else
1612 port = q_get_port_from_addr(ai_addr);
1613 }
1614 return 0;
1615 }
1616
1617 // bind to UNIX domain socket file
1618 DLLLOCAL int bindUNIX(ExceptionSink* xsink, const char* name, int socktype = SOCK_STREAM, int protocol = 0) {
1619 assert(xsink);
1620#ifdef _Q_WINDOWS
1621 xsink->raiseException("SOCKET-BINDUNIX-ERROR", "UNIX sockets are not available under Windows");
1622 return -1;
1623#else
1624 close();
1625
1626 // try to open socket if necessary
1627 if (openUNIX(socktype, protocol)) {
1628 xsink->raiseErrnoException("SOCKET-BIND-ERROR", errno, "error opening UNIX socket ('%s') for bind", name);
1629 return -1;
1630 }
1631
1632 hashdecl sockaddr_un addr;
1633 addr.sun_family = AF_UNIX;
1634 // copy path and terminate if necessary
1635 strncpy(addr.sun_path, name, sizeof(addr.sun_path) - 1);
1636 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1637
1638 if (bindIntern((sockaddr*)&addr, sizeof(struct sockaddr_un), -1, false, xsink))
1639 return -1;
1640
1641 // save socket file name for deleting on close
1642 socketname = addr.sun_path;
1643 // delete UNIX domain socket on close
1644 del = true;
1645 return 0;
1646#endif // windows
1647 }
1648
1649 DLLLOCAL int bindINET(ExceptionSink* xsink, const char* name, const char* service, bool reuseaddr = true, int family = AF_UNSPEC, int socktype = SOCK_STREAM, int protocol = 0) {
1650 assert(xsink);
1651 family = q_get_af(family);
1652 socktype = q_get_sock_type(socktype);
1653
1654 close();
1655
1656 QoreAddrInfo ai;
1657 do_resolve_event(name, service);
1658 if (ai.getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1659 return -1;
1660
1661 hashdecl addrinfo* aip = ai.getAddrInfo();
1662 // first emit all "resolved" events
1663 if (event_queue)
1664 for (struct addrinfo* p = aip; p; p = p->ai_next)
1665 do_resolved_event(p->ai_addr);
1666
1667 // try to open socket if necessary
1668 if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1669 qore_socket_error(xsink, "SOCKET-BINDINET-ERROR", "error opening socket for bind", 0, name, service);
1670 return -1;
1671 }
1672
1673 int prt = q_get_port_from_addr(aip->ai_addr);
1674
1675 int en = 0;
1676 // iterate through addresses and bind to the first interface possible
1677 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1678 if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1679 //printd(5, "qore_socket_private::bindINET(family: %d) bound: name: %s service: %s f: %d st: %d p: %d\n", family, name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol);
1680 return 0;
1681 }
1682
1683 en = sock_get_raw_error();
1684 //printd(5, "qore_socket_private::bindINET() failed to bind: name: %s service: %s f: %d st: %d p: %d, errno: %d (%s)\n", name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol, en, strerror(en));
1685 }
1686
1687 // if no bind was possible, then raise an exception
1688 qore_socket_error_intern(en, xsink, "SOCKET-BIND-ERROR", "error binding on socket", 0, name, service);
1689 return -1;
1690 }
1691
1692 // only called from qore-bound code - always with xsink
1693 DLLLOCAL QoreHashNode* getPeerInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1694 assert(xsink);
1695 if (sock == QORE_INVALID_SOCKET) {
1696 se_not_open("Socket", "getPeerInfo", xsink);
1697 return 0;
1698 }
1699
1700 hashdecl sockaddr_storage addr;
1701 socklen_t len = sizeof addr;
1702 if (getpeername(sock, (struct sockaddr*)&addr, &len)) {
1703 qore_socket_error(xsink, "SOCKET-GETPEERINFO-ERROR", "error in getpeername()");
1704 return 0;
1705 }
1706
1707 return getAddrInfo(addr, len, host_lookup);
1708 }
1709
1710 // only called from qore-bound code - always with xsink
1711 DLLLOCAL QoreHashNode* getSocketInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1712 assert(xsink);
1713 if (sock == QORE_INVALID_SOCKET) {
1714 se_not_open("Socket", "getSocketInfo", xsink);
1715 return 0;
1716 }
1717
1718 hashdecl sockaddr_storage addr;
1719#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1720 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1721 int len = sizeof addr;
1722#else
1723 socklen_t len = sizeof addr;
1724#endif
1725
1726 if (getsockname(sock, (struct sockaddr*)&addr, &len)) {
1727 qore_socket_error(xsink, "SOCKET-GETSOCKETINFO-ERROR", "error in getsockname()");
1728 return 0;
1729 }
1730
1731 return getAddrInfo(addr, len, host_lookup);
1732 }
1733
1734 DLLLOCAL QoreHashNode* getAddrInfo(const struct sockaddr_storage& addr, socklen_t len, bool host_lookup = true) const {
1735 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
1736
1737 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1738 if (host_lookup) {
1739 char host[NI_MAXHOST + 1];
1740
1741 if (!getnameinfo((struct sockaddr*)&addr, qore_get_in_len((struct sockaddr*)&addr), host, sizeof(host), 0, 0, 0)) {
1742 QoreStringNode* hoststr = new QoreStringNode(host);
1743 h->setKeyValue("hostname", hoststr, 0);
1744 h->setKeyValue("hostname_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, hoststr->getBuffer()), 0);
1745 }
1746 }
1747
1748 // get ipv4 or ipv6 address
1749 char ifname[INET6_ADDRSTRLEN];
1750 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr*)&addr), ifname, sizeof(ifname))) {
1751 QoreStringNode* addrstr = new QoreStringNode(ifname);
1752 h->setKeyValue("address", addrstr, 0);
1753 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1754 }
1755
1756 int tport;
1757 if (addr.ss_family == AF_INET) {
1758 hashdecl sockaddr_in* s = (hashdecl sockaddr_in*)&addr;
1759 tport = ntohs(s->sin_port);
1760 } else {
1761 hashdecl sockaddr_in6* s = (hashdecl sockaddr_in6*)&addr;
1762 tport = ntohs(s->sin6_port);
1763 }
1764
1765 h->setKeyValue("port", tport, 0);
1766 }
1767#ifndef _Q_WINDOWS
1768 else if (addr.ss_family == AF_UNIX) {
1769 assert(!socketname.empty());
1770 QoreStringNode* addrstr = new QoreStringNode(socketname);
1771 h->setKeyValue("address", addrstr, 0);
1772 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1773 }
1774#endif
1775
1776 h->setKeyValue("family", addr.ss_family, 0);
1777 h->setKeyValue("familystr", new QoreStringNode(QoreAddrInfo::getFamilyName(addr.ss_family)), 0);
1778
1779 return h;
1780 }
1781
1782 // set backwards-compatible object members on accept
1783 // to be (hopefully) deleted in a future version of qore
1784 DLLLOCAL void setAccept(QoreObject* o) {
1785 hashdecl sockaddr_storage addr;
1786
1787 socklen_t len = sizeof addr;
1788 if (getpeername(sock, (struct sockaddr*)&addr, &len))
1789 return;
1790
1791 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1792 // get ipv4 or ipv6 address
1793 char ifname[INET6_ADDRSTRLEN];
1794 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr *)&addr), ifname, sizeof(ifname))) {
1795 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
1796 o->setValue("source", new QoreStringNode(ifname), 0);
1797 }
1798
1799 char host[NI_MAXHOST + 1];
1800 if (!getnameinfo((struct sockaddr *)&addr, qore_get_in_len((struct sockaddr *)&addr), host, sizeof(host),
1801 0, 0, 0)) {
1802 o->setValue("source_host", new QoreStringNode(host), 0);
1803 }
1804 }
1805#ifndef _Q_WINDOWS
1806 else if (addr.ss_family == AF_UNIX) {
1807 QoreStringNode* astr = new QoreStringNode(enc);
1808 hashdecl sockaddr_un *addr_un = (hashdecl sockaddr_un *)&addr;
1809 astr->sprintf("UNIX socket: %s", addr_un->sun_path);
1810 o->setValue("source", astr, 0);
1811 o->setValue("source_host", new QoreStringNode("localhost"), 0);
1812 }
1813#endif
1814 }
1815
1817
1822 DLLLOCAL int readByteFromBuffer(char& output) {
1823 // must be checked if open/connected before this function is called
1824 assert(sock != QORE_INVALID_SOCKET);
1825
1826 // always returned buffered data first
1827 if (!buflen) {
1828 return -1;
1829 }
1830
1831 output = *(rbuf + bufoffset);
1832 if (buflen == 1) {
1833 buflen = 0;
1834 bufoffset = 0;
1835 } else {
1836 --buflen;
1837 ++bufoffset;
1838 }
1839 return 0;
1840 }
1841
// Buffered read: returns up to bs bytes, serving previously buffered data first and only
// reading from the socket (or the SSL connection) when the internal buffer is empty.
//
// xsink:    receives Qore-language exceptions; must not be null
// meth:     method name used in error messages; must not be null
// buf:      output; set to point into the internal buffer rbuf at the returned data (set to 0
//           in DEBUG builds when no data is returned)
// bs:       maximum number of bytes to return
// flags:    flags passed to recv() (not used for SSL reads)
// timeout:  read timeout; -1 skips the readiness wait for non-SSL sockets
// do_event: when true a read event is posted for data read from the socket
//
// Returns the number of bytes available at buf (> 0), 0 when the peer closed the connection
// (the socket is closed here too), QSE_TIMEOUT on timeout, or another negative value on error.
// The caller must have checked that the socket is open.
1842 // buffered reads for high performance
1843 DLLLOCAL ssize_t brecv(ExceptionSink* xsink, const char* meth, char*& buf, size_t bs, int flags,
1844 int timeout, bool do_event = true) {
1845 assert(xsink);
1846 // must be checked if open/connected before this function is called
1847 assert(sock != QORE_INVALID_SOCKET);
1848 assert(meth);
1849
1850 // always returned buffered data first
1851 if (buflen) {
1852 buf = rbuf + bufoffset;
// hand out everything that is buffered when it fits in the request, otherwise only bs bytes
1853 if (buflen <= bs) {
1854 bs = buflen;
1855 buflen = 0;
1856 bufoffset = 0;
1857 } else {
1858 buflen -= bs;
1859 bufoffset += bs;
1860 }
1861 return (ssize_t)bs;
1862 }
1863
1864 // real socket reads are only done when the buffer is empty
1865
1866 //printd(5, "qore_socket_private::brecv(buf: %p, bs: %d, flags: %d, timeout: %d, do_event: %d) this: %p "
1867 // ssl: %d\n", buf, (int)bs, flags, timeout, (int)do_event, this, ssl);
1868
1869 ssize_t rc;
1870 if (!ssl) {
// wait for data first when a timeout is given; a false result is either an error (exception
// already raised) or a timeout
1871 if (timeout != -1 && !isDataAvailable(timeout, meth, xsink)) {
1872 if (*xsink) {
1873 return -1;
1874 }
1875 se_timeout("Socket", meth, timeout, xsink);
1876 return QSE_TIMEOUT;
1877 }
1878
1879 while (true) {
1880#ifdef DEBUG
1881 errno = 0;
1882#endif
// always try to fill the whole internal buffer, regardless of the requested size
1883 rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1884 if (rc == QORE_SOCKET_ERROR) {
// NOTE(review): the return value is discarded; presumably this call is made to update errno
// on platforms where the socket error is kept elsewhere - confirm
1885 sock_get_error();
1886 if (errno == EINTR)
1887 continue;
1888#ifdef ECONNRESET
// a connection reset is reported as "socket closed" and the local socket is closed as well
1889 if (errno == ECONNRESET) {
1890 se_closed("Socket", meth, xsink);
1891 close();
1892 } else
1893#endif
1894 qore_socket_error(xsink, "SOCKET-RECV-ERROR", "error in recv()", meth);
1895 break;
1896 }
1897 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags,
1898 // rc, errno);
1899 // try again if we were interrupted by a signal
1900 if (rc >= 0)
1901 break;
1902 }
1903 } else {
1904 rc = ssl->read(meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, xsink);
1905 }
1906
1907 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1908 if (rc > 0) {
1909 buf = rbuf;
1910 assert(!buflen);
1911 assert(!bufoffset);
// more data than requested was read: keep the surplus buffered for the next call
1912 if (rc > (ssize_t)bs) {
1913 buflen = rc - bs;
1914 bufoffset = bs;
1915 rc = bs;
1916 }
1917
1918 // register event
1919 if (do_event)
1920 do_read_event(rc, rc);
1921 } else {
1922#ifdef DEBUG
1923 buf = 0;
1924#endif
// a read of 0 bytes means the connection was closed by the peer
1925 if (!rc)
1926 close();
1927 }
1928
1929 return rc;
1930 }
1931
// Reads an HTTP header block from the socket one byte at a time, stopping at the first blank
// line: "\r\n\r\n", "\n\n", "\r\n\n" or "\r\r".
//
// xsink:      receives Qore-language exceptions; must not be null
// meth:       method name used in error messages; must not be null
// timeout:    read timeout passed to brecv()
// rc:         output; receives the result of the last brecv() call, or QSE_NOT_OPEN when the
//             socket is not open
// exit_early: when true, a "\r\n" received before any header data returns immediately
//
// Returns the header text with a single '\n' appended in place of the terminating blank line;
// the caller owns the returned string. Returns a null pointer when the socket is not open,
// when the read fails or the connection is closed (exception raised), when the header
// reaches QORE_MAX_HEADER_SIZE bytes (exception raised), or on the exit_early case above (no
// exception).
//
// NOTE(review): `hdr` is used throughout but its declaration is not visible in this listing
// (original line 1952 is absent); presumably it is a string holder created there - confirm
// against the original source.
1933 DLLLOCAL QoreStringNode* readHTTPData(ExceptionSink* xsink, const char* meth, int timeout, ssize_t& rc,
1934 bool exit_early = false) {
1935 assert(xsink);
1936 assert(meth);
1937 if (sock == QORE_INVALID_SOCKET) {
1938 se_not_open("Socket", meth, xsink, "readHTTPData");
1939 rc = QSE_NOT_OPEN;
1940 return 0;
1941 }
1942
1943 PrivateQoreSocketThroughputHelper th(this, false);
1944
// "state" records which line-ending characters have been received but not yet added to hdr;
// -1 means none are pending
1945 // state:
1946 // 0 = '\r' received
1947 // 1 = '\r\n' received
1948 // 2 = '\r\n\r' received
1949 // 3 = '\n' received
1950 // read in HHTP header until \r\n\r\n or \n\n from socket
1951 int state = -1;
1953
// number of bytes read so far
1954 size_t count = 0;
1955
1956 while (true) {
1957 char* buf;
1958 rc = brecv(xsink, meth, buf, 1, 0, timeout, false);
1959 //printd(5, "qore_socket_private::readHTTPData() this: %p Socket::%s(): rc: %zd read char: %c (%03d) "
1960 // "(old state: %d)\n", this, meth, rc, rc > 0 && buf[0] > 31 ? buf[0] : '?', rc > 0 ? buf[0] : 0, state);
1961 if (rc <= 0) {
1962 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (len: %d), rc=" QSD ", "
1963 // "errno: %d: '%s'\n", timeout, hdr->getBuffer(), hdr->strlen(), rc, errno, strerror(errno));
1964
// raise an exception only if brecv() has not raised one already; the partial header is
// attached to the exception when some data had been read
1965 if (!*xsink) {
1966 if (!count) {
1967 //printd(5, "qore_socket_private::readHTTPData() this: %p rc: %d count: %d (%d) "
1968 // "timeout: %d\n", this, rc, count, hdr->size(), timeout);
1969 se_closed("Socket", meth, xsink);
1970 } else {
1971 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "socket closed on remote end "
1972 "while reading header data after reading " QSD " byte%s", count, count == 1 ? "" : "s");
1973 }
1974 }
1975 return 0;
1976 }
1977 char c = buf[0];
// enforce the maximum header size
1978 if (++count == QORE_MAX_HEADER_SIZE) {
1979 xsink->raiseException("SOCKET-HTTP-ERROR", "header size cannot exceed " QSD " bytes", count);
1980 return 0;
1981 }
1982
1983 // check if we can progress to the next state
1984 if (c == '\n') {
1985 if (state == -1) {
1986 state = 3;
1987 continue;
1988 }
1989 if (!state) {
1990 if (exit_early && hdr->empty())
1991 return 0;
1992 state = 1;
1993 continue;
1994 }
// '\n' after "\r\n", "\r\n\r" or "\n": end of the header
1995 assert(state > 0);
1996 break;
1997 } else if (c == '\r') {
1998 if (state == -1) {
1999 state = 0;
2000 continue;
2001 }
// "\r\r" also ends the header
2002 if (!state)
2003 break;
2004 if (state == 1) {
2005 state = 2;
2006 continue;
2007 }
2008 }
2009
// the pending line ending turned out not to be a header terminator: add it to the header
// before the current character
2010 if (state != -1) {
2011 switch (state) {
2012 case 0: hdr->concat('\r'); break;
2013 case 1: hdr->concat("\r\n"); break;
2014 case 2: hdr->concat("\r\n\r"); break;
2015 case 3: hdr->concat('\n'); break;
2016 }
2017 state = -1;
2018 }
2019 hdr->concat(c);
2020 }
// the pending terminator characters are dropped and replaced with a single newline
2021 hdr->concat('\n');
2022 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (%d)\n", timeout, hdr->getBuffer(),
2023 // hdr->size());
2024 th.finalize(hdr->size());
2025 return hdr.release();
2026 }
2027
// Reads string data from the socket.
//
// xsink:   receives Qore-language exceptions; must not be null
// bufsize: number of bytes to read; when <= 0, reading continues until brecv() returns a
//          value <= 0
// timeout: read timeout passed to brecv()
// rc:      output; QSE_NOT_OPEN when the socket is not open, otherwise the number of bytes
//          read when the last brecv() result was >= 0, or that negative result; it is not
//          assigned when the call is rejected because another operation is in progress
// source:  event source; read and data events are only posted when > 0
//
// Returns the data read (the caller owns the string), or a null pointer when the socket is
// not open, another operation is in progress, or an exception was raised while reading.
//
// NOTE(review): `str` is used throughout but its declaration is not visible in this listing
// (original line 2049 is absent); presumably it is a string holder created there - confirm
// against the original source.
2028 DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, ssize_t bufsize, int timeout, ssize_t& rc,
2029 int source = QORE_SOURCE_SOCKET) {
2030 assert(xsink);
2031 if (sock == QORE_INVALID_SOCKET) {
2032 se_not_open("Socket", "recv", xsink, "recv");
2033 rc = QSE_NOT_OPEN;
2034 return 0;
2035 }
// in_op >= 0 holds a thread ID (it is compared with q_gettid()); the error raised depends on
// whether it matches the calling thread
2036 if (in_op >= 0) {
2037 if (in_op == q_gettid()) {
2038 se_in_op("Socket", "recv", xsink);
2039 return 0;
2040 }
2041 se_in_op_thread("Socket", "recv", xsink);
2042 return 0;
2043 }
2044
2045 PrivateQoreSocketThroughputHelper th(this, false);
2046
// size of each individual read: the requested size when it is positive and smaller than the
// default buffer size, otherwise the default buffer size
2047 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2048
2050
2051 char* buf;
2052
2053 while (true) {
2054 rc = brecv(xsink, "recv", buf, bs, 0, timeout, false);
2055
2056 if (rc <= 0) {
2057 if (*xsink) {
2058 xsink->appendLastDescription(" (%zd bytes requested; %zu bytes received)", bs, str->size());
2059 }
2060 printd(5, "qore_socket_private::recv(" QSD ", %d) bs=" QSD ", br=" QSD ", rc=" QSD ", errno: %d "
2061 "(%s)\n", bufsize, timeout, bs, str->size(), rc, errno, strerror(errno));
2062 break;
2063 }
2064
2065 str->concat(buf, rc);
2066
2067 // register event
2068 if (source > 0) {
2069 do_read_event(rc, str->size(), bufsize, source);
2070 }
2071
// with a fixed size, stop once enough data has arrived and never request more than what is
// still missing
2072 if (bufsize > 0) {
2073 if (str->size() >= (size_t)bufsize)
2074 break;
2075 if ((bufsize - str->size()) < bs)
2076 bs = bufsize - str->size();
2077 }
2078 }
2079
2080 printd(5, "qore_socket_private::recv() received " QSD " byte(s), bufsize=" QSD ", strlen=" QSD " str='%s'\n",
2081 str->size(), bufsize, (str ? str->strlen() : 0), str ? str->getBuffer() : "n/a");
2082
2083 // "fix" return code value if no error occurred
2084 if (rc >= 0)
2085 rc = str->size();
2086
2087 th.finalize(str->size());
2088
// data read before an exception was raised is not returned
2089 if (*xsink) {
2090 return nullptr;
2091 }
2092
2093 if (source > 0) {
2094 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2095 }
2096 return str.release();
2097 }
2098
2099 DLLLOCAL QoreStringNode* recvAll(ExceptionSink* xsink, int timeout, ssize_t& rc,
2100 int source = QORE_SOURCE_SOCKET) {
2101 assert(xsink);
2102 if (sock == QORE_INVALID_SOCKET) {
2103 se_not_open("Socket", "recv", xsink, "recvAll");
2104 rc = QSE_NOT_OPEN;
2105 return 0;
2106 }
2107 if (in_op >= 0) {
2108 if (in_op == q_gettid()) {
2109 se_in_op("Socket", "recv", xsink);
2110 return 0;
2111 }
2112 se_in_op_thread("Socket", "recv", xsink);
2113 return 0;
2114 }
2115
2116 PrivateQoreSocketThroughputHelper th(this, false);
2117
2119
2120 // perform first read with timeout
2121 char* buf;
2122 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2123 if (rc <= 0)
2124 return 0;
2125
2126 str->concat(buf, rc);
2127
2128 // register event
2129 do_read_event(rc, rc);
2130
2131 // keep reading data until no more data is available without a timeout
2132 if (isDataAvailable(0, "recv", xsink)) {
2133 do {
2134 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
2135 //printd(5, "qore_socket_private::recv(to: %d) rc=" QSD " rd=" QSD "\n", timeout, rc, str->size());
2136 // if the remote end has closed the connection, return what we have
2137 if (!rc)
2138 break;
2139 if (rc < 0) {
2140 th.finalize(str->size());
2141 return 0;
2142 }
2143 str->concat(buf, rc);
2144
2145 // register event
2146 do_read_event(rc, str->size());
2147 } while (isDataAvailable(0, "recv", xsink));
2148 }
2149
2150 th.finalize(str->size());
2151
2152 if (*xsink) {
2153 return nullptr;
2154 }
2155
2156 rc = str->size();
2157 if (source > 0) {
2158 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2159 }
2160 return str.release();
2161 }
2162
2163 DLLLOCAL int recv(int fd, ssize_t size, int timeout_ms, ExceptionSink* xsink);
2164
2165 DLLLOCAL BinaryNode* recvBinary(ExceptionSink* xsink, ssize_t bufsize, int timeout, ssize_t& rc,
2166 int source = QORE_SOURCE_SOCKET) {
2167 assert(xsink);
2168 if (sock == QORE_INVALID_SOCKET) {
2169 se_not_open("Socket", "recvBinary", xsink, "recvBinary");
2170 rc = QSE_NOT_OPEN;
2171 return 0;
2172 }
2173 if (in_op >= 0) {
2174 if (in_op == q_gettid()) {
2175 se_in_op("Socket", "recvBinary", xsink);
2176 return 0;
2177 }
2178 se_in_op_thread("Socket", "recvBinary", xsink);
2179 return 0;
2180 }
2181
2182 PrivateQoreSocketThroughputHelper th(this, false);
2183
2184 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2185
2187
2188 char* buf;
2189 while (true) {
2190 rc = brecv(xsink, "recvBinary", buf, bs, 0, timeout);
2191 if (rc <= 0)
2192 break;
2193
2194 b->append(buf, rc);
2195
2196 if (bufsize > 0) {
2197 if (b->size() >= (size_t)bufsize)
2198 break;
2199 if ((bufsize - b->size()) < bs)
2200 bs = bufsize - b->size();
2201 }
2202 }
2203
2204 th.finalize(b->size());
2205
2206 if (*xsink)
2207 return nullptr;
2208
2209 // "fix" return code value if no error occurred
2210 if (rc >= 0)
2211 rc = b->size();
2212
2213 if (source > 0) {
2214 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2215 }
2216 printd(5, "qore_socket_private::recvBinary() received " QSD " byte(s), bufsize=" QSD ", blen=" QSD "\n", b->size(), bufsize, b->size());
2217 return b.release();
2218 }
2219
2220 DLLLOCAL BinaryNode* recvBinaryAll(ExceptionSink* xsink, int timeout, ssize_t& rc, int source = QORE_SOURCE_SOCKET) {
2221 assert(xsink);
2222 if (sock == QORE_INVALID_SOCKET) {
2223 se_not_open("Socket", "recvBinary", xsink, "recvBinaryAll");
2224 rc = QSE_NOT_OPEN;
2225 return 0;
2226 }
2227 if (in_op >= 0) {
2228 if (in_op == q_gettid()) {
2229 se_in_op("Socket", "recvBinary", xsink);
2230 return 0;
2231 }
2232 se_in_op_thread("Socket", "recvBinary", xsink);
2233 return 0;
2234 }
2235
2236 PrivateQoreSocketThroughputHelper th(this, false);
2237
2239
2240 //printd(5, "QoreSocket::recvBinary(%d, " QSD ") this: %p\n", timeout, rc, this);
2241 // perform first read with timeout
2242 char* buf;
2243 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2244 if (rc <= 0)
2245 return 0;
2246
2247 b->append(buf, rc);
2248
2249 // register event
2250 do_read_event(rc, rc);
2251
2252 // keep reading data until no more data is available without a timeout
2253 if (isDataAvailable(0, "recvBinary", xsink)) {
2254 do {
2255 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
2256 // if the remote end has closed the connection, return what we have
2257 if (!rc)
2258 break;
2259 if (rc < 0) {
2260 th.finalize(b->size());
2261 return 0;
2262 }
2263
2264 b->append(buf, rc);
2265
2266 // register event
2267 do_read_event(rc, b->size());
2268 } while (isDataAvailable(0, "recvBinary", xsink));
2269 }
2270
2271 th.finalize(b->size());
2272
2273 if (*xsink)
2274 return nullptr;
2275
2276 if (source > 0) {
2277 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2278 }
2279 rc = b->size();
2280 //printd(5, "qore_socket_private() this: %p b: %p size: %lld\n", this, b->getPtr(), rc);
2281 return b.release();
2282 }
2283
2284 DLLLOCAL void recvToOutputStream(OutputStream *os, int64 size, int64 timeout, ExceptionSink *xsink, QoreThreadLock* l, int source = QORE_SOURCE_SOCKET) {
2285 if (sock == QORE_INVALID_SOCKET) {
2286 se_not_open("Socket", "recvToOutputStream", xsink);
2287 return;
2288 }
2289 if (in_op >= 0) {
2290 if (in_op == q_gettid()) {
2291 se_in_op("Socket", "recvToOutputStream", xsink);
2292 return;
2293 }
2294 se_in_op_thread("Socket", "recvToOutputStream", xsink);
2295 return;
2296 }
2297
2298 qore_socket_op_helper oh(this);
2299
2300 char* buf;
2301 ssize_t br = 0;
2302 while (size < 0 || br < size) {
2303 // calculate bytes needed
2304 int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
2305
2306 ssize_t rc = brecv(xsink, "recvToOutputStream", buf, bn, 0, timeout);
2307 if (rc < 0) {
2308 //error - already reported in xsink
2309 return;
2310 }
2311 if (rc == 0) {
2312 //eof
2313 if (size >= 0) {
2314 //not all size bytes were read
2315 xsink->raiseException("SOCKET-RECV-ERROR", "Unexpected end of stream");
2316 }
2317 return;
2318 }
2319
2320 if (source > 0) {
2321 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, buf, rc);
2322 }
2323
2324 // write buffer to the stream
2325 {
2326 AutoUnlocker al(l);
2327 os->write(buf, rc, xsink);
2328 if (*xsink) {
2329 return;
2330 }
2331 }
2332
2333 br += rc;
2334 }
2335 }
2336
2337 DLLLOCAL QoreStringNode* readHTTPHeaderString(ExceptionSink* xsink, int timeout, int source) {
2338 assert(xsink);
2339 ssize_t rc;
2340 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeaderString", timeout, rc));
2341 if (!hdr) {
2342 assert(*xsink);
2343 return 0;
2344 }
2345 assert(rc > 0);
2346 do_data_event(QORE_EVENT_HTTP_HEADERS_READ, source, **hdr);
2347 return hdr.release();
2348 }
2349
2350 DLLLOCAL QoreHashNode* readHTTPHeader(ExceptionSink* xsink, QoreHashNode* info, int timeout,
2351 ssize_t& rc, int source, const char* headers_raw_key = "headers-raw") {
2352 assert(xsink);
2353 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeader", timeout, rc));
2354 if (!hdr) {
2355 assert(*xsink);
2356 return nullptr;
2357 }
2358
2359 if (hdr->empty()) {
2360 xsink->raiseException("SOCKET-HTTP-ERROR", "remote closed the connection while reading the HTTP header");
2361 return nullptr;
2362 }
2363 assert(rc > 0);
2364
2365 return processHttpHeaderString(xsink, hdr, info, source, headers_raw_key);
2366 }
2367
2369 DLLLOCAL QoreHashNode* processHttpHeaderString(ExceptionSink* xsink, QoreStringNodeHolder& hdr,
2370 QoreHashNode* info, int source, const char* headers_raw_key = "headers-raw") {
2371 const char* buf = hdr->c_str();
2372 char* p;
2373 if ((p = (char*)strstr(buf, "\r\n"))) {
2374 *p = '\0';
2375 p += 2;
2376 } else if ((p = (char*)strchr(buf, '\n'))) {
2377 *p = '\0';
2378 ++p;
2379 } else if ((p = (char*)strchr(buf, '\r'))) {
2380 *p = '\0';
2381 ++p;
2382 } else {
2383 // readHTTPData will only return a string that satisifies one of the above conditions,
2384 // however an embedded 0 could have been sent which would make the above searches invalid
2385 xsink->raiseException("SOCKET-HTTP-ERROR", "invalid header received with embedded nulls in "
2386 "Socket::readHTTPHeader()");
2387 return nullptr;
2388 }
2389
2390 char* t1;
2391 if (!(t1 = (char*)strstr(buf, "HTTP/"))) {
2392 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "missing HTTP version string in "
2393 "first header line in Socket::readHTTPHeader()");
2394 return nullptr;
2395 }
2396
2397 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2398
2399 // process header flags
2400 int flags = CHF_PROCESS;
2401
2402 // get version
2403 {
2404 QoreStringNode* hv = new QoreStringNode(t1 + 5, 3, enc);
2405 h->setKeyValue("http_version", hv, nullptr);
2406 if (*hv == "1.1") {
2407 flags |= CHF_HTTP11;
2408 }
2409 }
2410
2411 // if we are getting a response
2412 // key for info if applicable
2413 const char* info_key;
2414 if (t1 == buf) {
2415 char* t2 = (char*)strchr(buf + 8, ' ');
2416 if (t2) {
2417 t2++;
2418 if (isdigit(*(t2))) {
2419 h->setKeyValue("status_code", atoi(t2), nullptr);
2420 if (strlen(t2) > 4) {
2421 h->setKeyValue("status_message", new QoreStringNode(t2 + 4), nullptr);
2422 }
2423 }
2424 }
2425 // write the status line as the "response-uri" key in the info hash if present
2426 // NOTE: this is not a URI, so the name is not really appropriate
2427 info_key = "response-uri";
2428 } else { // get method and path
2429 char* t2 = (char*)strchr(buf, ' ');
2430 if (t2) {
2431 *t2 = '\0';
2432 h->setKeyValue("method", new QoreStringNode(buf), nullptr);
2433 t2++;
2434 t1 = strchr(t2, ' ');
2435 if (t1) {
2436 *t1 = '\0';
2437 //printd(5, "found path '%s'\n", t2);
2438 // the path is returned as-is with no decodings - use decode_url() to decode
2439 h->setKeyValue("path", new QoreStringNode(t2, enc), nullptr);
2440 }
2441 }
2442 info_key = "request-uri";
2443 flags |= CHF_REQUEST;
2444 }
2445
2446 // write status line or request line to the info hash and raise a data event if applicable
2447 if (info || (event_queue && event_data)) {
2448 QoreStringNodeHolder status_line(new QoreStringNode(buf));
2449 if (info && event_queue && event_data) {
2450 status_line->ref();
2451 }
2452 if (event_queue && event_data) {
2453 do_data_event_intern(QORE_EVENT_SOCKET_DATA_READ, source, **status_line);
2454 }
2455 if (info) {
2456 info->setKeyValue(info_key, *status_line, nullptr);
2457 }
2458 status_line.release();
2459 }
2460
2461 bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2462 do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2463
2464 // process header info
2465 if ((flags & CHF_REQUEST) && info) {
2466 info->setKeyValue("close", close, 0);
2467 }
2468
2469 return h.release();
2470 }
2471
2472 // info must be already referenced for the assignment, if present
2473 DLLLOCAL int runHeaderCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2474 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreHashNode* hdr, QoreHashNode* info,
2475 bool send_aborted = false, QoreObject* obj = nullptr) {
2476 assert(xsink);
2477 assert(obj);
2478 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2479 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2480 arg->setKeyValue("hdr", hdr ? hdr->refSelf() : nullptr, xsink);
2481 arg->setKeyValue("info", info, xsink);
2482 if (obj)
2483 arg->setKeyValue("obj", obj->refSelf(), xsink);
2484 arg->setKeyValue("send_aborted", send_aborted, xsink);
2485 args->push(arg, nullptr);
2486
2487 ValueHolder rv(xsink);
2488 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2489 }
2490
2491 DLLLOCAL int runTrailerCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2493 ValueHolder rv(xsink);
2494 if (runCallback(xsink, cname, mname, rv, callback, l, nullptr))
2495 return -1;
2496
2497 switch (rv->getType()) {
2498 case NT_NOTHING:
2499 break;
2500 case NT_HASH: {
2501 hdr = rv.release().get<QoreHashNode>();
2502 break;
2503 }
2504 default:
2505 xsink->raiseException("HTTP-TRAILER-ERROR", "chunked callback returned type '%s'; expecting 'hash' "
2506 "or 'NOTHING'", rv->getTypeName());
2507 return -1;
2508 }
2509 return 0;
2510 }
2511
2512 DLLLOCAL int runDataCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2513 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const AbstractQoreNode* data, bool chunked) {
2514 assert(xsink);
2515 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2516 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2517 arg->setKeyValue("data", data->realCopy(), xsink);
2518 arg->setKeyValue("chunked", chunked, xsink);
2519 args->push(arg, nullptr);
2520
2521 ValueHolder rv(xsink);
2522 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2523 }
2524
2525 DLLLOCAL int runCallback(ExceptionSink* xsink, const char* cname, const char* mname, ValueHolder& res,
2526 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreListNode* args = nullptr) {
2527 assert(xsink);
2528 // FIXME: subtract callback execution time from socket performance measurement
2529
2530 // unlock and execute callback
2531 {
2532 AutoUnlocker al(l);
2533 res = callback.execValue(args, xsink);
2534 }
2535
2536 // check exception and socket status
2537 assert(xsink);
2538 return *xsink ? -1 : 0;
2539 }
2540
2541 DLLLOCAL int sendHttpChunkedWithCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2542 const ResolvedCallReferenceNode& send_callback, QoreThreadLock& l, int source, int timeout_ms = -1,
2543 bool* aborted = nullptr) {
2544 assert(xsink);
2545 assert(!aborted || !(*aborted));
2546
2547 if (sock == QORE_INVALID_SOCKET) {
2548 se_not_open(cname, mname, xsink, "sendHttpChunkedWithCallback");
2549 return QSE_NOT_OPEN;
2550 }
2551 if (in_op >= 0) {
2552 if (in_op == q_gettid()) {
2553 se_in_op(cname, mname, xsink);
2554 return 0;
2555 }
2556 se_in_op_thread(cname, mname, xsink);
2557 return 0;
2558 }
2559
2560 PrivateQoreSocketThroughputHelper th(this, true);
2561
2562 // set the non-blocking flag (for use with non-ssl connections)
2563 bool nb = (timeout_ms >= 0);
2564 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2565 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2566 if (*xsink)
2567 return -1;
2568
2569 qore_socket_op_helper oh(this);
2570
2571 ssize_t rc;
2572 int64 total = 0;
2573 bool done = false;
2574
2575 while (!done) {
2576 // if we have response data already, then we assume an error and abort
2577 if (aborted) {
2578 bool data_available = tryReadSocketData(mname, xsink);
2579 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2580 if (data_available || *xsink) {
2581 *aborted = true;
2582 return *xsink ? -1 : 0;
2583 }
2584 }
2585
2586 // FIXME: subtract callback execution time from socket performance measurement
2587 ValueHolder res(xsink);
2588 rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2589 if (rc)
2590 return rc;
2591
2592 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p res: %s\n", this, get_type_name(*res));
2593
2594 // check callback return val
2595 QoreString buf;
2596 // do not copy data here; set references and send the data directly
2597 const char* data_ptr = nullptr;
2598 size_t data_size = 0;
2599
2600 switch (res->getType()) {
2601 case NT_STRING: {
2602 const QoreStringNode* str = res->get<const QoreStringNode>();
2603 if (str->empty()) {
2604 done = true;
2605 break;
2606 }
2607 buf.sprintf("%x\r\n", (int)str->size());
2608 data_ptr = str->c_str();
2609 data_size = str->size();
2610 //buf.concat(str->c_str(), str->size());
2611 break;
2612 }
2613
2614 case NT_BINARY: {
2615 const BinaryNode* b = res->get<const BinaryNode>();
2616 if (b->empty()) {
2617 done = true;
2618 break;
2619 }
2620 buf.sprintf("%x\r\n", (int)b->size());
2621 data_ptr = static_cast<const char*>(b->getPtr());
2622 data_size = b->size();
2623 //buf.concat((const char*)b->getPtr(), b->size());
2624 break;
2625 }
2626
2627 case NT_HASH: {
2628 buf.concat("0\r\n");
2629
2630 const QoreHashNode* h = res->get<const QoreHashNode>();
2631 ConstHashIterator hi(h);
2632 while (hi.next()) {
2633 const QoreValue v = hi.get();
2634 const char* key = hi.getKey();
2635
2636 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p trailer %s\n", this, key);
2637
2638 if (v.getType() == NT_LIST) {
2639 ConstListIterator li(v.get<const QoreListNode>());
2640 while (li.next())
2641 do_header(key, buf, li.getValue());
2642 } else
2643 do_header(key, buf, v);
2644 }
2645 // fall through to next case
2646 }
2647
2648 case NT_NOTHING:
2649 case NT_NULL:
2650 done = true;
2651 break;
2652
2653 default:
2654 xsink->raiseException("SOCKET-CALLBACK-ERROR", "HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2655 return -1;
2656 }
2657
2658 // send chunk buffer data
2659 if (!buf.empty()) {
2660 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2661 }
2662
2663 if (!*xsink) {
2664 assert(rc >= 0);
2665 // send actual data, if available
2666 if (data_ptr && data_size) {
2667 rc = sendIntern(xsink, cname, mname, data_ptr, data_size, timeout_ms, total, true);
2668 }
2669
2670 if (!*xsink) {
2671 assert(rc >= 0);
2672 if (buf.empty() && (!data_ptr || !data_size)) {
2673 buf.set("0\r\n\r\n");
2674 } else {
2675 buf.set("\r\n");
2676 }
2677 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2678 }
2679 }
2680
2681 if (!*xsink) {
2682 // do events
2683 switch (res->getType()) {
2684 case NT_STRING: {
2685 const QoreStringNode* str = res->get<const QoreStringNode>();
2686 if (!str->empty()) {
2687 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *str);
2688 }
2689 break;
2690 }
2691
2692 case NT_BINARY: {
2693 const BinaryNode* b = res->get<const BinaryNode>();
2694 if (!b->empty()) {
2695 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *b);
2696 }
2697 break;
2698 }
2699
2700 case NT_HASH: {
2701 const QoreHashNode* h = res->get<const QoreHashNode>();
2702 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, source, *h);
2703 break;
2704 }
2705 }
2706 }
2707
2708 if (rc < 0) {
2709 // if we have a socket I/O error, but also data to be read on the socket, then clear the exception and return 0
2710 if (aborted && *xsink) {
2711 bool data_available = tryReadSocketData(mname, xsink);
2712 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2713 if (data_available) {
2714 *aborted = true;
2715 return *xsink ? -1 : 0;
2716 }
2717 }
2718
2719 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p rc: %d sock: %d xsink: %d\n", this, rc, sock, xsink->isException());
2720 }
2721
2722 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p sent: %s\n", this, buf.getBuffer());
2723
2724 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2725 break;
2726 }
2727
2728 th.finalize(total);
2729
2730 return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2731 }
2732
2733 // return: the number of bytes sent or an error (< 0)
2734 DLLLOCAL ssize_t sendIntern(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2735 int timeout_ms, int64& total, bool stream = false) {
2736 assert(xsink);
2737 ssize_t rc;
2738 size_t bs = 0;
2739
2740 // set the non-blocking flag (for use with non-ssl connections)
2741 bool nb = (timeout_ms >= 0);
2742
2743 while (true) {
2744 if (ssl) {
2745 // SSL_MODE_ENABLE_PARTIAL_WRITE is enabled so we can get finer-grained socket events for do_send_event() below
2746 rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2747 } else {
2748 while (true) {
2749 rc = ::send(sock, buf + bs, size - bs, 0);
2750 //printd(5, "qore_socket_private::send() this: %p Socket::%s() buf: %p size: %zu timeout_ms: %d ssl: %p nb: %d bs: %zu" rc: %zd\n", this, mname, buf, size, timeout_ms, ssl, nb, bs, rc);
2751 // try again if we were interrupted by a signal
2752 if (rc >= 0)
2753 break;
2754 sock_get_error();
2755 // check that the send finishes before the timeout if we are using non-blocking I/O
2756 if (nb && (errno == EAGAIN
2757#ifdef EWOULDBLOCK
2758 || errno == EWOULDBLOCK
2759#endif
2760 )) {
2761 if (!isWriteFinished(timeout_ms, mname, xsink)) {
2762 if (*xsink)
2763 return -1;
2764 se_timeout("Socket", mname, timeout_ms, xsink);
2765 rc = QSE_TIMEOUT;
2766 break;
2767 }
2768 continue;
2769 }
2770 if (errno != EINTR) {
2771 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d sock: %d\n", bs, rc, size - bs, size, errno, sock);
2772 xsink->raiseErrnoException("SOCKET-SEND-ERROR", errno, "error while executing %s::%s()", cname, mname);
2773
2774 // do not close the socket even if we have EPIPE or ECONNRESET in case there is data to be read when streaming
2775#ifdef EPIPE
2776 if (!stream && errno == EPIPE) {
2777 close();
2778 }
2779#endif
2780#ifdef ECONNRESET
2781 if (!stream && errno == ECONNRESET) {
2782 close();
2783 }
2784#endif
2785 break;
2786 }
2787 }
2788 }
2789
2790 if (rc > 0) {
2791 total += rc;
2792 }
2793 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d\n", bs, rc, size - bs, size, errno);
2794 if (rc < 0 || sock == QORE_INVALID_SOCKET) {
2795 break;
2796 }
2797 bs += rc;
2798 do_send_event(rc, bs, size);
2799 if (bs >= size) {
2800 break;
2801 }
2802 }
2803
2804 return (ssize_t)bs;
2805 }
2806
2807 DLLLOCAL int send(int fd, ssize_t size, int timeout_ms, ExceptionSink* xsink);
2808
2809 // returns: 0 = OK
2810 DLLLOCAL int send(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2811 int timeout_ms = -1, int source = QORE_SOURCE_SOCKET) {
2812 assert(xsink);
2813 if (sock == QORE_INVALID_SOCKET) {
2814 se_not_open(cname, mname, xsink, "send");
2815 return QSE_NOT_OPEN;
2816 }
2817 if (in_op >= 0) {
2818 if (in_op == q_gettid()) {
2819 se_in_op(cname, mname, xsink);
2820 return 0;
2821 }
2822 se_in_op_thread(cname, mname, xsink);
2823 return 0;
2824 }
2825 if (!size) {
2826 return 0;
2827 }
2828
2829 PrivateQoreSocketThroughputHelper th(this, true);
2830
2831 // set the non-blocking flag (for use with non-ssl connections)
2832 bool nb = (timeout_ms >= 0);
2833 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2834 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2835 if (*xsink) {
2836 return -1;
2837 }
2838
2839 int64 total = 0;
2840 ssize_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2841 th.finalize(total);
2842
2843 if (rc > 0 && source > 0) {
2844 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, buf, size);
2845 }
2846
2847 return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2848 }
2849
2850 DLLLOCAL void sendFromInputStream(InputStream *is, int64 size, int64 timeout, ExceptionSink *xsink,
2851 QoreThreadLock* l) {
2852 if (sock == QORE_INVALID_SOCKET) {
2853 se_not_open("Socket", "sendFromInputStream", xsink);
2854 return;
2855 }
2856 if (in_op >= 0) {
2857 if (in_op == q_gettid()) {
2858 se_in_op("Socket", "sendFromInputStream", xsink);
2859 return;
2860 }
2861 se_in_op_thread("Socket", "sendFromInputStream", xsink);
2862 return;
2863 }
2864
2865 qore_socket_op_helper oh(this);
2866
2867 PrivateQoreSocketThroughputHelper th(this, true);
2868
2869 // set the non-blocking flag (for use with non-ssl connections)
2870 bool nb = (timeout >= 0);
2871 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2872 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2873 if (*xsink)
2874 return;
2875
2876 char buf[DEFAULT_SOCKET_BUFSIZE];
2877 int64 sent = 0;
2878 int64 total = 0;
2879 while (size < 0 || sent < size) {
2880 int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2881 int64 r;
2882 {
2883 AutoUnlocker al(l);
2884 r = is->read(buf, toRead, xsink);
2885 if (*xsink) {
2886 return;
2887 }
2888 }
2889 if (r == 0) {
2890 //eof
2891 if (size >= 0) {
2892 //not all size bytes were sent
2893 xsink->raiseException("SOCKET-SEND-ERROR", "Unexpected end of stream");
2894 return;
2895 }
2896 break;
2897 }
2898
2899 ssize_t rc = sendIntern(xsink, "Socket", "sendFromInputStream", buf, r, timeout, total);
2900 if (rc < 0) {
2901 return;
2902 }
2903 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, QORE_SOURCE_SOCKET, buf, r);
2904 sent += r;
2905 }
2906 th.finalize(total);
2907 }
2908
2909 DLLLOCAL void sendHttpChunkedBodyFromInputStream(InputStream* is, size_t max_chunk_size, int timeout, ExceptionSink* xsink, QoreThreadLock* l, const ResolvedCallReferenceNode* trailer_callback) {
2910 if (sock == QORE_INVALID_SOCKET) {
2911 se_not_open("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2912 return;
2913 }
2914 if (in_op >= 0) {
2915 if (in_op == q_gettid()) {
2916 se_in_op("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2917 return;
2918 }
2919 se_in_op_thread("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2920 return;
2921 }
2922
2923 qore_socket_op_helper oh(this);
2924
2925 PrivateQoreSocketThroughputHelper th(this, true);
2926
2927 // set the non-blocking flag (for use with non-ssl connections)
2928 bool nb = (timeout >= 0);
2929 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2930 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2931 if (*xsink)
2932 return;
2933
2935 // reserve enough space for the maximum size of the buffer + HTTP overhead
2936 buf->preallocate(max_chunk_size);
2937 int64 total = 0;
2938 while (true) {
2939 int64 r;
2940 {
2941 AutoUnlocker al(l);
2942 r = is->read((void*)buf->getPtr(), sizeof(max_chunk_size), xsink);
2943 if (*xsink)
2944 return;
2945 }
2946
2947 // send HTTP chunk prelude with chunk size
2948 QoreString str;
2949 str.sprintf("%x\r\n", (int)r);
2950 int rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2951 if (rc < 0)
2952 return;
2953
2954 bool trailers = false;
2955
2956 // send chunk data, if any
2957 if (r) {
2958 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", (const char*)buf->getPtr(), r, timeout, total, true);
2959 if (rc < 0)
2960 return;
2961 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, QORE_SOURCE_SOCKET, buf->getPtr(), r);
2962 } else if (trailer_callback) {
2963 // get and send chunk trailers, if any
2965
2966 if (runTrailerCallback(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
2967 return;
2968 if (h) {
2969 str.clear();
2970 do_headers(str, *h, 0, false);
2971
2972 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2973 if (rc < 0)
2974 return;
2975
2976 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, **h);
2977 trailers = true;
2978 }
2979 }
2980
2981 // close chunk if we sent no trailers
2982 if (!trailers) {
2983 str.set("\r\n");
2984 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2985 if (rc < 0)
2986 return;
2987 }
2988
2989 if (!r) {
2990 // end of stream
2991 break;
2992 }
2993 }
2994 th.finalize(total);
2995 }
2996
2997 DLLLOCAL void sendHttpChunkedBodyTrailer(const QoreHashNode* headers, int timeout, ExceptionSink* xsink) {
2998 if (sock == QORE_INVALID_SOCKET) {
2999 se_not_open("Socket", "sendHttpChunkedBodyTrailer", xsink);
3000 return;
3001 }
3002 if (in_op >= 0) {
3003 if (in_op == q_gettid()) {
3004 se_in_op("Socket", "sendHttpChunkedBodyTrailer", xsink);
3005 return;
3006 }
3007 se_in_op_thread("Socket", "sendHttpChunkedBodyTrailer", xsink);
3008 return;
3009 }
3010
3011 QoreString buf;
3012 if (!headers) {
3013 ConstHashIterator hi(headers);
3014
3015 while (hi.next()) {
3016 const QoreValue v = hi.get();
3017 const char* key = hi.getKey();
3018
3019 if (v.getType() == NT_LIST) {
3020 ConstListIterator li(v.get<const QoreListNode>());
3021 while (li.next())
3022 do_header(key, buf, li.getValue());
3023 }
3024 else
3025 do_header(key, buf, v);
3026 }
3027 }
3028 buf.concat("\r\n");
3029 int64 total;
3030 sendIntern(xsink, "Socket", "sendHttpChunkedBodyTrailer", buf.getBuffer(), buf.size(), timeout, total, true);
3031 if (!*xsink) {
3032 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, *headers);
3033 }
3034 }
3035
3036 DLLLOCAL void getSendHttpMessageHeaders(QoreString& hdr, QoreHashNode* info, const char* method, const char* path,
3037 const char* http_version, const QoreHashNode* headers, size_t size, int source) {
3038 // prepare header string
3039 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3040
3041 // write request-uri key if info hash is non-null
3042 if (info) {
3043 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3044 }
3045
3046 getSendHttpMessageHeadersCommon(hdr, info, headers, size, source);
3047 }
3048
3049 DLLLOCAL void getSendHttpMessageHeadersCommon(QoreString& hdr, QoreHashNode* info, const QoreHashNode* headers,
3050 size_t size, int source) {
3051 // send event
3052 do_send_http_message_event(hdr, headers, source);
3053
3054 // add headers
3055 hdr.concat("\r\n");
3056 // insert headers
3057 do_headers(hdr, headers, size);
3058 }
3059
3060 DLLLOCAL int sendHttpMessage(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3061 const char* method, const char* path, const char* http_version, const QoreHashNode* headers,
3062 const QoreStringNode* body, const void* data, size_t size,
3063 const ResolvedCallReferenceNode* send_callback, InputStream* input_stream, size_t max_chunk_size,
3064 const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3065 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3066 // prepare header string
3067 QoreString hdr(enc);
3068
3069 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3070
3071 // write request-uri key if info hash is non-null
3072 if (info) {
3073 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3074 }
3075
3076 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3077 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3078 }
3079
3080 DLLLOCAL int sendHttpResponse(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3081 int code, const char* desc, const char* http_version, const QoreHashNode* headers, const QoreStringNode* body,
3082 const void* data, size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3083 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3084 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3085 // prepare header string
3086 QoreString hdr(enc);
3087
3088 // write HTTP response status line
3089 hdr.sprintf("HTTP/%s %03d %s", http_version, code, desc);
3090
3091 // write the status line as the "response-uri" key if info hash is non-null
3092 // NOTE: this is not a URI, so the name is not really appropriate
3093 if (info) {
3094 info->setKeyValue("response-uri", new QoreStringNode(hdr), nullptr);
3095 }
3096
3097 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3098 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3099 }
3100
3101 DLLLOCAL int sendHttpMessageCommon(ExceptionSink* xsink, QoreString& hdr, QoreHashNode* info, const char* cname,
3102 const char* mname, const QoreHashNode* headers, const QoreStringNode* body, const void* data,
3103 size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3104 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3105 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3106 assert(xsink);
3107 assert(!(data && send_callback));
3108 assert(!(data && input_stream));
3109 assert(!(send_callback && input_stream));
3110
3111 // send event
3112 do_send_http_message_event(hdr, headers, source);
3113
3114 // add headers
3115 hdr.concat("\r\n");
3116 // insert headers
3117 do_headers(hdr, headers, size && data ? size : 0);
3118
3119 //printd(5, "qore_socket_private::sendHttpMessage() hdr: %s\n", hdr.c_str());
3120
3121 // send URI and headers
3122 int rc;
3123 if ((rc = send(xsink, cname, mname, hdr.c_str(), hdr.size(), timeout_ms, -1)))
3124 return rc;
3125
3126 // header message sent above with do_sent_http_message_event()
3127 if (size && data) {
3128 int rc = send(xsink, cname, mname, (char*)data, size, timeout_ms, -1);
3129 if (!rc) {
3130 if (body) {
3131 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, *body);
3132 } else {
3133 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, data, size);
3134 }
3135 }
3136 return rc;
3137 } else if (send_callback) {
3138 assert(l);
3139 assert(!aborted || !(*aborted));
3140 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
3141 } else if (input_stream) {
3142 assert(l);
3143 sendHttpChunkedBodyFromInputStream(input_stream, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
3144 return *xsink ? -1 : 0;
3145 }
3146
3147 return 0;
3148 }
3149
3150 DLLLOCAL QoreHashNode* readHttpChunkedBodyBinary(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = nullptr, QoreThreadLock* l = nullptr, QoreObject* obj = nullptr, OutputStream* os = nullptr) {
3151 assert(xsink);
3152
3153 if (sock == QORE_INVALID_SOCKET) {
3154 se_not_open(cname, "readHTTPChunkedBodyBinary", xsink);
3155 return 0;
3156 }
3157 if (in_op >= 0) {
3158 if (in_op == q_gettid()) {
3159 se_in_op(cname, "readHTTPChunkedBodyBinary", xsink);
3160 return 0;
3161 }
3162 se_in_op_thread(cname, "readHTTPChunkedBodyBinary", xsink);
3163 return 0;
3164 }
3165
3166 // reset "expecting HTTP chunked body" flag
3167 if (http_exp_chunked_body)
3168 http_exp_chunked_body = false;
3169
3170 qore_socket_op_helper oh(this);
3171
3172 SimpleRefHolder<BinaryNode> b(os ? nullptr : new BinaryNode);
3173 QoreString str; // for reading the size of each chunk
3174
3175 ssize_t rc;
3176 // read the size then read the data and append to buffer
3177 while (true) {
3178 // state = 0, nothing
3179 // state = 1, \r received
3180 int state = 0;
3181 while (true) {
3182 char* buf;
3183 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 1, 0, timeout, false);
3184 if (rc <= 0) {
3185 if (!*xsink) {
3186 assert(!rc);
3187 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3188 }
3189 return 0;
3190 }
3191
3192 char c = buf[0];
3193
3194 if (!state && c == '\r')
3195 state = 1;
3196 else if (state && c == '\n')
3197 break;
3198 else {
3199 if (state) {
3200 state = 0;
3201 str.concat('\r');
3202 }
3203 str.concat(c);
3204 }
3205 }
3206 // DEBUG
3207 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
3208
3209 // terminate string at ';' char if present
3210 char* p = (char*)strchr(str.getBuffer(), ';');
3211 if (p)
3212 *p = '\0';
3213 long size = strtol(str.c_str(), 0, 16);
3214 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.size(), source);
3215
3216 if (!size)
3217 break;
3218
3219 if (size < 0) {
3220 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3221 return 0;
3222 }
3223
3224 // prepare string for chunk
3225 //str.allocate(size + 1);
3226
3227 ssize_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3228 ssize_t br = 0; // bytes received
3229 while (true) {
3230 char* buf;
3231 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, bs, 0, timeout, false);
3232 //printd(5, "qore_socket_private::readHTTPChunkedBodyBinary() str: '%s' bs: %lld rc: %lld b: %p (%lld) recv_callback: %p\n", str.c_str(), bs, rc, *b, b->size(), recv_callback);
3233 if (rc <= 0) {
3234 if (!*xsink) {
3235 assert(!rc);
3236 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3237 }
3238 return nullptr;
3239 }
3240
3241 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, buf, (size_t)rc);
3242
3243 if (os) {
3244 AutoUnlocker al(l);
3245 os->write(buf, rc, xsink);
3246 if (*xsink)
3247 return nullptr;
3248 } else {
3249 b->append(buf, rc);
3250 }
3251 br += rc;
3252
3253 if (br >= size)
3254 break;
3255 if (size - br < bs)
3256 bs = size - br;
3257 }
3258
3259 // DEBUG
3260 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): received binary chunk: size: %d br=" QSD " total=" QSD "\n", size, br, b->size());
3261
3262 // read crlf after chunk
3263 // FIXME: bytes read are not checked if they equal CRLF
3264 br = 0;
3265 while (br < 2) {
3266 char* buf;
3267 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout, false);
3268 if (rc <= 0) {
3269 if (!*xsink) {
3270 assert(!rc);
3271 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3272 }
3273 return nullptr;
3274 }
3275 br += rc;
3276 }
3277
3278 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3279
3280 if (recv_callback && !os) {
3281 if (runDataCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, *b, true))
3282 return nullptr;
3283 if (b)
3284 b->clear();
3285 }
3286
3287 // ensure string is blanked for next read
3288 str.clear();
3289 }
3290
3291 // read footers or nothing
3292 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBodyBinary", timeout, rc, true));
3293 if (*xsink)
3294 return nullptr;
3295
3296 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3297 if (!recv_callback && !os) {
3298 h->setKeyValue("body", b.release(), xsink);
3299 }
3300
3302
3303 if (hdr) {
3304 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3305 return recv_callback ? 0 : h.release();
3306
3307 if (recv_callback) {
3308 info = new QoreHashNode(autoTypeInfo);
3309 }
3310 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3311 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3312 }
3313
3314 if (recv_callback) {
3315 runHeaderCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, h->empty() ? nullptr : *h,
3316 info.release(), false, obj);
3317 return 0;
3318 }
3319
3320 return h.release();
3321 }
3322
3323 // receive a message in HTTP chunked format
3324 DLLLOCAL QoreHashNode* readHttpChunkedBody(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = 0, QoreThreadLock* l = 0, QoreObject* obj = 0) {
3325 assert(xsink);
3326
3327 if (sock == QORE_INVALID_SOCKET) {
3328 se_not_open(cname, "readHTTPChunkedBody", xsink);
3329 return 0;
3330 }
3331 if (in_op >= 0) {
3332 if (in_op == q_gettid()) {
3333 se_in_op(cname, "readHTTPChunkedBody", xsink);
3334 return 0;
3335 }
3336 se_in_op_thread(cname, "readHTTPChunkedBody", xsink);
3337 return 0;
3338 }
3339
3340 // reset "expecting HTTP chunked body" flag
3341 if (http_exp_chunked_body)
3342 http_exp_chunked_body = false;
3343
3344 qore_socket_op_helper oh(this);
3345
3347 QoreString str; // for reading the size of each chunk
3348
3349 ssize_t rc;
3350 // read the size then read the data and append to buf
3351 while (true) {
3352 // state = 0, nothing
3353 // state = 1, \r received
3354 int state = 0;
3355 while (true) {
3356 char* tbuf;
3357 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 1, 0, timeout, false);
3358 if (rc <= 0) {
3359 if (!*xsink) {
3360 assert(!rc);
3361 se_closed(cname, "readHTTPChunkedBody", xsink);
3362 }
3363 return 0;
3364 }
3365
3366 char c = tbuf[0];
3367
3368 if (!state && c == '\r')
3369 state = 1;
3370 else if (state && c == '\n')
3371 break;
3372 else {
3373 if (state) {
3374 state = 0;
3375 str.concat('\r');
3376 }
3377 str.concat(c);
3378 }
3379 }
3380 // DEBUG
3381 //printd(5, "got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
3382
3383 // terminate string at ';' char if present
3384 char* p = (char*)strchr(str.getBuffer(), ';');
3385 if (p)
3386 *p = '\0';
3387 ssize_t size = strtol(str.getBuffer(), 0, 16);
3388 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
3389
3390 if (!size)
3391 break;
3392
3393 if (size < 0) {
3394 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3395 return 0;
3396 }
3397 // ensure string is blanked for next read
3398 str.clear();
3399
3400 // prepare string for chunk
3401 //buf->allocate((unsigned)(buf->strlen() + size + 1));
3402
3403 // read chunk directly into string buffer
3404 ssize_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3405 ssize_t br = 0; // bytes received
3406 str.clear();
3407 while (true) {
3408 char* tbuf;
3409 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, bs, 0, timeout, false);
3410 if (rc <= 0) {
3411 if (!*xsink) {
3412 assert(!rc);
3413 se_closed(cname, "readHTTPChunkedBody", xsink);
3414 }
3415 return 0;
3416 }
3417
3418 br += rc;
3419 buf->concat(tbuf, rc);
3420
3421 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, tbuf, (size_t)rc);
3422
3423 if (br >= size)
3424 break;
3425 if (size - br < bs)
3426 bs = size - br;
3427 }
3428
3429 // DEBUG
3430 //printd(5, "got chunk (" QSD " bytes): %s\n", br, buf->getBuffer() + buf->strlen() - size);
3431
3432 // read crlf after chunk
3433 // FIXME: bytes read are not checked if they equal CRLF
3434 br = 0;
3435 while (br < 2) {
3436 char* tbuf;
3437 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 2 - br, 0, timeout, false);
3438 if (rc <= 0) {
3439 if (!*xsink) {
3440 assert(!rc);
3441 se_closed(cname, "readHTTPChunkedBody", xsink);
3442 }
3443 return nullptr;
3444 }
3445 br += rc;
3446 }
3447
3448 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3449
3450 if (recv_callback) {
3451 if (runDataCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, *buf, true))
3452 return nullptr;
3453 buf->clear();
3454 }
3455 }
3456
3457 // read footers or nothing
3458 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBody", timeout, rc, true));
3459 if (*xsink)
3460 return nullptr;
3461
3462 //printd(5, "chunked body encoding: %s\n", buf->getEncoding()->getCode());
3463 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3464 if (!recv_callback) {
3465 h->setKeyValue("body", buf.release(), xsink);
3466 }
3467
3469
3470 if (hdr) {
3471 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3472 return recv_callback ? 0 : h.release();
3473
3474 if (recv_callback) {
3475 info = new QoreHashNode(autoTypeInfo);
3476 }
3477 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3478 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3479 }
3480
3481 if (recv_callback) {
3482 runHeaderCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, h->empty() ? nullptr : *h,
3483 info.release(), false, obj);
3484 return 0;
3485 }
3486
3487 return h.release();
3488 }
3489
3490 DLLLOCAL static void do_accept_encoding(char* t, QoreHashNode& info) {
3491 ReferenceHolder<QoreListNode> l(new QoreListNode(autoTypeInfo), 0);
3492
3493 char* a = t;
3494 bool ok = true;
3495 while (*a) {
3496 if (ok) {
3497 ok = false;
3499 while (*a && *a != ';' && *a != ',')
3500 str->concat(*(a++));
3501 str->trim();
3502 if (!str->empty())
3503 l->push(str.release(), nullptr);
3504 continue;
3505 }
3506 else if (*a == ',')
3507 ok = true;
3508
3509 ++a;
3510 }
3511
3512 if (!l->empty())
3513 info.setKeyValue("accept-encoding", l.release(), 0);
3514 }
3515
3516 DLLLOCAL bool do_accept_charset(char* t, QoreHashNode& info) {
3517 bool acceptcharset = false;
3518
3519 // see if we have "*" or utf8 or utf-8, in which case set it
3520 // otherwise set the first charset in the list
3521 char* a = t;
3522 char* div = 0;
3523 bool utf8 = false;
3524 bool ok = true;
3525 while (*a) {
3526 if (ok) {
3527 if (*a == '*') {
3528 utf8 = true;
3529 break;
3530 }
3531 ok = false;
3532 if (*a == 'u' || *a == 'U') {
3533 ++a;
3534 if (*a == 't' || *a == 'T') {
3535 ++a;
3536 if (*a == 'f' || *a == 'F') {
3537 ++a;
3538 if (*a == '-')
3539 ++a;
3540 if (*a == '8') {
3541 utf8 = true;
3542 break;
3543 }
3544 }
3545 }
3546 continue;
3547 }
3548 } else if (*a == ',') {
3549 if (!div)
3550 div = a;
3551 ok = true;
3552 } else if (*a == ';') {
3553 if (!div)
3554 div = a;
3555 }
3556
3557 ++a;
3558 }
3559 if (utf8) {
3560 info.setKeyValue("accept-charset", new QoreStringNode("utf8"), 0);
3561 acceptcharset = true;
3562 } else {
3564 if (div)
3565 ac->concat(t, div - t);
3566 else
3567 ac->concat(t);
3568 ac->trim();
3569 if (!ac->empty()) {
3570 info.setKeyValue("accept-charset", ac.release(), 0);
3571 acceptcharset = true;
3572 }
3573 }
3574
3575 return acceptcharset;
3576 }
3577
3578 // returns true if the connection should be closed, false if not
3579 DLLLOCAL bool convertHeaderToHash(QoreHashNode* h, char* p, int flags = 0, QoreHashNode* info = nullptr,
3580 bool* chunked = nullptr, const char* headers_raw_key = "headers-raw") {
3581 bool close = !(flags & CHF_HTTP11);
3582 // socket encoding
3583 const char* senc = nullptr;
3584 // accept-charset
3585 bool acceptcharset = false;
3586
3587 QoreHashNode* raw_hdr = nullptr;
3588 if (info) {
3589 info->setKeyValue(headers_raw_key, raw_hdr = new QoreHashNode(autoTypeInfo), nullptr);
3590 }
3591
3592 // raw key for setting raw headers
3593 std::string raw_key;
3594
3595 while (*p) {
3596 char* buf = p;
3597
3598 if ((p = strstr(buf, "\r\n"))) {
3599 *p = '\0';
3600 p += 2;
3601 } else if ((p = strchr(buf, '\n'))) {
3602 *p = '\0';
3603 p++;
3604 } else if ((p = strchr(buf, '\r'))) {
3605 *p = '\0';
3606 p++;
3607 } else
3608 break;
3609 char* t = strchr(buf, ':');
3610 if (!t)
3611 break;
3612 *t = '\0';
3613 t++;
3614 while (t && qore_isblank(*t))
3615 t++;
3616 if (raw_hdr) {
3617 raw_key = buf;
3618 }
3619 strtolower(buf);
3620 //printd(5, "setting %s = '%s'\n", buf, t);
3621
3622 ReferenceHolder<> val(new QoreStringNode(t), nullptr);
3623
3624 if (flags & CHF_PROCESS) {
3625 if (!strcmp(buf, "connection")) {
3626 if (flags & CHF_HTTP11) {
3627 if (strcasestr(t, "close"))
3628 close = true;
3629 } else {
3630 if (strcasestr(t, "keep-alive"))
3631 close = false;
3632 }
3633 } else if (!strcmp(buf, "content-type")) {
3634 char* a = strcasestr(t, "charset=");
3635 if (a) {
3636 // find end
3637 char* e = strchr(a + 8, ';');
3638
3639 QoreString cs;
3640 if (e)
3641 cs.concat(a + 8, e - a - 8);
3642 else
3643 cs.concat(a + 8);
3644 cs.trim();
3645 senc = cs.getBuffer();
3646 //printd(5, "got encoding '%s' from request\n", senc);
3647 enc = QEM.findCreate(senc);
3648
3649 if (info) {
3650 size_t len = cs.size();
3651 info->setKeyValue("charset", new QoreStringNode(cs.giveBuffer(), len, len + 1, QCS_DEFAULT), nullptr);
3652 }
3653
3654 if (info) {
3656 // remove any whitespace and ';' before charset=
3657 if (a != t) {
3658 do {
3659 --a;
3660 } while (a > t && (*a == ' ' || *a == ';'));
3661 }
3662
3663 if (a == t) {
3664 if (e)
3665 ct->concat(e + 1);
3666 } else {
3667 ct->concat(t, a - t + 1);
3668 if (e)
3669 ct->concat(e);
3670 }
3671 ct->trim();
3672 if (!ct->empty())
3673 info->setKeyValue("body-content-type", ct.release(), nullptr);
3674 }
3675 } else {
3676 enc = QEM.findCreate(assume_http_encoding.c_str());
3677 if (info) {
3678 info->setKeyValue("charset", new QoreStringNode(assume_http_encoding), nullptr);
3679 info->setKeyValue("body-content-type", val->refSelf(), nullptr);
3680 }
3681 }
3682 } else if (chunked && !strcmp(buf, "transfer-encoding") && !strcasecmp(t, "chunked")) {
3683 *chunked = true;
3684 } else if (info) {
3685 if (!strcmp(buf, "accept-charset"))
3686 acceptcharset = do_accept_charset(t, *info);
3687 else if ((flags & CHF_REQUEST) && !strcmp(buf, "accept-encoding"))
3688 do_accept_encoding(t, *info);
3689 }
3690 }
3691
3692 ReferenceHolder<> val_copy(nullptr);
3693 if (raw_hdr && val) {
3694 val_copy = val->realCopy();
3695 }
3696
3697 // see if header exists, and if so make it a list and add value to the list
3698 hash_assignment_priv ha(*h, buf);
3699 if (!(*ha).isNothing()) {
3700 QoreListNode* l;
3701 if ((*ha).getType() == NT_LIST) {
3702 l = (*ha).get<QoreListNode>();
3703 } else {
3704 l = new QoreListNode(autoTypeInfo);
3705 l->push(ha.swap(l), nullptr);
3706 }
3707 l->push(val.release(), nullptr);
3708 } else // otherwise set header normally
3709 ha.assign(val.release(), 0);
3710
3711 // set raw headers if applicable
3712 if (raw_hdr) {
3713 hash_assignment_priv ha(*raw_hdr, raw_key);
3714 if (!(*ha).isNothing()) {
3715 QoreListNode* l;
3716 if ((*ha).getType() == NT_LIST) {
3717 l = (*ha).get<QoreListNode>();
3718 } else {
3719 l = new QoreListNode(autoTypeInfo);
3720 l->push(ha.swap(l), nullptr);
3721 }
3722 l->push(val_copy.release(), nullptr);
3723 } else // otherwise set header normally
3724 ha.assign(val_copy.release(), nullptr);
3725 }
3726 }
3727
3728 if ((flags & CHF_PROCESS)) {
3729 if (!senc)
3730 enc = QEM.findCreate(assume_http_encoding.c_str());
3731 // according to RFC-2616 section 14.2, "If no Accept-Charset header is present, the default is that any character set is acceptable" so we will use utf-8
3732 if (info && !acceptcharset)
3733 info->setKeyValue("accept-charset", new QoreStringNode("utf8"), nullptr);
3734 }
3735
3736 return close;
3737 }
3738
3739 DLLLOCAL int recvix(const char* meth, int len, void* targ, int timeout_ms, ExceptionSink* xsink) {
3740 assert(xsink);
3741 if (sock == QORE_INVALID_SOCKET) {
3742 se_not_open("Socket", meth, xsink, "recvix");
3743 return QSE_NOT_OPEN;
3744 }
3745 if (in_op >= 0) {
3746 if (in_op == q_gettid()) {
3747 se_in_op("Socket", meth, xsink);
3748 return 0;
3749 }
3750 se_in_op_thread("Socket", meth, xsink);
3751 return 0;
3752 }
3753
3754 PrivateQoreSocketThroughputHelper th(this, false);
3755
3756 char* buf;
3757 ssize_t br = 0;
3758 while (true) {
3759 ssize_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3760 if (rc <= 0) {
3761 do_read_error(rc, meth, timeout_ms, xsink);
3762 return (int)rc;
3763 }
3764
3765 memcpy(targ, buf, rc);
3766
3767 br += rc;
3768 if (br >= len)
3769 break;
3770 }
3771
3772 th.finalize(br);
3773 do_data_event(QORE_EVENT_SOCKET_DATA_READ, QORE_SOURCE_SOCKET, targ, br);
3774 return (int)br;
3775 }
3776
3777 DLLLOCAL void clearWarningQueue(ExceptionSink* xsink) {
3778 if (warn_queue) {
3779 if (warn_callback_arg) {
3780 warn_callback_arg.discard(xsink);
3781 warn_callback_arg = QoreValue();
3782 }
3783 warn_queue->deref(xsink);
3784 warn_queue = nullptr;
3785 tl_warning_us = 0;
3786 tp_warning_bs = 0.0;
3787 tp_us_min = 0;
3788 }
3789 }
3790
3791 DLLLOCAL void setWarningQueue(ExceptionSink* xsink, int64 warning_ms, int64 warning_bs, Queue* wq, QoreValue arg,
3792 int64 min_ms = 1000) {
3793 ReferenceHolder<Queue> qholder(wq, xsink);
3794 ValueHolder holder(arg, xsink);
3795 if (warning_ms <= 0 && warning_bs <= 0) {
3796 xsink->raiseException("SOCKET-SETWARNINGQUEUE-ERROR", "Socket::setWarningQueue() at least one of warning "
3797 "ms argument: " QLLD " and warning B/s argument: " QLLD " must be greater than zero; to clear, call "\
3798 "Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3799 return;
3800 }
3801
3802 if (warning_ms < 0)
3803 warning_ms = 0;
3804 if (warning_bs < 0)
3805 warning_bs = 0;
3806
3807 if (warn_queue) {
3808 warn_queue->deref(xsink);
3809 warn_callback_arg.discard(xsink);
3810 }
3811
3812 warn_queue = qholder.release();
3813 warn_callback_arg = holder.release();
3814 tl_warning_us = (int64)warning_ms * 1000;
3815 tp_warning_bs = warning_bs;
3816 tp_us_min = min_ms * 1000;
3817 }
3818
3819 DLLLOCAL void getUsageInfo(QoreHashNode& h, qore_socket_private& s) const {
3820 if (warn_queue) {
3821 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3822 h.setKeyValue("timeout", tl_warning_us, 0);
3823 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3824 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3825 }
3826
3827 h.setKeyValue("bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3828 h.setKeyValue("bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3829 h.setKeyValue("us_sent", tp_us_sent + s.tp_us_sent, 0);
3830 h.setKeyValue("us_recv", tp_us_recv + s.tp_us_recv, 0);
3831 }
3832
3833 DLLLOCAL void getUsageInfo(QoreHashNode& h) const {
3834 if (warn_queue) {
3835 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3836 h.setKeyValue("timeout", tl_warning_us, 0);
3837 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3838 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3839 }
3840
3841 h.setKeyValue("bytes_sent", tp_bytes_sent, 0);
3842 h.setKeyValue("bytes_recv", tp_bytes_recv, 0);
3843 h.setKeyValue("us_sent", tp_us_sent, 0);
3844 h.setKeyValue("us_recv", tp_us_recv, 0);
3845 }
3846
3847 DLLLOCAL QoreHashNode* getUsageInfo() const {
3848 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3849 getUsageInfo(*h);
3850 return h;
3851 }
3852
3853 DLLLOCAL void clearStats() {
3854 tp_bytes_sent = 0;
3855 tp_bytes_recv = 0;
3856 tp_us_sent = 0;
3857 tp_us_recv = 0;
3858 }
3859
3860 DLLLOCAL void doTimeoutWarning(const char* op, int64 dt) {
3861 assert(warn_queue);
3862 assert(dt > tl_warning_us);
3863
3864 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3865
3866 h->setKeyValue("type", new QoreStringNode("SOCKET-OPERATION-WARNING"), 0);
3867 h->setKeyValue("operation", new QoreStringNode(op), 0);
3868 h->setKeyValue("us", dt, 0);
3869 h->setKeyValue("timeout", tl_warning_us, 0);
3870 if (warn_callback_arg)
3871 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3872
3873 warn_queue->pushAndTakeRef(h);
3874 }
3875
3876 DLLLOCAL void doThroughputWarning(bool send, int64 bytes, int64 dt, double bs) {
3877 assert(warn_queue);
3878 assert(bs < tp_warning_bs);
3879
3880 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3881
3882 h->setKeyValue("type", new QoreStringNode("SOCKET-THROUGHPUT-WARNING"), 0);
3883 h->setKeyValue("dir", new QoreStringNode(send ? "send" : "recv"), 0);
3884 h->setKeyValue("bytes", bytes, 0);
3885 h->setKeyValue("us", dt, 0);
3886 h->setKeyValue("bytes_sec", bs, 0);
3887 h->setKeyValue("threshold", (int64)tp_warning_bs, 0);
3888 if (warn_callback_arg)
3889 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3890
3891 warn_queue->pushAndTakeRef(h);
3892 }
3893
3894 DLLLOCAL bool pendingHttpChunkedBody() const {
3895 return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
3896 }
3897
3898 DLLLOCAL void setSslVerifyMode(int mode) {
3899 //printd(5, "qore_socket_private::setSslVerifyMode() this: %p mode: %d\n", this, mode);
3900 ssl_verify_mode = mode;
3901 if (ssl)
3902 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3903 }
3904
3905 DLLLOCAL void acceptAllCertificates(bool accept_all = true) {
3906 ssl_accept_all_certs = accept_all;
3907 if (ssl)
3908 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3909 }
3910
3911 DLLLOCAL void setSslErrorString(QoreStringNode* err_str) {
3912 if (ssl_err_str) {
3913 ssl_err_str->concat("; ");
3914 ssl_err_str->concat(err_str);
3915 err_str->deref();
3916 } else {
3917 ssl_err_str = err_str;
3918 }
3919 }
3920
3921 DLLLOCAL static void getUsageInfo(const QoreSocket& sock, QoreHashNode& h, const QoreSocket& s) {
3922 sock.priv->getUsageInfo(h, *s.priv);
3923 }
3924
3925 DLLLOCAL static qore_socket_private* get(QoreSocket& sock) {
3926 return sock.priv;
3927 }
3928
3929 DLLLOCAL static const qore_socket_private* get(const QoreSocket& sock) {
3930 return sock.priv;
3931 }
3932
// Static function taking an OpenSSL X509_STORE_CTX; only the declaration is visible here.
// NOTE(review): presumably records the peer certificate during verification - confirm against the definition
3933 DLLLOCAL static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
3934
// Static function taking a list, a timeout in milliseconds and an exception sink and returning a list;
// only the declaration is visible here.
// NOTE(review): the element format of poll_list and of the returned list cannot be told from this view
3935 DLLLOCAL static QoreListNode* poll(const QoreListNode* poll_list, int timeout_ms, ExceptionSink* xsink);
3936};
3937
3938#endif
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition: QoreLib.h:616
static void strtolower(char *str)
convert a string to lower-case in place
Definition: QoreLib.h:268
The base class for all value and parse types in Qore expression trees.
Definition: AbstractQoreNode.h:57
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false,...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition: QoreThreadLock.h:190
holds arbitrary binary data
Definition: BinaryNode.h:41
DLLEXPORT void append(const void *nptr, size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT size_t size() const
returns the number of bytes in the object
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT void clear()
frees any managed memory and sets the size to 0
DLLEXPORT const void * getPtr() const
returns the pointer to the data
constant iterator class for QoreHashNode, to be only created on the stack
Definition: QoreHashNode.h:594
For use on the stack only: iterates through elements of a const QoreListNode.
Definition: QoreListNode.h:563
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:50
DLLEXPORT int appendLastDescription(const char *fmt,...)
appends a formatted string to the top exception description if the desc value is a string
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the 'arg' member (this object takes over the ...
Interface for private data of input streams.
Definition: InputStream.h:44
virtual int64 read(void *ptr, int64 limit, ExceptionSink *xsink)=0
Reads up to `limit` bytes from the input stream.
Interface for private data of output streams.
Definition: OutputStream.h:44
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
provides an interface to getaddrinfo
Definition: QoreNet.h:132
DLLLOCAL struct addrinfo * getAddrInfo() const
returns the struct addrinfo * being managed (may be 0)
Definition: QoreNet.h:159
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6,...
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
defines string encoding functions in Qore
Definition: QoreEncoding.h:83
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn'...
This is the hash or associative list container type in Qore, dynamically allocated only,...
Definition: QoreHashNode.h:51
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
DLLEXPORT size_t size() const
returns the number of members in the hash, executes in constant time
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT QoreHashNode * copy() const
performs a copy of the hash and returns the new hash
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition: QoreListNode.h:52
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
Qore's arbitrary-precision number value type, dynamically-allocated only, reference counted.
Definition: QoreNumberNode.h:51
the implementation of Qore's object data type, reference counted, dynamically-allocated only
Definition: QoreObject.h:61
DLLEXPORT AbstractPrivateData * getReferencedPrivateData(qore_classid_t key, ExceptionSink *xsink) const
returns the private data corresponding to the class ID passed with an incremented reference count,...
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
represents an X509 certificate, reference-counted, dynamically-allocated only
Definition: QoreSSLCertificate.h:42
provides access to a private key data structure for SSL connections
Definition: QoreSSLPrivateKey.h:40
DLLEXPORT double getAsFloat() const
returns the value as a float
DLLLOCAL detail::QoreValueCastHelper< T >::Result get()
returns the value as the given type
Definition: QoreValue.h:214
DLLEXPORT qore_type_t getType() const
returns the type of value contained
DLLEXPORT bool getAsBool() const
returns the value as a bool
DLLEXPORT int64 getAsBigInt() const
returns the value as an int
DLLEXPORT void clear()
unconditionally set the QoreValue to QoreNothingNode (does not dereference any possible contained Abs...
provides access to sockets using Qore data structures
Definition: QoreSocket.h:129
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:93
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT const char * c_str() const
returns the string's buffer; this data should not be changed
DLLEXPORT size_t strlen() const
returns the number of bytes in the string (not including the null terminator)
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT const char * getBuffer() const
returns the string's buffer; this data should not be changed
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
DLLEXPORT size_t size() const
returns the number of bytes in the string (not including the null terminator)
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
Qore's string value type, reference counted, dynamically-allocated only.
Definition: QoreStringNode.h:50
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:49
a templated class to manage a reference count of an object that can throw a Qore-language exception w...
Definition: ReferenceHolder.h:52
DLLLOCAL T * release()
releases the pointer to the caller
Definition: ReferenceHolder.h:83
base class for resolved call references
Definition: CallReferenceNode.h:109
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
manages a reference count of a pointer to a class that takes a simple "deref()" call with no argument...
Definition: ReferenceHolder.h:127
a helper class for getting socket origination information
Definition: QoreSocket.h:76
holds an object and dereferences it in the destructor
Definition: QoreValue.h:487
unsigned qore_classid_t
used for the unique class ID for QoreClass objects
Definition: common.h:79
long long int64
64bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition: common.h:260
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition: node_types.h:47
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition: node_types.h:53
const qore_type_t NT_BINARY
type value for BinaryNode
Definition: node_types.h:49
const qore_type_t NT_LIST
type value for QoreListNode
Definition: node_types.h:50
const qore_type_t NT_NULL
type value for QoreNullNode
Definition: node_types.h:48
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition: node_types.h:43
const qore_type_t NT_STRING
type value for QoreStringNode
Definition: node_types.h:45
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition: node_types.h:44
const qore_type_t NT_HASH
type value for QoreHashNode
Definition: node_types.h:51
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition: node_types.h:42
DLLEXPORT int q_gettid() noexcept
returns the current TID number
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:279
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself